Thrift异步IO服务器源码分析

简介: 最近在使用 libevent 开发项目,想起之前写 Thrift源码剖析 的时候说到关于 TNonblockingServer 以后会单独写一篇解析, 现在是时候了,就这篇了。


最近在使用 libevent 开发项目,想起之前写 Thrift源码剖析 的时候说到关于 TNonblockingServer 以后会单独写一篇解析, 现在是时候了,就这篇了。

以下内容依然是基于 thrift-0.9.0

概述

现在随着 Node.js 的兴起,很多人着迷 eventloop , 经常是不明真相就会各种追捧,其实 eventloop 只是 一种高并发的解决方案。Thrift 的 TNonblockingServer 就是该解决方案的典型实现之一。

而且,Thrift 的 TNonblockingServer 实现代码干净,注释丰富, 并没有用到什么奇淫巧计,核心就是使用 libevent 进行异步 驱动和状态的转换,只要有些 libevent 经验的人就很容易 能看懂。

想进一步了解 libevent 可以看看 C1000K之Libevent源码分析

事件注册

Thrift 使用 libevent 作为服务的事件驱动器, libevent 其实就是 epoll 更高级的封装而已(在linux下是epoll),而 struct event 事件是 libevent 编程的最小单元,只要是使用 libevent 就会使用到它,或者是包装它。

整个 TNonblockingServer 有三个关键的地方和 libevent 有关。

1. listener_event

第一种是服务的监听事件也就是服务负责 listen 和 accept 的 主 socket ,如下。

// Register the server event
event_set(&serverEvent_,
          listenSocket_,
          EV_READ | EV_PERSIST,
          TNonblockingIOThread::listenHandler,
          server_);

当新的连接请求进来的时候,TNonblockingIOThread::listenHandler 函数被 触发,在 TNonblockingIOThread::listenHandler 里主要负责 accept 新连接。

2. pipe_event

第二种比较有意思,这个事件对应的文件描述符是 socket pair ,使用 evutil_socketpair 创建,其实也是调用linux接口 socketpair 搞出来的。这个东西不是之前理解的 网络通信套接字,在这里可以把它理解成一个管道来使用,如下:

// Create an event to be notified when a task finishes
event_set(&notificationEvent_,
        getNotificationRecvFD(),
        EV_READ | EV_PERSIST,
        TNonblockingIOThread::notifyHandler,
        this);

代码里面的 getNotificationRecvFD 就是拿这个 socket pair 管道的读文件描述符, 也就是当这个 socketpair 管道有数据可读时,该事件就会被触发,也就是回调函数 TNonblockingIOThread::notifyHandler 会被调用。

其实第二种事件非常好理解,可以类比多线程编程里面的任务队列, 不同线程之间共享着同一个任务队列来进行消息的传递。 而在 TNonblockingServer 里面,则通过该管道进行消息的传递。

3. connection_event

第三种是每个连接的状态变化事件,每一个 TConnection 代表一个连接, 每一个 TConnection 含有一个 socket 文件描述符,并且当 TConnection 生成之后,会为它注册一个事件,负责对该 socket 的异步读写。 如下:

event_set(&event_, tSocket_->getSocketFD(), eventFlags_,
    TConnection::eventHandler, this);

注意到,每个连接都会注册一个 第三种事件, 也就是说,程序的整个运行过程中,假设并发连接数为 n , 则第三种事件的数量也为 n,而第一种和第二种始终 只有一个事件。 所以真个程序运行过程中事件的数量是【2 + n】。

socket状态转移

因为是异步编程,每个socket都必须设置为非阻塞。 当可读的事件发生时,则读,可写的事件发生时,则写。 读和写两种操作会互相交替进行,所以我们需要用 状态值来进行不同的逻辑处理。

TNonblockingServer 里的状态值有以下三种:

/// Three states for sockets: recv frame size, recv data, and send mode
enum TSocketState {
  SOCKET_RECV_FRAMING,
  SOCKET_RECV,
  SOCKET_SEND
};

需要补充说明的是,要和 Thrift 的 TNonblockingServer 通信,则客户端 需要使用

shared_ptr<TTransport> transport(new TFramedTransport(socket));

