之前文章聊过用kafka引擎表+创建物化视图的方式,就可以将kafka的数据给直接写入到clickhouse中。
虽然吧,这个方式对于开发者来说,非常简单,省了很多代码开发的时间和工作量,只需要捣腾几个SQL语句,就能轻松把外部数据给接入到CK中存储起来。
但是,之前也说了,这种方式虽然简单、方便,但数据写入CK这个过程对于开发者来说,完全是一个黑盒,你无法清楚数据在写入CK过程中它的流量波动情况,资源占用情况,以及可能出现的各种异常等等。
而在实际项目应用中,用专门的数据导入工具,或者直接用计算引擎来实现数据的入库,应该是更为常用,或者靠谱的方法。
刚好今天看到群里有小伙伴问起,数据通过jdbc方式写入,如何指定CK的集群问题。
那么这篇文章,就来说说,如何用spark structured streaming实时消费kafka的数据,并将结果写入到clickhouse表中的。
(PS:我的spark版本为3.2.0,clickhouse版本为23.4.1.1943)
0. 准备工作
所谓这个准备呢,其实就是查看官网,看spark structured streaming 应该如何优雅地跟CK结合起来。
我之前说过,spark作为一款流行度如此之高的计算引擎,那么它一定会想办法跟这些主流的数据库进行兼容,否则,它是市场份额就有可能被那些兼容度更好的工具所蚕食,这个spark肯定是不答应的。
首先要说明的是,CK没有像Doris那样,直接在官网就可以找到与spark兼容的玩法,而且在spark的官网,也没有直接找到spark直接兼容CK的说明,估计在这一点上,就会劝退一部分CK的使用者。

Doris兼容Spark的文档说明
但是,纵使无论是CK,还是spark的官方文档都没有直接告诉你如何去兼容CK,我们依然有办法让spark把数据写入到CK中。
我们从spark的官方文档可以看到,虽然它表面上直接支持的sink对象不多:

Spark structured streaming支持的sink方式
但是你要知道,人家可是能支持你自定义扩展的呀,而这自定义的sink可就厉害了,只要你遵循它的某个抽象标准,理论上任何外部存储,你都可以把数据写进去。
1. 确定spark写入方式
可以看到官方文档其实给了两个扩展方法,一个叫foreach,而另一个叫foreachBatch。


从源码的注释来看,前者针对的是单条数据,而后者,则针对的是一个小批次的数据,且它现在还是一个experiment方法,而我们当前的写入场景是一个实时场景,所以选择foreach。
2. 实现思路
(PS: 网上我也找了很多实现方式,但是没有一个是我满意的)
其实源码也已经说的很清楚了,foreach的输入参数必须要是一个ForeachWriter对象。
而ForeachWriter对象又是个抽象的类,所以想实现它的功能,你就得继承它,继承之后,主要实现里面的方法,它的方法有3个:
1. open函数:用于初始化和创建sink对象需要的连接,以及相关配置信息,最好设置为全部变量,这样一来,这些必要对象就只需要new一次,下次被调用时,直接返回之前new过的,提高效率;
2. process函数:具体数据写入的实现,粒度为每一条数据,至于如何写入,目标数据库(存储)一定会提供对应的API,而这里,CK则是利用官方提供的JDBC API来写入;
3. close函数:一般用来关闭存储对象的连接(调用完上面两个之后),但因为这里是streaming任务(常驻任务),因此我们需要对象的连接为长连接,这里不关闭。
具体要实现的函数清楚各自的职责之后,接下来就需要知道具体如何实现了,既然CK提供了jdbc的连接方式,那我们就要来看一下,官网对于这块的说明是什么样的:


提供了跟CK版本兼容的jdbc的版本,于是我们在软件工程中需要把这个依赖(maven依赖)给加进去。
3. ForeachWriter的实现
既然spark官网提供了ForeachWriter的实现接口,而CK官网又提供了JDBC的实现,那么接下来,就需要把这两者结合起来,具体的ForeachWriter实现类如下:
package com.anryg.bigdata.clickhouse;
import com.clickhouse.jdbc.ClickHouseConnection;
import com.clickhouse.jdbc.ClickHouseDataSource;
import org.apache.spark.sql.ForeachWriter;
import org.apache.spark.sql.Row;
import java.sql.PreparedStatement;
import java.sql.SQLException;
/**
* @DESC: 自定义structured streaming的外部sink,通过jdbc写数据到clickhouse中
* @Auther: Anryg
* @Date: 2023/7/3 11:29
*/
public class CKSink extends ForeachWriter<Row> {
private static final String jdbcUrl = "jdbc:ch://192.168.211.107:8123,192.168.211.108:8123,192.168.211.109:8123/local_db"; //为了防止找不到本地表,把整个集群的配置都写上
//private static final Properties properties = new Properties();
private static ClickHouseDataSource ckDtaSource;
private static ClickHouseConnection connection;
private static final String user = "default"; //用CK的默认用户
private static final String pwd = ""; //默认用户没有设置密码
private static final String tableName = "dns_logs_from_spark"; //写入的CK目标表
/**
* @DESC: 执行数据处理之前的准备工作,创建数据库连接,并确保单例,其中open会以partition为单位执行
* */
@Override
public boolean open(long partitionId, long epochId){
if (ckDtaSource == null || connection == null) {
synchronized (CKSink.class){
if (ckDtaSource == null || connection == null) {
try {
ckDtaSource = new ClickHouseDataSource(jdbcUrl);
connection = ckDtaSource.getConnection(user, pwd);
} catch (SQLException e) {
e.printStackTrace();
System.exit(-1); //捕获到异常后进程退出
}
}
}
}
if (connection == null) return false;
else return true;
}
/**
* @DESC: 当open函数返回为true之后,会针对partition中的每个ROW进行调用
* */
@Override
public void process(Row value) {
try {
PreparedStatement preparedStatement = connection.prepareStatement("insert into " + tableName + " values(?,?,?,?,?,?,?,?,?)");
preparedStatement.setString(1,value.getString(0));
preparedStatement.setString(2,value.getString(1));
preparedStatement.setString(3,value.getString(2));
preparedStatement.setString(4,value.getString(3));
preparedStatement.setString(5,value.getString(4));
preparedStatement.setString(6,value.getString(5));
preparedStatement.setString(7,value.getString(6));
preparedStatement.setString(8,value.getString(7));
preparedStatement.setString(9,value.getString(8));
preparedStatement.addBatch();
preparedStatement.executeBatch();
} catch (SQLException e) {
e.printStackTrace();
System.exit(-1); //捕获到异常后进程退出
}
}
/**
* @DESC: 上两个函数执行完后,开始调用,一般用于关闭连接
* */
@Override
public void close(Throwable errorOrNull) {
//长连接,不关闭
}
}
一些我认为比较重要的地方都在代码中进行了注释,希望你能看懂,
至于有同学问,如何在jdbc中指定CK集群?
把CK所有的实例都写在JDBC的URL里面就可以了,如上代码所示。
当然,需要提前在CK创建一张写数据的本地表(也可以是分片表),这里建表语句如下:

另外,我已经将该代码提交到之前的实战项目中,GitHub地址:https://github.com/Anryg/internet_behavior_project。
4. Spark structured streaming实现
其实上面那部分实现才是本篇内容的重点,至于Spark这部分的代码,都是根据之前消费kafka的代码copy过来,稍加修改就可以了,具体代码如下:
package com.anryg.bigdata.streaming.clickhouse
import com.anryg.bigdata.clickhouse.CKSink
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
/**
* @DESC: 通过spark structured streaming消费kafka,并通过自定义的ForeachWriter写数据到clickhouse
* @Auther: Anryg
* @Date: 2023/7/3 10:18
*/
object Kafka2CK {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("Kafka2CK")/*.setMaster("local[*]")*/
val spark = SparkSession.builder().config(conf).getOrCreate()
val rawDF = spark.readStream //获取数据源
.format("kafka") //确定数据源的来源格式
.option("kafka.bootstrap.servers", "192.168.211.107:6667") //指定kafka集群的地址,理论上写一个broker就可以了
.option("subscribe", ${topic}) //指定topic
//.option("group.id","test9999") /**不再用该方式来绑定offset,而是每个程序有个唯一的id,该id跟checkpointLocation绑定,虽然group.id属性在运行中依然保留,但是不再跟offset绑定*/
.option("failOnDataLoss",false) //如果读取数据源时,发现数据突然缺失,比如被删,则是否马上抛出异常
.option("fetchOffset.numRetries",3) //获取消息的偏移量时,最多进行的重试次数
.option("startingOffsets","latest") //第一次消费时,读取kafka数据的位置
.load()
import spark.implicits._
val ds = rawDF.selectExpr("CAST(value AS STRING)") //将kafka中的数据的value转为为string,原始为binary类型
.map(row => {
val line = row.getAs[String]("value") //获取row对象中的field,其实也只有一个field
val msgArray = line.split("\\|") //指定分隔符进行字段切分
msgArray
}).filter(_.length == 9) //只留字段数为9的数据
.map(array => (array(0),array(1),array(2),array(3),array(4),array(5),array(6),array(7),array(8))) //将其转化成为元组,为了方便下一步赋予schema
.toDF("client_ip","domain","time","target_ip","rcode","query_type","authority_record","add_msg","dns_ip") //给裸数据添加字段名
val query = ds.writeStream
.outputMode(OutputMode.Append()) //指定数据的写入方式
.foreach(new CKSink) //通过自定义的方式写数据到clickhouse
.option("checkpointLocation","hdfs://192.168.211.106:8020/tmp/offset/Kafka2CK2") /**用来保存offset,用该目录来绑定对应的offset,如果该目录发生改变则程序运行的id会发生变化,类比group.id的变化*/
.start()
query.awaitTermination()
}
}
5. 运行状态和结果查看
将上面代码打包提交到集群运行,可以通过spark UI监控到整个数据写入的效率情况:

我们知道,流式计算引擎因为是实时写数据,因此写数据库有个最大的特点就是:会产生数量惊人的小文件。
之前用同样的方式写Hive的时候,为了解决这个问题,我们可以采取两个策略:
1,流式数据尽量收集一段时间之后再入库(通过设置Trigger参数来实现);
2,用脚本不定期对hive的表执行合并命令。
但是对于CK来说,这两个步骤是可以省掉的,因为我们定义的这张表是用的MergeTree引擎,之前文章说过,这个引擎的最大特点就是:后台会启动异步任务将这些碎文件进行不定期的合并,直到文件数达到一个合理的数值。
可以看到,我往该表写了8W+条数据,结果产生了3w+个数据文件夹(每个文件夹里面又有多个小文件):


但是过一会之后,我们再看文件夹的个数:

看到没,你能想象就这一会功夫,3w+的文件夹,一下就给你干到只剩6个了,而这,就是MergeTree这个数据结构的功能。
7. 最后
本文向你提供了一个通过spark的ForeachWriter功能,同时结合clickhouse的JDBC接口,实现了一个kafka实时数据写入CK的案例。
希望对你有所帮助。
