转载RabbitMQ入门(5)--主题

简介: 主题(topic) (使用Java客户端) 在先前的指南中我们改进了我们的日志系统。取代使用fanout类型的交易所,那个仅仅有能力实现哑的广播,我们使用一个direct类型的交易所,获得一个可以有选择性的接收日志。

主题(topic)

(使用Java客户端)

在先前的指南中我们改进了我们的日志系统。取代使用fanout类型的交易所,那个仅仅有能力实现哑的广播,我们使用一个direct类型的交易所,获得一个可以有选择性的接收日志。

虽然使用direct交易所类型已经改善了我们的系统,但它依旧有限制-它不能根据多个条件进行路由。

我们的日志系统中,我们可能想要订阅不仅仅基于严格的日志,同样基于发布日志的源码。你可能了解到syslog unix tool的概念,那个基于严格的(info/warn/crit…)和灵巧的(auth/cron/kern…)路由日志。

那个将会给我们许多灵活性-我们可能仅仅想监听来自于cron的关键性的错误和所有来自于kern的日志。

为了在我们日志系统中实现那个,我们需要学习更复杂的topic类型交易所。

topic类型交易所

发送到topic类型的交易所不能有任意的路由的关键字-它必须是一个关键字列表,由点分隔。这关键字可以是任意的,但 是通常可以说明消息的基本的联系。几个合法的路由关键字例 子:“stock.usd.nyse”,“nyse.vmw”,“quick.orange.rabbit”。可能有很多你想要的路由关键字,上限是 255个字节。
这绑定关键字必须也在这同样的表单里。topic交易所逻辑背后是与direct交易所类型类似-一个带特别的路由关键字的消息将会被传递到所有匹配绑定的关键字的队列。但是有两个特别重要的绑定关键字。

>* (星标) 能替代任意一个单词。
># (哈希) 能代替零个或多个单词。

这个例子中是很容易解释的:
python-five.png

在这个例子中,我们发送的消息都描述的是动物。被发送的消息的路由关键字是由三个单词(两个点)组成。路由关键字中第一个单词描述的是速度,第二个描述的是颜色,第三个是物种:
“<速度>.<颜色>.<物种>“。

我们创建了三个绑定:Q1s是由绑定关键字”。orange.“所约束,Q2由”。rabbit"和"lazy.#“所约束。
这些绑定可以概括为:

>Q1 是对orange颜色的动物感兴趣。
>Q2 想了解关于兔子的所有信息和所有慢吞吞的动物信息。

一个路由关键字为"quick.orange.rabbit"消息将会被传递到所有队列。消息"lazy.orange.elephant"同样也 传递到所有队列。另一方面"quick.orange.fox"仅进入第一队列,“lazy.brown.fox"仅进入第二个队列。 “lazy.pink.rabbit"仅传递到第二个队列一次,即使它会匹配两个绑定。“quick.brown.fox"不符合任何绑定关键字,所以会 被丢弃。

如果我们打破我们的约定,发送一个消息带一个或四个单词的关键字,像"orange"或"quick.orange.male.rabbit”,会发生什么呢?好吧,这些消息不会匹配任何绑定,将会丢失。
另一方面"lazy.orange.male.rabbit”,即使它有四个单词,将会匹配最最后那个绑定,将消息传递到第二个队列。

topic类型交易所
topic类型交易所是强大的,能表现的像其他的交易所。
Topic exchange is powerful and can behave like other exchanges.
当一个队列绑定到了”#“(哈希)绑定关键字-它会接收所有消息,不管路由关键字是什么-类似于fanout类型交易所
topic类型交易所中没有使用像”*“(星标)和”#“(哈希)的特殊字符,它的行为类似于direct类型交易所

把所有放在一起

我们将会在我们的日志系统中使用topic类型交易所。我们假设我们的工作的日志消息的路由关键字是由两个单词组成,格式为:“ . “。
这代码和先前的几乎一样:
EmitLogTopic.java的代码:

public class EmitLogTopic {

    private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String routingKey = getRouting(argv); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes()); System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'"); connection.close(); } //... } 

