最佳实践:如何基于MNS实现事务消息

本文涉及的产品
对象存储 OSS,20GB 3个月
对象存储 OSS,恶意文件检测 1000次 1年
对象存储 OSS,内容安全 1000次 1年
简介: 事务消息的背景: 有时候我们需要实现本地操作和消息发送的事务一致性功能。即:消息发送成功,则本地操作成功;反之,如果消息发送失败,本地操作失败(成功也需要rollback)。保证不出现操作成功但消息发送失败;或者操作失败但消息发送成功的情况; 另外,消费端,我们也希望消息一定被成功处理一次,不会

事务消息的背景

有时候我们需要实现本地操作和消息发送的事务一致性功能。即:消息发送成功,则本地操作成功;反之,如果消息发送失败,本地操作失败(成功也需要rollback)。保证不出现操作成功但消息发送失败;或者操作失败但消息发送成功的情况;

另外,消费端,我们也希望消息一定被成功处理一次,不会因为消息端程序崩溃而导致消息没有成功处理,进而需要人工重置消费进度。

 

解决方案

利用消息服务MNS的延迟消息来实现。

准备工作

创建两个队列:

1.事务消息队列

消息的有效期小于消息延迟时间。即如果生产者不主动修改(提交)消息可见时间,消息对消费者不可见;

2.操作日志队列

记录事务消息的操作记录信息。消息延迟时间为事务操作超时时间。日志队列中的消息确认(删除)后将对消费者不可见。

 

具体步骤

1.发送一条事务准备消息到事务消息队列;

2.写操作日志信息到操作日志队列,日志中包含步骤1消息的消息句柄;

3.执行本地事务操作;

4.如果步骤3成功,提交消息(消息对消费者可见);反之,回滚消息;

5.确认步骤2中的操作日志(删除该日志消息);

6.步骤4后,消费者可以接收到事务消息;

7.消费者处理消息;

8.消费者确认删除消息;

 

如下图:

7df0a2f8929e58548b01b836546ad6fa60cdd096


异常分析:

生产者异常(例如:进程重启):

A.读取操作日志队列超时未确认日志

B.检查事务结果

C.如果检查得到事务已经成功,则提交消息(重复提交无副作用,同一句柄的消息只能成功提交一次)

D.确认操作日志

 

消费者异常(例如:进程重启):

消息服务提供至少保证消费一次的特性,只要步骤8不成功,消息在一段时间后可以继续可见,被当前消费者或者其他消费者处理。

 

消息服务不可达(例如:断网)

消息发送和接收处理状态以及操作日志都在消息服务端,消息服务本身具备高可靠和高可用的特点,所以只要网络恢复,事务可以继续,能保证只要生产者:操作成功,则消费者一定能够拿到消息并处理成功;或操作失败, 则消费者收不到消息的最终一致性。

 

代码实现:

