Flink读kafka写hive动态分区表遇到的坑

(PS:因为我阳了的缘故,导致这篇文章延迟了2天)

前几篇文章都是围绕Flink从kafka读取数据,并根据一些简单的业务要求对结果进行控制台输出。

那么这篇文章就沿用之前的业务思路,通过Flink(1.15.3)实时消费kafka(2.0.0)数据,并将结果实时写入hive(3.1.0)分区表,来对比之前用Spark structured streaming写hive分区表。

看一看,Flink在同样场景下,对比Spark有哪些优势或者坑。

(Spark structured streaming写hive动态分区表)

来,还是老规矩,第一步打开Flink的官方文档。因为我们是要通过Flink跟hive进行连接,或者说Flink CDC的hive对象。

于是,找到文档中的Connectors部分,可以看到跟hive相关的内容,在Table API中(从官方文档来看,只有TableAPI支持hive):

于是,打开其中的内容,发现基于当前Flink1.15.3最高可以支持的hive版本为3.1.2,而我当前集群的hive版本为3.1.0,所以理论上是支持的。

(PS:以下所有描述,都是基于我的大数据实战项目中的软件工程进行)


0. 基础环境中遇到的坑(jar包冲突)

之所以想把这部分内容写进来,因为在这个过程中,根据我的经验,即便你是根据官网的步骤一字不落的照做,也往往会碰到很多的坑。

比如,我根据文档要求,在软件工程的pom文件中,添加了如下hive CDC必要依赖:
<dependency>
  <groupId>org.apache.flinkgroupId>
  <artifactId>flink-connector-hive_2.12artifactId>
  <version>${flink.version}version>
dependency>

<dependency>
    <groupId>org.apache.hivegroupId>
    <artifactId>hive-execartifactId>
    <version>${hive.version}version>
dependency>

然后,根据文档内容,新建了下面这么一个简单的类,并运行:

按理说,根据官方文档一步步来应该错不了,可是我一点击运行,报错如下:


这是一个典型的jar包冲突问题,通过检查发现,这个SqlParser类在当前 classpath 环境中有两个jar包都包含:

而通过对比发现,是 calcite-core 这个包冲突了(具体如何定位jar包冲突方式,可以查看我之前的文章),于是果断排除掉。

排除这个之后,又出现了另一个jar包冲突问题(calcite-linq4j),具体的报错表现如下:

还是用同样的方式排除掉,而这两个冲突的jar包,都是刚才新引进的hive-exec中的子依赖,排除之后在pom文件就成了这样:

而这些内容,Flink官方文档没有,也不太可能告诉你,这些坑,只能自己凭经验去摸索。

而把这两个jar包冲突的问题解决后,这个简单的API才能正常运行了,最终把hive对应库下面的表给打印了出来:



1. hive配置文件的路径优化

还没完,不知道你有没有发现一个官方文档中有个不太友好的地方,那就是对于hive的配置文件,要求指定一个配置文件的目录,像我代码中那样指定的目录,在IDEA环境中运行是没有问题的。

但是如果一旦打包提交到集群,这个路径肯定是找不到的,那么怎么办,通过读HiveCatalog的源码发现,如果hiveConf这个参数不指定,即设置为null,那么程序就会到当前classpath路径下去找hive-site.xml文件

于是,我们就可以像Spark程序那样,直接把hive-site.xml文件放到软件工程的resources目录下,然后路径参数留空,代码如下:

而这样,就不用再担心配置文件因为环境异构而导致可能无法找到的问题。


2. 将kafka数据实时写入hive分区表

上面的步骤只能算是把基础环境给准备了一下,确保上述代码运行没有问题后。

那么接下来的内容,才是本篇文章的重点,既然是从kafka读取数据,那么就可以沿用之前的读取Kafka 的API,然后再根据官方文档的api,将数据写入hive。

完整代码如下:
package com.anryg.hive_cdc

import org.apache.flink.configuration.Configuration
import org.apache.flink.table.api.{EnvironmentSettingsSqlDialectTableEnvironment}
import org.apache.flink.table.catalog.hive.HiveCatalog

/**
  * @DESC: Flink读取kafka写hive动态分区表
  * @Auther: Anryg
  * @Date: 2022/12/19 10:36
  */

object FlinkReadKafka2Hive {

    def main(args: Array[String]): Unit = {
        val settings = EnvironmentSettings.newInstance().inStreamingMode()
                .withConfiguration(setConf())
                .build() //读设置
        val tableEnv = TableEnvironment.create(settings) //获取table env
        setHive(tableEnv)

        /**读取kafka source*/
        getDataSource(tableEnv)

        tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE//设置当前SQL语法为hive方言,该方言可以在整个上下文过程中来回切换
        /**创建hive表*/
        createHiveTable(tableEnv)

        tableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT//设置当前SQL语法为flink方言,
        /**将数据Sink到*/
        sinkData(tableEnv)

    }

    /**
      * @DESC: 设置Flink相关配置
      * */

    private def setConf(): Configuration ={
        val config = new Configuration() //设置checkpoint
        config.setString("execution.checkpointing.interval","10000")
        config.setString("state.backend""filesystem")
        config.setString("state.checkpoints.dir","hdfs://${name_node}:8020/tmp/checkpoint/FlinkWithHive")
        config
    }

    /**
      * @DESC: 设置hive catalog
      * */

