温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Kafka常用命令之kafka-console-consumer.sh怎么使用

发布时间:2023-03-07 11:57:40 来源:亿速云 阅读:220 作者:iii 栏目:开发技术

今天小编给大家分享一下Kafka常用命令之kafka-console-consumer.sh怎么使用的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。

    kafka-console-consumer.sh解读

    kafka-console-consumer.sh 脚本是一个简易的消费者控制台。

    该 shell 脚本的功能通过调用 kafka.tools 包下的 ConsoleConsumer 类,并将提供的命令行参数全部传给该类实现。

    • 注意:Kafka 从 2.2 版本开始将 kafka-topic.sh 脚本中的 −−zookeeper 参数标注为 “过时”,推荐使用 −−bootstrap-server 参数。

    • 若读者依旧使用的是 2.1 及以下版本,请将下述的 --bootstrap-server 参数及其值手动替换为 --zookeeper zk1:2181,zk2:2181,zk:2181。

    • 一定要注意两者参数值所指向的集群地址是不同的。

    消息消费

    bin/kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic topicName

    表示从 latest 位移位置开始消费该主题的所有分区消息,即仅消费正在写入的消息。

    从开始位置消费

    bin/kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --from-beginning --topic topicName

    表示从指定主题中有效的起始位移位置开始消费所有分区的消息。

    显示key消费

    bin/kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --property print.key=true --topic topicName

    消费出的消息结果将打印出消息体的 key 和 value。

    若还需要为你的消息添加其他属性

    请参考下述列表

    参数值类型说明有效值
    --topicstring被消费的topic
    --whiteliststring正则表达式,指定要包含以供使用的主题的白名单
    --partitioninteger指定分区
    除非指定’–offset’,否则从分区结束(latest)开始消费

    --offsetstring执行消费的起始offset位置
    默认值:latest
    latest
    earliest
    <offset>
    --consumer-propertystring将用户定义的属性以key=value的形式传递给使用者
    --consumer.configstring消费者配置属性文件
    请注意,[consumer-property]优先于此配置

    --formatterstring用于格式化kafka消息以供显示的类的名称
    默认值:kafka.tools.DefaultMessageFormatter
    kafka.tools.DefaultMessageFormatter
    kafka.tools.LoggingMessageFormatter
    kafka.tools.NoOpMessageFormatter
    kafka.tools.ChecksumMessageFormatter
    --propertystring初始化消息格式化程序的属性print.timestamp=true|false
    print.key=true|false
    print.value=true|false
    key.separator=<key.separator>
    line.separator=<line.separator>
    key.deserializer=<key.deserializer>
    value.deserializer=<value.deserializer>
    --from-beginning
    从存在的最早消息开始,而不是从最新消息开始
    --max-messagesinteger消费的最大数据量,若不指定,则持续消费下去
    --timeout-msinteger在指定时间间隔内没有消息可用时退出
    --skip-message-on-error
    如果处理消息时出错,请跳过它而不是暂停
    --bootstrap-serverstring必需(除非使用旧版本的消费者),要连接的服务器
    --key-deserializerstring

    --value-deserializerstring

    --enable-systest-events
    除记录消费的消息外,还记录消费者的生命周期
    (用于系统测试)

    --isolation-levelstring设置为read_committed以过滤掉未提交的事务性消息
    设置为read_uncommitted以读取所有消息
    默认值:read_uncommitted

    --groupstring指定消费者所属组的ID
    --blackliststring要从消费中排除的主题黑名单
    --csv-reporter-enabled
    如果设置,将启用csv metrics报告器
    --delete-consumer-offsets
    如果指定,则启动时删除zookeeper中的消费者信息
    --metrics-dirstring输出csv度量值
    需与[csv-reporter-enable]配合使用

    --zookeeperstring必需(仅当使用旧的使用者时)连接zookeeper的字符串。
    可以给出多个URL以允许故障转移

    以上就是“Kafka常用命令之kafka-console-consumer.sh怎么使用”这篇文章的所有内容,感谢各位的阅读!相信大家阅读完这篇文章都有很大的收获,小编每天都会为大家更新不同的知识,如果还想学习更多的知识,请关注亿速云行业资讯频道。

    向AI问一下细节

    免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

    AI