温馨提示×

PHP rdkafka怎样确认消息

PHP
小樊
83
2024-11-24 06:49:44
栏目: 编程语言

要确认消息已被正确处理,您可以使用 PHP RdKafka 扩展的 ack 方法

<?php
// 创建消费者
$conf = new \RdKafka\Conf();
$conf->set('group.id', 'myGroup');
$conf->set('bootstrap.servers', 'localhost:9092');
$consumer = new \RdKafka\KafkaConsumer($conf);

// 订阅主题
$consumer->subscribe(['myTopic']);

while (true) {
    // 拉取消息
    $message = $consumer->consume(120*1000);

    switch ($message->err) {
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "Reached end of partition event\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out\n";
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_NOT_FOUND:
            echo "Partition not found\n";
            break;
        case RD_KAFKA_RESP_ERR__UNKNOWN:
            throw new \Exception($message->errstr(), $message->err);
        default:
            if ($message->err == RD_KAFKA_RESP_ERR_NO_ERROR) {
                // 消息已处理,确认
                $consumer->ack($message);
                echo "Message consumed and acknowledged\n";
            } else {
                throw new \Exception($message->errstr(), $message->err);
            }
            break;
    }
}

在这个示例中,我们创建了一个 Kafka 消费者,订阅了名为 “myTopic” 的主题。然后,我们进入一个无限循环,不断从 Kafka 拉取消息。当成功拉取到消息时($message->err == RD_KAFKA_RESP_ERR_NO_ERROR),我们调用 ack 方法来确认消息已被正确处理。如果发生错误,我们将抛出异常。

0