轻松搞定RabbitMQ(二)——工作队列之消息分发机制

简介:        上一篇博文中简单介绍了一下RabbitMQ的基础知识,并写了一个经典语言入门程序——HelloWorld。本篇博文中我们将会创建一个工作队列用来在工作者(consumer)间分发耗时任务。

       上一篇博文中简单介绍了一下RabbitMQ的基础知识,并写了一个经典语言入门程序——HelloWorld。本篇博文中我们将会创建一个工作队列用来在工作者(consumer)间分发耗时任务。同样是翻译的官网实例

工作队列


       在前一篇博文中,我们完成了一个简单的对声明的队列进行发送和接受消息程序。下面我们将创建一个工作队列,来向多个工作者(consumer)分发耗时任务。

       工作队列(又名:任务队列)的主要任务是为了避免立即做一个资源密集型的却又必须等待完成的任务。相反的,我们进行任务调度:将任务封装为消息并发给队列。在后台运行的工作者(consumer)将其取出,然后最终执行。当你运行多个工作者(consumer),队列中的任务被工作进行共享执行。

       这样的概念对于在一个HTTP短链接的请求窗口中处理复杂任务的web应用程序,是非常有用的。

准备

       使用Thread.Sleep()方法来模拟耗时。采用小数点的数量来表示任务的复杂性。每一个点将住哪用1s的“工作”。例如,Hello... 处理完需要3s的时间。

       发送端(生产者):NewTask.java

public class NewTask {
	private final static String QUEUE_NAME = "hello";

	public static void main(String[] args) throws IOException {
		/**
		 * 创建连接连接到MabbitMQ
		 */
		ConnectionFactory factory = new ConnectionFactory();
		// 设置MabbitMQ所在主机ip或者主机名
		factory.setHost("127.0.0.1");
		// 创建一个连接
		Connection connection = factory.newConnection();
		// 创建一个频道
		Channel channel = connection.createChannel();
		// 指定一个队列
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		// 发送的消息
		String message = "Hello World...";
		// 往队列中发出一条消息
		channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
		System.out.println(" [x] Sent '" + message + "'");
		// 关闭频道和连接
		channel.close();
		connection.close();
	}
}

       工作者(消费者)Worker.java

public class Worker {
	private final static String QUEUE_NAME = "hello";

	public static void main(String[] argv) throws IOException, InterruptedException {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("127.0.0.1");
		// 打开连接和创建频道,与发送端一样
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();

		// 声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
		
		// 创建队列消费者
		final Consumer consumer = new DefaultConsumer(channel) {
			  @Override
			  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
			    String message = new String(body, "UTF-8");

			    System.out.println(" [x] Received '" + message + "'");
			    System.out.println(" [x] Proccessing... at " +new Date().toLocaleString());
			    try {
			    	for (char ch: message.toCharArray()) {
				        if (ch == '.') {
				        	Thread.sleep(1000);
				        }
				    }
				} catch (InterruptedException e) {
				} finally {
			      System.out.println(" [x] Done! at " +new Date().toLocaleString());
			    }
			  }
			};
			channel.basicConsume(QUEUE_NAME, true, consumer);
	}
}
       运行结果如下:



任务分发机制

       正主来了。。。下面开始介绍各种任务分发机制。


Round-robin(轮询分发)

       使用任务队列的优点之一就是可以轻易的并行工作。如果我们积压了好多工作,我们可以通过增加工作者(消费者)来解决这一问题,使得系统的伸缩性更加容易。

修改一下NewTask,使用for循环模拟多次发送消息的过程:

		for (int i = 0; i < 5; i++) {
			// 发送的消息
			String message = "Hello World"+Strings.repeat(".", i);
			// 往队列中发出一条消息
			channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
			System.out.println(" [x] Sent '" + message + "'");
		}

       我们先启动1个生产者实例,2个工作者实例,看一下如何执行:


       从上述的结果中,我们可以得知,在默认情况下,RabbitMQ将逐个发送消息到在序列中的下一个消费者(而不考虑每个任务的时长等等,且是提前一次性分配,并非一个一个分配)。平均每个消费者获得相同数量的消息。这种方式分发消息机制称为Round-Robin(轮询)。


Fair dispatch(公平分发)

       您可能已经注意到,任务分发仍然没有完全按照我们想要的那样。比如:现在有2个消费者,所有的奇数的消息都是繁忙的,而偶数则是轻松的。按照轮询的方式,奇数的任务交给了第一个消费者,所以一直在忙个不停。偶数的任务交给另一个消费者,则立即完成任务,然后闲得不行。而RabbitMQ则是不了解这些的。

       这是因为当消息进入队列,RabbitMQ就会分派消息。它不看消费者为应答的数目,只是盲目的将第n条消息发给第n个消费者。


       为了解决这个问题,我们使用basicQos( prefetchCount = 1)方法,来限制RabbitMQ只发不超过1条的消息给同一个消费者。当消息处理完毕后,有了反馈,才会进行第二次发送。

int prefetchCount = 1;
channel.basicQos(prefetchCount);
       注:如果所有的工作者都处于繁忙状态,你的队列有可能被填充满。你可能会观察队列的使用情况,然后增加工作者,或者使用别的什么策略。
       还有一点需要注意,使用公平分发,必须关闭自动应答,改为手动应答。这些内容会在下篇博文中讲述。

       整体代码如下:生产者NewTask.java

public class NewTask {
	private final static String QUEUE_NAME = "hello";

	public static void main(String[] args) throws IOException {
		/**
		 * 创建连接连接到MabbitMQ
		 */
		ConnectionFactory factory = new ConnectionFactory();
		// 设置MabbitMQ所在主机ip或者主机名
		factory.setHost("127.0.0.1");
		// 创建一个连接
		Connection connection = factory.newConnection();
		// 创建一个频道
		Channel channel = connection.createChannel();
		// 指定一个队列
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		int prefetchCount = 1;
		//限制发给同一个消费者不得超过1条消息
		channel.basicQos(prefetchCount);
		for (int i = 0; i < 5; i++) {
			// 发送的消息
			String message = "Hello World"+Strings.repeat(".",5-i)+(5-i);
			// 往队列中发出一条消息
			channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
			System.out.println(" [x] Sent '" + message + "'");
		}
		// 关闭频道和连接
		channel.close();
		connection.close();
	}
}

       消费者Worker.java

public class Worker {
	private final static String QUEUE_NAME = "hello";

	public static void main(String[] argv) throws IOException, InterruptedException {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("127.0.0.1");
		// 打开连接和创建频道,与发送端一样
		Connection connection = factory.newConnection();
		final Channel channel = connection.createChannel();

		// 声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
		channel.basicQos(1);//保证一次只分发一个
		// 创建队列消费者
		final Consumer consumer = new DefaultConsumer(channel) {
			  @Override
			  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
			    String message = new String(body, "UTF-8");

			    System.out.println(" [x] Received '" + message + "'");
			    try {
			    	for (char ch: message.toCharArray()) {
				        if (ch == '.') {
				        	Thread.sleep(1000);
				        }
				    }
				} catch (InterruptedException e) {
				} finally {
			      System.out.println(" [x] Done! at " +new Date().toLocaleString());
			      channel.basicAck(envelope.getDeliveryTag(), false);  
			    }
			  }
			};
			channel.basicConsume(QUEUE_NAME, false, consumer);
	}
}
       运行结果如下:




相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
2月前
|
消息中间件 存储 监控
|
1月前
|
消息中间件 存储 运维
|
3月前
|
消息中间件 存储 NoSQL
RabbitMQ的幂等性、优先级队列和惰性队列
【1月更文挑战第12天】用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。举个最简单的例子,那就是支付,用户购买商品后支付,支付扣款成功,但是返回结果的时候网络异常,此时钱已经扣了,用户再次点击按钮,此时会进行第二次扣款,返回结果成功,用户查询余额发现多扣钱了,流水记录也变成了两条。在以前的单应用系统中,我们只需要把数据操作放入事务中即可,发生错误立即回滚,但是再响应客户端的时候也有可能出现网络中断或者异常等等
230 1
|
2月前
|
消息中间件 监控 Java
Spring Boot中的RabbitMQ死信队列魔法:从异常到延迟,一网打尽【RabbitMQ实战 一】
Spring Boot中的RabbitMQ死信队列魔法:从异常到延迟,一网打尽【RabbitMQ实战 一】
63 0
|
27天前
|
消息中间件 存储 安全
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
25 0
|
1月前
|
消息中间件 前端开发 算法
【十七】RabbitMQ基础篇(延迟队列和死信队列实战)
【十七】RabbitMQ基础篇(延迟队列和死信队列实战)
42 1
|
1月前
|
消息中间件 存储 Cloud Native
【Spring云原生系列】Spring RabbitMQ:异步处理机制的基础--消息队列 原理讲解+使用教程
【Spring云原生系列】Spring RabbitMQ:异步处理机制的基础--消息队列 原理讲解+使用教程
|
2月前
|
消息中间件 监控 数据挖掘
兔子的后院奇遇:深入了解RabbitMQ中的死信队列【RabbitMQ 四】
兔子的后院奇遇:深入了解RabbitMQ中的死信队列【RabbitMQ 四】
48 0
|
2月前
|
消息中间件 Docker 容器
docker构建rabbitmq并配置延迟队列插件
docker构建rabbitmq并配置延迟队列插件
35 0
|
2月前
|
消息中间件
rabbitmq动态创建队列
rabbitmq动态创建队列
36 0

热门文章

最新文章