ReceiveLogsTopic.java的代码:

public class ReceiveLogsTopic {

    private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String queueName = channel.queueDeclare().getQueue(); if (argv.length < 1){ System.err.println("Usage: ReceiveLogsTopic [binding_key]..."); System.exit(1); } for(String bindingKey : argv){ channel.queueBind(queueName, EXCHANGE_NAME, bindingKey); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); String routingKey = delivery.getEnvelope().getRoutingKey(); System.out.println(" [x] Received '" + routingKey + "':'" + message + "'"); } } } 

运行接下来的例子,在windows环境中,使用%CP%,包含指南一种的类路径。
接收所有日志:

$ java -cp $CP ReceiveLogsTopic "#"

接收所有灵巧的kern日志:

$ java -cp $CP ReceiveLogsTopic "kern.*"

或者你仅仅想接收'critical'日志:

$ java -cp $CP ReceiveLogsTopic "*.critical"

你可以创建多个绑定:

$ java -cp $CP ReceiveLogsTopic "kern.*" "*.critical"

发出一个路由关键字为"kern.critical"的日志,输入:

$ java -cp $CP EmitLogTopic "kern.critical" "A critical kernel error"

跟这些程序玩的开心。注意代码没有对特定的路由和绑定关键字做臆断,你可以操作多于两个的路由关键字参数。

一些难题:

>““绑定会捕获路由关键字是空的消息吗?
>“#.
“会捕获消息中关键字带”..“的吗?它会捕获一个单词的关键字吗?
>“a.*.#和"a.#“之间有什么不同?

EmitLogTopic.javaReceiveLogsTopic.java的源代码。

接下来,让我们在指南的第六部分,弄清当一个远端程序被调用,如何做一个一个往返的消息。

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
27天前
|
消息中间件 弹性计算 物联网
MQTT常见问题之发布MQTT主题消息失败如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
5月前
|
传感器 物联网
详解MQTT主题和通配符
详解MQTT主题和通配符
367 0
详解MQTT主题和通配符
|
5月前
|
消息中间件 数据安全/隐私保护 网络架构
Windows下RabbitMQ安装及入门
Windows下RabbitMQ安装及入门
|
9天前
|
消息中间件 供应链 Java
RabbitMQ入门指南(九):消费者可靠性
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了消费者确认机制、失败重试机制、失败处理策略、业务幂等性等内容。
23 0
RabbitMQ入门指南(九):消费者可靠性
|
9天前
|
消息中间件 存储 Java
RabbitMQ入门指南(八):MQ可靠性
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了MQ数据持久化、LazyQueue模式、管理控制台配置Lazy模式、代码配置Lazy模式、更新已有队列为lazy模式等内容。
27 0
|
9天前
|
消息中间件 微服务
RabbitMQ入门指南(四):交换机与案例解析
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了交换机在RabbitMQ中的作用与类型、交换机案例(Fanout交换机、Direct交换机、Topic交换机)等内容。
17 0
|
9天前
|
消息中间件 存储 数据库
RabbitMQ入门指南(二):架构和管理控制台的使用
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了RabbitMQ架构和管理控制台的使用等内容。
27 0
RabbitMQ入门指南(二):架构和管理控制台的使用
|
2月前
|
消息中间件 Java Kafka
【RabbitMQ】RabbitMQ快速入门 通俗易懂 初学者入门
【RabbitMQ】RabbitMQ快速入门 通俗易懂 初学者入门
106 0
|
2月前
|
消息中间件 RocketMQ Docker
分布式事物【RocketMQ事务消息、Docker安装 RocketMQ、实现订单微服务、订单微服务业务层实现】(八)-全面详解(学习总结---从入门到深化)
分布式事物【RocketMQ事务消息、Docker安装 RocketMQ、实现订单微服务、订单微服务业务层实现】(八)-全面详解(学习总结---从入门到深化)
53 0
|
3月前
|
消息中间件 存储 Kafka
MQ消息队列学习入门
MQ消息队列学习入门
77 0