Spark 的这两个算子,值得你深入研究一下

来源:安瑞哥是码农


估计现在大部分同学在用 Spark 的时候,都用的是它的 SQL API,其中的算子应该就用的很少了。


对于 Spark 来说,用它的 SQL API 还是算子 API,核心取决于你要解决的数据问题,从总体上来看, SQL API 对业务最为友好,使用时,一般只需要根据业务目标,对数据进行「粗粒度」的组织,逻辑较为简单。


而对于算子 API 来说,因为需要你对数据进行「细粒度」的处理,所有的处理逻辑都得自己来实现,所以理论上,它的使用难度要比 SQL 更大。


虽然算子 API 的玩法难度更大,但是它最大的优点在于,对数据处理起来,有着更好的「自由度」,而且,对于一些特定场合下比较复杂的处理逻辑,解决起来其实更得心应手。


比如,今天我要给你展示的这个场景,就需要用到 Spark 中,我认为最具灵性的两个算子flatMap」跟reduceByKey」。



0. 啥需求


原始数据说起来就下面两个字段,一个是域名(domain),另一个是跟域名相关的其他详细信息(detail_json)。


其中 domain 存储的是 String 类型,而 detail_json 虽然本质是个 json,但存储的也是 String 类型。


0.1 业务说明


从业务角度来说,一个 domain,会对应多个 nameserver(名字服务器),也就是 domain 跟 nameserver 之间,是一对多的关系。


但是,我们一般在做业务统计的时候,只取 nameserver 的 value 后面两个部分,比如图中的 ns1.selectel.org、ns2.selectel.org、ns3.selectel.org、ns4.selectel.org,通过提取去重,就变成了 selectel.org(取名 subnameserver)。


而且,经过后两个部分提取后的 nameserver ,也就是 subnameserver,跟 domain 的对应关系,也是一对多的。


0.2 业务目的


需要统计出,每个 subnameserver 对应有多少个 domain (注意,这个时候跟原数据反过来了),需要把具体的 domain 内容用一个字段展示出来,比如这样:


结果字段分别为:subnameserver、domain_array、domain_size。



1. 咋做


怎么样?这个需求,是不是看上去还挺简单的。


但是,如果你想通过纯 SQL 来实现,我估计有点困难(如果不自定义UDF的话),或者说,一般数据库所能提供的默认函数跟数据处理功能,想要实现这个业务目的,你可能还真的好好折腾一番。


那么这个时候,如果用 Spark 的算子 API,情况就会简单很多,而且,这里的关键,就是要用好flatMap」跟「reduceByKey」这两个核心算子。


(PS:我将这个问题尝试抛给 Kimi,结果它的回答,让我很失望,有兴趣你也可以试试)



2. 代码实现


用 Spark 的算子 API,调通之后的代码如下(读取数据源部分省略):


import java.util.Properties

import com.alibaba.fastjson.JSON
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveModeSparkSession}

/**
  * @DESC: 测试说明 Spark 算子 flapMap 跟 reduceByKey 的魅力
  * @Auther: Anryg
  * @Date: 2024/6/4 15:11
  */

object Test4FlapMapAndReduceByKey {

    def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("Test4FlapMapAndReduceByKey").setMaster("local[*]")

        val spark = SparkSession.builder().config(sparkConf).getOrCreate()

