进击的小羊

  1. 首页
  2. 大数据
  3. 正文

Flink CEP和Blink SQL CEP 详解

2021年3月8日 510点热度 1人点赞 0条评论

什么是CEP

Complex Event Processing (CEP)是基于Flink实现的复杂事件处理库,根据模式匹配方式来抽取数据流中符合要求的各种数据进行处理。

详细内容参考官方文档

主要定义

时间相关

  1. 事件时间: 事件中源于事件本身的时间
  2. 处理时间: 事件开始处理的时间
  3. 进入时间: 进入Flink的时间

在不指定时间方式下,默认情况下使用处理时间(Processing Time)处理数据

事件定义

  • 简单事件:主要处理单一事件,比如对车辆的相关信息进行统计,超速10次会报警1次
  • 复杂事件:由多个事件组成的事件

Pattern

Pattern指用来匹配某些事件的规则方式,多个Pattern构成一个序列完成对复杂事件的筛选匹配。可以根据自定义规则指定从一个Pattern到下一个Pattern的状态变化,完成整个复杂事件的状态的转化。

Individual Patterns

Individual Patterns可以是一个匹配一次或者多次的的规则

比如如果pattern为a b+ c? d,说明匹配到a事件1次,b事件1次以上,c事件0次或1次,d事件,会匹配到结果。

Combining Patterns

多个匹配规则按照连续性的不同组成为一个序列

匹配方式

不能以notFollowedBy()结尾

详细匹配规则参考

NFA

NFA(Non-determined Finite Automaton),不确定有限状态机
在Pattern过程中涉及到主要两个过程

  1. Pattern匹配过程,会导致计算状态结果的缓冲区增加;
  2. 匹配提取,从状态缓冲区中,提取出匹配的结果

ComputationState保存了事件的状态,NFAState中使用private Queue<ComputationState> completedMatches;保存了所有事件的状态

NFA通过NFACompiler将当前创建的Pattern序列转换为NFA的状态转换关系

State Transition

State: 对应代码类名StateType
  • Start
  • Final
  • Normal
  • Stop

Transition: 对应代码类名StateTransitionAction

  • Take: 事件存入sharedBuffer
  • Ignore: 事件丢弃
  • Process: 事件在目标state进行转化

状态转换

CEP使用场景

CEP主要应用于事件与事件、事件与状态之间存在逻辑关系的处理。

如何使用Flink CEP

使用场景描述
车速大于10且小于50后第二条车速大于50且小于90,两条记录中速度差值大于30

AfterMatchSkipStrategy afterMatchSkipStrategy = AfterMatchSkipStrategy.skipToNext();

Pattern<CarInfo, ?> pattern = Pattern.<CarInfo>begin("start", afterMatchSkipStrategy).where(
        new SimpleCondition<CarInfo>() {
            @Override
            public boolean filter(CarInfo subEvent) {
                return subEvent.getSpeed() > 10 && subEvent.getSpeed() < 50;
            }
        }
).times(1).next("middle").where(new IterativeCondition<CarInfo>() {
    @Override
    public boolean filter(CarInfo value, Context<CarInfo> ctx) throws Exception {
        Iterable<CarInfo> start = ctx.getEventsForPattern("start");
        Iterator<CarInfo> iterator = start.iterator();
        if (iterator.hasNext()) {
            CarInfo midCarInfo = iterator.next();
            return value.getSpeed() < 90 && value.getSpeed() > 50 && Math.abs(midCarInfo.getSpeed() - value.getSpeed()) > 30;
        }
        return false;
    }
}).times(1);

PatternStream<CarInfo> patternStream = CEP.pattern(carInfoStream, pattern);

DataStream<Alert> result = patternStream.process(
        new PatternProcessFunction<CarInfo, Alert>() {
            @Override
            public void processMatch(
                    Map<String, List<CarInfo>> pattern,
                    Context ctx,
                    Collector<Alert> out) throws Exception {
                Alert alert = new Alert();
                alert.setId(pattern.size());
                out.collect(alert);
            }
        });

如何使用Blink SQL CEP

相同场景

select *
from carInfo_stream
MATCH_RECOGNIZE(
    MEASURES
        device_id
    ONE ROW PER MATCH
    AFTER MATCH SKIP TO NEXT ROW
    PATTERN (start{1} middle{1})
    DEFINE
        start AS start.speed < 50 and start.speed > 10,
        middle AS middle.speed < 90 and middle.speed > 50 and  abs(middle.speed-start.speed) > 30
)

Flink CEP 和 Blink SQL CEP部分对应关系

针对上例中两者之间的联系更有利于理解Blink SQL CEP的实现
两者间关系如下

Flink CEP Blink SQL CEP 关系说明
carInfoStream carInfo_stream 处理的数据源信息
AfterMatchSkipStrategy.skipToNext() AFTER MATCH SKIP TO NEXT ROW 数据匹配成功的跳转方式
start middle end start middle end 事件声明的名称
Next() start{1} middle{1} Start与middle之间的空格代表next()函数
followedByAny() Start->end Start与middle之间的->符号

Blink SQL CEP的业务处理方式中,将代码中的函数关系比如oneOrMore()等匹配规则使用类似正则的方式进行匹配的处理

标签: Flink
最后更新:2022年3月9日

qizi

一生懸命,步履不停

点赞
分类
  • Java
  • 大数据
  • 数据库
  • 杂项
文章目录
  • 什么是CEP
    • 主要定义
  • CEP使用场景
  • 如何使用Flink CEP
  • 如何使用Blink SQL CEP
  • Flink CEP 和 Blink SQL CEP部分对应关系

COPYRIGHT © 2022 进击的小羊. ALL RIGHTS RESERVED.

Theme Kratos Made By Seaton Jiang