【原创】MySQL Proxy中socketpair的使用

本文涉及的产品
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
简介:

      学习 MySQL Proxy 0.8.3 的源码后可知,其全部事件处理线程均对全局 socketpair 的读端进行了监听,以实现通知管道的功能:threads->event_notify_fds[0] 。  
?
1
2
3
4
5
6
7
8
9
10
11
12
13
int chassis_event_threads_init_thread(chassis_event_threads_t *threads, chassis_event_thread_t *event_thread, chassis *chas) {
 
     event_thread->event_base = event_base_new();
   ...
     // 设置当前线程监听 fd 为 socketpair 的读端 fd
     event_thread->notify_fd = dup(threads->event_notify_fds[0]);
   ...
     event_set(&(event_thread->notify_fd_event), event_thread->notify_fd, EV_READ | EV_PERSIST, chassis_event_handle, event_thread);
     event_base_set(event_thread->event_base, &(event_thread->notify_fd_event));
     event_add(&(event_thread->notify_fd_event), NULL);
 
     return 0;
}
该 socketpair 是在主线程初始化过程中创建的:  
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
chassis_event_threads_t *chassis_event_threads_new() {
   ...
     threads = g_new0(chassis_event_threads_t, 1);
 
     /* create the ping-fds
      *
      * the event-thread write a byte to the ping-pipe to trigger a fd-event when
      * something is available in the event-async-queues
      */
     // 创建 socketpair
     if (0 != evutil_socketpair(AF_UNIX, SOCK_STREAM, 0, threads->event_notify_fds)) {
     ...
     }
   ...
     /* make both ends non-blocking */
     evutil_make_socket_nonblocking(threads->event_notify_fds[0]);
     evutil_make_socket_nonblocking(threads->event_notify_fds[1]);
 
     return threads;
}
其中 evutil_socketpair 实现如下(取自 libevent 1.4.13):   
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
int
evutil_socketpair( int family, int type, int protocol, int fd[2])
{
#ifndef WIN32
     return socketpair(family, type, protocol, fd);
#else
     /* This code is originally from Tor.  Used with permission. */
 
     /* This socketpair does not work when localhost is down. So
      * it's really not the same thing at all. But it's close enough
      * for now, and really, when localhost is down sometimes, we
      * have other problems too.
      */
     int listener = -1;
     int connector = -1;
     int acceptor = -1;
     struct sockaddr_in listen_addr;
     struct sockaddr_in connect_addr;
     int size;
     int saved_errno = -1;
 
     if (protocol
#ifdef AF_UNIX
         || family != AF_UNIX
#endif
         ) {
         EVUTIL_SET_SOCKET_ERROR(WSAEAFNOSUPPORT);
         return -1;
     }
     if (!fd) {
         EVUTIL_SET_SOCKET_ERROR(WSAEINVAL);
         return -1;
     }
 
     // 创建作为listener 的socket
     listener = socket(AF_INET, type, 0);
     if (listener < 0)
         return -1;
     memset (&listen_addr, 0, sizeof (listen_addr));
     listen_addr.sin_family = AF_INET;
     listen_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
     listen_addr.sin_port = 0;   /* kernel chooses port.  */
     // 进行绑定,内核会分配port
     if (bind(listener, ( struct sockaddr *) &listen_addr, sizeof (listen_addr)) == -1)
         goto tidy_up_and_fail;
     // 宣告开始监听连接请求
     if (listen(listener, 1) == -1)
         goto tidy_up_and_fail;
 
        // 创建作为connector 的socket
     connector = socket(AF_INET, type, 0);
     if (connector < 0)
         goto tidy_up_and_fail;
     /* We want to find out the port number to connect to.  */
     size = sizeof (connect_addr);
     // 获取bind 后内核为listener 分配的port ( ip 为INADDR_LOOPBACK )
     if (getsockname(listener, ( struct sockaddr *) &connect_addr, &size) == -1)
         goto tidy_up_and_fail;
     if (size != sizeof (connect_addr))
         goto abort_tidy_up_and_fail;
     // 从connector 向listener 发起连接,connect_addr 为连接目的地址
     if (connect(connector, ( struct sockaddr *) &connect_addr, sizeof (connect_addr)) == -1)
         goto tidy_up_and_fail;
 
     size = sizeof (listen_addr);
     // 在套接字listener 上accept ,函数返回后listen_addr 中为对端地址
     acceptor = accept(listener, ( struct sockaddr *) &listen_addr, &size);
     if (acceptor < 0)
         goto tidy_up_and_fail;
     if (size != sizeof (listen_addr))
         goto abort_tidy_up_and_fail;
     // 关闭listener
     EVUTIL_CLOSESOCKET(listener);
     /* Now check we are talking to ourself by matching port and host on the
        two sockets.  */
     // 获取connect 后内核为connector 分配的地址信息-- 自动绑定功能
     if (getsockname(connector, ( struct sockaddr *) &connect_addr, &size) == -1)
         goto tidy_up_and_fail;
     // 将从两侧分别获得的地址地址进行比较
     if (size != sizeof (connect_addr)
         || listen_addr.sin_family != connect_addr.sin_family
         || listen_addr.sin_addr.s_addr != connect_addr.sin_addr.s_addr
         || listen_addr.sin_port != connect_addr.sin_port)
         goto abort_tidy_up_and_fail;
     fd[0] = connector;
     fd[1] = acceptor;
 
     return 0;
 
  abort_tidy_up_and_fail:
     saved_errno = WSAECONNABORTED;
  tidy_up_and_fail:
     if (saved_errno < 0)
         saved_errno = WSAGetLastError();
     if (listener != -1)
         EVUTIL_CLOSESOCKET(listener);
     if (connector != -1)
         EVUTIL_CLOSESOCKET(connector);
     if (acceptor != -1)
         EVUTIL_CLOSESOCKET(acceptor);
 
     EVUTIL_SET_SOCKET_ERROR(saved_errno);
     return -1;
#endif
}
      从上述实现中可以看出,在非 WIN32 平台,直接就可以使用现成的 API 函数创建 socketpair ;在 WIN32 平台上,是通过创建两个本地 socket 相互连接建立的 socketpair 。  

      实现上述功能的另外一种方式是,使用 pipe 。用法很简单,摘抄代码如下(摘自 memcached-1.4.14):  
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
void thread_init( int nthreads, struct event_base *main_base) {
...
     // nthreads 为创建的工作线程数
     for (i = 0; i < nthreads; i++) {
         int fds[2];
         if (pipe(fds)) {  // 使用pipe 作为工作线程获取任务的通道
             perror ( "Can't create notify pipe" );
             exit (1);
         }
 
         threads[i].notify_receive_fd = fds[0];  // 读端
         threads[i].notify_send_fd = fds[1];  // 写端
 
         // 设置用于每个工作线程的libevent 相关信息并创建CQ 结构
         setup_thread(&threads[i]);
         ...
     }
 
     /* Create threads after we've done all the libevent setup. */
     // 创建工作线程
     for (i = 0; i < nthreads; i++) {
         create_worker(worker_libevent, &threads[i]);
     }
...
}
         至于用哪种更好,大家自己思考~~  