        val rawDF = spark.read.${DATA_SOURCE/**读取原始数据源*/

        import spark.implicits._  /**引入隐式转换对象*/

        val targetDF = rawDF.filter(row => {
                            val detail_json = row.getString(1/**读取json字段*/
                            val jsonObj = JSON.parseObject(detail_json) /**将其转成json对象*/
                            jsonObj.containsKey("nameservers")/**只保留包含nameservers key的json*/
                        }).map(row => {
                            val domain = row.getString(0/**取出domain字段*/
                            val detail_json = row.getString(1/**取出json字段*/
                            val jsonObj = JSON.parseObject(detail_json) /**将其转成json对象*/
                            val nameserversArray = jsonObj.getString("nameservers").replace("[","").replace("]","").replace("\"",""/**将namservers中的value取出来,提取其中纯净的内容*/

                            (domain, nameserversArray)
                        }).flatMap(kv => { /**注意这里的玩法*/
                            val domain = kv._1
                            val nameserversArray = kv._2.split(","/**将每个domain对应的多个nameserver解开成一个数组*/
                            val domainAndSubnamseserverArray = mutable.ArrayBuffer[String]() /**用来存放domain跟subnameserver的集合*/
                            for (nameserver <- nameserversArray){
                                val array = nameserver.split("\\.")
                                val sub_nameserver = array(array.length - 2) + "." + array(array.length - 1/**取subnameserer*/
                                domainAndSubnamseserverArray += (domain + "|" + sub_nameserver) /**将domain跟每个subnameserer的组合,放到上面的可变数组中*/
                            }
                            domainAndSubnamseserverArray.distinct  /**对数组去重,并通过flatMap,把数值的内容,转成多行*/
                        }).map(domainAndNameserver => {
                            val array = domainAndNameserver.split("\\|"/**拆开subnameserver跟domain*/
                            val subnameserver = array(1)
                            val domain = array(0)
                            (subnameserver, "\"" + domain + "\"")  /**将subnameserver跟domain组成1个二元元组,注意domain用双引号引起来了,方便后续显示*/
                        }).rdd.reduceByKey(_+","+_)  /**因为需要用到reduceByKey,所以需要把原本的dataset,转成rdd,这里将每个domain,用逗号给拼接起来,为了后续显示*/
                            .map(kv => {
                                val subnameserver = kv._1  /**取出subnameserver*/
                                val domainArray = kv._2  /**取出最终需要的domain array*/
                                val domainSize = domainArray.split(",").size  /**得到domain array的size*/
                                (subnameserver, "[" + domainArray + "]", domainSize) /**添加中括号,组合成最终需要的结果*/
                            }).toDF("subnameserver""domain_array""domain_size"/**添加最终的schema*/                            
       targetDF.show(false)                     
    }
}

代码中的几乎每一行,我都进行了详细的注释,相信你一定能看懂。


这里面的难点,是要把原来「一对多」中,「多」的内容提取出来(subnameserver),变成结果需要的,另一个「一对多」中的「一」


下面就是我调试成功后,跑出来的结果截图:


有图有真相,绝不蒙你。



3. flatMap 跟 reduceByKey


其实从 Spark 算子来解决一般的数据计算问题来看,这个例子已经不算简单了。


flatMap 跟 reduceByKey 这两个算子,我相信绝大部分同学都用过(别说你没写过word count),但有几个真的能深刻理解它们意思的?


以我多年的面试经验来看,能回答得让人满意的,寥寥无几。


从本质上来说,这个例子其实就是 word count 的一个变种。


先说 flatMap:


它的作用,就是把一个数组(容器)里面的所有元素,由原本的「一行数据」,映射成「多行数据


但是这里面其实有个限制:那就是这个容器,它必须是 Scala 的内部容器,不能是 Java 的


这个算子最大的特点在于:你要是能想得到它,那你就能很快用好它;而如果你想不到它,那你就怎么着都想不到(好像句废话,但给我的感觉确实如此)。


再看 reduceByKey:


这个是我认为 Spark 的所有算子中,设计得最聪明的一个,没有之一。


非常好用,且非常高效,有没有?


它的核心意思是:所有 key 相同的数据(只能是二元组),其 value,两两合并,至于 value 怎么合并?相加、相乘、相除,还是用其他什么复杂的逻辑,随便,都行


它最大的约束在于:对 value 合并之后的结果,其类型,必须还得跟原 value 一样



最后


以上提供的 Spark API,是我在短时间内能想到的,最简单,但稍微有点笨拙的解决办法了。


当然,如果你的 SQL 玩的溜,说不定可以提供另一个思路的解法。

请使用浏览器的分享功能分享到微信等