来源:安瑞哥是码农
上两篇文章分别测试了 Flink 跟 Spark 以流的方式读取 HDFS 数据和 kafka 数据,做简单的数据处理之后,写入 Elasticsearch。
然后分别用手动 cancel 的方式,以及设计程序遇到数据问题的方式,分别都退出进程。
最后,通过彼此的 checkpoint 来恢复数据,从最终的恢复效果来看,在没有采取其他任何辅助措施的情况下,Flink 跟 Spark 都不能实现数据的「精确一次性」。
但好在,对于一个生产系统来说,我们在乎的,是数据入库之后的「最终一致性」,而对于这个要求,通过设置每条数据的唯一 id,就能搞定。
但是,如果这个时候,咱们把之前的「简单 ETL 入库」,换成「聚合结果入库」,情况还能一样吗?
理论上分析,如果这个时候跟之前一样,中间过程数据出现了重复,那么聚合结果就要比预期的要大,即便这个时候设置了唯一 id (利用去重表),也弥补不了这个错误。
那到底实际情况是怎样的呢?咱们再来实测一次。
0. 场景准备
数据源还是用上次写入到 Kafka 的那一份一千多万的数据,而为了更好的演示最终的聚合结果,这次把写入结果的数据库从 ES 换成 Doris 的去重模型表。
具体流程是这样式的:
同时设计的 Doris 表结构如下:
CREATE TABLE `agg_result_table` (
`client_ip` varchar(50),
`sum` int(11)
)
UNIQUE KEY(`client_ip`)
DISTRIBUTED BY HASH(`client_ip`) BUCKETS 3
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
先上 Flink SQL API 的代码:
package com.anryg.doris
import java.time.Duration
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
/**
* @DESC: 通过Flink SQL API读取kafka数据,对数据进行聚合,最后把结果写入到Doris去重表,测试聚合状态的精确一次性
* @Auther: Anryg
* @Date: 2024/4/18 11:29
*/
object FlinkSQLAggStatusTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(100000L)
env.setStateBackend(new EmbeddedRocksDBStateBackend(true)) //新的设置state backend的方式
env.getCheckpointConfig.setCheckpointStorage("hdfs://192.168.211.106:8020/tmp/flink_checkpoint/FlinkSQLAggStatusTest")
env.getCheckpointConfig.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) //设置checkpoint记录的保留策略
env.getCheckpointConfig.setAlignedCheckpointTimeout(Duration.ofMinutes(1L))
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
val tableEnv = StreamTableEnvironment.create(env)
/**第一步:读取kafka数据源*/
tableEnv.executeSql(
"""
Create table data_from_kafka(
|client_ip STRING,
|domain STRING,
|`time` STRING,
|target_ip STRING,
|rcode STRING,
|query_type STRING,
|authority_record STRING,
|add_msg STRING,
|dns_ip STRING
|)
|with(
|'connector' = 'kafka',
|'topic' = 'test',
|'properties.bootstrap.servers' = '192.168.211.107:6667',
|'properties.group.id' = 'FlinkSQLAggStatusTest',
|'scan.startup.mode' = 'timestamp', //指定用时间戳的读取方式
|'scan.startup.timestamp-millis' = '1713182435000', //指定消费的时间戳位置
|'value.format'='csv', //确定数据源为文本格式
|'value.csv.field-delimiter'=',' //确定文本数据源的分隔符
|)
""".stripMargin)
/**第二步:创建Doris映射表*/
tableEnv.executeSql(
s"""
|CREATE TABLE agg_from_flink (
|`client_ip` STRING,
|`sum` BIGINT
|)
| WITH (
| 'connector' = 'doris',
| 'fenodes' = '192.168.221.173:8030',
| 'table.identifier' = 'example_db.agg_from_flink02',
| 'username' = 'root',
| 'password' = 'xxxx',
| 'sink.label-prefix' = '${args(0)}' //导入标签前缀,每次要不一样
|)
""".stripMargin)
/**第三步:数据写入到Doris表中*/
tableEnv.executeSql(
"""
|INSERT INTO agg_from_flink
|SELECT
|client_ip,
|count(*)
|from
|data_from_kafka
|GROUP BY
|client_ip
""".stripMargin)
}
}
再上 Spark structed streaming API 的代码:
package com.anryg.bigdata.streaming.doris
import com.alibaba.fastjson.JSONObject
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode
/**
* @DESC: 测试 Spark 在流式模式下的聚合计算,并将结果写入到Doris,测试结果是否能做到精确一次性
* @Auther: Anryg
* @Date: 2024/4/18 11:23
*/
object SparkSQLAggStatusTest {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("SparkSQLAggStatusTest").setMaster("local[*]")
val spark = SparkSession.builder()
.config("spark.sql.streaming.stateStore.providerClass",
"org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")//指定流计算的状态存储为RocksDB,3.2之后支持
.config(conf)
.getOrCreate()
val rawDF = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "192.168.211.107:6667")
.option("subscribe", "test")
.option("failOnDataLoss",false)
.option("fetchOffset.numRetries",3)
.option("startingTimestamp","1713182435000") //通过时间戳指定消费位置
.load()
import spark.implicits._
val df1 = rawDF.selectExpr("CAST(value AS string)")
.map(row =>{
val line = row.getAs[String]("value")
val fieldArray:Array[String] = line.split(",")
fieldArray
}).map(array => {
var client_ip = ""; var domain = ""; var time = ""; var target_ip = ""; var rcode = ""; var query_type = ""; var authority_record = ""; var add_msg = ""; var dns_ip = ""
try {
client_ip = array(0)
domain = array(1)
time = array(2)
target_ip = array(3)
rcode = array(4)
query_type = array(5)
authority_record = array(6)
add_msg = array(7)
dns_ip = array(8)
} catch {
case e:Exception => println(array,e)
}
(client_ip, domain, time, target_ip, rcode, query_type, authority_record, add_msg, dns_ip)
}).toDF("client_ip","domain","time","target_ip","rcode","query_type","authority_record","add_msg","dns_ip")
df1.createOrReplaceTempView("data_from_kafka")
/**聚合计算逻辑,取出聚合结果*/
val df2 = spark.sql(
"""
|SELECT
|client_ip,
|count(*) AS sum
|FROM data_from_kafka
|GROUP BY
|client_ip
""".stripMargin)
/**必须要对结果的格式进行重新定义为json string,否则直接无法写入Doris*/
val df3 = df2.map(row => {
val json = new JSONObject(2) /**将数据封装为json对象*/
val client_ip = row.getString(0)
val count = row.getLong(1)
json.put("client_ip",client_ip)
json.put("sum", count)
json.toJSONString
}).toDF()
df3.writeStream
.outputMode(OutputMode.Complete()) //确定输出模式,因为是聚合结果,要用complete模式
.format("doris")
.option("doris.table.identifier", "example_db.agg_from_spark01")
.option("doris.fenodes","192.168.221.173:8030")
.option("user","root")
.option("password","xxxx")
.option("checkpointLocation","hdfs://192.168.211.106:8020/tmp/spark_checkpoint/SparkSQLAggStatusTest") /**这个必须加上,记录offset以及计算状态*/
.start()
.awaitTermination()
}
}
简单起见,这次只做了一个非常 easy 的聚合。
1. 基准结果测试
开始,分别执行 Flink 跟 Spark 的代码,将准备好的数据源在没有程序中断的情况下,全部入库,目的是为了测试整个代码逻辑,以及流程的合理性。
在跑之前,我们先来看一眼原始数据的聚合结果:
取聚合数量排在前20的部分结果,用作后续的对比(事实证明,只用这部分就够了)。
接着,用 Flink 代码也完整跑一遍,得到如下的最终结果:
可以看到,跟上面的标准结果保持一致。
然后,再用 Spark 代码完整跑一遍,得到如下的最终结果:
同样,也跟上面的标准结果保持一致。
说明,当前无论是 Flink 还是 Spark 的代码逻辑,以及设计的这个写入流程,是没有问题的。
2. Flink 聚合状态恢复测试
这一次,咱们再把程序先给跑起来,把数据写入到另外一个 Doris 去重表里,然后,挑一个良辰吉时,把任务 cancel 掉。
此时,写入到 Doris 表的数据情况为(此时的数据还没有完全跑完):
接着,再利用 Flink 的精确一次性语义,通过 checkpoint 进行状态恢复,看最终写入到表里的聚合结果,是否符合预期。
等剩下的所有数据跑完,全部写入到 Doris 去重表后的数据结果为:
经过对比,有点出乎我意料的是,最终这个聚合结果是符合预期的(如果出现数据重复,会导致聚合结果比预期要大)。
说明,基于当前聚合计算的场景下,Flink 利用其「精确一次」语义进行状态恢复时,是真的可以达到结果的「最终一次性」的。
但是,这里需要注意的一点,那就是 Flink 的checkpoint 时间间隔问题,不宜设置过长,这样会导致任务重启的时候,进程还没有来得及 checkpoint,会重复读取之前所有处理过的数据(你猜,这样会给最终的聚合结果带来什么影响?)。
3. Spark 聚合状态恢复测试
也是先把进程跑起来,然后,瞅准一个时机,把进程 kill 掉,这个时候,写入到 Doris 表的数据情况如下:
接着,利用 Spark 的 checkpoint 进行状态恢复(直接重启就行)。
最终,写入到 Doris 的结果是这样的:
经过比对,跟预期的正确结果保持一致。
说明,基于当前聚合计算的场景下,Spark 直接重启进程进行状态恢复时,也是可以达到结果的「最终一次性」的。
最后
从这次的测试结论来看,由于有了前两次的刻板映像,所以一开始,我会不自觉的认为,无论是 Spark 还是 Flink,通过状态恢复后,依然会出现数据重复,这样也就会导致最终的聚合结果「比实际值大」。
但经过实测之后发现,并没有,并且这两者「聚合状态恢复」后的结果,都是符合预期的正确结果。
只不过在测试过程中,有几个比较有意思的现象,那就是一开始,我都用本地 idea 环境跑的时候(相同资源,相同并行度情况下),Flink 的入库效率要明显低于 Spark。
但是,换成集群环境后,这俩货的执行效率,就发生了倒挂。
不知道你对本次测试的过程有没有兴趣,如果有,可以关注我周末的直播,咱们一起再来玩一遍如何?
最后,你可以思考一个问题,如果我在状态恢复时,把 Flink 的「精确一次性」换成「至少一次性」,你觉得最终的聚合结果,还会是对的吗?