This article explains how to reset consumer group offsets in Kafka 2.7. For each reset strategy it shows both the kafka-consumer-groups.sh command-line tool and the equivalent Java consumer API calls.
First, check the consumption progress before the reset:
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.108:9092 --group mytopic-consumer-group --describe
The describe output shows that the Lag of every partition is 0, which means all messages have already been consumed. Now reset the consumption progress with the Earliest strategy, so that all messages can be consumed again after the reset.
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.108:9092 --group mytopic-consumer-group --reset-offsets --topic mytopic --to-earliest --execute
Checking the consumption progress again shows that the consumer can now re-consume these messages. To reset only specific partitions (here partitions 1 and 2 of mytopic), use the topic:partitions syntax:
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.108:9092 --group mytopic-consumer-group --reset-offsets --topic mytopic:1,2 --to-earliest --execute
The same reset can be done with the Java consumer API. For all partitions of the topic:
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.108:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "mytopic-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
final String topic = "mytopic";
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Arrays.asList(topic));
    consumer.poll(0); // join the group and get a partition assignment
    // collect every partition of the topic
    Collection<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
            .map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition()))
            .collect(Collectors.toList());
    consumer.seekToBeginning(partitions); // lazily marks the seek to the beginning
    // position() forces the lazy seek to take effect
    consumer.partitionsFor(topic).forEach(i -> consumer.position(new TopicPartition(topic, i.partition())));
}
Note that seekToBeginning, seekToEnd and the other seek methods are evaluated lazily; the new position only takes effect once position() (or the next poll()) is called.
To reset only partitions 1 and 2 with the Java API:
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.108:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "mytopic-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
final String topic = "mytopic";
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Arrays.asList(topic));
    consumer.poll(0); // join the group and get a partition assignment
    List<TopicPartition> partitions = new ArrayList<>();
    partitions.add(new TopicPartition(topic, 1));
    partitions.add(new TopicPartition(topic, 2));
    consumer.seekToBeginning(partitions); // lazy seek to the beginning of partitions 1 and 2
    // position() forces the lazy seek to take effect
    consumer.position(new TopicPartition(topic, 1));
    consumer.position(new TopicPartition(topic, 2));
}
Latest strategy: first, check the consumption progress before the reset. The describe output shows that none of the messages have been consumed yet; now reset the progress with the Latest strategy, so that the existing messages are not consumed after the reset.
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.108:9092 --group mytopic-consumer-group --reset-offsets --topic mytopic --to-latest --execute
After the reset, checking the progress again shows the offsets moved to the end of each partition, so the existing messages will not be consumed. To reset only partitions 1 and 2:
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.108:9092 --group mytopic-consumer-group --reset-offsets --topic mytopic:1,2 --to-latest --execute
The equivalent Java code for all partitions:
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.108:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "mytopic-consumer-group");
final String topic = "mytopic";
try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Arrays.asList(topic));
    consumer.poll(0); // join the group and get a partition assignment
    // lazy seek to the end of every partition of the topic
    consumer.seekToEnd(consumer.partitionsFor(topic).stream()
            .map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition()))
            .collect(Collectors.toList()));
    // position() forces the lazy seek to take effect
    consumer.partitionsFor(topic).forEach(i -> consumer.position(new TopicPartition(topic, i.partition())));
}
For partitions 1 and 2 only:
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.108:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "mytopic-consumer-group");
final String topic = "mytopic";
try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Arrays.asList(topic));
    consumer.poll(0); // join the group and get a partition assignment
    List<TopicPartition> partitions = new ArrayList<>();
    partitions.add(new TopicPartition(topic, 1));
    partitions.add(new TopicPartition(topic, 2));
    consumer.seekToEnd(partitions); // lazy seek to the end of partitions 1 and 2
    // position() forces the lazy seek to take effect
    consumer.position(new TopicPartition(topic, 1));
    consumer.position(new TopicPartition(topic, 2));
}
Current strategy: reset each partition back to its currently committed offset. I have not yet come across a concrete use case for this strategy, so it is only covered briefly here and may be expanded later. The command-line reset for the whole topic, and for partitions 1 and 2 only:
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.108:9092 --group mytopic-consumer-group --reset-offsets --topic mytopic --to-current --execute
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.108:9092 --group mytopic-consumer-group --reset-offsets --topic mytopic:1,2 --to-current --execute
With the Java API, seek each partition back to its committed offset. For all partitions:
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.108:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "mytopic-consumer-group");
final String topic = "mytopic";
try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Arrays.asList(topic));
    consumer.poll(0); // join the group and get a partition assignment
    consumer.partitionsFor(topic).stream()
            .map(info -> new TopicPartition(topic, info.partition()))
            .forEach(tp -> {
                // assumes the group has already committed an offset for tp
                // (committed() would return null otherwise)
                long committedOffset = consumer.committed(tp).offset();
                consumer.seek(tp, committedOffset);
            });
}
For partitions 1 and 2 only:
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.108:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "mytopic-consumer-group");
final String topic = "mytopic";
try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Arrays.asList(topic));
    consumer.poll(0); // join the group and get a partition assignment
    TopicPartition tp1 = new TopicPartition(topic, 1);
    TopicPartition tp2 = new TopicPartition(topic, 2);
    // seek each partition back to its currently committed offset
    consumer.seek(tp1, consumer.committed(tp1).offset());
    consumer.seek(tp2, consumer.committed(tp2).offset());
}
Specified-Offset strategy: reset each partition to an absolute offset. Before the reset, check the consumption progress, then reset the whole topic to offset 5:
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.108:9092 --group mytopic-consumer-group --reset-offsets --topic mytopic --to-offset 5 --execute
In practice the committed offsets of different partitions usually differ, so setting every partition to the same offset is rarely realistic; it is better to target a specific partition. For example, reset partition 2 to offset 11:
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.108:9092 --group mytopic-consumer-group --reset-offsets --topic mytopic:2 --to-offset 11 --execute
With the Java API, for all partitions:
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.108:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "mytopic-consumer-group");
final String topic = "mytopic";
try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Arrays.asList(topic));
    consumer.poll(0); // join the group and get a partition assignment
    // seek every partition of the topic to the same absolute offset
    consumer.partitionsFor(topic).stream().forEach(pi -> {
        TopicPartition tp = new TopicPartition(topic, pi.partition());
        consumer.seek(tp, 5L);
    });
}
For a single partition (here partition 2):
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.108:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "mytopic-consumer-group");
final String topic = "mytopic";
try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Arrays.asList(topic));
    consumer.poll(0); // join the group and get a partition assignment
    // seek partition 2 to the desired absolute offset
    consumer.seek(new TopicPartition(topic, 2), 10L);
}
Shift-By-N strategy: move each partition's committed offset forward (positive N) or backward (negative N) by N messages. Before the reset, check the consumption progress. The following commands shift every partition of the topic back by 1, and partition 2 only back by 2, respectively:
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.108:9092 --group mytopic-consumer-group --reset-offsets --topic mytopic --shift-by -1 --execute
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.108:9092 --group mytopic-consumer-group --reset-offsets --topic mytopic:2 --shift-by -2 --execute
With the Java API, for all partitions (shift back by 1):
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.108:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "mytopic-consumer-group");
final String topic = "mytopic";
try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Arrays.asList(topic));
    consumer.poll(0); // join the group and get a partition assignment
    for (PartitionInfo info : consumer.partitionsFor(topic)) {
        TopicPartition tp = new TopicPartition(topic, info.partition());
        // move each partition back by one message relative to its committed offset
        consumer.seek(tp, consumer.committed(tp).offset() - 1L);
    }
}
For partition 2 only (shift back by 2, matching the command above):
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.108:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "mytopic-consumer-group");
final String topic = "mytopic";
try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Arrays.asList(topic));
    consumer.poll(0); // join the group and get a partition assignment
    TopicPartition tp = new TopicPartition(topic, 2);
    // move partition 2 back by two messages relative to its committed offset
    consumer.seek(tp, consumer.committed(tp).offset() - 2L);
}
Sometimes resetting offsets to a specific point in time (the DateTime strategy) is a good approach. Before the reset, check the consumption progress, then reset the whole topic, or only partition 2, to the messages after a given datetime:
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.108:9092 --group mytopic-consumer-group --reset-offsets --topic mytopic --to-datetime 2021-05-09T00:00:00.000 --execute
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.108:9092 --group mytopic-consumer-group --reset-offsets --topic mytopic:2 --to-datetime 2020-05-09T00:00:00.000 --execute
With the Java API, use offsetsForTimes to find, per partition, the offset of the first message at or after the target timestamp. For all partitions (here, one day ago):
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.108:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "mytopic-consumer-group");
final String topic = "mytopic";
try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Arrays.asList(topic));
    consumer.poll(0); // join the group and get a partition assignment
    long ts = new Date().getTime() - 24 * 60 * 60 * 1000; // one day ago, in milliseconds
    // ask the broker for the earliest offset whose timestamp is >= ts, per partition
    Map<TopicPartition, Long> timeToSearch = consumer.partitionsFor(topic).stream()
            .map(pi -> new TopicPartition(topic, pi.partition()))
            .collect(Collectors.toMap(Function.identity(), tp -> ts));
    for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : consumer.offsetsForTimes(timeToSearch).entrySet()) {
        // fall back to the committed offset when no message is newer than ts
        consumer.seek(entry.getKey(),
                entry.getValue() == null ? consumer.committed(entry.getKey()).offset()
                                         : entry.getValue().offset());
    }
}
For partition 2 only (here, one year ago):
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.108:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "mytopic-consumer-group");
final String topic = "mytopic";
try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Arrays.asList(topic));
    consumer.poll(0); // join the group and get a partition assignment
    // 365L keeps the multiplication in long arithmetic and avoids int overflow
    long ts = new Date().getTime() - 365L * 24 * 60 * 60 * 1000;
    Map<TopicPartition, Long> timeToSearch = new HashMap<>();
    timeToSearch.put(new TopicPartition(topic, 2), ts);
    for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : consumer.offsetsForTimes(timeToSearch).entrySet()) {
        // fall back to the committed offset when no message is newer than ts
        consumer.seek(entry.getKey(),
                entry.getValue() == null ? consumer.committed(entry.getKey()).offset()
                                         : entry.getValue().offset());
    }
}
Duration strategy: shift the offsets back by a time interval relative to now. The interval uses the Java Duration format PnDTnHnMnS, which is not covered in detail here. Before the reset, check the consumption progress, then shift the whole topic, or only partition 2, back by one day:
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.108:9092 --group mytopic-consumer-group --reset-offsets --topic mytopic --by-duration P1DT0H0M0S --execute
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.108:9092 --group mytopic-consumer-group --reset-offsets --topic mytopic:2 --by-duration P1DT0H0M0S --execute
The Java approach is the same as for the DateTime strategy: convert the duration into a timestamp and then use offsetsForTimes, as in the sketch below.
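A minimal sketch (not from the original article), assuming the same broker address, consumer group and topic as in the earlier examples, and using java.time.Duration.parse to turn the PnDTnHnMnS string into milliseconds:
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.108:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "mytopic-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
final String topic = "mytopic";
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Arrays.asList(topic));
    consumer.poll(0); // join the group and get a partition assignment
    // parse the same interval the CLI uses (--by-duration P1DT0H0M0S = 1 day)
    // and turn it into an absolute timestamp relative to now
    long ts = System.currentTimeMillis() - Duration.parse("P1DT0H0M0S").toMillis();
    Map<TopicPartition, Long> timeToSearch = consumer.partitionsFor(topic).stream()
            .map(pi -> new TopicPartition(topic, pi.partition()))
            .collect(Collectors.toMap(Function.identity(), tp -> ts));
    for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : consumer.offsetsForTimes(timeToSearch).entrySet()) {
        // fall back to the committed offset when no message is newer than ts
        consumer.seek(entry.getKey(),
                entry.getValue() == null ? consumer.committed(entry.getKey()).offset()
                                         : entry.getValue().offset());
    }
}
java.time.Duration.parse accepts the same PnDTnHnMnS format the CLI expects, so the interval only needs to be written once.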
That covers how to reset consumer group offsets in Kafka 2.7. Thanks for reading.
Original article: https://my.oschina.net/fulyn/blog/5042523