【剖析 | SOFARPC 框架】之SOFARPC 连接管理与心跳剖析

简介: 本文介绍连接管理的策略和SOFARPC中连接管理与心跳机制的实现,希望通过这篇文章,大家对此有一个了解,如果对其中有疑问的,也欢迎留言与我们讨论。

前言

在 RPC 调用过程中,我们经常会和多个服务端进行远程调用,如果在每次调用的时候,都进行 TCP连接,会对 RPC的性能有比较大的影响,因此,实际的场景中,我们经常要对连接进行管理和保持。

SOFARPC应用心跳包以及断线重连实现,结合系统tcp-keepalive机制,来实现对RPC连接的管理和保持。

连接管理

首先我们将会介绍连接管理相关的一些背景知识。

长连接和短连接

短连接,一般是指客户端向服务端发起连接请求。连接建立后,发送数据,接收服务端数据返回,然后触发连接断开,下次再重新重复以上过程。

长连接,则是在建立连接后,发送数据,接收数据,但是不主动断开,并且主动通过心跳等机制来维持这个连接可用,当再次有数据发送请求时,不需要进行建立连接的过程。

一般的,长连接多用于数据发送频繁,点对点的通讯,因为每个TCP连接都需要进行握手,这是需要时间的,在一些跨城,或者长距离的情况下,如果每个操作都是先连接,再发送数据的话,那么业务处理速度会降低很多,所以每个操作完后都不断开,再次处理时直接发送数据包即可,节省再次建立连接的过程。

但是,客户端不主动断开,并不是说连接就不会断。因为系统设置原因,网络原因,网络设备防火墙,都可能导致连接断开。因此我们需要实现对长连接的管理。

TCP层keep-alive

tcp的keep-alive是什么

tcp-keepalive,顾名思义,它可以尽量让 TCP 连接“活着”,或者让一些对方无响应的 TCP 连接断开,

使用场景主要是:

  1. 一些特定环境,比如两个机器之间有防火墙,防火墙能维持的连接有限,可能会自动断开长期无活动的 TCP 连接。
  2. 还有就是客户端,断电重启,卡死等等,都会导致tcp连接无法释放。

这会导致:

一旦有热数据需要传递,若此时连接已经被中介设备断开,应用程序没有及时感知的话,那么就会导致在一个无效的数据链路层面发送业务数据,结果就是发送失败。

无论是因为客户端意外断电、死机、崩溃、重启,还是中间路由网络无故断开、NAT超时等,服务器端要做到快速感知失败,减少无效链接操作。

而 tcp-keepalive 机制可以在连接无活动一段时间后,发送一个空 ack,使 TCP 连接不会被防火墙关闭。

默认值

tcp-keepalive,操作系统内核支持,但是不默认开启,应用需要自行开启,开启之后有三个参数会生效,来决定一个keepalive的行为。

net.ipv4.tcp_keepalive_time = 7200
net.ipv4.tcp_keepalive_probes = 9
net.ipv4.tcp_keepalive_intvl = 75

可以通过如下命令查看系统tcp-keepalive参数配置。

sysctl -a | grep keepalive

cat /proc/sys/net/ipv4/tcp_keepalive_time

sysctl net.ipv4.tcp_keepalive_time

系统默认值可以通过这个查看。

tcp_keepalive_time,在TCP保活打开的情况下,最后一次数据交换到TCP发送第一个保活探测包的间隔,即允许的持续空闲时长,或者说每次正常发送心跳的周期,默认值为7200s(2h)。
tcp_keepalive_probes 在tcp_keepalive_time之后,没有接收到对方确认,继续发送保活探测包次数,默认值为9(次)。
tcp_keepalive_intvl,在tcp_keepalive_time之后,没有接收到对方确认,继续发送保活探测包的发送频率,默认值为75s。

这个不够直观,直接看下面这个图的说明


