ZeroMQ之Publish/Subscribe (Java)

简介: 前面的文章介绍了比较简单的Request/Subscribe模式, 这篇文章介绍更为经典的Publish/Subscribe通信模式用来ZeroMQ的实现,其通信方式如下图:客户端(subscriber)向服务器(publisher)订阅消息,然后服务器可以将消息推送到所有订阅了消息的客户端,这里也可以理解为广播吧。

前面的文章介绍了比较简单的Request/Subscribe模式, 这篇文章介绍更为经典的Publish/Subscribe通信模式用来ZeroMQ的实现,其通信方式如下图:



客户端(subscriber)向服务器(publisher)订阅消息,然后服务器可以将消息推送到所有订阅了消息的客户端,这里也可以理解为广播吧。。。。


好了,闲话不多说了,直接上用ZeroMQ实现这种通信模式的代码吧:

(1)服务端(publisher):

[java]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. package pubsub;  
  2.   
  3.   
  4. import org.zeromq.ZMQ;  
  5.   
  6. public class Publisher {  
  7.     public static void main(String args[]) {  
  8.       
  9.         ZMQ.Context context = ZMQ.context(1);  //创创建包含一个I/O线程的context  
  10.         ZMQ.Socket publisher = context.socket(ZMQ.PUB);   //创建一个publisher类型的socket,他可以向所有订阅的subscriber广播数据  
  11.           
  12.         publisher.bind("tcp://*:5555");  //将当前publisher绑定到5555端口上,可以接受subscriber的订阅  
  13.           
  14.         while (!Thread.currentThread ().isInterrupted ()) {  
  15.             String message = "fjs hello";  //最开始可以理解为pub的channel,subscribe需要订阅fjs这个channel才能接收到消息  
  16.             publisher.send(message.getBytes());  
  17.         }  
  18.   
  19.         publisher.close();  
  20.         context.term();  
  21.     }  
  22. }  

代码很简单吧,这里publisher来充当服务端,所有的subscriber都需要建立于publisher的连接。,,。,


(2)客户端(subscriber)代码:

[java]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. package pubsub;  
  2.   
  3. import org.zeromq.ZMQ;  
  4.   
  5. public class Subscriber {  
  6.     public static void main(String args[]) {  
  7.         for (int j = 0; j < 100; j++) {  
  8.             new Thread(new Runnable(){  
  9.   
  10.                 public void run() {  
  11.                     // TODO Auto-generated method stub  
  12.                     ZMQ.Context context = ZMQ.context(1);  //创建1个I/O线程的上下文  
  13.                     ZMQ.Socket subscriber = context.socket(ZMQ.SUB);     //创建一个sub类型,也就是subscriber类型的socket  
  14.                     subscriber.connect("tcp://127.0.0.1:5555");    //与在5555端口监听的publisher建立连接  
  15.                     subscriber.subscribe("fjs".getBytes());     //订阅fjs这个channel  
  16.                       
  17.                     for (int i = 0; i < 100; i++) {  
  18.                         byte[] message = subscriber.recv();  //接收publisher发送过来的消息  
  19.                         System.out.println("receive : " + new String(message));  
  20.                     }  
  21.                     subscriber.close();  
  22.                     context.term();  
  23.                 }  
  24.                   
  25.             }).start();  
  26.         }  
  27.           
  28.           
  29.     }  
  30. }  

这里需要注意订阅的channel问题,如果这里错了的话,subscriber是不会受到publisher发送过来的数据的


好了,到这里publish/subscribe的实现就算ok了

若转载请注明出处!若有疑问,请回复交流!
目录
相关文章
|
消息中间件 Java Kafka
Java消息队列总结只需一篇解决ActiveMQ、RabbitMQ、ZeroMQ、Kafka
  一、消息队列概述 消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。
