温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

如何使用 Apache查询Pulsar流

发布时间:2021-11-02 18:06:24 来源:亿速云 阅读:146 作者:柒染 栏目:大数据

本篇文章给大家分享的是有关如何使用 Apache查询Pulsar流,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。


这里将介绍 Apache Pulsar 和 Apache Flink 的集成和最新研发进展,并详细说明如何利用 Pulsar 内置 schema,使用 Apache Flink 实时查询 Pulsar 流。


Apache Pulsar 简介

Apache Pulsar 是一个灵活的发布/订阅消息系统,支持持久日志存储。  Pulsar 的架构优势包括多租户、统一消息模型、结构化事件流、云原生架构等,这些优势让 Pulsar 能够完美适用于多种用户场景,从计费、支付、交易服务到融合组织中不同的消息架构。  

现有 Pulsar & Flink 集成

(Apache Flink 1.6+)

在现有的 Pulsar 和 Flink 集成中,Pulsar 作为 Flink 应用程序中的消息队列来使用。Flink 开发人员可以选择特定 Pulsar source,并连接到所需的 Puslar 集群和 topic,将 Pulsar 用作 Flink 的流 source 和流 sink:

// create and configure Pulsar consumerPulsarSourceBuilder<String>builder = PulsarSourceBuilder    .builder(new SimpleStringSchema())   .serviceUrl(serviceUrl)  .topic(inputTopic)  .subsciptionName(subscription);SourceFunction<String> src = builder.build();// ingest DataStream with Pulsar consumerDataStream<String> words = env.addSource(src);

然后,Pulsar 流可以连接到 Flink 的处理逻辑。

// perform computation on DataStream (here a simple WordCount)DataStream<WordWithCount> wc = words  .flatmap((FlatMapFunction<String, WordWithCount>) (word, collector) -> {    collector.collect(new WordWithCount(word, 1));  })
 .returns(WordWithCount.class)  .keyBy("word")  .timeWindow(Time.seconds(5))  .reduce((ReduceFunction<WordWithCount>) (c1, c2) ->    new WordWithCount(c1.word, c1.count + c2.count));

然后通过 sink 将数据写出到 Pulsar。

// emit result via Pulsar producer wc.addSink(new FlinkPulsarProducer<>(  serviceUrl,  outputTopic,  new AuthentificationDisabled(),  wordWithCount -> wordWithCount.toString().getBytes(UTF_8),  wordWithCount -> wordWithCount.word));

对于集成而言,这是重要的第一步,但现有设计还不足以充分利用 Pulsar 的全部功能。

Pulsar 与 Flink 1.6.0 的集成中有一些不足,包括:既没有作为持久存储来使用,也没有与 Flink 进行 schema 集成,导致在为应用程序 schema 注册添加描述时,需要手动输入。


Pulsar 与 Flink 1.9 的集成

将 Pulsar 用作 Flink catalog

Flink 1.9.0 与 Pulsar 的最新集成解决了前面提到的问题。阿里巴巴 Blink 对 Flink 仓库的贡献不仅强化了处理架构,还增加了新功能,使得 Flink 与 Pulsar 的集成更强大有效。

Flink 1.9.0:
https://flink.apache.org/downloads.html#apache-flink-191


在新 connector 的实现中引入了 Pulsar schema 集成,增加了对 Table API 的支持,同时提供了 exactly-once 语义的 Pulsar 读与 at-least-once 语义的 Pulsar 写。

并且,通过 schema 集成,Pulsar 可以注册为 Flink catalog,只需几个命令就可以在 Pulsar 流上运行 Flink 查询。下面我们将详细介绍新的集成,并举例说明如何使用 Flink SQL 查询 Pulsar 流。

利用 Flink <> Pulsar Schema 集成

在展开集成细节与具体的使用方法之前,我们先来看一下 Pulsar schema 是怎么工作的。

Apache Pulsar 内置对 Schema 的支持,无须额外管理 schema。Pulsar 的数据 schema 与每个 topic 相关联,因此,producer 和 consumer 都可以使用预定义 schema 信息发送数据,而 broker 可以验证 schema ,并在兼容性检查中管理 schema 多版本化和 schema 演化。

下面分别是 Pulsar schema 用于 producer 和 consumer 的示例。在 producer 端,可以指定使用 schema,并且 Pulsar 无需执行序列化/反序列化,就可以发送一个 POJO 类。

类似地,在 consumer 端,也可以指定数据 schema,并且在接收到数据后,Pulsar 会立即自动验证 schema 信息,获取给定版本的 schema,然后将数据反序列化到 POJO 结构。Pulsar 在 topic 的元数据中存储 schema 信息。

// Create producer with Struct schema and send messagesProducer<User> producer = client.newProducer(Schema.AVRO(User.class)).create();producer.newMessage()  .value(User.builder()    .userName(“pulsar-user”)    .userId(1L)    .build())  .send();// Create consumer with Struct schema and receive messagesConsumer<User> consumer = client.newCOnsumer(Schema.AVRO(User.class)).create();consumer.receive();

假设一个应用程序对 producer 和/或 consumer 指定 schema。在接收到 schema 信息时,连接到 broker 的 producer(或 consumer)传输此类信息,以便 broker 在返回或拒绝该 schema 前注册 schema、验证 schema,并检查 schema 兼容性,如下图所示:

如何使用 Apache查询Pulsar流

