ubuntu16.04系统中怎么安装kafka,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。
下载
wget http://mirror-hk.koddos.net/apache/kafka/2.3.0/kafka_2.12-2.3.0.tgz
安装
tar zxvf kafka_2.12-2.3.0.tgz cd kafka_2.12-2.3.0/ vim config/server.properties
配置
# 通用配置 # kafka数据目录 log.dirs=/data/kafka # zookeeeper zookeeper.connect=kafka-node1:2181,kafka-node2:2181,kafka-node3:2181 # 节点配置 # 节点1 broker.id=0 #listeners=PLAINTEXT://:9092 listeners=PLAINTEXT://10.13.6.1:9092 # 节点2 broker.id=1 #listeners=PLAINTEXT://:9092 listeners=PLAINTEXT://10.13.6.2:9092 # 节点3 broker.id=2 #listeners=PLAINTEXT://:9092 listeners=PLAINTEXT://10.13.6.3:9092
启动
#进入kafka根目录 cd /app/kafka_2.12-2.3.0/ #启动 bin/kafka-server-start.sh -daemon config/server.properties #启动成功输出示例(最后几行) [2019-09-11 11:14:13,403] INFO [ProducerId Manager 0]: Acquired new producerId block (brokerId:0,blockStartProducerId:0,blockEndProducerId:999) by writing to Zk with path version 1 (kafka.coordinator.transaction.ProducerIdManager) [2019-09-11 11:14:13,423] INFO [TransactionCoordinator id=0] Starting up. (kafka.coordinator.transaction.TransactionCoordinator) [2019-09-11 11:14:13,424] INFO [Transaction Marker Channel Manager 0]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager) [2019-09-11 11:14:13,424] INFO [TransactionCoordinator id=0] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator) [2019-09-11 11:14:13,459] INFO [/config/changes-event-process-thread]: Starting (kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread) [2019-09-11 11:14:13,479] INFO [SocketServer brokerId=0] Started data-plane processors for 1 acceptors (kafka.network.SocketServer) [2019-09-11 11:14:13,485] INFO Kafka version: 2.3.0 (org.apache.kafka.common.utils.AppInfoParser) [2019-09-11 11:14:13,485] INFO Kafka commitId: fc1aaa116b661c8a (org.apache.kafka.common.utils.AppInfoParser) [2019-09-11 11:14:13,485] INFO Kafka startTimeMs: 1568171653480 (org.apache.kafka.common.utils.AppInfoParser) [2019-09-11 11:14:13,487] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
使用
1、创建Topic 在kafka-node1(Broker)上创建测试Tpoic:test-ken-io,这里我们指定了3个副本、1个分区 bin/kafka-topics.sh --create --bootstrap-server kafka-node1:9092 --replication-factor 3 --partitions 1 --topic test-ken-io Topic在kafka-node1上创建后也会同步到集群中另外两个Broker:kafka-node2、kafka-node3 2、查看Topic 我们可以通过命令列出指定Broker的 bin/kafka-topics.sh --list --bootstrap-server kafka-node1:9092 3、发送消息 这里我们向Broker(id=0)的Topic=test-ken-io发送消息 bin/kafka-console-producer.sh --broker-list kafka-node1:9092 --topic test-ken-io #消息内容 > test by ken.io 4、消费消息 在kafka-node2上消费Broker03的消息 bin/kafka-console-consumer.sh --bootstrap-server kafka-node3:9092 --topic test-ken-io --from-beginning 在Kafka03上消费Broker02的消息 bin/kafka-console-consumer.sh --bootstrap-server kafka-node2:9092 --topic test-ken-io --from-beginning 然后均能收到消息 test by ken.io 这是因为这两个消费消息的命令是建立了两个不同的Consumer 如果我们启动Consumer指定Consumer Group Id就可以作为一个消费组协同工,1个消息同时只会被一个Consumer消费到 bin/kafka-console-consumer.sh --bootstrap-server kafka-node3:9092 --topic test-ken-io --from-beginning --group testgroup_ken bin/kafka-console-consumer.sh --bootstrap-server kafka-node2:9092 --topic test-ken-io --from-beginning --group testgroup_ken
参数
Kafka常用Broker配置说明:
配置项 | 默认值/示例值 | 说明 |
---|---|---|
broker.id | 0 | Broker唯一标识 |
listeners | PLAINTEXT://192.168.88.53:9092 | 监听信息,PLAINTEXT表示明文传输 |
log.dirs | kafka/logs | kafka数据存放地址,可以填写多个。用”,”间隔 |
message.max.bytes | message.max.bytes | 单个消息长度限制,单位是字节 |
num.partitions | 1 | 默认分区数 |
log.flush.interval.messages | Long.MaxValue | 在数据被写入到硬盘和消费者可用前最大累积的消息的数量 |
log.flush.interval.ms | Long.MaxValue | 在数据被写入到硬盘前的最大时间 |
log.flush.scheduler.interval.ms | Long.MaxValue | 检查数据是否要写入到硬盘的时间间隔。 |
log.retention.hours | 24 | 控制一个log保留时间,单位:小时 |
zookeeper.connect | 192.168.88.21:2181 | ZooKeeper服务器地址,多台用”,”间隔 |
看完上述内容是否对您有帮助呢?如果还想对相关知识有进一步的了解或阅读更多相关文章,请关注亿速云行业资讯频道,感谢您对亿速云的支持。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。