本篇内容介绍了“Flink的窗口机制介绍”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
读懂window区别:
读懂window内部的源码实现关系
Window Assigner:用来决定某个元素被分配到哪个/哪些窗口中去。
Trigger:触发器。决定了一个窗口何时能够被计算或清除,每个窗口都会拥有一个自己的Trigger。
Evictor:可以译为“驱逐者”。在Trigger触发之后,在窗口被处理之前,Evictor(如果有Evictor的话)会用来剔除窗口中不需要的元素,相当于一个filter。
读懂WindowAssignern内部实现机制,它主要是实现数据的分发,分发到不同的window中,我简单举例一个,我设置window的开始和结束时间,然后触发器发现我的window达到了结束时间,这个window就会触发。
一张图读懂trigger,evictor,emit的执行顺序
假设有一个滑动计数窗口,每2个元素计算一次最近4个元素的总和,那么窗口工作示意图如下所示:
测试验证代码:
import java.utilimport org.apache.flink.api.common.ExecutionConfigimport org.apache.flink.streaming.api.{TimeCharacteristic, environment}import org.apache.flink.streaming.api.scala._import org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.streaming.api.windowing.assigners.WindowAssignerimport org.apache.flink.streaming.api.windowing.triggers.{ProcessingTimeTrigger, Trigger}import org.apache.flink.streaming.api.windowing.windows.TimeWindowobject FlinkWindowTest { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) val input = env.socketTextStream("localhost", 9001) val inputMap = input.flatMap(f => { f.split("\\W+") }).map(line =>(line ,1)) .keyBy(0).window(new WindowAssigner[Object,TimeWindow] { override def isEventTime = false override def getDefaultTrigger(env: environment.StreamExecutionEnvironment) = { ProcessingTimeTrigger.create() } override def assignWindows(element: Object, timestamp: Long, context: WindowAssigner.WindowAssignerContext) = { val windows = new util.ArrayList[TimeWindow](7) //每隔1分钟统计历史5分钟的数据 val size =1000L * 60 * 5 val slide = 1000L * 60 val lastStart = timestamp - timestamp % slide var start = lastStart while ( { start > timestamp - size }) { start -= slide windows.add(new TimeWindow(start, start + size)) } //每隔1分钟统计历史1分钟的数据 val size1 =1000L * 60 val lastStart1 = timestamp - timestamp % slide println(timestamp % slide) var start1 = lastStart1 while ( { start1 > timestamp - size1 }) { windows.add(new TimeWindow(start1, start1 + size1)) start1 -= slide } windows } override def getWindowSerializer(executionConfig: ExecutionConfig) = new TimeWindow.Serializer }).sum(1) .print() env.execute() }}
WindowAssigner主要是把数据分发到不同的window窗口中去,然后每个window自己内部设置触发器,当数据还没有触发之前整个数据是存储在flink的state中,也就是状态存储。当window触发(Trigger的返回结果可以是)之后,Trigger的返回结果可以是 continue(不做任何操作),fire(处理窗口数据),purge(移除窗口和窗口中的数据),或者 fire + purge。
“Flink的窗口机制介绍”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注亿速云网站,小编将为大家输出更多高质量的实用文章!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。