在心跳机制中提到,RPC框架中是维护了一个长连接的,多个请求包会复用这个全双工的长连接。这种情况下就会先拆包与粘包的问题。下面将对拆包和粘包问题进行详细的介绍。
拆包与粘包
TCP协议是面向字节流的,对于应用层的数据,TCP协议没有应用层级别的数据包概念,他只会根据缓冲区、窗口大小等,将这个二进制流分割成若干个包进行可靠传输。那么带来的后果就是,如果应用层不能预先指定好相应的协议,在read的时候,就可能一次read到多个包粘连在一块(粘包),也可能读取到不完整的包(拆包)。
TCP协议是无法理解和感知道上层的业务逻辑,所以在TCP层是无法保证数据包不被拆分和重组的。所以,只能通过应用层协议栈来解决,目前有以下几种解决策略:
- 固定消息的长度,读取到了一定的长度才认为是一个完成的消息。不足就缓存和之后的拼接。
- 将回车作为消息结束符
- 使用特使的消息结束符
- 长度不固定的消息,通过在消息头中定义长度字段辅助解析
Netty框架中提供了上述四种方案对应的实现:
1 2 3 4
| public class FixedLengthFrameDecoder extends ByteToMessageDecoder public class LineBasedFrameDecoder extends ByteToMessageDecoder public class DelimiterBasedFrameDecoder extends ByteToMessageDecoder public class LengthFieldBasedFrameDecoder extends ByteToMessageDecoder
|
这四个类统一继承与ByteToMessageDecoder。负责将二进制流解析成明消息。
在RPC协议中,显然请求不可能是固定长度的,而使用特定字符又不够灵活,存在限制。所以,使用基于消息头长度字段的方式,来进行包的解析。
LengthFieldBasedFrameDecoder的构造函数如下:
1 2 3 4 5
| public LengthFieldBasedFrameDecoder( int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip }
|
- maxFrameLength:实现约定帧的最大长度。
- lengthFieldOffset: 消息包中,长度域所在的指针偏移量。
- lengthFieldLength: 长度域的长度,多少字节
- lengthAdjustment: 指针需要调整的长度
- initialBytesToStrip: 舍弃的长度
整个机制的原理就是,首先定位到长度域其实点,根据长度读取并解析消息长度,将指针偏移会开始位置,然后读取指定长度的字节。余下的信息会进行缓存和后续的流拼接进行重新读取。
RPC传输协议Frame设计
Frame结构参考HTTP协议,可以设计为消息头+消息体,消息头中包含了请求的基本和必要信息。比如,无论请求体是request还是response,都需要序列化和压缩流程,那么在消息头中就要包含是哪种序列化方式和压缩方式。目前的必要信息有: 版本号、长度、压缩方式、序列化方式、requestId、消息类型。所以整个消息结构设计为如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| * <pre> * 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 * +-----+-----+-----+-----+--------+----+----+----+------+-----------+-------+----- --+-----+-----+-------+ * | magic code |version | full length | messageType| codec|compress| RequestId | * +-----------------------+--------+---------------------+-----------+-----------+-----------+------------+ * | | * | body | * | | * | ... ... | * +-------------------------------------------------------------------------------------------------------+ * 4B magic code(魔法数) 1B version(版本) 4B full length(消息长度) 1B messageType(消息类型) * 1B compress(压缩类型) 1B codec(序列化类型) 4B requestId(请求的Id) * body(object类型数据) * </pre> public class RpcMessage { private byte messageType; private byte codec; private byte compress; private int requestId; private Object body; }
|
其中,magic_code参考java的”cafebaby”,定义的是”lrpc”,长度为4byte。full_length定义为4个字节。所以,根据协议Frame的设计,我们可以通过LengthFieldBasedFrameDecoder提供的方式来解析Frame:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public class RpcMessageDecoder extends LengthFieldBasedFrameDecoder { public RpcMessageDecoder(){
this(RpcConstants.MAX_FRAME_LENGTH,5,4,-9,0); } public RpcMessageDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) { super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip); } }
|
自己定义了RpcMessageDecoder继承自LengthFieldBasedFrameDecoder,并且构造方式实际上是调用的父类的构造方法。之后,我们需要重写LengthFieldBasedFrameDecoder的decode方法。在decode方法中,我们首先调用父类的方法,将frame提取出来,之后按照RpcMessage的格式将Frame解码成对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| @Override protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { log.info("ByteBuf in size {}",in); Object decoded = super.decode(ctx, in); if(decoded instanceof ByteBuf) { ByteBuf frame = (ByteBuf) decoded; if(frame.readableBytes() >= RpcConstants.TOTAL_LENGTH) { try { return decodeFrame(frame); }catch (Exception e){ log.error("Decode frame error!",e); }finally { frame.release(); } } } return decoded; }
|
frame封装成RpcMessage的过程,主要是在decodeFrame方法中实现。主要实现过程是,首先检查MagicNumber和Version,检查通过,将消息头中的字段提取出来,根据MessageType分别处理: HEARTBEAT_REQUEST_TYPE、HEARTBEAT_RESPONCE_TYPE和正常反序列化和解压缩body对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| private void checkVersion(ByteBuf in) { byte version = in.readByte(); if (version != RpcConstants.VERSION){ throw new RuntimeException("version isn`t compatible" + version); } } private void checkMagicNumber(ByteBuf in) { int len = RpcConstants.MAGIC_NUMBER.length; byte[] read = new byte[len]; in.readBytes(read); for(int i=0;i<len;i++){ if(read[i] != RpcConstants.MAGIC_NUMBER[i]) { throw new IllegalArgumentException("Unknow magic code" + Arrays.toString(read)); } } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| private Object decodeFrame(ByteBuf in) { checkMagicNumber(in); checkVersion(in); int fullLength = in.readInt(); byte messageType = in.readByte(); byte codec = in.readByte(); byte compress = in.readByte(); int requestId = in.readInt(); RpcMessage rpcMessage = RpcMessage.builder().messageType(messageType).codec(codec).compress(compress).requestId(requestId).build();
if(messageType == RpcConstants.HEARTBEAT_REQUEST_TYPE){ rpcMessage.setBody(RpcConstants.PING); return rpcMessage; } if(messageType == RpcConstants.HEARTBEAT_RESPONCE_TYPE){ rpcMessage.setBody(RpcConstants.PONG); return rpcMessage; }
int bodyLength = fullLength-RpcConstants.MAX_HEAD_LENGTH; if(bodyLength > 0) { byte[] bytes = new byte[bodyLength]; in.readBytes(bytes); Compressor compressor = new GzipCompressor(); byte[] decompress = compressor.decompress(bytes); Serializer serializer = new ProtoStuffSerializer(); if(messageType == RpcConstants.REQUEST_TYPE){ RpcRequest rpcRequest = serializer.deSerialize(decompress, RpcRequest.class); rpcMessage.setBody(rpcRequest); } if(messageType == RpcConstants.RESPONCE_TYPE){ RpcResponce rpcResponce = serializer.deSerialize(decompress, RpcResponce.class); rpcMessage.setBody(rpcResponce); } } return rpcMessage; }
|
到此,整个Rpc框架的Frame设计和粘包拆包处理已经完成了。