RabbitMQ如何实现延迟队列?

简介: 什么是延迟队列延迟队列存储的对象肯定是对应的延迟消息,所谓”延迟消息”是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。场景一:在订单系统中,一个用户下单之后通常有30分钟的时间进行支付,如果30分钟之内没有支付成功,那么这个订单将进行一场处理。

什么是延迟队列

延迟队列存储的对象肯定是对应的延迟消息,所谓”延迟消息”是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。

场景一:在订单系统中,一个用户下单之后通常有30分钟的时间进行支付,如果30分钟之内没有支付成功,那么这个订单将进行一场处理。这是就可以使用延迟队列将订单信息发送到延迟队列。

场景二:用户希望通过手机远程遥控家里的智能设备在指定的时间进行工作。这时候就可以将用户指令发送到延迟队列,当指令设定的时间到了再将指令推送到只能设备。


RabbitMQ怎么实现延迟队列

AMQP协议,以及RabbitMQ本身没有直接支持延迟队列的功能,但是可以通过TTL和DLX模拟出延迟队列的功能。

TTL(Time To Live)
RabbitMQ可以针对Queue和Message设置 x-message-tt,来控制消息的生存时间,如果超时,则消息变为dead letter
RabbitMQ针对队列中的消息过期时间有两种方法可以设置。

  • A: 通过队列属性设置,队列中所有消息都有相同的过期时间。
  • B: 对消息进行单独设置,每条消息TTL可以不同。

如果同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就成为dead letter

详细可以参考:RabbitMQ之TTL(Time-To-Live 过期时间)

DLX (Dead-Letter-Exchange)

RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可选)两个参数,如果队列内出现了dead letter,则按照这两个参数重新路由。

  • x-dead-letter-exchange:出现dead letter之后将dead letter重新发送到指定exchange
  • x-dead-letter-routing-key:指定routing-key发送

队列出现dead letter的情况有:

  • 消息或者队列的TTL过期
  • 队列达到最大长度
  • 消息被消费端拒绝(basic.reject or basic.nack)并且requeue=false

  • 利用DLX,当消息在一个队列中变成死信后,它能被重新publish到另一个Exchange。这时候消息就可以重新被消费。

详细可以参考: RabbitMQ之死信队列


代码示例

首先建立2个exchange和2个queue:

  • exchange_delay_begin:这个是producer端发送时调用的exchange, 将消息发送至queue_dealy_begin中。
  • queue_delay_begin: 通过routingKey=”delay”绑定exchang_delay_begin, 同时配置DLX=exchange_delay_done, 当消息变成死信时,发往exchange_delay_done中。
  • exchange_delay_done: 死信的exchange, 如果不配置x-dead-letter-routing-key则采用原有默认的routingKey,即queue_delay_begin绑定exchang_delay_beghin采用的“delay”。
  • queue_delay_done:消息在TTL到期之后,最终通过exchang_delay_done发送值此queue,消费端通过消费此queue的消息,即可以达到延迟的效果。

1. 建立exchange和queue的代码(当然这里可以通过RabbitMQ的管理界面来实现,无需code相关代码):

channel.exchangeDeclare("exchange_delay_begin", "direct", true);
channel.exchangeDeclare("exchange_delay_done", "direct", true);

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "exchange_delay_done");
channel.queueDeclare("queue_delay_begin", true, false, false, args);
channel.queueDeclare("queue_delay_done", true, false, false, null);

channel.queueBind("queue_delay_begin", "exchange_delay_begin", "delay");
channel.queueBind("queue_delay_done", "exchange_delay_done", "delay");

2. consumer端代码:

QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume("queue_delay_done", false, consumer);

while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    String msg = new String(delivery.getBody());
    System.out.println("receive msg time:" + new Date() + ", msg body:" + msg);
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}

3. producer端代码:设置消息的延迟时间为1min。

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.expiration("60000");//设置消息TTL
builder.deliveryMode(2);//设置消息持久化
AMQP.BasicProperties properties = builder.build();

String message = String.valueOf(new Date());
channel.basicPublish("exchange_delay_begin","delay",properties,message.getBytes());

