RPC协议是应用层的通讯协议,在client与server建立起连接的时候,一般client会进行多次的远程调用,我们希望能保持这个连接,而如何保持这个连接,就需要心跳机制。
什么是心跳机制
顾名思义, 所谓 心跳, 即在 TCP 长连接中, 客户端和服务器之间定期发送的一种特殊的数据包, 通知对方自己还在线, 以确保 TCP 连接的有效性.
心跳机制确定了连接双发的存活,保证了连接的有效性。
如何实现心跳
- 网络的传输层TCP协议实现了Keepalive机制
- 在应用层自定义RPC自己的心跳机制
基于TCP层面的心跳机制,虽然可以复用,但是存在以下的缺点:
- 不是TCP的标准协议,默认是关闭的
- TCP的keepalive依赖于操作系统的实现,默认是两个小时,修改需要修改系统配置,不够灵活。
所以了解的大多数RPC协议都实现了自定义的心跳机制,LeezyRPC是基于Netty实现的网络传输,Netty框架同时提供了开启基于TCP的keepalive心跳机制和自定义的应用层心跳机制。
使用Netty实现RPC心跳机制
netty默认支持开启TCP的keepalive机制:
1 2
| ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.childOption(ChannelOption.TCP_NODELAY,true);
|
netty同时为开发者提供了自定义心跳机制的实现方式,最为关键的就是IdleStateHandler。顾名思义,是用来处理空闲状态的类,可以添加到pipeline中,构造IdleStateHandler需要传入四个参数:
1 2 3
| public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) { this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS); }
|
- readerIdleTimeSeconds, 读超时. 即当在指定的事件间隔内没有从 Channel 读取到数据时, 会触发一个 READER_IDLE 的 IdleStateEvent 事件.
- writerIdleTimeSeconds, 写超时. 即当在指定的事件间隔内没有数据写入到 Channel 时, 会触发一个 WRITER_IDLE 的 IdleStateEvent 事件
- allIdleTimeSeconds, 读/写超时. 即当在指定的事件间隔内没有读或写操作时, 会触发一个 ALL_IDLE 的 IdleStateEvent 事件.
以上超时时间,会触发userEventTrigger()方法的调用,我们在userEventTrigger()方法中就可以实现对心跳包的处理。
心跳逻辑设计
client作为请求的发起方,应该去主动向server发送心跳包去维护连接,作为server一方,执行逻辑简单,只需要监听channel中的读事件,如果读事件超时,说明client已经放弃了连接,那么就关闭链接。
1 2 3 4 5 6 7 8 9 10
| ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup,workerGroup) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS)); } });
|
作为client一方,在正常发送调用请求之外,还要负责发送维护连接的心跳包。那么只需要监听channel中的写事件,如果定时没有调用请求,就发送一个心跳包。
1 2 3 4 5 6 7 8 9 10
| Bootsrap bootstrap = new Bootstrap(); bootstrap.group(eventLoopGroup) .hanlder(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new IdleStateHandler(0,5,0, TimeUnit.SECONDS)); } })
|
Server一方处理心跳包和正常调用包都在RpcServerHandler中,对于正常包正常进行解析调用流程,心跳包返回一个回复心跳包(Ping-Pong)不做处理。userEventTrigger()函数被触发的时候,说明发生了读超时,关闭链接即可: (优化心跳包机制,server无需回复,减少网络读写)
1 2 3 4 5 6 7 8 9 10 11 12
| @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if(evt instanceof IdleStateEvent){ IdleState state = ((IdleStateEvent)evt).state(); if(state == IdleState.READER_IDLE){ log.info("idle check happen,so clse the connection"); ctx.close(); } }else { super.userEventTriggered(ctx,evt); } }
|
Client一方5s没有正常调用事件之后,就发送心跳包维持连接。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if(evt instanceof IdleStateEvent){ IdleState idleState = ((IdleStateEvent)evt).state(); if(idleState == IdleState.WRITER_IDLE){ log.info("write idle happed [{}]",ctx.channel().remoteAddress()); Channel channel = rpcNettyClient.getChannel((InetSocketAddress) ctx.channel().remoteAddress()); RpcMessage rpcMessage = new RpcMessage(); rpcMessage.setCodec(SerializationTypeEnum.PROTOSTUFF.getCode()); rpcMessage.setCompress(CompressTypeEnum.GZIP.getCode()); rpcMessage.setMessageType(RpcConstants.HEARTBEAT_REQUEST_TYPE); rpcMessage.setBody(RpcConstants.PING); channel.writeAndFlush(rpcMessage).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } } else { super.userEventTriggered(ctx,evt); } }
|