温馨提示×

kafka subscribe怎样进行消息订阅

小樊
81
2024-12-18 12:41:12
栏目: 大数据

Kafka 消费者(Consumer)订阅主题(Topic)以消费消息。以下是使用 Java 客户端库进行订阅的步骤:

  1. 添加 Kafka 客户端依赖

在 Maven 项目的 pom.xml 文件中添加以下依赖:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>
  1. 创建消费者配置

创建一个 Kafka 消费者配置对象,指定 Kafka 集群的地址、消费者组 ID 等属性。

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  1. 创建消费者

使用消费者配置创建一个 Kafka 消费者实例。

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
  1. 订阅主题

使用 subscribe 方法订阅一个或多个主题。

consumer.subscribe(Arrays.asList("my-topic"));
  1. 消费消息

使用 poll 方法轮询消息,并使用 consume 方法处理消息。

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        // 处理消息的逻辑
    }
}

这是一个简单的 Kafka 消费者订阅消息的示例。实际应用中,你可能需要根据需求进行更多的配置和处理。

0