今天就跟大家聊聊有关如何进行kafka批量消费多消费者问题分析,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。
package com.llw.medical.bs.listener;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.kafka.annotation.TopicPartition;import org.springframework.stereotype.Component;import java.util.List;import java.util.Optional;@Componentpublic class KafakaListener {@KafkaListener(id = "1", topics = {"topic2"})public void listen(ConsumerRecord<?, ?> record) { Optional<?> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); System.out.println("----------------- record =" + record); System.out.println("----------------- message =" + message); } }@KafkaListener(id = "2", topicPartitions = {@TopicPartition(topic = "topic1", partitions = {"1", "2", "3"}// partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4") ) })public void listen2(ConsumerRecord<?, ?> record) { Optional<?> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); System.out.println("----------------- record 1=" + record); System.out.println("------------------ message 1=" + message); } }//id = "4", //id="4" @KafkaListener( id= "4",groupId = "1",topics="topic1", /*topicPartitions = {@TopicPartition(topic = "topic1", partitions = {"0"} // partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4") )*//* },*/ containerFactory = "kafkaBatchListener6")public void listen3(List<ConsumerRecord<?, ?>> records) {//, Acknowledgment ack try {for (ConsumerRecord<?, ?> record : records) { Optional<?> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); System.out.println("----------------- record 4=" + record);// System.out.println("------------------ message 4=" + message); } } } finally {// ack.acknowledge(); } }//id="5" @KafkaListener(id = "5",groupId = "1",topics="topic1", /*topicPartitions = {@TopicPartition(topic = "topic1", partitions = {"0"} // partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4") ) },*/ containerFactory = "kafkaBatchListener6")public void listen2(List<ConsumerRecord<?, ?>> records) {//, Acknowledgment ack try {for (ConsumerRecord<?, ?> record : records) { Optional<?> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); System.out.println("----------------- record 6=" + record);// System.out.println("------------------ message 6=" + message); } } } finally {// ack.acknowledge(); } }//https://www.cnblogs.com/linjiqin/p/13171789.html @KafkaListener(id = "6",groupId = "1",topics="topic1",/* topicPartitions = {@TopicPartition(topic = "topic1", partitions = {"0"} // partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4") ) }, */containerFactory = "kafkaBatchListener6")public void listen4(List<ConsumerRecord<?, ?>> records) {try {for (ConsumerRecord<?, ?> record : records) { Optional<?> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); System.out.println("----------------- record 3=" + record);// System.out.println("------------------ message 6=" + message); } } } finally {// ack.acknowledge(); } } }
一个partition只能有一个消费者,如果多个消费者会是广播模式,每个消费者都会有一条数据,kafka是一个发布和订阅模式的主键,并不是队列模式,
spring boot整合时,如果使用topicPartitions 注解参数指定partition会有消息重复消费的问题,最好使用topics注解,并指定groupId。
看完上述内容,你们对如何进行kafka批量消费多消费者问题分析有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注亿速云行业资讯频道,感谢大家的支持。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。