这篇文章主要介绍在Spring Boot应用程序中如何使用Apache Kafka,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!
第1步:生成我们的项目: Spring Initializr 来生成我们的项目。我们的项目将提供Spring MVC / Web支持和Apache Kafka支持。
第2步:发布/读取Kafka主题中的消息:
<b>public</b> <b>class</b> User { <b>private</b> String name; <b>private</b> <b>int</b> age; <b>public</b> User(String name, <b>int</b> age) { <b>this</b>.name = name; <b>this</b>.age = age; } }
第3步:通过application.yml
配置文件配置Kafka:
我们需要创建配置文件。我们需要以某种方式配置我们的Kafka生产者和消费者,以便能够发布和读取与主题相关的消息。相比建立一个使用@Configuration
标注的Java类,我们可以直接使用配置文件application.properties或application.yml。Spring Boot让我们避免像过去一样编写的所有样板代码,同时为我们提供了更加智能的配置应用程序的方法,如下所示:
server: port: 9000 spring: kafka: consumer: bootstrap: localhost:9092 group-id: group_id auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer producer: bootstrap: localhost:9092 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer
第4步:创建一个生产者,创建生产者会将我们的消息写入该主题。
<b>public</b> <b>class</b> Producer { <b>private</b> <b>static</b> <b>final</b> Logger logger = LoggerFactory.getLogger(Producer.<b>class</b>); <b>private</b> <b>static</b> <b>final</b> String TOPIC = <font>"users"</font><font>; @Autowired <b>private</b> KafkaTemplate<String, String> kafkaTemplate; <b>public</b> <b>void</b> sendMessage(String message) { logger.info(String.format(</font><font>"#### -> Producing message -> %s"</font><font>, message)); <b>this</b>.kafkaTemplate.send(TOPIC, message); } } </font>
自动连接autowire
到 KafkaTemplate
,使用它将消息发布到主题 - 这就是消息的生产者!
第5步:创建一个消费者,消费者是负责根据您自己的业务逻辑的需求阅读处理消息的消息的服务。要进行设置,请输入以下内容:
@Service <b>public</b> <b>class</b> Consumer { <b>private</b> <b>final</b> Logger logger = LoggerFactory.getLogger(Producer.<b>class</b>); @KafkaListener(topics = <font>"users"</font><font>, groupId = </font><font>"group_id"</font><font>) <b>public</b> <b>void</b> consume(String message) throws IOException { logger.info(String.format(</font><font>"#### -> Consumed message -> %s"</font><font>, message)); } } </font>
在这里,我们告诉我们的方法void consume(String message)
订阅用户的主题,并将每条消息发送到应用程序日志。在您的实际应用程序中,您可以按照业务需要的方式处理消息。
第6步:创建REST控制器,们已经拥有了能够消费Kafka消息所需的全部内容。
为了充分展示我们创建的所有内容的工作原理,我们需要创建一个具有单一端点的控制器。消息将发布到此端点,然后由我们的生产者处理。然后,我们的消费者将通过登录到控制台来捕获并处理它。
@RestController @RequestMapping(value = <font>"/kafka"</font><font>) <b>public</b> <b>class</b> KafkaController { <b>private</b> <b>final</b> Producer producer; @Autowired KafkaController(Producer producer) { <b>this</b>.producer = producer; } @PostMapping(value = </font><font>"/publish"</font><font>) <b>public</b> <b>void</b> sendMessageToKafkaTopic(@RequestParam(</font><font>"message"</font><font>) String message) { <b>this</b>.producer.sendMessage(message); } } </font>
让我们使用cURL将消息发送给Kafka:
curl -X POST -F 'message=test' http://localhost:9000/kafka/publish
以上是“在Spring Boot应用程序中如何使用Apache Kafka”这篇文章的所有内容,感谢各位的阅读!希望分享的内容对大家有帮助,更多相关知识,欢迎关注亿速云行业资讯频道!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。