来源:安瑞哥是码农
这段时间,算是跟 Doris 大数据量表导出这件事杠上了。
这不,前面的文章分别用了3种主流的方式,对 Doris 一张8亿+数据量的表做了多次导出尝试,但都失败了,于是我不禁怀疑:莫非是数据量大的数据库表都会这样?
直到前天,我把数据库换成了 Clickhouse,用了一张一毛一样数据量的表,做了同样的导出测试,才发现,人家是可以的,至少我用 Spark 就能一次性导出成功。
那这么说来,Doris 是不是只要数据量一大,数据导出就注定要歇菜呢?
带着一脸狐疑,「不应该呀」,继续在官网寻找可行的解决办法,于是,我就看到了下面的内容:
官网推荐的数据导出方式
与此同时,Doris 官方的小伙伴看到我上次发的文章,也提醒我说,上次用 Flink 的方式导出之所以会每小时失败一次,原因可能在于,BE的「执行内存」默认限制在 2G 以内,且「请求超时时间」刚好限定是 1 小时。
于是今天,我决定分别用 Doris 内置的数据导出方式,以及通过 Flink 调整参数后的方式,来再次进行测试,看问题到底是不是出现在这里。
(PS:本次测试是基于Doris2.0.2,以及 Flink1.15.2 进行)
1. 官方推荐的导出方式
官方推荐通过创建一个 Export 任务,然后将数据直接输出到 HDFS。
具体做法非常简单,针对我的这个 Doris 表,创建如下的 Export 任务:
EXPORT TABLE example_db.dns_logs_from_kafka
TO "hdfs://192.168.211.106:8020/tmp/from_doris/"
PROPERTIES
(
"label" = "doris2hdfs01",
"column_separator"=",",
"parallelism" = "6"
)
with HDFS (
"fs.defaultFS"="hdfs://192.168.211.106:8020",
"hadoop.username" = "doris"
);
解释一下其中的参数:
TO:后面跟的是 HDFS 存储的绝对路径,注意,这里必须以「hdfs://」打头,否则导出任务会创建失败(亲自试过)。个人认为这个路径头,跟下面的 fs.defaultFS 重复了,应该可以考虑省掉;
label:导入任务的标签,每次重新创建,名字必须不一样,否则导出任务创建不了。我觉得,做这个限制,有这个必要吗?任务失败重试时,害得我每次都要改这里;
format:导出到 HDFS 后,数据文件的格式,目前支持:CSV、parquet、ORC等,如果不指定这项,默认为 CSV;
column_separator:导出到 HDFS 后,如果上面 format 选择 CSV ,用来指定列数据之间的分隔符;
parallelism:官网说它指的是,导数据时的并行度,但通过我的实践来看,并不是,下面演示。
fs.defaultFS:指定 HDFS 集群中,namenode 的地址,如果你的 HDFS 配置的是 HA,那么用 dfs.nameservices 跟 dfs.ha.namenodes.[nameservice ID];
hadoop.username:导出到 HDFS 之后,其文件的所有者。
在 SQL 客户端启动上面的 Export 任务后,通过 SHOW EXPORT SQL命令,以及查看 HDFS 上生成的文件数,来观察其导出状态。
这里有几个让人疑惑的地方。
疑惑1:
对于这个大表的导出,查看它的进度,永远都是 83%:
只要导出任务启动了,无论是已经导出了一个 HDFS 文件,还是多个,这个中间进度永远不变。
直到最后,导出任务失败,进度依然是 83% :
就很神奇,这到底是这么算出来的呢?
疑惑2:
从上面这个截图可以看出来,这个导出任务运行到5分钟的时候,就嗝屁了,给出的理由是:query timeout。
但是,官方文档给的超时默认时间,明明说的是2小时呢,怎么就变成5分钟了。
你俩到底是谁在耍我?
疑惑3:
不是说好的 parallelism 参数是指定任务并行度吗?
但我明明设置的6个并行度,而从实际看到的情况,却是串行的呢?
如果说真是6个并行度,那么在这个文件夹下面,应该在同一时间会生成6个文件,然后6个线程会同时往6个文件里写数据才对。
而不应该像这个截图所看到的,一次只有1个文件生成,等到这个文件数据写满后(默认1G),再写入下一个。
别告诉我是6个线程同时在写一个文件,我不信有这种脑残的设计。
(PS:即便我后面在文档里,找到了手动打开并行设置的参数,set enable_parallel_outfile = true 但也还是一个鸟样)
怎么解决?
显然,这次尝试又双叒失败了,于是就想,是时候要调整导出任务的参数了。
根据昨天 Doris 官方小伙伴的提醒,像这种大数据量的导出,可能需要增加「执行内存」大小,以及「请求超时」时间这两个参数。
于是,我又设置了这两个参数(实践证明,只需要增加超时参数的值就可以),只不过在设置的时候,又躺了坑。
官方文档里告诉你,超时参数是这样的:
而实际上,对于我当前使用的 Doris2.0.2 版本,这个参数变成了 query_timeout(问官方的小伙伴后得知) ,而且是在命令行里设置的。
然后,再次启动导出任务,终于导成功了:
从这个导出 label 可以看出来,我前后经过了足足14次的尝试,才把这个问题搞定,莫非是我太菜了吗。
好在,通过这种 Doris 内置的导出方式,导出效率还挺高的,8亿+条数据,用时不到22分钟。
那么接下来,咱再用 Flink 试试,情况会怎么样呢?
2. 调整参数后的 Flink 导出方式
由于被提醒需要增加「执行内存」大小,以及「请求超时」时间这两个参数。
所以,添加了这两个参数之后的, Flink 读 Doris 写 Kafka 的代码就变成了下面这样:
package com.anryg.doris
import java.time.Duration
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
/**
* @DESC: 利用 Flink SQL 读取 Doris 表,写 Kafka
* @Auther: Anryg
* @Date: 2024/3/5 17:26
*/
object FlinkSQLFromDoris2Kafka {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(30000L)
env.setStateBackend(new EmbeddedRocksDBStateBackend(true)) //新的设置state backend的方式
env.getCheckpointConfig.setCheckpointStorage("hdfs://192.168.211.106:8020/tmp/flink_checkpoint/FlinkSQLFromDoris2Kafka")
env.getCheckpointConfig.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) //设置checkpoint记录的保留策略
env.getCheckpointConfig.setAlignedCheckpointTimeout(Duration.ofMinutes(1L))
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
val tableEnv = StreamTableEnvironment.create(env)
/**映射读取的Doris表*/
tableEnv.executeSql(
s"""
|CREATE TABLE data_from_doris (
|`client_ip` STRING,
|domain STRING,
|`time` STRING,
|target_ip STRING,
|rcode STRING,
|query_type STRING,
|authority_record STRING,
|add_msg STRING,
|dns_ip STRING
|)
| WITH (
| 'connector' = 'doris',
| 'fenodes' = '192.168.221.173:8030',
| 'table.identifier' = 'example_db.dns_logs_from_kafka',
| 'doris.request.query.timeout.s' = '36000', //防止查询超时,默认1小时
| 'doris.exec.mem.limit' = '5147483648', //查询时BE对内存的限制,默认2G
| 'username' = 'root',
| 'password' = 'pcl@2024',
| 'sink.label-prefix' = '${args(0)}' //导入标签前缀,每次要不一样
|)
""".stripMargin)
/**映射要写入Kafka*/
tableEnv.executeSql(
"""
|Create table data2kafka(
|client_ip STRING,
|domain STRING,
|`time` STRING,
|target_ip STRING,
|rcode STRING,
|query_type STRING,
|authority_record STRING,
|add_msg STRING,
|dns_ip STRING
|)
|with(
|'connector' = 'kafka',
|'topic' = 'test', //参数化表示方式
|'properties.bootstrap.servers' = '192.168.211.107:6667',
|'properties.group.id' = 'FlinkSQLFromDoris2Kafka',
|'value.format'='csv',
|'value.csv.field-delimiter'=','
|)
""".stripMargin)
/**写入数据*/
tableEnv.executeSql(
"""
|INSERT INTO data2kafka
|select
|*
|from
|data_from_doris
""".stripMargin)
}
}
添加的内容为:
改完之后,再次提交到 YARN 上执行。
果然就行了。
但由于我用的是 yarn session 模式跑的,跑完之后没有来得及去看,然后那个UI界面,没过多久,显示的任务状态就没了(这也是这个模式最操蛋的一点),只能去找它的日志:
证明加上那两个参数之后,是可行的,整个导出过程花了5个多小时,比同数据量的 CK 导出慢了1个多小时。
最后
从最近这些天,对 Doris 跟 CK 的数据导出折腾的过程来看,个人的直观感受是,CK 的很多默认设置明显要更加人性化,更加好用(不用增加任何额外设置)。
站在一个使用者的角度,如果在面对数据导出,这种非常简单的业务需求时,还需要使用者仔细研读各种文档说明,明辨不同版本之间,相同功能参数的不同写法。
那就只能说明一点,这个软件的使用成本未免有点过高。
从这次对 Doris 这张8亿+数据量的大表导出来看,起关键作用的,就是那「两个参数」的设置。
但对于是否需要设置这两个参数,以及在哪设置,怎么设置?你无论从导出任务的错误信息,还是在创建导出任务时,必要的参数设置说明上(官方文档),都很难找到对应的线索,和明确的设置方法。
这就很尴尬,你如果让用的人自己来猜,那我很有可能就,干脆不用了。
最后,感谢 Doris 官方小伙伴的帮助,否则,这个数据导出,还真不一定能这么快搞定。