什么是CEP
Complex Event Processing (CEP)是基于Flink实现的复杂事件处理库,根据模式匹配方式来抽取数据流中符合要求的各种数据进行处理。
详细内容参考官方文档
主要定义
时间相关
- 事件时间: 事件中源于事件本身的时间
- 处理时间: 事件开始处理的时间
- 进入时间: 进入Flink的时间
在不指定时间方式下,默认情况下使用处理时间(Processing Time)处理数据
事件定义
- 简单事件:主要处理单一事件,比如对车辆的相关信息进行统计,超速10次会报警1次
- 复杂事件:由多个事件组成的事件
Pattern
Pattern指用来匹配某些事件的规则方式,多个Pattern构成一个序列完成对复杂事件的筛选匹配。可以根据自定义规则指定从一个Pattern到下一个Pattern的状态变化,完成整个复杂事件的状态的转化。
Individual Patterns
Individual Patterns可以是一个匹配一次或者多次的的规则
比如如果pattern为a b+ c? d
,说明匹配到a事件1次,b事件1次以上,c事件0次或1次,d事件,会匹配到结果。
Combining Patterns
多个匹配规则按照连续性的不同组成为一个序列
不能以notFollowedBy()结尾
详细匹配规则参考
NFA
NFA(Non-determined Finite Automaton),不确定有限状态机
在Pattern过程中涉及到主要两个过程
- Pattern匹配过程,会导致计算状态结果的缓冲区增加;
- 匹配提取,从状态缓冲区中,提取出匹配的结果
ComputationState
保存了事件的状态,NFAState
中使用private Queue<ComputationState> completedMatches;
保存了所有事件的状态
NFA通过NFACompiler
将当前创建的Pattern序列转换为NFA的状态转换关系
State Transition
State: 对应代码类名StateType
- Start
- Final
- Normal
- Stop
Transition: 对应代码类名StateTransitionAction
- Take: 事件存入sharedBuffer
- Ignore: 事件丢弃
- Process: 事件在目标state进行转化
CEP使用场景
CEP主要应用于事件与事件、事件与状态之间存在逻辑关系的处理。
如何使用Flink CEP
使用场景描述
车速大于10且小于50后第二条车速大于50且小于90,两条记录中速度差值大于30
AfterMatchSkipStrategy afterMatchSkipStrategy = AfterMatchSkipStrategy.skipToNext();
Pattern<CarInfo, ?> pattern = Pattern.<CarInfo>begin("start", afterMatchSkipStrategy).where(
new SimpleCondition<CarInfo>() {
@Override
public boolean filter(CarInfo subEvent) {
return subEvent.getSpeed() > 10 && subEvent.getSpeed() < 50;
}
}
).times(1).next("middle").where(new IterativeCondition<CarInfo>() {
@Override
public boolean filter(CarInfo value, Context<CarInfo> ctx) throws Exception {
Iterable<CarInfo> start = ctx.getEventsForPattern("start");
Iterator<CarInfo> iterator = start.iterator();
if (iterator.hasNext()) {
CarInfo midCarInfo = iterator.next();
return value.getSpeed() < 90 && value.getSpeed() > 50 && Math.abs(midCarInfo.getSpeed() - value.getSpeed()) > 30;
}
return false;
}
}).times(1);
PatternStream<CarInfo> patternStream = CEP.pattern(carInfoStream, pattern);
DataStream<Alert> result = patternStream.process(
new PatternProcessFunction<CarInfo, Alert>() {
@Override
public void processMatch(
Map<String, List<CarInfo>> pattern,
Context ctx,
Collector<Alert> out) throws Exception {
Alert alert = new Alert();
alert.setId(pattern.size());
out.collect(alert);
}
});
如何使用Blink SQL CEP
相同场景
select *
from carInfo_stream
MATCH_RECOGNIZE(
MEASURES
device_id
ONE ROW PER MATCH
AFTER MATCH SKIP TO NEXT ROW
PATTERN (start{1} middle{1})
DEFINE
start AS start.speed < 50 and start.speed > 10,
middle AS middle.speed < 90 and middle.speed > 50 and abs(middle.speed-start.speed) > 30
)
Flink CEP 和 Blink SQL CEP部分对应关系
针对上例中两者之间的联系更有利于理解Blink SQL CEP的实现
两者间关系如下
Flink CEP | Blink SQL CEP | 关系说明 |
---|---|---|
carInfoStream | carInfo_stream | 处理的数据源信息 |
AfterMatchSkipStrategy.skipToNext() | AFTER MATCH SKIP TO NEXT ROW | 数据匹配成功的跳转方式 |
start middle end | start middle end | 事件声明的名称 |
Next() | start{1} middle{1} | Start与middle之间的空格代表next()函数 |
followedByAny() | Start->end | Start与middle之间的->符号 |
Blink SQL CEP的业务处理方式中,将代码中的函数关系比如oneOrMore()等匹配规则使用类似正则的方式进行匹配的处理