2421 0
|
消息中间件 Java 数据格式
ZeroMQ(java)之Router与Dealer运行原理
在开始这部分的内容之前,先来看看ZeroMQ中HWM概念---High-Water Marks 当系统的数据量很大,而且发送频率很高的情况下,内存就很重要了,如果处理不好会出现很多问题,例如如下场景: A很快速的向B发送数据,但是B处理起来却很慢,这样子的话,数据就可能会在A的发送缓冲区,或者B的接收缓冲区累计起来.
1177 0
|
消息中间件 Java 网络协议
ZeroMQ之Request/Response (Java)
自己最开始是在cloud foundry中接触过消息服务器(nats),或者说是消息中间件,也算是初步知道了一个消息服务器对于分布式的网络系统的重要性,后来自己也曾想过在一些项目中使用它,尤其是在一些分布式的环境下,可以极大的方便整个系统的实现。
1547 0
|
消息中间件 Java BI
ZeroMQ之Push与Pull (Java)
本系列文章均转自:http://blog.csdn.net/kobejayandy/article/details/20163431 在ZeroMQ中并没有绝对的服务端与客户端之分,所有的数据接收与发送都是以连接为单位的,只区分ZeroMQ定义的类型,例如Response与Request,Publisher与Subscriber,Push与Pull等。
1187 0
|
消息中间件 网络协议 Java
ZeroMQ(java)之Router/Dealer模式
本教程转自:http://blog.csdn.net/kobejayandy/article/details/20163527 在开始之前先把guid里面提到的几个ZeroMQ的特性列一下吧: (1)ZeroMQ有自己的I/O线程来异步的处理I/O,而且后台采用了无锁的数据结构 (2)在ZeroMQ中,所有的组件都可以动态的加入和移除,而且可以启动组件以任何的顺利,例如我们可以先启动request,再启动response,依然可以工作,而且还会自动的重连接。
1914 0
|
消息中间件 负载均衡 Java
ZeroMQ(java)之负载均衡
我们在实际的应用中最常遇到的场景如下: A向B发送请求,B向A返回结果。。。。 但是这种场景就会很容易变成这个样子: 很多A向B发送请求,所以B要不断的处理这些请求,所以就会很容易想到对B进行扩展,由多个B来处理这些请求,那么这里就出现了另外一个问题: B对请求处理的速度可能不同,那么B之间他们的负载也是不同的,那么应该如何对请求进行分发就成了一个比较重要的问题。
958 0
|
消息中间件 Java
ZeroMQ(java)之I/O线程的实现与组件间的通信
算是开始读ZeroMQ(java)的代码实现了吧,现在有了一个大体的了解,看起来实现是比较的干净的,抽象什么的不算复杂。。。 这里先来看看它的I/O线程的实现吧,顺带看看是如何实现组件的通信的。。。。
1453 0
|
消息中间件 网络协议 Java
ZeroMQ(java)中对IO的封装(StreamEngine)
哎,各种各样杂七杂八的事情。。。好久没有看代码了,其实要搞明白一个与IO相关的框架,最好的办法就是把它的I/0的读写两个过程搞清楚。。。例如在netty中,如果能将eventLoop的运行原理搞清楚,然后摸清楚整个I/O读写两个过程,那么也就差不太多了。
851 0
|
消息中间件 Java
ZeroMQ(java)中组件间数据传输(Pipe的实现)
在ZeroMQ(java)中,整个IO的处理流程都是分层来进行的,当然处于最下端的肯定是前面介绍过的poller以及StreamEngin了。。。。涉及到上层的话就还有session,以及socket,先用一张图来大概的描述一下整个层次关系吧。
951 0
|
消息中间件 Java
ZeroMQ(java)中的数据流SessionBase与SocketBase
前面的文章中已经比较的清楚了ZeroMQ(java)中如何在底层处理IO, 通过StreamEngine对象来维护SelectableChannel对象以及IO的事件回调,然后通过Poller对象来维护Selector对象,然后用IOObject对象来具体的管理SelectableChannel对...
973 0