温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

怎样实现Pulsar与Kafka消费模型对比

发布时间:2021-12-15 09:45:04 来源:亿速云 阅读:188 作者:柒染 栏目:大数据

这期内容当中小编将会给大家带来有关怎样实现Pulsar与Kafka消费模型对比,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。

kafka

kafka 属于 Stream 的消费模型,为了支持多 partition 的消费关系,引入了 consumer group 的概念,同时支持在消费端动态的 reblance 操作,当多个 Consumer 订阅了同一个 Topic 时,会根据分区策略进行消费者订阅分区的重分配。只要 consumer-group 与 topic 之间的关系发生变更,就会动态触发 reblance 操作,诸如:

  • 增加或减少 topic 中 partition 的数目

  • consumer-group 中的 consumer 数减少

  • consumer-group 与 topic 之间的订阅关系发生变更

  • 等等

引入 reblance 的好处在于,当订阅关系发生变更时,用户无需重新启动系统,就可以实现订阅关系的变更,相当于 kafka 将这种分配的权利从服务端下放到客户端中的 consumer 来管理,这样用户就可以自定义自己的分配方案。

pulsar

类似 kafka 这样的 Stream MQ,更多时候适合做离线业务的处理与分析,很多线上业务会使用 Active MQ 这样 Queue 的 MQ。为了同时兼容这两种消费模型,pulsar 做了一层消费层的抽象,统一了 Queue 和 Stream 这两种消费模型。

其中,Exclusive 和 Failover 属于 Stream 的消费模型,Share 属于 Queue 的消费模型。在写此文章时,pulsar 最新版本为 2.3.1,Key_Shared 属于pulsar 新增加的一种订阅模型,在之后的文章中,我们会单独对 Key_shared 订阅模型做单独的分享,这里不在赘述。

对 Stream 支持的对比

由于 kafka 不支持 Queue 类型的消费模型,所以 Share 这种形式在这里不做对比。下面,和大家一起讨论以下在 Stream 下 pulsar 与 kafka 的消费模型。

如下图所示,左边为 pulsar 在 Failover 和 Exclusive 下的消费情况,右边为 kafka 的消费模型。

假设目前有一个 topic,topic name 为 topic1,有 5 个partition,分别为:topic1-p1,topic1-p2,topic1-p3,topic1-p4,topic1-p5,在 kafka 中,使用了 consumer-group 且该 group 下有三个 consumer,上文中提到,kafka 支持 reblance 机制,所以当 consumer-2 与 consumer-3 加入 consumer-group 的过程中,会动态分摊之前 consumer-1 的消费压力,表现为如上图右半部分所示,cousumer-1 消费 topic1-p1 和 ropic1-p2,consumer-2 消费 topic1-p3 和 topic1-p4,consumer-3 消费 topic1-p5 。所以当用户不断的往 consumer-group 中添加 consumer 时,利用 kafka 的 reblance 机制,是可以让用户动态指定具体哪一个 consumer 来消费 topic1 中的哪些 partition。

在 pulsar 中,你可以将 subscribe 理解为 kafka 中的 consumer-group,如果用户在启动 consumer 时,指定的 subscribe-name 是相同的,说明这两个 consumer 属于同一个订阅组,代码示例如下:


Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic("topic-1").subscriptionName("my-subscriber-name")                .subscriptionType(SubscriptionType.Failover)                .subscribe();
Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topic("topic-2").subscriptionName("my-subscriber-name")                .subscriptionType(SubscriptionType.Failover)                .subscribe();

如上图示例所示,在同一个订阅组下,启动三个 consumer,在 pulsar 中,每一个 consumer 都会去订阅 topic1 中的 5 个 partition,所以每个 consumer 都会去启动 5 个 sub-consumer,在 failover 的订阅模型下,会使用 hashcode 的形式,将 5 个 partition 分配给三个 consumer 来消费,pulsar 将当前正在消费的 sub-consumer 看作是处于 leader 状态的 consumer,剩余未工作的 sub-consumer 作为从节点,当 leader 状态的 consumer 由于某些原因无法工作时,处于从状态的 sub-consumer 会去接替 leader 的 consumer,并继续工作。可以发现,kafka 加入 reblance 的机制,允许用户自己指定哪些 consumer 来消费 哪些 partition,在 pulsar 中,这个工作由 failover 的机制来完成,它通过 hash 的形式,将 consumer 分配到不同的 sub-consumer 中来执行。

现在,验证一下上述所描述的内容。

场景一

1. 以 standalone 的形式启 pulsar

$ docker run -it \  -p 6650:6650 \  -p 8080:8080 \  -v $PWD/pulsardata:/pulsar/data \  apachepulsar/pulsar:2.3.0 \  bin/pulsar standalone

2. 创建一个 topic,partition 的数目为 4


$ ./bin/pulsar-admin topics mytopic1 create-partitioned-topic -p 4

以 failover 的订阅类型,启动 3 个 consumer,并指定他们为同一个订阅组,即-s sub-1

$ ./bin/pulsar-client consume mytopic1 -s sub-1 -n 0 -t Failover

3. 启动 producer,发送 10 条数据到 mytopic1

$ ./bin/pulsar-client produce mytopic1 -n 10 -m "hello-pulsar"

可以看到,consumer1 接收到 2 条消息,consumer2 接收到 5 条消息,consumer3 接收到 3 条消息。效果和我们所预期的是一致的。

上述情况是因为在 producer 发送之前,就已经启动好三个 consumer 来消费消息,所以 pulsar 会以 hash 的形式将消息分发到三个 consumer 中来消费。

场景二

Exclusive 的订阅形式启动两个 consumer,效果如下:

./bin/pulsar-client consume mytopic1 -s sub-1 -n 0 -t Exclusive

当启动 consumer2 时,会报错 Exclusive consumer is already connected,这是因为,Failover 的订阅模式下,其它的 consumer 会以 “从” consumer 的形态存在,但是 Exclusive 只允许一个 consumer 订阅一个 topic。

上述就是小编为大家分享的怎样实现Pulsar与Kafka消费模型对比了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注亿速云行业资讯频道。

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI