Clickhouse的AggregatingMergeTree玩法


0.


上篇文章说到clickhouse(下文统一称CK)通过利用kafka引擎表,以及物化视图功能,就可以把kafka中的数据给源源不断灌到CK的分片表中。


对于这种方式的优缺点为:


优点:省去了使用数据导入工具,以及开发kafka消费者代码的成本,效率极高。


缺点:这是个黑盒功能,接入的数据流量有多大,接入过程中有没有报错,对于使用者来说感知不明显,你只能通过去查看后台日志才能找到一些可能的蛛丝马迹。


但是,因为之前的做法只是简单地把kafka的全量数据给全部灌到了CK中(出于演示目的),导致存储在其中的表数据量迅速膨胀,即便是用了分片表,但是架不住数据量太大,导致对于一些需要聚合的查询,随着数据量的增加,查询效率肉眼可见的降低。


我们知道,数据库的资源其实是非常宝贵的,我们对它的每一次使用都需要耗费相应的内存、CPU、以及带宽资源,因此,就必须要秉承资源能省则省的原则来开展工作。



1.


比如现在我收到一个统计需求:


以小时为单位,统计出每个上网IP的上网次数,取出上网次数最多的前100个IP


按照无脑的常规思路,把原始数据全部入到一张表中,然后对其写SQL,比如我的原始表结构是这样的:



如果想要统计出上面那个需求,你可以写这么一条SQL查询语句:



就能把需要的结果给查询出来(其中的where条件是用来筛选脏数据的)。


但是,随着我kafka数据量的不断涌入,当这张表的数据量超过10亿规模之后,你会明显感觉到查询效率在降低,而且耗费的内存和CPU资源也在急剧上升:



可以看到14亿条数据,用了超过62G的内存,同时用满了几十个CPU核心,查询也花了超过42秒(你让其他进程还怎么活)。


而且随着数据的持续写入,这个效率还会持续下降。



2.


如果对于一个实时的统计需求场景来说,上面这种设计方式显然是不能忍的。


对于普通的传统数据库来说,针对这种数据量大,且还需要实时聚合结果的情况,一般需要在数据库先设计一张聚合结果的表。


然后通过流式计算引擎在取数据源(kafka)数据时,先在小时间范围内将原始数据进行初步聚合,接着在将结果写入数据库时,再跟之前已经存在数据库的数据进行merge操作(比如相同key的数据捞出来相加),获得一个最新的写入结果。


但是对于CK来说,它有一种特殊的数据存储引擎,就是专门干这个事情的,也就是说,通过使用这种引擎,你的数据源在每次进入到目标结果表的时候,不用人工干预,会自动进行merge操作,算出最新的结果。


而这个引擎就叫:AggregatingMergeTree


从名字就可以看出来,它隶属于MergeTree家族的一员,但是跟普通MergeTree不同的是,它除了会对多个写入目录的数据进行合并到一个目录外,还会根据你的聚合条件,对每次写入的数据进行值的合并。



3.


那如何来做呢?


一共可以分为如下几个步骤:


1. 准备好kafka的引擎表,目的在CK内部引入kafka的数据源;


2. 创建分片表需要的集群配置信息,确定分片规则(因为考虑到数据量巨大,所以还是使用分片表);


3. 通过指定上一步配置的分片信息,来创建用来存储聚合结果需要的分片表(虚拟表);


4. 再创建用来存放分片表实体数据的本地表;


5. 根据聚合规则,创建基于第1步数据源的物化视图,把根据聚合规则获取的数据灌入到第3步的分片表中



3.1 创建kafka引擎表


这个因为上篇文章中已经创建过,不赘述:



为了防止因为字段类型转换而引发的错误,因此所有字段都用String类型,后面根据需要进行转换。



3.2 创建分片表需要的集群配置信息


同样,上篇文章中已经创建过,可对其进行复用,不赘述:



因为机器数少的原因,只能在3台机器上各建一个分片。



3.3 创建分片表(虚拟表)