====== 更新 2013-11-11 ======  

      最近写 Modb 代码时,想要利用上面的线程间通信机制,所以使用了相对简单的 pipe 实现方案,但在 windows 下调试时总会遇到    “Unknown error 10038” 错误。查阅相关文档后发现,结论是    windows 下不能将 pipe 和 select 一起使用,因为会认为 pipe 不是一个合法的 socket 句柄,然后 linux 下是没有这个问题的。  
解决方案:  
  • 通过 socket 模拟 pipe 的实现;
  • 使用上面的 socketpair 实现;
      网上找到一份“为了 windows 上能够对 pipe 句柄进行 select” 而采用 socket 模拟 pipe 的  实现   。代码留存如下: 
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
int pipe( int fildes[2])
{
     int tcp1, tcp2;
     sockaddr_in name;
     memset (&name, 0, sizeof (name));
     name.sin_family = AF_INET;
     name.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
     int namelen = sizeof (name);
     tcp1 = tcp2 = -1;
 
     int tcp = socket(AF_INET, SOCK_STREAM, 0);
     if (tcp == -1){
         goto clean;
     }
     if (bind(tcp, (sockaddr*)&name, namelen) == -1){
         goto clean;
     }
     if (listen(tcp, 5) == -1){
         goto clean;
     }
     if (getsockname(tcp, (sockaddr*)&name, &namelen) == -1){
         goto clean;
     }
     tcp1 = socket(AF_INET, SOCK_STREAM, 0);
     if (tcp1 == -1){
         goto clean;
     }
     if (-1 == connect(tcp1, (sockaddr*)&name, namelen)){
         goto clean;
     }
 
     tcp2 = accept(tcp, (sockaddr*)&name, &namelen);
     if (tcp2 == -1){
         goto clean;
     }
     if (closesocket(tcp) == -1){
         goto clean;
     }
     fildes[0] = tcp1;
     fildes[1] = tcp2;
     return 0;
clean:
     if (tcp != -1){
         closesocket(tcp);
     }
     if (tcp2 != -1){
         closesocket(tcp2);
     }
     if (tcp1 != -1){
         closesocket(tcp1);
     }
     return -1;
}
原文作者指出有如下缺点:   
  • 效率低下(是否所有其他实现方式都比基于 socket 的方式高效?)
  • 占用了两个 TCP 端口(pipe 不会占用端口)
  • accept 的返回值未必就是 tcp1 连接过来的(多线程或者别的进程在干预), 所以最好通过发送数据进行确认(这个比较严重,在有多个连接同时进入的时候确实无法保证当前连接时正确的)
  • 由于不是匿名的, 所以可以在 netstat 里面看到(看到又怎样?)
优点只有一个, 可以使用 select 调用。

      将该 pipe 实现和上面的 socketpair 的实现进行对比,发现两者根本就是同一个东东,并且 pipe 的实现没有 libevent 中 socketpair 实现写的好。所以 pipe 实现的作者指出的那些缺点,本人持保留意见。看客自己斟酌。  

补充:由于上面的 socketpair 是基于    INADDR_LOOPBACK 的,所以如果 lo 必须处于 up 状态才行。
相关实践学习
基于CentOS快速搭建LAMP环境
本教程介绍如何搭建LAMP环境,其中LAMP分别代表Linux、Apache、MySQL和PHP。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
监控 MySQL 关系型数据库
|
关系型数据库 MySQL 开发工具
|
关系型数据库 MySQL 开发工具
|
关系型数据库 MySQL 测试技术
|
关系型数据库 MySQL 测试技术