从零实现RPC之传输协议与拆包粘包 | Enplee's blog
0%

从零实现RPC之传输协议与拆包粘包

在心跳机制中提到,RPC框架中是维护了一个长连接的,多个请求包会复用这个全双工的长连接。这种情况下就会先拆包与粘包的问题。下面将对拆包和粘包问题进行详细的介绍。

拆包与粘包

  • 什么是拆包和粘包、产生原因

TCP协议是面向字节流的,对于应用层的数据,TCP协议没有应用层级别的数据包概念,他只会根据缓冲区、窗口大小等,将这个二进制流分割成若干个包进行可靠传输。那么带来的后果就是,如果应用层不能预先指定好相应的协议,在read的时候,就可能一次read到多个包粘连在一块(粘包),也可能读取到不完整的包(拆包)。

  • 如何解决拆包与粘包

TCP协议是无法理解和感知道上层的业务逻辑,所以在TCP层是无法保证数据包不被拆分和重组的。所以,只能通过应用层协议栈来解决,目前有以下几种解决策略:

  • 固定消息的长度,读取到了一定的长度才认为是一个完成的消息。不足就缓存和之后的拼接。
  • 将回车作为消息结束符
  • 使用特使的消息结束符
  • 长度不固定的消息,通过在消息头中定义长度字段辅助解析

Netty框架中提供了上述四种方案对应的实现:

1
2
3
4
public class FixedLengthFrameDecoder extends ByteToMessageDecoder
public class LineBasedFrameDecoder extends ByteToMessageDecoder
public class DelimiterBasedFrameDecoder extends ByteToMessageDecoder
public class LengthFieldBasedFrameDecoder extends ByteToMessageDecoder

这四个类统一继承与ByteToMessageDecoder。负责将二进制流解析成明消息。

在RPC协议中,显然请求不可能是固定长度的,而使用特定字符又不够灵活,存在限制。所以,使用基于消息头长度字段的方式,来进行包的解析。

LengthFieldBasedFrameDecoder的构造函数如下:

1
2
3
4
5
public LengthFieldBasedFrameDecoder(
int maxFrameLength,
int lengthFieldOffset, int lengthFieldLength,
int lengthAdjustment, int initialBytesToStrip
}
  • maxFrameLength:实现约定帧的最大长度。
  • lengthFieldOffset: 消息包中,长度域所在的指针偏移量。
  • lengthFieldLength: 长度域的长度,多少字节
  • lengthAdjustment: 指针需要调整的长度
  • initialBytesToStrip: 舍弃的长度

整个机制的原理就是,首先定位到长度域其实点,根据长度读取并解析消息长度,将指针偏移会开始位置,然后读取指定长度的字节。余下的信息会进行缓存和后续的流拼接进行重新读取。

RPC传输协议Frame设计

Frame结构参考HTTP协议,可以设计为消息头+消息体,消息头中包含了请求的基本和必要信息。比如,无论请求体是request还是response,都需要序列化和压缩流程,那么在消息头中就要包含是哪种序列化方式和压缩方式。目前的必要信息有: 版本号、长度、压缩方式、序列化方式、requestId、消息类型。所以整个消息结构设计为如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
 * <pre>
* 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
* +-----+-----+-----+-----+--------+----+----+----+------+-----------+-------+----- --+-----+-----+-------+
* | magic code |version | full length | messageType| codec|compress| RequestId |
* +-----------------------+--------+---------------------+-----------+-----------+-----------+------------+
* | |
* | body |
* | |
* | ... ... |
* +-------------------------------------------------------------------------------------------------------+
* 4B magic code(魔法数) 1B version(版本) 4B full length(消息长度) 1B messageType(消息类型)
* 1B compress(压缩类型) 1B codec(序列化类型) 4B requestId(请求的Id)
* body(object类型数据)
* </pre>

public class RpcMessage {
// message type: HEARTBEAT_REQUEST_TYPE HEARTBEAT_RESPONCE_TYPE NORMAL_TYPE
private byte messageType;
// serialization type
private byte codec;
// compress type
private byte compress;
// request ID
private int requestId;
// message Body (RpcReq,RpcResp)
private Object body;
}

