这篇文章将为大家详细讲解有关Flink中ProcessFunction类如何使用,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。
深入了解ProcessFunction的状态操作(Flink-1.10);
ProcessFunction;
KeyedProcessFunction类;
ProcessAllWindowFunction(窗口处理);
CoProcessFunction(双流处理);
如下图,在常规的业务开发中,SQL、Table API、DataStream API比较常用,处于Low-level的Porcession相对用得较少,从本章开始,我们一起通过实战来熟悉处理函数(Process Function),看看这一系列的低级算子可以带给我们哪些能力?
处理函数有很多种,最基础的应该ProcessFunction类,来看看它的类图,可见有RichFunction的特性open、close,然后自己有两个重要的方法processElement和onTimer: 常用特性如下所示:
处理单个元素;
访问时间戳;
旁路输出;
接下来写两个应用体验上述功能;
开发环境操作系统:MacBook Pro 13寸, macOS Catalina 10.15.3
开发工具:IDEA ULTIMATE 2018.3
JDK:1.8.0_211
Maven:3.6.0
Flink:1.9.2
如果您不想写代码,整个系列的源码可在GitHub下载到,地址和链接信息如下表所示(https://github.com/zq2599/blog_demos):
名称 | 链接 | 备注 |
---|---|---|
项目主页 | https://github.com/zq2599/blog_demos | 该项目在GitHub上的主页 |
git仓库地址(https) | https://github.com/zq2599/blog_demos.git | 该项目源码的仓库地址,https协议 |
git仓库地址(ssh) | git@github.com:zq2599/blog_demos.git | 该项目源码的仓库地址,ssh协议 |
这个git项目中有多个文件夹,本章的应用在<font color="blue">flinkstudy</font>文件夹下,如下图红框所示:
执行以下命令创建一个flink-1.9.2的应用工程:
mvn \ archetype:generate \ -DarchetypeGroupId=org.apache.flink \ -DarchetypeArtifactId=flink-quickstart-java \ -DarchetypeVersion=1.9.2
按提示输入groupId:com.bolingcavalry,architectid:flinkdemo
第一个demo用来体验以下两个特性:
处理单个元素;
访问时间戳;
创建Simple.java,内容如下:
package com.bolingcavalry.processfunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.util.Collector; public class Simple { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // 并行度为1 env.setParallelism(1); // 设置数据源,一共三个元素 DataStream<Tuple2<String,Integer>> dataStream = env.addSource(new SourceFunction<Tuple2<String, Integer>>() { @Override public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception { for(int i=1; i<4; i++) { String name = "name" + i; Integer value = i; long timeStamp = System.currentTimeMillis(); // 将将数据和时间戳打印出来,用来验证数据 System.out.println(String.format("source,%s, %d, %d\n", name, value, timeStamp)); // 发射一个元素,并且戴上了时间戳 ctx.collectWithTimestamp(new Tuple2<String, Integer>(name, value), timeStamp); // 为了让每个元素的时间戳不一样,每发射一次就延时10毫秒 Thread.sleep(10); } } @Override public void cancel() { } }); // 过滤值为奇数的元素 SingleOutputStreamOperator<String> mainDataStream = dataStream .process(new ProcessFunction<Tuple2<String, Integer>, String>() { @Override public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception { // f1字段为奇数的元素不会进入下一个算子 if(0 == value.f1 % 2) { out.collect(String.format("processElement,%s, %d, %d\n", value.f0, value.f1, ctx.timestamp())); } } }); // 打印结果,证明每个元素的timestamp确实可以在ProcessFunction中取得 mainDataStream.print(); env.execute("processfunction demo : simple"); } }
这里对上述代码做个介绍:
创建一个数据源,每个10毫秒发出一个元素,一共三个,类型是Tuple2,f0是个字符串,f1是整形,每个元素都带时间戳;
数据源发出元素时,提前把元素的f0、f1、时间戳打印出来,和后面的数据核对是否一致;
在后面的处理中,创建了ProcessFunction的匿名子类,里面可以处理上游发来的每个元素,并且还能取得每个元素的时间戳(这个能力很重要),然后将f1字段为奇数的元素过滤掉;
最后将ProcessFunction处理过的数据打印出来,验证处理结果是否符合预期;
直接执行Simple类,结果如下,可见过滤和提取时间戳都成功了:
第二个demo是实现旁路输出(Side Outputs),对于一个DataStream来说,可以通过旁路输出将数据输出到其他算子中去,而不影响原有的算子的处理,下面来演示旁路输出:
创建SideOutput类:
package com.bolingcavalry.processfunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag; import java.util.ArrayList; import java.util.List; public class SideOutput { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 并行度为1 env.setParallelism(1); // 定义OutputTag final OutputTag<String> outputTag = new OutputTag<String>("side-output"){}; // 创建一个List,里面有两个Tuple2元素 List<Tuple2<String, Integer>> list = new ArrayList<>(); list.add(new Tuple2("aaa", 1)); list.add(new Tuple2("bbb", 2)); list.add(new Tuple2("ccc", 3)); //通过List创建DataStream DataStream<Tuple2<String, Integer>> fromCollectionDataStream = env.fromCollection(list); //所有元素都进入mainDataStream,f1字段为奇数的元素进入SideOutput SingleOutputStreamOperator<String> mainDataStream = fromCollectionDataStream .process(new ProcessFunction<Tuple2<String, Integer>, String>() { @Override public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception { //进入主流程的下一个算子 out.collect("main, name : " + value.f0 + ", value : " + value.f1); //f1字段为奇数的元素进入SideOutput if(1 == value.f1 % 2) { ctx.output(outputTag, "side, name : " + value.f0 + ", value : " + value.f1); } } }); // 禁止chanin,这样可以在页面上看清楚原始的DAG mainDataStream.disableChaining(); // 取得旁路数据 DataStream<String> sideDataStream = mainDataStream.getSideOutput(outputTag); mainDataStream.print(); sideDataStream.print(); env.execute("processfunction demo : sideoutput"); } }
这里对上述代码做个介绍:
数据源是个集合,类型是Tuple2,f0字段是字符串,f1字段是整形;
ProcessFunction的匿名子类中,将每个元素的f0和f1拼接成字符串,发给主流程算子,再将f1字段为奇数的元素发到旁路输出;
数据源发出元素时,提前把元素的f0、f1、时间戳打印出来,和后面的数据核对是否一致;
将主流程和旁路输出的元素都打印出来,验证处理结果是否符合预期;
执行SideOutput看结果,如下图,main前缀的都是主流程算子,一共三条记录,side前缀的是旁路输出,只有f1字段为奇数的两条记录,符合预期: 上面的操作都是在IDEA上执行的,还可以将flink单独部署,再将上述工程构建成jar,提交到flink的jobmanager,可见DAG如下:
至此,处理函数中最简单的ProcessFunction类的学习
关于Flink中ProcessFunction类如何使用就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。