这篇文章主要讲解了“flink中的聚合算子是什么”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“flink中的聚合算子是什么”吧!
flink中的一个接口org.apache.flink.api.common.functions.AggregateFunction,这个类可以接在window流之后,做窗口内的统计计算。
注意:除了这个接口AggregateFunction,flink中还有一个抽象类AggregateFunction:org.apache.flink.table.functions.AggregateFunction,大家不要把这个弄混淆了,接口AggregateFunction我们可以理解为flink中的一个算子,和MapFunction、FlatMapFunction等是同级别的,而抽象类AggregateFunction是用于用户自定义聚合函数的,和max、min之类的函数是同级的。
比如我们想实现一个类似sql的功能:
select TUMBLE_START(proctime,INTERVAL '2' SECOND) as starttime,user,count(*) from logs group by user,TUMBLE(proctime,INTERVAL '2' SECOND)
这个sql就是来统计一下每两秒钟的滑动窗口内每个人出现的次数,今天我们就以这个简单的sql的功能为例讲解一下flink的aggregate算子,其实就是我们用程序来实现这个sql的功能。
首先看一下聚合函数的接口:
@PublicEvolvingpublic interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable { ACC createAccumulator(); ACC add(IN value, ACC accumulator); ACC merge(ACC a, ACC b); OUT getResult(ACC accumulator);}
这个接口AggregateFunction里面有4个方法,我们分别来讲解一下。
首先我们自定义source生成用户的信息
public static class MySource implements SourceFunction<Tuple2<String,Long>>{ private volatile boolean isRunning = true; String userids[] = { "4760858d-2bec-483c-a535-291de04b2247", "67088699-d4f4-43f2-913c-481bff8a2dc5", "72f7b6a8-e1a9-49b4-9a0b-770c41e01bfb", "dfa27cb6-bd94-4bc0-a90b-f7beeb9faa8b", "aabbaa50-72f4-495c-b3a1-70383ee9d6a4", "3218bbb9-5874-4d37-a82d-3e35e52d1702", "3ebfb9602ac07779||3ebfe9612a007979", "aec20d52-c2eb-4436-b121-c29ad4097f6c", "e7e896cd939685d7||e7e8e6c1930689d7", "a4b1e1db-55ef-4d9d-b9d2-18393c5f59ee" }; @Override public void run(SourceContext<Tuple2<String,Long>> ctx) throws Exception{ while (isRunning){ Thread.sleep(10); String userid = userids[(int) (Math.random() * (userids.length - 1))]; ctx.collect(Tuple2.of(userid, System.currentTimeMillis())); } } @Override public void cancel(){ isRunning = false; } }
public static class CountAggregate implements AggregateFunction<Tuple2<String,Long>,Integer,Integer>{ @Override public Integer createAccumulator(){ return 0; } @Override public Integer add(Tuple2<String,Long> value, Integer accumulator){ return ++accumulator; } @Override public Integer getResult(Integer accumulator){ return accumulator; } @Override public Integer merge(Integer a, Integer b){ return a + b; } }
/** * 这个是为了将聚合结果输出 */ public static class WindowResult implements WindowFunction<Integer,Tuple3<String,Date,Integer>,Tuple,TimeWindow>{ @Override public void apply( Tuple key, TimeWindow window, Iterable<Integer> input, Collector<Tuple3<String,Date,Integer>> out) throws Exception{ String k = ((Tuple1<String>) key).f0; long windowStart = window.getStart(); int result = input.iterator().next(); out.collect(Tuple3.of(k, new Date(windowStart), result)); } }
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Tuple2<String,Long>> dataStream = env.addSource(new MySource()); dataStream.keyBy(0).window(TumblingProcessingTimeWindows.of(Time.seconds(2))) .aggregate(new CountAggregate(), new WindowResult() ).print(); env.execute();
感谢各位的阅读,以上就是“flink中的聚合算子是什么”的内容了,经过本文的学习后,相信大家对flink中的聚合算子是什么这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
原文链接:https://my.oschina.net/u/4596020/blog/4439386