Kafka 消费者(Consumer)订阅主题(Topic)以消费消息。以下是使用 Java 客户端库进行订阅的步骤:
在 Maven 项目的 pom.xml 文件中添加以下依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
创建一个 Kafka 消费者配置对象,指定 Kafka 集群的地址、消费者组 ID 等属性。
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
使用消费者配置创建一个 Kafka 消费者实例。
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
使用 subscribe
方法订阅一个或多个主题。
consumer.subscribe(Arrays.asList("my-topic"));
使用 poll
方法轮询消息,并使用 consume
方法处理消息。
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
// 处理消息的逻辑
}
}
这是一个简单的 Kafka 消费者订阅消息的示例。实际应用中,你可能需要根据需求进行更多的配置和处理。