来源:安瑞哥是码农
为啥非要执着把 Doris 中的数据导入到 Elasticsearch (下称ES)呢,而且还用了3种方式做对比?
这不这些天在测试 Doris 的倒排索引功能嘛,上两篇文章分别测试了添加倒排索引后的字段,在进行全文检索,以及聚合计算时的性能表现。
那么接下来,我就想着基于相同的查询场景,来一场 Doris 跟 ES 的正面 PK,看到底它俩在查询、存储、以及易用性上有多大的差异,顺便验证一下网络上的文章到底有多大水分。
既然是PK,就要尽可能控制条件变量,那么首先就要保证数据集是一样的,因为之前写入到 Doris 表的数据来源,现在已经不记得了,公平起见,就只能想着把这张 Doris 表的数据,给导入到 ES 表中。
本来以为挺简单一件事,但是呢,在执行的时候发现,幺蛾子不断,期间有好几次想砸键盘的冲动,但好在克制住了。
今天这篇文章,就来跟你唠唠,我把一张超过8亿数据量的 Doris 表,导入到 ES 中所经历的波折。
0. 技术选型
可别看不起一个小小的数据导入,当你真要着手去实现的时候,会发现,咦... 好像没有那么容易耶。
从官方文档来看,Doris 目前的生态还是比较丰富的,支持各种可以跟外部数据源的关联,关键字一搜 Elasticsearch 发现,它还真支持 ES 的外表。
只不过让人遗憾的是,它这个外表只能支持 ES 的数据在 Doris 中使用,反过来则不行(这一点也跟官方的人确认过了),尴了个尬。
官网继续往下翻,暂时没有找到相对便捷的导入方式。
于是,就只能出大招了,以我的习惯,像这种导数据的活,优先考虑用 Spark、不行再用 Flink,还不行再试试其他导入工具,比如 DataX 之类的。
1. Spark 导入方式
这个方式,是我除数据库本身能够提供的导入功能之外,第一个想到的,也是我一直以来的习惯,因为对于当下的大数据生态圈来说,Spark 就像一把能打通一切的「万能钥匙」,它几乎能跟所有主流的,非主流的数据库玩到一起。
所以这个时候,把 Spark 作为 Doris 跟 ES 之间的数据传输管道,自然也完全没有问题(官方文档明确支持)。
Spark 读取 Doris 表,再写 ES 表(索引)的完整代码如下:
package com.anryg.bigdata.doris
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
/**
* @DESC: Spark 读取 Doris 表数据,然后写入 ES
* @Auther: Anryg
* @Date: 2024/3/6 09:19
*/
object SparkSQLFromDoris2ES {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("ReadDoris2ES")/*.setMaster("local[*]")*/
val spark = SparkSession.builder().config(conf).getOrCreate()
/**创建跟Doris的映射表*/
spark.sql(
"""
|CREATE
|TEMPORARY VIEW spark_doris
|USING doris
|OPTIONS(
| "table.identifier"="example_db.dns_logs_from_kafka",
| "fenodes"="192.168.221.173:8030",
| "user"="root",
| "password"="...."
|);
""".stripMargin)
/**读取Doris的表数据*/
val df01 = spark.sql(
"""
|select *
|from
|spark_doris
""".stripMargin)
import spark.implicits._
/**对数据进行简单处理*/
val targetDF = df01.map(row =>{
val client_ip = row.getString(0)
val domain = row.getString(1)
val time = row.getString(2)
val target_ip = row.getString(3)
var rcode = 99
try {
rcode = row.getInt(4)
} catch {
case e:Exception => println(row,e) //捕获异常数据异常情况,防止进程因此退出
}
var query_type = 99
try {
query_type = row.getInt(5)
} catch {
case e:Exception => println(row,e)
}
val authority_record = row.getString(6)
val add_msg = row.getString(7)
val dns_ip = row.getString(8)
val id = client_ip + domain + time //设计写入ES的_id,保证数据记录唯一不重复
(id, client_ip, domain, time, target_ip, rcode, query_type, authority_record, add_msg, dns_ip)
}).toDF("id","client_ip","domain","time","target_ip","rcode","query_type","authority_record","add_msg","dns_ip")
/**对处理后的数据写入ES*/
targetDF.write
.format("org.elasticsearch.spark.sql") //指定外部输出为ES
.option("es.nodes","192.168.221.173")
.option("es.port","9201")
.option("es.write.operation","upsert")
.option("es.mapping.id","id") //确定ES表的_id字段
.option("es.mapping.exclude","id") //_id不作为常规字段写入表中
.mode(SaveMode.Append) /**追加写入*/
.save("dns_logs02")
}
}
跟着官方文档的要求,整个写入数据的代码很简单。
PS:不知你有没有发现一个问题,Spark 的 API 可以允许 SQL 跟 算子在同一个代码中混用,但 Flink 不可以。
运行后,本地模式以及集群模式能跑起来,只不过呢,导入过程很不稳定,大概导入到5千万左右数据量的时候,就会抛出如下的异常:
导入到当前数据量时Spark进程异常
Spark进程异常信息
重试了多次之后,以上异常依旧,关键让人恼火的是,这个报错信息你几乎找不到有价值的信息,它既不是数据本身的问题,也不是字段类型映射的问题。
它就告诉你:老子就强行给你 aborted 掉了,你能咋滴?
翻看了几乎所有能看的日志,spark 任务的,Doris 服务端的,都找不到有价值的线索,关键,此时的集群系统负载是很低的,就让人比较生气。
2. Flink 导入方式
既然用 Spark 不靠谱,那用 Flink 会不会扭转局面呢?咱们来试一下。
还是根据官网提供的 API 示例,利用 Flink SQL API,经过调试后,能跑通的代码如下:
package com.anryg.doris
import java.time.Duration
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
/**
* @DESC: 利用 Flink SQL 读取 Doris 表,写 ES
* @Auther: Anryg
* @Date: 2024/3/5 17:26
*/
object FlinkSQLFromDoris2ES {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(10000L)
env.setStateBackend(new EmbeddedRocksDBStateBackend(true)) //新的设置state backend的方式
env.getCheckpointConfig.setCheckpointStorage("hdfs://192.168.211.106:8020/tmp/flink_checkpoint/FlinkSQLFromDoris2ES")
env.getCheckpointConfig.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) //设置checkpoint记录的保留策略
env.getCheckpointConfig.setAlignedCheckpointTimeout(Duration.ofMinutes(1L))
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
val tableEnv = StreamTableEnvironment.create(env)
/**映射读取的Doris表*/
tableEnv.executeSql(
s"""
|CREATE TABLE data_from_doris (
|`client_ip` STRING,
|domain STRING,
|`time` STRING,
|target_ip STRING,
|rcode STRING,
|query_type STRING,
|authority_record STRING,
|add_msg STRING,
|dns_ip STRING
|)
| WITH (
| 'connector' = 'doris',
| 'fenodes' = '192.168.221.173:8030',
| 'table.identifier' = 'example_db.dns_logs_from_kafka',
| 'username' = 'root',
| 'password' = 'xxxx',
| 'sink.label-prefix' = '${args(0)}' //导入标签前缀
|)
""".stripMargin)
/**映射要写入的ES表*/
tableEnv.executeSql(
"""
|CREATE TABLE data2es (
|`client_ip` STRING,
|domain STRING,
|`time` STRING,
|target_ip STRING,
|rcode STRING,
|query_type STRING,
|authority_record STRING,
|add_msg STRING,
|dns_ip STRING,
|PRIMARY KEY (client_ip,domain,`time`) NOT ENFORCED
|) WITH (
| 'connector' = 'elasticsearch-7',
| 'hosts' = 'http://192.168.221.173:9201',
| 'index' = 'dns_logs04'
|);
""".stripMargin)
/**写入数据*/
tableEnv.executeSql(
"""
|INSERT INTO data2es
|select
|*
|from
|data_from_doris
""".stripMargin)
}
}
跑是真的能跑起来,数据也是真的能写进去。
只不过呢,跟 Spark 表现出的毛病一样,写到中途就不行了,虽然过程中它会拼了命的自动重试,然而,当数据写入到6千多万的时候,就怎么着都写不动了。
Flink能写进去的最大数据量
Flink子任务重试的次数
Flin任务运行的时间
Flin任务运行时抛出的异常
努力是真的努力,但不行也是真的不行。
3. DataX导入方式
既然 Spark 跟 Flink 都不行,那还剩一招,用 DataX ,毕竟之前用过它来导 MySQL 的数据到 Clickhouse,还算靠谱。
虽然 DataX 没有 Doris Reader 插件,但是因为 Doris 支持 Mysql 协议,所以直接用 Mysql reader 就好了,而至于 ES writer,它是支持的。
对于当前的导入任务,其配置如下:
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "xxxx",
"splitPk": "time",
"connection": [
{
"querySql": [
"select concat(client_ip,domain,time) as id,* from dns_logs_from_kafka;"
],
"jdbcUrl": [
"jdbc:mysql://192.168.221.173:9030/example_db"
]
}
]
}
},
"writer": {
"name": "elasticsearchwriter",
"parameter": {
"endpoint": "http://192.168.221.173:9201",
"index": "dns_logs03",
"type": "default",
"column": [
{"name": "id", "type": "id"},
{"name": "client_ip", "type": "text", "norms": false},
{"name": "domain", "type": "text", "norms": false},
{"name": "time", "type": "keyword"},
{"name": "target_ip", "type": "keyword"},
{"name": "rcode", "type": "keyword"},
{"name": "query_type", "type": "keyword"},
{"name": "authority_record", "type": "text", "norms": false},
{"name": "add_msg", "type": "keyword"},
{"name": "dns_ip", "type": "keyword"}
]
}
}
}
]
}
}
导入任务启动后,跑是真的能跑,但是能 hold 住的数据量有限,那也是真的有限,试了很多次,一般也就2百万左右的水平。
然后,就歇菜了。
也是一样,抛出的异常中透露着诡异,给你一种无缘无故、莫名其妙的无力感。
总结
从这次 Doris 数据导入 ES 用的3个工具的实践对比来看,都不能满足要求。
核心原因在于,要导入的这张 Doris 表,数据量实在是太大,想通过「一次性」的方式来搞定,目前来看是行不通的。
虽然这3个工具抛出的异常原因都非常模糊,但有一点可以确定,那就是,每次报错,问题都出在任务的上游,也就是读取 Doris 表的阶段。
这也从侧面反映出一个问题,那就是:针对大数据量时,这3个工具对 Doris 表的全表读取默认策略还不够友好,或者说这个过程还有很大的优化空间。
同时也进一步说明,一些所谓的解决方案,在遇到真正的大数据量时,都显得那么的脆弱无比,不堪一击。
那么既然目前这3个方案都行不通,你猜,我接下来会用什么办法,来搞定这件事情?
最后,在这次数据导入的对比测试中,Spark、Flink 还有 DataX,你觉得谁的表现最差劲?