RPC二-NettyHandler处理消息

RPC系列
1. RPC一-线程模型
2. RPC二-NettyHandler处理消息
3. RPC三-rpc协议和编解码
4. RPC四-netty异步双向责任链
5. RPC五-可靠性设计
6. RPC六-动态代理
7. 服务发现-注册中心设计

概述

netty官网给出了EchoServer示例,很容易实现一个应答服务。EchoClientHandler处理客户端发出请求和接收消息。我稍微改动下,大概代码如下:

public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    	  // 请求:发送data
        ctx.writeAndFlush("hello".getBytes());
    }
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws IOException {
        // 返回:读取返回值
        System.out.println(in.toString()); 
    }
}

示例中,ClientHandler在channelActive()就绪后发送数据,在监听到返回值后执行channelRead0()方法。看起来很简单,但是channelActive和channelRead0两个方法都是异步触发,我们需要找到发送数据的入口和拿到数据的方法。

发送数据

方案:ClientHandler持有ChannelHandlerContext,在handler就绪后初始化handler持有的context,如:

public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
	private ChannelHandlerContext ctx;
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        this.ctx = ctx;// 拿到ctx,赋值
    }
    // 调用此方法发送数据
    pulic byte[] sendMsg(bytep[] bytes){
    	ByteBuf buf = Unpooled.buffer();
    	buf.writeBytes(bytes);
    	// 如果chanel未就绪,ctx为空,此处将出现空指针异常
    	ctx.writeAndFlush(buf);
    }
}

获取数据

使用阻塞队列

Netty中所有IO操作都是异步的,所以ctx.writeAndFlush(buf)告诉handler执行后马上返回,不会等到服务端返回数据。

所以,需要一个异步通知的机制,在channelRead0()方法中通知客户端。
可以通过BlockingQueue实现:

public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
	private final BlockingQueue<String> responses = new LinkedBlockingQueue<>();
	private ChannelHandlerContext ctx;
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        this.ctx = ctx;// 拿到ctx
    }
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws IOException {
        responses.add(in.toString());
    }
    // 调用此方法发送数据
    pulic String sendMsg(bytep[] bytes){
    	ByteBuf buf = Unpooled.buffer();
    	buf.writeBytes(bytes);
    	ctx.writeAndFlush(buf);
    	String result = responses.take()
    	return result;
    }
}

多路复用

使用netty tcp客户端,一般都是多路复用,建立连接后,把handler放入缓存。使用时直接从缓存中获取,而不是每个请求都建立一次连接。也就是多路复用,这儿就成了复用handler。

            +-----------+       +--------+
thread a+-> |           |       |        |
            |  handler  +-----> | server |
            |           |       |        |
thread b+-> +-----------+       +--------+

多线程调用时,每个请求的返回顺序并不一致,如上图,线程a先执行,线程b后执行,但哪个线程的结果先返回就说不好了。如何判断返回值属于哪个线程呢?

对上文中的BlockingQueue做改造:

ConcurrentHashMap<Long, BlockingQueue<Response>> responseMap = new ConcurrentHashMap<>();

引入ConcurrentHashMap:

  • key: request分配的唯一id
  • value: 长度为1的LinkedBlockingQueue为value

步骤:
1. 构造request并生成唯一requestId
2. responseMap.put(requstId,new LinkedBlockingQueue(1))
3. 执行异步IO:ctx.writeAndFlush()
4. 阻塞等待:responseMap.get(requestId).poll()
5. 处理事件:如果监听到事件,responseMap.get(requestId).add(Response)
6. 等待结束,返回response。

使用ListenableFuture

上文中直接使用阻塞队列的方式还是不太好,也就是说,只能发起同步调用,如果是异步,需要返回LinkedBlockingQueue给客户端自己处理。使用更高抽象的Future-promise模型是更好的选择。

Future
它的值是一个异步计算的结果集。可以判断异步结果是否执行完成,可以取消(不再关心结果),可以阻塞获取结果。但只能读取,无法修改。

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get(); 
    V get(long timeout, TimeUnit unit);
}

guava又在其基础上作了扩展。如ListenableFuture,可以添加监听器,可以阻塞等待,操作更自由,不需要循环判断future.isDone()。netty又扩展了更丰富的方法如syncUninterruptibly()

Promise:
promise由Future扩展而来,区别是不仅能读取,还可以主动写返回。如setSuccess(),setFailure()

对Future使用,可以参考我在其他文章给出的示例jdk、guava、netty的3个示例:给Future一个Promise

异步获取netty客户端请求返回值,Future-promise再适合不过。如下图:

guava提供了几个比较好的类如:
* AbstractFuture: 一个抽象类,继承可以实ListenableFuture
* SettableFuture:AbstractFuture的一个实现类,提供set(),setException(),setFuture()
* Futures: 创建Future的工程方法。

下文基于guava几个类,创建自己的RpcFuture(promis):

public class RpcFuture<V> extends AbstractFuture<V>{
    private final Executor executor;
    public RpcFuture(Executor executor) {
        this.executor = executor;
    }
    public void addListener(final Runnable listener) {
        super.addListener(listener, executor);
    }
    @Override
    public boolean set(V value) {
        boolean result = super.set(value);
        if (!result) {
            throw new IllegalStateException("set future result failed");
        }
        return result;
    }
}

模仿Futures也创建一个自己的工程方法:

public class Futures {
    public static <T> RpcFuture<T> createListenableFuture() {
        return new RpcFuture<>(MoreExecutors.directExecutor());
    }
}

于是,我们的客户端请求方法就变成了:

RpcFuture<Response> future = client.invoke(request);
RpcFuture<Response> future = client.invoke(request.callback);

同步调用通过future.get()返回Response,异步调用直接返回Future。


http://netty.io/4.1/xref/io/netty/example/echo/package-summary.html
https://www.jianshu.com/p/a06da3256f0c
http://colobu.com/2015/06/11/Scala-Future-and-Promise/

CONTENTS