温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Kafka-on-Pulsar 的开发历程是怎样的

发布时间:2021-12-15 09:32:33 来源:亿速云 阅读:170 作者:柒染 栏目:大数据

这期内容当中小编将会给大家带来有关Kafka-on-Pulsar 的开发历程是怎样的,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。

Protocol Handler 是在 Pulsar 2.5.0 版本后加入的新机制,希望开发者们能利用 Pulsar 已有的基础架构,把 Pulsar 当作一个可靠、高效、稳定的流数据存储,可以利用它去开发一些可插拔消息协议。

所以 Kafka-on-Pulsar 是基于 Protocol Handler 进行开发的,支持 Kafka 2.0 协议的插件。只需下载 KoP 插件并安装到已有的 Pulsar broker 里,就能在 Pulsar 里支持 Kafka 协议。优点是简化了从 Kafka 迁移到 Pulsar 的流程,不需要更改两遍代码,直接无缝迁移。接下来就一起详细看看KoP 的开发历程吧。

什么是 Apache Pulsar

Apache Pulsar 是一个事件流平台。最初,Apache Pulsar 就采用云原生、分层分片的架构。该架构将服务和存储分离开来,使系统实现更友好的容器化。

而现在,Pulsar 不仅仅是是一个消息中间件,更是一个消息+流数据结合的系统,即 Cloud-Native Event Streaming。

我们之前写过很多关于 Pulsar 的具体详情,感兴趣的可以查看 :Apache Pulsar 介绍。

 Why KoP?

Plusar 为队列和流工作负载提供统一的消息模型。Pulsar 支持自己基于 protobuf 的二进制协议,以确保高性能和低延迟。protobuf 有利于实现 Pulsar 客户端。

而且,该项目也支持 Java,Go,Python 和 C ++ 语言以及社区提供的第三方客户端。但是,对于使用其他消息传输协议编写的应用程序,用户必须重写这些应用程序,否则这些应用程序无法采用 Pulsar 新的统一消息传输协议。

为了解决这一问题,Pulsar 社区之前也开发了一些应用程序,以便将 Kafka 应用程序从其他消息系统迁移到 Pulsar。例如,Pulsar 在 Kafka Java API 上提供了 Kafka wrapper。

Kafka wrapper 允许用户在不改变代码的情况下将其使用的 Kafka Java 客户端应用程序从 Kafka 切换到 Pulsar。Pulsar 还提供丰富的 connector 生态系统,用于连接 Pulsar 和其他数据系统。

但是,那些想要从其他 Kafka 应用程序切换到 Pulsar 的用户仍然有强烈的需求。

KoP 的诞生背景

因此,就产生了“在 Pulsar 上支持 Kafka 协议”的想法。最初的猜想是添加一个 proxy,比如好多公司会在 Kafka 之前加一个类似 HTTP proxy,后续再转换成 Pulsar 协议。

第二种猜想是能否直接将 Kafka 协议直接接入到 Pulsar broker 里,也就是目前 KoP 的成型。

那么关于第一种 proxy 做法,如果实现起来大概是什么样呢?OVHcloud 就有过一次尝试。

之前 OVHcloud 一直采用 Apache Kafka。尽管他们有在 Kafka 上运行多个集群且每秒处理数百万条消息的经验,但仍面临艰巨的运营挑战。所以,OVHcloud 放弃 Kafka,决定将其服务的产品转移到 Pulsar,并在 Pulsar 上构建其产品。

但是为了照顾到依旧使用 Kafka 系统的用户,所以他们想在 Pulsar 里添加一个 proxy 去支持 Kafka 协议。他们最初的做法就是将 Kafka 协议的一帧转换成 Pulsar 协议。

Kafka-on-Pulsar 的开发历程是怎样的

Proxy 收到来自 Kafka 客户端的任何一帧,通过自由状态机将其转换为 Pulsar 相应的接口。

这个状态机一种是用于接收 Kafka 请求,第二种是用于处理 Pulsar response。然后在其中间再添加一个状态机进行同步。

Kafka-on-Pulsar 的开发历程是怎样的

因为在 TCP 层进行这些操作,所以它的表现还是不错的。借由 Rust 的特性,整体运行流畅。但是这个情况下,代码仍需要一行行去写,同时 Kafka 协议里有一些是没有办法通过 proxy 方式实现。比如:group coordinator 和 offsets management。

还有一个比较关键的点是,因为用 Rust 去构写,所以比较难开源。即便是开源出来也很难作为一个组件去插入到 Pulsar 系统中。

刚好去年 StreamNative 的一条推特引起了 OVHcloud 的注意。这是 StreamNative 第一次举行线下 Pulsar meetup 时翟佳老师分享的 KoP demo。

Kafka-on-Pulsar 的开发历程是怎样的

经过几次双方经验互谈交流后,双方合力推出了更完善的「KoP」。利用 Pulsar 和 BookKeeper 的事件流存储架构和 Pulsar 的可插拔协议处理插件框架来提供一种精简而全面的解决方案。

KoP 组件与 Broker 协作

所以当我们倒回去重看 Pulsar 架构,下方模块图中最核心的:Broker、BookKeeper、ZooKeeper。Pulsar 就是基于 Managed ledger 实现的一套分布式流式存储,包括如何存数据、如何防止数据丢失、流如何从本地机房复制到另一机房等。

Kafka-on-Pulsar 的开发历程是怎样的

Pulsar 协议本身是一个很轻量级的东西,即上图中的 Pulsar protocol handler。它主要是处理 TCP 过来的请求格式,然后将请求转化和读取的操作。所以 Pulsar 协议最核心部分在存储层面、分布式均衡层面等。

将 Pulsar protocol handler 抽象出来,变成一个框架/接口。利用这个框架,可以直接访问 Pulsar 已经构建好的存储系统,剩下要做的只是协议的解析和转换。

所以依据这个构想,将 Kafka 协议带入去实践。在  Pulsar 2.5 版本时新加了一个「Pluggable protocol handler」的概念(PIP-41),将接口单独抽离了出来。

Pulsar protocol handler 的使用是类似 Pulsar function/connector,只需将其插入到 Pulsar broker 中,就可以让 Pulsar 具有读取和解析其他协议的能力。这个机制只需要调整两个配置:

Kafka-on-Pulsar 的开发历程是怎样的

配置完成后,重启集群即可支持「其他类型协议」的处理能力。当然这个特性只在 Pulsar 2.5 版本后才支持,所以如需尝试,可以先将 Pulsar 系统升级到 2.5 版本。

Kafka-on-Pulsar 的开发历程是怎样的

所以在此机制下过程就会变得更加明了简单。只需在 Pulsar 里实现 Kafka protocol handler 即可,剩下的上图实线绿色部分是 Kafka 原生客户端。只需将数据接入到 Pulsar 集群,就可以处理 Kafka 请求。

为什么选取 Kafka 作为实践对象?

应为 Pulsar 和 Kafka 在一些层面有很多相似之处。比如日志层,Pulsar 和 Kafka 都采用非常相似的数据模型,用于发布/订阅消息和事件流,Pulsar 和 Kafka 都采用分布式日志。

通过对比 Pulsar 和 Kafka,我们发现这两种系统有很多相似之处。这两种系统都包括以下操作:

  • Topic 查找
    所有客户端都连接到任一 broker 以查找 Topic 的元数据(即 owner broker)。获取元数据之后,客户端与 owner broker 建立持久的 TCP 连接。
  • 发布
    客户端与 Topic 区的 owner broker 进行对话,以将消息追加到分布式日志中。
  • 消费
    客户端与 Topic 分区的 owner broker 进行对话,以便从分布式日志中读取消息。
  • 偏移量
    为发布给 Topic 分区的消息分配偏移量。在 Pulsar 中,偏移量被称为 MessageId。consumer 可以使用偏移量来查找日志中的给定位置,以便读取消息。
  • 消费状态
    这两个系统都维护订阅中的 consumer( Kafka 称之为消费组)的消费状态。Kafka 将消费状态存储在 `__offsets` Topic,而 Pulsar 将消费状态存储在 `cursors`。


实现方式

1. Topic

Kafka 将所有 Topic 存储在扁平的命名空间。但是,Pulsar 将 Topic 存储在层次化、多租户的命名空间。我们在 broker 配置中添加了 `kafkaNamespace` 配置,这样管理员就可以将 Kafka Topic 映射到 Pulsar Topic。

为了方便 Kafka 用户使用 Apache Pulsar 的多租户特性,当 Kafka 用户使用 SASL 验证机制来验证 Kafka 客户端的时候,可以指定一个 Pulsar 租户和命名空间作为其 SASL 用户名。

2. 消息 ID 和偏移量

Kafka 为每条被成功发布到 Topic 分区的消息都指定了一个偏移量。Pulsar 为每条消息指定了一个 `MessageID`。消息 ID 由 `ledger-id`、 `entry-id` 和 `batch-index` 组成。我们在 Pulsar-Kafka wrapper 中使用相同的方法将 Pulsar 的消息 ID 转换为偏移量,反之亦然。

3. 消息

Kafka 和 Pulsar 的消息都包含键、值、时间戳和 header(在 Pulsar 中被称作 ‘properties’)。我们自动在 Kafka 消息和 Pulsar 消息之间转换这些字段。

4. Topic 查找

我们为 Kafka 和 Pulsar 的请求处理插件提供相同的 Topic 查找方法。请求处理插件发现 Topic,查找所请求的 Topic 分区的全部所有权,然后将包含所有权信息的 Kafka `TopicMetadata` 返回给 Kafka 客户端。

5. 发布消息

当收到 Kafka 客户端发布的消息后,Kafka 请求处理插件逐一将多个字段(例如键、值、时间戳和 headers)进行映射,从而将 Kafka 消息转换为 Pulsar 消息。

同时,Kafka 请求处理插件利用 ManagedLedger append API 将这些已转化的 Pulsar 消息存储在 BookKeeper。Kafka 请求处理插件将 Kafka 消息转换为 Pulsar 消息后,现有的 Pulsar 应用程序就可以接收 Kafka 客户端发布的消息。

6. 消费消息

当收到 Kafka 客户端的 consumer 请求时,Kafka 请求处理插件打开一个非持久 cursor,然后从请求的偏移量开始读取 entries。

Kafka 请求处理插件将 Pulsar 消息转换回 Kafka 消息后,现有的 Kafka 应用程序就可以接收 Pulsar 客户端发布的消息。

7. Group coordinator & 偏移量管理

最大的挑战是实现 group coordinator 和偏移量管理。Pulsar 不支持集中的 group coordinator,无法为消费组里的 consumer 分配分区,也无法管理每个消费组的偏移量。

Pulsar broker 基于分区来管理分区分配,而分区的 owner broker 通过将确认信息存储在 cursors 来管理偏移量。

我们很难让 Pulsar 模型与 Kafka 模型保持一致。因此,为了完全兼容 Kafka 客户端,我们将 coordinator group 的更改和偏移量存储在 Pulsar 名为    `public/kafka/__offsets`   系统 Topic 中,从而实现 Kafka coordinator group。

这样,我们能够在 Pulsar 和 Kafka 之间建立桥梁,并允许用户使用现有的 Pulsar 工具和策略来管理订阅并监控 Kafka consumer。我们在已实现的 coordinator group 中添加一个后台线程,定期将偏移量更新从系统 Topic 同步到 Pulsar cursor。

因此,实际上 Kafka 消费组被认为是 Pulsar 订阅。所有现有的 Pulsar 工具也可以用于管理 Kafka 消费组。

KoP 生产化

如果将 KoP 应用到实际场景中,就需要考虑以下多个方面:

  • 多租户
  • 安全性
  • 跨机房复制
  • 分层存储
  • Schema
  • 与已有的数据环境(如 Flink、Spark、Presto)集成

Q & A

1. Pulsar 有多种扩展,这些扩展有统一的管理方式吗?

目前在做一个项目:Pulsar Registry,类似于 DocHub。也可以看作一个应用商店,会集中一些组件/插件合集,可以期待一下。

2. Kafka 0.11 以下的版本是否能平滑升级到高版本?如果消息格式变了,是不是没法平滑升级?

不能,0.10/0.11 版本以上才可以平滑升级。

KoP 最终的目的,是方便用户将 Kafka 上已有的应用迁移到 Pulsar 上,同时通过 KoP 的方式让用户可以更方便地构建产品。未来 KoP 也会加大对 schema 和 Kafka 版本的支持与多兼容性。

上述就是小编为大家分享的Kafka-on-Pulsar 的开发历程是怎样的了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注亿速云行业资讯频道。

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI