这篇文章主要介绍了spring-cloud-stream结合kafka的使用方法是什么,具有一定借鉴价值,需要的朋友可以参考下。希望大家阅读完这篇文章后大有收获。下面让小编带着大家一起了解一下。
1.pom文件导入依赖
<!-- kafka --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-kafka</artifactId> </dependency>
2.application.yml文件配置
spring: cloud: stream: kafka: binder: brokers: xxx.xxx.xxx.xx:xxxx // Kafka的消息中间件服务器地址 bindings: xxx_output: // 通道名称 destination: xxx // 消息发往的目的地,对应topic 在发送消息的配置里面,group是不用配置的 // 如果我们需要传输json的信息,那么在发送消息端需要设置content-type为json(其实可以不写,默认content-type就是json) xxx_input: destination: xxx // 消息发往的目的地,对应topic group: xxx // 对应kafka的group
3.创建消息发送者
@EnableBinding(Source.class) // @EnableBinding 是绑定通道的,Soure.class是spring 提供的,表示这是一个可绑定的发布通道 @Service public class MqService { @Resource(name = KafkaConstants.OES_WORKBENCH_LIFE_DATA_OUTPUT) private MessageChannel oesWorkbenchChannel; /** * 发送一条kafka消息 */ public boolean sendLifeData(Object object) { return MqUtils.send(oesWorkbenchChannel, object, KafkaConstants.OES_WORKBENCH_LIFE_DATA_OUTPUT); } } // 发布通道 public interface Source { @Output(KafkaConstants.OES_WORKBENCH_LIFE_DATA_OUTPUT) MessageChannel oesWorkbenchLifeDataOutput(); // 发布通道用MessageChannel }
4.创建消息监听者
@Slf4j @EnableBinding(Sink.class) public class WorkbenchStreamListener { @Resource private FileService fileService; @StreamListener(KafkaConstants.xxx_input) // 监听接受通道 public void receiveData(MoveMessage moveMessage) { } } // 接受通道 public interface Sink { @Input(KafkaConstants.OES_WORKBENCH_MOVE_INPUT) SubscribableChannel oesWorkbenchMoveInput(); // 接受通道用SubscribableChannel }
接下来就可以愉快的发送监听消息了
感谢你能够认真阅读完这篇文章,希望小编分享spring-cloud-stream结合kafka的使用方法是什么内容对大家有帮助,同时也希望大家多多支持亿速云,关注亿速云行业资讯频道,遇到问题就找亿速云,详细的解决方法等着你来学习!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。