MNS最新的Java SDK (1.1.5)(下载中的TransactionQueue支持上述事务消息方案。使用者只需要在TransactionOperationsTransactionChecker 两个接口添加业务操作和检查逻辑,就可以方便的实现事务消息。


Demo 代码

public class TransactionMessageDemo{
    public class MyTransactionChecker implements TransactionChecker
    {
        public boolean checkTransactionStatus(Message message)
        {
            boolean checkResult = false;
            String messageHandler = message.getReceiptHandle();
            try{
                //TODO: check if the messageHandler related transaction is success.
                checkResult = true;
            }catch(Exception e)
            {
                checkResult = false;
            }
            return checkResult;
        }
    }

    public class MyTransactionOperations implements TransactionOperations
    {
        public boolean doTransaction(Message message)
        {
            boolean transactionResult = false;
            String messageHandler = message.getReceiptHandle();
            String messageBody = message.getMessageBody();
            try{
                //TODO: do your local transaction according to the messageHandler and messageBody here.
                transactionResult = true;
            }catch(Exception e)
            {
                transactionResult = false;
            }
            return transactionResult;
        }
    }

    public static void main(String[] args) {
        System.out.println("Start TransactionMessageDemo");
        String transQueueName = "transQueueName";
        String accessKeyId = ServiceSettings.getMNSAccessKeyId();
        String accessKeySecret = ServiceSettings.getMNSAccessKeySecret();
        String endpoint = ServiceSettings.getMNSAccountEndpoint();
	
        CloudAccount account = new CloudAccount(accessKeyId, accessKeySecret, endpoint);
        MNSClient client = account.getMNSClient(); //this client need only initialize once

        // create queue for transaction queue.
        QueueMeta queueMeta = new QueueMeta();
        queueMeta.setQueueName(transQueueName);
        queueMeta.setPollingWaitSeconds(15);
        
        TransactionMessageDemo demo = new TransactionMessageDemo();
        TransactionChecker transChecker = demo.new MyTransactionChecker();
        TransactionOperations transOperations = demo.new MyTransactionOperations();
        
        TransactionQueue transQueue = client.createTransQueue(queueMeta, transChecker);

        // do transaction.
        Message msg = new Message();
        String messageBody = "TransactionMessageDemo";
        msg.setMessageBody(messageBody); 
        transQueue.sendTransMessage(msg, transOperations);

        // delete queue and close client if we won't use them.
        transQueue.delete();

        // close the client at the end.
        client.close();
        System.out.println("End TransactionMessageDemo");
    }

}


相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
5月前
|
消息中间件 弹性计算 Java
使用阿里云性能测试工具 JMeter 场景压测 RocketMQ 最佳实践
使用阿里云性能测试工具 JMeter 场景压测 RocketMQ 最佳实践
|
12月前
|
消息中间件 存储 缓存
分布式中间件核心原理与RocketMQ最佳实践
随着互联网业务的不断扩展和复杂化,分布式系统的需求也越来越迫切。为了满足这一需求,分布式中间件应运而生。在分布式系统中,中间件的角色是协调和管理各个节点之间的通信和数据交换,它起到了桥梁的作用。本文将介绍分布式中间件的核心原理和RocketMQ最佳实践,帮助读者更好地理解和应用分布式中间件。
326 0
|
消息中间件 存储 算法
RocketMQ 重试机制详解及最佳实践
本文主要介绍在使用 RocketMQ 时为什么需要重试与兜底机制,生产者与消费者触发重试的条件和具体行为,如何在 RocketMQ 中合理使用重试机制,帮助构建弹性,高可用系统的最佳实践。
737 0
RocketMQ 重试机制详解及最佳实践
|
5月前
|
消息中间件 监控 安全
RocketMQ x OpenTelemetry 分布式全链路追踪最佳实践(3)
RocketMQ x OpenTelemetry 分布式全链路追踪最佳实践
58 0
RocketMQ x OpenTelemetry 分布式全链路追踪最佳实践(3)
|
5月前
|
消息中间件 Java Kafka
RocketMQ x OpenTelemetry 分布式全链路追踪最佳实践(2)
RocketMQ x OpenTelemetry 分布式全链路追踪最佳实践(2)
56 0
RocketMQ x OpenTelemetry 分布式全链路追踪最佳实践(2)
|
5月前
|
消息中间件 Cloud Native Apache
RocketMQ x OpenTelemetry 分布式全链路追踪最佳实践(1)
RocketMQ x OpenTelemetry 分布式全链路追踪最佳实践
42 0
RocketMQ x OpenTelemetry 分布式全链路追踪最佳实践(1)
|
11月前
|
消息中间件 Cloud Native 中间件
带你读《企业级云原生白皮书项目实战》——4.1.3 消息队列RocketMQ版最佳实践(上)
带你读《企业级云原生白皮书项目实战》——4.1.3 消息队列RocketMQ版最佳实践(上)
187 0
|
11月前
|
消息中间件 存储 数据采集
带你读《企业级云原生白皮书项目实战》——4.1.3 消息队列RocketMQ版最佳实践(下)
带你读《企业级云原生白皮书项目实战》——4.1.3 消息队列RocketMQ版最佳实践(下)
184 0
|
11月前
|
消息中间件 存储 分布式计算
带你读《企业级云原生白皮书项目实战》——4.1.6 消息队列Kafka版最佳实践(上)
带你读《企业级云原生白皮书项目实战》——4.1.6 消息队列Kafka版最佳实践(上)
291 0
|
11月前
|
消息中间件 存储 分布式计算
带你读《企业级云原生白皮书项目实战》——4.1.6 消息队列Kafka版最佳实践(下)
带你读《企业级云原生白皮书项目实战》——4.1.6 消息队列Kafka版最佳实践(下)
232 0

热门文章

最新文章