这篇文章将为大家详细讲解有关Flink CEP事件处理,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。
复杂事件处理,允许在无界数据流中检测出特定事件模型
单个模式指一个模式,可以是一个单例也可以是循环模式。
模式都是单例的,可以通过量词转换成循环模式。每个模式可以有一个或多个条件来决定接受哪些事件。
pattern.oneOrMore():期望给定的事件出现一次或多次
pattern.times(#oftimes):期望一个给定事件出现特定次数的模式
pattern.times(#fromTimes, #toTimes):期望一个给定事件出现次数在一个最小值与最大值中间
pattern.greedy():贪心算法,尽可能多匹配,还不能让模式组贪心
pattern.optional():变为可选
示例:
// 期望出现4次
start.times(4);
// 期望出现0或者4次
start.times(4).optional();
// 期望出现2、3或者4次
start.times(2, 4);
// 期望出现2、3或者4次,并且尽可能的重复次数多
start.times(2, 4).greedy();
// 期望出现0、2、3或者4次
start.times(2, 4).optional();
// 期望出现0、2、3或者4次,并且尽可能的重复次数多
start.times(2, 4).optional().greedy();
// 期望出现1到多次
start.oneOrMore();
// 期望出现1到多次,并且尽可能的重复次数多
start.oneOrMore().greedy();
// 期望出现0到多次
start.oneOrMore().optional();
// 期望出现0到多次,并且尽可能的重复次数多
start.oneOrMore().optional().greedy();
// 期望出现2到多次
start.timesOrMore(2);
// 期望出现2到多次,并且尽可能的重复次数多
start.timesOrMore(2).greedy();
// 期望出现0、2或多次
start.timesOrMore(2).optional();
// 期望出现0、2或多次,并且尽可能的重复次数多
start.timesOrMore(2).optional().greedy();
判断事件属性的条件可以是以下方法
pattern.where()
pattern.or()
pattern.until()
这些方法入参可以是IterativeCondition或SimpleCondition
pattern.subtype方法限制接受事件类型是初始事件的子类型。
迭代条件IterativeCondition
简单条件SimpleCondition
组合条件.where().or()等
停止条件.until()
FlinkCEP支持事件之间如下形式的连续策略
严格连续:期望所有匹配事件严格的一个接一个出现,中间没有任何不匹配事件
松散连续:忽略匹配的事件之间的不匹配的事件
不确定的松散连续:更进一步的松散连续,允许忽略掉一些匹配事件的附加匹配
1. next() 指定严格连续
2. followedBy() 指定松散连续
3. followedByAny() 不确定松散连续
4. notNext() 如果不想后面直接连着一个特定事件
5. notFollowedBy(),如果不想一个特定事件发生在两个事件之间的任何地方。
ps: 模式序列不能以notFollowedBy()结尾
一个NOT模式前面不能是可选的模式
定义模式一个有效时间约束:pattern.within()方法指定有效时间内发生。
模式序列只能有一个时间限制,如果限制多个时间在不同的模式上,会使用最小的时间限制。
循环模式默认是松散连续,如果合用严格连续,需使用consecutive()方法明确指定。如果想使用不确定松散连续,可以使用allowCombinations()方法
==示例:consecutive==
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("c");
}
})
.followedBy("middle").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("a");
}
}).oneOrMore().consecutive()
.followedBy("end1").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("b");
}
});
输入:C D A1 A2 A3 D A4 B,会产生下面的输出:
如果施加严格连续性: {C A1 B},{C A1 A2 B},{C A1 A2 A3 B}
不施加严格连续性: {C A1 B},{C A1 A2 B},{C A1 A2 A3 B},{C A1 A2 A3 A4 B}
==示例:allowCombinations==
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("c");
}
})
.followedBy("middle").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("a");
}
}).oneOrMore().allowCombinations()
.followedBy("end1").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("b");
}
});
输入:C D A1 A2 A3 D A4 B,会产生如下的输出:
如果使用不确定松散连续: {C A1 B},{C A1 A2 B},{C A1 A3 B},{C A1 A4 B},{C A1 A2 A3 B},{C A1 A2 A4 B},{C A1 A3 A4 B},{C A1 A2 A3 A4 B}
如果不使用:{C A1 B},{C A1 A2 B},{C A1 A2 A3 B},{C A1 A2 A3 A4 B}
定义一个模式序列作为begin,followedBy,followedByAny和next条件
关于“Flink CEP事件处理”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,使各位可以学到更多知识,如果觉得文章不错,请把它分享出去让更多的人看到。
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
原文链接:https://my.oschina.net/u/3015713/blog/4681347