Doris 亿级大表非导出不可,有哪些可行办法?

来源:安瑞哥是码农


这段时间,算是跟 Doris 大数据量表导出这件事杠上了。


这不,前面的文章分别用了3种主流的方式,对 Doris 一张8亿+数据量的表做了多次导出尝试,但都失败了,于是我不禁怀疑:莫非是数据量大的数据库表都会这样?


直到前天,我把数据库换成了 Clickhouse,用了一张一毛一样数据量的表,做了同样的导出测试,才发现,人家是可以的,至少我用 Spark 就能一次性导出成功。


那这么说来,Doris 是不是只要数据量一大,数据导出就注定要歇菜呢?


带着一脸狐疑,「不应该呀」,继续在官网找可行的解决办法,于是,我就看到了下面的内容:


官网推荐的数据导出方式


与此同时,Doris 官方的小伙伴看到我上次发的文章,也提醒我说,上次用 Flink 的方式导出之所以会每小时失败一次,原因可能在于,BE的「执行内存」默认限制在 2G 以内,且请求超时时间刚好限定是 1 小时


于是今天,我决定分别用 Doris 内置的数据导出方式,以及通过 Flink 调整参数后的方式,来再次进行测试,看问题到底是不是出现在这里。


(PS:本次测试是基于Doris2.0.2,以及 Flink1.15.2 进



1. 官方推荐的导出方式


官方推荐通过创建一个 Export 任务,然后将数据直接输出到 HDFS。


具体做法非常简单,针对我的这个 Doris 表,创建如下的 Export 任务:


EXPORT TABLE example_db.dns_logs_from_kafka
TO "hdfs://192.168.211.106:8020/tmp/from_doris/" 
PROPERTIES
(
    "label" = "doris2hdfs01",
    "column_separator"=",",
    "parallelism" = "6"
)
with HDFS (
"fs.defaultFS"="hdfs://192.168.211.106:8020",
"hadoop.username" = "doris"
);


解释一下其中的参数:


TO:后面跟的是 HDFS 存储的绝对路径,注意,这里必须以「hdfs://」打头,否则导出任务会创建失败(亲自试过)。个人认为这个路径头,跟下面的 fs.defaultFS 重复了,应该可以考虑省掉;


label:导入任务的标签,每次重新创建,名字必须不一样,否则导出任务创建不了。我觉得,做这个限制,有这个必要吗?任务失败重试时,害得我每次都要改这里;


format:导出到 HDFS 后,数据文件的格式,目前支持:CSV、parquet、ORC等,如果不指定这项,默认为 CSV;


column_separator:导出到 HDFS 后,如果上面 format 选择 CSV ,用来指定列数据之间的分隔符;


parallelism:官网说它指的是,导数据时的并行度,但通过我的实践来看,并不是,下面演示。


fs.defaultFS:指定 HDFS 集群中,namenode 的地址,如果你的 HDFS 配置的是 HA,那么用 dfs.nameservices 跟 dfs.ha.namenodes.[nameservice ID]


hadoop.username:导出到 HDFS 之后,其文件的所有者。


在 SQL 客户端启动上面的 Export 任务后,通过 SHOW EXPORT SQL命令,以及查看 HDFS 上生成的文件数,来观察其导出状态。


这里有几个让人疑惑的地方。


疑惑1:


对于这个大表的导出,查看它的进度,永远都是 83%


只要导出任务启动了,无论是已经导出了一个 HDFS 文件,还是多个,这个中间进度永远不变。


直到最后,导出任务失败,进度依然是 83%


就很神奇,这到底是这么算出来的呢?


疑惑2:


从上面这个截图可以看出来,这个导出任务运行到5分钟的时候,就嗝屁了,给出的理由是:query timeout。


但是,官方文档给的超时默认时间,明明说的是2小时呢,怎么就变成5分钟了



你俩到底是谁在耍我?


疑惑3:


不是说好的 parallelism 参数是指定任务并行度吗?



但我明明设置的6个并行度,而从实际看到的情况,却是串行的呢?


如果说真是6个并行度,那么在这个文件夹下面,应该在同一时间会生成6个文件,然后6个线程会同时往6个文件里写数据才对。


而不应该像这个截图所看到的,一次只有1个文件生成,等到这个文件数据写满后(默认1G),再写入下一个。


别告诉我是6个线程同时在写一个文件,我不信有这种脑残的设计。


(PS:即便我后面在文档里,找到了手动打开并行设置的参数,set enable_parallel_outfile = true 但也还是一个鸟样


怎么解决?


显然,这次尝试又双叒失败了,于是就想,是时候要调整导出任务的参数了。


根据昨天 Doris 官方小伙伴的提醒,像这种大数据量的导出,可能需要增加「执行内存」大小,以及「请求超时」时间这两个参数。


于是,我又设置了这两个参数(实践证明,只需要增加超时参数的值就可以),只不过在设置的时候,又躺了坑。


官方文档里告诉你,超时参数是这样的:


而实际上,对于我当前使用的 Doris2.0.2 版本,这个参数变成了 query_timeout(问官方的小伙伴后得知) ,而且是在命令行里设置的。


然后,再次启动导出任务,终于导成功了:


从这个导出 label 可以看出来,我前后经过了足足14次的尝试,才把这个问题搞定,莫非是我太菜了吗。


好在,通过这种 Doris 内置的导出方式,导出效率还挺高的,8亿+条数据,用时不到22分钟


那么接下来,咱再用 Flink 试试,情况会怎么样呢?



2. 调整参数后的 Flink 导出方式


由于被提醒需要增加「执行内存」大小,以及「请求超时」时间这两个参数。


所以,添加了这两个参数之后的, Flink 读 Doris 写 Kafka 的代码就变成了下面这样:


package com.anryg.doris

import java.time.Duration

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**
  * @DESC: 利用 Flink SQL 读取 Doris 表,写 Kafka
  * @Auther: Anryg
  * @Date: 2024/3/5 17:26
  */

object FlinkSQLFromDoris2Kafka {

    def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        env.enableCheckpointing(30000L)

        env.setStateBackend(new EmbeddedRocksDBStateBackend(true)) //新的设置state backend的方式
        env.getCheckpointConfig.setCheckpointStorage("hdfs://192.168.211.106:8020/tmp/flink_checkpoint/FlinkSQLFromDoris2Kafka")
        env.getCheckpointConfig.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION//设置checkpoint记录的保留策略
        env.getCheckpointConfig.setAlignedCheckpointTimeout(Duration.ofMinutes(1L))
        env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)

        val tableEnv = StreamTableEnvironment.create(env)

        /**映射读取的Doris表*/
        tableEnv.executeSql(
            s"""
              |CREATE TABLE data_from_doris (
              |`client_ip` STRING,
              |domain STRING,
              |`time` STRING,
              |target_ip STRING,
              |rcode STRING,
              |query_type STRING,
              |authority_record STRING,
              |add_msg STRING,
              |dns_ip STRING
              |)
              |    WITH (
              |      'connector' = 'doris',
              |      'fenodes' = '192.168.221.173:8030',
              |      'table.identifier' = 'example_db.dns_logs_from_kafka',
              |      'doris.request.query.timeout.s' = '36000', //防止查询超时,默认1小时
              |      'doris.exec.mem.limit' = '5147483648',   //查询时BE对内存的限制,默认2G
              |      'username' = 'root',
              |      'password' = 'pcl@2024',
              |      'sink.label-prefix' = '${args(0)}'  //导入标签前缀,每次要不一样
              |)
            "
"".stripMargin)


        /**映射要写入Kafka*/
        tableEnv.executeSql(
            """
              |Create table data2kafka(
              |client_ip STRING,
              |domain STRING,
              |`time` STRING,
              |target_ip STRING,
              |rcode STRING,
              |query_type STRING,
              |authority_record STRING,
              |add_msg STRING,
              |dns_ip STRING
              |)
              |with(
              |'connector' = 'kafka',
              |'topic' = 'test',  //参数化表示方式
              |'properties.bootstrap.servers' = '192.168.211.107:6667',
              |'properties.group.id' = 'FlinkSQLFromDoris2Kafka',
              |'value.format'='csv',
              |'value.csv.field-delimiter'=','
              |)
            "
"".stripMargin)

        /**写入数据*/
        tableEnv.executeSql(
            """
              |INSERT INTO data2kafka
              |select
              |*
              |from
              |data_from_doris
            "
"".stripMargin)
    }
}


添加的内容为:


改完之后,再次提交到 YARN 上执行。


果然就行了。


但由于我用的是 yarn session 模式跑的,跑完之后没有来得及去看,然后那个UI界面,没过多久,显示的任务状态就没了(这也是这个模式最操蛋的一点),只能去找它的日志:


证明加上那两个参数之后,是可行的,整个导出过程花了5个多小时,比同数据量的 CK 导出慢了1个多小时



最后


从最近这些天,对 Doris 跟 CK 的数据导出折腾的过程来看,个人的直观感受是,CK 的很多默认设置明显要更加人性化,更加好用(不用增加任何额外设置)。


站在一个使用者的角度,如果在面对数据导出,这种非常简单的业务需求时,还需要使用者仔细研读各种文档说明,明辨不同版本之间,相同功能参数的不同写法。


那就只能说明一点,这个软件的使用成本未免有点过高。


从这次对 Doris 这张8亿+数据量的大表导出来看,起关键作用的,就是那「两个参数」的设置。


但对于是否需要设置这两个参数,以及在哪设置,怎么设置?你无论从导出任务的错误信息,还是在创建导出任务时,必要的参数设置说明上(官方文档),都很难找到对应的线索,和明确的设置方法


这就很尴尬,你如果让用的人自己来猜,那我很有可能就,干脆不用了。


最后,感谢 Doris 官方小伙伴的帮助,否则,这个数据导出,还真不一定能这么快搞定。

请使用浏览器的分享功能分享到微信等