RPC五-可靠性设计

RPC系列
1. RPC一-线程模型
2. RPC二-NettyHandler处理消息
3. RPC三-rpc协议和编解码
4. RPC四-netty异步双向责任链
5. RPC五-可靠性设计
6. RPC六-动态代理
7. 服务发现-注册中心设计

概述

本文指的rpc网络通信可靠性设计,涉及超时,重试,心跳等几方面。

超时处理

如果服务端处理时间过长或者其他网络原因,客户端需要主队失败,否则一直阻塞,导致客户端线程一直被占用,影响客户端处理能力。

netty实现客户端连接超时很简单:

b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);

这个值绑定到Channel,可以为不同Channel设置不同的超时时间。

如果客户端连接超时,netty提供了ReadTimeoutHandler超时处理器,如果超时,会抛出ReadTimeoutException,可以扩展这个handler实现超时处理逻辑。

对于客户端读超时(read timeout),自己实现也比较简单。

// 客户端异步发送请求
cxt. channel.writeAndFlush(requestFuture);
// 所有请求用一个hash时间轮定时器
StaticHashedWheelTime.get().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            channel.eventLoop().submit(() -> {
                // 请求Future队列中移出,不再关心结果
                REQUEST_MAP.remove(requestFuture.getRequestId);
                // 构造错误返回
                Response errResult = exceptionResponse(ErrorStatusEnum.READ_TIMEOUT);
                // promis,通知Future完成
                listenableFuture.set(errResult);
            });
        }
    }, 3, TimeUnit.SECONDS);
  1. 异步发送请求后,马上注册一个定时任务到时间轮定时器
  2. 超时后:定时器执行,根据RequestId清理请求Future缓存,设置Future完成
  3. 客户端Future自动获取异常消息

这里用到了时间轮定时器HashedWheelTimer,由netty提供,可以处理大量定时任务。原理可以参考apache-kafka-purgatory-hierarchical-timing-wheels

这个定时器使用场景很多,包括后续客户端发送心跳,服务端的tcp连接验活等都能用它实现。

心跳

心跳的请求放回(ping-pong)可以和普通请求区别开来,在协议通过专门标识识别。netty实现较简单,注册一个IdleStateHandler到pipline中,他会在Channel空闲时按时触发IdleStateEvent:

ch.pipeline().addLast("idle", new IdleStateHandler(0, 0, 30, TimeUnit.SECONDS));

让后客户端注册一个HeartBeatClientHandler处理IdleStateEvent:

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
    // 如果有IdleStateEvent,则发送心跳请求
    if (evt instanceof IdleStateEvent) {
        currentTime++;
        RpcContext rpcContext = new RpcContext();
        rpcContext.setType(RequestType.HEARTBEAT_REQUEST.getType());
        ctx.channel().writeAndFlush(rpcContext)
           // 发送失败关闭连接
           .addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
        log.info("发送心跳:" + new Date());
    }
}

服务端只需要判断是心跳请求,直接放回即可。另外,服务端对一直没检测到心跳的连接,需要清理掉,原理和客户端类型,通过IdleStateEvent完成。

 @Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (evt instanceof IdleStateEvent) {
        IdleStateEvent event = (IdleStateEvent)evt;
        if (event.state() == IdleState.READER_IDLE) {
            loss_connect_time++;
            log.info("5 秒没有接收到客户端的信息了");
            if (loss_connect_time > 2) {
                log.info("关闭这个不活跃的channel");
                ctx.channel().close();
            }
        }
    } else {
        super.userEventTriggered(ctx, evt);
    }
}

https://segmentfault.com/a/1190000006931568
http://www.infoq.com/cn/articles/netty-reliability

CONTENTS