这篇文章将为大家详细讲解有关Springboot如何整合RocketMQ收发消息,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。
pom.xml添加rocketmq-spring-boot-starter
依赖。
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>
application.yml
rocketmq:
name-server: 192.168.64.141:9876
application-demo1.yml
使用 demo1 profile 指定生产者组组名
rocketmq:
producer:
group: producer-demo1
application-demo2.yml
使用 demo2 profile 指定生产者组组名
rocketmq:
producer:
group: producer-demo2
发送普通消息
发送 Spring 的通用 Message 对象
发送异步消息
发送顺序消息
生产者
package cn.tedu.demo2.m1;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
@Component
public class Producer {
@Autowired
private RocketMQTemplate t ;
public void send(){
//发送同步消息
t.convertAndSend("Topic1:TagA", "Hello world! ");
//发送spring的Message
Message<String> message = MessageBuilder.withPayload("Hello Spring message! ").build();
t.send("Topic1:TagA",message);
//发送异步消息
t.asyncSend("Topic1:TagA", "hello world asyn", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送成功");
}
@Override
public void onException(Throwable throwable) {
System.out.println("发送失败");
}
});
//发送顺序消息
t.syncSendOrderly("Topic1", "98456237,创建", "98456237");
t.syncSendOrderly("Topic1", "98456237,支付", "98456237");
t.syncSendOrderly("Topic1", "98456237,完成", "98456237");
}
}
消费者
package cn.tedu.demo2.m1;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(topic = "Topic1",consumerGroup = "consumer-demo1")
public class Consumer implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
System.out.println("收到"+s);
}
}
主类
package cn.tedu.demo2.m1;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Main {
public static void main(String[] args) {
SpringApplication.run(Main.class, args);
}
}
测试类
需要放在 test 文件夹
激活 demo1 profile @ActiveProfiles("demo1")
package cn.tedu.demo2.m1;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
@SpringBootTest
@ActiveProfiles("demo1")
public class Test1 {
@Autowired
private Producer producer;
@Test
public void test1(){
producer.send();
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
发送事务消息
生产者
package cn.tedu.demo2.m2;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
@Component
public class Producer {
@Autowired
private RocketMQTemplate t;
public void send(){
Message<String> message = MessageBuilder.withPayload("Hello world").build();
//一旦发送消息,则执行监听器
t.sendMessageInTransaction("Topic2",message,null);
}
@RocketMQTransactionListener
class Lis implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
System.out.println("执行本地事务");
return RocketMQLocalTransactionState.UNKNOWN;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
System.out.println("执行事务回查");
return RocketMQLocalTransactionState.COMMIT;
}
}
}
消费者
package cn.tedu.demo2.m2;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(topic = "Topic2",consumerGroup = "consumer-demo2")
public class Consumer implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
System.out.println("收到"+s);
}
}
主类
package cn.tedu.demo2.m2;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Main {
public static void main(String[] args) {
SpringApplication.run(Main.class, args);
}
}
测试类
package cn.tedu.demo2.m2;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
@SpringBootTest
@ActiveProfiles("demo2")
public class Test2 {
@Autowired
private Producer producer;
@Test
public void test1(){
producer.send();
//为了能够收到消费者消费的数据,这里通过休眠模拟等待时间
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
关于“Springboot如何整合RocketMQ收发消息”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,使各位可以学到更多知识,如果觉得文章不错,请把它分享出去让更多的人看到。
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。