RPC四-netty异步双向责任链

RPC系列

  1. RPC一-线程模型
  2. RPC二-NettyHandler处理消息
  3. RPC三-rpc协议和编解码
  4. RPC四-netty异步双向责任链
  5. RPC五-可靠性设计
  6. RPC六-动态代理
  7. 服务发现-注册中心设计

概述

什么是责任链模式,简单说就是击鼓传花,按顺序依次传递。

这种模式在框架中无处不在,使用过servlet的都知道,可以添加多个filter处理请求,实现过滤拦截的功能。它的好处很明显:不改变源代码的情况下方便扩展,即开放封闭原则。

示例

public abstract class Handler {
    protected Handler next;
    abstract void doHand(String str);
    protected void setNext(Handler next) {
        this.next = next;
    }
}
public class Handler1 extends Handler{
    @Override
    void doHand(String str) {
        if ("xx".equals(str)) {
            System.out.println("handler1 处理...");
        }else {
            System.out.println("给下一个Handler处理...");
            next.doHand(str);
        }
    }
}
public class Handler2 extends Handler {
    @Override
    void doHand(String str) {
        System.out.println("handler2 处理...");
        if (next != null) {
            next.doHand(str);
        }
    }
}
public class Client {
    public static void main(String[] args) {
        Handler handler1 = new Handler1();
        Handler handler2 = new Handler2();
        handler1.setNext(handler2);
        handler1.doHand("yy");
    }
}

给下一个Handler处理… handler2 处理…

这是最简单的责任链模式实现,

另一种实现方式

ok http

public interface Interceptor {
    interface Chain{
        /**
         * 用于从链中获取 request
         */
        Request request();
        Response proceed(Request request);
    }
    Response intercept(Chain chain);
}
public class RealChain implements Interceptor.Chain {
    private List<Interceptor> interceptors;
    private Request request;
    private int index;

    public RealChain(List<Interceptor> interceptors, int index, Request request) {
        this.index = index;
        this.interceptors = interceptors;
        this.request = request;
    }
    Response exec(){
        return proceed(request);
    }

    @Override
    public Request request() {
        return this.request;
    }

    @Override
    public Response proceed(Request request) {
        if (index >= interceptors.size()) {
            throw new AssertionError();
        }
        RealChain realChain = new RealChain(this.interceptors, index + 1, request);
        Interceptor interceptor = interceptors.get(index);
        return interceptor.intercept(realChain);
    }

}
public class Request {
}
public class Response {
}
public class Client {
    @Test
    public void testInterceptor() {
        RealChain chain = newCall(new Request());
        Response response = chain.exec();
    }
    public RealChain newCall(Request request){
        List<Interceptor> interceptorList = new ArrayList<>();
        interceptorList.add(new LogInterceptor());
        interceptorList.add(new MyInterceptor());
        interceptorList.add(new NetworkRequest());
        return new RealChain(interceptorList, 0, request);
    }
    class MyInterceptor implements Interceptor{
        @Override
        public Response intercept(Chain chain) {
            System.out.println("other interceptor: do something ...");
            return chain.proceed(chain.request());
        }
    }
    class LogInterceptor implements Interceptor{
        @Override
        public Response intercept(Chain chain) {
            System.out.println("log interceptor : begin request....");
            Response proceed = chain.proceed(chain.request());
            System.out.println("log interceptor : end request....");
            return proceed;
        }
    }
    class NetworkRequest implements Interceptor{
        @Override
        public Response intercept(Chain chain) {
            System.out.println("last interceptor , create response. ");
            return new Response();
        }
    }
}

这种实现方式的好处是代码更优雅写,不需要分别设置继承者,而是把一个个拦截器注册到链上。

异步责任链模式

我们看看这个连接器:

 class LogInterceptor implements Interceptor{
    @Override
    public Response intercept(Chain chain) {
       // 交给下一个处理者之前做点事
        Response proceed = chain.proceed(chain.request());
         // 所有后续的处理者完成后做 的事
        return proceed;
    }
}

不难理解,同一个线程中,和递归一样,调用栈一层层往下执行,再一层层弹出来。每个拦截器都能拿到后一个拦截器的处理结果。

那异步的情况下怎么办?

我们马上想到使用Future。

Future<Response> future = chain.proceed(chain.request());

但是要实现对结果的处理,只能future.get()阻塞等待,又变成了同步。如果一层层注册回调函数,将会复杂无比。如果使用这样的拦截器实现是框架的扩展接口,那扩展者用起来很不友好,应该把异步逻辑封装起来。

netty的ChannelHandler就是责任链模式的实现,但它的读写都是异步的,怎么让读写都经过Handler呢,看看ChannelPipeline的注释就明白了。

 *                                                 I/O Request
 *                                            via {@link Channel} or
 *                                        {@link ChannelHandlerContext}
 *                                                      |
 *  +---------------------------------------------------+---------------+
 *  |                           ChannelPipeline         |               |
 *  |                                                  \|/              |
 *  |    +---------------------+            +-----------+----------+    |
 *  |    | Inbound Handler  N  |            | Outbound Handler  1  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  |               |                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  .               |
 *  |               .                                   .               |
 *  | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
 *  |        [ method call]                       [method call]         |
 *  |               .                                   .               |
 *  |               .                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  |               |                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler  1  |            | Outbound Handler  M  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  +---------------+-----------------------------------+---------------+
 *                  |                                  \|/
 *  +---------------+-----------------------------------+---------------+
 *  |               |                                   |               |
 *  |       [ Socket.read() ]                    [ Socket.write() ]     |
 *  |                                                                   |
 *  |  Netty Internal I/O Threads (Transport Implementation)            |
 *  +-------------------------------------------------------------------+

出站和入站分为两条线,通过双向链表关联起来。

这个图有点乱,不过基本能表达出双向链表处理:

  • 一个chennel对应有一个Pipeline
  • ChannelHandlerContext把ChannelHandler包装起来
  • Pipeline默认有TailContext,HeadContext分别头尾相连 入站
  1. 从网络读取数据后,初始化Channel,初始化Channel的过程包括:分配eventloop,初始化Pipeline,把自定义的handler包装成HandlerContext,把HandlerContext以双向链表的形式插入tail和head之间。
  2. Channel中调用pipeline.fireChannelRead(byteBuf)方法:通过pipline拿到head,通过链表找到下一个入站handler(可以看这个方法findContextInbound())。

出站 我在handler中使用这个方法写数据:

 ctx.writeAndFlush(rpcContext);

他的流程如下:

  1. ctx获取持有的pipline。
  2. pipeline使用tailContext开始找到prev outBoundHandler链式调用
  3. 直到HeadContext的write和flush方法。

示例

CONTENTS