RPC系列
概述
netty官网给出了EchoServer示例,很容易实现一个应答服务。EchoClientHandler处理客户端发出请求和接收消息。我稍微改动下,大概代码如下:
public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 请求:发送data
ctx.writeAndFlush("hello".getBytes());
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws IOException {
// 返回:读取返回值
System.out.println(in.toString());
}
}
示例中,ClientHandler在channelActive()就绪后发送数据,在监听到返回值后执行channelRead0()方法。看起来很简单,但是channelActive和channelRead0两个方法都是异步触发,我们需要找到发送数据的入口和拿到数据的方法。
发送数据
方案:ClientHandler持有ChannelHandlerContext,在handler就绪后初始化handler持有的context,如:
public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
private ChannelHandlerContext ctx;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx;// 拿到ctx,赋值
}
// 调用此方法发送数据
pulic byte[] sendMsg(bytep[] bytes){
ByteBuf buf = Unpooled.buffer();
buf.writeBytes(bytes);
// 如果chanel未就绪,ctx为空,此处将出现空指针异常
ctx.writeAndFlush(buf);
}
}
获取数据
使用阻塞队列
Netty中所有IO操作都是异步的,所以ctx.writeAndFlush(buf)
告诉handler执行后马上返回,不会等到服务端返回数据。
所以,需要一个异步通知的机制,在channelRead0()方法中通知客户端。 可以通过BlockingQueue实现:
public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
private final BlockingQueue<String> responses = new LinkedBlockingQueue<>();
private ChannelHandlerContext ctx;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx;// 拿到ctx
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws IOException {
responses.add(in.toString());
}
// 调用此方法发送数据
pulic String sendMsg(bytep[] bytes){
ByteBuf buf = Unpooled.buffer();
buf.writeBytes(bytes);
ctx.writeAndFlush(buf);
String result = responses.take()
return result;
}
}
多路复用
使用netty tcp客户端,一般都是多路复用,建立连接后,把handler放入缓存。使用时直接从缓存中获取,而不是每个请求都建立一次连接。也就是多路复用,这儿就成了复用handler。
+-----------+ +--------+
thread a+-> | | | |
| handler +-----> | server |
| | | |
thread b+-> +-----------+ +--------+
多线程调用时,每个请求的返回顺序并不一致,如上图,线程a先执行,线程b后执行,但哪个线程的结果先返回就说不好了。如何判断返回值属于哪个线程呢?
对上文中的BlockingQueue做改造:
ConcurrentHashMap<Long, BlockingQueue<Response>> responseMap = new ConcurrentHashMap<>();
引入ConcurrentHashMap:
- key: request分配的唯一id
- value: 长度为1的LinkedBlockingQueue为value
步骤:
- 构造request并生成唯一requestId
- responseMap.put(requstId,new LinkedBlockingQueue(1))
- 执行异步IO:ctx.writeAndFlush()
- 阻塞等待:responseMap.get(requestId).poll()
- 处理事件:如果监听到事件,responseMap.get(requestId).add(Response)
- 等待结束,返回response。
使用ListenableFuture
上文中直接使用阻塞队列的方式还是不太好,也就是说,只能发起同步调用,如果是异步,需要返回LinkedBlockingQueue给客户端自己处理。使用更高抽象的Future-promise模型是更好的选择。
Future: 它的值是一个异步计算的结果集。可以判断异步结果是否执行完成,可以取消(不再关心结果),可以阻塞获取结果。但只能读取,无法修改。
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get();
V get(long timeout, TimeUnit unit);
}
guava又在其基础上作了扩展。如ListenableFuture,可以添加监听器,可以阻塞等待,操作更自由,不需要循环判断future.isDone()
。netty又扩展了更丰富的方法如syncUninterruptibly()
。
Promise:
promise由Future扩展而来,区别是不仅能读取,还可以主动写返回。如setSuccess()
,setFailure()
。
对Future使用,可以参考我在其他文章给出的示例jdk、guava、netty的3个示例:给Future一个Promise。
异步获取netty客户端请求返回值,Future-promise再适合不过。如下图:
guava提供了几个比较好的类如:
- AbstractFuture: 一个抽象类,继承可以实ListenableFuture
- SettableFuture:AbstractFuture的一个实现类,提供set(),setException(),setFuture()
- Futures: 创建Future的工程方法。
下文基于guava几个类,创建自己的RpcFuture(promis):
public class RpcFuture<V> extends AbstractFuture<V>{
private final Executor executor;
public RpcFuture(Executor executor) {
this.executor = executor;
}
public void addListener(final Runnable listener) {
super.addListener(listener, executor);
}
@Override
public boolean set(V value) {
boolean result = super.set(value);
if (!result) {
throw new IllegalStateException("set future result failed");
}
return result;
}
}
模仿Futures也创建一个自己的工程方法:
public class Futures {
public static <T> RpcFuture<T> createListenableFuture() {
return new RpcFuture<>(MoreExecutors.directExecutor());
}
}
于是,我们的客户端请求方法就变成了:
RpcFuture<Response> future = client.invoke(request);
RpcFuture<Response> future = client.invoke(request.callback);
同步调用通过future.get()返回Response,异步调用直接返回Future。
http://netty.io/4.1/xref/io/netty/example/echo/package-summary.html https://www.jianshu.com/p/a06da3256f0c http://colobu.com/2015/06/11/Scala-Future-and-Promise/