这篇文章的内容主要围绕怎么使用Apache Pulsar Functions进行简单事件处理进行讲述,文章内容清晰易懂,条理清晰,非常适合新手学习,值得大家去阅读。感兴趣的朋友可以跟随小编一起阅读吧。希望大家通过这篇文章有所收获!
事件驱动架构(Event-driven Architecture,EDA)的关键特征是事件的核心重要性。在 EDA 中,事件 consumer 遵循基于事件编程(EBP)的编程样式对事件到达作出反应。与面向批处理或过程式编程不同,在 EBP 中,软件系统响应于接收一个或多个事件通知并执行处理,且完全通过事件以异步方式与其他软件组件通信。
尽管所有基于事件的应用程序不尽相同,但通常都遵循下图中的结构,事件 producer 将事件引入中间的事件处理框架,该框架负责持久化事件并将事件交付给事件 consumer。
图 1 基于事件的架构
中间的事件处理框架除了负责事件路由外,还托管事件处理器组件。事件处理器获取事件,还可以转发或发布新事件,因此在某种意义上它们既是事件 consumer 也是事件 producer。但是我们不会将这些事件处理器称为事件 producer 或事件 consumer,因为我们希望将它们与事件处理框架之外的实体区分开来。
在 EDA 中,事件处理器通常可分为以下几类:
简单事件处理器:事件到达立即触发事件处理器中的操作。一般来说,如果这些处理器是无状态的,则仅根据当前事件的内容执行所有逻辑;如果是有状态的,则可以跨调用保留消息,以便执行稍微复杂一点的逻辑。
复杂事件处理器:此类事件处理器处理一系列事件,并执行更为复杂的模式分析,以识别有意义的模式或关系,如检测事件相关性、因果关系或定时。典型用例一般用于电子商务、欺诈检测、网络安全、金融交易和其他需要立即响应的环境中。
基于事件的应用程度通常由许多按特定顺序或流排列的事件处理器组成。我们将事件 producer、事件处理器、事件 consumer 的集合称为事件处理网络。事件处理网络用于解决一个或多个特定的业务问题。
图 2 事件处理网络
如图 2 所示,外部事件 producer 和事件 consumer 处于边缘,中间是多个事件处理器。图 2 展示了事件处理器之间的事件流,这些箭头也称为隐式通道,用于将事件直接从一个事件处理器推到另一个事件处理器。当使用 Apache Pulsar 实现时,topic 就是这些隐式通道。
图 2 中还展示了另一种事件处理器之间通信的方式:共享状态管理。事件处理器一般需要保留多个事件之间的计算状态,因此事件处理架构需要提供一种机制以持久化状态信息,并允许事件处理器直接访问。共享状态提供了另一种在事件处理器之间共享信息的机制,并支持有状态事件处理,我们将在下一部分详述相关内容。
多个基于事件的应用程序可以与单个事件类型相关联。上图展示了基于事件的应用程序(蓝色)到另一个应用程序的过程。在将第一个应用程序发送到事件 consumer 2 前,同时将其输出到事件 consumer 1 和另一个事件处理器,以进行进一步处理。
将基于事件的应用程序链接在一起的原因有很多,如在某一场景中,需要监视物联网传感器读取模式或异常,同时也希望将这些事件长期存储(存储平台如 HDFS 或 Amazon S3),以便用于训练数据模型。
一级事件处理器序列首先进行事件的 ETL-类型处理,即将事件转换为可消费的格式。这些记录将被发送到事件 consumer 1,在本例中即 HDFS。同时,我们还希望将清理后的事件转发到实现异常检测工作流的二级事件处理器序列。我们将在下一部分讨论如何使用 Apache Pulsar Functions 作为框架,此框架采用简单编程逻辑 functions 来实现基于事件的处理。
Apache Pulsar Functions 提供了一个易于使用的框架,开发者可以使用 Functions 创建或部署处理逻辑,这些处理逻辑由 Apache Pulsar 执行。你可以用 Java 或 Python 编写简单或复杂的 function,并将这些 function 部署到 Pulsar 集群中,而无需运行单独的流处理引擎。Pulsar Functions 是轻量级计算框架,具有以下特点:
在消息发送至指定 input topic 时执行。
将用户自定义的处理逻辑应用于每条消息。
将计算结果发布到一个或多个 topic。
图 3 Pulsar Functions 编程模型
可以使用 Java 和 Python 编写 Pulsar Functions,编写方式有两种:
使用原生语言接口 ,不需要 Pulsar 特定的库或特殊依赖。例如,要在 Java 中实现一个 Pulsar Function,只需要编写一个实现 java.util.Function 接口的类,如下所示:
import java.util.Function;public class EchoFunction implements Function<String, String> { public String apply(String input) { // Logic Here }}
使用 Pulsar Functions SDK ,利用特定的 Pulsar 库,这些库提供原生接口中无法提供的一系列功能,如 org.apache.pulsar.functions.api.Context 对象提供的状态管理功能。
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
public interface Function<I, O> {
O process(I input, Context context) throws Exception;
}
原生语言方法提供了一种清晰的、无 API 的 Pulsar Functions 编写方法,非常适合无状态事件处理器的开发。但是,这种方法不支持访问先前的状态信息。
编译并测试 Pulsar Functions 后,需要将 Pulsar Functions 部署到 Pulsar 集群。Pulsar Functions 旨在支持多种部署场景。目前,运行 Pulsar Functions 的方式有两种:
本地运行模式:在此模式下运行时,Pulsar Function 将在执行命令的机器上运行(如笔记本电脑、边缘节点等)。下面是一个本地运行命令的例子:
$ bin/pulsar-admin functions localrun \ —py myfunc.py \ —className myfunc.SomeFunction —inputs input-topic-1 —outputs output-topic-1
集群模式: 在集群模式下运行时,Pulsar Function 代码将被上传到 Pulsar 集群中的 broker 中,并与 broker 一起运行,而不是在本地环境中运行。可以使用如下所示的命令在集群模式中创建 function,该命令在 Pulsar broker 节点上执行。
$ bin/pulsar-admin functions create \ —jar target/my-functions.jar \ —className org.example.functions.MyFunction \ —inputs input-topic-1 \ —outputs output-topic-1 \ —parallelism 4 \ —cpu 2 \ —ram 8589934592 \ —disk 10737418240
上面的命令将启用 4 个 org.example.functions.MyFunction 实例,每个实例有 2 个 CPU 内核、8 GB RAM、10 GB 磁盘空间。(注意,需以字节为单位设置 RAM 和磁盘,且在 Docker 环境中必须设置 CPU 和磁盘。)
还有一种方法可以在创建 Pulsar Function 时提供用户配置属性,此方法在需要复用 function 时非常有用。我们通过为 userConfig 属性指定一个 JSON 字符串,在下面的命令中传入一组键-值对。在运行时,可以通过使用 Pulsar Functions SDK 的 Pulsar Functions Context 对象访问传入的值,我们将在下一部分详述相关内容。
$ bin/pulsar-admin functions create \ —jar target/my-functions.jar \ —className org.example.functions.MyFunction \ —inputs input-topic-1 \ —outputs output-topic-1 \ —parallelism 4 \ —cpu 2 \ —ram 8589934592 \ —disk 10737418240 \ —userConfig ‘{“key-1”: “value-1”, “key-2”, “value-2”}’
Java 和 Python SDK 中定义的 Context 对象为 function 提供了各种各样的信息和功能,包括保留可用于提供有状态事件处理的中间结果的能力。以下示例是 Context 对象中所包含的信息:
Pulsar Function 的名称和 ID
每条消息的消息 ID。自动为每条 Pulsar 消息分配一个 ID。
发送消息的 topic 的名称
与 function 相关联的所有 input topic、output topic 的名称
SerDe 的类名称
与 function 相关联的租户和命名空间
运行 function 的 Pulsar Functions 实例的 ID
Function 的版本
Function 使用的 logger 对象,可用于创建 function 日志消息
访问通过 CLI 提供的任意用户配置值
记录 metric 的接口
接下来,我们将介绍一些利用了 Context 对象特性的使用模式。
运行或更新使用 SDK 创建的 Pulsar Functions 时,可以使用 -userConfig flag 通过命令行传入任意键/值。键/值必须指定为 JSON。以下示例创建 function 并传入用户键/值。
$ bin/pulsar-admin functions create \ —name word-filter \ —userConfig ‘{“filter”, “$.Sensors{?(@.Type==‘Temp’)]”}’ \ # Other function configs
这个特性允许我们编写可以多次使用的通用 function,但是配置略有不同。例如,假设你想编写一个基于 JSON 路径表达式过滤 JSON 事件的 function。当事件到达时,将其内容与配置的表达式进行比较,并过滤掉不匹配的 entry。
显然该 function 的行为完全依赖于它所过滤的 JSON 路径表达式。为了可以多次使用 function,我们使用 Pulsar SDK,直到部署 function 后再指定此路径表达式。
如上例所示,要使用的 JSON 路径过滤器的值在编译时未知,需使用 getUserConfigValueOrDefault 方法从 Context 中获取。
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import com.jayway.jsonpath.JsonPath;
public JsonPathFilterFunction implements Function<String, String> {
String process(String input, Context context) throws Exception {
// Get the filter from the context
String filter = context.getUserConfigValueOrDefault(“filter”, “$”)
.toString();
Object filtered = JsonPath.read(input, filter);
Return filtered.toString();
}
}
有状态事件处理器使用先前事件的内存生成输出。存储状态的能力是处理多个事件的关键构件。在 Apache Pulsar Function 框架中,状态信息存储在基于 Apache BookKeeper 的专用键-值存储中。Pulsar SDK 通过 Context 对象访问状态信息。
图 4 Apache Pulsar 状态管理
我们来举例解释一下状态 agent。假设有一个应用程序,用于从物联网传感器获取温度读取事件,我们想知道传感器的平均温度,则可以使用事件处理 agent 通过以下 function 持续更新温度平均值:
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
public AvgTempFunction implements Function<Float, Float> {
Float process(Float currentTemp, Context context) {
// Increment and get counter
context.incrCounter(“num-measurements”);
Integer n = context.getCounter(“num-measurements”);
// Calculate new average based on old average and count
Float old_average = context.getState(“avg-temp”);
Float new_average = (old_average * (n-1) + currentTemp) / n;
context.putState(“avg-temp”, new_average);
return new_average;
}
}
最佳实践 3:Void Funtions
Pulsar Functions 可以将结果发布到一个或多个 output topic,但可以不发布结果。也可以使用 function 仅生成日志,并将结果写入外部数据库,或仅用于监视流中的异常。以下示例中的 function 只会将接收到的事件存入日志:
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.slf4j.Logger;
public LogFunction implements Function<String, Void> {
Void process(String input, Context context) throws Exception {
Logger LOG = context.getLogger();
LOG.info("Received {}”, input);
return null;
}
}
在使用输出类型为 Void 的 Java function 时,function 必须始终返回 null。在不想生成输出事件时,输出类型没有 Void 的 function 可以返回 null,例如,当你在使用过滤器,但不希望某一事件被处理时。
最佳实践 4:处理来自多个 input topic 的事件
如图 3 所示,Pulsar Functions 可以消费多个 topic 中的事件,下面我们来看一下如何编写一个这样的 function: import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
public MultiTopicFunction implements Function<String, String> {
String process(String input, Context context) throws Exception {
String sourceTopic = context.getSourceTopic();
if (sourceTopic.equals(“TopicA”) {
// parse as TopicA Object
} else if (sourceTopic.equals(“TopicB”) {
// parse as Topic B Object
} else if (sourceTopic.equals(“TopicC”) {
// parse as Topic C Object
}
….
}
}
从代码中可以看出,我们首先要从 Context 对象获取 input topic 的名称,然后根据 input topic 的名称相应地解析/处理事件。
最佳实践 5:Metric 收集
Apache Pulsar SDK 提供了 metric 收集机制,可用于记录所选择的任何用户定义的 metric。在下面的示例中,我们使用单独的 metric 来跟踪调用该 function 的总次数,使用另一个 metric 来跟踪使用无效输入调用该 function 的次数。更多关于读取和使用 metric 的说明,请参阅监控指南。
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
public MetricFunction implements Function<Integer, Void> {
Void process(String input, Context context) throws Exception {
context.recordMetric(“invocation count”, 1);
if (input < 0) {
context.recordMetric(“Invalid data”, 1);
}
return null;
}
}
感谢你的阅读,相信你对“怎么使用Apache Pulsar Functions进行简单事件处理”这一问题有一定的了解,快去动手实践吧,如果想了解更多相关知识点,可以关注亿速云网站!小编会继续为大家带来更好的文章!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。