重学SpringBoot3-集成RocketMQ(二)

更多SpringBoot3内容请关注我的专栏:《SpringBoot3》
期待您的点赞👍收藏⭐评论✍

重学SpringBoot3-集成RocketMQ(二)

  • 1. 基础概念
  • 2. 准备工作
  • 3. 实现事务消息的生产者
  • 4. 事务监听器实现
  • 5. 消费者示例
  • 6. 发送事务消息
  • 7. 测试
    • 7.1 模拟本地事务正常提交
    • 7.2 模拟本地事务提交失败,未回查
    • 7.3 模拟本地事务提交失败,回查成功
    • 7.4 模拟本地事务提交失败,回查失败
    • 7.5 模拟本地事务提交成功,消费失败
  • 关键点总结

今天介绍下如何在 Spring Boot 3 中与 RocketMQ 整合实现分布式事务。RocketMQ 提供了类似 X/Open XA 的分布式事务功能,通过事务消息能达到分布式事务的最终一致。XA 是一种分布式事务解决方案,一种分布式事务处理模式。下面详细介绍下 RocketMQ 如何实现事务消息。

1. 基础概念

在 RocketMQ 中,半消息(Half Message)主要用于实现事务消息。它是指生产者在发送事务消息时,RocketMQ 会先将消息保存为 半消息,等待事务状态的最终确认,确保消息的可靠性和一致性。

图片来源:https://www.cnblogs.com/dennyzhangdd/p/14572024.html

半消息的工作流程:

  1. 发送半消息:生产者首先将消息发送到 RocketMQ,RocketMQ 将其标记为半消息,并暂时存储到消息队列中,但这时消费者不会收到该消息。
  2. 执行本地事务:生产者在发送半消息后,开始执行自己的本地事务操作。
  3. 提交或回滚消息
    • 如果本地事务成功,生产者会通知 RocketMQ 提交消息,RocketMQ 将半消息转换为正常消息,并发送给消费者。
    • 如果本地事务失败,生产者会通知 RocketMQ回滚消息,RocketMQ 会删除该半消息。
  4. 事务状态回查:如果 RocketMQ 没有收到生产者的事务状态确认,RocketMQ 会通过回查机制询问生产者事务的最终状态,确保消息的一致性。

现有一个案例,后端文件上传接口,同时上传 OSS 和 MySQL,数据库负责文件元信息的增删改查, OSS负责存储文件对象,如何保证最终一致性?下面用RocketMQ 的事务消息来实现最终一致性。

2. 准备工作

请参考《重学SpringBoot3-集成RocketMQ(一)》进行环境搭建和配置工作。配置文件新增如下配置:

  consumer2:group: springboot-consumer-group2  # 新的消费者组名称topic: transaction-topic  # 订阅新的主题access-key: RocketMQ    # 若启用了 ACL 功能secret-key: 12345678    # 若启用了 ACL 功能

3. 实现事务消息的生产者

创建一个事务消息的生产者类,通过事务生产者发送消息,并处理本地事务逻辑。

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;@Service
public class TransactionalMessageProducer {@Autowiredprivate RocketMQTemplate rocketMQTemplate;/*** 发送事务消息*/public void sendTransactionMessage(String topic, String message) {TransactionSendResult sendResult = rocketMQTemplate.sendMessageInTransaction(topic, MessageBuilder.withPayload(message).build(), null);System.out.println("Transaction message sent: " + sendResult.getLocalTransactionState());}
}

4. 事务监听器实现

通过实现 RocketMQLocalTransactionListener 接口,定义事务的提交或回滚逻辑。

package com.example.boot308rocketmq;import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;/*** @author CoderJia* @create 2024/09/12 15:06* @Description**/
@Component
@RocketMQTransactionListener
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {// 执行本地事务逻辑,根据业务情况返回事务的提交或回滚状态try {// 模拟本地事务处理逻辑System.out.println("Executing local transaction...");boolean success = performLocalTransaction();if (success) {return RocketMQLocalTransactionState.COMMIT;} else {return RocketMQLocalTransactionState.ROLLBACK;}} catch (Exception e) {return RocketMQLocalTransactionState.ROLLBACK;}}@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {// 事务回查逻辑,确认本地事务的最终状态System.out.println("Checking local transaction...");// 根据本地事务的处理结果返回 COMMIT_MESSAGE 或 ROLLBACK_MESSAGESystem.out.println("local transaction check success");return RocketMQLocalTransactionState.COMMIT;}private boolean performLocalTransaction() {// TODO 模拟本地事务处理文件上传OSStry {System.out.println("Upload files to OSS...");Thread.sleep(3000);System.out.println("File upload to OSS completed");return true;} catch (InterruptedException e) {System.out.println("Failed to upload file to OSS");return false;}}}

5. 消费者示例

创建一个消费者,订阅并消费事务消息。

RocketMQListener<String> 是一个接口类型,用于定义一个 RocketMQ 消息监听器,它指定接收的消息类型为 String。在 RocketMQ 中,消费者可以通过实现 RocketMQListener 接口来自动处理接收到的消息。

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;@Service
@RocketMQMessageListener(topic = "transaction-topic", consumerGroup = "transaction-consumer-group")
public class TransactionalMessageConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {// 处理接收到的消息System.out.printf("Received message: %s%n", message);}
}

6. 发送事务消息

在服务中调用事务消息生产者:

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class MessageController {@Resourceprivate RocketMQProducer rocketMQProducer;@GetMapping("/sendTransactionMessage")public ResponseEntity<String> sendTransactionMessage(@RequestParam String message) {rocketMQProducer.sendTransactionMessage("transaction-topic", message);return ResponseEntity.ok("Transaction message sent: " + message);}
}

7. 测试

7.1 模拟本地事务正常提交

如下图观察到,当本地事务即文件上传完成之后,生产者会通知 RocketMQ 提交消息,RocketMQ 将半消息转换为正常消息,并发送给消费进行消费。

本地事务正常提交

7.2 模拟本地事务提交失败,未回查

在 RocketMQ 中,如果消息生产者没有在规定的时间内向消息队列确认事务状态,RocketMQ 会通过回查机制(即“回滚检查”或“事务回查”)来询问生产者事务的最终状态,从而确保消息的一致性。broker.conf 中配置 transactionCheckMax=10000 表示 RocketMQ 最长等待 10 秒后进行事务状态回查。

本地事务提交失败,未回查

7.3 模拟本地事务提交失败,回查成功

本人搭建 RocketMQ 设置的回查时间为15s,所以将本地事务执行时间修改为 16s,这样会触发 RocketMQ 进行事务状态回查。

transactionCheckMax

事务状态回查成功

7.4 模拟本地事务提交失败,回查失败

修改回查方法的返回值,让RocketMQ 回查本地状态值将消息进行回滚,消费者同样不会消费消息。

回查失败

7.5 模拟本地事务提交成功,消费失败

例如生产者本地事务执行成功,但是消费者消费失败的情况,RocketMQ 会进行消息重试,直至成功。

修改一下消费者处理逻辑:

package com.example.boot308rocketmq;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;/*** @author CoderJia* @create 2024/09/12 15:06* @Description**/
@Slf4j
@Service
@RocketMQMessageListener(topic = "transaction-topic", consumerGroup = "springboot-consumer-group2")
public class TransactionalMessageConsumer02 implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt message) {try {// 处理消息的业务逻辑String msgBody = new String(message.getBody(), "UTF-8");log.info("Received message:{}", msgBody);// 模拟业务处理boolean success = processBusinessLogic(msgBody);if (!success) {throw new RuntimeException("Business processing failure");}log.info("Business processing successful");} catch (Exception e) {log.error("MsgID:{},reconsumeTimes:{},e:{}", message.getMsgId(), message.getReconsumeTimes(), e.getMessage());// 重新抛出异常,让 RocketMQ 进行重试throw new RuntimeException("Message consumption failed, retrying");}}private boolean processBusinessLogic(String message) {// 这里是业务逻辑,返回 true 表示成功,false 表示失败// 例如:数据库操作或其他远程调用return Math.random() > 0.5; // 模拟随机成功或失败}
}

消费失败

关键点总结

