温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

向kafka集群发布记录的kafka客户端怎么实现

发布时间:2021-12-16 16:49:37 来源:亿速云 阅读:122 作者:iii 栏目:云计算

这篇文章主要介绍“向kafka集群发布记录的kafka客户端怎么实现”,在日常操作中,相信很多人在向kafka集群发布记录的kafka客户端怎么实现问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”向kafka集群发布记录的kafka客户端怎么实现”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

生产者是线程安全的,而且,多线程共享同一个producer实例通常比多个producer实例更快。

这里是一个简单的例子,使用producer发送字符串数据,包含key和value。

Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:9092");
 props.put("acks", "all");
 props.put("retries", 0);
 props.put("batch.size", 16384);
 props.put("linger.ms", 1);
 props.put("buffer.memory", 33554432);
 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

 Producer<String, String> producer = new KafkaProducer<>(props);
 for(int i = 0; i < 100; i++)
     producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));

 producer.close();

一个producer由几部分组成:1、一个buff poll,保存尚未发送的数据;2、一个后台运行的I/O线程,负责执行数据发送。producer使用完毕后,务必执行close操作,否则将会造成资源的泄漏。

send()方法是异步的。当调用它时,它将记录添加到缓冲区中,并立即返回。这使得producer能够批量的执行数据的生产。

acks有3个可能的值,0:客户端不必等待任何的server响应;1:leader of partition将会在把数据写入自己的log之后,响应客户端,而不必等待其他的follower完成同步的操作;all:leader和follower全部完成log写入操作。服务器才会响应客户端。相比之下,all最慢但是可靠性更好。

如果请求失败,生产者可以自动重试,但是我们已经设置retries = 0,那么重试将不会发生。如果我们开启了重试,可能会出现重复记录的问题。

producer保持每个partition的未发送数据的缓冲区。这些缓冲的大小由batch.size配置指定。如果增大这个配置,可以一次性执行更大的批量操作,但需要更多的内存(因为我们通常会有一个缓冲区为每个partition)。

默认情况下,缓冲区可以立即发送,即使在缓冲区中有额外的未使用的空间。但是如果你想减少请求的数量,可以设置linger.ms > 0。producer会等待一段时间(单位是毫秒)之后在进行发送,以期获得更大的批量操作。例如,在上面的代码片段,设置linger.ms = 1, 可能会有100条记录被批量发送。但是,如果在1毫秒的时间内,没有跟多的数据到达缓冲区,那么这1毫秒的等待仅仅是增加了延迟,而没有达到任何正面的效果。需要注意的是,如果在短时间内,大量的数据到达缓冲区,即使 linger.ms = 0 ,仍然会发生批量操作。

buffer.memory控制提供给producer的缓冲内存总量,如果该缓冲区的写入速率长时间大于输出速率,那么这个缓冲区将耗尽。当缓冲区耗尽后,额外的发送调用将被阻塞。阻塞一段时间之后(max.block.ms ),将会抛出一个TimeoutException。

key.serializervalue.serializer负责把record当中key和value 分别转换为byte数组,kafka提供了一组简单的序列化class。

到此,关于“向kafka集群发布记录的kafka客户端怎么实现”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI