来源:Apache Flink
摘要:本文整理自阿里云 Flink 存储引擎团队李晋忠,兰兆千,梅源关于阿里云实时计算企业级状态存储引擎 Gemini 的研究,内容主要分为以下五部分:
1. 流计算状态访问的痛点
01
流计算状态访问的痛点
Flink 作为有状态的流计算系统[1] ,状态存储引擎在其中扮演着重要角色。Flink 中状态 (State) 用来存储计算的中间结果或者历史的事件序列(如图 1-1 所示)。以两个最常见的场景为例:
聚合分析类 (Agg) 算子中,当流入的数据每次完成计算后,会将当前计算结果存储到状态中,当后续新数据到来时,可以依赖上一次的计算结果做增量计算;
双流/多流 Join 类算子中,每条流上的数据,会和其他几条流的历史数据做 Join 条件匹配,所以每条流需要用状态把过去一段时间流入的事件序列全部保存下来。

图 1-1. Flink 状态用来存储计算中间结果或者历史事件序列
当 Flink 作业状态规模较大时,状态存储引擎很难把全量状态数据存储到内存中,往往会将部分冷数据保存在磁盘上。内存和磁盘在访问性能和延迟方面的差异是巨大的,IO 访问很容易成为数据处理的瓶颈,在 Flink 计算过程中如果某个算子需要频繁从磁盘上加载状态数据的话,这个算子就很容易成为整个作业吞吐的性能瓶颈。因此,状态存储引擎在很多时候是决定 Flink 作业性能的关键因素。
1.1 RocksDB 状态后端的问题
目前社区生产可用的状态存储引擎是基于 RocksDB 的实现。RocksDB 作为一个通用的 KV 存储引擎,并不完全适合流式计算场景。我们在实际生产使用和用户反馈中,发现其具有以下痛点:
Flink 周期性 Checkpoint 使得 RocksDB 性能变差, 且容易出现 CPU 尖峰,影响集群稳定性。
在 Flink 容错机制中,作业会定期触发 Checkpoint,生成全局状态快照用于故障恢复。Flink 每次触发 Checkpoint 时,会将 RocksDB 内存中的数据刷盘生成新的文件,这会带来很多负面影响:
造成不必要的 cache miss,读磁盘变多,性能会变差;
内部 Log 整理频率更高,让系统整体的 CPU 和 IO 开销更大;
在 Checkpoint 期间容易形成 CPU 尖峰(如图 1-2 所示),导致集群产生突发的资源争抢,用户也很难提前预估合理的集群资源。
图 1-2. Flink 作业周期性 Checkpoint 导致周期性 CPU 尖峰
扩缩并发状态恢复很慢,特别是在缩并发的场景。
以 Flink-1.18 RocksDB 两个并发缩成一个并发为例,缩并发时需要从原先 DB 实例中将有效的 Key-Value 数据遍历出来,插入到新的 DB 实例中,整个过程会涉及很多的 IO 读写操作,速度会相对较慢(注:Flink-1.19 以后预期可以支持文件级别剪裁,对该过程有所加速);特别是对于大状态的场景,这个状态恢复过程可能会达到几十分钟级别。
强依赖本地盘,本地盘空间写满后,作业无法正常运行。
RocksDB 中状态数据必须存储在本地磁盘。然而本地盘容量一般是受限的,RocksDB 经常会遇到本地盘写满导致作业无法稳定运行的情况,只能通过扩容磁盘或者扩并发来解决,系统整体的扩展性较差。
02
企业级状态存储引擎 Gemini
2.1 核心架构

2.2 存算分离与冷热分离--增强磁盘容量扩展性
在云原生部署环境下,本地磁盘容量一般是有受限的。RocksDB 在设计上需要将全量状态数据存储到本地磁盘中,扩展性较差。Gemini 支持状态数据文件的远端存储和访问,当本地磁盘容量不足时,可以将部分冷数据存储到远端分布式文件系统中,从而可以摆脱本地磁盘的容量限制。用户不必因存储用量不足而采取扩并发的方法,可以节约很多成本。
2.3 状态懒加载与延迟剪裁--大幅提升启动和扩缩容速度
为了解决大状态场景下作业恢复耗时久,作业断流时间很长的问题,新版 Gemini 提供了状态懒加载(LazyRestore)的功能。如图 2-2 所示,传统的状态恢复方式下,需要等待远端检查点文件同步下载到本地后,用户作业才可以正常运行,处理业务数据。在状态懒加载模式下,状态恢复时只需要下载少量元数据,就可以让作业启动处理用户数据,然后用异步下载的方式将远端检查点文件下载到本地;下载过程中,算子可以直接读远端的状态数据完成计算。
扩缩并发也是用户常见的操作。与简单作业恢复不同的是,扩缩并发涉及到状态的剪裁,即处理冗余数据。不同于 RocksDB 在扩缩并发时需要遍历所需 key-value 数据才能恢复作业,Gemini 可以直接用原有文件进行元数据的拼接,快速恢复 DB 实例,开始处理用户数据;而文件中的冗余数据可以异步进行清理,并且在清理过程中几乎不会对状态读写线程的性能造成影响。这一功能称为状态延迟剪裁。
Gemini 利用状态懒加载以及延迟剪裁能够在作业恢复速度上取得非常大的功效,我们对比一下三种不同的恢复方式(见图 2-3 ):
Rocksdb:
状态恢复阶段需要下载状态文件和元数据文件,然后处理冗余数据,处理完成后作业才能成功启动,整体断流时间较长;
Gemini + 延迟剪裁:
只需下载状态文件和元数据文件即可启动,将处理冗余数据的操作异步化,且异步处理期间对读写线程性能几乎影响,可以让作业快速启动,减少断流时间;
Gemini + 状态懒加载 + 延迟剪裁:
进一步将下载状态文件的操作放到异步阶段执行,允许作业可以只下载少量元数据数据就可以启动处理用户数据,大大缩小作业断流时间。异步下载状态文件过程中,作业的性能会从 0 开始逐渐提升,随着远端文件逐步下载到本地,作业性能可以逐渐恢复到正常水平。状态懒加载方式和完全阻塞的下载方式相比,由于下载状态文件期间还可以正常处理数据,作业整体吞吐要更高。
图 2-3. Rocksdb/Gemini/Gemini 状态懒加载三者断流时间对比
2.4 KV 分离--优化双流/多流 Join 性能
■ 2.4.1 KV 分离核心优势
很多 Flink 双流 Join 场景中, 具备 Join 成功率较低、或者状态数据 Value 较长的特点,KV 分离可以在这类作业下发挥性能优势。例如风控场景中,通常只有异常的数据才可能 Join 成功;在实时推荐场景中(如图 2-4 所示),只有推荐算法实际生效的情况下,才可以 Join 成功;这类用户场景特点决定了其对应的 Flink 作业 Join 成功率会很低,同时 Value 存储的业务数据字段很长,开启 KV 分离可以获得极大的性能优势。
Join 场景下 KV 分离的优势来源于两个方面:
Join 算子只需利用 Key 即可判断是否 Join 成功, Value 只有在 Join 成功的情况下才会参与计算;在 Join 成功率低的场景下,KV 分离可以将更多 Key 缓存在 Cache 中,状态访问性能更好;
将状态数据中的大 Value 分离存储,降低主存储数据结构的大小,极大地减少引擎内部冗余数据整理的 CPU 和 IO 开销。

■ 2.4.2 KV 分离支持存算分离
■ 2.4.3 自适应 KV 分离

