这篇文章将为大家详细讲解有关Flink CEP事件处理,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。
复杂事件处理,允许在无界数据流中检测出特定事件模型
单个模式指一个模式,可以是一个单例也可以是循环模式。
模式都是单例的,可以通过量词转换成循环模式。每个模式可以有一个或多个条件来决定接受哪些事件。
pattern.oneOrMore():期望给定的事件出现一次或多次
pattern.times(#oftimes):期望一个给定事件出现特定次数的模式
pattern.times(#fromTimes, #toTimes):期望一个给定事件出现次数在一个最小值与最大值中间
pattern.greedy():贪心算法,尽可能多匹配,还不能让模式组贪心
pattern.optional():变为可选
示例: // 期望出现4次 start.times(4); // 期望出现0或者4次 start.times(4).optional(); // 期望出现2、3或者4次 start.times(2, 4); // 期望出现2、3或者4次,并且尽可能的重复次数多 start.times(2, 4).greedy(); // 期望出现0、2、3或者4次 start.times(2, 4).optional(); // 期望出现0、2、3或者4次,并且尽可能的重复次数多 start.times(2, 4).optional().greedy(); // 期望出现1到多次 start.oneOrMore(); // 期望出现1到多次,并且尽可能的重复次数多 start.oneOrMore().greedy(); // 期望出现0到多次 start.oneOrMore().optional(); // 期望出现0到多次,并且尽可能的重复次数多 start.oneOrMore().optional().greedy(); // 期望出现2到多次 start.timesOrMore(2); // 期望出现2到多次,并且尽可能的重复次数多 start.timesOrMore(2).greedy(); // 期望出现0、2或多次 start.timesOrMore(2).optional(); // 期望出现0、2或多次,并且尽可能的重复次数多 start.timesOrMore(2).optional().greedy();
判断事件属性的条件可以是以下方法
pattern.where()
pattern.or()
pattern.until()
这些方法入参可以是IterativeCondition或SimpleCondition
pattern.subtype方法限制接受事件类型是初始事件的子类型。
迭代条件IterativeCondition
简单条件SimpleCondition
组合条件.where().or()等
停止条件.until()
FlinkCEP支持事件之间如下形式的连续策略
严格连续:期望所有匹配事件严格的一个接一个出现,中间没有任何不匹配事件
松散连续:忽略匹配的事件之间的不匹配的事件
不确定的松散连续:更进一步的松散连续,允许忽略掉一些匹配事件的附加匹配
1. next() 指定严格连续 2. followedBy() 指定松散连续 3. followedByAny() 不确定松散连续 4. notNext() 如果不想后面直接连着一个特定事件 5. notFollowedBy(),如果不想一个特定事件发生在两个事件之间的任何地方。 ps: 模式序列不能以notFollowedBy()结尾 一个NOT模式前面不能是可选的模式
定义模式一个有效时间约束:pattern.within()方法指定有效时间内发生。
模式序列只能有一个时间限制,如果限制多个时间在不同的模式上,会使用最小的时间限制。
循环模式默认是松散连续,如果合用严格连续,需使用consecutive()方法明确指定。如果想使用不确定松散连续,可以使用allowCombinations()方法
==示例:consecutive==
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } }) .followedBy("middle").where(new SimpleCondition<Event>() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } }).oneOrMore().consecutive() .followedBy("end1").where(new SimpleCondition<Event>() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("b"); } }); 输入:C D A1 A2 A3 D A4 B,会产生下面的输出: 如果施加严格连续性: {C A1 B},{C A1 A2 B},{C A1 A2 A3 B} 不施加严格连续性: {C A1 B},{C A1 A2 B},{C A1 A2 A3 B},{C A1 A2 A3 A4 B}
==示例:allowCombinations==
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } }) .followedBy("middle").where(new SimpleCondition<Event>() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } }).oneOrMore().allowCombinations() .followedBy("end1").where(new SimpleCondition<Event>() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("b"); } }); 输入:C D A1 A2 A3 D A4 B,会产生如下的输出: 如果使用不确定松散连续: {C A1 B},{C A1 A2 B},{C A1 A3 B},{C A1 A4 B},{C A1 A2 A3 B},{C A1 A2 A4 B},{C A1 A3 A4 B},{C A1 A2 A3 A4 B} 如果不使用:{C A1 B},{C A1 A2 B},{C A1 A2 A3 B},{C A1 A2 A3 A4 B}
定义一个模式序列作为begin,followedBy,followedByAny和next条件
关于“Flink CEP事件处理”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,使各位可以学到更多知识,如果觉得文章不错,请把它分享出去让更多的人看到。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。