来作为传输工具,就是因为 TNonblockingServer 的 socket recv 数据是 按 frame 来一帧帧的接受。所以第一个状态值 SOCKET_RECV_FRAMING 代表进入该状态就是有帧头(数据包的大小)可以读取, 而第二个状态值 SOCKET_RECV 代表有数据可以读取,先读完帧头才读该数据。 第三个状态 SOCKET_SEND 代表有数据可以发送。

每次 rpc 调用的过程的状态转移先后过程如下:

SOCKET_RECV_FRAMING -> SOCKET_RECV -> SOCKET_SEND

这三个状态都有可能被重复调用,取决于数据包的大小。

每次 socket 状态转移靠 workSocket 函数完成:

/**
* Libevent handler called (via our static wrapper) when the connection
* socket had something happen.  Rather than use the flags [libevent] passed,
* we use the connection state to determine whether we need to read or
* write the socket.
*/
void workSocket();

app状态转移

上面的 socket 状态转移,是针对每个连接的数据收发状态转移, 和 socket 紧密相关,而这里的 app状态转移则是针对整个 rpc 远程函数调用(不过每次rpc调用其实也是建立在某个连接的基础之上)。

app状态的代码如下:

enum TAppState {
  APP_INIT,
  APP_READ_FRAME_SIZE,
  APP_READ_REQUEST,
  APP_WAIT_TASK,
  APP_SEND_RESULT,
  APP_CLOSE_CONNECTION
};

状态的转移顺序如下:

  1. APP_INIT 初始状态。
  2. APP_READ_FRAME_SIZE 读取帧数据。
  3. APP_READ_REQUEST 读取请求的数据,并根据请求的数据 进行数据的解析和任务的生成,并且将任务扔进线程池。
  4. APP_WAIT_TASK 等待任务的完成
  5. APP_SEND_RESULT 任务已经完成,将任务结果发送。
  6. APP_CLOSE_CONNECTION 关闭连接。

每次app状态转移由 TConnetion::transition 函数完成:

/**
* This is called when the application transitions from one state into
* another. This means that it has finished writing the data that it needed
* to, or finished receiving the data that it needed to.
*/
void transition();

状态3 -> 状态4 -> 状态5 转移很关键,涉及到线程池和主线程的交互。 请看下文。

任务的线程池

总所周知的是,异步服务器最适合的场景是高并发,IO 密集型程序。 对于 CPU 密集型的应用场景一般使用多线程服务来解决。 而对于 RPC 服务,TNonblockingServer 想使用异步 IO 来应对高并发。 但是对于 rpc 远程函数调用,如果被方法的函数是 CPU 密集型的函数, 则运行该函数的过程整个主线程就会被阻塞,也就是传说中的 【block the whole world】, 对于此,TNonblockingServer 的解决方法是将该函数包装成一个任务, 然后扔进线程池,以此来避免主线程的阻塞。

线程池本身没什么好说的,但是在 TNonblockingServer 里 面需要了解的就是 线程池和主线程的交互:

当 TConnetion 的 app状态 进入 APP_READ_REQUEST 之后 读取完请求数据之后,则将任务包装好扔进线程池。 并且将状态改变(APP_READ_REQUEST -> APP_WAIT_TASK):

// The application is now waiting on the task to finish
appState_ = APP_WAIT_TASK;

并且将该连接标识为 Idle ,如下函数:

// Set this connection idle so that [libevent] doesn't process more
// data on it while we're still waiting for the threadmanager to
// finish this task
setIdle();

setIdle 的目的在于将该连接对应的 socket事件标志位清空, 也就是在 Idle阶段不再关心该 socket是否有数据可读或者可写。

而当线程池里的某个 Task 运行完毕后,则会触发主线程的 pipe_event (上文中的已注册事件种的第二种事件),告知主线程任务已完成。 如下:

// Signal completion back to the libevent thread via a pipe
if (!connection_->notifyIOThread()) {
  throw TException("TNonblockingServer::Task::run: failed write on notify pipe");
}

主线程收到通知之后,则会从 状态4(APP_WAIT_TASK) 转 移向 状态5(APP_SEND_RESULT) ,进入向 客户端发送函数调用结果的过程。

总结

Thrift 的 TNonblockingServer 注释很丰富,原理清晰。 个人认为基本上是事件驱动服务器的入门教科书级代码了, 事件驱动服务器核心在于状态转移, 因为事件驱动的原因,每次转换 事件我们都需要保存当前的状态。 没啥,都是状态而已。

哦对了,在下读源码的时候习惯加 cout ,然后跑起来看结果, 文末有一份运行示例可以帮助理解,有兴趣的可以看看, 修改后的源码在 MyTNonblockingServer

运行示例

TNonblockingServer.cpp945TNonblockingServer::handleEvent
TNonblockingServer.cpp990Create a new TConnection for this client socket.
TNonblockingServer.cpp1015iothreadnumber = 0
TNonblockingServer.cpp714APP_INIT
TNonblockingServer.cpp442TNonblockingServer::TConnection::workSocket()
TNonblockingServer.cpp448SOCKET_RECV_FRAMING
TNonblockingServer.cpp493size known; now get the rest of the frame
TNonblockingServer.cpp736APP_READ_FRAME_SIZE
TNonblockingServer.cpp442TNonblockingServer::TConnection::workSocket()
TNonblockingServer.cpp498SOCKET_RECV
TNonblockingServer.cpp523 We are done reading, move onto the next state
TNonblockingServer.cpp590APP_READ_REQUEST
TNonblockingServer.cpp625setIdle
TNonblockingServer.cpp374connection_->notifyIOThread()
TNonblockingServer.cpp1401notifyHandler
TNonblockingServer.cpp1415TNonblockingIOThread::notifyHandler
TNonblockingServer.cpp661APP_WAIT_TASK
TNonblockingServer.cpp442TNonblockingServer::TConnection::workSocket()
TNonblockingServer.cpp535SOCKET_SEND
TNonblockingServer.cpp564writeBufferPos_ done
TNonblockingServer.cpp698APP_SEND_RESULT
TNonblockingServer.cpp714APP_INIT
TNonblockingServer.cpp442TNonblockingServer::TConnection::workSocket()
TNonblockingServer.cpp448SOCKET_RECV_FRAMING
http://yanyiwu.com/work/2014/12/06/thrift-tnonblockingserver-analysis.html
目录
相关文章
|
18天前
|
算法 数据处理 Python
Python并发编程:解密异步IO与多线程
本文将深入探讨Python中的并发编程技术,重点介绍异步IO和多线程两种常见的并发模型。通过对比它们的特点、适用场景和实现方式,帮助读者更好地理解并发编程的核心概念,并掌握在不同场景下选择合适的并发模型的方法。
|
2月前
|
网络协议 安全 测试技术
手撕测试tcp服务器效率工具——以epoll和io_uring对比为例
手撕测试tcp服务器效率工具——以epoll和io_uring对比为例
40 2
|
3月前
|
存储 网络协议
TCP服务器 IO多路复用的实现:select、poll、epoll
TCP服务器 IO多路复用的实现:select、poll、epoll
34 0
|
3月前
|
网络协议 Linux C++
Linux C/C++ 开发(学习笔记十二 ):TCP服务器(并发网络编程io多路复用epoll)
Linux C/C++ 开发(学习笔记十二 ):TCP服务器(并发网络编程io多路复用epoll)
56 0
|
25天前
|
Java 关系型数据库 MySQL
Flink1.18.1和CDC2.4.1 本地没问题 提交任务到服务器 报错java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.mysql.MySqlConnectorConfig
【2月更文挑战第33天】Flink1.18.1和CDC2.4.1 本地没问题 提交任务到服务器 报错java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.mysql.MySqlConnectorConfig
43 2
|
1月前
|
调度 数据库 Python
Python中的并发编程:使用asyncio库实现异步IO
传统的Python程序在面对IO密集型任务时,往往会遇到性能瓶颈。本文将介绍如何利用Python中的asyncio库,通过异步IO的方式来提升程序的效率和性能,让你的Python程序能够更好地处理并发任务。
|
2月前
|
程序员 调度 云计算
Python并发编程的未来趋势:协程、异步IO与多进程的融合
Python并发编程的未来趋势:协程、异步IO与多进程的融合
|
2月前
|
数据采集 Python
Python多线程与异步IO的对比:何时选择哪种并发模型
Python多线程与异步IO的对比:何时选择哪种并发模型
|
2月前
|
开发者 Python
Python中的并发编程与异步IO
在当今快节奏的互联网时代,如何提高程序的执行效率成为了开发者们关注的焦点。本文将探讨Python中的并发编程与异步IO技术,介绍其原理、应用场景以及优缺点,帮助读者更好地理解和运用这些技术来提升程序性能。

热门文章

最新文章