Pulsar 不仅可以处理并存储 schema 信息,还可以在必要时处理 schema 演化(schema evolution)。Pulsar 能够有效管理 broker 中的 schema 演化,在必要的兼容性检查中,追踪 schema 的所有版本。

另外,当消息发布在 producer 端时,Pulsar 会在消息的元数据中标记 schema 版本;当 consumer 接收到消息,并完成反序列化元数据时,Pulsar 将会检查与此消息相关联的 schema 版本,并从 broker 中获取 schema 信息。

因此,当 Pulsar 与 Flink 应用集成时,Pulsar 使用预先存在的 schema 信息,并将带有 schema 信息的单个消息映射到 Flink 类型系统的不同行中。

当 Flink 用户不直接与 schema 交互或不使用原始 schema(primitive schema)时(例如,用 topic 来存储字符串或长数值),Pulsar 会转换消息到 Flink 行,即“值”;或者在结构化的 schema 类型(例如,JSON 和 AVRO)中,Pulsar 从 schema 信息中提取单个字段信息,并将字段映射到 Flink 的类型系统。

最后,所有与消息相关的元数据信息(例如,消息密钥、topic、发布时间、事件时间等)都会转换到 Flink 行中的元数据字段。以下是使用原始 schema 和结构化 schema 的两个示例,解释了如何将数据从 Pulsar topic 转换到 Flink 类型系统。

原始 schema(Primitive Schema):

root|-- value: DOUBLE|-- __key: BYTES|-- __topic: STRING|-- __messageId: BYTES|-- __publishTime: TIMESTAMP(3)|-- __eventTime: TIMESTAMP(3)

结构化 schema(Avor Schema):

@Data@AllArgsConstructor@NoArgsConstructorpublic static class Foo {    public int i;    public float f;    public Bar bar;}@Data@AllArgsConstructor@NoArgsConstructorpublic static class Bar {    public boolean b;    public String s;}Schema s = Schema.AVRO(Foo.getClass());
root |-- i: INT |-- f: FLOAT |-- bar: ROW<`b` BOOLEAN, `s` STRING> |-- __key: BYTES |-- __topic: STRING |-- __messageId: BYTES |-- __publishTime: TIMESTAMP(3) |-- __eventTime: TIMESTAMP(3)

当所有 schema 信息都映射到 Flink 类型系统时,就可以在 Flink 中根据指定 schema 信息构建 Pulsar source、sink 或 catalog,如下所示:

Flink & Pulsar: 从 Pulsar 读取数据

1. 创建用于流查询的 Pulsar source

val env = StreamExecutionEnvironment.getExecutionEnvironmentval props = new Properties()props.setProperty("service.url", "pulsar://...")props.setProperty("admin.url", "http://...")props.setProperty("partitionDiscoveryIntervalMillis", "5000")props.setProperty("startingOffsets", "earliest")props.setProperty("topic", "test-source-topic")val source = new FlinkPulsarSource(props)// you don't need to provide a type information to addSource since FlinkPulsarSource is ResultTypeQueryableval dataStream = env.addSource(source)(null)
// chain operations on dataStream of Row and sink the output// end method chaining
env.execute()

2. 将 Pusar 中的 topic 注册为 streaming tables

val env = StreamExecutionEnvironment.getExecutionEnvironmentval tEnv = StreamTableEnvironment.create(env)
val prop = new Properties()prop.setProperty("service.url", serviceUrl)prop.setProperty("admin.url", adminUrl)prop.setProperty("flushOnCheckpoint", "true")prop.setProperty("failOnWrite", "true")props.setProperty("topic", "test-sink-topic")
tEnv  .connect(new Pulsar().properties(props))  .inAppendMode()  .registerTableSource("sink-table")
val sql = "INSERT INTO sink-table ....."tEnv.sqlUpdate(sql)env.execute()

Flink & Pulsar:向 Pulsar 写入数据

1. 创建用于流查询的 Pulsar sink

val env = StreamExecutionEnvironment.getExecutionEnvironmentval stream = .....
val prop = new Properties()prop.setProperty("service.url", serviceUrl)prop.setProperty("admin.url", adminUrl)prop.setProperty("flushOnCheckpoint", "true")prop.setProperty("failOnWrite", "true")props.setProperty("topic", "test-sink-topic")
stream.addSink(new FlinkPulsarSink(prop, DummyTopicKeyExtractor))env.execute()

2. 向 Pulsar 写入 streaming table

val env = StreamExecutionEnvironment.getExecutionEnvironmentval tEnv = StreamTableEnvironment.create(env)
val prop = new Properties()prop.setProperty("service.url", serviceUrl)prop.setProperty("admin.url", adminUrl)prop.setProperty("flushOnCheckpoint", "true")prop.setProperty("failOnWrite", "true")props.setProperty("topic", "test-sink-topic")
tEnv  .connect(new Pulsar().properties(props))  .inAppendMode()  .registerTableSource("sink-table")
val sql = "INSERT INTO sink-table ....."tEnv.sqlUpdate(sql)env.execute()

在以上示例中,Flink 开发人员都无需担心 schema 注册、序列化/反序列化,并将 Pulsar 集群注册为 Flink 中的 source、sink 或 streaming table。

当这三个要素同时存在时,Pulsar 会被注册为 Flink 中的 catalog,这可以极大简化数据处理与查询,例如,编写程序从 Pulsar 查询数据,使用 Table API 和 SQL 查询 Pulsar 数据流等。

以上就是如何使用 Apache查询Pulsar流,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注亿速云行业资讯频道。

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI