温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

kafka-consumer-offset位移问题怎么解决

发布时间:2023-03-07 11:24:40 来源:亿速云 阅读:120 作者:iii 栏目:开发技术

这篇文章主要介绍了kafka-consumer-offset位移问题怎么解决的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇kafka-consumer-offset位移问题怎么解决文章都会有所收获,下面我们一起来看看吧。

    1 offset的默认维护位置

    kafka-consumer-offset位移问题怎么解决

    _consumer_offsets主题里面采用key和 value的方式存储数据。

    key是 group.id+topic+分区号value 就是当前offset的值

    每隔一段时间,kafka 内部会对这个topic进行compact(压缩),也就是每个group.id+topic+分区号就保留最新数据。

    Kafka0.9版本之前,consumer黑认将offset保存在Zookeeper中。0.9版本开始,consumer默认将offset保存在Kafka一个内置的topic中,该topic为_consumer_offsets。

    将offset信息存储在zk中的不足:如果将offset信息存储在zk中,那么所有的consumer都会访问zk,会消耗大量的网络资源,消费速度慢。

    1.1 消费offset案例

    思想:_consumer_offsets为Kafka中的 topic,那就可以通过消费者进行消费。

    在配置文件 config/consumer.properties中添加配置exclude.internal.topics = false,默认是 true,表示不能消费系统主题。为了查看该系统主题数据,所以该参数修改为false。修改以后执行分发命令:xsync consumer.properties。

    采用命令行方式,创建一个新的topic。

    [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --topic atguigu --partitions 2 --replication-factor 2

    启动生产者往atguigu生产数据。

    [atguigu@hadoop102 kafka] $ bin/kafka-console-producer.sh --topic atguigu --bootstrap-server hadoop102:9092

    启动消费者消费atguigu数据。

    [atguigu@hadoop104 kafka]$ bin/kafka-console-consumer.sh bootstrap-server hadoop102:9092--topic atguigu --group test

    注意:指定消费者组名称,更好观察数据存储位置(key是 group.id+topic+分区号)。查看消费者消费主题_consumer_offsets。

    [atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --topic _consumer_offsets --bootstrap-server hadoop102:9092 --consumer.config config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning

    kafka-consumer-offset位移问题怎么解决

    2 自动提交offset

    为了使我们能够专注于自己的业务逻辑,Kafka提供了自动提交offset的功能。自动提交offset的相关参数:

    • enable.auto.commit:是否开启自动提交offset功能,默认是true

    • auto.commit.interval.ms:自动提交offset的时间间隔,默认是5s

    kafka-consumer-offset位移问题怎么解决

    消费者配置代码:

    //配置是否是自动提交
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
    //提交时间间隔,单位是ms
    properties.put(ConsumerConfig.AUTO_COMNIT_INTERVAL_NS_CONFI6,1000);

    3 手动提交offset

    3.1 原理

    虽然自动提交offset十分简单便利,但由于其是基于时间提交的,开发人员难以把握offset提交的时机。因此Kafka还提供了手动提交offset的API。

    手动提交offset的方法有两种:分别是commitSync(同步提交)commitAsync(异步提交)

    两者的相同点是,都会将本次提交的一批数据最高的偏移量提交;不同点是,同步提交阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而异步提交则没有失败重试机制,故有可能提交失败。

    • commitSync(同步提交):必须等待offset提交完毕,再去消费下一批数据。

    • commitAsync(异步提交):发送完提交offset请求后,就开始消费下一批数据了

    kafka-consumer-offset位移问题怎么解决

    3.2 代码示例

    3.2.1 同步提交

    //手动提交属性配置
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG ,false);
    //消费代码逻辑
    XXX
    XXX
    XXX
    //手动提交代码(处理完数据以后,这里为了方便,只展示关键代码)
    //手动提交offset
    kafkaConsumer.commitsync();

    3.2.2 异步提交(生产常用)

    //手动提交属性配置
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG ,false);
    //消费代码逻辑
    XXX
    XXX
    XXX
    //手动提交代码(处理完数据以后,这里为了方便,只展示关键代码)
    //手动提交offset
    kafkaConsumer.commitAsync();

    4 指定offset消费

    auto.offset.reset = earliest | latest | none 默认是latest

    当Kafka 中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量时(例如该数据已被删除),该怎么办?

    • earliest:自动将偏移量重置为最早的偏移量,--from-beginning。

    • latest(默认值):自动将偏移量重置为最新偏移量。

    • none:如果未找到消费者组的先前偏移量,则向消费者抛出异常。

    kafka-consumer-offset位移问题怎么解决

    任意指定offset位移开始消费。

    //1创建消费者
    KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<>(properties);
    // 2订阅主题
    ArrayList<String> topics = new ArrayList<>(;topics.add( "first");
    kafkaConsumer.subscribe(topics);
     
    //指定位置进行消费
    set<TopicPartition> assignment = kafkaConsumer.assignment();//获取所有分区信息
    //保证分区分配方案已经制定完毕,因为由于leader消费者制定分配方案会消耗一定时间,有可能此时获取不到分区信息,所以加一层分区空间判断
    while (assignment.size() == 0){
        //促使获取的分区数量不为0
        kafkaConsumer.poll(Duration.ofSeconds(1));
        assignment = kafkaConsumer.assignment();
    }
     
    //遍历所有分区,指定消费的offset
    for (TopicPartition topicPartition : assignment) {
        kafkaConsumer.seek(topicPartition, 100);
    }
     
    // 3消费数据
    while (true){

    5 指定时间消费

    需求:在生产环境中,会遇到最近消费的几个小时数据异常,想重新按照时间消费。

    例如要求按照时间消费前一天的数据,怎么处理?

    //1创建消费者
    KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<>(properties);
    // 2订阅主题
    ArrayList<String> topics = new ArrayList<>(;topics.add( "first");
    kafkaConsumer.subscribe(topics);
     
    //指定位置进行消费
    set<TopicPartition> assignment = kafkaConsumer.assignment();//获取所有分区信息
    //保证分区分配方案已经制定完毕,因为由于leader消费者制定分配方案会消耗一定时间,有可能此时获取不到分区信息,所以加一层分区空间判断
    while (assignment.size() == 0){
        //促使获取的分区数量不为0
        kafkaConsumer.poll(Duration.ofSeconds(1));
        assignment = kafkaConsumer.assignment();
    }
    //希望把时间转换为对应的offset
    HashMap<TopicPartition,Long> topicPartitionLongHashMap = new HashMap<>();
    //封装对应集合
    for (TopicPartition topicPartition : assignment) {
        //希望获取当前系统时间一天前的数据。
        topicPartitionLongHashMap.put(topicPartition, System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
    }
    Nap<TopicPartition,OffsetAnd imestamp> topioPartitionffsetAndrtimestampMep = karfiaConsumer.offsetsForTines(topicPartitionL ongHashiap);
     
     
    //遍历所有分区,指定消费的offset
    //指定消费的offset
    for (TopicPartition topicPartition : assignment) {
        OffsetAndTimestamp offsetAndTimestamp = topicPartition0ffsetAndTimestampHap.get(topicPartition);
        kafkaConsumer.seek(topicPartition,offsetAndTimestamp.offset());
    }
     
    // 3消费数据
    while (true){

    6 漏消费和重复消费分析

    6.1 重复消费

    场景1:重复消费。自动提交offset引起。

    kafka-consumer-offset位移问题怎么解决

    6.2 漏消费

    场景1:漏消费。设置offset为手动提交,当offset被提交时,数据还在内存中未落盘,此时刚好消费者线程被kill掉,那么offset已经提交,但是数据未处理,导致这部分内存中的数据丢失。

    kafka-consumer-offset位移问题怎么解决

    6.3 消费者事务

    如果想完成Consumer端的精准一次性消费,那么需要Kafka消费端将消费过程和提交offset过程做原子绑定

    此时我们需要将Kafka的offset保存到支持事务的自定义介质(比如MySQL)。这部分知识会在后续项目部分涉及。

    kafka-consumer-offset位移问题怎么解决

    7 数据积压

    方案1:如果是Kafka消费能力不足,则可以考虑增加Topic的分区数,并且同时提升消费组的消费者数量,消费者数=分区数。(两者缺一不可)

    kafka-consumer-offset位移问题怎么解决

    方案2:如果是下游的数据处理不及时:提高每批次拉取的数量。批次拉取数据过少(拉取数据/处理时间<生产速度),使处理的数据小于生产的数据,也会造成数据积压。

    kafka-consumer-offset位移问题怎么解决

    关于“kafka-consumer-offset位移问题怎么解决”这篇文章的内容就介绍到这里,感谢各位的阅读!相信大家对“kafka-consumer-offset位移问题怎么解决”知识都有一定的了解,大家如果还想学习更多知识,欢迎关注亿速云行业资讯频道。

    向AI问一下细节

    免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

    AI