今天就跟大家聊聊有关如何进行kafka批量消费多消费者问题分析,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。
package com.llw.medical.bs.listener;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.kafka.annotation.TopicPartition;import org.springframework.stereotype.Component;import java.util.List;import java.util.Optional;@Componentpublic class KafakaListener {@KafkaListener(id = "1", topics = {"topic2"})public void listen(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
System.out.println("----------------- record =" + record);
System.out.println("----------------- message =" + message);
}
}@KafkaListener(id = "2", topicPartitions =
{@TopicPartition(topic = "topic1",
partitions = {"1", "2", "3"}// partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4") )
})public void listen2(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
System.out.println("----------------- record 1=" + record);
System.out.println("------------------ message 1=" + message);
}
}//id = "4", //id="4" @KafkaListener( id= "4",groupId = "1",topics="topic1", /*topicPartitions = {@TopicPartition(topic = "topic1", partitions = {"0"} // partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4") )*//* },*/ containerFactory = "kafkaBatchListener6")public void listen3(List<ConsumerRecord<?, ?>> records) {//, Acknowledgment ack try {for (ConsumerRecord<?, ?> record : records) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
System.out.println("----------------- record 4=" + record);// System.out.println("------------------ message 4=" + message); }
}
} finally {// ack.acknowledge(); }
}//id="5" @KafkaListener(id = "5",groupId = "1",topics="topic1", /*topicPartitions = {@TopicPartition(topic = "topic1", partitions = {"0"} // partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4") ) },*/ containerFactory = "kafkaBatchListener6")public void listen2(List<ConsumerRecord<?, ?>> records) {//, Acknowledgment ack try {for (ConsumerRecord<?, ?> record : records) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
System.out.println("----------------- record 6=" + record);// System.out.println("------------------ message 6=" + message); }
}
} finally {// ack.acknowledge(); }
}//https://www.cnblogs.com/linjiqin/p/13171789.html @KafkaListener(id = "6",groupId = "1",topics="topic1",/* topicPartitions = {@TopicPartition(topic = "topic1", partitions = {"0"} // partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4") ) }, */containerFactory = "kafkaBatchListener6")public void listen4(List<ConsumerRecord<?, ?>> records) {try {for (ConsumerRecord<?, ?> record : records) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
System.out.println("----------------- record 3=" + record);// System.out.println("------------------ message 6=" + message); }
}
} finally {// ack.acknowledge(); }
}
}
一个partition只能有一个消费者,如果多个消费者会是广播模式,每个消费者都会有一条数据,kafka是一个发布和订阅模式的主键,并不是队列模式,
spring boot整合时,如果使用topicPartitions 注解参数指定partition会有消息重复消费的问题,最好使用topics注解,并指定groupId。
看完上述内容,你们对如何进行kafka批量消费多消费者问题分析有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注亿速云行业资讯频道,感谢大家的支持。
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
原文链接:https://my.oschina.net/u/3971821/blog/4548979