mq发送消息之后,业务代码回滚,导致发了一条中奖消息给用户!!

背景是这样的:在一个名为"幸运大转盘"的线上活动中,用户可以通过消耗一定的积分来参与抽奖,有机会赢取各种奖品。这个活动的后台系统使用了消息队列(MQ)来处理用户的抽奖请求和发送中奖消息。

一天,有一个名叫小明的用户在活动中抽中了一台新的iPhone。然而,就在他抽中奖品的同时,后台系统的业务代码出现了一个严重的错误,导致抽奖请求的处理过程中出现了问题,最终导致了一个回滚操作。

虽然回滚操作成功地撤销了小明的抽奖请求,但是由于消息队列的特性,中奖消息已经被发送出去,小明收到了一条中奖消息。最后来兑奖的时候产生了这样的一个闹剧。

这样的业务其实在我们开发过程中可以说很平常了,当在一个方法中即有业务代码和发送mq消息的时候,就会出现这样的情况,那么,我们如何避免这样的情况出现呢,这个时候就用到了RocketMQ里面的事务消息了。

我们先来看一张官方关于事务消息的流程图

在这里插入图片描述

事务消息发送步骤如下:

  1. 生产者将半事务消息发送至 RocketMQ Broker1
  2. RocketMQ Broker 将消息持久化成功之后,向生产者返回 Ack 确认消息已经发送成功,此时消息暂不能投递,为半事务消息2
  3. 生产者开始执行本地事务逻辑3
  4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下4
  • 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
  • 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
  1. 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查5
  2. note 需要注意的是,服务端仅仅会按照参数尝试指定次数,超过次数后事务会强制回滚,因此未决事务的回查时效性非常关键,需要按照业务的实际风险来设置6
    事务消息回查步骤如下:
  3. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果7
  4. 生产者根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理8

其实博主之前已经写过一篇RocketMq的消息基本使用文章,但是里面没有讲详细,所以这篇文章是更详细的MQ事务消息讲解:

【Spring Cloud Alibaba】RocketMQ的基础使用,如何发送消息和消费消息

使用rocketmq-client-java

我们先来一个不使用rocketmq-spring-boot-starter,使用原生的rocketmq-client-javaSDK来讲解一下mq的事务消息。

		 <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client-java</artifactId>
            <version>5.0.0</version>
        </dependency>

我们先来看代码:

package com.masiyi.provider.service;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.io.UnsupportedEncodingException;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });

        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.setNamesrvAddr("120.76.201.118:9876");
        producer.start();

        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                Message msg =
                    new Message("TopicTest1", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);

                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }

        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }

    static class TransactionListenerImpl implements TransactionListener {
        private AtomicInteger transactionIndex = new AtomicInteger(0);

        private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

        @Override
        public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            int value = transactionIndex.getAndIncrement();
            int status = value % 3;
            localTrans.put(msg.getTransactionId(), status);
            return LocalTransactionState.UNKNOW;
        }

        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt msg) {
            Integer status = localTrans.get(msg.getTransactionId());
            if (null != status) {
                switch (status) {
                    case 0:
                        return LocalTransactionState.UNKNOW;
                    case 1:
                        return LocalTransactionState.COMMIT_MESSAGE;
                    case 2:
                        return LocalTransactionState.ROLLBACK_MESSAGE;
                    default:
                        return LocalTransactionState.COMMIT_MESSAGE;
                }
            }
            return LocalTransactionState.COMMIT_MESSAGE;
        }
    }
}

创建一个RocketMQ事务消息生产者,发送10条带有不同标签的事务消息,它使用自定义的TransactionListenerImpl来处理和检查本地事务状态。可以看到他使用for循环一共发送了10条消息,那么我们运行一下看结果:

在这里插入图片描述

可以看到我们的控制台最终只有三条信息发送成功了,我们可以看到结果是其他的7条消息都没有发送成功,也就是进行了回滚操作。

我们来看一下我们的代码

main方法

  1. TransactionListener transactionListener = new TransactionListenerImpl(); - 创建一个事务监听器的实例。这个监听器需要实现TransactionListener接口,以处理事务消息的三种状态:提交、回滚和未知。

  2. TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name"); - 创建一个事务消息的生产者。"please_rename_unique_group_name"是生产者的组名,应该是唯一的。

  3. ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {...}); - 创建一个线程池,用于处理事务消息的检查。这个线程池的大小是2-5,任务队列的大小是2000。

  4. producer.setExecutorService(executorService); - 设置生产者的线程池。

  5. producer.setTransactionListener(transactionListener); - 设置生产者的事务监听器。

  6. producer.setNamesrvAddr("120.76.201.118:9876"); - 设置生产者的NameServer地址。

  7. producer.start(); - 启动生产者。

  8. String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"}; - 创建一个标签数组。

  9. for (int i = 0; i < 10; i++) {...} - 发送10条事务消息。每条消息都有一个不同的标签,以便于在消费者端进行过滤。

  10. for (int i = 0; i < 100000; i++) {...} - 休眠100000秒,以便于在这段时间内处理事务消息的状态。

  11. producer.shutdown(); - 关闭生产者。

TransactionListenerImpl类

TransactionListener接口是Apache RocketMQ中用于处理事务消息的接口,它有两个方法:executeLocalTransactioncheckLocalTransaction

  1. private AtomicInteger transactionIndex = new AtomicInteger(0); - 创建一个原子整数,用于生成事务索引。

  2. private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>(); - 创建一个线程安全的哈希表,用于存储本地事务的状态。

  3. executeLocalTransaction方法 - 这个方法在发送事务消息时被调用,用于执行本地事务。在这个方法中,我们首先获取一个事务索引,然后根据这个索引的值来模拟本地事务的执行结果。最后,我们将事务的状态(0、1或2)存储在localTrans哈希表中,并返回LocalTransactionState.UNKNOW,表示本地事务的状态未知。

  4. checkLocalTransaction方法 - 这个方法在检查本地事务的状态时被调用。在这个方法中,我们首先从localTrans哈希表中获取事务的状态,然后根据这个状态来返回相应的LocalTransactionState。如果事务的状态未知,我们返回LocalTransactionState.UNKNOW;如果事务已经提交,我们返回LocalTransactionState.COMMIT_MESSAGE;如果事务已经回滚,我们返回LocalTransactionState.ROLLBACK_MESSAGE

我们再回到之前的图中,在这个main方法里面我们只做了112233个步骤也就是到了producer.sendMessageInTransaction(msg, null);的时候,我们才会调用执行本地事务,也就是这段代码:

  @Override
        public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            int value = transactionIndex.getAndIncrement();
            int status = value % 3;
            localTrans.put(msg.getTransactionId(), status);
            return LocalTransactionState.UNKNOW;
        }

而到了这个步骤,就是步骤44了,
可以看方法的名字就知道这是我们的本地事务方法:executeLocalTransaction,我们可以把本地事务写到这里,例如在我们的案例中,我们可以把保存中奖信息的操作放入到这个方法里面,通过手动控制事务的方式控制业务代码的提交和返回消息的结果

	COMMIT_MESSAGE,
    ROLLBACK_MESSAGE,
    UNKNOW;

如果本地返回的结果(二次确认结果)为
Commit:服务端将半事务消息标记为可投递,并投递给消费者。
Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
如果是上面两个状态,则不会执行后面的步骤了。但是我们可以看到我们的代码里面返回本地事务的状态都是UNKNOW,所以说后面还会触发55和66这两个步骤,即示例里面的代码:

  @Override
        public LocalTransactionState checkLocalTransaction(MessageExt msg) {
            Integer status = localTrans.get(msg.getTransactionId());
            if (null != status) {
                switch (status) {
                    case 0:
                        return LocalTransactionState.UNKNOW;
                    case 1:
                        return LocalTransactionState.COMMIT_MESSAGE;
                    case 2:
                        return LocalTransactionState.ROLLBACK_MESSAGE;
                    default:
                        return LocalTransactionState.COMMIT_MESSAGE;
                }
            }
            return LocalTransactionState.COMMIT_MESSAGE;
        }

可以看到这里面根据status的值去判断是否是commit或者rollback。结合我们的业务来看,这个里面就可以通过查询数据库看有没有这条小明的信息或者看他有没有中奖来返回对于的commit或者rollback即步骤77,最后执行步骤88

经过这个一系列的操作,我们就可以把消息和业务绑定起来,即业务回滚了消息也不回正常发出去,这样就不会出现我们的标题所示的情况了。

使用rocketmq-spring-boot-starter

那么如果我们使用rocketmq-spring-boot-starter又该如何配置呢?那么就请移步RocketMQ的基础使用,如何发送消息和消费消息:发送事务消息
里面的功能和我们上面原生的操作方法是一模一样的,只不过里面加了一个策略模式去区分不同的主题topic。

那么本文仓库已经开源,地址就是:

RocketMq的使用


  1. 步骤1 ↩︎ ↩︎

  2. 步骤2 ↩︎ ↩︎

  3. 步骤3 ↩︎ ↩︎

  4. 步骤4 ↩︎ ↩︎

  5. 步骤5 ↩︎ ↩︎

  6. 步骤6 ↩︎ ↩︎

  7. 步骤7 ↩︎ ↩︎

  8. 步骤8 ↩︎ ↩︎

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/594852.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

linux(ubuntu18.04.2) Qt编译 MySQL(8.0以上版本)链接库 Qt版本 5.12.12及以上 包含Mysql动态库缺失问题

整理这篇文档的意义在于&#xff1a;自己走了很多弯路&#xff0c;淋过雨所以想为别人撑伞&#xff0c;也方便回顾&#xff0c;仅供参考 一、搭建开发环境&#xff1a; 虚拟机&#xff08;ubuntu-20.04.6-desktop-amd64&#xff09;&#xff1a;Mysql数据库 8.0.36Workbench …

QtWindows任务栏

目录 引言任务栏进度右键菜单缩略图工具栏完整代码 引言 针对Windows系统的任务栏&#xff0c;Qt基于系统的原生接口封装有一些非常见类&#xff0c;如QWinTaskbarButton、QWinTaskbarButton、QWinThumbnailToolBar等&#xff0c;用于利用工具栏提供更多的信息&#xff0c;诸如…

开源电子邮件营销平台 listmonk 使用教程

做产品肯定要做电子邮件营销&#xff0c;特别是面向海外的产品&#xff0c;电子邮件营销已成为企业与客户沟通、建立品牌忠诚度和推动销售的重要工具&#xff0c;可以直接接触到目标受众&#xff0c;提供个性化内容&#xff0c;并以相对较低的成本获得可观的投资回报。你看&…

用HAL库改写江科大的stm32入门例子_1、按键控制led灯

1 如下图设置PB11 管脚 2 设置PB11为下降沿中断&#xff1a; 3 PA1 设置为推挽输出 4、NVIC 开启中断使能&#xff1a; 5、写中断事件&#xff1a; 完整代码如下&#xff1a; void EXTI15_10_IRQHandler(void) {/* USER CODE BEGIN EXTI15_10_IRQn 0 *///torning on the led…

母婴店运用商城小程序店铺的效果是什么

母婴市场规模高&#xff0c;还可与不少行业无缝衔接&#xff0c;尤其是以90后、00后为主的年轻人&#xff0c;在备孕生育和婴儿护理前后等整体流程往往不惜重金且时间长&#xff0c;母婴用品无疑是必需品&#xff0c;商家需要多方面拓展全面的客户及打通场景随时消费路径。 运…

24.5.5(离散化+树状数组,线段树)

星期一&#xff1a; dp题单 背包 第四题 混可乐 cf传送门 思路&#xff1a;条件可演化为每种可乐值为 ai-n&#xff0c;选最少的可乐使总和为0&#xff08;具体可看官方题解 到这会发现背包并不适合了&#xff0c;其实这是道bfs伪装的背包…

【Linux网络】网络文件共享

目录 一、存储类型 二、FTP文件传输协议 2.1 FTP工作原理 2.2 FTP用户类型 2.3 FTP软件使用 2.3.1 服务端软件vsftpd 2.3.2 客户端软件ftp 2.4 FTP的应用 2.4.1 修改端口号 2.4.2 匿名用户的权限 2.4.3 传输速率 三、NFS 3.1 工作原理 3.2 NFS软件介绍 3.3 NFS配…

数据结构===二叉树

文章目录 概要二叉树的概念分类存储遍历前序中序后序 小结 概要 简单写下二叉树都有哪些内容&#xff0c;这篇文章要写什么 二叉树的概念分类&#xff0c;都有哪些二叉树遍历 对一个数据结构&#xff0c;最先入手的都是定义&#xff0c;然后才会有哪些分类&#xff0c;对二叉…

C语言 | Leetcode C语言题解之第70题爬楼梯

题目&#xff1a; 题解&#xff1a; int climbStairs(int n) {double sqrt5 sqrt(5);double fibn pow((1 sqrt5) / 2, n 1) - pow((1 - sqrt5) / 2, n 1);return (int) round(fibn / sqrt5); }

