RPC系列
概述
本文指的rpc网络通信可靠性设计,涉及超时,重试,心跳等几方面。
超时处理
如果服务端处理时间过长或者其他网络原因,客户端需要主队失败,否则一直阻塞,导致客户端线程一直被占用,影响客户端处理能力。
netty实现客户端连接超时很简单:
b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
这个值绑定到Channel,可以为不同Channel设置不同的超时时间。
如果客户端连接超时,netty提供了ReadTimeoutHandler超时处理器,如果超时,会抛出ReadTimeoutException,可以扩展这个handler实现超时处理逻辑。
对于客户端读超时(read timeout),自己实现也比较简单。
// 客户端异步发送请求
cxt. channel.writeAndFlush(requestFuture);
// 所有请求用一个hash时间轮定时器
StaticHashedWheelTime.get().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
channel.eventLoop().submit(() -> {
// 请求Future队列中移出,不再关心结果
REQUEST_MAP.remove(requestFuture.getRequestId);
// 构造错误返回
Response errResult = exceptionResponse(ErrorStatusEnum.READ_TIMEOUT);
// promis,通知Future完成
listenableFuture.set(errResult);
});
}
}, 3, TimeUnit.SECONDS);
- 异步发送请求后,马上注册一个定时任务到时间轮定时器
- 超时后:定时器执行,根据RequestId清理请求Future缓存,设置Future完成
- 客户端Future自动获取异常消息
这里用到了时间轮定时器HashedWheelTimer,由netty提供,可以处理大量定时任务。原理可以参考apache-kafka-purgatory-hierarchical-timing-wheels。
这个定时器使用场景很多,包括后续客户端发送心跳,服务端的tcp连接验活等都能用它实现。
心跳
心跳的请求放回(ping-pong)可以和普通请求区别开来,在协议通过专门标识识别。netty实现较简单,注册一个IdleStateHandler到pipline中,他会在Channel空闲时按时触发IdleStateEvent:
ch.pipeline().addLast("idle", new IdleStateHandler(0, 0, 30, TimeUnit.SECONDS));
让后客户端注册一个HeartBeatClientHandler处理IdleStateEvent:
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
// 如果有IdleStateEvent,则发送心跳请求
if (evt instanceof IdleStateEvent) {
currentTime++;
RpcContext rpcContext = new RpcContext();
rpcContext.setType(RequestType.HEARTBEAT_REQUEST.getType());
ctx.channel().writeAndFlush(rpcContext)
// 发送失败关闭连接
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
log.info("发送心跳:" + new Date());
}
}
服务端只需要判断是心跳请求,直接放回即可。另外,服务端对一直没检测到心跳的连接,需要清理掉,原理和客户端类型,通过IdleStateEvent完成。
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent)evt;
if (event.state() == IdleState.READER_IDLE) {
loss_connect_time++;
log.info("5 秒没有接收到客户端的信息了");
if (loss_connect_time > 2) {
log.info("关闭这个不活跃的channel");
ctx.channel().close();
}
}
} else {
super.userEventTriggered(ctx, evt);
}
}
https://segmentfault.com/a/1190000006931568 http://www.infoq.com/cn/articles/netty-reliability