zeromq_传说中最快的消息队列

简介:

Zeromq的资源:

Zeromq模式:

http://blog.codingnow.com/2011/02/zeromq_message_patterns.html

zeromq主页:

http://www.zeromq.org/

Zeromq Guild:

http://zguide.zeromq.org/page:all#Fixing-the-World

Zeromq 中文简介:

http://blog.csdn.net/program_think/article/details/6687076

Zero wiki:

http://en.wikipedia.org/wiki/%C3%98MQ

zeromq系列:

http://iyuan.iteye.com/blog/972949

Zeromq资源阅读:

ØMQ(Zeromq) 是一个更为高效的传输层

优势是:

1 程序接口库是一个并发框架

2 在集群和超级计算机上表现得比TCP更快

3 通过inproc, IPC, TCP, 和 multicast进行传播消息

4 通过发散,订阅,流水线,请求的方式连接

5 对于不定规模的多核消息传输应用使用异步IO

6 有非常大并且活跃的开源社区

7 支持30+的语言

8 支持多种系统

 

Zeromq定义为“史上最快的消息队列”

从网络通信的角度看,它处于会话层之上,应用层之下。

ØMQ (ZeroMQ, 0MQ, zmq) looks like an embeddable networking library but acts like a concurrency framework. It gives you sockets that carry whole messages across various transports like in-process, inter-process, TCP, and multicast. You can connect sockets N-to-N with patterns like fanout, pub-sub, task distribution, and request-reply. It's fast enough to be the fabric for clustered products. Its asynchronous I/O model gives you scalable multicore applications, built as asynchronous message-processing tasks. It has a score of language APIs and runs on most operating systems. ØMQ is from iMatix and is LGPL open source.


Zeromq中传递的数据格式是由用户自己负责,就是说如果server发送的string是有带"\0"的,那么client就必须要知道有这个

 

Pub_Sub模式。

the subscriber will always miss the first messages that the publisher sends. This is because as the subscriber connects to the publisher (something that takes a small but non-zero time), the publisher may already be sending messages out.

在这种模式下很可能发布者刚启动时发布的数据出现丢失,原因是用zmq发送速度太快,在订阅者尚未与发布者建立联系时,已经开始了数据发布(内部局域网没这么夸张的)。官网给了两个解决方案;1,发布者sleep一会再发送数据(这个被标注成愚蠢的);2,使用proxy。

Zeromq示例:

1 获取例子

git clone --depth=1 git://github.com/imatix/zguide.git

2 服务器端:

(当服务器收到消息的时候,服务器回复“World”)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
<?php
     /*
     *  Hello World server
     *  Binds REP socket to tcp://*:5555
     *  Expects "Hello" from client, replies with "World"
     * @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
     */
     
     $context  = new  ZMQContext(1);
     
     //  Socket to talk to clients
     $responder  = new  ZMQSocket( $context , ZMQ::SOCKET_REP);
     $responder ->bind( "tcp://*:5555" );
     
     while (true) {
         //  Wait for next request from client
         $request  = $responder ->recv();
         printf ( "Received request: [%s]\n" , $request );
     
         //  Do some 'work'
         sleep (1);
     
         //  Send reply back to client
         $responder ->send( "World" );   
 
}

3 客户端:

(客户端发送消息)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
<?php
     /*
     *  Hello World client
     *  Connects REQ socket to tcp://localhost:5555
     *  Sends "Hello" to server, expects "World" back
     * @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
     */
     
     $context  = new  ZMQContext();
     
     //  Socket to talk to server
     echo  "Connecting to hello world server…\n" ;
     $requester  = new  ZMQSocket( $context , ZMQ::SOCKET_REQ);
     $requester ->connect( "tcp://localhost:5555" );
     
     for ( $request_nbr  = 0; $request_nbr  != 10; $request_nbr ++) {
         printf ( "Sending request %d…\n" , $request_nbr );
         $requester ->send( "Hello" );
         
         $reply  = $requester ->recv();
         printf ( "Received reply %d: [%s]\n" , $request_nbr , $reply );
 
}
1
 

天气气候订阅系统:(pub-sub)

1 server端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
<?php
     /*
     *  Weather update server
     *  Binds PUB socket to tcp://*:5556
     *  Publishes random weather updates
     * @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
     */
     
     //  Prepare our context and publisher
     $context  = new  ZMQContext();
     $publisher  = $context ->getSocket(ZMQ::SOCKET_PUB);
     $publisher ->bind( "tcp://*:5556" );
     $publisher ->bind( "ipc://weather.ipc" );
     
     while  (true) {
         //  Get values that will fool the boss
         $zipcode      = mt_rand(0, 100000);
         $temperature  = mt_rand(-80, 135);
         $relhumidity  = mt_rand(10, 60);
     
         //  Send message to all subscribers
         $update  = sprintf ( "%05d %d %d" , $zipcode , $temperature , $relhumidity );
         $publisher ->send( $update );
     }

2 client端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
<?php
     /*
     *  Weather update client
     *  Connects SUB socket to tcp://localhost:5556
     *  Collects weather updates and finds avg temp in zipcode
     * @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
     */
     
     $context  = new  ZMQContext();
     
     //  Socket to talk to server
     echo  "Collecting updates from weather server…" , PHP_EOL;
     $subscriber  = new  ZMQSocket( $context , ZMQ::SOCKET_SUB);
     $subscriber ->connect( "tcp://localhost:5556" );
     
     //  Subscribe to zipcode, default is NYC, 10001
     $filter  = $_SERVER [ 'argc' ] > 1 ? $_SERVER [ 'argv' ][1] : "10001" ;
     $subscriber ->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, $filter );
     
     //  Process 100 updates
     $total_temp  = 0;
     for  ( $update_nbr  = 0; $update_nbr  < 100; $update_nbr ++) {
         $string  = $subscriber ->recv();
         sscanf ( $string , "%d %d %d" , $zipcode , $temperature , $relhumidity );
         $total_temp  += $temperature ;
     }
     printf ( "Average temperature for zipcode '%s' was %dF\n" ,
         $filter , (int) ( $total_temp  / $update_nbr ));
1
------------------------
1
pub-sub的proxy模式:
1
图示是:

clip_image001_thumb[2]

Proxy节点的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
<?php
     /*
     *  Weather proxy device
     * @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
     */
     
     $context  = new  ZMQContext();
     
     //  This is where the weather server sits
     $frontend  = new  ZMQSocket( $context , ZMQ::SOCKET_SUB);
     $frontend ->connect( "tcp://192.168.55.210:5556" );
     
     //  This is our public endpoint for subscribers
     $backend  = new  ZMQSocket( $context , ZMQ::SOCKET_PUB);
     $backend ->bind( "tcp://10.1.1.0:8100" );
     
     //  Subscribe on everything
     $frontend ->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, "" );
     
     //  Shunt messages out to our own subscribers
     while (true) {
         while (true) {
             //  Process all parts of the message
             $message  = $frontend ->recv();
             $more  = $frontend ->getSockOpt(ZMQ::SOCKOPT_RCVMORE);
             $backend ->send( $message , $more  ? ZMQ::SOCKOPT_SNDMORE : 0);
             if (! $more ) {
                 break ; // Last message part
             }
         }
 
}
其实就是proxy同时是作为pub又作为sub的

----------------------

作者:yjf512(轩脉刃)

出处:http://www.cnblogs.com/yjf512/

本文版权归yjf512和cnBlog共有,欢迎转载,但未经作者同意必须保留此段声明

目录
相关文章
|
3月前
|
消息中间件 JSON Java
RabbitMQ消息队列
RabbitMQ消息队列
44 0
|
9天前
|
消息中间件 存储 负载均衡
消息队列学习之RabbitMQ
【4月更文挑战第3天】消息队列学习之RabbitMQ,一种基于erlang语言开发的流行的开源消息中间件。
13 0
|
9月前
|
消息中间件 存储 自然语言处理
RabbitMQ:深入理解消息队列
前言 在分布式系统中,消息队列是一种常用的解耦和通信方式。它允许应用程序之间通过消息的形式进行通信,从而降低了系统各部分之间的耦合。RabbitMQ是一款开源的、高性能的、可扩展的消息队列系统,它基于AMQP协议实现。本篇博客将带你深入理解RabbitMQ的原理和实践,让你更好地运用这一技术。
122 0
|
消息中间件 存储 新零售
初步了解消息队列 RabbitMQ 版|学习笔记
快速学习初步了解消息队列 RabbitMQ 版
149 0
|
消息中间件 监控 大数据
RabbitMQ:什么是消息队列MQ?为什么使用消息队列MQ?入门MQ先学哪种?(一)
MQ(Message Queue):消息队列,如今在各类业务场景中已经被广泛使用,特别在并发量日益增涨的业务和微服务架构中,消息队列能够帮助我们解决很多传统方式所不能解决的问题。 所以今天,我们就开始学习消息队列啦
377 0
RabbitMQ:什么是消息队列MQ?为什么使用消息队列MQ?入门MQ先学哪种?(一)
|
消息中间件 Kafka RocketMQ
【消息队列系列6】RabbitMQ使用姿势
主要讲述RabbitMQ常用的使用姿势。
184 0
【消息队列系列6】RabbitMQ使用姿势
|
消息中间件 存储 网络协议
消息队列系列3 - 原理初探之RabbitMQ
RabbitMQ是使用Erlang语言来编写的,并且RabbitMQ是基于AMQP协议的。Erlang语言在数据交互方面性能优秀,有着和原生Socket一样的延迟,这也是RabbitMQ高性能的原因所在。可谓“人如其名”,RabbitMQ像兔子一样迅速。
169 0
消息队列系列3 - 原理初探之RabbitMQ
|
消息中间件 网络协议 PHP
|
消息中间件 负载均衡 中间件
|
消息中间件 存储 Java