Spark(streaming)自定义sink写Clickhouse


之前文章聊过用kafka引擎表+创建物化视图的方式,就可以将kafka的数据给直接写入到clickhouse中。


虽然吧,这个方式对于开发者来说,非常简单,省了很多代码开发的时间和工作量,只需要捣腾几个SQL语句,就能轻松把外部数据给接入到CK中存储起来。


但是,之前也说了,这种方式虽然简单、方便,但数据写入CK这个过程对于开发者来说,完全是一个黑盒,你无法清楚数据在写入CK过程中它的流量波动情况,资源占用情况,以及可能出现的各种异常等等。


而在实际项目应用中,用专门的数据导入工具,或者直接用计算引擎来实现数据的入库,应该是更为常用,或者靠谱的方法。


刚好今天看到群里有小伙伴问起,数据通过jdbc方式写入,如何指定CK的集群问题。


那么这篇文章,就来说说,如何用spark structured streaming实时消费kafka的数据,并将结果写入到clickhouse表中的。


PS:我的spark版本为3.2.0,clickhouse版本为23.4.1.1943



0. 准备工作


所谓这个准备呢,其实就是查看官网,看spark structured streaming 应该如何优雅地跟CK结合起来。


我之前说过,spark作为一款流行度如此之高的计算引擎,那么它一定会想办法跟这些主流的数据库进行兼容,否则,它是市场份额就有可能被那些兼容度更好的工具所蚕食,这个spark肯定是不答应的。


首先要说明的是,CK没有像Doris那样,直接在官网就可以找到与spark兼容的玩法,而且在spark的官网,也没有直接找到spark直接兼容CK的说明,估计在这一点上,就会劝退一部分CK的使用者。


Doris兼容Spark的文档说明



但是,纵使无论是CK,还是spark的官方文档都没有直接告诉你如何去兼容CK,我们依然有办法让spark把数据写入到CK中


我们从spark的官方文档可以看到,虽然它表面上直接支持的sink对象不多:


Spark structured streaming支持的sink方式


但是你要知道,人家可是能支持你自定义扩展的呀,而这自定义的sink可就厉害了,只要你遵循它的某个抽象标准,理论上任何外部存储,你都可以把数据写进去



1. 确定spark写入方式


可以看到官方文档其实给了两个扩展方法,一个叫foreach,而另一个叫foreachBatch。




从源码的注释来看,前者针对的是单条数据,而后者,则针对的是一个小批次的数据,且它现在还是一个experiment方法,而我们当前的写入场景是一个实时场景,所以选择foreach。



2. 实现思路


(PS: 网上我也找了很多实现方式,但是没有一个是我满意的)


其实源码也已经说的很清楚了,foreach的输入参数必须要是一个ForeachWriter对象。


ForeachWriter对象又是个抽象的类,所以想实现它的功能,你就得继承它,继承之后,主要实现里面的方法,它的方法有3个:


1. open函数:用于初始化和创建sink对象需要的连接,以及相关配置信息,最好设置为全部变量,这样一来,这些必要对象就只需要new一次,下次被调用时,直接返回之前new过的,提高效率;


2. process函数:具体数据写入的实现,粒度为每一条数据,至于如何写入,目标数据库(存储)一定会提供对应的API,而这里,CK则是利用官方提供的JDBC API来写入;


3. close函数:一般用来关闭存储对象的连接(调用完上面两个之后),但因为这里是streaming任务(常驻任务),因此我们需要对象的连接为长连接,这里不关闭。


具体要实现的函数清楚各自的职责之后,接下来就需要知道具体如何实现了,既然CK提供了jdbc的连接方式,那我们就要来看一下,官网对于这块的说明是什么样的:





提供了跟CK版本兼容的jdbc的版本,于是我们在软件工程中需要把这个依赖(maven依赖)给加进去。



3. ForeachWriter的实现


