要实现通过C语言向Kafka传输数据,可以使用librdkafka这个开源的C语言库。下面是一个简单的示例代码,演示如何使用librdkafka来向Kafka发送消息:
#include <librdkafka/rdkafka.h>
#include <stdio.h>
#include <string.h>
int main(int argc, char *argv[]) {
rd_kafka_t *rk;
rd_kafka_topic_t *rkt;
rd_kafka_conf_t *conf;
char errstr[512];
// 创建Kafka配置对象
conf = rd_kafka_conf_new();
// 设置Kafka配置项,例如bootstrap.servers(Kafka集群的地址)
if (rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "Error configuring Kafka: %s\n", errstr);
return 1;
}
// 创建Kafka生产者对象
rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
if (!rk) {
fprintf(stderr, "Error creating Kafka producer: %s\n", errstr);
return 1;
}
// 创建Kafka主题对象
rkt = rd_kafka_topic_new(rk, "test_topic", NULL);
// 发送消息到Kafka
char *message = "Hello, Kafka!";
if (rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, message, strlen(message), NULL, 0, NULL) == -1) {
fprintf(stderr, "Error producing message: %s\n", rd_kafka_err2str(rd_kafka_last_error()));
return 1;
}
// 等待消息发送完成
rd_kafka_flush(rk, 10*1000);
// 清理资源
rd_kafka_topic_destroy(rkt);
rd_kafka_destroy(rk);
return 0;
}
以上代码示例中,我们通过librdkafka库创建了一个Kafka生产者对象,并向名为"test_topic"的主题发送了一条消息"Hello, Kafka!"。在实际使用中,你可以根据自己的需求配置更多的Kafka参数,并发送不同的消息内容。