Complex Event Processing
In the previous chapter, we talked about the Table API provided by Apache Flink and how we can use it to process relational data structures. From this chapter onwards, we will start learning more about the libraries provided by Apache Flink and how we can use them for specific use cases. To start with, let's try to understand a library called Complex Event Processing (CEP). CEP is a very interesting but complex topic that has its value in various industries. Wherever a stream of events is expected, people naturally want to perform complex event processing. Let's try to understand what CEP is all about.
CEP analyzes streams of disparate events occurring with high frequency and low latency. These days, streaming events can be found in various industries, for example:
In the oil and gas domain, sensor data comes from various drilling tools or from upstream oil pipeline equipment
In the security domain, activity data, malware information, and usage pattern data come from various endpoints
In the wearable domain, data comes from various wrist bands with information about your heart rate, your activity, and so on
In the banking domain, data comes from credit card usage, banking activities, and so on
It is very important to analyze variation patterns to get notified in real time about any deviation in the regular flow of events. CEP can understand patterns across streams of events, sub-events, and their sequences. CEP helps to identify meaningful patterns and complex relationships among seemingly unrelated events, and sends notifications in real and near-real time to prevent damage:
The preceding diagram shows how the CEP flow works. Even though the flow looks simple, CEP has various abilities, such as:
The ability to produce results as soon as the input event stream is available
The ability to provide computations such as aggregation over time and timeouts between two events of interest
The ability to provide real-time/near real-time alerts and notifications on detection of complex event patterns
The ability to connect and correlate heterogeneous sources and analyze patterns in them
The ability to achieve high-throughput, low-latency processing
There are various solutions available on the market. With big data technology advancements, we have multiple options such as Apache Spark, Apache Samza, and Apache Beam, but none of them has a dedicated library to fit all solutions. Now let us try to understand what we can achieve with Flink's CEP library.
Apache Flink provides the Flink CEP library, which provides APIs to perform complex event processing. The library consists of the following core components:
Event stream
Pattern definition
Pattern detection
Alert generation
Flink CEP works on Flink's streaming API called DataStream. A programmer needs to define the pattern to be detected in the stream of events, and then Flink's CEP engine detects the pattern and takes the appropriate action, such as generating alerts.
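Before diving into the details, here is a minimal sketch of how these four components map onto the API. It is only a preview under stated assumptions: it uses the TemperatureEvent and Alert classes defined later in this chapter, the 26.0-degree threshold of our running example, and it omits imports like the other snippets in this chapter.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 1. Event stream
DataStream<TemperatureEvent> input = env.fromElements(
        new TemperatureEvent("xyz", 22.0), new TemperatureEvent("xyz", 27.0));

// 2. Pattern definition: a single reading of 26.0 degrees or more
Pattern<TemperatureEvent, ?> pattern = Pattern.<TemperatureEvent>begin("high")
        .where(new FilterFunction<TemperatureEvent>() {
            @Override
            public boolean filter(TemperatureEvent value) {
                return value.getTemperature() >= 26.0;
            }
        });

// 3. Pattern detection
PatternStream<TemperatureEvent> matches = CEP.pattern(input, pattern);

// 4. Alert generation
DataStream<Alert> alerts = matches.select(new PatternSelectFunction<TemperatureEvent, Alert>() {
    @Override
    public Alert select(Map<String, TemperatureEvent> match) throws Exception {
        return new Alert("High temperature on machine " + match.get("high").getMachineName());
    }
});

alerts.print();
env.execute("CEP overview sketch");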
In order to get started, we need to add the following Maven dependency:
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-cep-scala_2.11 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep-scala_2.11</artifactId>
<version>1.1.4</version>
</dependency>
A very important component of CEP is its input event stream. In earlier chapters, we saw the details of the DataStream API. Now let's use that knowledge to implement CEP. The very first thing we need to do is define a Java POJO for the event. Let's assume we need to monitor a temperature sensor event stream.
First we define an abstract class and then extend it. The following code snippets demonstrate this. Here is the abstract class:
package com.demo.chapter05;

public abstract class MonitoringEvent {

    private String machineName;

    public MonitoringEvent(String machineName) {
        super();
        this.machineName = machineName;
    }

    public String getMachineName() {
        return machineName;
    }

    public void setMachineName(String machineName) {
        this.machineName = machineName;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((machineName == null) ? 0 : machineName.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        MonitoringEvent other = (MonitoringEvent) obj;
        if (machineName == null) {
            if (other.machineName != null)
                return false;
        } else if (!machineName.equals(other.machineName))
            return false;
        return true;
    }
}
Then we create a POJO for the actual temperature event:
package com.demo.chapter05;

public class TemperatureEvent extends MonitoringEvent {

    private double temperature;

    public TemperatureEvent(String machineName) {
        super(machineName);
    }

    public TemperatureEvent(String machineName, double temperature) {
        super(machineName);
        this.temperature = temperature;
    }

    public double getTemperature() {
        return temperature;
    }

    public void setTemperature(double temperature) {
        this.temperature = temperature;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = super.hashCode();
        long temp;
        temp = Double.doubleToLongBits(temperature);
        result = prime * result + (int) (temp ^ (temp >>> 32));
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (!super.equals(obj))
            return false;
        if (getClass() != obj.getClass())
            return false;
        TemperatureEvent other = (TemperatureEvent) obj;
        if (Double.doubleToLongBits(temperature) != Double.doubleToLongBits(other.temperature))
            return false;
        return true;
    }

    @Override
    public String toString() {
        return "TemperatureEvent [getTemperature()=" + getTemperature()
                + ", getMachineName()=" + getMachineName() + "]";
    }
}
Now we can define the event source as follows.
In Java:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<TemperatureEvent> inputEventStream = env.fromElements(
        new TemperatureEvent("xyz", 22.0), new TemperatureEvent("xyz", 20.1),
        new TemperatureEvent("xyz", 21.1), new TemperatureEvent("xyz", 22.2),
        new TemperatureEvent("xyz", 22.1), new TemperatureEvent("xyz", 22.3),
        new TemperatureEvent("xyz", 22.1), new TemperatureEvent("xyz", 22.4),
        new TemperatureEvent("xyz", 22.7), new TemperatureEvent("xyz", 27.0));
In Scala:
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

val input: DataStream[TemperatureEvent] = env.fromElements(
  new TemperatureEvent("xyz", 22.0), new TemperatureEvent("xyz", 20.1),
  new TemperatureEvent("xyz", 21.1), new TemperatureEvent("xyz", 22.2),
  new TemperatureEvent("xyz", 22.1), new TemperatureEvent("xyz", 22.3),
  new TemperatureEvent("xyz", 22.1), new TemperatureEvent("xyz", 22.4),
  new TemperatureEvent("xyz", 22.7), new TemperatureEvent("xyz", 27.0))
The Pattern API allows you to define complex event patterns very easily. Each pattern consists of multiple states. To go from one state to another, we generally need to define conditions. The conditions can be based on continuity or on filtering out events.
Let’s try to understand each pattern operation in detail.
The initial state can be defined as follows.
In Java:
Pattern<Event, ?> start = Pattern.<Event>begin("start");
In Scala:
val start : Pattern[Event, _] = Pattern.begin("start")
We can also specify a filter condition for the initial state.
In Java:
start.where(new FilterFunction<Event>() {
    @Override
    public boolean filter(Event value) {
        return ...; // condition
    }
});
In Scala:
start.where(event => ... /* condition */)
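As a concrete illustration, a sketch using the TemperatureEvent POJO defined above (the 26.0-degree threshold is just an example value) could look like this in Java:

// Match only readings of 26.0 degrees or more in the initial state
Pattern<TemperatureEvent, ?> start = Pattern.<TemperatureEvent>begin("start")
        .where(new FilterFunction<TemperatureEvent>() {
            @Override
            public boolean filter(TemperatureEvent value) {
                return value.getTemperature() >= 26.0;
            }
        });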
We can also filter events based on their subtype, using the subtype() method.
In Java:
start.subtype(SubEvent.class).where(new FilterFunction<SubEvent>() {
    @Override
    public boolean filter(SubEvent value) {
        return ...; // condition
    }
});
In Scala:
start.subtype(classOf[SubEvent]).where(subEvent => ... /* condition */)
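Since our TemperatureEvent extends MonitoringEvent, a concrete (hypothetical) use of subtype() could start from generic monitoring events and narrow the match down to temperature readings only:

// Begin with any MonitoringEvent, but only match TemperatureEvent instances
Pattern<MonitoringEvent, ?> tempOnly = Pattern.<MonitoringEvent>begin("start")
        .subtype(TemperatureEvent.class)
        .where(new FilterFunction<TemperatureEvent>() {
            @Override
            public boolean filter(TemperatureEvent value) {
                return value.getTemperature() >= 26.0;
            }
        });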
The Pattern API also allows us to define multiple conditions together. We can use OR and AND operators.
In Java:
pattern.where(new FilterFunction<Event>() {
    @Override
    public boolean filter(Event value) {
        return ...; // condition
    }
}).or(new FilterFunction<Event>() {
    @Override
    public boolean filter(Event value) {
        return ...; // or condition
    }
});
In Scala:
pattern.where(event => ... /* condition */).or(event => ... /* or condition */)
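For instance, in Java, a hedged sketch over our TemperatureEvent could combine an overheating condition with an abnormally cold one (both threshold values are illustrative):

// Alert on readings that are either too high or too low
start.where(new FilterFunction<TemperatureEvent>() {
    @Override
    public boolean filter(TemperatureEvent value) {
        return value.getTemperature() >= 26.0; // overheating
    }
}).or(new FilterFunction<TemperatureEvent>() {
    @Override
    public boolean filter(TemperatureEvent value) {
        return value.getTemperature() <= 5.0; // suspiciously cold
    }
});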
As stated earlier, we do not always need to filter out events. There can also be patterns where we need continuity instead of filters.
Continuity can be of two types: strict continuity and non-strict continuity.
Strict continuity requires two matching events to follow each other directly, which means there should be no other event in between. This pattern can be defined by next().
In Java:
Pattern<Event, ?> strictNext = start.next("middle");
In Scala:
val strictNext: Pattern[Event, _] = start.next("middle")
Non-strict continuity means that other events are allowed to occur between the two specific events. This pattern can be defined by followedBy().
In Java:
Pattern<Event, ?> nonStrictNext = start.followedBy("middle");
In Scala:
val nonStrictNext : Pattern[Event, _] = start.followedBy("middle")
The Pattern API also allows us to do pattern matching based on time intervals. We can define a time-based temporal constraint as follows.
In Java:
next.within(Time.seconds(10));
In Scala:
next.within(Time.seconds(10))
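Putting continuity and time constraints together, a hedged sketch of a two-state pattern over our TemperatureEvent could require two consecutive high readings within 10 seconds (state names and thresholds are illustrative):

// "second" must directly follow "first" (strict continuity),
// and the whole match must complete within 10 seconds
Pattern<TemperatureEvent, ?> twoHighReadings = Pattern.<TemperatureEvent>begin("first")
        .where(new FilterFunction<TemperatureEvent>() {
            @Override
            public boolean filter(TemperatureEvent value) {
                return value.getTemperature() >= 26.0;
            }
        })
        .next("second")
        .where(new FilterFunction<TemperatureEvent>() {
            @Override
            public boolean filter(TemperatureEvent value) {
                return value.getTemperature() >= 26.0;
            }
        })
        .within(Time.seconds(10));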
To detect patterns against a stream of events, we need to run the stream through the pattern. CEP.pattern() returns a PatternStream.
The following code snippet shows how we can detect a pattern. First, the pattern is defined to check whether the temperature value reaches 26.0 degrees or more within 10 seconds.
In Java:
Pattern<TemperatureEvent, ?> warningPattern = Pattern.<TemperatureEvent>begin("first")
        .subtype(TemperatureEvent.class).where(new FilterFunction<TemperatureEvent>() {
            public boolean filter(TemperatureEvent value) {
                return value.getTemperature() >= 26.0;
            }
        }).within(Time.seconds(10));

PatternStream<TemperatureEvent> patternStream = CEP.pattern(inputEventStream, warningPattern);
In Scala:
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

val input = // data

val pattern: Pattern[TemperatureEvent, _] =
  Pattern.begin("start").where(event => event.getTemperature >= 26.0)

val patternStream: PatternStream[TemperatureEvent] = CEP.pattern(input, pattern)
Once the pattern stream is available, we need to select the matches from it and then take appropriate actions based on them. We can use the select or flatSelect method to select data from the matches.
The select method needs a PatternSelectFunction implementation. It has a select method which is called for each matching event sequence. The select method receives a map of string/event pairs of the matched events, where the string is the name of the state. The select method returns exactly one result.
To collect the results, we need to define an output POJO. In our case, let's say we need to generate alerts as output. Then we need to define the POJO as follows:
package com.demo.chapter05;

public class Alert {

    private String message;

    public Alert(String message) {
        super();
        this.message = message;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    @Override
    public String toString() {
        return "Alert [message=" + message + "]";
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((message == null) ? 0 : message.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        Alert other = (Alert) obj;
        if (message == null) {
            if (other.message != null)
                return false;
        } else if (!message.equals(other.message))
            return false;
        return true;
    }
}
Next we define the select functions.
In Java:
class MyPatternSelectFunction<IN, OUT> implements PatternSelectFunction<IN, OUT> {
    @Override
    public OUT select(Map<String, IN> pattern) {
        IN startEvent = pattern.get("start");
        IN endEvent = pattern.get("end");
        // placeholder: construct the output from the matched events
        return new OUT(startEvent, endEvent);
    }
}
In Scala:
def selectFn(pattern: mutable.Map[String, IN]): OUT = {
  val startEvent = pattern.get("start").get
  val endEvent = pattern.get("end").get
  OUT(startEvent, endEvent)
}
The flatSelect method is similar to the select method. The only difference between the two is that flatSelect can return an arbitrary number of results. For this, the flatSelect method has an additional Collector parameter which is used for the output elements.
The following example shows how we can use the flatSelect method.
In Java:
class MyPatternFlatSelectFunction<IN, OUT> implements PatternFlatSelectFunction<IN, OUT> {
    @Override
    public void flatSelect(Map<String, IN> pattern, Collector<OUT> collector) {
        IN startEvent = pattern.get("start");
        IN endEvent = pattern.get("end");
        for (int i = 0; i < startEvent.getValue(); i++) {
            // placeholder: construct and emit the output from the matched events
            collector.collect(new OUT(startEvent, endEvent));
        }
    }
}
In Scala:
def flatSelectFn(pattern : mutable.Map[String, IN], collector : Collector[OUT]) = {
val startEvent = pattern.get("start").get val endEvent = pattern.get("end").get for (i <- 0 to startEvent.getValue) {
collector.collect(OUT(startEvent, endEvent))
}
}
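As a concrete (hypothetical) instance of the template above, using the TemperatureEvent and Alert classes from this chapter, a flat select function could emit one alert per event that participated in the match:

class TemperatureAlertFlatSelect implements PatternFlatSelectFunction<TemperatureEvent, Alert> {
    @Override
    public void flatSelect(Map<String, TemperatureEvent> pattern, Collector<Alert> out) {
        // Emit one Alert for every matched state in the event sequence
        for (TemperatureEvent event : pattern.values()) {
            out.collect(new Alert("Temperature " + event.getTemperature()
                    + " on machine " + event.getMachineName()));
        }
    }
}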
Sometimes we may miss certain events if we have constrained the pattern with a time boundary: events may be discarded because they exceed the time window. In order to take action on timed-out events, the select and flatSelect methods allow a timeout handler. This handler is called for each timed-out event pattern.
In this case, the select method takes two parameters: a PatternTimeoutFunction and a PatternSelectFunction. The return type of the timeout function can be different from that of the select function. Timed-out events are wrapped in Either.Left and matched events in Either.Right.
The following code snippets show how we do things in practice.
In Java:
PatternStream<Event> patternStream = CEP.pattern(input, pattern);

DataStream<Either<TimeoutEvent, ComplexEvent>> result = patternStream.select(
    new PatternTimeoutFunction<Event, TimeoutEvent>() {...},
    new PatternSelectFunction<Event, ComplexEvent>() {...}
);

DataStream<Either<TimeoutEvent, ComplexEvent>> flatResult = patternStream.flatSelect(
    new PatternFlatTimeoutFunction<Event, TimeoutEvent>() {...},
    new PatternFlatSelectFunction<Event, ComplexEvent>() {...}
);
In Scala, the select API:
val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)

val result: DataStream[Either[TimeoutEvent, ComplexEvent]] = patternStream.select{
  (pattern: mutable.Map[String, Event], timestamp: Long) => TimeoutEvent()
} {
  pattern: mutable.Map[String, Event] => ComplexEvent()
}
The flatSelect API is called with the Collector as it can emit an arbitrary number of events:
val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)

val result: DataStream[Either[TimeoutEvent, ComplexEvent]] = patternStream.flatSelect{
  (pattern: mutable.Map[String, Event], timestamp: Long, out: Collector[TimeoutEvent]) =>
    out.collect(TimeoutEvent())
} {
  (pattern: mutable.Map[String, Event], out: Collector[ComplexEvent]) =>
    out.collect(ComplexEvent())
}
In the earlier sections, we learnt about the various features provided by the Flink CEP engine. Now it's time to understand how we can use it in a real-world solution. For that, let's assume we work for a mechanical company which produces some products. In the product factory, there is a need to constantly monitor certain machines. The factory has already set up sensors which keep sending the temperature of the machines at regular intervals.
Now we will be setting up a system that constantly monitors the temperature values and generates an alert if the temperature exceeds a certain value.
We can use the following architecture:
Here we will be using Kafka to collect events from sensors. In order to write a Java application, we first need to create a Maven project and add the following dependency:
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-cep-scala_2.11 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep-scala_2.11</artifactId>
<version>1.1.4</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java_2.11 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.1.4</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala_2.11 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.1.4</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.9_2.11</artifactId>
<version>1.1.4</version>
</dependency>
Next we need to do the following things to use Kafka.
First we need to define a custom Kafka deserializer. This will read bytes from a Kafka topic and convert them into a TemperatureEvent. The following is the code to do this.
EventDeserializationSchema.java:
package com.demo.chapter05;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

public class EventDeserializationSchema implements DeserializationSchema<TemperatureEvent> {

    public TypeInformation<TemperatureEvent> getProducedType() {
        return TypeExtractor.getForClass(TemperatureEvent.class);
    }

    public TemperatureEvent deserialize(byte[] arg0) throws IOException {
        // Records are expected as UTF-8 strings of the form machineName=temperature
        String str = new String(arg0, StandardCharsets.UTF_8);
        String[] parts = str.split("=");
        return new TemperatureEvent(parts[0], Double.parseDouble(parts[1]));
    }

    public boolean isEndOfStream(TemperatureEvent arg0) {
        return false;
    }
}
Next we create a topic in Kafka called temperature:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic temperature
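To publish test events by hand, we can use the standard Kafka console producer, where each line we type must follow the machineName=temperature format that our deserializer expects (the broker address localhost:9092 is an assumption matching the consumer configuration below):

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic temperature

Typing a line such as xyz=27.5 and pressing Enter then publishes one event.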
Now we move to the Java code which will listen to these events in a Flink stream:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
DataStream<TemperatureEvent> inputEventStream = env.addSource(
        new FlinkKafkaConsumer09<TemperatureEvent>("temperature", new EventDeserializationSchema(), properties));
Next we will define the pattern to check if the temperature is greater than 26.0 degrees Celsius within 10 seconds:
Pattern<TemperatureEvent, ?> warningPattern = Pattern.<TemperatureEvent>begin("first")
        .subtype(TemperatureEvent.class).where(new FilterFunction<TemperatureEvent>() {
            private static final long serialVersionUID = 1L;

            public boolean filter(TemperatureEvent value) {
                return value.getTemperature() >= 26.0;
            }
        }).within(Time.seconds(10));
Next, we match this pattern against the stream of events and select the matching events. We will also collect the alert messages into a result stream, as shown here:
DataStream<Alert> alertStream = CEP.pattern(inputEventStream, warningPattern)
        .select(new PatternSelectFunction<TemperatureEvent, Alert>() {
            private static final long serialVersionUID = 1L;

            public Alert select(Map<String, TemperatureEvent> event) throws Exception {
                return new Alert("Temperature Rise Detected:" + event.get("first").getTemperature()
                        + " on machine name:" + event.get("first").getMachineName());
            }
        });
In order to see which alerts were generated, we will print the results:
alertStream.print();
And we execute the stream:
env.execute("CEP on Temperature Sensor");
Now we are all set to execute the application. As and when messages arrive in the Kafka topic, the CEP engine will keep evaluating the pattern.
The actual execution will look like the following. Here is how we can provide sample input:
xyz=21.0
xyz=30.0
LogShaft=29.3
Boiler=23.1
Boiler=24.2
Boiler=27.0
Boiler=29.0
Here is how the sample output will look:
Connected to JobManager at Actor[akka://flink/user/jobmanager_1#1010488393]
10/09/2016 18:15:55 Job execution switched to status RUNNING.
10/09/2016 18:15:55 Source: Custom Source(1/4) switched to SCHEDULED
10/09/2016 18:15:55 Source: Custom Source(1/4) switched to DEPLOYING
10/09/2016 18:15:55 Source: Custom Source(2/4) switched to SCHEDULED
10/09/2016 18:15:55 Source: Custom Source(2/4) switched to DEPLOYING
10/09/2016 18:15:55 Source: Custom Source(3/4) switched to SCHEDULED
10/09/2016 18:15:55 Source: Custom Source(3/4) switched to DEPLOYING
10/09/2016 18:15:55 Source: Custom Source(4/4) switched to SCHEDULED
10/09/2016 18:15:55 Source: Custom Source(4/4) switched to DEPLOYING
10/09/2016 18:15:55 CEPPatternOperator(1/1) switched to SCHEDULED
10/09/2016 18:15:55 CEPPatternOperator(1/1) switched to DEPLOYING
10/09/2016 18:15:55 Map -> Sink: Unnamed(1/4) switched to SCHEDULED
10/09/2016 18:15:55 Map -> Sink: Unnamed(1/4) switched to DEPLOYING
10/09/2016 18:15:55 Map -> Sink: Unnamed(2/4) switched to SCHEDULED
10/09/2016 18:15:55 Map -> Sink: Unnamed(2/4) switched to DEPLOYING
10/09/2016 18:15:55 Map -> Sink: Unnamed(3/4) switched to SCHEDULED
10/09/2016 18:15:55 Map -> Sink: Unnamed(3/4) switched to DEPLOYING
10/09/2016 18:15:55 Map -> Sink: Unnamed(4/4) switched to SCHEDULED
10/09/2016 18:15:55 Map -> Sink: Unnamed(4/4) switched to DEPLOYING
10/09/2016 18:15:55 Source: Custom Source(2/4) switched to RUNNING
10/09/2016 18:15:55 Source: Custom Source(3/4) switched to RUNNING
10/09/2016 18:15:55 Map -> Sink: Unnamed(1/4) switched to RUNNING
10/09/2016 18:15:55 Map -> Sink: Unnamed(2/4) switched to RUNNING
10/09/2016 18:15:55 Map -> Sink: Unnamed(3/4) switched to RUNNING
10/09/2016 18:15:55 Source: Custom Source(4/4) switched to RUNNING
10/09/2016 18:15:55 Source: Custom Source(1/4) switched to RUNNING
10/09/2016 18:15:55 CEPPatternOperator(1/1) switched to RUNNING
10/09/2016 18:15:55 Map -> Sink: Unnamed(4/4) switched to RUNNING
1> Alert [message=Temperature Rise Detected:30.0 on machine name:xyz]
2> Alert [message=Temperature Rise Detected:29.3 on machine name:LogShaft]
3> Alert [message=Temperature Rise Detected:27.0 on machine name:Boiler]
4> Alert [message=Temperature Rise Detected:29.0 on machine name:Boiler]
We can also configure a mail client and use some external web hook to send e-mail or messenger notifications.
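For instance, a minimal sketch of such a webhook integration could be a custom sink that POSTs each alert to an HTTP endpoint. The URL below is a hypothetical placeholder and the error handling is deliberately minimal; this is one possible design, not part of the setup above:

package com.demo.chapter05;

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class WebhookAlertSink implements SinkFunction<Alert> {

    private static final long serialVersionUID = 1L;

    @Override
    public void invoke(Alert alert) throws Exception {
        // Hypothetical webhook endpoint; replace with a real notification service URL
        URL url = new URL("http://example.com/hooks/alerts");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        conn.setRequestProperty("Content-Type", "application/json");

        String payload = "{\"message\": \"" + alert.getMessage() + "\"}";
        try (OutputStream os = conn.getOutputStream()) {
            os.write(payload.getBytes(StandardCharsets.UTF_8));
        }

        conn.getResponseCode(); // trigger the request; the response body is ignored here
        conn.disconnect();
    }
}

The sink would then be attached to the alert stream with alertStream.addSink(new WebhookAlertSink());.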
In this chapter, we learnt about CEP. We discussed the challenges involved and how we can use the Flink CEP library to solve CEP problems. We also learnt about the Pattern API and the various operators we can use to define patterns. In the final section, we tried to connect the dots and see one complete use case. With some changes, this setup can be used as-is in various other domains as well.
In the next chapter, we will see how to use Flink’s built-in Machine Learning library to solve complex problems.