    private def setHive(tableEnv: TableEnvironment): Unit ={
        val name = "hive_test"  //取个catalog名字
        val database = "test"   //指定hive的database
        //val hiveConf = "./flink-coding/src/main/resources/" //指定hive-site.xml配置文件所在的地方

        /**读取hive配置,并生成hive的catalog对象*/
        val hive = new HiveCatalog(name,database, null//hiveConf为null后,程序会自动到classpath下找hive-site.xml
        tableEnv.registerCatalog(name, hive) //将该catalog登记到Flink的table env环境中,这样flink就可以直接访问hive中的表

        tableEnv.useCatalog(name) //让当前的flink环境使用该catalog
    }

    /**
      * @DESC: 读取Kafka数据源
      * */

    private def getDataSource(tableEnv: TableEnvironment): Unit ={
        tableEnv.executeSql(
            """
              |Create table test.kafkaTable(
              |client_ip STRING,
              |domain STRING,
              |`time` STRING,
              |target_ip STRING,
              |rcode STRING,
              |query_type STRING,
              |authority_record STRING,
              |add_msg STRING,
              |dns_ip STRING
              |)
              |with(
              |'connector' = 'kafka',
              |'topic' = '${your_topic}',
              |'properties.bootstrap.servers' = '${kafka_broker}:6667',
              |'properties.group.id' = 'FlinkWithHive',
              |'scan.startup.mode' = 'latest-offset',
              |'value.format'='csv',                                 //确定数据源为文本格式
              |'value.csv.field-delimiter'='|'                      //确定文本数据源的分隔符
              |);
            "
"".stripMargin)
    }

    /**
      * @DESC: 创建hive目标数据表
      * */

    private def createHiveTable(tableEnv: TableEnvironment): Unit ={
        tableEnv.executeSql(
            """
              |CREATE TABLE if not exists test.kafka_flink_hive (
              |client_ip STRING,
              |domain STRING,
              |target_ip STRING,
              |rcode STRING,
              |query_type STRING,
              |authority_record STRING,
              |add_msg STRING,
              |dns_ip STRING
              |)
              |PARTITIONED BY (`time` STRING)
              |STORED AS textfile TBLPROPERTIES (
              |  'partition.time-extractor.timestamp-pattern'='$time',
              |  'sink.partition-commit.trigger'='partition-time',
              |  'sink.partition-commit.delay'='1 h',
              |  'sink.partition-commit.policy.kind'='metastore,success-file'
              |);
            "
"".stripMargin)
    }

    /**
      * @DESC: 将数据写入到目标表中
      * */

    private def sinkData(tableEnv: TableEnvironment): Unit ={
        tableEnv.executeSql(
            """
              |INSERT INTO test.kafka_flink_hive
              |SELECT client_ip,domain,target_ip,rcode,query_type,authority_record,add_msg,dns_ip,`time`
              |FROM test.kafkaTable;
            "
"".stripMargin)
    }
}
以上,是经过我一番折腾和调试之后,可以正常运行的代码,执行后在HDFS上生成的文件如下,为了快速达到演示效果,分区设置的有些粗糙,你可以进一步去优化:


当然,这个过程中免不了各种趟坑。


3. 代码调试过程中的各种坑

坑1:
必须在执行任何SQL操作前进行hive的catalog设置,让后续所有的操作都在同一个catalog下。

否则创建的kafka数据源表,跟后续创建的hive sink表可能不在一个catalog里面,导致最后在数据写入时,在当前的catalog中无法识别kafka的数据源表,从而数据无法进行sink;

坑2:
在执行SQL操作时,一定要注意当前的dialect选择,否则即便SQL语句没有语法问题,但是因为dialect选择不当,会导致语法树解析出错,让你一头雾水,不明所以。

同时也要注意,在不同dialect下,你的SQL语法会有差异,切不可混为一谈;

坑3:
因为hive3默认开启的表的ACID功能,有个设置 hive.strict.managed.tables默认为true,需要给提前给设置成false,注意必须要在hive的服务端进行修改,因为这个检查是在hive端建表的时候才触发的,否则在建表的时候会抛出:Table is marked as a managed table but not transactional 的错误;

坑4:
在最后数据写入的时候,一定不要偷懒写insert... select *,因为如果是分区表,分区字段必须写到select字段中的最后,否则你的分区字段可能跟你期望的不一样。

另外,如果字段名刚好是跟当前SQL语法的reserved keyword重名,那么一定要将tab键上方的尖括号给括起来,否则解析也会出错。


除了以上几个主要的坑之外,还有几处在使用过程中需要注意的地方。


4. 需要注意的几个地方

注意点1:
在创建kafka数据源表的时候,根据我们以往的常识理解,这个表应该只会存在于Flink的进程空间里,也就是内存中(Spark是这么干的)。

但是,在Flink中不一样,它必须要在hive里面建一张这样的表(你受得了不,Spark可不这样),虽然也就是一张空表,没有任何数据,但我看着确实不太舒服;

注意点2:
在将数据写入到hive目标表时,不需要提前在hive端创建这张目标表,hive会自动给创建好;

注意点3:
跟Spark一样,虽然可以顺利的写入到hive的动态分区表,但是无法用hive SQL直接在hive的CLI查询到(查询记录为0),不清楚原理的同学可能会认为数据没有写进去。

这也是为什么在这个例子中,我用textfile作为hive存储格式的原因,因为可以直接用hdfs命令来查看数据是否正确,你也可以改成ORC或者parquet等其他存储格式;


最后
虽然是根据官方文档一步步来完成,但是你要知道,很多坑爹的地方,官方文档是不可能告诉你,或者也无法注意到的。

因为hive是Hadoop生态最为重量级的选手之一,因此Flink一定会不遗余力在这方面做文章,以抬高自己的身价,但目前就我看到的,还有很大进步空间。

我会一直关注它们的发展,然后通过文章与你分享。

(PS:本案例代码和数据源全部基于我的实战项目,已经上传至GitHub,地址:https://github.com/Anryg/internet_behavior_project)



你可以添加我的私人微信,拉你入技术讨论群,跟一群热爱技术的小伙伴一起成长...



请使用浏览器的分享功能分享到微信等