  1. 事务消息发送:通过 RocketMQTemplate.sendMessageInTransaction() 方法发送事务消息。
  2. 本地事务处理:实现 TransactionListener 接口的 executeLocalTransaction() 方法处理本地事务逻辑。
  3. 事务回查:在 checkLocalTransaction() 方法中定义如何检查事务消息的最终状态。
    以上就是 SpringBoot3 结合 RocketMQ 实现的事务消息,提供了分布式事务中的最终一致性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/1534454.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

【新片场-注册安全分析报告-无验证方式导致安全隐患】

前言 由于网站注册入口容易被黑客攻击&#xff0c;存在如下安全问题&#xff1a; 1. 暴力破解密码&#xff0c;造成用户信息泄露 2. 短信盗刷的安全问题&#xff0c;影响业务及导致用户投诉 3. 带来经济损失&#xff0c;尤其是后付费客户&#xff0c;风险巨大&#xff0c;造…

如何用SQL Server和Oracle进行数据同步?

数据同步是大数据应用中非常重要的环节&#xff0c;它可以保证数据的实时性和一致性&#xff0c;为数据分析和决策提供重要依据。常见的数据同步方式包括ETL实时同步和实时ETL工具&#xff0c;其中实时ETL工具又可以分为基于日志追踪和基于触发器两种。 针对不同的数据库系统&…

在grafana上配置显示全部node资源信息概览

在grafana上配置显示全部node资源信息概览&#xff0c;便于巡检 1&#xff0c;注册grafana官网账号&#xff1a;Grafana dashboards | Grafana Labs&#xfeff; 2、寻找可以展示所有node资源概览信息的dashboard&#xff0c;并下载支持prometheus数据源的dashboard&#xff…

DevOps 中常常被忽略却至关重要的一个工具

在今天的云原生开发时代&#xff0c;DevOps 已经成为许多企业提高开发和运维效率的标准流程。DevOps 工具链广泛而复杂&#xff0c;涵盖了从规划、开发、测试到部署、监控和运维的多个环节。例如&#xff1a;规划与项目管理工具 GitLab、Jira&#xff1b;版本控制工具 Git&…

c++—多态【万字】【多态的原理】【重写的深入学习】【各种继承关系下的虚表查看】

目录 C—多态1.多态的概念2.多态的定义及实现2.1多态的构成条件2.2虚函数的重写2.2.1虚函数重写的两个例外&#xff1a;2.2.1.1协变2.2.1.2析构函数的重写 2.3 c11的override和final2.3.1final2.3.2override 2.4 重载、重写、重定义的对比 3.抽象类3.1抽象类的概念3.2接口继承和…

搜索树和Map

一.搜索树 1.概念 二叉搜索树又叫二叉排序树&#xff0c;它可以是一颗空树也可以是具有以下性质的二叉树 若它的左子树不为空&#xff0c;则左子树上所有节点的值都小于根节点的值若它的右子树不为空&#xff0c;则右子树上所有节点的值都大于根节点的值它的左子树也分别为二…

Redis 篇-深入了解使用 Redis 中的 GEO 数据结构实现查询附近店铺、BitMap 实现签到功能、HyperLogLog 实现 UV 流量统计

&#x1f525;博客主页&#xff1a; 【小扳_-CSDN博客】 ❤感谢大家点赞&#x1f44d;收藏⭐评论✍ 文章目录 1.0 GEO 数据结构的基本用法 1.1 使用 GEO 导入数据 1.2 使用 GEO 实现查找附近店铺功能 2.0 BitMap 基本用法 2.1 使用 BitMap 实现签到功能 2.2 统计连续签到功能 3…

windows server2012 配制nginx安装为服务的时候,直接跳要安装.net框架,用自动的安装,直接失败的解决。

1、上一个已成功在安装过程中的图&#xff1a; 2、之前安装过程中错误的图&#xff1a; 3、离线安装解决&#xff1a; 下载.net framework 3.5&#xff0c;然后解压后&#xff0c;选择指定备用源路径&#xff0c;然后选择.net安装包所在目录&#xff1a; 只要指定上面全路径就…

4G模块点对点传输手把手教程!如何实现远程设备直接通信

使用4G模块进行点对点传输&#xff0c;可以实现远程设备的直接通信&#xff0c;广泛应用于工业控制、远程监控、物联网等领域。本教程将详细讲解如何通过4G模块&#xff0c;构建设备之间的点对点&#xff08;P2P&#xff09;传输系统&#xff0c;从配置设备、建立通信通道到实际…

Delphi Web和Web服务开发目前有哪些选择

Delphi Web和Web服务开发目前有哪些选择 Delphi Web和Web服务开发目前有以下几个选择&#xff1a; Delphi MVC Framework&#xff08;https://github.com/delphimvcframework/delphimvcframework&#xff09;&#xff1a;这是一个开源的Delphi Web框架&#xff0c;基于MVC&am…

【Linux】基本指令及其周边知识

1.准备阶段 在介绍Linux的基本指令之前&#xff0c;我先先向大家介绍一下我的Linux平台&#xff0c;首先我是在阿里云买了个服务器&#xff0c;然后使用Xshell来远程登录Linux&#xff0c;之后有关Linux上的操作都是在这上面进行的。如果你也买了相关的服务器并且设置了相关示…

Parallels Desktop19中文版2024九月最新

Parallels Desktop可以使轻松地在 MAC上运行成千上万款 Windows应用程序&#xff0c;如Excel&#xff0c;会计交易软件等。针对最新版 windows11和macOS Sonoma 进行优化。在 MAC虚拟机中跨多个操作系统开发和测试。包含 Parallels Toolbox – 40 多个适用于 Mac 和 PC 的一键…

ROS1录包偶现一次崩溃问题定位

现象&#xff1a;崩到了mogo_reporter里面 堆栈&#xff1a;crash里面同时存在两个主线程的堆栈 代码 #include "boost/program_options.hpp" #include <signal.h> #include <string> #include <sstream> #include <iostream> #include <…

[“1“, “2“, “3“].map(parseInt)结果

parseInt 的用法 parseInt 是 JavaScript 中的一个全局函数&#xff0c;用于将字符串转换为整数。它的基本语法如下&#xff1a; parseInt(string, radix);string&#xff1a;要解析的字符串。radix&#xff08;可选&#xff09;&#xff1a;字符串的基数&#xff0c;可以是 …

高科技企业选择跨网文件系统最容易踩坑的地方

在数字化时代&#xff0c;高科技企业频繁使用跨网文件交换系统的原因多种多样。首先&#xff0c;随着全球化的推进&#xff0c;企业需要在不同地理位置的分支机构之间传输敏感数据和重要文件。其次&#xff0c;跨网文件交换能够提高工作效率&#xff0c;确保信息的实时更新和共…

开源 TTS 模型「Fish Speech」1.4 发布;GameGen-O :生成开放世界游戏视频模型丨 RTE 开发者日报

开发者朋友们大家好&#xff1a; 这里是 「RTE 开发者日报」 &#xff0c;每天和大家一起看新闻、聊八卦。 我们的社区编辑团队会整理分享 RTE&#xff08;Real-Time Engagement&#xff09; 领域内「有话题的新闻」、「有态度的观点」、「有意思的数据」、「有思考的文章」、…

高并发下的生存之道:如何巧妙化解热Key危机?

我是小米,一个喜欢分享技术的29岁程序员。如果你喜欢我的文章,欢迎关注我的微信公众号“软件求生”,获取更多技术干货! 哈喽,大家好!我是小米,29岁,喜欢分享技术的小米上线啦!今天咱们来聊聊在互联网高并发场景下,一个让大家又爱又恨的问题——热Key问题。热Key是什么…

【C++】_stack和_queue容器适配器、_deque

当别人都在关注你飞的有多高的时候&#xff0c;只有父母在关心你飞的累不累。&#x1f493;&#x1f493;&#x1f493; 目录 ✨说在前面 &#x1f34b;知识点一&#xff1a;stack •&#x1f330;1.stack介绍 •&#x1f330;2.stack的基本操作 &#x1f34b;知识点二&…

【电路笔记】-反相运算放大器

反相运算放大器 文章目录 反相运算放大器1、概述2、理想反相运算放大器3、实际反相运算放大器3.1 闭环增益3.2 输入阻抗3.3 输出阻抗4、反相运算放大器示例5、总结1、概述 上一篇关于同相运算放大器的文章中已介绍了该运算放大器配置的所有细节,该配置在同相引脚 (+) 上获取输…

LSS如何创建视锥

1 完整代码 def create_frustum(self):# 128 352, 22 8in_H