既然spark官网提供了ForeachWriter的实现接口,而CK官网又提供了JDBC的实现,那么接下来,就需要把这两者结合起来,具体的ForeachWriter实现类如下:


package com.anryg.bigdata.clickhouse;

import com.clickhouse.jdbc.ClickHouseConnection;
import com.clickhouse.jdbc.ClickHouseDataSource;
import org.apache.spark.sql.ForeachWriter;
import org.apache.spark.sql.Row;

import java.sql.PreparedStatement;
import java.sql.SQLException;

/**
 * @DESC: 自定义structured streaming的外部sink,通过jdbc写数据到clickhouse中
 * @Auther: Anryg
 * @Date: 2023/7/3 11:29
 */

public class CKSink extends ForeachWriter<Row{
    private static final String jdbcUrl = "jdbc:ch://192.168.211.107:8123,192.168.211.108:8123,192.168.211.109:8123/local_db"//为了防止找不到本地表,把整个集群的配置都写上
    //private static final Properties properties = new Properties();
    private static ClickHouseDataSource ckDtaSource;
    private static ClickHouseConnection connection;

    private static final String user = "default"//用CK的默认用户
    private static final String pwd = "";     //默认用户没有设置密码
    private static final String tableName = "dns_logs_from_spark"//写入的CK目标表


    /**
     * @DESC: 执行数据处理之前的准备工作,创建数据库连接,并确保单例,其中open会以partition为单位执行
     * */

    @Override
    public boolean open(long partitionId, long epochId){
        if (ckDtaSource == null || connection == null) {
            synchronized (CKSink.class){
                if (ckDtaSource == null || connection == null) {
                    try {
                        ckDtaSource = new ClickHouseDataSource(jdbcUrl);
                        connection = ckDtaSource.getConnection(user, pwd);
                    } catch (SQLException e) {
                        e.printStackTrace();
                        System.exit(-1); //捕获到异常后进程退出
                    }
                }
            }
        }

        if (connection == nullreturn false;
        else return true;
    }
    
    
    /**
     * @DESC: 当open函数返回为true之后,会针对partition中的每个ROW进行调用
     * */

    @Override
    public void process(Row value) {
        try {
            PreparedStatement preparedStatement = connection.prepareStatement("insert into " + tableName + " values(?,?,?,?,?,?,?,?,?)");
            preparedStatement.setString(1,value.getString(0));
            preparedStatement.setString(2,value.getString(1));
            preparedStatement.setString(3,value.getString(2));
            preparedStatement.setString(4,value.getString(3));
            preparedStatement.setString(5,value.getString(4));
            preparedStatement.setString(6,value.getString(5));
            preparedStatement.setString(7,value.getString(6));
            preparedStatement.setString(8,value.getString(7));
            preparedStatement.setString(9,value.getString(8));
            preparedStatement.addBatch();
            preparedStatement.executeBatch();
        } catch (SQLException e) {
            e.printStackTrace();
            System.exit(-1); //捕获到异常后进程退出
        }

    }

    /**
     * @DESC: 上两个函数执行完后,开始调用,一般用于关闭连接
     * */

    @Override
    public void close(Throwable errorOrNull) {
        //长连接,不关闭
    }
}



一些我认为比较重要的地方都在代码中进行了注释,希望你能看懂,

至于有同学问,如何在jdbc中指定CK集群?


把CK所有的实例都写在JDBC的URL里面就可以了,如上代码所示。


当然,需要提前在CK创建一张写数据的本地表(也可以是分片表),这里建表语句如下:


另外,我已经将该代码提交到之前的实战项目中,GitHub地址:https://github.com/Anryg/internet_behavior_project。



4. Spark structured streaming实现


其实上面那部分实现才是本篇内容的重点,至于Spark这部分的代码,都是根据之前消费kafka的代码copy过来,稍加修改就可以了,具体代码如下:


package com.anryg.bigdata.streaming.clickhouse


import com.anryg.bigdata.clickhouse.CKSink
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{OutputModeTrigger}

/**
  * @DESC: 通过spark structured streaming消费kafka,并通过自定义的ForeachWriter写数据到clickhouse
  * @Auther: Anryg
  * @Date: 2023/7/3 10:18
  */

object Kafka2CK {

    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("Kafka2CK")/*.setMaster("local[*]")*/
        val spark = SparkSession.builder().config(conf).getOrCreate()