<div id="2y3gkp" data-type="image" data-display="block" data-align="" data-src="https://cdn.nlark.com/yuque/0/2018/png/156121/1535513238795-57f765f2-908b-4689-b0a7-b6aa3a3599f7.png" data-width="747">
  <img src="https://cdn.nlark.com/yuque/0/2018/png/156121/1535513238795-57f765f2-908b-4689-b0a7-b6aa3a3599f7.png" width="747" />
</div>


如何使用

应用层,以Java的Netty为例,服务端和客户端设置即可。

ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 100)
             .childOption(ChannelOption.SO_KEEPALIVE, true)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast(
                             new EchoServerHandler());
                 }
             });

            // Start the server.
            ChannelFuture f = b.bind(port).sync();

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();

就是这里面的ChannelOption.SO_KEEPALIVE, true 对应即可打开.

目前bolt中也是默认打开的.

 .childOption(ChannelOption.SO_KEEPALIVE,
                Boolean.parseBoolean(System.getProperty(Configs.TCP_SO_KEEPALIVE, "true")));

Java程序只能做到设置SO_KEEPALIVE选项,至于TCP_KEEPCNT,TCP_KEEPIDLE,TCP_KEEPINTVL等参数配置,只能依赖于sysctl配置,系统进行读取。

检查

查看tcp连接tcp_keepalive状态

我们可以用 `netstat -no|grep keepalive` 命令来查看当前哪些 tcp 连接开启了 tcp keepalive.


<div id="wiwvxz" data-type="image" data-display="block" data-align="left" data-src="https://cdn.nlark.com/yuque/0/2018/png/156121/1534919046315-385e6336-a8c7-43af-af63-0aa7a29405c9.png" data-width="747">
  <img src="https://cdn.nlark.com/yuque/0/2018/png/156121/1534919046315-385e6336-a8c7-43af-af63-0aa7a29405c9.png" width="747" />
</div>


应用层keep-alive

应用层keep-alive方案,一般叫做心跳包,跟tcp-keepalive类似,心跳包就是用来及时监测是否断线的一种
机制,通过每间隔一定时间发送心跳数据,来检测对方是否连接,是属于应用程序协议的一部分。

心跳是什么

心跳想要实现的和tcp keep-alive是一样的。

由于连接丢失时,TCP不会立即通知应用程序。比如说,客户端程序断线了,服务端的TCP连接不会检测到断线,而是一直处于连接状态。这就带来了很大的麻烦,明明客户端已经断了,服务端还维护着客户端的连接,比如游戏的场景下,用户客户端都关机了,但是连接没有正常关闭,服务端无法知晓,还照常执行着该玩家的游戏逻辑。

听上去和tcp-alive类似,那为什么要有应用层心跳?

原因主要是默认的tcp keep-alive超时时间太长默认是7200秒,也就是2个小时。并且是系统级别,一旦更改,影响所有服务器上开启keep alive选项的应用行为。另外,socks proxy会让tcp keep-alive失效,
socks协议只管转发TCP层具体的数据包,而不会转发TCP协议内的实现细节的包(也做不到)。

所以,一个应用如果使用了socks代理,那么tcp keep-alive机制就失效了,所以应用要自己有心跳包。
socks proxy只是一个例子,真实的网络很复杂,可能会有各种原因让tcp keep-alive失效。

如何使用

基于netty开发的话,还是很简单的。这里不多做介绍,因为后面说到rpc中的连接管理的时候,会介绍。

应用层心跳还是Keep-Alive

默认情况下使用keepalive周期为2个小时,

系统keep-alive优势:

1.TCP协议层面保活探测机制,系统内核完全替上层应用自动给做好了。
2.内核层面计时器相比上层应用,更为高效。
3.上层应用只需要处理数据收发、连接异常通知即可。
4.数据包将更为紧凑。

应用keep-alive优势:

关闭TCP的keepalive,完全使用业务层面心跳保活机制。完全应用掌管心跳,灵活和可控,比如每一个连接心跳周期的可根据需要减少或延长。

