温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Flink的窗口机制介绍

发布时间:2021-09-09 10:14:57 来源:亿速云 阅读:155 作者:chen 栏目:大数据

本篇内容介绍了“Flink的窗口机制介绍”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

读懂window区别:

读懂window内部的源码实现关系

  • Window Assigner:用来决定某个元素被分配到哪个/哪些窗口中去。

    Flink的窗口机制介绍

  • Trigger:触发器。决定了一个窗口何时能够被计算或清除,每个窗口都会拥有一个自己的Trigger。

    Flink的窗口机制介绍


  • Evictor:可以译为“驱逐者”。在Trigger触发之后,在窗口被处理之前,Evictor(如果有Evictor的话)会用来剔除窗口中不需要的元素,相当于一个filter。

    Flink的窗口机制介绍

读懂WindowAssignern内部实现机制,它主要是实现数据的分发,分发到不同的window中,我简单举例一个,我设置window的开始和结束时间,然后触发器发现我的window达到了结束时间,这个window就会触发。

Flink的窗口机制介绍

一张图读懂trigger,evictor,emit的执行顺序

假设有一个滑动计数窗口,每2个元素计算一次最近4个元素的总和,那么窗口工作示意图如下所示:

Flink的窗口机制介绍

测试验证代码:

import java.utilimport org.apache.flink.api.common.ExecutionConfigimport org.apache.flink.streaming.api.{TimeCharacteristic, environment}import org.apache.flink.streaming.api.scala._import org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.streaming.api.windowing.assigners.WindowAssignerimport org.apache.flink.streaming.api.windowing.triggers.{ProcessingTimeTrigger, Trigger}import org.apache.flink.streaming.api.windowing.windows.TimeWindowobject FlinkWindowTest {  def main(args: Array[String]): Unit = {    val env = StreamExecutionEnvironment.getExecutionEnvironment    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)    val input = env.socketTextStream("localhost", 9001)    val inputMap = input.flatMap(f => {      f.split("\\W+")    }).map(line =>(line ,1))      .keyBy(0).window(new WindowAssigner[Object,TimeWindow] {      override def isEventTime = false      override def getDefaultTrigger(env: environment.StreamExecutionEnvironment) = {        ProcessingTimeTrigger.create()      }      override def assignWindows(element: Object, timestamp: Long, context: WindowAssigner.WindowAssignerContext) = {        val windows = new util.ArrayList[TimeWindow](7)        //每隔1分钟统计历史5分钟的数据        val size =1000L * 60 * 5        val slide = 1000L * 60        val lastStart = timestamp - timestamp % slide        var start = lastStart        while ( {          start > timestamp - size        })        {          start -= slide          windows.add(new TimeWindow(start, start + size))        }        //每隔1分钟统计历史1分钟的数据        val size1 =1000L * 60        val lastStart1 = timestamp - timestamp % slide        println(timestamp % slide)        var start1 = lastStart1        while ( {          start1 > timestamp - size1        })        {          windows.add(new TimeWindow(start1, start1 + size1))          start1 -= slide        }        windows      }      override def getWindowSerializer(executionConfig: ExecutionConfig) = new TimeWindow.Serializer    }).sum(1)    .print()    env.execute()  }}

总结

WindowAssigner主要是把数据分发到不同的window窗口中去,然后每个window自己内部设置触发器,当数据还没有触发之前整个数据是存储在flink的state中,也就是状态存储。当window触发(Trigger的返回结果可以是)之后,Trigger的返回结果可以是 continue(不做任何操作),fire(处理窗口数据),purge(移除窗口和窗口中的数据),或者 fire + purge。

“Flink的窗口机制介绍”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注亿速云网站,小编将为大家输出更多高质量的实用文章!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI