Doris写ES,Spark、Flink以及DataX,谁更拉垮?



来源:安瑞哥是码农


为啥非要执着把 Doris 中的数据导入到 Elasticsearch (下称ES)呢,而且还用了3种方式做对比?


这不这些天在测试 Doris 的倒排索引功能嘛,上两篇文章分别测试了添加倒排索引后的字段,在进行全文检索,以及聚合计算时的性能表现。


那么接下来,我就想着基于相同的查询场景,来一场 Doris 跟 ES 的正面 PK,看到底它俩在查询、存储、以及易用性上有多大的差异,顺便验证一下网络上的文章到底有多大水分。


既然是PK,就要尽可能控制条件变量,那么首先就要保证数据集是一样的,因为之前写入到 Doris 表的数据来源,现在已经不记得了,公平起见,就只能想着把这张 Doris 表的数据,给导入到 ES 表中。


本来以为挺简单一件事,但是呢,在执行的时候发现,幺蛾子不断,期间有好几次想砸键盘的冲动,但好在克制住了。


今天这篇文章,就来跟你唠唠,我把一张超过8亿数据量的 Doris 表,导入到 ES 中所经历的波折。




0. 技术选型


可别看不起一个小小的数据导入,当你真要着手去实现的时候,会发现,咦... 好像没有那么容易耶。


从官方文档来看,Doris 目前的生态还是比较丰富的,支持各种可以跟外部数据源的关联,关键字一搜 Elasticsearch 发现,它还真支持 ES 的外表。



只不过让人遗憾的是,它这个外表只能支持 ES 的数据在 Doris 中使用,反过来则不行(这一点也跟官方的人确认过了),尴了个尬。


官网继续往下翻,暂时没有找到相对便捷的导入方式。


于是,就只能出大招了,以我的习惯,像这种导数据的活,优先考虑用 Spark、不行再用 Flink,还不行再试试其他导入工具,比如 DataX 之类的。



1. Spark 导入方式


这个方式,是我除数据库本身能够提供的导入功能之外,第一个想到的,也是我一直以来的习惯,因为对于当下的大数据生态圈来说,Spark 就像一把能打通一切的「万能钥匙」,它几乎能跟所有主流的,非主流的数据库玩到一起。


所以这个时候,把 Spark 作为 Doris 跟 ES 之间的数据传输管道,自然也完全没有问题(官方文档明确支持)。


Spark 读取 Doris 表,再写 ES 表(索引)的完整代码如下:

package com.anryg.bigdata.doris

import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveModeSparkSession}

/**
  * @DESC: Spark 读取 Doris 表数据,然后写入 ES
  * @Auther: Anryg
  * @Date: 2024/3/6 09:19
  */

object SparkSQLFromDoris2ES {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("ReadDoris2ES")/*.setMaster("local[*]")*/
        val spark = SparkSession.builder().config(conf).getOrCreate()

        /**创建跟Doris的映射表*/
        spark.sql(
            """
              |CREATE
              |TEMPORARY VIEW spark_doris
              |USING doris
              |OPTIONS(
              |  "
table.identifier"="example_db.dns_logs_from_kafka",
              |  "
fenodes"="192.168.221.173:8030",
              |  "
user"="root",
              |  "
password"="...."
              |);
            "
"".stripMargin)

        /**读取Doris的表数据*/
        val df01 = spark.sql(
            """
              |select * 
              |from 
              |spark_doris
            "
"".stripMargin)

        import spark.implicits._

        /**对数据进行简单处理*/
        val targetDF = df01.map(row =>{
                val client_ip = row.getString(0)
                val domain = row.getString(1)
                val time = row.getString(2)
                val target_ip = row.getString(3)
                var rcode = 99
                try {
                    rcode = row.getInt(4)
                } catch {
                    case e:Exception => println(row,e) //捕获异常数据异常情况,防止进程因此退出
                }
                var query_type = 99
                try {
                    query_type = row.getInt(5)
                } catch {
                    case e:Exception => println(row,e)
                }
                val authority_record = row.getString(6)
                val add_msg = row.getString(7)
                val dns_ip = row.getString(8)

                val id = client_ip + domain + time //设计写入ES的_id,保证数据记录唯一不重复
                (id, client_ip, domain, time, target_ip, rcode, query_type, authority_record, add_msg, dns_ip)
            }).toDF("id","client_ip","domain","time","target_ip","rcode","query_type","authority_record","add_msg","dns_ip")

        /**对处理后的数据写入ES*/
        targetDF.write
            .format("org.elasticsearch.spark.sql")  //指定外部输出为ES
            .option("es.nodes","192.168.221.173")
            .option("es.port","9201")
            .option("es.write.operation","upsert")
            .option("es.mapping.id","id"//确定ES表的_id字段
            .option("es.mapping.exclude","id"//_id不作为常规字段写入表中
            .mode(SaveMode.Append/**追加写入*/
            .save("dns_logs02")
    }
}


跟着官方文档的要求,整个写入数据的代码很简单。


PS:不知你有没有发现一个问题,Spark 的 API 可以允许 SQL 跟 算子在同一个代码中混用,但 Flink 不可以。


运行后,本地模式以及集群模式能跑起来,只不过呢,导入过程很不稳定,大概导入到5千万左右数据量的时候,就会抛出如下的异常:


导入到当前数据量时Spark进程异常


Spark进程异常信息


重试了多次之后,以上异常依旧,关键让人恼火的是,这个报错信息你几乎找不到有价值的信息,它既不是数据本身的问题,也不是字段类型映射的问题。


它就告诉你:老子就强行给你 aborted 掉了,你能咋滴?


翻看了几乎所有能看的日志,spark 任务的,Doris 服务端的,都找不到有价值的线索,关键,此时的集群系统负载是很低的,就让人比较生气。



2. Flink 导入方式


既然用 Spark 不靠谱,那用 Flink 会不会扭转局面呢?咱们来试一下。


还是根据官网提供的 API 示例,利用 Flink SQL API,经过调试后,能跑通的代码如下:


package com.anryg.doris

import java.time.Duration

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**
  * @DESC: 利用 Flink SQL 读取 Doris 表,写 ES
  * @Auther: Anryg
  * @Date: 2024/3/5 17:26
  */

object FlinkSQLFromDoris2ES {

    def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        env.enableCheckpointing(10000L)
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true)) //新的设置state backend的方式
        env.getCheckpointConfig.setCheckpointStorage("hdfs://192.168.211.106:8020/tmp/flink_checkpoint/FlinkSQLFromDoris2ES")
        env.getCheckpointConfig.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION//设置checkpoint记录的保留策略
        env.getCheckpointConfig.setAlignedCheckpointTimeout(Duration.ofMinutes(1L))
        env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)

        val tableEnv = StreamTableEnvironment.create(env)

        /**映射读取的Doris表*/
        tableEnv.executeSql(
            s"""
              |CREATE TABLE data_from_doris (
              |`client_ip` STRING,
              |domain STRING,
              |`time` STRING,
              |target_ip STRING,
              |rcode STRING,
              |query_type STRING,
              |authority_record STRING,
              |add_msg STRING,
              |dns_ip STRING
              |)
              |    WITH (
              |      'connector' = 'doris',
              |      'fenodes' = '192.168.221.173:8030',
              |      'table.identifier' = 'example_db.dns_logs_from_kafka',
              |      'username' = 'root',
              |      'password' = 'xxxx',
              |      'sink.label-prefix' = '${args(0)}'  //导入标签前缀
              |)
            "
"".stripMargin)


        /**映射要写入的ES表*/
        tableEnv.executeSql(
            """
              |CREATE TABLE data2es (
              |`client_ip` STRING,
              |domain STRING,
              |`time` STRING,
              |target_ip STRING,
              |rcode STRING,
              |query_type STRING,
              |authority_record STRING,
              |add_msg STRING,
              |dns_ip STRING,
              |PRIMARY KEY (client_ip,domain,`time`) NOT ENFORCED
              |) WITH (
              |  'connector' = 'elasticsearch-7',
              |  'hosts' = 'http://192.168.221.173:9201',
              |  'index' = 'dns_logs04'
              |);
            "
"".stripMargin)

        /**写入数据*/
        tableEnv.executeSql(
            """
              |INSERT INTO data2es
              |select
              |*
              |from
              |data_from_doris
            "
"".stripMargin)
    }
}

跑是真的能跑起来,数据也是真的能写进去。


只不过呢,跟 Spark 表现出的毛病一样,写到中途就不行了,虽然过程中它会拼了命的自动重试,然而,当数据写入到6千多万的时候,就怎么着都写不动了。


Flink能写进去的最大数据量



Flink子任务重试的次数


Flin任务运行的时间

Flin任务运行时抛出的异常


努力是真的努力,但不行也是真的不行。



3. DataX导入方式


既然 Spark 跟 Flink 都不行,那还剩一招,用 DataX ,毕竟之前用过它来导 MySQL 的数据到 Clickhouse,还算靠谱。


虽然 DataX 没有 Doris Reader 插件,但是因为 Doris 支持 Mysql 协议,所以直接用 Mysql reader 就好了,而至于 ES writer,它是支持的。


对于当前的导入任务,其配置如下:


{
  "job": {
    "content": [
      {
        "reader": {
          "name""mysqlreader",
          "parameter": {
            "username""root",
            "password""xxxx",
            "splitPk""time",
            "connection": [
              {
                "querySql": [
                "select concat(client_ip,domain,time) as id,* from dns_logs_from_kafka;"
              ],
                "jdbcUrl": [
                  "jdbc:mysql://192.168.221.173:9030/example_db"
                ]
              }
            ]
          }
        },
        "writer": {
          "name""elasticsearchwriter",
          "parameter": {
                "endpoint""http://192.168.221.173:9201",
                "index""dns_logs03",
            "type""default",
            "column": [
              {"name""id""type""id"},
              {"name""client_ip""type""text""norms"false},
              {"name""domain""type""text""norms"false},
              {"name""time""type""keyword"},
              {"name""target_ip""type""keyword"},
              {"name""rcode""type""keyword"},
              {"name""query_type""type""keyword"},
              {"name""authority_record""type""text""norms"false},
              {"name""add_msg""type""keyword"},
              {"name""dns_ip""type""keyword"}
            ]            
          }
        }
      }
    ]
  }
}


导入任务启动后,跑是真的能跑,但是能 hold 住的数据量有限,那也是真的有限,试了很多次,一般也就2百万左右的水平。


然后,就歇菜了。


也是一样,抛出的异常中透露着诡异,给你一种无缘无故、莫名其妙的无力感。



总结


从这次 Doris 数据导入 ES 用的3个工具的实践对比来看,都不能满足要求。


核心原因在于,要导入的这张 Doris 表,数据量实在是太大,想通过「一次性」的方式来搞定,目前来看是行不通的。


虽然这3个工具抛出的异常原因都非常模糊,但有一点可以确定,那就是,每次报错,问题都出在任务的上游,也就是读取 Doris 表的阶段。


这也从侧面反映出一个问题,那就是:针对大数据量时,这3个工具对 Doris 表的全表读取默认策略还不够友好,或者说这个过程还有很大的优化空间


同时也进一步说明,一些所谓的解决方案,在遇到真正的大数据量时,都显得那么的脆弱无比,不堪一击。


那么既然目前这3个方案都行不通,你猜,我接下来会用什么办法,来搞定这件事情?


最后,在这次数据导入的对比测试中,Spark、Flink 还有 DataX,你觉得谁的表现最差劲?




请使用浏览器的分享功能分享到微信等