温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

flink中窗口的作用是什么

发布时间:2021-06-24 09:30:56 来源:亿速云 阅读:811 作者:chen 栏目:大数据

这篇文章主要讲解了“flink中窗口的作用是什么”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“flink中窗口的作用是什么”吧!

窗口

  • 窗口计算是流式计算中常用的数据计算方式之一,通过按照固定时间或长度将数据流切分成不同的窗口,再对数据进行相应的聚合操作,得到一定时间范围内的统计结果,例如统计最近5分钟内某网站的点击数,此时,点击数据在不断产生,通过5分钟窗口将数据限定在固定时间范围内,就可以对该范围内的有界数据执行聚合,得到最近5分钟的网站点击数。

  • 代码接口规则

stream.keyBy(...)  //keyed类型数据集
.window(...)   //指定窗口分配器类型
[.trigger(...)]  //指定触发器类型(可选)
[.evictor(...)]  //指定evictor(可选)
[.allowedLateness(...)]  //指定是否延迟处理数据(可选)
[.sideOutputLateData(...)]  //指定Output Lag(可选)
.reduce/aggregate/fold/apply()  //指定窗口计算函数
[.getSideOutput(...)]  //根据Tag输出数据(可选)
  • 算子

    • Windows Assigner:指定窗口类型,定义如何将数据流分配到一个或多个窗口

    • Windows Trigger:指定窗口触发的时机,定义窗口满足什么样的条件触发计算;

    • Evictor:用于数据剔除

    • Lateness:标记是否处理迟到数据,当迟到数据到达窗口中是否触发计算。

    • Output Tag:标记输出标签,然后通过getSideOutput将窗口中的数据根据标签输出。

    • Windows Function:定义窗口上数据处理的逻辑,例如对数据进行sum操作。

Keyed 和 Non-Keyed窗口

  • 在运用窗口计算时,Flink根据上游数据集是否为KeyedStream类型(将数据集按Key分区),对应的Window Assigner会不同,

    • 上游数据集为KeyedStream类型,则调用DataStream API的Windwo()方法指定Windows Assigner,数据将根据Key在不同的Task实例中并行分别计算,最后得出针对每个Key统计的结果。

    • 如果是Non-Keyed类型,则调用WindowsAll()方法来指定Windows Assigner,所有数据都被窗口算子路由到一个Task中计算,并得到结果。

  • 建议数据进行KeyedStream处理,这样启动并行计算,加速效率。

Window Assigner

  • flink支持两种类型的窗口,一种基于时间,窗口大小由开始和结束时间戳约束,一种基于数量,根据固定数量定义窗口大小。

  • 根据Windows Assigner数据分配方式的不同将Windows分为4大类:滚动窗口(Tumbling Windows)、滑动窗口(Sliding Windows)、会话窗口(Session Windows)和全局窗口(Global Windows)

滚动窗口

  • 滚动窗口根据固定时间或大小切分,且窗口与窗口间元素互不重叠,适合于固定时间大小和周期统计某一指标的窗口计算。

  • DataStream API提供了基于Event Time和Process Time两种时间类型的Tumbling窗口,对应的Assigner分别为TumblingEventTimeWindows和TumblingProcessTimeWindows,窗口大小童工of()指定,时间单位分别为Time.milliseconds(x)、Time.seconds(x)或Time.minutes(x),也可以是不同时间单位的组合。

  • 如下实例,窗口时间按10S进行切分,窗口的时间是[1:00:00.000-1:00:09.999] 到[1:00:10.000-1:00:19.999]的等固定时间范围。

val inputStream:DataStream[T]= ...
//定义Event Time Tumbling Windows
val tumblingEventTimeWindows=inputStream.keyBy(_.id)
//通过使用TumblingEventTimeWindows定义Event Time滚动窗口
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.process(...)  //定义窗口函数

//定义Process Time Tumbling Windows
val tumblingProcessingTimeWindows = inputStream.keyBy(_.id)
//通过TumblingProcessTimeWindows定义Evnet Time滚动窗口
.window(TumblingProcessTimeWindows.of(Times.seconds(10)))
.process(...)  //定义窗口函数

滑动窗口

  • 滑动窗口是一种常见的窗口类型,特点是在滚动窗口基础上增加了窗口滑动时间(Slide Time),且允许窗口数据发生重叠。这种窗口不像滚动窗口按照Windows Size向前移动,而是根据设定的Slide Time向前滑动。窗口之间的数据重叠大小根据Windows Size和Slide time决定,当Slide Time小于Windows Size便会发生窗口重叠,Slide Size大于WindowsSize会出现窗口不连续,数据可能不会再任何一个窗口内计算。

  • DataStream API针对Sliding Windows根据不同时间类型Assigner,包括基于Event Time的SlidingEventTimeWindows和基于Process Time的SlidingProcessingTimeWindows。

  • 实例如下,指定Windows Size为1h,Slide Time为10m。

val inputStream:DataStream[T]= ...
//定义Event Time Sliding Windows
val slidingEventTimeWindows=inputStream.keyBy(_.id)
//通过使用SlidingEventTimeWindows定义Event Time滚动窗口
.window(SlidingEventTimeWindows.of(Time.hours(1),Time.minutes(10)))
.process(...)  //定义窗口函数

//定义Process Time Sliding Windows
val slidingProcessTimeWindows = inputStream.keyBy(_.id)
//通过SlidingProcessTimeWindows定义Evnet Time滚动窗口
.window(SlidingProcessTimeWindows.of(Time.hours(1),Time.minutes(10)))
.process(...)  //定义窗口函数

会话窗口

  • 将某个时间段内活跃较高的数据聚合为一个窗口进行计算,窗口的触发条件为Session Gap,指规定时间内没有数据活跃接入,则任务窗口结束,触发窗口计算。

  • 注意:如果数据一直不间断,会导致窗口始终不触发。

  • 与滑动、滚动窗口不同,Session Windows不需要定义Windows Size和Slide Time,只需要定义session gap,规定不活跃数据的时间上线即可。

  • Session Windows比较适合非连续型数据处理或周期性产生数据的场景。DataStream API中可以创建基于Event Time和Process Time的Session Windows,对应的有Assigner分别为EventTimeSessionWindow和ProcessTimerSessionWindows。

  • 实例代码如下:

val inputStream:DataStream[T]= ...
//定义Event Time Session Windows
val eventTimeSessionWindows=inputStream.keyBy(_.id)
//通过使用EventTimeSessionWindows定义Event Time滚动窗口
.window(EventTimeSessionWindows.withGap(Time.milliseconds(10)))
.process(...)  //定义窗口函数

//定义Process Time Session Windows
val processTimeSessionWindows = inputStream.keyBy(_.id)
//通过ProcessTimeSessionWindows定义Evnet Time滚动窗口
.window(ProcessTimeSessionWindows.withGap(Time.milliseconds(10)))
.process(...)  //定义窗口函数
  • flink支持动态调整的Session Gap,需要实现SessionWindowTimeGapExtractor接口,并复写extract方法,完成Session Gap的抽取,然后将创建好的Session Gap抽取器传入ProcessiongTimeSessionWindows.withDynamicGap()方法即可。

val inputStream:DataStream[T]= ...
//定义Event Time Session Windows
val eventTimeSessionWindows=inputStream.keyBy(_.id)
//通过使用EventTimeSessionWindows定义Event Time滚动窗口
.window(EventTimeSessionWindows.withDynamicGap(

    //实例化SessionWindowTimeGapExtractor接口
    new SessionWindowTimeGapExtractor[String]{
        override def extract(element:String):Long={
            //动态指定并返回Session Gap
        }
    }
))
.process(...)  //定义窗口函数

//定义Process Time Session Windows
val processTimeSessionWindows = inputStream.keyBy(_.id)
//通过ProcessTimeSessionWindows定义Evnet Time滚动窗口
.window(ProcessTimeSessionWindows.withDynamicGap(

    //实例化SessionWindowTimeGapExtractor接口
    new SessionWindowTimeGapExtractor[String]{
        override def extract(element:String):Long={
            //动态指定并返回Session Gap
        }
    }
))
.process(...)  //定义窗口函数

全局窗口

  • 全局会话窗口将所有相同的key数据分配到单个窗口中计算,窗口没有起始和结束时间,窗口需要借助Triger触发计算,如果不指定,则不会触发计算。

  • 使用全局窗口要非常谨慎,必须明确自己在整个窗口中统计出的结果是什么,并指定对应的触发器,同时指定相应的数据清理机制,否则数据将一直留在内存中。

val inputStream:DataStream[T]= ...
val globalWindows = inputStream.keyBy(_.id)
.window(GlobalWindows.create())  //通过GlobalWindows定义Global Windows
.process()

总结

  • flink定义的四种窗口,容易和时间窗口和事件窗口混淆,他们是不同维度的的窗口定义,需要特别注意下。

  • 越长大越孤单,珍惜好身边人。

感谢各位的阅读,以上就是“flink中窗口的作用是什么”的内容了,经过本文的学习后,相信大家对flink中窗口的作用是什么这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI