本篇内容介绍了“RocketMQ的事务消息发送流程是什么”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
半消息实现了分布式环境下的数据一致性的处理,生产者发送事务消息的流程如上图所示,通过对源码的学习,我们可以弄清楚下面几点,也是半消息机制的核心:
1.为什么prepare消息不会被Consumer消费?
2.事务消息是如何提交和回滚的?
3.定时回查本地事务状态的实现细节。
发送事务消息方法TransactionMQProducer.sendMessageInTransaction:
msg:消息
tranExecuter:本地事务执行器
arg:本地事务执行器参数
public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter localTransactionExecuter, final Object arg) throws MQClientException { TransactionListener transactionListener = getCheckListener(); if (null == localTransactionExecuter && null == transactionListener) { throw new MQClientException("tranExecutor is null", null); } // 忽视消息延迟的属性 if (msg.getDelayTimeLevel() != 0) { MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL); } Validators.checkMessage(msg, this.defaultMQProducer); // 发送半消息 SendResult sendResult = null; MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup()); try { sendResult = this.send(msg); } catch (Exception e) { throw new MQClientException("send message Exception", e); } // 处理发送半消息的结果 LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW; Throwable localException = null; switch (sendResult.getSendStatus()) { // 发送半消息成功,执行本地事务逻辑 case SEND_OK: { try { if (sendResult.getTransactionId() != null) { msg.putUserProperty("__transactionId__", sendResult.getTransactionId()); } String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); if (null != transactionId && !"".equals(transactionId)) { msg.setTransactionId(transactionId); } // 执行本地事务逻辑 if (null != localTransactionExecuter) { localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg); } else if (transactionListener != null) { log.debug("Used new transaction API"); localTransactionState = transactionListener.executeLocalTransaction(msg, arg); } if (null == localTransactionState) { localTransactionState = LocalTransactionState.UNKNOW; } if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) { log.info("executeLocalTransactionBranch return {}", localTransactionState); log.info(msg.toString()); } } catch (Throwable e) { log.info("executeLocalTransactionBranch exception", e); log.info(msg.toString()); localException = e; } } break; // 发送半消息失败,标记本地事务状态为回滚 case FLUSH_DISK_TIMEOUT: case FLUSH_SLAVE_TIMEOUT: case SLAVE_NOT_AVAILABLE: localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE; break; default: break; } // 结束事务,设置消息 COMMIT / ROLLBACK try { this.endTransaction(msg, sendResult, localTransactionState, localException); } catch (Exception e) { log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e); } // 返回事务发送结果 TransactionSendResult transactionSendResult = new TransactionSendResult(); transactionSendResult.setSendStatus(sendResult.getSendStatus()); transactionSendResult.setMessageQueue(sendResult.getMessageQueue()); // 提取Prepared消息的uniqID transactionSendResult.setMsgId(sendResult.getMsgId()); transactionSendResult.setQueueOffset(sendResult.getQueueOffset()); transactionSendResult.setTransactionId(sendResult.getTransactionId()); transactionSendResult.setLocalTransactionState(localTransactionState); return transactionSendResult; }
该方法的入参包含有一个需要用户实现本地事务的LocalTransactionExecuter executer,executer中会进行事务操作以保证本地事务和消息发送这两个操作的原子性。
由上面的源码可知:
Producer会首先发送一个半消息到Broker中:
半消息发送成功,执行事务
半消息发送失败,不执行事务
半消息发送到Broker后不会被Consumer消费掉的原因有以下两点:
Broker在将消息写入CommitLog时会判断消息类型,如果是prepare或者rollback消息,ConsumeQueue的offset不变
Broker在构造ConsumeQueue时会判断是否是处于prepare或者rollback状态的消息,如果是则不会将该消息放入ConsumeQueue里,Consumer在拉取消息时也就不会拉取到这条消息
Producer会根据半消息的发送结果和本地任务执行结果来决定如何处理事务(commit或rollback),方法最后调用了endTransaction来处理事务的执行结果,源码如下:
sendResult:发送半消息的结果
localTransactionState:本地事务状态
localException:执行本地事务逻辑产生的异常
RemotingException:远程调用异常
MQBrokerException:Broker异常
InterruptedException:当线程中断异常
UnknownHostException:未知host异常
public void endTransaction( final Message msg, final SendResult sendResult, final LocalTransactionState localTransactionState, final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException { // 解码消息id final MessageId id; if (sendResult.getOffsetMsgId() != null) { id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId()); } else { id = MessageDecoder.decodeMessageId(sendResult.getMsgId()); } // 创建请求 String transactionId = sendResult.getTransactionId(); final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName()); EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader(); requestHeader.setTransactionId(transactionId); requestHeader.setCommitLogOffset(id.getOffset()); switch (localTransactionState) { case COMMIT_MESSAGE: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE); break; case ROLLBACK_MESSAGE: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE); break; case UNKNOW: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE); break; default: break; } doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false); requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup()); requestHeader.setTranStateTableOffset(sendResult.getQueueOffset()); requestHeader.setMsgId(sendResult.getMsgId()); String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null; // 提交 commit / rollback 消息 this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark, this.defaultMQProducer.getSendMsgTimeout()); }
该方法是将事务执行的结果发送给Broker,再由Broker决定是否进行消息投递,执行步骤如下:
1.收到消息后先检查是否是事务消息,如果不是事务消息则直接返回
2.根据请求头里的offset查询半消息,如果查询结果为空则直接返回
3.根据半消息构造新消息,新构造的消息会被重新写入到CommitLog里,rollback消息的消息体为空
4.如果是rollback消息,则该消息不会被投递
具体原因上文中已经分析过:只有commit消息才会被Broker投递给consumer
RocketMQ会将commit消息和rollback消息都写入到commitLog里,但rollback消息的消息体为空且不会被投递,CommitLog在删除过期消息时才会将其删除。当事务commit成功之后,RocketMQ会重新封装半消息并将其投递给Consumer端消费。
Broker发起
相较于普通消息,事务消息主要依赖下面三个类:
1.TransactionStateService:事务状态服务,负责对事务消息进行管理,包括存储和更新事务消息状态、回查状态等
2.TranStateTable:事务消息状态存储表,基于MappedFileQueue实现
3.TranRedoLog:TranStateTable的日志,每次写入操作都会记录日志,当Broker宕机时,可以利用这个文件做数据恢复
存储半消息到CommitLog时,使用offset索引到对应的TranStateTable的位置
“RocketMQ的事务消息发送流程是什么”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注亿速云网站,小编将为大家输出更多高质量的实用文章!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。