【转】ActiveMQ与虚拟通道

简介:

ActiveMQ提供了虚拟通道的特性(Virtual Destination),它允许一个逻辑通道(logical destination)映射成一个或者多个物理通道(physical destination);它可以非常灵活的解决"消息整合"方面的问题,它可以实现:

    1) 提供了VirtualTopic特性,可以让一个订阅者的消息列表,作为Queue来消费。

    2) 提供了Composite特性,可以把一个逻辑通道中的消息,转发到任意多的物理通道中。

 

一. VirtualTopic

   Topic最大的限制就是同一个ClientId的订阅者,任何时刻只能有一个活跃。所以我们在分布式部署时,就会很麻烦,比如一个应用部署成多个实例,且它们都有相同的Topic Consumer配置,那么意味着

一个实例部署成功后,其它的实例都会因为无法订阅Topic而导致故障;同时也意味着,如果这个Topic Consumer失效后,我们不能自动让其他Consumer的接管它。但是Queue却没有这些限制,因为Queue可以同时有任意多个消费者,它们可以并发的消费消息,从而实现“负载均衡”。如果我们期望Topic也能如此,那么可以用VirtualTopic。

Java代码   收藏代码
  1. <broker xmlns="http://activemq.apache.org/schema/core">  
  2.   <destinationInterceptors>  
  3.     <virtualDestinationInterceptor>  
  4.       <virtualDestinations>  
  5.         <virtualTopic name=">" prefix="VConsumers.*." selectorAware="false"/>  
  6.       </virtualDestinations>  
  7.     </virtualDestinationInterceptor>  
  8.   </destinationInterceptors>  
  9. </broker>  

 

    对于所有的VirtualTopic,它们的namespace一定是"VirtualTopic.",Broker将会根据此namespace来判定逻辑通道是否为VirtualTopic,反过来说,如果你希望一个逻辑通道是一个VirtualTopic,那么它必须以“VirtualTopic.”作为前缀。比如“VirtualTopic.order”,那么它就是一个VirtualTopic,它的物理通道名称为"order"。

 

    对于Producer端,需要按照正常的Topic发送消息,通道名称为逻辑通道全名:

Java代码   收藏代码
  1. Topic topic = session.createTopic("VirtualTopic.order");  
  2. MessageProducer producer = session.createProducer(topic);  
  3. //producer.send(message);  

 

     VirtualTopic对Consumer而言,是一个逻辑的Queue,Queue的全名有上述配置文件中的“prefix” + Client标识 + 逻辑虚拟通道;假如“Client标识”为"dbcenter"【相当于ClientId】,用来订阅Order消息,那么最终逻辑Queue的全名为:"VConsumers.dbcenter.VirtualTopic.order",其中需要注意prefix中的"*"占位符就是用来替换“Client标识”的;Broker端默认的prefix为"Consumer.*."。当然我们还可以postfix【后缀】,不过通常没有必要。

Java代码   收藏代码
  1. Queue queue = session.createQueue("VConsumers.dbcenter.VirtualTopic.order");  
  2. MessageConsumer consumer = session.createConsumer(queue);  

 

    对于Order通道而言,“dbcenter”相当于是一个订阅者;Broker将dbcenter订阅的消息转发到了“VConsumers.dbcenter.VirtualTopic.order”队列中;最重要的一点,这个虚拟Queue完全具有队列的所有特性,它的Consumer可以并行消费。

 

    其中还有一个重要的参数"selectorAware",它表示从Topic中将消息转发给Queue时,是否关注Consumer的selector情况。如果为false,那么Topic中的消息全部转发给Queue,否则只会转发匹配Queue Consumer的selector的消息。需要非常注意,当selectorAware为true时,如果消息不匹配任何selector或者Queue中没有任何Consumer活跃,那么消息将不会转发给Queue。

 

     VirtualTopic仍然可以被正常的订阅者消费,即:

Java代码   收藏代码
  1. Topic topic = session.createTopic("VirtualTopic.order");  
  2. TopicSubscriber subscriber = session.createDurableSubscriber(topic,"dbcenter");  

 

     同时需要注意,VirtualTopic只会转发“Client标识”注册之后的消息,且即使Queue消费了消息,VirtualTopic中的消息仍然不会被删除(看起来仍然是Dequeued=0),对于Broker而言,逻辑Queue不被认为是一个“Durable Subscriber”,只有真正的Subscriber消费消息后,Topic中的消息才会Dequeue。不过消息Dequeue后,不会影响Queue中的消息,因为这是基于Copy的。

 

    到目前为止,我尚不清楚,如果VirtualTopic中没有真正的Subscriber,这些消息该如何Dequeue。

 

    当真正的subscriber和Queue都同时存在VirtualTopic中的时候,而且你的broker架构采用了“forward-brige”结构,那么你需要增加如下配置来避免消息的重复转发问题。在forward-brige架构中,任何通道中的消息都会forward到其他network node中(其他broker上),当然这个虚拟的Queue的消息也不例外。

Java代码   收藏代码
  1. <networkConnectors>  
  2.   <networkConnector uri="static://(tcp://localhost:61617)">  
  3.     <excludedDestinations>  
  4.         <!-- prefix和VirtualTopic保持一致  
  5.         <queue physicalName="VConsumer.*.VirtualTopic.>"/>  
  6.     </excludedDestinations>  
  7.   </networkConnector>  
  8. </networkConnectors>  

 

 二. Composite Destinations

    复合通道,它允许一条消息在多个物理通道间转发,就像一个通道映射成多个一样(one-many);比如复合通道A,映射成B和C,那么发往A的消息会同时转发给B和C,那么消费者可以直接通过B或者C获取消息;

这是一种实现消息复制转发、通道映射的便捷办法。复合通道包括CompositeQueue和CompositeTopic两种。

Java代码   收藏代码
  1. <broker persistent="false" useJmx="false" xmlns="http://activemq.apache.org/schema/core">  
  2.     <destinationInterceptors>  
  3.       <virtualDestinationInterceptor>  
  4.         <virtualDestinations>  
  5.           <compositeQueue name="order">  
  6.             <forwardTo>  
  7.               <queue physicalName="order.dbcenter" />  
  8.               <topic physicalName="order.statistic" />  
  9.             </forwardTo>  
  10.           </compositeQueue>  
  11.  <!--   
  12.  <compositeTopic name="order" forwardOnly="false">  
  13.             <forwardTo>  
  14.               <queue physicalName="order.dbcenter" />  
  15.               <topic physicalName="order.statistic" />  
  16.             </forwardTo>  
  17.           </compositeTopic>  
  18.  -->  
  19.         </virtualDestinations>  
  20.       </virtualDestinationInterceptor>  
  21.     </destinationInterceptors>  
  22.    
  23.   </broker>  

 

    上述配置forwardOnly属性表示发往CompositeQueue中的消息是否“仅仅转发,而不本地保留”,如果forwardOnly为true,那么消息将不会在order队列中保留,即order队列中不会有任何消息。如果为false,那么消息将会转发完成后,添加到order中,消费者仍然可以消费order队列中的消息。无论是Compsite通道还是转发的通道,它们和普通的通道没有任何区别,开发者仍然可以像使用普通的通道一样使用它们(消费消息和发送消息)。

  

    很多时候,我们希望在转发消息时,能够使用selector,此时我们可以使用filteredDestination,这样我们可以消息转发时控制消息。

Java代码   收藏代码
  1. <compositeQueue name="MY.QUEUE">  
  2.     <forwardTo>  
  3.          <filteredDestination selector="orderType = 1" queue="food.order"/>  
  4.         <filteredDestination selector="status = 1" topic="order.statistic"/>  
  5.     </forwardTo>  
  6. </compositeQueue>  

原文链接:[http://wely.iteye.com/blog/2328781 ]
相关文章
|
18天前
|
消息中间件 Java Spring
SpringBoot实现RabbitMQ的定向交换机(SpringAMQP 实现Direct定向交换机)
SpringBoot实现RabbitMQ的定向交换机(SpringAMQP 实现Direct定向交换机)
20 1
|
4月前
|
消息中间件 JSON Java
RabbitMQ 核心部分之简单模式和工作模式
【1月更文挑战第9天】 1.消息属性 RabbitMQ是基于AMQP消息传输协议来实现的消息中间件;类似HTTP有header和body两部分数据,Message是RabbitMQ中的消息体概念。 Message由Properties和Body组成,前者是一些元信息,如消息的优先级、持久化、传输格式(如JSON)、延迟等高级特性,Body则是传递的消息数据实体 2.消息投递 Exchange、Queue与Routing Key三个概念是理解RabbitMQ消息投递的关键。RabbitMQ中一个核心的原则是,消息不能直接投递到Queue中。 Producer只能将自己的消息投递到Exc
126 3
|
10月前
|
消息中间件 网络架构
RabbitMQ如何支持动态路由和消息转发
RabbitMQ支持动态路由和消息转发的方式主要是通过Exchange的特性来实现的。
148 0
|
6月前
|
消息中间件 存储 安全
zeromq怎么一个端口发送多个主题
我们这里使用czmq4 版本处理。 在CZMQ的版本4中,在一个端口上发布多个订阅主题。这是通过使用PUB/SUB模式实现的。在这种模式下,一个或多个发布者将消息发布到一个或多个主题,然后一个或多个订阅者可以订阅一个或多个主题来接收消息。
77 0
|
7月前
|
自然语言处理 负载均衡 算法
Nacos架构与原理 - 通信通道
Nacos架构与原理 - 通信通道
89 1
|
7月前
|
消息中间件 存储 Java
RabbitMQ之Exchange(交换机)属性及备用交换机解读
RabbitMQ之Exchange(交换机)属性及备用交换机解读
|
消息中间件 负载均衡 RocketMQ
一文带你理解 RocketMQ 广播模式实现机制
一文带你理解 RocketMQ 广播模式实现机制
586 0
一文带你理解 RocketMQ 广播模式实现机制
|
算法 物联网
设备通过mqtt通道的动态免预注册
一型一密认证方式下,同一产品下所有设备可以烧录相同的设备标志信息,即所有设备包含相同的产品证书(ProductKey和ProductSecret)。设备发送激活请求时,物联网平台会进行身份确认,认证通过后,下发设备接入所需信息。
345 0
|
消息中间件 负载均衡 RocketMQ
消费者广播模式和负载均衡模式|学习笔记
快速学习消费者广播模式和负载均衡模式
77 0
消费者广播模式和负载均衡模式|学习笔记
|
消息中间件 网络架构
SpringCloudStream学习(二)RabbitMQ中的交换机跟工作模式
SpringCloudStream学习(二)RabbitMQ中的交换机跟工作模式
164 0
SpringCloudStream学习(二)RabbitMQ中的交换机跟工作模式