RabbitMQ之队列优先级

简介: 优先级队列,顾名思义,具有更高优先级的队列具有较高的优先权,优先级高的消息具备优先被消费的特权。 本文主要讲解如何使用RabbitMQ实现队列优先级。可以通过RabbitMQ管理界面配置队列的优先级属性,如下图的x-max-priority.

优先级队列,顾名思义,具有更高优先级的队列具有较高的优先权,优先级高的消息具备优先被消费的特权。
本文主要讲解如何使用RabbitMQ实现队列优先级。

可以通过RabbitMQ管理界面配置队列的优先级属性,如下图的x-max-priority.
这里写图片描述
也可以通过代码去实现,比如:

Map<String,Object> args = new HashMap<String,Object>();
args.put("x-max-priority", 10);
channel.queueDeclare("queue_priority", true, false, false, args);

配置了队列优先级的属性之后,可以在管理页面看到Pri的标记:
这里写图片描述

上面配置的是一个队列queue的最大优先级。之后要在发送的消息中设置消息本身的优先级,如下:

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.priority(5);
AMQP.BasicProperties properties = builder.build();
channel.basicPublish("exchange_priority","rk_priority",properties,("messages").getBytes());

下面演示一段生产-消费的代码。首先producer端先生产10个消息,第奇数个消息具备优先级,第偶数个消息就是默认的优先级(最低:0)。
生产端:

package com.vms.test.zzh.rabbitmq.priority;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

/**
 * Created by hidden on 2017/2/14.
 */
public class PriorityProducer {
    public static final String ip = "10.198.197.73";
    public static final int port = 5672;
    public static final String username = "root";
    public static final String password = "root";

    public static void main(String[] arggs) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setPassword(password);
        connectionFactory.setUsername(username);
        connectionFactory.setPort(port);
        connectionFactory.setHost(ip);

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        //create exchange
        channel.exchangeDeclare("exchange_priority","direct",true);

        //create queue with priority
        Map<String,Object> args = new HashMap<String,Object>();
        args.put("x-max-priority", 10);
        channel.queueDeclare("queue_priority", true, false, false, args);
        channel.queueBind("queue_priority", "exchange_priority", "rk_priority");

        //send message with priority
        for(int i=0;i<10;i++) {
            AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
            if(i%2!=0)
                builder.priority(5);
            AMQP.BasicProperties properties = builder.build();
            channel.basicPublish("exchange_priority","rk_priority",properties,("messages-"+i).getBytes());
        }

        channel.close();
        connection.close();
    }
}

消费端:

package com.vms.test.zzh.rabbitmq.priority;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Created by hidden on 2017/2/14.
 */
public class PriorityConsumer {
    public static final String ip = "10.198.197.73";
    public static final int port = 5672;
    public static final String username = "root";
    public static final String password = "root";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setPassword(password);
        connectionFactory.setUsername(username);
        connectionFactory.setPort(port);
        connectionFactory.setHost(ip);

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume("queue_priority", false, consumer);

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println(msg);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        }
    }
}

消费端运行结果:

messages-1
messages-3
messages-5
messages-7
messages-9
messages-0
messages-2
messages-4
messages-6
messages-8

查看运行结果可以验证优先级队列的作用。

当然,在消费端速度大于生产端速度,且broker中没有消息堆积的话,对发送的消息设置优先级也没什么实际意义,因为发送端刚发送完一条消息就被消费端消费了,那么就相当于broker至多只有一条消息,那么对于单条消息来说优先级是没有什么意义的。

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
3天前
|
消息中间件 Java
RabbitMQ优先级消息
RabbitMQ优先级消息
28 0
|
3天前
|
消息中间件
RabbitMQ中的消息优先级是如何实现的?
RabbitMQ中的消息优先级是如何实现的?
123 0
|
3天前
|
消息中间件 Java
SpringBoot RabbitMQ死信队列
SpringBoot RabbitMQ死信队列
68 0
|
3天前
|
消息中间件
|
3天前
|
消息中间件 存储 NoSQL
RabbitMQ的幂等性、优先级队列和惰性队列
【1月更文挑战第12天】用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。举个最简单的例子,那就是支付,用户购买商品后支付,支付扣款成功,但是返回结果的时候网络异常,此时钱已经扣了,用户再次点击按钮,此时会进行第二次扣款,返回结果成功,用户查询余额发现多扣钱了,流水记录也变成了两条。在以前的单应用系统中,我们只需要把数据操作放入事务中即可,发生错误立即回滚,但是再响应客户端的时候也有可能出现网络中断或者异常等等
234 1
|
3天前
|
消息中间件 监控 Java
Spring Boot中的RabbitMQ死信队列魔法:从异常到延迟,一网打尽【RabbitMQ实战 一】
Spring Boot中的RabbitMQ死信队列魔法:从异常到延迟,一网打尽【RabbitMQ实战 一】
96 0
|
3天前
|
消息中间件 存储 NoSQL
RabbitMQ的幂等性、优先级队列和惰性队列
**摘要:** 本文讨论了RabbitMQ中的幂等性、优先级队列和惰性队列。幂等性确保了重复请求不会导致副作用,关键在于消费端的幂等性保障,如使用唯一ID和Redis的原子性操作。优先级队列适用于处理不同重要性消息,如大客户订单优先处理,通过设置`x-max-priority`属性实现。惰性队列自3.6.0版起提供,用于延迟将消息加载到内存,适合大量消息存储和消费者延迟消费的场景。
29 4
|
3天前
|
消息中间件 Java API
RabbitMQ入门指南(五):Java声明队列、交换机以及绑定
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了Java声明队列、交换机以及绑定队列和交换机等内容。
36 0
|
3天前
|
消息中间件 存储 监控
解析RocketMQ:高性能分布式消息队列的原理与应用
RocketMQ是阿里开源的高性能分布式消息队列,具备低延迟、高吞吐和高可靠性,广泛应用于电商、金融等领域。其核心概念包括Topic、Producer、Consumer、Message和Name Server/Broker。RocketMQ支持异步通信、系统解耦、异步处理和流量削峰。关键特性有分布式架构、顺序消息、高可用性设计和消息事务。提供发布/订阅和点对点模型,以及消息过滤功能。通过集群模式、存储方式、发送和消费方式的选择进行性能优化。RocketMQ易于部署,可与Spring集成,并与Kafka等系统对比各有优势,拥有丰富的生态系统。
170 4
|
3天前
|
消息中间件 Java Maven
springboot 使用注解的方式创建rabbitmq的交换机、路由key、以及监听队列的名称
springboot 使用注解的方式创建rabbitmq的交换机、路由key、以及监听队列的名称