消息中间件系列五、rabbit消息的确认机制

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 一、消息的确认机制 1、消费者收到的每一条消息都必须进行确认。(分为自动确认和消费者自行确认)   消费者在声明队列时,指定autoAck参数,true自动确认,false时rabbitmq会等到消费者显示的发回一个ack信号才会删除消息。

前言:这是中间件一个系列的文章之一,有需要的朋友可以看看这个系列的其他文章:
消息中间件系列一、消息中间件的基本了解
消息中间件系列二、Windows下的activeMQ和rabbitMQ的安装
消息中间件系列三、JMS和activeMQ的简单使用
消息中间件系列四、认识AMQP和RabbiyMq的简单使用
消息中间件系列五、rabbit消息的确认机制
消息中间件系列六,rabbit与spring集成实战

一、消息的确认机制

1、消费者收到的每一条消息都必须进行确认。(分为自动确认和消费者自行确认)

  消费者在声明队列时,指定autoAck参数,true自动确认,false时rabbitmq会等到消费者显示的发回一个ack信号才会删除消息。autoAck=false,有足够时间让消费者处理消息,直到消费者显示调用basicAck为止。Rabbitmq中消息分为了两部分:
  1、等待投递的消息;
  2、已经投递,但是还没有收到ack信号的。如果消费者断连了,服务器会把消息重新入队,投递给下一个消费者。
  
  补充:未ack的消息是没有超时时间的,没有处理会一直在队列中,知道内存溢出。

2、如何明确拒绝消息

a、消费者断连,
b、消费者使用reject命令(requeue=true,重新分发消息,false移除消息),
c、nack命令(批量的拒绝)(rabbitMQ的特有命令)

二、为什么要有个发送方(生产者)确认模式?

生产者不知道消息是否真正到达RabbitMq,也就是说发布操作不返回任何消息给生产者。
AMQP协议层面为我们提供的事务机制解决了这个问题。
AMQP事务:讲几个消息打包一起发给队列,如果队列有一个或部分消息没有成功接收处理,那么这几个消息就会被回退。

但是事务机制本身也会带来问题:

1、严重的性能问题

2、使生产者应用程序产生同步

RabbitMQ团队为我们拿出了更好的方案,即采用发送方确认模式,该模式(异步模式)比事务更轻量,性能影响几乎可以忽略不计。

发送方确认模式的机制

三、消费者确认

首先当然要添加依赖,下面的所有代码都是在同一个项目中,生产者确认也是,下文不再重复说依赖的问题。

添加依赖

客户端Jar包和源码包下载地址:
http://repo1.maven.org/maven2/com/rabbitmq/amqp-client/5.0.0/amqp-client-5.0.0.jar
http://repo1.maven.org/maven2/com/rabbitmq/amqp-client/5.0.0/amqp-client-5.0.0-sources.jar
如果是引入jar包的形式还需要引入slf4j-api-1.6.1.jar。

如果是Maven工程加入:

<dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>amqp-client</artifactId>
  <version>5.0.0</version>
</dependency>

注意:5系列的版本最好使用JDK8及以上, 低于JDK8可以使用4.x(具体的版本号到Maven的中央仓库查)的版本。

1、确认回复

消费者在通道channel调用basicConsume方法声明队列的时候,可以设置为自动确认和消费者自行确认。
a、自动确认
basicConsume第二个参数autoAck(是否自动确认)设置为true,那么队列每次接收到消息之后都会向队列发送确认消息,确认之后队列会删除相应的消息。上一篇博客消息中间件系列四、认识AMQP和RabbiyMq的简单使用的用例就全部是用这种确认方式(需要的朋友可以去这篇文章看用例,其实代码差别不多)。

b、自行确认
basicConsume当第二个参数设置为false时,就要消费者自己确认了,否则消息会一直留在队列中直到内存溢出。那么怎么确认呢?
在消息监听回调方法里面,获取通道,然后调用basicAck即可进行手动确认了,方法参数为:deliveryTag投递的标记符,multiple是否进行批量回复

this.getChannel().basicAck(envelope.getDeliveryTag(),false);//参数:deliveryTag投递的标记符,multiple是否进行批量回复

下面看代码实例
生产者实例代码:

package dongnaoedu.consumerconfirm;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConsumerConfirmProducer {
    private final static String EXCHANGE_NAME = "direct_cc_confirm_1";
    private final static String ROUTE_KEY = "error";
    public static void main(String[] args) throws IOException, TimeoutException,
            InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
/*        factory.setUsername(..);
        factory.setPort();
        factory.setVirtualHost();*/
        Connection connection = factory.newConnection();//连接
        Channel channel = connection.createChannel();//信道
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//交换器
        for(int i=0;i<10;i++){
            String message = "Hello world_"+(i+1);
            channel.basicPublish(EXCHANGE_NAME,ROUTE_KEY,null,message.getBytes());
            System.out.println("Sent "+ROUTE_KEY+":"+message);
        }
        channel.close();
        connection.close();
    }
}

消费者实例代码:

package dongnaoedu.consumerconfirm;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ClientConsumerAck {
    private static final String EXCHANGE_NAME = "direct_cc_confirm_1";
    public static void main(String[] argv) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        Connection connection = factory.newConnection();//连接
        Channel channel = connection.createChannel();//信道
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//交换器
        //声明队列
        String queueName = "consumer_confirm";
        //创建队列,参数分别为:队列名,durable是否进行持久化,exlusive是否私有,auto-delete没有消费者是自动删除,arguments相关参数
        channel.queueDeclare(queueName,false,false,false,null);
        String server = "error";
        channel.queueBind(queueName,EXCHANGE_NAME,server);
        System.out.println("Waiting message.......");
        Consumer consumerB = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body,"UTF-8");
                System.out.println("Accept:"+envelope.getRoutingKey()+":"+message);
                this.getChannel().basicAck(envelope.getDeliveryTag(),false);//参数:投递的标记符,b:是否进行批量回复
            }
        };
        //三个参数:queueName队列名,autoAck是否自动确认,callback消息监听回调
        channel.basicConsume(queueName,false,consumerB);
    }
}

分别启动消费者和生产者,可以看到队列里存留的信息是0,说明生产者产生的消息都被消费者确认消费了

image

2、拒绝回复

在某些情况,需要消费者收到消息后不清除队列中的消息,那么消费者就要拒绝回复,。那么怎么拒绝呢?
在消息监听回调方法里面,获取通道,然后调用basicReject拒绝,方法参数为:deliveryTag投递的标记符,requeue拒绝后是否让其他消费者处理消息。
新建一个消费者,只需在上面的消费中改动21行以下的代码,其他不变:

        Consumer consumerB = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                this.getChannel().basicReject(envelope.getDeliveryTag(),true);//b:拒绝后是否让其他消费者处理消息
                System.out.println("Reject:"+envelope.getRoutingKey() +":"+new String(body,"UTF-8"));
            }
        };
        channel.basicConsume(queueName,false,consumerB);

分别启动消费者和生产者,消费者发完10条消息就停了,由于消费者拒绝确认,又设置转让其他消费者处理,并且只有一个消费者,就会一直循环接收消息,拒绝消息。而队列中的消息一直都是10条。

image

image

image
清空队列消息,再同时把上面确认回复的消费者启动,启动生产者,两个消费者的打印信息如下:

image

image

说明队列先是轮询给消费者发消息,ClientConsumerReject拒绝并转给其他队列处理,这时消息有可能再次发给自己,消息处理完之后队列消息会清空:
image

四、生产者确认

为什么队列要给生产者回复消息确认在目录二已经说过了,这里不再重复,发送方确认可以分为同步确认和异步确认。
先上消费者,设置简单的自动确认模式:

package dongnaoedu.producerconfirm;

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ProducerConfirmConsumer {
    private static final String EXCHANGE_NAME = "producer_confirm";
    public static void main(String[] argv) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        // 打开连接和创建频道,与发送端一样
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//交换器
        String queueName = "producer_confirm";
        //参数分别为:队列名,durable是否进行持久化,exlusive是否私有,auto-delete没有消费者是自动删除,arguments相关参数
        channel.queueDeclare(queueName,false,false,false,null);
        String severity="error";//只关注error级别的日志
        channel.queueBind(queueName, EXCHANGE_NAME, severity);//把队列按路由键绑定到交换器上
        System.out.println(" [*] Waiting for messages......");
        // 创建队列消费者
        final Consumer consumerB = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println( "Received ["+ envelope.getRoutingKey() + "] "+message);
            }
        };
        //三个参数:queueName队列名,autoAck是否自动确认,callback消息监听回调
        channel.basicConsume(queueName, true, consumerB);
    }
}

1、生产者同步确认

要实现发送方确认需要调用通道的confirmSelect将信道设置为发送方确认。

channel.confirmSelect();

生产者发送消息之后,可以调用通道的waitForConfirms方法,等待队列的回复,改方法会阻塞进程。

package dongnaoedu.producerconfirm;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * 发送方确认同步模式
 */
public class ProducerConfirm {

    private final static String EXCHANGE_NAME = "producer_confirm";
    private final static String ROUTE_KEY = "error";

    public static void main(String[] args) throws IOException, TimeoutException,
            InterruptedException {
        /**
         * 创建连接连接到MabbitMQ
         */
        ConnectionFactory factory = new ConnectionFactory();
        // 设置MabbitMQ所在主机ip或者主机名
        factory.setHost("127.0.0.1");
        // 创建一个连接
        Connection connection = factory.newConnection();
        // 创建一个信道
        Channel channel = connection.createChannel();
        //将信道设置为发送方确认
        channel.confirmSelect();
        for(int i=0;i<2;i++){
            String msg = "Hello "+(i+1);
            channel.basicPublish(EXCHANGE_NAME,ROUTE_KEY,null,msg.getBytes());
            if (channel.waitForConfirms()){//同步阻塞到消费者发送确认消息,效率太低,还不如用事务
                System.out.println(ROUTE_KEY+":"+msg);
            }
        }
        // 关闭频道和连接
        channel.close();
        connection.close();
    }
}

由于这种同步模式效率实在太低,会阻塞到消费者发送确认消息,还不如用事务。现在看看异步模式的:

2、生产者异步确认

要实现发送方确认需要调用通道的confirmSelect将信道设置为发送方确认。

channel.confirmSelect();

生产者异步确认需要在通道通过addConfirmListener方法添加确认监听接口,接口有两个方法要实现,分别是
handleAck:当rabbitMQ队列返回确认的时候调用的方法;
handleNack:如果rabbitMQ队列出现内部错误,发生数据丢失调用的方法;

channel.addConfirmListener(new ConfirmListener() {
            //当rabbitMQ队列返回确认的时候调用的方法
            public void handleAck(long deliveryTag, boolean multiple)
                    throws IOException {
                //确认回调执行的代码
            }

            //如果rabbitMQ队列出现内部错误,发生数据丢失调用的方法
            public void handleNack(long deliveryTag, boolean multiple)
                    throws IOException {
                //队列发送错误回调的代码
            }
        });

最好还在通道通过addReturnListener方法添加返回监听接口,接口需要实现一个方法
handleReturn:当投递消息时无法找到一个合适的队列时回调的方法。

        channel.addReturnListener(new ReturnListener() {
            public void handleReturn(int replyCode, String replyText,
                                     String exchange, String routingKey,
                                     AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                //回调代码。。。
            }
        });

补充:在通道调用basicPublish方法声明队列的时候,把mandatory(默认是false)设置为true,且投递消息时无法找到一个合适的队列才会回调addReturnListener接口的handleReturn方法;如果把mandatory(默认是false)设置为false,且投递消息时无法找到一个合适的队列,那么就会丢弃消息(缺省)。
调用basicPublish方法如下:

//参数分别为:交换器名称,路由键,mandatory(默认是false),参数,消息
channel.basicPublish(EXCHANGE_NAME, severity, false,null, message.getBytes());

发送方确认异步模式整体代码:

package dongnaoedu.producerconfirm;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * 发送方确认异步模式
 */
public class ProducerConfirmAsync {

    private final static String EXCHANGE_NAME = "producer_confirm";

    public static void main(String[] args) throws IOException, TimeoutException,
            InterruptedException {
        /**
         * 创建连接连接到MabbitMQ
         */
        ConnectionFactory factory = new ConnectionFactory();

        // 设置MabbitMQ所在主机ip或者主机名
        factory.setHost("127.0.0.1");
        // 创建一个连接
        Connection connection = factory.newConnection();
        //连接被关闭
        //connection.addShutdownListener();

        // 创建一个信道
        Channel channel = connection.createChannel();
        // 指定转发
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        //将信道设置为发送方确认
        channel.confirmSelect();
        //信道被关闭
        //channel.addShutdownListener();

        //deliveryTag代表了(channel)唯一的投递,是个单调递增的正整数
        //multiple:false,是否把本channel内容小于deliveryTag的消息一次性确认
        //添加异步监听器,
        channel.addConfirmListener(new ConfirmListener() {
            //当rabbitMQ队列返回确认的时候调用的方法
            public void handleAck(long deliveryTag, boolean multiple)
                    throws IOException {
                System.out.println("Ack deliveryTag="+deliveryTag
                        +"multiple:"+multiple);
            }

            //如果rabbitMQ队列出现内部错误,发生数据丢失调用的方法
            public void handleNack(long deliveryTag, boolean multiple)
                    throws IOException {
                System.out.println("Ack deliveryTag="+deliveryTag
                        +"multiple:"+multiple);
            }
        });

        //服务端的返回方法
        //投递消息时无法找到一个合适的队列
        //1、mandatory参数为true
        //消息返回给生产者
        //false 丢弃消息(缺省)
        channel.addReturnListener(new ReturnListener() {
            public void handleReturn(int replyCode, String replyText,
                                     String exchange, String routingKey,
                                     AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msg = new String(body);
                System.out.println("replyText:"+replyText);
                System.out.println("exchange:"+exchange);
                System.out.println("routingKey:"+routingKey);
                System.out.println("msg:"+msg);
            }
        });
        String[] severities={"error","info","warning"};
        for(int i=0;i<3;i++){
            String severity = severities[i%3];
            // 发送的消息
            String message = "Hello World_"+(i+1)+("_"+System.currentTimeMillis());
            //参数分别为:交换器名称,路由键,mandatory(默认是false),参数,消息
            channel.basicPublish(EXCHANGE_NAME, severity, false,null, message.getBytes());
            //channel.basicPublish(EXCHANGE_NAME,ROUTE_KEY,null,msg.getBytes());
            System.out.println("----------------------------------------------------");
            System.out.println(" Sent Message: [" + severity +"]:'"+ message + "'");
            Thread.sleep(200);//延长程序生命时长,并且隔离每条消息
        }
        // 关闭频道和连接
        channel.close();
        connection.close();
    }
}

看代码可知生产者发了三条消息,分别是"error","info","warning"这三个类型的。
分别启动消费者和生产者:
由于队列只绑定的路由键只有error,所以消费者只能接收到error类的信息。

image

生产者打印信息解释:生产者没发送一条消息的时候都打印出一条横线并把消息信息也打印出来;每次发送消息给队列,队列都能正常接收并会返回确认,所以发送每条消息后都会回调ConfirmListener的handleAck方法;由于"info","warning"这两个类型的路由键没有队列绑定,所以没有队列能接收者两条消息,而且mandatory参数为true,所以发现没有合适的队列会回调ReturnListener接口的handleReturn方法,打印出相应的信息。

image

生产者发的消息中,error类型的被确认消费,其他两个类型没有找到合适的队列(没有队列绑定相应的路由键),所以消息会忽略,会把消息丢弃(这个有在上一篇博客消息中间件系列四、认识AMQP和RabbiyMq的简单使用 说明并证实过)。

image

注意:生产者确认模式和消费者对消息的确认是不同的,发送方确认是消息发给rabbit队列之后,rabbit给生产者回复消息说明队列接收到消息了;消费者确认是消费者收到消息后给rabbit队列回复消息说明消费者正常收到消息并处理了,然后消息从队列中删除。

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
消息中间件 Java Spring
消息中间件系列六,rabbit与spring集成实战
前言:这是中间件一个系列的文章之一,有需要的朋友可以看看这个系列的其他文章:消息中间件系列一、消息中间件的基本了解消息中间件系列二、Windows下的activeMQ和rabbitMQ的安装消息中间件系列三、JMS和activeMQ的简单使用消息中间件系列四、认识AMQP和RabbiyMq的简单使用消息中间件系列五、rabbit消息的确认机制目前还在持续更新中,敬请期待。
2123 1
|
8月前
|
NoSQL Java Redis
阿里Java高级岗中间件二面:GC+IO+JVM+多线程+Redis+数据库+源码
虽然“钱多、事少、离家近”的工作可能离技术人比较远,但是找到一份合适的工作,其实并不像想象中那么难。但是,有些技术人确实是认真努力工作,但在面试时表现出的能力水平却不足以通过面试,或拿到高薪,其实不外乎以下 2 个原因:
|
8月前
|
算法 NoSQL Java
2023年阿里高频Java面试题:分布式+中间件+高并发+算法+数据库
又到了一年一度的金九银十,互联网行业竞争是一年比一年严峻,作为工程师的我们唯有不停地学习,不断的提升自己才能保证自己的核心竞争力从而拿到更好的薪水,进入心仪的企业(阿里、字节、美团、腾讯.....)
|
8月前
|
算法 NoSQL Java
2021年阿里高频Java面试题:分布式+中间件+高并发+算法+数据库
又到了一年一度的金九银十,互联网行业竞争是一年比一年严峻,作为工程师的我们唯有不停地学习,不断的提升自己才能保证自己的核心竞争力从而拿到更好的薪水,进入心仪的企业(阿里、字节、美团、腾讯.....)
|
9月前
|
消息中间件 数据采集 Java
开发神技!阿里消息中间件进阶手册限时开源,请接住我的下巴
相信大家在实际工作中都用过消息中间件进行系统间数据交换,解决应用解耦、异步消息、流量削峰等问题,由此消息中间件的强大功能想必也不用我多说了!目前业界上关于消息中间件的实现多达好几十种,可谓百花齐放,所用的实现语言同样也五花八门。不管使用哪一个消息中间件,我们的目的都是实现高性能、高可用、可伸缩和最终一致性架构。
|
11月前
|
缓存 NoSQL 容灾
《Java应用提速(速度与激情)》——六、阿里中间件提速
《Java应用提速(速度与激情)》——六、阿里中间件提速
|
11月前
|
消息中间件 NoSQL Dubbo
阿里Java高级岗中间件二面:GC+IO+JVM+多线程+Redis+数据库+源码
一转眼,都2023年了,你是否在满意的公司?拿着理想的薪水? 虽然“钱多、事少、离家近”的工作可能离技术人比较远,但是找到一份合适的工作,其实并不像想象中那么难。但是,有些技术人确实是认真努力工作,但在面试时表现出的能力水平却不足以通过面试,或拿到高薪,其实不外乎以下 2 个原因: 第一,“知其然不知其所以然”。做了多年技术,开发了很多业务应用,但似乎并未思考过种种技术选择背后的逻辑。所以,他无法向面试官展现出自己未来技术能力的成长潜力。面试官也不会放心把具有一定深度的任务交给他。 第二,知识碎片化,不成系统。在面试中,面试者似乎无法完整、清晰地描述自己所开发的系统,或者使用的相关技术。
|
11月前
|
SQL 算法 NoSQL
2023年阿里高频Java面试题:分布式+中间件+高并发+算法+数据库
又到了一年一度的金九银十,互联网行业竞争是一年比一年严峻,作为工程师的我们唯有不停地学习,不断的提升自己才能保证自己的核心竞争力从而拿到更好的薪水,进入心仪的企业(阿里、字节、美团、腾讯.....)
|
12月前
|
存储 缓存 人工智能
2022互联网寒冬,看看阿里中间件团队如何降本提效?(2)
2022互联网寒冬,看看阿里中间件团队如何降本提效?
199 1
|
12月前
|
人工智能 Kubernetes 算法
2022互联网寒冬,看看阿里中间件团队如何降本提效?(1)
2022互联网寒冬,看看阿里中间件团队如何降本提效?
181 1