1.应用的心跳包,具有更大的灵活性,可以自己控制检测的间隔,检测的方式等等。
2.心跳包同时适用于TCP和UDP,在切换TCP和UDP时,上层的心跳包功能都适用。
3.有些情况下,心跳包可以附带一些其他信息,定时在服务端和客户端之间同步。(比如帧数同步)

所以大多数情况采用业务心跳+TCP keepalive一起使用,互相作为补充。

SOFARPC如何实现

SOFABOLT基于系统tcp-keepalive机制实现

这个比较简单,直接打开 KeepAlive选项即可。

客户端

RpcConnectionFactory用于创建RPC连接,生成用户触发事件,init()方法初始化Bootstrap通过option()方法给每条连接设置TCP底层相关的属性,ChannelOption.SO_KEEPALIVE表示是否开启TCP底层心跳机制,默认打开SO_KEEPALIVE选项。

/**
 * Rpc connection factory, create rpc connections. And generate user triggered event.
 */
public class RpcConnectionFactory implements ConnectionFactory {
  public void init(final ConnectionEventHandler connectionEventHandler) {
    bootstrap = new Bootstrap();
    bootstrap.group(workerGroup).channel(NioSocketChannel.class)
        ...
        .option(ChannelOption.SO_KEEPALIVE, SystemProperties.tcp_so_keepalive());
    ...
  }
}

服务端

RpcServer服务端启动类ServerBootstrap初始化通过option()方法给每条连接设置TCP底层相关的属性,默认设置ChannelOption.SO_KEEPALIVE选项为true,即表示RPC连接开启TCP底层心跳机制。

/**
 * Server for Rpc.
 */
public class RpcServer extends RemotingServer {
  protected void doInit() {
    this.bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
       ...
       .childOption(ChannelOption.SO_KEEPALIVE, SystemProperties.tcp_so_keepalive());
    ...
  }
}

SOFABOLT基于Netty IdleStateHandler心跳实现

SOFABOLT基于 Netty的事件来实现心跳,Netty的 IdleStateHandler当连接的空闲时间(读或者写)太长时,将会触发一个IdleStateEvent事件,然后BOLT通过重写userEventTrigged方法来处理该事件。如果连接超过指定时间没有接收或者发送任何的数据即连接的空闲时间太长,IdleStateHandler使用IdleStateEvent事件调用fireUserEventTriggered()方法,当检测到IdleStateEvent事件执行发送心跳消息等业务逻辑。


<div id="ggbngd" data-type="image" data-display="block" data-align="" data-src="https://cdn.nlark.com/yuque/0/2018/png/156121/1534923163448-5354810e-a2a6-40c5-ad57-8e1557517752.png" data-width="747">
  <img src="https://cdn.nlark.com/yuque/0/2018/png/156121/1534923163448-5354810e-a2a6-40c5-ad57-8e1557517752.png" width="747" />
</div>


简而言之,向 Netty中注册一个处理 Idle事件的监听器。同时注册的时候,会传入 idle产生的事件,比如读IDLE还是写 IDLE,还是都有,多久没有读写则认为是 IDLE等。

客户端

final boolean idleSwitch = SystemProperties.tcp_idle_switch();
final int idleTime = SystemProperties.tcp_idle();
final RpcHandler rpcHandler = new RpcHandler(userProcessors);
final HeartbeatHandler heartbeatHandler = new HeartbeatHandler();
bootstrap.handler(new ChannelInitializer<SocketChannel>() {

    protected void initChannel(SocketChannel channel) throws Exception {
        ChannelPipeline pipeline = channel.pipeline();
        ...
        if (idleSwitch) {
            pipeline.addLast("idleStateHandler", new IdleStateHandler(idleTime, idleTime,
                0, TimeUnit.MILLISECONDS));
            pipeline.addLast("heartbeatHandler", heartbeatHandler);
        }
        ...
    }

});

