服务端要对客户端埋点进行数据清洗,考虑到闲鱼的DAU已经突破2000w,这个量是非常庞大的,非常消耗服务端资源;
Blink的策略是实时执行的,同样因为资源问题,现在只能同时上线十几个策略。

状态(State):状态是根据flink脚本里面的代码来决定的,最终会有一个
$end$的Final状态转换(StateTransition):State的转换条件,包括
take/proceed/ignore
take: 满足条件,获取当前元素,进入下一状态proceed:不论是否满足条件,不获取当前元素,直接进入下状态(如optional)并进行判断是否满足条件。ignore:不满足条件,忽略,进入下一状态。

Pattern来构建这个NFA,首先用它描述这个不确定性状态机。首先是构建一个 Pattern的一个链表,得到这个链表之后,会将每个Pattern映射成为 State的图,点与点之间会通过 StateTransition来连接。以下面的Python代码为例,看下如何API是如何工作的:以start事件开始,后续跟随一个middle的事件,后面紧跟着一个end事件作为结尾
Pattern.begin("start").where(SimpleCondition())\.followed_by('middle').where(SimpleCondition())\.next_('end').where(SimpleCondition())start、 middle、 end。Pattern里面保存了指向前面节点的引用 previous,整个Pattern链表构建完如下图所示:
end节点的一个引用 Ref,Pattern中会有一个变量指向前一个节点,这样就可以得到一个Pattern的反向链表。classPattern:# 静态方法,用来生成起始的pattern@staticmethoddef begin(self, name):pass# 标记紧接着的事件def followed_by(self, name):pass# 标记不需要紧跟的事件def not_followed_by(self, name):pass# 标记紧跟的事件def next_(self, name):pass# 标记事件循环次数def times(self, times):pass# 标记当前事件触发的条件def where(self, condition):pass# 标记当前事件的and条件def and_(self, condition):pass# 标记当前事件的or条件def or_(self, condition):pass# 用于聚合def group_by(self, fields):pass# 用于聚合,渠道特定字段的值def fields(self, key_by_state_name, field):pass# 用于聚合,统计事件具体的数量def count(self, field, condition):pass
StateTransition。有了Pattern链表,接下来就需要编译器(Compiler)了,它主要是将Pattern链表转化成NFA图,首先来看下NFA的2个核心组件:State和 StateTransition。
classState(object):
def __init__(self, name, state_type):
self.__name = name # 节点的名称,同Pattern的名称
self.__state_type = state_type # 节点的类型:Start/Normal/Stop/Final
self.__state_transitions = [] # 到其他节点的边
Start/Final/Normal/Stop。创建一个
$end$的结束节点(Final)再从后往前创建每个state节点,作为中间节点(
Normal/Stop)最后创建一个开始节点(
Start)

StateTransition。
classStateTransition:
def __init__(self, source_state, action, target_state, condition):
self.__source_state = source_state # 开始的State节点
self.__action = action # 具体action类型:take/ignore/proceed
self.__target_state = target_state # 结束的State节点
self.__condition = condition # 节点之间的判断条件
classConsumingStrategy: STRICT = 0# 严格匹配下个 SKIP_TILL_NEXT = 1# 跳过下一个 SKIP_TILL_ANY = 2# 跳过任意一个 NOT_FOLLOW = 3# 非跟随模式 NOT_NEXT = 4# 非紧邻模式

