RPC三-rpc协议和编解码

RPC系列
1. RPC一-线程模型
2. RPC二-NettyHandler处理消息
3. RPC三-rpc协议和编解码
4. RPC四-netty异步双向责任链
5. RPC五-可靠性设计
6. RPC六-动态代理
7. 服务发现-注册中心设计

rpc协议

rpc涉及的网络传输协议,有两种选择:公有协议、私有协议。

公有协议如grpc使用http2,公有协议因为要满足大多数使用场景,协议包含很多东西,而且像http header使用使用键值对的文本编码,使得报文包太大,自定义的好处就是灵活满足rpc需求即可,可以极大减少包大小。本文主要讨论私有协议。
一般都是消息头和消息体构成:

名称 类型 长度 含义 备注
hader header 不定 消息头
body byte[] 不定 消息体

消息头

消息头主要存放协议公共字段和扩展字段。

名字 字节数 含义 备注
verion 1 版本 为了以后协议更改的兼容性考虑
type 1 消息类型 通过0,1,2,3..表示不同消息类型
serializer 1 序列化方式 头信息必须包含序列化方式,否则没法获取消息体
requestId 8 请求ID Long类型的请求Id,用于标识请求的唯一性
ext 4 扩展字段 默认都为0,方便扩展时为其定义指定含义
bodylength 4 body字节数 用于指定长度读取消息体

可以在前面添加协议标识,方便代理或者抓包软件判断
可以把超时时间、接口、方法等放到header,减少包大小。

消息体

消息体主要存放消息实体,为了扩展性,可以封装一个成标准对象。

public void Request{
    private String interfaceName;
    private String methodName;
    private Object[] args;
}

##协议编码解码
网络节点间的传输通过字节,如何把字节转为目标节点的数据格式,和从节点数据格式转为传输字节,就是编码器和解码器改干的活。

如果上面提到的rpc协议用java数据格式来描述,如下:

public class RequestPacket{
    private byte version;
    private byte type;
    private byte serializer;
    private long requestId;
    private byte[] ext;
    private int bodylength;
    private byte[] body;
}

netty提供了字节容器ByteBuf,灵活易用的api方面我们实现编解码的处理,于是,我们编码的主要工作就变为:
* 编码:Pocket—>ByteBuf
* 解码: ByteBuf–>Pocket

netty 编码器

netty提供了抽象类MessageToByteEncoder,继承它并实现encode()方法,它会把ByteBuf转发给ChannelPipeline的下一个出站处理器。

public class Encoder extends MessageToByteEncoder {
    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) {
        if (msg instanceof RequestPacket) {
            RequestPacket requestPacket = (RequestPacket) msg;
            // 分配buf:容量= Body+头
            ByteBuf byteBuf = ctx.alloc().buffer(ProtocolConstants.HEADER_LENGTH + body.length);
            // header
            byteBuf.writeByte(requestPacket.getVersion());
            byteBuf.writeByte(requestPacket.getType());
            byteBuf.writeByte(requestPacket.getSerializer());
            byteBuf.writeLong(requestPacket.getRequestId());
            for (int i = 0; i < ProtocolConstants.HEADER_EXT_SIZE; i++) {
                // 扩展字段,默认0
                byteBuf.writeByte(0);
            }
            byteBuf.writeInt(body.length);

            // body
            byteBuf.writeBytes(body);
            out.writeBytes(byteBuf);
        }
    }
}

这就是一个客户端请求报格式写入byteBuf的示例,netty会指定把byteBuf中的数据写入通道。

因为netty灵活的链式处理,编码器可以有多个。比如上游可以继承MessageToMessageEncoder,把消息添加到pipeline。

public class Encoder extends MessageToMessageEncoder {
    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, List out) throws Exception {
        // msg处理
        out.add(msg);// 再交给MessageToByteEncoder处理
    }
}

同样,服务端处理返回包也是类似,只是RequestPacket换成了ResponsePacket

netty解码器

解码刚好和编码反过来。常规使用的抽象类:
* ByteToMessageDecoder
* MessageToMessageDecoder

实现decode方法,读取ByteBuf中的字节,映射到对应实体。

不过解码器的实现有很多需要注意的细节,比如TCP粘包拆包。

TCP粘包拆包
tcp套字节缓存区有大小,于是可能会有3情况:
* 一个tcp数据包就是一条完整的消息
* 一条消息大于缓冲区大小,被拆成2个甚至更多的包
* 一个数据包包含几条消息

ByteToMessageDecoder会循环读取数据并调用子类实现的decode方法。所以我们面对的其实就是ByteBuf,它可能包含多个完整消息,也可能是一个不完整的消息(半包)。下面看看这么在覆写的decode方法中实现读取网站消息:

public class Decoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        if (in.readableBytes() < ProtocolConstants.HEADER_LENGTH) {
            // **** 可读字节数连header长度都不到,不处理
            return;
        }
        // 1. 获取读索引
        int readerIndex = in.readerIndex();

        // 2. 读header
        byte version = in.getByte(readerIndex);
        byte type = in.getByte(readerIndex + 1);
        byte serializer = in.getByte(readerIndex + 2);
        Long requestId = in.getLong(readerIndex + 3);
        int bodyLength = in.getInt(readerIndex + 15);
        int pocketLength = ProtocolConstants.HEADER_LENGTH + bodyLength;
        if (pocketLength > in.readableBytes()) {
            //  **** 不是正常请求应答消息,可能是半包,等待完整后处理
            return;
        }
        // 3. 读body
        byte[] body = new byte[bodyLength];
        in.getBytes(readerIndex + ProtocolConstants.HEADER_LENGTH, body);
        // 4. 通过header和body构造RequestPocket
        // 5. 把RequestPocket交给后续的handlers处理
        out.add(requestPocket);
        // 6. 此消息读取结束,修改byteBuf读索引
        in.skipBytes(pocketLength);
        
        // 7. 如果还有可读消息,继续解析下一个消息
        if (in.readableBytes() > ProtocolConstants.HEADER_LENGTH) {
            decode(ctx, in, out);
        }
    }
}

上面伪代码基本表达了一个解码器的工作:
1. 读ByteBuf中的数据使用getXX()方法,方便半包不处理时不改变readerIndex。但是如果接收到完整消息后,一定记得skipBytes(pocketLength),否则每次都从buf的0位置重复读。
2. 通过in.readableBytes()获取可读数据大小,用来作半包判断。
3. 有可能读完一个消息后,数据包中还有另一个消息,注释7下代码决定是否继续解析。


https://my.oschina.net/andylucc/blog/625315

CONTENTS