根据已经配置的分片规则,创建分片表,创建语句如下(会在分片所有的机器上都能看这张表):



其中的字段说明如下:


client_ip: 没什么好说的,聚合的主体字段,用String就好了。


date_hour:聚合时需要的时间字段,用DateTime类型;


count: client_ip根据时间聚合的数量字段,这里面需要注意的是它的字段类型,跟我们平时普通表的类型明显是不一样的,这个表是用来存放聚合后结果的,因为数据每次写入时,并不是简单的append,而是需要做聚合操作;


AggregateFunction(count, UInt32):代表当前字段为聚合字段,括号里的count,代表聚合类型为count(类比sum,avg),UInt32代表聚合结果的数据类型为无符号的int;


然后这张分片表根据client_ip这个字段的cityHash64函数进行拆分。


3.4 创建分片表的本地表(实体表)


建表语句如下:



其中的字段类型跟分片表保持一致,但是注意其中的表引擎,代表数据写进表时,是会自动进行聚合操作的,以及后面的order by条件,有需要的还可以加上分区规则。


该表会创建在所有配置的分片上,并且会有对应的数据目录,是真正存储数据的表。



3.5 根据聚合条件创建物化视图


所有必要的表都创建完毕后,现在只需要一个物化视图,将需要的聚合数据导入到上述创建的表中:



对于这个建表语句,可能理解起来稍微有点难度:


toStartOfHour(parseDateTimeBestEffort(time)):这个好理解,就是把形如“20230607204834”这样的时间字符串给转成精确到小时的时间格式;


countState(client_ip):这个比较费解一点,因为我们是聚合client_ip的数量,按常理应该是count()函数才对,怎么是countState()呢?


其实你仔细想一下也好理解,这个物化视图创建的目的,其实是通过无数次的小数据量聚合之后(类比流式计算引擎),在写入到目标表(分片表)时,还要再经过一次聚合才得到最新结果的(类比之前的分片表中,表示聚合字段的类型为什么是AggregateFunction类型)。


所以这里的无数次小聚合,就代表了一个个小的聚合状态,因此叫countState


当然,最后还要注意提取数据时的group by条件。



4. 如何查询AggregatingMergeTree表


以上步骤,就完成了通过物化视图的方式,将kafka数据源的数据,以聚合的状态写入到CK的AggregatingMergeTree引擎表中。


那么这样的表数据,该如何查询呢?


按理说,既然是写入的聚合结果,那我直接select * 不就可以了吗?



但你看到这个输出结果,是不是瞬间有种懵逼的感觉,说好的聚合结果呢,怎么就成这熊样了捏,是数据出问题了吗?


其实,不是数据出问题了,而是这种查询姿势不对,正确的查询方式是这样的:



可以看到,我们对于这种已经是聚合结果表的查询方式,依然需要写这种一板一眼的聚合句式。


至于为什么?


我是这么理解的:你要知道这个表的引擎,咱们用的是MergeTree家族,这类引擎有个特点就是,数据在写入的时候,是多线程(单台机器多个目录并行)写入的,而且数据是在后台不定期进行合并。


也就是说,你要查的聚合数据有可能在你查询的时候,还没来及全部聚合完成呢,你需要显示告诉CK,我要的是最终的聚合结果,触发CK当即去做这样的聚合,因此需要你显示地写出这样的查询句式。


我们从这个表的数据量,由大突然变小就可以证明这一点:



说明它的聚合结果是在后台异步运算的


而那个我用红框框出来的countMerge函数,也是在查询的时候显示告诉CK:我要你当下最新的聚合结果,赶紧现在就去merge给我



5.


如果上述这种方式能够持续稳定地运行下去的话,那确实可以给我们的开发工作带来很大的便利,同时也节省了大量的存储空间、硬件资源和查询时间。


我就挂着让他一直跑,看中间会不会出现什么幺蛾子,咱们拭目以待...



你可以添加我的私人微信,拉你入技术讨论群,跟一群热爱技术的小伙伴一起成长...


请使用浏览器的分享功能分享到微信等