在创建完exchange和queue之后,首先执行consumer端的代码,之后执行producer端的代码,待producer发送完毕之后,查看consumer端的输出:

receive msg time:Tue Feb 14 21:06:19 CST 2017, msg body:Tue Feb 14 21:05:19 CST 2017

可以看到延迟1min消费了相关消息。大功告成~

欲了解更多消息中间件的内容,可以关注:消息中间件收录集


参考资料

  1. rabbitmq 实现延迟队列的两种方式
  2. RabbitMQ之TTL(Time-To-Live 过期时间)
  3. RabbitMQ之死信队列
相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
3天前
|
消息中间件 监控 Java
Spring Boot中的RabbitMQ死信队列魔法:从异常到延迟,一网打尽【RabbitMQ实战 一】
Spring Boot中的RabbitMQ死信队列魔法:从异常到延迟,一网打尽【RabbitMQ实战 一】
96 0
|
3天前
|
消息中间件 存储 NoSQL
RabbitMQ的幂等性、优先级队列和惰性队列
**摘要:** 本文讨论了RabbitMQ中的幂等性、优先级队列和惰性队列。幂等性确保了重复请求不会导致副作用,关键在于消费端的幂等性保障,如使用唯一ID和Redis的原子性操作。优先级队列适用于处理不同重要性消息,如大客户订单优先处理,通过设置`x-max-priority`属性实现。惰性队列自3.6.0版起提供,用于延迟将消息加载到内存,适合大量消息存储和消费者延迟消费的场景。
29 4
|
3天前
|
消息中间件 微服务
RabbitMQ入门指南(十):延迟消息-死信交换机
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了死信交换机、死信交换机实现延迟消息等内容。
54 0
|
3天前
|
消息中间件 Java API
RabbitMQ入门指南(五):Java声明队列、交换机以及绑定
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了Java声明队列、交换机以及绑定队列和交换机等内容。
36 0
|
3天前
|
消息中间件 存储 监控
解析RocketMQ:高性能分布式消息队列的原理与应用
RocketMQ是阿里开源的高性能分布式消息队列,具备低延迟、高吞吐和高可靠性,广泛应用于电商、金融等领域。其核心概念包括Topic、Producer、Consumer、Message和Name Server/Broker。RocketMQ支持异步通信、系统解耦、异步处理和流量削峰。关键特性有分布式架构、顺序消息、高可用性设计和消息事务。提供发布/订阅和点对点模型,以及消息过滤功能。通过集群模式、存储方式、发送和消费方式的选择进行性能优化。RocketMQ易于部署,可与Spring集成,并与Kafka等系统对比各有优势,拥有丰富的生态系统。
170 4
|
3天前
|
消息中间件 Java Maven
springboot 使用注解的方式创建rabbitmq的交换机、路由key、以及监听队列的名称
springboot 使用注解的方式创建rabbitmq的交换机、路由key、以及监听队列的名称
|
3天前
|
消息中间件 存储 NoSQL
rocketmq实现延迟队列思路探讨
本文介绍了两种实现RocketMQ延迟消息的方法。非任意时间延迟可通过在服务器端配置`messageDelayLevel`实现,但需重启服务。任意时间延迟则分为两种策略:一是结合原生逻辑和时间轮,利用RocketMQ的默认延迟等级组合支持任意延迟,但可能丢失1分钟内的数据;二是使用存储介质(如Redis)加时间轮,消息存储和定时发送结合,能处理数据不一致和丢失问题,但涉及更多组件。推荐项目[civism-rocket](https://github.com/civism/civism-rocket)作为参考。
80 1
|
3天前
|
消息中间件 前端开发 算法
【十七】RabbitMQ基础篇(延迟队列和死信队列实战)
【十七】RabbitMQ基础篇(延迟队列和死信队列实战)
50 1
|
3天前
|
消息中间件 监控 数据挖掘
兔子的后院奇遇:深入了解RabbitMQ中的死信队列【RabbitMQ 四】
兔子的后院奇遇:深入了解RabbitMQ中的死信队列【RabbitMQ 四】
53 0
|
3天前
|
消息中间件 Docker 容器
docker构建rabbitmq并配置延迟队列插件
docker构建rabbitmq并配置延迟队列插件
48 0