其中,magic_code参考java的”cafebaby”,定义的是”lrpc”,长度为4byte。full_length定义为4个字节。所以,根据协议Frame的设计,我们可以通过LengthFieldBasedFrameDecoder提供的方式来解析Frame:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class RpcMessageDecoder extends LengthFieldBasedFrameDecoder {
public RpcMessageDecoder(){
/**
* lengthFieldOffset = magic code (4B) + version(1B) = 5B
* lengthFieldLength = full lenth is 4B
* lengthAdjustment = start offset = -4B -5B
* initialBytesToStrip = we need magic code and version,so do not need trip. 0B
*/
this(RpcConstants.MAX_FRAME_LENGTH,5,4,-9,0);
}
public RpcMessageDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);
}
}

自己定义了RpcMessageDecoder继承自LengthFieldBasedFrameDecoder,并且构造方式实际上是调用的父类的构造方法。之后,我们需要重写LengthFieldBasedFrameDecoder的decode方法。在decode方法中,我们首先调用父类的方法,将frame提取出来,之后按照RpcMessage的格式将Frame解码成对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
log.info("ByteBuf in size {}",in);
Object decoded = super.decode(ctx, in);
if(decoded instanceof ByteBuf) {
ByteBuf frame = (ByteBuf) decoded;
if(frame.readableBytes() >= RpcConstants.TOTAL_LENGTH) {
try {
return decodeFrame(frame);
}catch (Exception e){
log.error("Decode frame error!",e);
}finally {
frame.release();
}
}
}
return decoded;
}

frame封装成RpcMessage的过程,主要是在decodeFrame方法中实现。主要实现过程是,首先检查MagicNumber和Version,检查通过,将消息头中的字段提取出来,根据MessageType分别处理: HEARTBEAT_REQUEST_TYPE、HEARTBEAT_RESPONCE_TYPE和正常反序列化和解压缩body对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 首先检查版本和MagicNumber 
private void checkVersion(ByteBuf in) {
byte version = in.readByte();
if (version != RpcConstants.VERSION){
throw new RuntimeException("version isn`t compatible" + version);
}
}

private void checkMagicNumber(ByteBuf in) {
int len = RpcConstants.MAGIC_NUMBER.length;
byte[] read = new byte[len];
in.readBytes(read);
for(int i=0;i<len;i++){
if(read[i] != RpcConstants.MAGIC_NUMBER[i]) {
throw new IllegalArgumentException("Unknow magic code" + Arrays.toString(read));
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// 之后解析消息头,并解压缩和发序列化body,最后封装成RpcMessage,向PipeLine下游传导。    
private Object decodeFrame(ByteBuf in) {
checkMagicNumber(in);
checkVersion(in); // offset -> 5
int fullLength = in.readInt();
byte messageType = in.readByte();
byte codec = in.readByte();
byte compress = in.readByte();
int requestId = in.readInt();
RpcMessage rpcMessage = RpcMessage.builder().messageType(messageType).codec(codec).compress(compress).requestId(requestId).build();

if(messageType == RpcConstants.HEARTBEAT_REQUEST_TYPE){
rpcMessage.setBody(RpcConstants.PING);
return rpcMessage;
}
if(messageType == RpcConstants.HEARTBEAT_RESPONCE_TYPE){
rpcMessage.setBody(RpcConstants.PONG);
return rpcMessage;
}

int bodyLength = fullLength-RpcConstants.MAX_HEAD_LENGTH;
if(bodyLength > 0) {
byte[] bytes = new byte[bodyLength];
in.readBytes(bytes);
//TODO: 使用SPI机制,让接口和实现解耦
//decompress the bytes accoding to the compressType
Compressor compressor = new GzipCompressor();
byte[] decompress = compressor.decompress(bytes);
//deserialize the bytes accoding to the codec
Serializer serializer = new ProtoStuffSerializer();
if(messageType == RpcConstants.REQUEST_TYPE){
RpcRequest rpcRequest = serializer.deSerialize(decompress, RpcRequest.class);
rpcMessage.setBody(rpcRequest);
}
if(messageType == RpcConstants.RESPONCE_TYPE){
RpcResponce rpcResponce = serializer.deSerialize(decompress, RpcResponce.class);
rpcMessage.setBody(rpcResponce);
}
}
return rpcMessage;
}

到此,整个Rpc框架的Frame设计和粘包拆包处理已经完成了。

-------------本文结束感谢您的阅读-------------