(二): 基于ZeroMQ的实时通讯平台

简介:   基于ZeroMQ的实时通讯平台  上篇:C++分布式实时应用框架 (Cpp Distributed Real-time Application Framework)----(一):整体介绍 版权声明:本文版权及所用技术归属smartguys团队所有,对于抄袭,非经同意转载等行为保留法律追究的权利!   通讯平台作为C++分布式实时应用框架(Cpp Distributed Real-time Application Framework)的最核心模块,承担了分布式实时框架的基础通讯功能。

  基于ZeroMQ的实时通讯平台

  上篇:C++分布式实时应用框架 (Cpp Distributed Real-time Application Framework)----(一):整体介绍

 

版权声明:本文版权及所用技术归属smartguys团队所有,对于抄袭,非经同意转载等行为保留法律追究的权利!

 

  通讯平台作为C++分布式实时应用框架(Cpp Distributed Real-time Application Framework)的最核心模块,承担了分布式实时框架的基础通讯功能。通讯平台框架具备了基于Reactor模式的网络通讯能力,并且依赖于ZeroMQ库,因此支持非持久化的message queue的功能。基于配置文件来自动建立链接关系的功能,可以和状态中心一起配合,实现无需重启节点的动态扩容缩容等功能。强大的实时监控能力,可以实时上报每个通讯子节点的TPS和时延等关键性能数据。管控业务进程的能力,业务进程的心跳检测,故障时自动重启、保证系统正常运行。完善的平台工具,可以通过通讯平台向业务进程发送各种命令,如:调整日志级别,刷新业务参数,启停业务进程等等。下面将逐一介绍通讯平台的功能细节。

  一、根据配置文件自动建立通讯链接拓扑关系

  常见的分布式系统通常将进程间、节点间的各种通讯关系写死在业务代码中,这是导致代码复杂难以理解的原因。我们创新地将所有的通讯关系提取到AppInit.json配置文件中,业务代码中不再包含任何与通讯连接相关的内容,使业务代码可以更专注于业务处理,而不用分心于复杂的分布式节点通讯当中。下面我们将带大家看下图所示通讯关系的配置。

 

  OLC作为数据分发节点,给多个业务处理节点分发消息。业务处理节点内部由OCDis接收外部消息,转发给内部的OCPro业务处理进程,并负责处理完后的回包。

OLC配置部分:

   "OLC" : {
      "AUTO_START" : "YES",
      "ENDPOINTS" : [
         {  // 用于与SmartMonitor建立心跳
            "name" : "MonitorSUB",   
            "zmq_socket_action" : "CONNECT",  // ZMQ的连接模式
            "zmq_socket_type" : "ZMQ_SUB"     // ZMQ的通讯模式
         },
         { // 下发消息给OCDis,这边存在转发功能,支持业务实现按条件转发
            "downstream" : [ "OCDis2OLC"],
            "name" : "NE2OLC",                // 根据这个名字在业务代码中实现转发
            "zmq_socket_action" : "BIND",
            "zmq_socket_type" : "ZMQ_STREAM" 
         },
         { // OLC到OCDis的链路
            "name" : "OCDis2OLC",
            "statistics_on" : true,
            "zmq_socket_action" : "CONNECT",
            "zmq_socket_type" : "ZMQ_DEALER"
         },
         { // OCDis回OLC的链路,之所以来去分开,主要用于实现优雅启停功能(启停节点保证不丢消息)
            "name" : "OCDis2OLC_Backway",
            "statistics_on" : true,
            "zmq_socket_action" : "CONNECT",
            "zmq_socket_type" : "ZMQ_DEALER",
            "backway_pair" : "OCDis2OLC"
         },
         {  // 用于与SmartMonitor的命令消息链路
            "name" : "OLC2Monitor",
            "zmq_socket_action" : "CONNECT",
            "zmq_socket_type" : "ZMQ_DEALER"
         },
      ],
      "ENDPOINT_TO_MONITOR" : "OLC2Monitor",
      "INSTANCE_GROUP" : [
         {
            "instance_endpoints_address" : [
               {
                  "endpoint_name" : "NE2OLC",
                  "zmq_socket_address" : "tcp://*:6701"
               },
               {
                  "endpoint_name" : "OCDis2OLC",
                  "zmq_socket_address" : [
                     "tcp://127.0.0.1:7201"   // 跨机的IP地址与端口,配合状态中心可实现自动管理,无需人工参与配置
                  ]
               },
               {
                  "endpoint_name" : "OCDis2OLC_Backway",
                  "zmq_socket_address" : [
                     "tcp://127.0.0.1:7202"
                  ]
               },
               {
                  "endpoint_name" : "OLC2Monitor",
                  "zmq_socket_address" : "ipc://Monitor2Business_IPC"
               },
               {
                  "endpoint_name" : "MonitorSUB",
                  "zmq_socket_address" : "ipc://MonitorPUB"
               }
            ],
            "instance_group_name" : "1"
         }
      ]
   },

 OLC程序:

static const char * ENDPOINT_NE2OLC = "NE2OLC";
static const char * ENDPOINT_OLC2OCDIS = "OCDis2OLC";
static const char * ENDPOINT_MONITORSUB = "MonitorSUB";

int main(int argc, char * argv[]) {

    SmartUtilities::Daemonize();
    OLCProxyServer server(argc, argv);

    if (!server.Initialize(logger))
        return -1;
  
// OLC与OCDis的消息处理 server.SetCallbackOnReceivingMessage(ENDPOINT_OLC2OCDIS, bind(&OLCProxyServer::ReceiveFromOCDis, &server, _1, _2, _3));

  // OLC与SmartMonitor的消息处理 server.SetCallbackOnReceivingMessage(ENDPOINT_MONITORSUB, bind(&OLCProxyServer::ReceiveFromMonitorSUB, &server, _1, _2, _3));
  // 解析消息包实现业务功能 server.SetPacketParserFunction(ENDPOINT_NE2OLC, bind(&OLCProxyServer::ParseStreamCCR, &server, _1, _2, _3));
  // 设置消息转发具体规则 server.SetDownstreamSelector(ENDPOINT_NE2OLC, bind(&OLCProxyServer::StreamSelector, &server, _1, _2)); server.Run(); return 0; }

  二、在线更新链接拓扑能力

  通讯平台支持在线重新读取更新的配置文件,更新网络拓扑,自动建立新链接、断开旧链接的能力。配合状态中心可以实现无需重启节点的动态扩容缩容等功能。

  

  三、SmartMonitor进程监控管理业务进程与SmartTool工具进程

  业务进程可以跟SmartMonitor建立通讯联系,SmartMonitor可以检测业务进程的心跳,以保证业务进程的可用。SmartMonitor通过AppCount.json来管理节点业务进程,实现统一启停等功能。

{
  "OCPro": {
    "IN":  2,      // 业务进程可以有不同的种类,后面代表进程数
    "PS":  3,
    "SMS": 4,
  },
  "OCDis": 3,
  "SERVER_TYPE":"OCS"  // 节点的类型
}

  还可以通过SmartTool工具进程,来给业务进程发送各种命令,如:调整日志级别,刷新业务参数,启停业务进程等等。

 

     1. 启动平台

      SmartMonitor

 

      2. 停平台

      SmartTool stop all

      停指定进程(停止后会被SmartMonitor重新拉起)

      SmartTool stop OCPro 停止所有业务的OCPro进程

      SmartTool stop  OCPro.IN 停止IN业务的OCPro进程

      SmartTool stop 4829 停止PID为4829的进程

 

      3. 调整应用层、框架层日志级别

      其中,日志级别为error,warn,info,debug,trace

      SmartTool log 进程名 level=日志级别,flush=日志级别

      比如: SmartTool log  OCPro level=debug,flush=debug

  四、通讯平台性能数据 

 

 

 进程Z负载控制消息流量,进程A负责发、收消息,统计时延数据。进程B收到消息后负责回消息。

 

 性能瓶颈主要在A机,既要负责收发包,又要统计时延数据,还要控制流量。

 

未完待续...

 

 技术交流合作QQ群:436466587 欢迎讨论交流

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
5月前
|
消息中间件 Java 应用服务中间件
详解rocketMq通信模块&升级构想(下)
详解rocketMq通信模块&升级构想(下)
196 0
详解rocketMq通信模块&升级构想(下)
|
5月前
|
消息中间件 Java 中间件
详解rocketMq通信模块&升级构想(上)
详解rocketMq通信模块&升级构想(上)
111 0
|
消息中间件 Java Kafka
一款消息队列的客户端框架——启明信息车联网MQ演进实践分享
一款消息队列的客户端框架——启明信息车联网MQ演进实践分享 分享人:阿里云MVP曾宪宇,2014开始 就职于启明信息,负责车联网平台的架构和建设,坐标吉林长春。 分享内容:结合主流MQ,介绍一款基于Java的开源消息队列客户端框架。
2862 0
一款消息队列的客户端框架——启明信息车联网MQ演进实践分享
|
2月前
|
存储 网络协议 JavaScript
浅谈WebSocket及如何搭建实时聊天系统
WebSocket 是一种在客户端和服务器之间提供全双工、双向通信的网络协议。它是基于TCP/IP协议栈的应用层协议,旨在实现浏览器与服务器之间的实时、低延迟且高效的长连接通信
151 1
|
9月前
|
编解码 应用服务中间件 nginx
手机直播源码开发,协议讨论篇(三):RTMP实时消息传输协议
通过今天的讨论,大家都不难看出,RTMP协议是手机直播源码平台不可或缺的协议之一,为用户提供了低延迟、高质量的直播体验,也为平台带来了用户,增加了收益。
手机直播源码开发,协议讨论篇(三):RTMP实时消息传输协议
|
9月前
|
消息中间件 负载均衡 监控
在Linux服务器上安装EMQX平台:构建高性能的开源物联网消息中间件
EMQX是一个开源的物联网消息中间件平台,提供高性能、高可用性的MQTT和CoAP协议支持,适用于大规模物联网应用场景。本文将详细介绍在Linux服务器上安装EMQ X平台的步骤,帮助开发者快速搭建功能强大的物联网消息中间件。
2217 1
|
移动开发 网络协议 网络安全
即时通讯系列: WebSocket从原理到企业项目技术选型(2)
阅读本篇可能需要的预备知识 《试图取代 TCP 的 QUIC 协议到底是什么》、《抓包与反抓包》、《趣谈网络协议》
180 0
|
设计模式 网络协议 算法
即时通讯系列: WebSocket从原理到企业项目技术选型(1)
阅读本篇可能需要的预备知识 《试图取代 TCP 的 QUIC 协议到底是什么》、《抓包与反抓包》、《趣谈网络协议》
462 0
即时通讯系列: WebSocket从原理到企业项目技术选型(1)
|
网络协议 架构师 Java
即时通讯技术文集(第3期):高性能网络编程系列 [共14篇]
为了更好地分类阅读52im.net 总计1000多篇精编文章,我将在每周三推送新的一期技术文集,本次是第3 期。
132 0
即时通讯技术文集(第3期):高性能网络编程系列 [共14篇]
|
JSON 负载均衡 NoSQL
WebSocket集群分布式改造——实现多人在线聊天室
本文内容摘要: 为何要改造为分布式集群 如何改造为分布式集群 用户在聊天室集群如何发消息 用户在聊天室集群如何接收消息 补充知识点:STOMP 简介 功能一:向聊天室集群中的全体用户发消息——Redis的订阅/发布 功能二:集群集群用户上下线通知——Redis订阅发布 功能三:集群用户信息维护——Redis集合 WebSocket集群还有哪些可能性
1078 0