This article walks through the basics of using Kafka from Java: adding the client dependency, producing messages with custom interceptors and partition routing, and consuming messages with both automatic and manual offset commits.
1、Maven dependency
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.0</version>
</dependency>
2、Producer
2.1、Sending messages with a producer
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
/**
* @author Thomas
* @Description: A minimal Kafka producer
* @date 22:18 2019-7-5
*/
public class ProducerDemo {
public static void main(String[] args) {
Properties properties =new Properties();
//Kafka broker (bootstrap server) addresses, comma-separated
properties.put("bootstrap.servers", "172.16.0.218:9092,172.16.0.219:9092,172.16.0.217:9092");
properties.put("acks", "all");
properties.put("retries", 0);
properties.put("batch.size", 16384);
properties.put("linger.ms", 1);
properties.put("buffer.memory", 33554432);
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//Custom producer interceptor (see section 2.2)
properties.put("interceptor.classes", "com.lt.kafka.producer.MyProducerInterceptor");
//Custom routing rule deciding which partition each message is sent to (see section 2.3)
//properties.put("partitioner.class", "com.lt.kafka.producer.MyPartition");
Producer<String, String> producer = null;
try {
producer = new KafkaProducer<String, String>(properties);
for (int i = 20; i < 40; i++) {
String msg = "This is Message:" + i;
/**
* KafkaProducer invokes both the Callback's onCompletion method and the
* interceptor's onAcknowledgement method. The key source line:
* Callback interceptCallback = this.interceptors == null
* ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
*/
producer.send(new ProducerRecord<String, String>("leixiang", msg), new MyCallback());
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if(producer!=null)
producer.close();
}
}
}
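ProducerDemo passes a MyCallback instance that the original article never shows. A minimal sketch of what it could look like (only the class name comes from the demo above; the body here is an assumption, a callback that just logs the result):
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
/**
* Hypothetical send callback referenced by ProducerDemo; it only logs the outcome.
*/
public class MyCallback implements Callback {
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
exception.printStackTrace();//the send failed
} else {
System.out.println("Sent to " + metadata.topic() + "-" + metadata.partition() + "@" + metadata.offset());
}
}
}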
2.2、Custom producer interceptor
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
/**
* @author Thomas
* @Description: A custom producer interceptor
* @date 22:21 2019-7-5
*/
public class MyProducerInterceptor implements ProducerInterceptor<String, String>{
/**
* Print the configuration the interceptor was created with
*/
public void configure(Map<String, ?> configs) {
System.out.println(configs.toString());
}
/**
* Invoked before each record is sent; the record may be modified here
*/
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
System.out.println("拦截处理前=============");
String topic=record.topic();
String value=record.value();
System.out.println("拦截处理前的消息====:"+value);
ProducerRecord<string,string> record2=new ProducerRecord<string, string="">(topic, value+" (intercepted)");
System.out.println("拦截处理后的消息:"+record2.value());
System.out.println("拦截处理后===============");
return record2;
}
/**
* Acknowledgement callback, similar to Callback.onCompletion.
* If both are set, KafkaProducer invokes both.
*/
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
if (metadata != null)
System.out.println("MyProducerInterceptor onAcknowledgement:RecordMetadata=" + metadata.toString());
if (exception != null)
exception.printStackTrace();
}
/**
* Called when the interceptor is closed
*/
public void close() {
System.out.println("MyProducerInterceptor is closed!");
}
}
2.3、Custom message routing rule
A custom routing rule lets you decide which partition each message goes to; to define one, implement the Partitioner interface. The stub below simply routes everything to partition 0; a key-hash sketch follows it.
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
/**
* @author Thomas
* @Description: A custom partitioner (stub)
* @date 22:24 2019-7-5
*/
public class MyPartition implements Partitioner {
public void configure(Map<String, ?> configs) {
}
public void close() {
}
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
//Stub: route every message to partition 0
return 0;
}
}
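As an illustration of a real routing rule, here is a minimal sketch (the class name and logic are assumptions, not from the original article) that spreads keyed records across partitions by key hash and sends keyless records to partition 0:
import java.util.Arrays;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
/**
* Illustrative key-hash partitioner (assumed example, not from the original article)
*/
public class KeyHashPartition implements Partitioner {
public void configure(Map<String, ?> configs) {
}
public void close() {
}
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
int numPartitions = cluster.partitionCountForTopic(topic);
if (keyBytes == null) {
return 0;//keyless records all land on partition 0
}
//Mask the sign bit so the result is never negative
return (Arrays.hashCode(keyBytes) & Integer.MAX_VALUE) % numPartitions;
}
}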
3、Consumer
3.1、Automatic offset commit
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
/**
* @author Thomas
* @Description: Consumer with automatic offset commit
* @date 22:26 2019-7-5
*/
public class AutoCommitConsumerDemo {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "172.16.0.218:9092,172.16.0.219:9092,172.16.0.217:9092");
props.put("group.id", "leixiang");
props.put("enable.auto.commit", "true");
//To read data produced before the group first committed an offset, uncomment:
//props.put("auto.offset.reset", "earliest");
/* Interval between automatic offset commits */
props.put("auto.commit.interval.ms", "1000");
/*
* Once the consumer connects to the Kafka cluster, it sends heartbeats to
* tell the cluster it is still alive. If no heartbeat arrives within
* session.timeout.ms, the broker assumes the consumer is dead and triggers
* a rebalance.
*/
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//A custom interceptor; third-party plugins (e.g. for logging) can be hooked in here. See section 3.3.
//props.put("interceptor.classes", "com.lt.kafka.consumer.MyConsumerInterceptor");
@SuppressWarnings("resource")
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
try {
/* Topics the consumer subscribes to; several can be passed at once */
consumer.subscribe(Arrays.asList("leixiang"));
while (true) {
//Poll for records. The argument is the maximum time in milliseconds to block
//while no data is buffered; with 0, poll returns immediately with whatever
//records are available (possibly none).
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(),
record.value());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
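The loop above runs forever and the consumer is never closed (hence the @SuppressWarnings("resource")). One way to shut down cleanly, sketched here as an assumption rather than part of the original code, is to call wakeup() from a shutdown hook so the blocked poll() throws WakeupException, then close the consumer in a finally block:
//Sketch: a clean-shutdown variant of main's body, reusing the props built above
final KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
public void run() {
consumer.wakeup();//makes the blocked poll() throw WakeupException
}
}));
try {
consumer.subscribe(Arrays.asList("leixiang"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, value = %s%n", record.offset(), record.value());
}
} catch (org.apache.kafka.common.errors.WakeupException e) {
//Expected during shutdown; fall through to close()
} finally {
consumer.close();//releases the consumer's resources
}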
3.2、Manual offset commit
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
/**
* @author Thomas
* @Description: Consumer with manual offset commit
* @date 22:28 2019-7-5
*/
public class ManualCommitConsumerDemo {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "172.16.0.218:9092,172.16.0.219:9092,172.16.0.217:9092");
props.put("group.id", "leixiang");
props.put("enable.auto.commit", "false");//手动确认
/* auto.commit.interval.ms has no effect while enable.auto.commit is false */
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "earliest");//start from the earliest offset when the group has none committed
/*
* Once the consumer connects to the Kafka cluster, it sends heartbeats to
* tell the cluster it is still alive. If no heartbeat arrives within
* session.timeout.ms, the broker assumes the consumer is dead and triggers
* a rebalance.
*/
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//A custom interceptor; third-party plugins (e.g. for logging) can be hooked in here. See section 3.3.
props.put("interceptor.classes", "com.lt.kafka.consumer.MyConsumerInterceptor");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
/* Topics the consumer subscribes to; several can be passed at once */
consumer.subscribe(Arrays.asList("leixiang"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
//Process the message
saveMessage(record);
//Commit manually; commitAsync() commits the consumer's current position
//for all assigned partitions. A commit callback can be attached (see section 3.4):
//consumer.commitAsync(new MyOffsetCommitCallback());
consumer.commitAsync();
}
}
}
public static void saveMessage(ConsumerRecord<String, String> record){
System.out.printf("Processing message: offset = %d, key = %s, value = %s%n", record.offset(), record.key(),
record.value());
}
}
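Calling commitAsync() once per record commits the consumer's whole current position each time. For finer-grained control you can commit an explicit offset per partition after processing its records; a sketch (an illustration, not from the original article; it additionally imports org.apache.kafka.common.TopicPartition and org.apache.kafka.clients.consumer.OffsetAndMetadata) of what the loop body could look like:
//Inside the poll loop: commit each partition's offset explicitly once its
//records are processed. The committed offset is the NEXT offset to read,
//hence lastOffset + 1.
for (TopicPartition partition : records.partitions()) {
long lastOffset = -1;
for (ConsumerRecord<String, String> record : records.records(partition)) {
saveMessage(record);
lastOffset = record.offset();
}
consumer.commitSync(java.util.Collections.singletonMap(
partition, new OffsetAndMetadata(lastOffset + 1)));
}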
3.3、Custom consumer interceptor
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
/**
* @author Thomas
* @Description: A custom consumer interceptor
* @date 22:29 2019-7-5
*/
public class MyConsumerInterceptor implements ConsumerInterceptor<String, String>{
public void configure(Map<String, ?> configs) {
System.out.println("MyConsumerInterceptor configs>>>"+configs.toString());
}
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
System.out.println("onConsume");
return records;
}
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
System.out.println("onCommit");
}
public void close() {
System.out.println("MyConsumerInterceptor is closed!");
}
}
3.4、Custom offset commit callback
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
/**
* @author Thomas
* @Description: A custom offset commit callback
* @date 22:31 2019-7-5
*/
public class MyOffsetCommitCallback implements OffsetCommitCallback {
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (offsets != null)
System.out.println("offsets>>>" + offsets.toString());
if (exception != null)
exception.printStackTrace();
}
}
That covers the basics of using Kafka from Java. The best way to make it stick is to run the examples against your own cluster.
Original article: https://my.oschina.net/u/437232/blog/3070310