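Kafka is a distributed stream-processing platform that organizes and manages data streams through topics. The key concepts and operations for managing Kafka topics are described below: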
Use the kafka-topics.sh script to create a topic. For example:
bin/kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3
// Create the same topic from Java with the AdminClient API.
import org.apache.kafka.clients.admin.*;
import java.util.*;
import java.util.concurrent.ExecutionException;

Properties adminClientProps = new Properties();
adminClientProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
try (AdminClient adminClient = AdminClient.create(adminClientProps)) {
    // Topic name, 3 partitions, replication factor 1.
    NewTopic newTopic = new NewTopic("my-topic", 3, (short) 1);
    // all() completes once every requested topic has been created.
    adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
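The snippet above only prints the stack trace when the future fails. As a minimal sketch of more careful handling, the helper below restores the interrupt flag and unwraps the ExecutionException to reach the real cause. The class name AdminFutureHandling and the method await are hypothetical, and a plain CompletableFuture with an IllegalStateException stands in for the future and the exception a real broker would produce, so the example runs without a Kafka cluster.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

final class AdminFutureHandling {

    /**
     * Waits for an admin-style future and reports the outcome as a short string:
     * "ok" on success, otherwise a description of what went wrong.
     */
    static String await(Future<?> future) {
        try {
            future.get();
            return "ok";
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still see it.
            Thread.currentThread().interrupt();
            return "interrupted";
        } catch (ExecutionException e) {
            // The ExecutionException is only a wrapper; the cause is the real failure.
            Throwable cause = e.getCause();
            return "failed: " + cause.getClass().getSimpleName() + ": " + cause.getMessage();
        }
    }

    public static void main(String[] args) {
        // Stand-ins (assumptions) for the future returned by createTopics(...).all().
        CompletableFuture<Void> success = CompletableFuture.completedFuture(null);
        CompletableFuture<Void> failure = new CompletableFuture<>();
        failure.completeExceptionally(new IllegalStateException("topic already exists"));

        System.out.println(await(success));
        System.out.println(await(failure));
    }
}
```

Because KafkaFuture implements java.util.concurrent.Future, a helper of this shape should also accept the futures returned by AdminClient calls.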
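--partitions: use this parameter to set the number of partitions when creating a topic.
--replication-factor: use this parameter to set the replication factor when creating a topic.
Use the kafka-topics.sh script to modify an existing topic. The partition count can only be increased, and kafka-topics.sh --alter cannot change the replication factor; changing the replication factor of an existing topic requires a partition reassignment with kafka-reassign-partitions.sh. For example, to grow my-topic to 5 partitions:
bin/kafka-topics.sh --alter --topic my-topic --partitions 5 --bootstrap-server localhost:9092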
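// Increase the partition count from Java. AdminClient has no alterTopics method;
// createPartitions is the call that grows a topic.
import org.apache.kafka.clients.admin.*;
import java.util.*;
import java.util.concurrent.ExecutionException;

Properties adminClientProps = new Properties();
adminClientProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
try (AdminClient adminClient = AdminClient.create(adminClientProps)) {
    // increaseTo(5) sets the new total number of partitions, not the number to add.
    adminClient.createPartitions(Collections.singletonMap("my-topic", NewPartitions.increaseTo(5))).all().get();
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}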
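Kafka only lets a topic's partition count grow, and a replication factor cannot exceed the number of available brokers. The sketch below expresses those two constraints as a client-side check that could run before a change is sent to the cluster. The class TopicChangeValidator and all of its parameters are hypothetical, and the broker still performs its own validation; in particular, this sketch treats an unchanged partition count as a no-op rather than an error.

```java
import java.util.ArrayList;
import java.util.List;

final class TopicChangeValidator {

    /**
     * Returns the problems found in a requested change. An empty list means the
     * request passes these illustrative checks.
     */
    static List<String> validate(int currentPartitions, int requestedPartitions,
                                 int requestedReplicationFactor, int availableBrokers) {
        List<String> problems = new ArrayList<>();
        if (requestedPartitions < currentPartitions) {
            // Partitions can be added but never removed.
            problems.add("partition count cannot shrink from " + currentPartitions
                    + " to " + requestedPartitions);
        }
        if (requestedReplicationFactor < 1) {
            problems.add("replication factor must be at least 1");
        }
        if (requestedReplicationFactor > availableBrokers) {
            // Each replica of a partition has to live on a different broker.
            problems.add("replication factor " + requestedReplicationFactor
                    + " exceeds the " + availableBrokers + " available brokers");
        }
        return problems;
    }

    public static void main(String[] args) {
        // Growing from 3 to 5 partitions with replication factor 2 on 3 brokers.
        System.out.println(validate(3, 5, 2, 3));
        // Shrinking to 2 partitions is reported as a problem.
        System.out.println(validate(3, 2, 2, 3));
    }
}
```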
Use the kafka-topics.sh script to delete a topic. For example:
bin/kafka-topics.sh --delete --topic my-topic --bootstrap-server localhost:9092
// Delete the topic from Java. deleteTopics takes plain topic names.
import org.apache.kafka.clients.admin.*;
import java.util.*;
import java.util.concurrent.ExecutionException;

Properties adminClientProps = new Properties();
adminClientProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
try (AdminClient adminClient = AdminClient.create(adminClientProps)) {
    // all() completes once every listed topic has been marked for deletion.
    adminClient.deleteTopics(Collections.singletonList("my-topic")).all().get();
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
Use the kafka-topics.sh script to view information about a topic. For example:
bin/kafka-topics.sh --describe --topic my-topic --bootstrap-server localhost:9092
// Describe the topic from Java.
import org.apache.kafka.clients.admin.*;
import java.util.*;
import java.util.concurrent.ExecutionException;

Properties adminClientProps = new Properties();
adminClientProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
try (AdminClient adminClient = AdminClient.create(adminClientProps)) {
    // allTopicNames() is available in kafka-clients 3.1 and later; older clients use all().
    Map<String, TopicDescription> topics =
            adminClient.describeTopics(Collections.singletonList("my-topic")).allTopicNames().get();
    topics.forEach((topicName, topicDescription) -> {
        System.out.println("Topic: " + topicName);
        System.out.println("Partitions: " + topicDescription.partitions().size());
        // TopicDescription has no replicationFactor(); use the first partition's replica count.
        System.out.println("Replication Factor: " + topicDescription.partitions().get(0).replicas().size());
    });
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
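Describe output is mostly useful for checking replication. In the Kafka client, each partition's metadata (TopicPartitionInfo) exposes its replica list and its in-sync replica list; the replication factor is the size of the former, and a partition is under-replicated when the latter is shorter. The sketch below shows that calculation on a hypothetical stand-in type, PartitionState, holding broker ids only, so it runs without a cluster. The class and method names are illustrative.

```java
import java.util.Arrays;
import java.util.List;

final class TopicHealthSketch {

    /** Hypothetical stand-in for one partition's metadata, holding broker ids only. */
    static final class PartitionState {
        final int partition;
        final List<Integer> replicas;
        final List<Integer> inSyncReplicas;

        PartitionState(int partition, List<Integer> replicas, List<Integer> inSyncReplicas) {
            this.partition = partition;
            this.replicas = replicas;
            this.inSyncReplicas = inSyncReplicas;
        }
    }

    /** Replication factor taken from the first partition's replica list (0 if there are none). */
    static int replicationFactor(List<PartitionState> partitions) {
        return partitions.isEmpty() ? 0 : partitions.get(0).replicas.size();
    }

    /** Number of partitions whose in-sync replica set is smaller than the full replica set. */
    static long underReplicatedCount(List<PartitionState> partitions) {
        return partitions.stream()
                .filter(p -> p.inSyncReplicas.size() < p.replicas.size())
                .count();
    }

    public static void main(String[] args) {
        // Made-up sample data: partition 1 has lost broker 3 from its in-sync set.
        List<PartitionState> partitions = Arrays.asList(
                new PartitionState(0, Arrays.asList(1, 2), Arrays.asList(1, 2)),
                new PartitionState(1, Arrays.asList(2, 3), Arrays.asList(2)));

        System.out.println("Replication factor: " + replicationFactor(partitions));
        System.out.println("Under-replicated partitions: " + underReplicatedCount(partitions));
    }
}
```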
With the operations above, you can manage Kafka topics effectively: creating, modifying, deleting, and monitoring them.