RPC系列
概述
什么是责任链模式,简单说就是击鼓传花,按顺序依次传递。
这种模式在框架中无处不在,使用过servlet的都知道,可以添加多个filter处理请求,实现过滤拦截的功能。它的好处很明显:不改变源代码的情况下方便扩展,即开放封闭原则。
示例
public abstract class Handler {
protected Handler next;
abstract void doHand(String str);
protected void setNext(Handler next) {
this.next = next;
}
}
public class Handler1 extends Handler{
@Override
void doHand(String str) {
if ("xx".equals(str)) {
System.out.println("handler1 处理...");
}else {
System.out.println("给下一个Handler处理...");
next.doHand(str);
}
}
}
public class Handler2 extends Handler {
@Override
void doHand(String str) {
System.out.println("handler2 处理...");
if (next != null) {
next.doHand(str);
}
}
}
public class Client {
public static void main(String[] args) {
Handler handler1 = new Handler1();
Handler handler2 = new Handler2();
handler1.setNext(handler2);
handler1.doHand("yy");
}
}
给下一个Handler处理… handler2 处理…
这是最简单的责任链模式实现,
另一种实现方式
ok http
public interface Interceptor {
interface Chain{
/**
* 用于从链中获取 request
*/
Request request();
Response proceed(Request request);
}
Response intercept(Chain chain);
}
public class RealChain implements Interceptor.Chain {
private List<Interceptor> interceptors;
private Request request;
private int index;
public RealChain(List<Interceptor> interceptors, int index, Request request) {
this.index = index;
this.interceptors = interceptors;
this.request = request;
}
Response exec(){
return proceed(request);
}
@Override
public Request request() {
return this.request;
}
@Override
public Response proceed(Request request) {
if (index >= interceptors.size()) {
throw new AssertionError();
}
RealChain realChain = new RealChain(this.interceptors, index + 1, request);
Interceptor interceptor = interceptors.get(index);
return interceptor.intercept(realChain);
}
}
public class Request {
}
public class Response {
}
public class Client {
@Test
public void testInterceptor() {
RealChain chain = newCall(new Request());
Response response = chain.exec();
}
public RealChain newCall(Request request){
List<Interceptor> interceptorList = new ArrayList<>();
interceptorList.add(new LogInterceptor());
interceptorList.add(new MyInterceptor());
interceptorList.add(new NetworkRequest());
return new RealChain(interceptorList, 0, request);
}
class MyInterceptor implements Interceptor{
@Override
public Response intercept(Chain chain) {
System.out.println("other interceptor: do something ...");
return chain.proceed(chain.request());
}
}
class LogInterceptor implements Interceptor{
@Override
public Response intercept(Chain chain) {
System.out.println("log interceptor : begin request....");
Response proceed = chain.proceed(chain.request());
System.out.println("log interceptor : end request....");
return proceed;
}
}
class NetworkRequest implements Interceptor{
@Override
public Response intercept(Chain chain) {
System.out.println("last interceptor , create response. ");
return new Response();
}
}
}
这种实现方式的好处是代码更优雅写,不需要分别设置继承者,而是把一个个拦截器注册到链上。
异步责任链模式
我们看看这个连接器:
class LogInterceptor implements Interceptor{
@Override
public Response intercept(Chain chain) {
// 交给下一个处理者之前做点事
Response proceed = chain.proceed(chain.request());
// 所有后续的处理者完成后做 的事
return proceed;
}
}
不难理解,同一个线程中,和递归一样,调用栈一层层往下执行,再一层层弹出来。每个拦截器都能拿到后一个拦截器的处理结果。
那异步的情况下怎么办?
我们马上想到使用Future。
Future<Response> future = chain.proceed(chain.request());
但是要实现对结果的处理,只能future.get()阻塞等待,又变成了同步。如果一层层注册回调函数,将会复杂无比。如果使用这样的拦截器实现是框架的扩展接口,那扩展者用起来很不友好,应该把异步逻辑封装起来。
netty的ChannelHandler就是责任链模式的实现,但它的读写都是异步的,怎么让读写都经过Handler呢,看看ChannelPipeline的注释就明白了。
* I/O Request
* via {@link Channel} or
* {@link ChannelHandlerContext}
* |
* +---------------------------------------------------+---------------+
* | ChannelPipeline | |
* | \|/ |
* | +---------------------+ +-----------+----------+ |
* | | Inbound Handler N | | Outbound Handler 1 | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ | |
* | | \|/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler N-1 | | Outbound Handler 2 | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ . |
* | . . |
* | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
* | [ method call] [method call] |
* | . . |
* | . \|/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler 2 | | Outbound Handler M-1 | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ | |
* | | \|/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler 1 | | Outbound Handler M | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ | |
* +---------------+-----------------------------------+---------------+
* | \|/
* +---------------+-----------------------------------+---------------+
* | | | |
* | [ Socket.read() ] [ Socket.write() ] |
* | |
* | Netty Internal I/O Threads (Transport Implementation) |
* +-------------------------------------------------------------------+
出站和入站分为两条线,通过双向链表关联起来。
这个图有点乱,不过基本能表达出双向链表处理:
- 一个chennel对应有一个Pipeline
- ChannelHandlerContext把ChannelHandler包装起来
- Pipeline默认有TailContext,HeadContext分别头尾相连 入站
- 从网络读取数据后,初始化Channel,初始化Channel的过程包括:分配eventloop,初始化Pipeline,把自定义的handler包装成HandlerContext,把HandlerContext以双向链表的形式插入tail和head之间。
- Channel中调用pipeline.fireChannelRead(byteBuf)方法:通过pipline拿到head,通过链表找到下一个入站handler(可以看这个方法findContextInbound())。
出站 我在handler中使用这个方法写数据:
ctx.writeAndFlush(rpcContext);
他的流程如下:
- ctx获取持有的pipline。
- pipeline使用tailContext开始找到prev outBoundHandler链式调用
- 直到HeadContext的write和flush方法。