SOFABOLT心跳检测客户端默认基于IdleStateHandler(15000ms, 150000 ms, 0)即15秒没有读或者写操作,注册给了 Netty,之后调用HeartbeatHandler的userEventTriggered()方法触发RpcHeartbeatTrigger发送心跳消息。RpcHeartbeatTrigger心跳检测判断成功标准为是否接收到服务端回复成功响应,如果心跳失败次数超过最大心跳次数(默认为3)则关闭连接。

/**
 * Heart beat triggerd.
 */
@Sharable
public class HeartbeatHandler extends ChannelDuplexHandler {

    @Override
    public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            ProtocolCode protocolCode = ctx.channel().attr(Connection.PROTOCOL).get();
            Protocol protocol = ProtocolManager.getProtocol(protocolCode);
            protocol.getHeartbeatTrigger().heartbeatTriggered(ctx);
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}

服务端

SOFABOLT心跳检测服务端默认基于IdleStateHandler(0,0, 90000 ms)即90秒没有读或者写操作为空闲,调用ServerIdleHandler的userEventTriggered()方法触发关闭连接。

SOFABOLT心跳检测由客户端在没有对TCP有读或者写操作后触发定时发送心跳消息,服务端接收到提供响应;如果客户端持续没有发送心跳无法满足保活目的则服务端在90秒后触发关闭连接操作。正常情况由于默认客户端15秒/服务端90秒进行心跳检测,因此一般场景服务端不会运行到90秒仍旧没有任何读写操作的,并且只有当客户端下线或者抛异常的时候等待90秒过后服务端主动关闭与客户端的连接。如果是tcp-keepalive需要等到90秒之后,在此期间则为读写异常。

this.bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

    protected void initChannel(SocketChannel channel) throws Exception {
        ...
        if (idleSwitch) {
            pipeline.addLast("idleStateHandler", new IdleStateHandler(0, 0, idleTime,
                TimeUnit.MILLISECONDS));
            pipeline.addLast("serverIdleHandler", serverIdleHandler);
        }
        ...
        createConnection(channel);
    }
    ...
});

服务端一旦产生 IDLE,那么说明服务端已经6个15s没有发送或者接收到数据了。这时候认为客户端已经不可用。直接断开连接。

/**
 * Server Idle handler.
 * 
 * In the server side, the connection will be closed if it is idle for a certain period of time.
 */
@Sharable
public class ServerIdleHandler extends ChannelDuplexHandler {

    private static final Logger logger = BoltLoggerFactory.getLogger("CommonDefault");

    @Override
    public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            try {
                ctx.close();
            } catch (Exception e) {
                ...
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}

SOFARPC连接管理断开重连实现

通常RPC调用过程是不需要断链与重连的。因为每次RPC调用过程都校验是否有可用连接,如果没有则新建连接。但有一些场景是需要断链和保持长连接的:

  • 自动断连:比如通过LVS VIP或者F5建立多个连接的场景,因为网络设备的负载均衡机制,有可能某一些连接固定映射到了某几台后端的RS上面,此时需要自动断连然后重连,靠建连过程的随机性来实现最终负载均衡。注意开启自动断连的场景通常需要配合重连使用。
  • 重连:比如客户端发起建连后由服务端通过双工通信发起请求到客户端,此时如果没有重连机制则无法实现。

连接管理是客户端的逻辑,启动好,连接管理开启异步线程。


<div id="pph1mp" data-type="image" data-display="block" data-align="" data-src="https://cdn.nlark.com/yuque/0/2018/png/156121/1534926707654-5f4c6fc0-cd94-403a-8f99-bc6c164babfd.png" data-width="747">
  <img src="https://cdn.nlark.com/yuque/0/2018/png/156121/1534926707654-5f4c6fc0-cd94-403a-8f99-bc6c164babfd.png" width="747" />
</div>


其中,SOFARPC连接管理ConnectionHolder维护存活的客户端列表aliveConnections和失败待重试的客户端列表retryConnections,RPC启动守护线程以默认10秒的间隔检查存活和失败待重试的客户端列表的可用连接:

  1. 检查存活的客户端列表aliveConnections是否可用,如果存活列表里连接已经不可用则需要放到待重试列表retryConnections里面;
  2. 遍历失败待重试的客户端列表retryConnections,如果连接命中重连周期则进行重连,重连成功放到存活列表aliveConnections里面,如果待重试连接多次重连失败则直接丢弃。

核心代码在连接管理器的方法中:

com.alipay.sofa.rpc.client.AllConnectConnectionHolder#doReconnect

篇幅有限,我们不贴具体代码,欢迎大家通过源码来学习了解。

最后

本文介绍连接管理的策略和SOFARPC中连接管理与心跳机制的实现,希望通过这篇文章,大家对此有一个了解,如果对其中有疑问的,也欢迎留言与我们讨论。

参考文档

TCP-Keepalive-HOWTO
随手记之TCP Keepalive笔记
为什么基于TCP的应用需要心跳包
Netty心跳简单Demo
浅析 Netty 实现心跳机制与断线重连
image | left | 216x216

长按关注,获取分布式架构干货

欢迎大家共同打造 SOFAStack https://github.com/alipay

目录
相关文章
|
9月前
|
运维 监控 Dubbo
Dubbo协议异步单一长连接原理与优势
Dubbo协议异步单一长连接原理与优势
420 0
|
Dubbo Java 应用服务中间件
dubbo协议下的单一长连接与多线程并发如何协同工作
dubbo协议下的单一长连接与多线程并发如何协同工作
181 0
dubbo协议下的单一长连接与多线程并发如何协同工作
RPC框架(7 - 实现服务端自动注册服务)
RPC框架(7 - 实现服务端自动注册服务)
|
SQL Dubbo 安全
从应用层到网络层排查 Dubbo 接口超时全记录
从应用层到网络层排查 Dubbo 接口超时全记录
|
负载均衡 监控 Dubbo
分布式RPC服务调用框架选型:使用Dubbo实现分布式服务调用
本文是一篇详细介绍分布式RPC调用框架Dubbo的文章,介绍了Dubbo服务治理和服务调用的实现。分析了Dubbo中的核心功能,包括Remoting,Cluster和RetRegistry的作用和功能。详细说明了Dubbo中几个角色以及各个角色之间的调用关系。通过这篇文章,可以快速了解Dubbo框架的基本面貌和重要原理,为以后更加深入细致的学习RPC调用框架做出准备。
338 0
分布式RPC服务调用框架选型:使用Dubbo实现分布式服务调用
|
网络协议 Dubbo 架构师
Dubbo 的心跳设计,值得学习!
谈到RPC肯定绕不开TCP通信,而主流的RPC框架都依赖于Netty等通信框架,这时候我们还要考虑是使用长连接还是短连接:
|
Java 缓存 微服务
【剖析 | SOFARPC 框架】系列之 SOFARPC 优雅关闭剖析
Scalable Open Financial Architecture 是蚂蚁金服自主研发的金融级分布式中间件,包含了构建金融级云原生架构所需的各个组件,是在金融场景里锤炼出来的最佳实践。
812 0
【剖析 | SOFARPC 框架】系列之 SOFARPC 数据透传剖析
Scalable Open Financial Architecture 是蚂蚁金服自主研发的金融级分布式中间件,包含了构建金融级云原生架构所需的各个组件,是在金融场景里锤炼出来的最佳实践。
909 0
|
Java
【剖析 | SOFARPC 框架】系列之 SOFARPC 同步异步实现剖析
Scalable Open Financial Architecture 是蚂蚁金服自主研发的金融级分布式中间件,包含了构建金融级云原生架构所需的各个组件,是在金融场景里锤炼出来的最佳实践。
792 0
|
存储 负载均衡
【剖析 | SOFARPC 框架】系列之 SOFARPC 单机故障剔除剖析
Scalable Open Financial Architecture 是蚂蚁金服自主研发的金融级分布式中间件,包含了构建金融级云原生架构所需的各个组件,是在金融场景里锤炼出来的最佳实践。
624 0

热门文章

最新文章