来源:安瑞哥是码农
上篇跟上上篇文章,咱聊了用 Flink API 读写(主要是写) Paimon 表的一些情况,记录了这个过程中我认为体验不好的地方,同时也顺便测试对比了一下,用 Flink API 在同一场景下写 Hudi 跟 Paimon 表的效率差异(有兴趣的同学可自行查看历史文章)。
我们都知道,Paimon 是属于当下被大家所熟知的,号称4大数据湖技术中(Hudi、Iceberg、Delta lake、Paimon) 唯一一个 Flink 的“近亲”,跟 Flink 的配合自然会更加紧密。
但是,作为一款想被更多用户所接纳的技术,自然也就少不了对 Spark 这位江湖元老的支持,至少官方文档是这么赫然写着的。
至于 Paimon 跟 Spark 之间配合的默契程度如何,咱们今天就给它“跑起来”,看看实际效果如何?
还是老规矩,咱通过 Spark structured streaming 实时读取 kafka 数据,然后以流方式写入 Paimon 表来进行测试。
(PS:本文根据 kafka2.0 + Spark3.2 + Hadoop3.1 + Paimon0.6 展开)
0. 跑前准备
跑步运动员在开跑前,需要先选择合身的装备,穿上合脚的跑鞋,那么这里也一样。
从官方文档的描述来看,paimon 只支持 Spark3.1 及以上版本,幸好,我当前使用的是 Spark3.2。
要不怎么说「官方文档有时候过于官方」呢,跟 Flink API 遇到的情况一样,目前虽然你可以下载到最新 0.7 版本的 paimon-spark 联合 jar 包。
但是,maven 中央仓库能提供的,却依然只有0.6的孵化版本。
至于为什么我要纠结这个,因为要在我的 Spark 项目中通过 pom 文件的方式来引入对 paimon 的依赖。
而这,才是一个正规大数据开发的「正规玩法」。
1. 开始编码
开发环境的基础依赖解决之后,接下来就要着手编码了,老规矩,还是参照官方文档一步步来。
瞧这代码部分写的,那叫一个简洁。
但我告诉你,如果你以为跟它一样「照猫画虎」就能轻松把程序调通,多少会显得有点天真,以我这么长时间来的趟坑经验,它要是不给你设置点障碍,那几乎是不太可能的事情。
知道你可能很着急,但你先别急,后面我会把遇到的坑一一告诉你。
一番折腾之后,我把调通之后的(数据正常写入),用 Spark structured streaming 读 kafka 数据写 Paimon 表的代码提供如下:
package com.anryg.bigdata.streaming.paimon
import com.alibaba.fastjson.{JSON, JSONValidator}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode
/**
* @DESC: Spark 读取 Kafka 数据写 Paimon
* @Auther: Anryg
* @Date: 2023/12/28 11:01
*/
object SparkReadKafka2Paimon {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("SparkReadKafka2Paimon")/*.setMaster("local[*]")*/
conf.set("spark.sql.catalog.paimon","org.apache.paimon.spark.SparkCatalog") //必须设置,指定catalog为paimon
conf.set("spark.sql.catalog.paimon.warehouse","hdfs://192.168.211.106:8020/tmp/paimon") //必须设置
val spark = SparkSession.builder().config(conf).getOrCreate()
val rawDF = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "192.168.211.107:6667")
.option("subscribe", "test")
.option("failOnDataLoss",false)
.option("fetchOffset.numRetries",3)
.option("startingOffsets","latest")
.load()
import spark.implicits._
val ds = rawDF.selectExpr("CAST(value AS STRING)")
.map(row => {
val line = row.getAs[String]("value")
val rawJson = JSON.parseObject(line) //原始string是一个json,对其进行解析
val message = rawJson.getString("message") //获取业务数据部分
val fieldArray = message.split(",")
fieldArray
}).filter(_.length == 9).map(array =>(array(0),array(1),array(2),array(3),array(4),array(5),array(6),array(7),array(8)))
.toDF("client_ip","domain","time","target_ip","rcode","query_type","authority_record","add_msg","dns_ip")
spark.sql("USE paimon") //指定catalog
/**跟Hudi不一样,需要提前创建paimon表*/
spark.sql(
"""
|CREATE TABLE IF NOT EXISTS data_from_spark2paimon01(
|client_ip STRING,
|domain STRING,
|`time` STRING,
|target_ip STRING,
|rcode STRING,
|query_type STRING,
|authority_record STRING,
|add_msg STRING,
|dns_ip STRING
|)
|TBLPROPERTIES (
|'primary-key'='client_ip,domain,time,target_ip'
|)
""".stripMargin)
ds.writeStream
.outputMode(OutputMode.Append())
.option("checkpointLocation", "hdfs://192.168.211.106:8020/tmp/offset/test/SparkReadKafka2Paimon01")
.format("paimon")
.start("hdfs://192.168.211.106:8020/tmp/paimon/default.db/data_from_spark2paimon01")
.awaitTermination()
}
}
2. 几个需要注意的地方
代码交代完,咱集中火力,来看看这个过程中,一共可能碰到哪些让你“恼火”的事。
2.1 注意点一
如果根据官方文档,在 IDEA 编辑器里,根据常规思维,写下了 Spark 读取 kafka 然后写入 Paimon 的“常规代码”后,一运行,你就会收获第一个报错:
Exception in thread "main" org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException: Database 'paimon' not found
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.requireDbExists(SessionCatalog.scala:218)
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.setCurrentDatabase(SessionCatalog.scala:313)
at org.apache.spark.sql.connector.catalog.CatalogManager.setCurrentNamespace(CatalogManager.scala:104)
它告诉你没有“paimon”这个数据库,你以为自己需要新建这个库吗?
不,它其实是要你添加这个配置而已:
而这个“paimon”也不是什么database,它其实是我们需要选择的“catalog”,这里的错误提示会让人有误解。
2.2 注意点二
解决完上面那个问题后,继续调试,迎接着你的2个报错,是这个:
Exception in thread "main" java.lang.NullPointerException: Paimon 'warehouse' path must be set
at org.apache.paimon.utils.Preconditions.checkNotNull(Preconditions.java:65)
at org.apache.paimon.catalog.CatalogFactory.warehouse(CatalogFactory.java:55)
at org.apache.paimon.catalog.CatalogFactory.createCatalog(CatalogFactory.java:83)
at org.apache.paimon.catalog.CatalogFactory.createCatalog(CatalogFactory.java:66)
提示你需要设置“warehouse”的路径。
之所以说它让人恼火呢,原因在于,同一份代码,你会发现,这个数据写入的破地址,它居然让你配置2遍,咋想的?
你需要在代码里配置这个(当然,也可以在外部的运行参数中指定,但我不太习惯):
这是第一次数据位置的指定,但为了程序能顺利跑起来,还需要在后面数据写入的时候再指定一次,这个咱下面聊。
其实以上这两个坑,在官方文档是有提示的,只不过需要你在编码的时候,要学会理解这些设置的含义。
通过我的验证,这3项配置中,前2个是必须的,否则就会抛出上面我说的异常,而第3项,目前从我的测试来看,不需要。
2.3 注意点三
第3个,可能会因为没有设置 paimon catalog,而引发的异常如下:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Hive support is required to CREATE Hive TABLE (AS SELECT);
'CreateTable `default`.`data_from_spark2paimon01`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Ignore
怎么样,是不是看得你一脸懵逼?
怎么就它喵的提示创建 Hive 表失败了呢?
原因很简单,因为这个时候,如果你没有指定 catalog,那么这个代码中的“create table”建表语句,程序会默认你用的 hive catalog,所以会认为你在创建 hive 表,而当前这个建表语句又不符合 hive 建表语句的规范。
咋解决呢?在代码中手动指定 catalog。
其实对于这一点,官网也有提示,只不过,你需要知道在代码中如何指定:
前一个是指定 catalog,后一个是指定 database,因为默认就是 default,所以也可以不用写。
2.4 注意点四
这个坑是我认为这几个里面,最坑爹的,为啥这么说呢?
刚才上面不是提到那个“warehouse”路径设置问题了嘛,前面是这么设置的:
我在这里设置了一个数据存储的总目录。
如果程序非得让我在最后数据写入的时候,还得指定一次路径,根据我一个正常的人理解,我就会这么设置:
在之前总目录的基础上,再加一个我要建的表名(data_from_spark2paimon01),以此来存储这个表要写进去的数据。
这样思考按理说没毛病吧?因为 Hudi 对于数据写入路径的设计就是这样的。
可是呢,人家 Paimon 偏要膈应一些。
结果我一运行,你猜会怎么着?
Exception in thread "main" java.lang.IllegalArgumentException: Schema file not found in location hdfs://192.168.211.106:8020/tmp/paimon/data_from_spark2paimon01. Please create table first.
at org.apache.paimon.table.FileStoreTableFactory.lambda$create$0(FileStoreTableFactory.java:61)
at java.util.Optional.orElseThrow(Optional.java:290)
它告诉你,当前这个目录下找不到“schema file”。
我就奇了怪了,我指定的这个目录,不就是让你去写数据的吗,你的 schema file 不应该就写到这个目录里面的吗?
可它... 偏不!
至此,我们不妨来瞅一眼,这个当初我指定的目录,到底发生了什么变化:
看到没,程序自作主张的在我指定的主目录下,创建了一个“default.db”子目录,然后在这个子目录下,再创建了一个以表名(data_from_spark2paimon01)命名的子子目录。
接着,在这个表名的下级目录中,又创建了一个“schema”目录。
我想,这个“schema”目录,大抵就是刚才那个抛出的异常,要寻找的吧。
于是,把刚才那个最后指定写入路径的地址,给改成了这个:
果然,程序终于能正常调通了。
3. 瞅一眼效果
提交到 yarn 集群之后,跑了一个多小时,没有出现什么幺蛾子,运行正常:
再看一眼数据目录,写了一共 2G 数据,共486个文件(后续数量没有再变少)。
从文件数量上来看,这个小文件量有点多(默认设置下)。
最后
从这次通过 Spark 写 Paimon 的 API 来看,过程虽然经历了一点小坎坷,但基本上通过仔细阅读文档内容,再根据抛出的异常提示,就能较快定位到问题,并迅速解决。
对于这次的测试体验,总体感觉还可以。
虽然我还是觉得,官方文档的描述,还可以再详细和友好一些,但比之前那种遇到异常之后,看着报错信息一脸懵逼,不知道如何是好要强一些。
最后,想强调一点的就是,Spark structed streaming 就是一个实时的流式计算框架,我都已经在生产上用它好几年了,请有些同学别再误会它了,拜托。