03
Gemini 性能评测&线上表现
3.1 Flink State Benchmark
测试环境:一台阿里云 ECS i2.2xlarge 实例, 8vCPU, 64G 内存,Nvme SSD 磁盘;
测试设置:使用 Flink State Benchmark[3] 对 Rocksdb/Gemini 纯 State 操作的性能进行对比, Rocksdb 设置 WriteBuffer 64MB (默认 2 个),blockCache 512MB, Gemini 设置总内存 (64MB * 2 + 512MB) 。
测试结果如图 3-1 所示,对于 Flink 流计算场景中占比很大的点查询 (ValueGet/ListGet/MapGet) 操作, 以及写入操作(ValueUpate/ListUpdate/MapUpdate),Gemini 的吞吐性能多数可以到达 Rocksdb 的 2~5 倍。
图 3-1. Gemini/Rocksdb Flink-state-benchmark 性能对比
3.2 Nexmark
测试环境:5 台阿里云 ecs.c7.16xlarge 实例 (1个JM,4个TM), 每台实例 64 vCPU, 128GB 内存,ESSD PL1 云盘;
Rocksdb TPS/core | Gemini TPS/core | Gemini vs Rocksdb | |
---|---|---|---|
Q4 | 84.84 | 146.34 | +72.49% |
Q5 | 97.28 | 120.89 | +24.27% |
Q7 | 23.83 | 27.57 | +15.69% |
Q8 | 566.36 | 597.17 | +5.44% |
Q9 | 40.02 | 92.57 | +131.31% |
Q11 | 79.5 | 138.41 | +74.10% |
Q12 | 437.69 | 475.82 | +8.71% |
Q16 | 51.01 | 63.6 | +24.68% |
Q17 | 439.89 | 497.94 | +13.20% |
Q18 | 132.06 | 236.62 | +79.18% |
Q19 | 161.81 | 278.96 | +72.40% |
Q20 | 36.09 | 114.39 | +216.96% |
3.3 状态恢复速度测试

3.4 KV 分离效果测试
测试设置:选择 Nexmark Q20 Join 作业作为 KV 分离的性能测试 Benchamark,并适当扩大数据规模 (EventsNum=400M/800M) , 使得其更贴合双流 Join 大状态场景的情况,其他测试环境和设置与 3.2 Nemark 保持一致,分别测试 Gemini 在关闭/开启 KV 分离情况下的性能表现。
测试结果如图 3-3 所示,在 Q20 双流 Join 场景下,Gemini 开启 KV 分离后性能优化效果显著,作业吞吐能力可以提升 50% ~ 70% 以上。
图 3-3. Nexmark Q20 Gemini 关闭/开启KV分离吞吐对比
3.5 线上表现
Gemini 作为阿里云实时计算 Flink 引擎的默认状态后端,经历了三年多的不断优化和打磨,性能、稳定性和易用性不断提升,截至目前,阿里巴巴集团内部的实时计算平台和公有云的实时计算 Flink 服务中,有共计超 50WCU 的有状态作业使用 Gemini 存储引擎,助力实时计算用户高效完成业务目标;
在 VVR-8.0 版本中我们对 Gemini 架构进行了全新升级,截至目前,在阿里巴巴集团内部的实时计算平台,有 53%+ 的有状态 Flink 任务使用了新版 Gemini 引擎,性能和稳定性表现优异,据估算整体作业资源相对于旧版引擎进一步节省约 27%;在公有云实时计算 Flink 版中,截止目前也有 24%+ 的有状态作业使用了新版 Gemini 引擎。
04
结语
Flink 企业级状态存储引擎 Gemini 基于流计算场景特点设计,经历了三年多的不断优化和打磨,性能、稳定性和易用性不断提升。
自 VVR-8.X 版本起,新版 Gemini 在旧版本的基础上,对核心架构和功能都进行了改造升级,相比于 RocksDB , 新版 Gemini 拥有更优的状态访问性能,更快速的扩缩容机制,同时支持 KV 分离、存算分离和状态懒加载;其作为阿里云实时计算 Flink 版的默认状态存储引擎,也经历了阿里巴巴集团和阿里云用户大规模生产实践的考验。
在未来,Gemini 引擎仍将持续地进行优化和改进,提升流计算产品的性能、易用性和稳定性,打造成为最适合流计算场景的状态存储引擎。
05
参考
[1]https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/concepts/stateful-stream-processing
[2]https://help.aliyun.com/zh/flink/user-guide/dynamically-update-deployment-parameters
[3]https://github.com/apache/flink-benchmarks/tree/master/src/main/java/org/apache/flink/state/benchmark
[4]https://github.com/nexmark/nexmark
[5]https://help.aliyun.com/zh/flink/