RocketMQ事务消息实战

简介: 本文主要是考虑在使用消息中间件时,如果保证不丢消息的一些实践思考。

若您对RocketMQ技术感兴趣,请加入 RocketMQ技术交流群


我们以一个订单流转流程来举例,例如订单子系统创建订单,需要将订单数据下发到其他子系统(与第三方系统对接)这个场景,我们通常会将两个系统进行解耦,不直接使用服务调用的方式进行交互。其业务实现步骤通常为:

  1. A系统创建订单并入库。
  2. 发送消息到MQ。
  3. MQ消费者消费消息,发送远程RPC服务调用,完成订单数据的同步。

1、方案一
伪代码如下:
clipboard
方案弊端:

  1. 如果消息发送成功,在提交事务的时候JVM突然挂掉,事务没有成功提交,导致两个系统之间数据不一致。
  2. 由于消息是在事务提交之前提交,发送的消息内容是订单实体的内容,会造成在消费端进行消费时如果需要去验证订单是否存在时可能出现订单不存在。

方案二:
伪代码如下:
clipboard
然后在控制器层,使用异步发送,将消息发送,并在消息发送成功后,更新待发送状态为已发送。
然后通过定时任务,扫描待发送,结合创建时间的记录(小于当前时间5分钟的消息待发送记录),进行消息发送。
方案弊端:
1、消息有可能重复发送,但在消费端可以通过唯一业务编号来进行去重设计。
2、实现过于复杂,为了避免极端情况下的消息丢失,需要使用定时任务。

方案三:基于RocketMQ4.3版本事务消息
clipboard
额外需要实现事务会查监听器:TransactionListener,其实例代码:

package org.apache.rocketmq.example.transaction;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.concurrent.ConcurrentHashMap;

@SuppressWarnings("unused")
public class OrderTransactionListenerImpl implements TransactionListener {
    
    private ConcurrentHashMap<String, Integer> countHashMap = new ConcurrentHashMap<>();
    
    private final static int MAX_COUNT = 5;


    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // 
        String bizUniNo = msg.getUserProperty("bizUniNo"); // 从消息中获取业务唯一ID。
        // 将bizUniNo入库,表名:t_message_transaction,表结构  bizUniNo(主键),业务类型。
        return LocalTransactionState.UNKNOW;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer status = 0;
        // 从数据库查查询t_message_transaction表,如果该表中存在记录,则提交,
        String bizUniNo = msg.getUserProperty("bizUniNo"); // 从消息中获取业务唯一ID。
        // 然后t_message_transaction 表,是否存在bizUniNo,如果存在,则返回COMMIT_MESSAGE,
        // 不存在,则记录查询次数,未超过次数,返回UNKNOW,超过次数,返回ROLLBACK_MESSAGE
        
        if(query(bizUniNo) > 0 ) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        
        return rollBackOrUnown(bizUniNo);
    }
    
    public int query(String bizUniNo) {
        return 1; //select count(1) from t_message_transaction a where a.biz_uni_no=#{bizUniNo}
    }
    
    public LocalTransactionState rollBackOrUnown(String bizUniNo) {
        Integer num = countHashMap.get(bizUniNo);
        
        if(num != null &&  ++num > MAX_COUNT) {
            countHashMap.remove(bizUniNo);
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        
        if(num == null) {
            num = new Integer(1);
        }
        
        countHashMap.put(bizUniNo, num);
        return LocalTransactionState.UNKNOW;
        
    }
  
} 

TransactionListener 实现要点:

  • executeLocalTransaction:
    该方法,主要是设置本地事务状态,该方法与业务方代码在一个事务中,例如OrderServer#createMap中,只要本地事务提交成功,该方法也会提交成功。

在这里主要是t_message_transaction添加一条记录,在事务会查时,如果存在记录,就认为是该消息需要提交。

  • checkLocalTransaction:
    该方法主要是告知RocketMQ消息是否需要提交还是回滚,如果本地事务表(t_message_transaction)存在记录,则认为提交,如果不存在,可以设置会查次数,如果指定次数内还是未查到消息,则回滚,否则返回未知,rocketmq会按一定的频率回查事务,当然回查次数也有限制,默认为5次,可配置。

本节的分享就到此结束了,本文主要是考虑在使用消息中间件时,如果保证不丢消息的一些实践思考。

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
3月前
|
消息中间件 Java API
RocketMQ事务消息, 图文、源码学习探究~
介绍 RocketMQ是阿里巴巴开源的分布式消息中间件,它是一个高性能、低延迟、可靠的消息队列系统,用于在分布式系统中进行异步通信。 从4.3.0版本开始正式支持分布式事务消息~ RocketMq事务消息支持最终一致性:在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。 原理、流程 本质上RocketMq的事务能力是基于二阶段提交来实现的 在消息发送上,将二阶段提交与本地事务绑定 本地事务执行成功,则事务消息成功,可以交由Consumer消费 本地事务执行失败,则事务消息失败,Consumer无法消费 但是,RocketMq只能保证本地事务
|
3月前
|
消息中间件 RocketMQ 微服务
RocketMQ 分布式事务消息实战指南
RocketMQ 分布式事务消息实战指南
304 1
|
3月前
|
消息中间件 存储 监控
搭建消息时光机:深入探究RabbitMQ_recent_history_exchange在Spring Boot中的应用【RabbitMQ实战 二】
搭建消息时光机:深入探究RabbitMQ_recent_history_exchange在Spring Boot中的应用【RabbitMQ实战 二】
36 1
|
3月前
|
消息中间件 监控 Java
Spring Boot中的RabbitMQ死信队列魔法:从异常到延迟,一网打尽【RabbitMQ实战 一】
Spring Boot中的RabbitMQ死信队列魔法:从异常到延迟,一网打尽【RabbitMQ实战 一】
88 0
|
2月前
|
消息中间件 存储 安全
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
30 0
|
2月前
|
消息中间件 存储 Kafka
【深入浅出 RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿一下RocketMQ和Kafka索引设计原理和方案
【深入浅出 RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿一下RocketMQ和Kafka索引设计原理和方案
53 1
|
2月前
|
消息中间件 RocketMQ
MQ与本地事务一致性问题---RocketMQ事务型消息
MQ与本地事务一致性问题---RocketMQ事务型消息
25 2
|
2月前
|
消息中间件 前端开发 算法
【十七】RabbitMQ基础篇(延迟队列和死信队列实战)
【十七】RabbitMQ基础篇(延迟队列和死信队列实战)
45 1
|
2月前
|
消息中间件
动力RabbitMQ实战视频教程
动力RabbitMQ实战视频教程
22 3
动力RabbitMQ实战视频教程
|
2月前
|
消息中间件 存储 缓存
【Redis实战】有MQ为啥不用?用Redis作消息队列!?Redis作消息队列使用方法及底层原理高级进阶
【Redis实战】有MQ为啥不用?用Redis作消息队列!?Redis作消息队列使用方法及底层原理高级进阶