RPC系列
rpc协议
rpc涉及的网络传输协议,有两种选择:公有协议、私有协议。
公有协议如grpc使用http2,公有协议因为要满足大多数使用场景,协议包含很多东西,而且像http header使用使用键值对的文本编码,使得报文包太大,自定义的好处就是灵活满足rpc需求即可,可以极大减少包大小。本文主要讨论私有协议。 一般都是消息头和消息体构成:
名称 | 类型 | 长度 | 含义 | 备注 |
---|---|---|---|---|
hader | header | 不定 | 消息头 | |
body | byte[] | 不定 | 消息体 |
消息头
消息头主要存放协议公共字段和扩展字段。
名字 | 字节数 | 含义 | 备注 |
---|---|---|---|
verion | 1 | 版本 | 为了以后协议更改的兼容性考虑 |
type | 1 | 消息类型 | 通过0,1,2,3..表示不同消息类型 |
serializer | 1 | 序列化方式 | 头信息必须包含序列化方式,否则没法获取消息体 |
requestId | 8 | 请求ID | Long类型的请求Id,用于标识请求的唯一性 |
ext | 4 | 扩展字段 | 默认都为0,方便扩展时为其定义指定含义 |
bodylength | 4 | body字节数 | 用于指定长度读取消息体 |
可以在前面添加协议标识,方便代理或者抓包软件判断 可以把超时时间、接口、方法等放到header,减少包大小。
消息体
消息体主要存放消息实体,为了扩展性,可以封装一个成标准对象。
public void Request{
private String interfaceName;
private String methodName;
private Object[] args;
}
##协议编码解码 网络节点间的传输通过字节,如何把字节转为目标节点的数据格式,和从节点数据格式转为传输字节,就是编码器和解码器改干的活。
如果上面提到的rpc协议用java数据格式来描述,如下:
public class RequestPacket{
private byte version;
private byte type;
private byte serializer;
private long requestId;
private byte[] ext;
private int bodylength;
private byte[] body;
}
netty提供了字节容器ByteBuf,灵活易用的api方面我们实现编解码的处理,于是,我们编码的主要工作就变为:
- 编码:Pocket—>ByteBuf
- 解码: ByteBuf–>Pocket
netty 编码器
netty提供了抽象类MessageToByteEncoder,继承它并实现encode()方法,它会把ByteBuf转发给ChannelPipeline的下一个出站处理器。
public class Encoder extends MessageToByteEncoder {
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) {
if (msg instanceof RequestPacket) {
RequestPacket requestPacket = (RequestPacket) msg;
// 分配buf:容量= Body+头
ByteBuf byteBuf = ctx.alloc().buffer(ProtocolConstants.HEADER_LENGTH + body.length);
// header
byteBuf.writeByte(requestPacket.getVersion());
byteBuf.writeByte(requestPacket.getType());
byteBuf.writeByte(requestPacket.getSerializer());
byteBuf.writeLong(requestPacket.getRequestId());
for (int i = 0; i < ProtocolConstants.HEADER_EXT_SIZE; i++) {
// 扩展字段,默认0
byteBuf.writeByte(0);
}
byteBuf.writeInt(body.length);
// body
byteBuf.writeBytes(body);
out.writeBytes(byteBuf);
}
}
}
这就是一个客户端请求报格式写入byteBuf的示例,netty会指定把byteBuf中的数据写入通道。
因为netty灵活的链式处理,编码器可以有多个。比如上游可以继承MessageToMessageEncoder
,把消息添加到pipeline。
public class Encoder extends MessageToMessageEncoder {
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, List out) throws Exception {
// msg处理
out.add(msg);// 再交给MessageToByteEncoder处理
}
}
同样,服务端处理返回包也是类似,只是RequestPacket换成了ResponsePacket
netty解码器
解码刚好和编码反过来。常规使用的抽象类:
- ByteToMessageDecoder
- MessageToMessageDecoder
实现decode方法,读取ByteBuf中的字节,映射到对应实体。
不过解码器的实现有很多需要注意的细节,比如TCP粘包拆包。
TCP粘包拆包 tcp套字节缓存区有大小,于是可能会有3情况:
- 一个tcp数据包就是一条完整的消息
- 一条消息大于缓冲区大小,被拆成2个甚至更多的包
- 一个数据包包含几条消息
ByteToMessageDecoder会循环读取数据并调用子类实现的decode方法。所以我们面对的其实就是ByteBuf,它可能包含多个完整消息,也可能是一个不完整的消息(半包)。下面看看这么在覆写的decode方法中实现读取网站消息:
public class Decoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
if (in.readableBytes() < ProtocolConstants.HEADER_LENGTH) {
// **** 可读字节数连header长度都不到,不处理
return;
}
// 1. 获取读索引
int readerIndex = in.readerIndex();
// 2. 读header
byte version = in.getByte(readerIndex);
byte type = in.getByte(readerIndex + 1);
byte serializer = in.getByte(readerIndex + 2);
Long requestId = in.getLong(readerIndex + 3);
int bodyLength = in.getInt(readerIndex + 15);
int pocketLength = ProtocolConstants.HEADER_LENGTH + bodyLength;
if (pocketLength > in.readableBytes()) {
// **** 不是正常请求应答消息,可能是半包,等待完整后处理
return;
}
// 3. 读body
byte[] body = new byte[bodyLength];
in.getBytes(readerIndex + ProtocolConstants.HEADER_LENGTH, body);
// 4. 通过header和body构造RequestPocket
// 5. 把RequestPocket交给后续的handlers处理
out.add(requestPocket);
// 6. 此消息读取结束,修改byteBuf读索引
in.skipBytes(pocketLength);
// 7. 如果还有可读消息,继续解析下一个消息
if (in.readableBytes() > ProtocolConstants.HEADER_LENGTH) {
decode(ctx, in, out);
}
}
}
上面伪代码基本表达了一个解码器的工作:
- 读ByteBuf中的数据使用getXX()方法,方便半包不处理时不改变readerIndex。但是如果接收到完整消息后,一定记得skipBytes(pocketLength),否则每次都从buf的0位置重复读。
- 通过in.readableBytes()获取可读数据大小,用来作半包判断。
- 有可能读完一个消息后,数据包中还有另一个消息,注释7下代码决定是否继续解析。