温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

RocketMQ事务消息的示例分析

发布时间:2021-12-18 11:16:10 来源:亿速云 阅读:228 作者:小新 栏目:大数据

小编给大家分享一下RocketMQ事务消息的示例分析,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!

一、大事务 = 小事务 + 异步

我们以一个转帐的场景为例来说明这个问题,Bob向Smith转账100块。这个列子在瓜子也有很多实际场景映射,如:车源状态变化,订单状态变化,金融放款,物流运输……

在单机环境下,执行事务的情况,大概是下面这个样子 

RocketMQ事务消息的示例分析

当用户增长到一定程度,Bob和Smith的账户及余额信息已经不在同一台服务器上了,那么上面的流程就变成了这样

RocketMQ事务消息的示例分析

这时候你会发现,同样是一个转账的业务,在集群环境下,耗时居然成倍的增长,这显然是不能够接受的。而且跨网络调用的事务需要解决网络不稳定的因素,直接放到业务代码里控制,成本很高。那如何来规避这个问题?

大事务 = 小事务 + 异步

将大事务拆分成多个小事务异步执行。这样基本上能够将跨机事务的执行效率优化到与单机一致。转账的事务就可以分解成如下两个小事务 

RocketMQ事务消息的示例分析

图中执行本地事务(Bob账户扣款)和发送异步消息应该保证同时成功或者同时失败,也就是扣款成功了,发送消息一定要成功,如果扣款失败了,就不能再发送消息。

二、什么是事务消息(Transactional message)

RocketMQ官方是这样定义的。可以将其视为两阶段提交消息实现,以确保分布式系统中的最终一致性。事务性消息确保可以原子方式执行本地事务的执行和消息的发送。

1、事务状态

事务消息有3种状态

(1)TransactionStatus.CommitTransaction,提交事务,表示允许消费者消费(使用)这条消息

(2)TransactionStatus.RollbackTransaction,回滚事务,表示消息将被删除,不允许使用

(3)TransactionStatus.Unknown,中间状态,表示需要MQ向消息发送方进行检查以确定状态

2、如何发送事务消息

RocketMQ(4.5.1版本)已经把事务消息的发送方式封装得非常优雅,只需要两个大的环节就能够完成,创建事务消息生产者和实现TransactionListener接口。看一下官方的例子代码

(1)创建事务消息生产者

使用TransactionMqProducer类创建消息生产客户端,并指定唯一的ProducerGroup

设置自定义线程池来处理检查请求

执行本地事务之后,需要根据执行结果回复MQ,回复上一小节中描述的状态 

RocketMQ事务消息的示例分析

(2)实现TransactionListener接口

“executeLocalTransaction”方法用于在发送半条消息成功时执行本地事务。它返回上一节中提到的三个事务状态之一。

“check local transaction”方法用于检查本地事务状态并响应MQ检查请求。它还返回前一节中提到的三个事务状态之一。 

RocketMQ事务消息的示例分析

3、事务消息的执行流程

代码写起来非常简单,以至于光看代码,并不能知道事务消息具体的执行过程。

RocketMQ 事务消息的设计流程借鉴了两阶段提交理论,整体交互流程如下图所示 

RocketMQ事务消息的示例分析

事务发起方(即消息发送者)首先发送 prepare 消息到 MQ。

事务发起方(即消息发送者)在发送 prepare 消息成功后执行本地事务。

根据本地事务执行结果发送 commit 或者是 rollback 给 MQ。

如果消息是 rollback,MQ 将删除该 prepare 消息不进行下发。

如果消息是 commit,MQ 将会把这个消息发送给 consumer 端。

如果执行本地事务过程中,执行端挂掉,或者超时,导致 MQ 收不到任何的消息(不知道是该 commit 还是该 rollback),RocketMQ 会定期扫描消息集群中的事务消息,这时候发现了某个 prepare 消息还不知道该怎么处理,它会向消息发送者确认,所以消息发送者需要实现一个 check 接口,RocketMQ 会根据消息发送者设置的策略来决定是 rollback 还是继续 commit。这样就保证了消息发送与本地事务同时成功或同时失败。

Consumer 端的消费成功机制由 MQ 保证。

4、事务消息的存储模型

在具体实现上,RocketMQ 通过使用 Half Topic 以及 Operation Topic 两个内部队列来存储事务消息推进状态,如下图所示 

RocketMQ事务消息的示例分析

其中,Half Topic 对应队列中存放着 prepare 消息,Operation Topic 对应的队列则存放了 prepare message 对应的 commit/rollback 消息,消息体中则是 prepare message 对应的 offset,服务端通过比对两个队列的差值来找到尚未提交的超时事务,进行回查。

从用户侧来说,用户需要分别实现本地事务执行以及本地事务回查方法,因此只需关注本地事务的执行状态即可;而在 service 层,则对事务消息的两阶段提交进行了抽象,同时针对超时事务实现了回查逻辑,通过不断扫描当前事务推进状态,来不断反向请求 Producer 端获取超时事务的执行状态,在避免事务挂起的同时,也避免了 Producer 端的单点故障。

而在存储层,RocketMQ 通过 Bridge 封装了与底层队列存储的相关操作,用以操作两个对应的内部队列,用户也可以依赖其他存储介质实现自己的 service,RocketMQ 会通过 ServiceProvider 加载进来。

三、Notify的异曲同工

Notify和MetaQ是阿里的两个消息中间件。MetaQ是一个高性能的存储队列;Notify是淘宝自主研发的一套消息服务引擎。贴两个图就什么都明白了 

RocketMQ事务消息的示例分析

RocketMQ事务消息的示例分析

整体方案跟RocketMQ是完全相同的,只是两者的Storage不同。

四、瓜子该怎么做事务一致性这块工作

针对这个典型场景,有很多解决方案

1、Kafka换成RocketMQ

不行。有太多的业务跑在了Kafka上,替换消息中间件的成本基本不能接受。

2、类似去哪儿qmq的方案

这个方案研发简单,但是侵入具体业务的数据库,而且增加了部署运维的成本。

3、有人提出binlog+TCC的方案

没有仔细研究,但是业务会经常调整,想想负责配置数据库日志的同学肯定会抓狂(DBA没有那么了解业务)。

4、为Kafka配一个类似Notify的消息引擎

这个方案有一定的可行性

(1)把Kafka定位为MetaQ,研制一个Notify,为prepare message提供单独的存储

(2)现在各业务系统所采用的Kafka客户端已经是瓜子定制化开发的,可以模仿RocketMQ的客户端进行改造。已有代码的逻辑完全不受影响;需要事务一致性的功能,只需要换个接口,实现check逻辑即可,而原有消费方毫无感觉。

(3)似乎有可能结合spring的@Transactional标签,在完全不改业务代码(只升级自研Kafka客户端)的情况下,也能缓解一些不一致问题

以上是“RocketMQ事务消息的示例分析”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注亿速云行业资讯频道!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI