Kafka Producer Ack(确认)机制用于确保消息被成功发送到 Kafka 集群。在 Kafka Producer 中,有两种发送确认方式:同步(synchronous)和异步(asynchronous)。要实现异步发送,请按照以下步骤操作:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
这里,我们配置了 Kafka 集群的地址(bootstrap.servers
),以及消息键(key)和值(value)的序列化方式。
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
send()
方法发送消息,并设置回调函数以处理异步响应:producer.send(new ProducerRecord<>("your-topic", "key", "value"), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
// 处理发送失败的情况
exception.printStackTrace();
} else {
// 处理发送成功的情况
System.out.println("Message sent to topic: " + metadata.topic() + ", partition: " + metadata.partition() + ", offset: " + metadata.offset());
}
}
});
在上面的代码中,我们使用 send()
方法发送消息,并传入一个 ProducerRecord
对象,表示要发送的消息及其目标主题、分区和键。同时,我们设置了一个回调函数 Callback
,用于处理异步发送的结果。当消息发送成功或失败时,回调函数会被调用。
在发送完消息后,记得关闭 KafkaProducer 实例以释放资源:
producer.close();
通过以上步骤,我们实现了 Kafka Producer 的异步发送功能。在实际应用中,可以根据需求选择合适的发送确认方式。