今天就跟大家聊聊有关如何整合RocketMQ事务消息,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。
ActiveMQ、RabbitMQ、ZeroMQ、Kafka、RocketMQ选型
RocketMQ提供了事务消息回查,查看官方Demo
@SpringBootApplication
public class ProducerApplication implements CommandLineRunner {
private static final String TX_PGROUP_NAME = "myTxProducerGroup";
@Resource
private RocketMQTemplate rocketMQTemplate;
@Value("${demo.rocketmq.transTopic}")
private String springTransTopic;
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
// Send transactional messages
testTransaction();
}
private void testTransaction() throws MessagingException {
String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg = MessageBuilder
.withPayload("Hello RocketMQ " + i)
.setHeader(RocketMQHeaders.TRANSACTION_ID, "KEY_" + i)
.build();
SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(TX_PGROUP_NAME,
springTransTopic + ":" + tags[i % tags.length],
msg,
null);
System.out.printf("------ send Transactional msg body = %s , sendResult=%s %n",
msg.getPayload(),
sendResult.getSendStatus());
Thread.sleep(10);
} catch (Exception e) {
e.printStackTrace();
}
}
}
@RocketMQTransactionListener(txProducerGroup = TX_PGROUP_NAME)
class TransactionListenerImpl implements RocketMQLocalTransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
String transId = (String)msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
System.out.printf("#### executeLocalTransaction is executed, msgTransactionId=%s %n", transId);
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(transId, status);
if (status == 0) {
// Return local transaction with success(commit), in this case,
// this message will not be checked in checkLocalTransaction()
System.out.printf(" # COMMIT # Simulating msg %s related local transaction exec succeeded! ### %n", msg.getPayload());
return RocketMQLocalTransactionState.COMMIT;
}
if (status == 1) {
// Return local transaction with failure(rollback) , in this case,
// this message will not be checked in checkLocalTransaction()
System.out.printf(" # ROLLBACK # Simulating %s related local transaction exec failed! %n", msg.getPayload());
return RocketMQLocalTransactionState.ROLLBACK;
}
System.out.printf(" # UNKNOW # Simulating %s related local transaction exec UNKNOWN! \n");
return RocketMQLocalTransactionState.UNKNOWN;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
String transId = (String)msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
RocketMQLocalTransactionState retState = RocketMQLocalTransactionState.COMMIT;
Integer status = localTrans.get(transId);
if (null != status) {
switch (status) {
case 0:
retState = RocketMQLocalTransactionState.UNKNOWN;
break;
case 1:
retState = RocketMQLocalTransactionState.COMMIT;
break;
case 2:
retState = RocketMQLocalTransactionState.ROLLBACK;
break;
}
}
System.out.printf("------ !!! checkLocalTransaction is executed once," +
" msgTransactionId=%s, TransactionState=%s status=%s %n",
transId, retState, status);
return retState;
}
}
}
需要在testTransaction()
中发送消息,然后在TransactionListenerImpl
类中实现executeLocalTransaction()
方法才能执行整个本地事务,然后在checkLocalTransaction()
中实现事务消息回查。
查看源代码可以知道testTransaction()
方法和executeLocalTransaction()
是在同一个线程当中,只不过包装RocketMQTemplate
中。
消息发送的事务消息回调查询和本地事务没严格的先后顺序,怎么保证,回查时,事务操作肯定已经完成。
事务消息回调使用transaction_id
查询,那么transaction_id
存放在哪里,同时保证transaction_id
关联的业务操作执行成功。
怎么把事务回调查询操作隔离出业务,保证不侵入代码中。
下游消费者怎么保证接口幂等性。
下游消费者怎么提高幂等性查询性能。
怎么把幂等性操作隔离出业务,保证不侵入代码中。
因为数据库或者其他业务操作可能会存在延时,那么不能保证回查时业务操作已完成,那么可以多次回查,并设置最大回查次数,同时不能丢弃MQ消息持久化,方便手动恢复。
可以使用本地消息表落地的发送消息,同时可以采用切面、继承等等方式将落地消息隔离出业务代码之外,保证本地消息落库不侵入,注意必须要保证本地消息落库和本地业务落库在同一个事务之内!
事务消息回查可以使用第2点的本地消息表,根据transaction_id
查询,判断本地事务的执行结果,也和第2点一样,可以使用一些方式将事务消息回查代码隔离出业务代码,保证不侵入。
幂等性的方法:
数据库唯一约束
状态机CAS单向流转
消息去重表
,在执行本地业务前,先对redis判断是业务id是否存在,存在则直接返回消费成功,在执行本地业务之后,可以将消费信息异步落地到redis当中。注意:需要保证本地业务和消息幂等性操作在同一个事务当中,同时redis落地操作在事务之外。
比较好的方案应该是数据库唯一约束 + 消息去重表,在消息去重表中对业务id设置唯一约束,同时将消息落地操作隔离出本地业务之外,保证不侵入。
定时清理历史的本地消息表(消息去重表)。
看完上述内容,你们对如何整合RocketMQ事务消息有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注亿速云行业资讯频道,感谢大家的支持。
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
原文链接:https://my.oschina.net/teddyIH/blog/3105696