从零实现RPC之设计心跳机制 | Enplee's blog
0%

从零实现RPC之设计心跳机制

RPC协议是应用层的通讯协议,在client与server建立起连接的时候,一般client会进行多次的远程调用,我们希望能保持这个连接,而如何保持这个连接,就需要心跳机制。

什么是心跳机制

顾名思义, 所谓 心跳, 即在 TCP 长连接中, 客户端和服务器之间定期发送的一种特殊的数据包, 通知对方自己还在线, 以确保 TCP 连接的有效性.

心跳机制确定了连接双发的存活,保证了连接的有效性。

如何实现心跳

  • 网络的传输层TCP协议实现了Keepalive机制
  • 在应用层自定义RPC自己的心跳机制

基于TCP层面的心跳机制,虽然可以复用,但是存在以下的缺点:

  • 不是TCP的标准协议,默认是关闭的
  • TCP的keepalive依赖于操作系统的实现,默认是两个小时,修改需要修改系统配置,不够灵活。

所以了解的大多数RPC协议都实现了自定义的心跳机制,LeezyRPC是基于Netty实现的网络传输,Netty框架同时提供了开启基于TCP的keepalive心跳机制和自定义的应用层心跳机制。

使用Netty实现RPC心跳机制

netty默认支持开启TCP的keepalive机制:

1
2
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.childOption(ChannelOption.TCP_NODELAY,true);

netty同时为开发者提供了自定义心跳机制的实现方式,最为关键的就是IdleStateHandler。顾名思义,是用来处理空闲状态的类,可以添加到pipeline中,构造IdleStateHandler需要传入四个参数:

1
2
3
public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
}
  • readerIdleTimeSeconds, 读超时. 即当在指定的事件间隔内没有从 Channel 读取到数据时, 会触发一个 READER_IDLE 的 IdleStateEvent 事件.
  • writerIdleTimeSeconds, 写超时. 即当在指定的事件间隔内没有数据写入到 Channel 时, 会触发一个 WRITER_IDLE 的 IdleStateEvent 事件
  • allIdleTimeSeconds, 读/写超时. 即当在指定的事件间隔内没有读或写操作时, 会触发一个 ALL_IDLE 的 IdleStateEvent 事件.

以上超时时间,会触发userEventTrigger()方法的调用,我们在userEventTrigger()方法中就可以实现对心跳包的处理。

心跳逻辑设计

client作为请求的发起方,应该去主动向server发送心跳包去维护连接,作为server一方,执行逻辑简单,只需要监听channel中的读事件,如果读事件超时,说明client已经放弃了连接,那么就关闭链接。

1
2
3
4
5
6
7
8
9
10
// pipeline add IdleStateHanlder 关注channel中的read事件,30s认为超时。
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup,workerGroup)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS));
}
});

作为client一方,在正常发送调用请求之外,还要负责发送维护连接的心跳包。那么只需要监听channel中的写事件,如果定时没有调用请求,就发送一个心跳包。

1
2
3
4
5
6
7
8
9
10
// pipeline add IdleStateHanlder 关注channel中的write事件,5s一个心跳包。
Bootsrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.hanlder(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new IdleStateHandler(0,5,0, TimeUnit.SECONDS));
}
})

Server一方处理心跳包和正常调用包都在RpcServerHandler中,对于正常包正常进行解析调用流程,心跳包返回一个回复心跳包(Ping-Pong)不做处理。userEventTrigger()函数被触发的时候,说明发生了读超时,关闭链接即可: (优化心跳包机制,server无需回复,减少网络读写)

1
2
3
4
5
6
7
8
9
10
11
12
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if(evt instanceof IdleStateEvent){
IdleState state = ((IdleStateEvent)evt).state();
if(state == IdleState.READER_IDLE){
log.info("idle check happen,so clse the connection");
ctx.close();
}
}else {
super.userEventTriggered(ctx,evt);
}
}

Client一方5s没有正常调用事件之后,就发送心跳包维持连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if(evt instanceof IdleStateEvent){
IdleState idleState = ((IdleStateEvent)evt).state();
if(idleState == IdleState.WRITER_IDLE){
log.info("write idle happed [{}]",ctx.channel().remoteAddress());
Channel channel = rpcNettyClient.getChannel((InetSocketAddress) ctx.channel().remoteAddress());
RpcMessage rpcMessage = new RpcMessage();
rpcMessage.setCodec(SerializationTypeEnum.PROTOSTUFF.getCode());
rpcMessage.setCompress(CompressTypeEnum.GZIP.getCode());
rpcMessage.setMessageType(RpcConstants.HEARTBEAT_REQUEST_TYPE);
rpcMessage.setBody(RpcConstants.PING);
channel.writeAndFlush(rpcMessage).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
} else {
super.userEventTriggered(ctx,evt);
}
}
-------------本文结束感谢您的阅读-------------