STRICT: 如果命中了事件了,会进到下个状态SKIP_TILL_NEXT: 如果命中了会进入下一个状态,否则会再当前节点循环,进入ignore的边SKIP_TILL_ANY: 不管是否命中条件,都会一直在当前状态循环NOT_FOLLOW: 如果遇到了一个匹配的,就会进入Stop状态NOT_NEXT: 如果命中一条,则进入Stop状态
followed_by接口会创建 SKIP_TILL_NEXT的节点。Pattern.begin('e1').where(SimpleCondition()).times(3);Times=3的Pattern,编译器在拿到这个Pattern之后,一样先创建一个$end$的Final节点,在处理times的时候,会创建重复的节点,只不过名称不同,不同的点之间用take链接起来,如下图所示:
_pattern = Pattern.begin("start").where(self.start_filter)\.followed_by('middle').where(SimpleCondition())\.next_('end').where(self.end_filter)\.group_by('group_by').fields('start', 'userId')start节点中的 userId作为聚合的节点,我们就会得到如下的 Pattern链表:
group_by节点的时候,我们需要做个特殊处理,判断如果有聚合节点,我们就需要再 $end$节点和前面节点之间插入一个聚合的节点和哨兵位节点,哨兵位节点命名为 $aggregationStartState$,最终效果如下图所示:
$aggregationStartState$节点和 group_by节点之间,是通过proceed结合,不需要满足特定条件就可以执行。State节点 AggregationState:
# 创建聚合节点
def __create_aggregation_state(self, sink_state):
# 渠道聚合节点的condition
_aggregation_condition = self.__current_pattern.get_aggregation_condition()
# 创建AggregationState
not_next = AggregationState(
self.__current_pattern.get_name(),
StateType.Normal,
_aggregation_condition.get_key_by_state_name(),
_aggregation_condition.get_field())
self.__states.append(not_next)
# 获取take的条件
take_condition = self.__get_take_condition(self.__current_pattern)
not_next.add_take(sink_state, take_condition)
# 将游标指向上一个节点
self.__following_pattern = self.__current_pattern
self.__current_pattern = self.__current_pattern.get_previous()
return not_nex
Show me the code
讲了太多原理的东西,接下来看下代码里面如何工作的,先来看下如何来编写一个CEP策略。
需要匹配用户查看3次宝贝详情页
# 1. 创建用来匹配的Pattern
_pattern = Pattern.begin('e1').where(KVCondition('scene', 'Page_xyItemDetail')).times(3)
# 2. 将需要匹配的事件流_batch_data和待匹配的Pattern
# CEP内部会先将pattern转化成NFA,然后再用NFA去匹配事件流
_cep = CEP.pattern(_batch_data['eventSeq'], _pattern)
# 用来选择的逻辑
def select_function(data):
pass
# 3. 匹配完成,通过cep的select接口查询匹配到的结果
self.result = _cep.select(select_function)
CEP.pattern()函数里面,会先创建 NFA,然后去进行匹配,可见整个匹配策略脚本非常的短小精悍。如下代码用来将 Pattern链表转化成 NFA图:
# 最后一个Pattern节点不允许是NotFollowedByif self.__current_pattern.get_quantifier().get_consuming_strategy() == ConsumingStrategy.NOT_FOLLOW:raiseException('NotFollowedBy is not supported as a last part of a Pattern!')# 校验Pattern的名称,必须唯一self.__check_pattern_name_uniqueness()# 校验Pattern的策略self.__check_pattern_skip_strategy()# 首先创建Final节点sink_state = self.__create_ending_state()# 判定是否有聚合节点if self.__current_pattern.get_aggregation_condition() isnotNone:# 首先创建聚合节点 sink_state = self.__create_aggregation_state(sink_state)# 然后创建聚合几点的起始节点 sink_state = self.__create_aggregation_start_state(sink_state)# 创建状态机中的中间节点,此函数会循环知道Start节点的Patternsink_state = self.__create_middle_states(sink_state)# 最后创建Start节点self.__create_start_state(sink_state)# 根据state列表和window来创建NFAreturn NFA(self.__states, self.__window_time, False)执行时间

内存使用


端计算是用Python实现,无法做到像Flink的状态机常驻内存,每次都要重新创建匹配,带来了额外的消耗
在事件流的清洗上面,现在是通过回朔拿到之前的事件流,存在大量的重复计算,后续可以借鉴Flink的Window机制来进行优化。
目前编译器暂时还不支持Group Pattern,后续还要对其进行扩展。
Python脚本现在还是需要手动编写,后续还可以考虑通过DSL来自动生成。
对于Flink的理解
CEP in Flink(1) - CEP规则解析
https://flink.apache.org/
《Efficient Pattern Matching over Event Streams》
https://github.com/apache/flink 1
闲鱼团队是Flutter+Dart FaaS前后端一体化新技术的行业领军者,就是现在!客户端/服务端java/架构/前端/质量工程师面向社会招聘,base杭州阿里巴巴西溪园区,一起做有创想空间的社区产品、做深度顶级的开源项目,一起拓展技术边界成就极致!
*投喂简历给小闲鱼→guicai.gxy@alibaba-inc.com