温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

怎么在Java中利用kafka发送消息

发布时间:2021-04-08 17:21:56 阅读:415 作者:Leah 栏目:编程语言
Java开发者专用服务器限时活动,0元免费领,库存有限,领完即止! 点击查看>>

这期内容当中小编将会给大家带来有关怎么在Java中利用kafka发送消息,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。

1. maven依赖包

<dependency> 
 <groupId>org.apache.kafka</groupId> 
 <artifactId>kafka-clients</artifactId> 
 <version>0.9.0.1</version> 
</dependency>

2. 生产者代码

package com.lnho.example.kafka;  
import org.apache.kafka.clients.producer.KafkaProducerimport org.apache.kafka.clients.producer.Producerimport org.apache.kafka.clients.producer.ProducerRecord;   
import java.util.Properties;   
public class KafkaProducerExample { 
 public static void main(String[] args) { 
  Properties props = new Properties(); 
  props.put("bootstrap.servers""master:9092"); 
  props.put("acks""all"); 
  props.put("retries"0); 
  props.put("batch.size"16384); 
  props.put("linger.ms"1); 
  props.put("buffer.memory"33554432); 
  props.put("key.serializer""org.apache.kafka.common.serialization.StringSerializer"); 
  props.put("value.serializer""org.apache.kafka.common.serialization.StringSerializer");   
  Producer<StringString> producer = new KafkaProducer<>(props); 
  for(int i = 0; i < 100; i++) 
   producer.send(new ProducerRecord<>("topic1"Integer.toString(i), Integer.toString(i)));   
  producer.close(); 
 } 
}

3. 消费者代码

package com.lnho.example.kafka;   
import org.apache.kafka.clients.consumer.ConsumerRecordimport org.apache.kafka.clients.consumer.ConsumerRecordsimport org.apache.kafka.clients.consumer.KafkaConsumerimport java.util.Arraysimport java.util.Properties;   
public class KafkaConsumerExample { 
 public static void main(String[] args) { 
  Properties props = new Properties(); 
  props.put("bootstrap.servers""master:9092"); 
  props.put("group.id""test"); 
  props.put("enable.auto.commit""true"); 
  props.put("auto.commit.interval.ms""1000"); 
  props.put("session.timeout.ms""30000"); 
  props.put("key.deserializer""org.apache.kafka.common.serialization.StringDeserializer"); 
  props.put("value.deserializer""org.apache.kafka.common.serialization.StringDeserializer"); 
  KafkaConsumer<StringString> consumer = new KafkaConsumer<>(props); 
  consumer.subscribe(Arrays.asList("topic1")); 
  while (true) { 
   ConsumerRecords<StringString> records = consumer.poll(100); 
   for (ConsumerRecord<StringString> record : records) 
    System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value()); 
  } 
 } 
}

上述就是小编为大家分享的怎么在Java中利用kafka发送消息了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注亿速云行业资讯频道。

亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI

开发者交流群×