这篇文章给大家分享的是有关springboot整合rocketmq如何实现分布式事务的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。
(1) 发送方向 MQ 服务端发送消息。
(2) MQ Server 将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功,此时消息为半消息。
(3) 发送方开始执行本地事务逻辑。
(4) 发送方根据本地事务执行结果向 MQ Server 提交二次确认(Commit 或是 Rollback),MQ Server 收到Commit 状态则将半消息标记为可投递,订阅方最终将收到该消息;MQ Server 收到 Rollback 状态则删除半消息,订阅方将不会接受该消息。
(5) 在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达 MQ Server,经过固定时间后MQ Server 将对该消息发起消息回查。
(6) 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
(7) 发送方根据检查得到的本地事务的最终状态再次提交二次确认,MQ Server 仍按照步骤4对半消息进行操作。
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.0.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.71</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>4.2</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.3.2</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.3.0.RELEASE</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
rocketmq:
name-server: 192.168.38.50:9876
producer:
group: transcation-group
@RocketMQTransactionListener(txProducerGroup = "transaction-producer-group")
@Slf4j
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
private static Map<String, RocketMQLocalTransactionState> STATE_MAP = new HashMap<>();
/**
* 执行业务逻辑
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
String transId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
try {
System.out.println("用户A账户减500元.");
System.out.println("用户B账户加500元.");
STATE_MAP.put(transId, RocketMQLocalTransactionState.COMMIT);
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
e.printStackTrace();
}
STATE_MAP.put(transId, RocketMQLocalTransactionState.ROLLBACK);
return RocketMQLocalTransactionState.UNKNOWN;
}
/**
* 回查
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
String transId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
log.info("回查消息 -> transId ={} , state = {}", transId, STATE_MAP.get(transId));
return STATE_MAP.get(transId);
}
}
@Component
@Slf4j
public class SpringTransactionProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 发送消息
*
*/
public void sendMsg(String topic, String msg) {
Message<String> message = MessageBuilder.withPayload(msg).build();
this.rocketMQTemplate.sendMessageInTransaction("transaction-producer-group", topic, message, null);
log.info("发送成功");
}
}
@Component
@RocketMQMessageListener(topic = "pay_topic",
consumerGroup = "transaction-consumer-group",
selectorExpression = "*")
@Slf4j
public class SpringTxConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String msg) {
log.info("接收到消息 -> {}", msg);
}
}
@RestController
@RequestMapping("/producer")
public class ProducerController {
@Autowired
private SpringTransactionProducer springTransactionProducer;
@GetMapping("/sendMsg")
public String sendMsg() {
springTransactionProducer.sendMsg("pay_topic", "用户A账户减500元,用户B账户加500元。");
return "发送成功";
}
}
@SpringBootApplication
public class RocketApplication {
public static void main(String[] args) {
SpringApplication.run(RocketApplication.class);
}
}
描述: 正常启动及可。
描述: 执行本地事务时添加异常,重启测试,发现消费者没有收到消息。
感谢各位的阅读!关于“springboot整合rocketmq如何实现分布式事务”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,让大家可以学到更多知识,如果觉得文章不错,可以把它分享出去让更多的人看到吧!
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。