        val rawDF = spark.readStream //获取数据源
                .format("kafka"//确定数据源的来源格式
                .option("kafka.bootstrap.servers""192.168.211.107:6667"//指定kafka集群的地址,理论上写一个broker就可以了
                .option("subscribe", ${topic})  //指定topic
                //.option("group.id","test9999") /**不再用该方式来绑定offset,而是每个程序有个唯一的id,该id跟checkpointLocation绑定,虽然group.id属性在运行中依然保留,但是不再跟offset绑定*/
                .option("failOnDataLoss",false)  //如果读取数据源时,发现数据突然缺失,比如被删,则是否马上抛出异常
                .option("fetchOffset.numRetries",3)  //获取消息的偏移量时,最多进行的重试次数
                .option("startingOffsets","latest")  //第一次消费时,读取kafka数据的位置
                .load()

        import spark.implicits._

        val ds = rawDF.selectExpr("CAST(value AS STRING)")  //将kafka中的数据的value转为为string,原始为binary类型
                .map(row => {
                    val line = row.getAs[String]("value"//获取row对象中的field,其实也只有一个field
                    val msgArray = line.split("\\|")  //指定分隔符进行字段切分
                    msgArray
                }).filter(_.length == 9)  //只留字段数为9的数据
                .map(array => (array(0),array(1),array(2),array(3),array(4),array(5),array(6),array(7),array(8))) //将其转化成为元组,为了方便下一步赋予schema
                .toDF("client_ip","domain","time","target_ip","rcode","query_type","authority_record","add_msg","dns_ip"//给裸数据添加字段名


        val query = ds.writeStream
                .outputMode(OutputMode.Append())  //指定数据的写入方式
                .foreach(new CKSink)   //通过自定义的方式写数据到clickhouse          
                .option("checkpointLocation","hdfs://192.168.211.106:8020/tmp/offset/Kafka2CK2"/**用来保存offset,用该目录来绑定对应的offset,如果该目录发生改变则程序运行的id会发生变化,类比group.id的变化*/
                .start()

        query.awaitTermination()
    }

}



5. 运行状态和结果查看


将上面代码打包提交到集群运行,可以通过spark UI监控到整个数据写入的效率情况:


我们知道,流式计算引擎因为是实时写数据,因此写数据库有个最大的特点就是:会产生数量惊人的小文件


之前用同样的方式写Hive的时候,为了解决这个问题,我们可以采取两个策略:


1,流式数据尽量收集一段时间之后再入库(通过设置Trigger参数来实现);


2,用脚本不定期对hive的表执行合并命令。


但是对于CK来说,这两个步骤是可以省掉的,因为我们定义的这张表是用的MergeTree引擎,之前文章说过,这个引擎的最大特点就是:后台会启动异步任务将这些碎文件进行不定期的合并,直到文件数达到一个合理的数值


可以看到,我往该表写了8W+条数据,结果产生了3w+个数据文件夹(每个文件夹里面又有多个小文件):



但是过一会之后,我们再看文件夹的个数:


看到没,你能想象就这一会功夫,3w+的文件夹,一下就给你干到只剩6个了,而这,就是MergeTree这个数据结构的功能。



7. 最后


本文向你提供了一个通过spark的ForeachWriter功能,同时结合clickhouse的JDBC接口,实现了一个kafka实时数据写入CK的案例。


希望对你有所帮助。




你可以添加我的私人微信,拉你入技术讨论群,跟一群热爱技术的小伙伴一起成长...


请使用浏览器的分享功能分享到微信等