机器人系统ros2-开发实践05-将静态坐标系广播到 tf2(Python)-定义机器人底座与其传感器或非移动部件之间的关系

发布静态变换对于定义机器人底座与其传感器或非移动部件之间的关系非常有用。例如&#xff0c;最容易推断激光扫描仪中心框架中的激光扫描测量结果。 1. 创建包 首先&#xff0c;我们将创建一个用于本教程和后续教程的包。调用的包learning_tf2_py将依赖于geometry_msgs、pyth…

【负载均衡式在线OJ项目day1】项目结构

一.功能 查看题目列表&#xff0c;在线编程&#xff0c;判题功能&#xff0c;即leetcode的部分功能 二.宏观结构 整个项目是BS模式&#xff0c;客户端是浏览器&#xff0c;和用户交互并向服务器发起请求。 服务端从功能上来说分为两个模块&#xff0c;第一个是OJServer&…

FFmpeg———encode_video(学习)

目录 前言源码函数最终效果 前言 encode_video:实现了对图片使用指定编码进行编码&#xff0c;生成可播放的视频流&#xff0c;编译时出现了一些错误&#xff0c;做了一些调整。 基本流程&#xff1a; 1、获取指定的编码器 2、编码器内存申请 3、编码器上下文内容参数设置 4、…

平平科技工作室-Python-超级玛丽

一.准备图片 放在文件夹取名为images 二.准备一些音频和文字格式 放在文件夹media 三.编写代码 import sys, os sys.path.append(os.getcwd()) # coding:UTF-8 import pygame,sys import os from pygame.locals import* import time pygame.init() # 设置一个长为1250,宽为…

03.配置监控一台服务器主机

配置监控一台服务器主机 安装zabbix-agent rpm -ivh https://mirror.tuna.tsinghua.edu.cn/zabbix/zabbix/4.0/rhel/7/x86_64/zabbix-agent-4.0.11-1.el7.x86_64.rpm配置zabbix-agent,配置的IP地址是zabbix-server的地址&#xff0c;因为要监控这台主机 vim /etc/zabbix/zab…

淘宝线上扭蛋机小程序:推动扭蛋机销量

扭蛋机作为一个新兴的娱乐消费模式&#xff0c;能够带给消费者“盲盒式”的消费乐趣&#xff0c;正在快速发展中。消费者通过投币、扫码支付等&#xff0c;在机器上扭下按钮就可以随机获得一个扭蛋商品&#xff0c;这些商品也包括动漫衍生周边、IP主题商品等&#xff0c;种类多…

先电2.4的openstack搭建

先电2.4版本的openstack&#xff0c;前期虚拟机部署参考上一篇2.2版本&#xff0c;基本步骤是一样的&#xff0c;准备两个镜像文件CentOS-7.5-x86_64-DVD-1804.iso&#xff0c;XianDian-IaaS-V2.4.iso [rootcontroller ~]# cat /etc/sysconfig/network-scripts/ifcfg-eno16777…

新手开抖店多久可以出单?做好这两点!七天必出单!

哈喽~我是电商月月 很多新手开抖店长时间不出单&#xff0c;觉得不正常&#xff0c;害怕新手根本做不起来店&#xff0c;就会搜索&#xff1a;新手开抖店多久可以出单&#xff1f; 新手开店&#xff0c;合理运营的话&#xff0c;七天里肯定是能出几单的&#xff0c;但没做好的…

AI新突破:多标签预测技术助力语言模型提速3倍

DeepVisionary 每日深度学习前沿科技推送&顶会论文分享&#xff0c;与你一起了解前沿深度学习信息&#xff01; 引言&#xff1a;多标签预测的新视角 在人工智能领域&#xff0c;尤其是在自然语言处理&#xff08;NLP&#xff09;中&#xff0c;预测模型的训练方法一直在…

Android(一)

坏境 java版本 下载 Android Studio 和应用工具 - Android 开发者 | Android Developers 进入安卓官网下载 勾选协议 next 如果本地有设置文件&#xff0c;选择Config or installation folder 如果本地没有设置文件&#xff0c;选择Do not import settings 同意两个协议 耐…

Android 14 init进程解析

前言 当bootloader启动后&#xff0c;启动kernel&#xff0c;kernel启动完后&#xff0c;在用户空间启动init进程&#xff0c;再通过init进程&#xff0c;来读取init.rc中的相关配置&#xff0c;从而来启动其他相关进程以及其他操作。 init进程启动主要分为两个阶段&#xff1…
最新文章