RabbitMq的延时队列以及本地延迟队列

简介: RabbitMq的延时队列以及本地延迟队列

之前对延迟队列业务场景的理解,应该就是像延时订单未付款的自动取消、短信重发通知、或者其他终结状态的触发。但是也有另外一种场景可以用:比如场景切换,服务复用的时候可以考虑一下。比如讲讲开发中碰到的问题吧。是一个信息授权的场景。以前新增一个下级渠道商,需要手动去授权本渠道的销售信息给下一级渠道。然后现在需要改造,新增的时候同时授权过去。以前授权信息的消费者是定时(几分钟一次)去读渠道商的数据库信息,其他时候是拿到缓存里的渠道商去同步授权。所以就存在一个问题,即刻创建下一级渠道即刻去触发授权,授权消费者还没重新去捞渠道商,目前渠道商的缓存里没有新增的那一个,就没有授权信息下去。然而捞不到就不要同步,没毛病页不用报错。只能说这设计不适用当前我们改造的场景罢了。其实也很纳闷,消息带过去的渠道ID消费者在当前缓存找不到渠道,就去数据库查,如果有就直接加载到缓存一起同步授权不就好了,不就能适应更多场景,毕竟作为一个公用同步模块。但是已有的没办法改造,想想别的办法把。那就。。。等消费者去执行下一次捞渠道商再去触发他吧,那就是延迟发送消息了,延迟队列想法就这样用上了。
尝试了两种延迟队列,分别是java.util.concurrent.Delayed包下的本地延迟队列和RabbitMq的死信队列。对比了下肯定是要是要使用RabbitMq异步中间件的。DelayQueue是一个无界的BlockingQueue,实现的是一个单机的、JVM内存中的延迟队列,并没有集群的支持,而且无法满足在对业务系统泵机(即服务重启也会丢失)的时、消息消费异常的时候做相应的逻辑处理。但是也附上本地延迟队列的实现:
(1)先定义一个实现Delayed是队列消息
image
(2)再定义消费者,实现线程Runnable,等待线程池调用
image
(3)发送延迟队列,等待消费
DelayQueue queue = new DelayQueue();
AuthGoodsMessage message = new AuthGoodsMessage(new Random().nextInt(), dto, 120);
queue.offer(message);
ExecutorService exec = Executors.newFixedThreadPool(1);
exec.execute(new AuthGoodsConsumer(queue));
exec.shutdown();

对于延迟消息RabbitMq的实战:
image
(1)定义队列以及路由、交换机的配置(类加上注解@Configuration)

@Bean
public Queue businessTestDeadQueue(){
    Map<String,Object> args = new HashMap<String, Object>();
    args.put("x-dead-letter-exchange","business.test.dead.exchange.name");
    args.put("x-dead-letter-routing-key","business.test.dead.routing.key.name");
    args.put("x-message-ttl",120 * 1000);
    return new Queue("business.test.dead.queue.name",true,false,false,args);
}

@Bean
 public TopicExchange businessTestDeadExchange(){
    return new TopicExchange("business.test.dead.produce.exchange.name",true,false);
}

@Bean
public Binding businessTestDeadBinding() {
    return BindingBuilder.bind(goodsAuthDeadQueue()).to(goodsAuthDeadExchange()).with("business.test.dead.produce.routing.key.name");
}

@Bean
public Queue businessTestDeadRealQueue(){
    return new Queue("business.test.dead.real.queue.name",true);
}

@Bean
public TopicExchange businessTestDeadRealExchange(){
    return new TopicExchange("business.test.dead.exchange.name",true,false);
}

@Bean
public Binding businessTestDeadRealBinding() {
    return BindingBuilder.bind(goodsAuthDeadRealQueue()).to(goodsAuthDeadRealExchange()).with("business.test.dead.routing.key.name");
}

(2)定义真实消费者(类加上注解@RabbitListener(queues = "business.test.dead.real.queue.name", containerFactory = ListenerSelector.multiThread)

public void exe(@Payload byte[] body) {
    String data = new String(body);
    logger.info("----------接受延时信息:{}------------------", data);
    具体实体 object = JSONObject.parseObject(data, 具体实体.class);

 }

(3)发送延迟消息
image

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
3月前
|
消息中间件 存储 NoSQL
RabbitMQ的幂等性、优先级队列和惰性队列
【1月更文挑战第12天】用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。举个最简单的例子,那就是支付,用户购买商品后支付,支付扣款成功,但是返回结果的时候网络异常,此时钱已经扣了,用户再次点击按钮,此时会进行第二次扣款,返回结果成功,用户查询余额发现多扣钱了,流水记录也变成了两条。在以前的单应用系统中,我们只需要把数据操作放入事务中即可,发生错误立即回滚,但是再响应客户端的时候也有可能出现网络中断或者异常等等
230 1
|
2月前
|
消息中间件 监控 Java
Spring Boot中的RabbitMQ死信队列魔法:从异常到延迟,一网打尽【RabbitMQ实战 一】
Spring Boot中的RabbitMQ死信队列魔法:从异常到延迟,一网打尽【RabbitMQ实战 一】
58 0
|
1月前
|
消息中间件 前端开发 算法
【十七】RabbitMQ基础篇(延迟队列和死信队列实战)
【十七】RabbitMQ基础篇(延迟队列和死信队列实战)
40 1
|
2月前
|
消息中间件 监控 数据挖掘
兔子的后院奇遇:深入了解RabbitMQ中的死信队列【RabbitMQ 四】
兔子的后院奇遇:深入了解RabbitMQ中的死信队列【RabbitMQ 四】
48 0
|
2月前
|
消息中间件 Docker 容器
docker构建rabbitmq并配置延迟队列插件
docker构建rabbitmq并配置延迟队列插件
33 0
|
2月前
|
消息中间件
rabbitmq动态创建队列
rabbitmq动态创建队列
33 0
|
8月前
|
消息中间件 Linux
centos7 yum快速安装rabbitmq服务
centos7 yum快速安装rabbitmq服务
136 0
|
8月前
|
消息中间件 中间件 微服务
RabbitMQ 入门简介及安装
RabbitMQ 入门简介及安装
88 0
|
8月前
|
消息中间件 Ubuntu Shell
ubuntu安装rabbitmq教程 避坑
ubuntu安装rabbitmq教程 避坑
291 0
|
8月前
|
消息中间件 存储 网络协议
Rabbitmq的安装与使用
Rabbitmq的安装与使用
221 0