Paimon 跟 Spark 是否也能玩得来

来源:安瑞哥是码农


上篇跟上上篇文章,咱聊了用 Flink API 读写(主要是写) Paimon 表的一些情况,记录了这个过程中我认为体验不好的地方,同时也顺便测试对比了一下,用 Flink API 在同一场景下写 Hudi 跟 Paimon 表的效率差异(有兴趣的同学可自行查看历史文章)


我们都知道,Paimon 是属于当下被大家所熟知的,号称4大数据湖技术中(Hudi、Iceberg、Delta lake、Paimon) 唯一一个 Flink 的“近亲”,跟 Flink 的配合自然会更加紧密。


但是,作为一款想被更多用户所接纳的技术,自然也就少不了对 Spark 这位江湖元老的支持,至少官方文档是这么赫然写着的。


至于 Paimon 跟 Spark 之间配合的默契程度如何,咱们今天就给它“跑起来”,看看实际效果如何?


还是老规矩,咱通过 Spark structured streaming 实时读取 kafka 数据,然后以流方式写入 Paimon 表来进行测试。


(PS:本文根据 kafka2.0 + Spark3.2 + Hadoop3.1 + Paimon0.6 展开)



0. 跑前准备


跑步运动员在开跑前,需要先选择合身的装备,穿上合脚的跑鞋,那么这里也一样。


从官方文档的描述来看,paimon 只支持 Spark3.1 及以上版本,幸好,我当前使用的是 Spark3.2。


要不怎么说官方文档有时候过于官方」呢,跟 Flink API 遇到的情况一样,目前虽然你可以下载到最新 0.7 版本的 paimon-spark 联合 jar 包。


但是,maven 中央仓库能提供的,却依然只有0.6的孵化版本。


至于为什么我要纠结这个,因为要在我的 Spark 项目中通过 pom 文件的方式来引入对 paimon 的依赖。


而这,才是一个正规大数据开发的「正规玩法」。



1. 开始编码


开发环境的基础依赖解决之后,接下来就要着手编码了,老规矩,还是参照官方文档一步步来。


瞧这代码部分写的,那叫一个简洁。


但我告诉你,如果你以为跟它一样「照猫画虎就能轻松把程序调通,多少会显得有点天真,以我这么长时间来的趟坑经验,它要是不给你设置点障碍,那几乎是不太可能的事情。

知道你可能很着急,但你先别急,后面我会把遇到的坑一一告诉你。

一番折腾之后,我把调通之后的(数据正常写入),用 Spark structured streaming 读 kafka 数据写 Paimon 表的代码提供如下:


package com.anryg.bigdata.streaming.paimon

import com.alibaba.fastjson.{JSONJSONValidator}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode

/**
  * @DESC: Spark 读取 Kafka 数据写 Paimon
  * @Auther: Anryg
  * @Date: 2023/12/28 11:01
  */

object SparkReadKafka2Paimon {

    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("SparkReadKafka2Paimon")/*.setMaster("local[*]")*/
        conf.set("spark.sql.catalog.paimon","org.apache.paimon.spark.SparkCatalog"//必须设置,指定catalog为paimon
        conf.set("spark.sql.catalog.paimon.warehouse","hdfs://192.168.211.106:8020/tmp/paimon"//必须设置
        val spark = SparkSession.builder().config(conf).getOrCreate()

        val rawDF = spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers""192.168.211.107:6667")
            .option("subscribe""test")
            .option("failOnDataLoss",false)
            .option("fetchOffset.numRetries",3)
            .option("startingOffsets","latest")
            .load()

        import spark.implicits._

        val ds = rawDF.selectExpr("CAST(value AS STRING)")
            .map(row => {
                val line = row.getAs[String]("value")
                val rawJson = JSON.parseObject(line)      //原始string是一个json,对其进行解析
                val message = rawJson.getString("message")  //获取业务数据部分
                val fieldArray = message.split(",")
                fieldArray
            }).filter(_.length == 9).map(array =>(array(0),array(1),array(2),array(3),array(4),array(5),array(6),array(7),array(8)))
            .toDF("client_ip","domain","time","target_ip","rcode","query_type","authority_record","add_msg","dns_ip")

        spark.sql("USE paimon"//指定catalog

        /**跟Hudi不一样,需要提前创建paimon表*/
        spark.sql(
            """
              |CREATE TABLE IF NOT EXISTS data_from_spark2paimon01(
              |client_ip STRING,
              |domain STRING,
              |`time` STRING,
              |target_ip STRING,
              |rcode STRING,
              |query_type STRING,
              |authority_record STRING,
              |add_msg STRING,
              |dns_ip STRING
              |)
              |TBLPROPERTIES (
              |'primary-key'='client_ip,domain,time,target_ip'
              |)
            "
"".stripMargin)

        ds.writeStream
            .outputMode(OutputMode.Append())
            .option("checkpointLocation""hdfs://192.168.211.106:8020/tmp/offset/test/SparkReadKafka2Paimon01")
            .format("paimon")
            .start("hdfs://192.168.211.106:8020/tmp/paimon/default.db/data_from_spark2paimon01")
            .awaitTermination()

    }
}



2. 几个需要注意的地方


代码交代完,咱集中火力,来看看这个过程中,一共可能碰到哪些让你“恼火”的事。


2.1 注意点一


如果根据官方文档,在 IDEA 编辑器里,根据常规思维,写下了 Spark 读取 kafka 然后写入 Paimon 的“常规代码”后,一运行,你就会收获第一个报错:


Exception in thread "main" org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException: Database 'paimon' not found
 at org.apache.spark.sql.catalyst.catalog.SessionCatalog.requireDbExists(SessionCatalog.scala:218)
 at org.apache.spark.sql.catalyst.catalog.SessionCatalog.setCurrentDatabase(SessionCatalog.scala:313)
 at org.apache.spark.sql.connector.catalog.CatalogManager.setCurrentNamespace(CatalogManager.scala:104)

它告诉你没有“paimon”这个数据库,你以为自己需要新建这个库吗?


不,它其实是要你添加这个配置而已:


而这个“paimon”也不是什么database,它其实是我们需要选择的“catalog”,这里的错误提示会让人有误解。


2.2 注意点二


解决完上面那个问题后,继续调试,迎接着你的2个报错,是这个:

Exception in thread "main" java.lang.NullPointerException: Paimon 'warehouse' path must be set
 at org.apache.paimon.utils.Preconditions.checkNotNull(Preconditions.java:65)
 at org.apache.paimon.catalog.CatalogFactory.warehouse(CatalogFactory.java:55)
 at org.apache.paimon.catalog.CatalogFactory.createCatalog(CatalogFactory.java:83)
 at org.apache.paimon.catalog.CatalogFactory.createCatalog(CatalogFactory.java:66)

提示你需要设置“warehouse”的路径。


之所以说它让人恼火呢,原因在于,同一份代码,你会发现,这个数据写入的破地址,它居然让你配置2遍,咋想的?


你需要在代码里配置这个(当然,也可以在外部的运行参数中指定,但我不太习惯):


这是第一次数据位置的指定,但为了程序能顺利跑起来,还需要在后面数据写入的时候再指定一次,这个咱下面聊。


其实以上这两个坑,在官方文档是有提示的,只不过需要你在编码的时候,要学会理解这些设置的含义。


通过我的验证,这3项配置中,前2个是必须的,否则就会抛出上面我说的异常,而第3项,目前从我的测试来看,不需要。


2.3 注意点三


第3个,可能会因为没有设置 paimon catalog,而引发的异常如下:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Hive support is required to CREATE Hive TABLE (AS SELECT);
'CreateTable `default`.`data_from_spark2paimon01`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Ignore

怎么样,是不是看得你一脸懵逼?


怎么就它喵的提示创建 Hive 表失败了呢?


原因很简单,因为这个时候,如果你没有指定 catalog,那么这个代码中的“create table”建表语句,程序会默认你用的 hive catalog,所以会认为你在创建 hive 表,而当前这个建表语句又不符合 hive 建表语句的规范。


咋解决呢?在代码中手动指定 catalog。


其实对于这一点,官网也有提示,只不过,你需要知道在代码中如何指定:


前一个是指定 catalog,后一个是指定 database,因为默认就是 default,所以也可以不用写。


2.4 注意点四


这个坑是我认为这几个里面,最坑爹的,为啥这么说呢?


刚才上面不是提到那个“warehouse”路径设置问题了嘛,前面是这么设置的:



我在这里设置了一个数据存储的总目录。


如果程序非得让我在最后数据写入的时候,还得指定一次路径,根据我一个正常的人理解,我就会这么设置:


在之前总目录的基础上,再加一个我要建的表名(data_from_spark2paimon01),以此来存储这个表要写进去的数据。


这样思考按理说没毛病吧?因为 Hudi 对于数据写入路径的设计就是这样的。


可是呢,人家 Paimon 偏要膈应一些。


结果我一运行,你猜会怎么着?


Exception in thread "main" java.lang.IllegalArgumentException: Schema file not found in location hdfs://192.168.211.106:8020/tmp/paimon/data_from_spark2paimon01. Please create table first.
 at org.apache.paimon.table.FileStoreTableFactory.lambda$create$0(FileStoreTableFactory.java:61)
 at java.util.Optional.orElseThrow(Optional.java:290)

它告诉你,当前这个目录下找不到“schema file”。


我就奇了怪了,我指定的这个目录,不就是让你去写数据的吗,你的 schema file 不应该就写到这个目录里面的吗?


可它... 偏不!


至此,我们不妨来瞅一眼,这个当初我指定的目录,到底发生了什么变化:


看到没,程序自作主张的在我指定的主目录下,创建了一个“default.db”子目录,然后在这个子目录下,再创建了一个以表名(data_from_spark2paimon01)命名的子子目录。


接着,在这个表名的下级目录中,又创建了一个“schema”目录。


我想,这个“schema”目录,大抵就是刚才那个抛出的异常,要寻找的吧


于是,把刚才那个最后指定写入路径的地址,给改成了这个:


果然,程序终于能正常调通了。



3. 瞅一眼效果


提交到 yarn 集群之后,跑了一个多小时,没有出现什么幺蛾子,运行正常:



再看一眼数据目录,写了一共 2G 数据,共486个文件(后续数量没有再变少)。



从文件数量上来看,这个小文件量有点多(默认设置下)。



最后


从这次通过 Spark 写 Paimon 的 API 来看,过程虽然经历了一点小坎坷,但基本上通过仔细阅读文档内容,再根据抛出的异常提示,就能较快定位到问题,并迅速解决。


对于这次的测试体验,总体感觉还可以。


虽然我还是觉得,官方文档的描述,还可以再详细和友好一些,但比之前那种遇到异常之后,看着报错信息一脸懵逼,不知道如何是好要强一些。


最后,想强调一点的就是,Spark structed streaming 就是一个实时的流式计算框架,我都已经在生产上用它好几年了,请有些同学别再误会它了,拜托。


请使用浏览器的分享功能分享到微信等