温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Kafka多线程Consumer的实例代码

发布时间:2021-09-08 16:17:34 阅读:162 作者:chen 栏目:云计算
开发者测试专用服务器限时活动,0元免费领,库存有限,领完即止! 点击查看>>

这篇文章主要介绍“Kafka多线程Consumer的实例代码”,在日常操作中,相信很多人在Kafka多线程Consumer的实例代码问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Kafka多线程Consumer的实例代码”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

 
多线程示例代码:
这里要根据自身需求开发,我这里只举一个简单的例子,就是几个分区就启动几个consumer,一一对应。三个类:Main:public static void main(String[] args) {		String bootstrapServers = "kafka01:9092,kafka02:9092";		String groupId = "test";		String topic = "testtopic";		int consumerNum = 3;		ConsumerGroup cg = new ConsumerGroup(consumerNum,bootstrapServers,groupId,topic);		cg.execute();}import java.util.ArrayList;import java.util.List;public class ConsumerGroup {	private List<ConsumerRunnable> consumers;	public ConsumerGroup(int consumerNum,String bootstrapServers,String groupId,String topic){		consumers = new ArrayList<>(consumerNum);		for(int i=0;i < consumerNum;i++){			ConsumerRunnable ConsumerRunnable = new ConsumerRunnable(bootstrapServers,groupId,topic);			consumers.add(ConsumerRunnable);		}	}	public void execute(){		for(ConsumerRunnable consumerRunnable:consumers){			new Thread(consumerRunnable).start();		}	}}import java.util.Arrays;import java.util.Properties;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;public class ConsumerRunnable implements Runnable{	private final KafkaConsumer<String,String> consumer;	public ConsumerRunnable(String bootstrapServers,String groupId,String topic){		Properties props = new Properties();props.put("bootstrap.servers", bootstrapServers);props.put("group.id", groupId);props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("auto.offset.reset","earliest");this.consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList(topic));	}	@Override	public void run() {while (true) {ConsumerRecords<String, String> records = consumer.poll(10);for (ConsumerRecord<String, String> record : records) {	System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}}	}}
poll方法详解:

(旧版本:多分区多线程     新版本:一个线程管理多个socket连接)

但新版本KafkaConsumer是双线程的,主线程负责:消息获取,rebalance,coordinator,位移提交等等,

另一个是后台心跳线程。

根据上边的各种配置,poll方法会找到offset,当获取了足够多的可用数据,或者等待时间超过了指定的超时时间,就会返回。

java consumer不是线程安全的,同一个KafkaConsumer用在了多个线程中,将会报Kafka Consumer is not safe for multi-threaded assess异常。可以加一个同步锁进行保护。

poll的超时参数,已经说过1000的话是超时设定,如果没有很多数据,也就等一秒,就返回了,比如定时5秒的将消息写入,就可以将超时参数设置为5000,达到效率最大化。

如果没有定时任务呢,那就设置为  Long.MAX_VALUE   未获取足够多的数据就无限等待。这里要捕获一下WakeupException。

到此,关于“Kafka多线程Consumer的实例代码”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!

亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

原文链接:https://my.oschina.net/u/4187920/blog/4424891

AI

开发者交流群×