Elasticsearch 存储 json,天然的舒适区

来源:安瑞哥是码农


如果说前两篇文章,分别用 Clickhouse(下称CK) 跟 Doris 来处理 schema-free 的 json 数据,从实测的情况来看,多少有点赶鸭子上架的味道。


那么今天,我准备用 Elasticsearch(下称ES ) 来解决同样的问题,就似乎有点踩中它舒适区的感觉。


那么 ES 在面对这个场景时的表现到底怎么样?我们一起来实测看看。


(PS:本文采用 Spark3.2.0 + ES7.8 进行测试)



0. 技术准备


由于这部分 schema-free 的数据源在 CK 里面,而且 ES 不能像 CK、Doris 这些数据库一样,以外部表的方式,把数据给导入过来。


于是,就只能老老实实用我最爱的方式——通过 Spark API 编码来搞定,而我通过 Spark 来写数据到 ES 的经历,最早可以追溯到10年前。


ES 区别于 CK 跟 Doris 的最大特点在于,它是一款 schema-free 的数据库,它底层数据的存储形式其实就是 json,所以它天然对 json 格式的数据友好,虽然任何一张 ES 的表(索引),你都可以预先定义 mapping (类比建表时,字段名跟字段类型的确定)。  


但是,当你在实际数据写入的时候,是可以完全不受当初的 mapping 限制的,主打一个「随心所欲」



1. 如何建表(建索引)


虽然对于 ES 来说,是可以完全不用提前建表的,如果你写入的数据包含新字段,即便没有在预先的 mapping 中定义,ES 也会采用一种默认的 mapping 方式,只不过这种默认的方式,很可能不是我们实际工作中需要的。


咋整?


这个时候就需要祭出 ES 一个我认为非常强大,且实用的功能——template(模板),它存在的目的,就是对有着一类相同特征的行为,或者数据,去执行同一个标准的动作,让最终的数据符合我们的使用预期。


比如,我这里虽然没有办法去指定数据源中,那个 schema-free 的json,每一个 key 的存储类型以及分词方式,但我可以根据它的数据特征,设置一个这样的 mapping 模板。



这样一来,所有 String 类型的字段,就都会以 keyword 的方式进行分词存储。



2. Spark API 写法


以下是我已经调试通过,能够运行成功的 Spark 代码:


package com.anryg.bigdata.jdbc

import java.util.Properties

import com.alibaba.fastjson.{JSONJSONObjectJSONValidator}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveModeSparkSession}
import org.elasticsearch.spark.rdd.EsSpark


/**
  * @DESC: Spark通过jdbc读取CK表,将 schema-free 数据写 ES
  * @Auther: Anryg
  * @Date: 2024/4/28 15:00
  */

object SparkFromCK2ES {

    def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("SparkFromCK2ES").setMaster("local[*]")
        val spark = SparkSession.builder().config(sparkConf).getOrCreate()  

        /**CK属性配置*/
        val propertiesCK = new Properties()
        propertiesCK.put("username","default")
        propertiesCK.put("password""")

        val rawDF = spark.read
            .option("driver""com.clickhouse.jdbc.ClickHouseDriver"//提交集群运行时,这里需要显示指定driver
            .jdbc("jdbc:clickhouse://host_name:port/db_name",
                "domain_whois_all",
                propertiesCK)

        import scala.collection.JavaConversions._ 

        val targetRDD = rawDF.rdd.map(row => {
                        val domain = row.getString(0)
                        val detail = row.getString(1)
                        val time = row.getTimestamp(2).toString
                        val targetJson = new JSONObject()/**将最终所有的key放到一个非嵌套的json里*/
                        targetJson.put("domain", domain)
                        targetJson.put("time", time)

                        /**判断确定当前的detail信息,可以转换为标准的json格式*/
                        val ifJson = JSONValidator.from(detail).validate()

                        if (ifJson) {
                            val detailJson = JSON.parseObject(detail)
                            for (set <- detailJson.entrySet()) {
                                val key = set.getKey
                                val value = set.getValue.toString
                                if (key.equals("data")) {
                                    if (!value.trim.equals("[]")) {/**确定data部分内容不为空*/
                                        val dataJson = JSON.parseObject(JSON.parseArray(value).getString(0))
                                        for (e <- dataJson.entrySet()) {
                                            targetJson.put("data." + e.getKey, e.getValue.toString)/**将data内部的key-value全部抽取出来,组装成新的json*/
                                        }
                                    }
                                }
                                else targetJson.put(key, value)
                            }
                        }
                        targetJson
                    })

        /**RDD写ES的方式*/
        val esSetting = Map(("es.write.operation","upsert"), ("es.nodes","192.168.221.173"), ("es.port","9201"), ("es.mapping.id","domain"), ("es.resource.write","schema_free_json01"))

        EsSpark.saveToEs(targetRDD, esSetting)
    }
}

从代码中可以看到,除了固定字段 domain、time 之外,对于 data 这个 value 的处理,就采用了「字段数不确定」的灵活方式。


此外,值得说明的是,这里将最终的数据写入 ES,用的是 Spark 的 RDD API,而且,当前只有用 RDD API 才是最简单,最高效的解决方案。


原因很简单,普通的 DataSet/DataFrame API,在写数据时,都必须给予确定的 schema,也就是确定的字段名,但如果是 RDD,则不需要。


对比一下这两种不同 API 写 ES的特点:


DataSet/DataFrame:



RDD:



所以,不要再说类似 RDD 已经被淘汰了的话,事实证明,它依然有很多 DataSet/DataFrame 取代不了的优点。



3. 查看写入结果


只要上面代码调通了,数据写入就是件非常 easy 的事情。


从写入的量来看,以非常快的速度就写入了 10w+ 条数据。


接下来,就是见证 ES 写入非结构化数据的能力,我这里随便挑几条 schema 各不相同的来说明。


第1条:


 

可以看到只有4个字段,因为 data 的 value 为空,所以这里就没有显示。


第2条:


明显跟上一条不一样了,多了很多 data 相关的字段跟它们的值。


第3条:



虽然也有 data 相关的字段,但是跟上一条的字段个数也明显不一样。


第4条:



虽然跟第3条很像,但是,被我框出来的那2个字段,也是上一条没有的,所以也不全一样。



4. 如何查询


既然写入后的数据存储方式已经确定,那么接下来的查询方式,就显得非常简单了,还是对比上两次的测试内容。


查询登记国家在冰岛的 Whois 数量:



查询登记国家数量排名前10的数量分别是多少:



虽然用的不是 SQL 的查询方式,但我个人认为,相比之前的 CK 跟 Doris,ES 的这种查询方式更加简洁明了。



最后


基于同一场景下 schema-free 数据的存储需求,目前为止,就已经测试完了 CK、Doris,以及 ES 这3种数据库各自的解决方案。


从体验上来看,个人认为基于当前的需求场景,ES 提供的,是这3者中最成熟,实用性、和便利性最好的。当然,如果你想要当前数据库实现关联、嵌套查询,那就是另一个话题了。


当初有个球友在咨询我这个数据库选择问题的时候,我首推的就是 ES,只不过可惜的是,他并没有选择,原因是他们团队觉得 ES 的运维成本太高,玩不转,那就没有办法了。


还有的同学会认为,ES 不也需要建表吗?它是怎么能做到根据实际数据,对字段变化进行动态调整的呢(反正就是不信)?呐... 上面过程已经证明了。


此外,对 ES 的误解和偏见还有很多,以后有机会再证明给大家看。


那么最后,问你一个问题:你觉得 CK、Doris 跟 ES,哪个是存储 schema-free 数据的最佳数据库人选?

请使用浏览器的分享功能分享到微信等