来源:安瑞哥是码农
如果说前两篇文章,分别用 Clickhouse(下称CK) 跟 Doris 来处理 schema-free 的 json 数据,从实测的情况来看,多少有点赶鸭子上架的味道。
那么今天,我准备用 Elasticsearch(下称ES ) 来解决同样的问题,就似乎有点踩中它舒适区的感觉。
那么 ES 在面对这个场景时的表现到底怎么样?我们一起来实测看看。
(PS:本文采用 Spark3.2.0 + ES7.8 进行测试)
0. 技术准备
由于这部分 schema-free 的数据源在 CK 里面,而且 ES 不能像 CK、Doris 这些数据库一样,以外部表的方式,把数据给导入过来。
于是,就只能老老实实用我最爱的方式——通过 Spark API 编码来搞定,而我通过 Spark 来写数据到 ES 的经历,最早可以追溯到10年前。
ES 区别于 CK 跟 Doris 的最大特点在于,它是一款 schema-free 的数据库,它底层数据的存储形式其实就是 json,所以它天然对 json 格式的数据友好,虽然任何一张 ES 的表(索引),你都可以预先定义 mapping (类比建表时,字段名跟字段类型的确定)。
但是,当你在实际数据写入的时候,是可以完全不受当初的 mapping 限制的,主打一个「随心所欲」。
1. 如何建表(建索引)
虽然对于 ES 来说,是可以完全不用提前建表的,如果你写入的数据包含新字段,即便没有在预先的 mapping 中定义,ES 也会采用一种默认的 mapping 方式,只不过这种默认的方式,很可能不是我们实际工作中需要的。
咋整?
这个时候就需要祭出 ES 一个我认为非常强大,且实用的功能——template(模板),它存在的目的,就是对有着一类相同特征的行为,或者数据,去执行同一个标准的动作,让最终的数据符合我们的使用预期。
比如,我这里虽然没有办法去指定数据源中,那个 schema-free 的json,每一个 key 的存储类型以及分词方式,但我可以根据它的数据特征,设置一个这样的 mapping 模板。
这样一来,所有 String 类型的字段,就都会以 keyword 的方式进行分词存储。
2. Spark API 写法
以下是我已经调试通过,能够运行成功的 Spark 代码:
package com.anryg.bigdata.jdbc
import java.util.Properties
import com.alibaba.fastjson.{JSON, JSONObject, JSONValidator}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.elasticsearch.spark.rdd.EsSpark
/**
* @DESC: Spark通过jdbc读取CK表,将 schema-free 数据写 ES
* @Auther: Anryg
* @Date: 2024/4/28 15:00
*/
object SparkFromCK2ES {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("SparkFromCK2ES").setMaster("local[*]")
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
/**CK属性配置*/
val propertiesCK = new Properties()
propertiesCK.put("username","default")
propertiesCK.put("password", "")
val rawDF = spark.read
.option("driver", "com.clickhouse.jdbc.ClickHouseDriver") //提交集群运行时,这里需要显示指定driver
.jdbc("jdbc:clickhouse://host_name:port/db_name",
"domain_whois_all",
propertiesCK)
import scala.collection.JavaConversions._
val targetRDD = rawDF.rdd.map(row => {
val domain = row.getString(0)
val detail = row.getString(1)
val time = row.getTimestamp(2).toString
val targetJson = new JSONObject()/**将最终所有的key放到一个非嵌套的json里*/
targetJson.put("domain", domain)
targetJson.put("time", time)
/**判断确定当前的detail信息,可以转换为标准的json格式*/
val ifJson = JSONValidator.from(detail).validate()
if (ifJson) {
val detailJson = JSON.parseObject(detail)
for (set <- detailJson.entrySet()) {
val key = set.getKey
val value = set.getValue.toString
if (key.equals("data")) {
if (!value.trim.equals("[]")) {/**确定data部分内容不为空*/
val dataJson = JSON.parseObject(JSON.parseArray(value).getString(0))
for (e <- dataJson.entrySet()) {
targetJson.put("data." + e.getKey, e.getValue.toString)/**将data内部的key-value全部抽取出来,组装成新的json*/
}
}
}
else targetJson.put(key, value)
}
}
targetJson
})
/**RDD写ES的方式*/
val esSetting = Map(("es.write.operation","upsert"), ("es.nodes","192.168.221.173"), ("es.port","9201"), ("es.mapping.id","domain"), ("es.resource.write","schema_free_json01"))
EsSpark.saveToEs(targetRDD, esSetting)
}
}
从代码中可以看到,除了固定字段 domain、time 之外,对于 data 这个 value 的处理,就采用了「字段数不确定」的灵活方式。
此外,值得说明的是,这里将最终的数据写入 ES,用的是 Spark 的 RDD API,而且,当前只有用 RDD API 才是最简单,最高效的解决方案。
原因很简单,普通的 DataSet/DataFrame API,在写数据时,都必须给予确定的 schema,也就是确定的字段名,但如果是 RDD,则不需要。
对比一下这两种不同 API 写 ES的特点:
DataSet/DataFrame:
RDD:
所以,不要再说类似 RDD 已经被淘汰了的话,事实证明,它依然有很多 DataSet/DataFrame 取代不了的优点。
3. 查看写入结果
只要上面代码调通了,数据写入就是件非常 easy 的事情。
从写入的量来看,以非常快的速度就写入了 10w+ 条数据。
接下来,就是见证 ES 写入非结构化数据的能力,我这里随便挑几条 schema 各不相同的来说明。
第1条:
可以看到只有4个字段,因为 data 的 value 为空,所以这里就没有显示。
第2条:
明显跟上一条不一样了,多了很多 data 相关的字段跟它们的值。
第3条:
虽然也有 data 相关的字段,但是跟上一条的字段个数也明显不一样。
第4条:
虽然跟第3条很像,但是,被我框出来的那2个字段,也是上一条没有的,所以也不全一样。
4. 如何查询
既然写入后的数据存储方式已经确定,那么接下来的查询方式,就显得非常简单了,还是对比上两次的测试内容。
查询登记国家在冰岛的 Whois 数量:
查询登记国家数量排名前10的数量分别是多少:
虽然用的不是 SQL 的查询方式,但我个人认为,相比之前的 CK 跟 Doris,ES 的这种查询方式更加简洁明了。
最后
基于同一场景下 schema-free 数据的存储需求,目前为止,就已经测试完了 CK、Doris,以及 ES 这3种数据库各自的解决方案。
从体验上来看,个人认为基于当前的需求场景,ES 提供的,是这3者中最成熟,实用性、和便利性最好的。当然,如果你想要当前数据库实现关联、嵌套查询,那就是另一个话题了。
当初有个球友在咨询我这个数据库选择问题的时候,我首推的就是 ES,只不过可惜的是,他并没有选择,原因是他们团队觉得 ES 的运维成本太高,玩不转,那就没有办法了。
还有的同学会认为,ES 不也需要建表吗?它是怎么能做到根据实际数据,对字段变化进行动态调整的呢(反正就是不信)?呐... 上面过程已经证明了。
此外,对 ES 的误解和偏见还有很多,以后有机会再证明给大家看。
那么最后,问你一个问题:你觉得 CK、Doris 跟 ES,哪个是存储 schema-free 数据的最佳数据库人选?