来源:安瑞哥是码农
估计现在大部分同学在用 Spark 的时候,都用的是它的 SQL API,其中的算子应该就用的很少了。
对于 Spark 来说,用它的 SQL API 还是算子 API,核心取决于你要解决的数据问题,从总体上来看, SQL API 对业务最为友好,使用时,一般只需要根据业务目标,对数据进行「粗粒度」的组织,逻辑较为简单。
而对于算子 API 来说,因为需要你对数据进行「细粒度」的处理,所有的处理逻辑都得自己来实现,所以理论上,它的使用难度要比 SQL 更大。
虽然算子 API 的玩法难度更大,但是它最大的优点在于,对数据处理起来,有着更好的「自由度」,而且,对于一些特定场合下比较复杂的处理逻辑,解决起来其实更得心应手。
比如,今天我要给你展示的这个场景,就需要用到 Spark 中,我认为最具灵性的两个算子「flatMap」跟「reduceByKey」。
0. 啥需求
原始数据说起来就下面两个字段,一个是域名(domain),另一个是跟域名相关的其他详细信息(detail_json)。
其中 domain 存储的是 String 类型,而 detail_json 虽然本质是个 json,但存储的也是 String 类型。
0.1 业务说明
从业务角度来说,一个 domain,会对应多个 nameserver(名字服务器),也就是 domain 跟 nameserver 之间,是一对多的关系。
但是,我们一般在做业务统计的时候,只取 nameserver 的 value 后面两个部分,比如图中的 ns1.selectel.org、ns2.selectel.org、ns3.selectel.org、ns4.selectel.org,通过提取去重,就变成了 selectel.org(取名 subnameserver)。
而且,经过后两个部分提取后的 nameserver ,也就是 subnameserver,跟 domain 的对应关系,也是一对多的。
0.2 业务目的
需要统计出,每个 subnameserver 对应有多少个 domain (注意,这个时候跟原数据反过来了),需要把具体的 domain 内容用一个字段展示出来,比如这样:
结果字段分别为:subnameserver、domain_array、domain_size。
1. 咋做
怎么样?这个需求,是不是看上去还挺简单的。
但是,如果你想通过纯 SQL 来实现,我估计有点困难(如果不自定义UDF的话),或者说,一般数据库所能提供的默认函数跟数据处理功能,想要实现这个业务目的,你可能还真的好好折腾一番。
那么这个时候,如果用 Spark 的算子 API,情况就会简单很多,而且,这里的关键,就是要用好「flatMap」跟「reduceByKey」这两个核心算子。
(PS:我将这个问题尝试抛给 Kimi,结果它的回答,让我很失望,有兴趣你也可以试试)
2. 代码实现
用 Spark 的算子 API,调通之后的代码如下(读取数据源部分省略):
import java.util.Properties
import com.alibaba.fastjson.JSON
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
/**
* @DESC: 测试说明 Spark 算子 flapMap 跟 reduceByKey 的魅力
* @Auther: Anryg
* @Date: 2024/6/4 15:11
*/
object Test4FlapMapAndReduceByKey {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("Test4FlapMapAndReduceByKey").setMaster("local[*]")
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
val rawDF = spark.read.${DATA_SOURCE} /**读取原始数据源*/
import spark.implicits._ /**引入隐式转换对象*/
val targetDF = rawDF.filter(row => {
val detail_json = row.getString(1) /**读取json字段*/
val jsonObj = JSON.parseObject(detail_json) /**将其转成json对象*/
jsonObj.containsKey("nameservers")/**只保留包含nameservers key的json*/
}).map(row => {
val domain = row.getString(0) /**取出domain字段*/
val detail_json = row.getString(1) /**取出json字段*/
val jsonObj = JSON.parseObject(detail_json) /**将其转成json对象*/
val nameserversArray = jsonObj.getString("nameservers").replace("[","").replace("]","").replace("\"","") /**将namservers中的value取出来,提取其中纯净的内容*/
(domain, nameserversArray)
}).flatMap(kv => { /**注意这里的玩法*/
val domain = kv._1
val nameserversArray = kv._2.split(",") /**将每个domain对应的多个nameserver解开成一个数组*/
val domainAndSubnamseserverArray = mutable.ArrayBuffer[String]() /**用来存放domain跟subnameserver的集合*/
for (nameserver <- nameserversArray){
val array = nameserver.split("\\.")
val sub_nameserver = array(array.length - 2) + "." + array(array.length - 1) /**取subnameserer*/
domainAndSubnamseserverArray += (domain + "|" + sub_nameserver) /**将domain跟每个subnameserer的组合,放到上面的可变数组中*/
}
domainAndSubnamseserverArray.distinct /**对数组去重,并通过flatMap,把数值的内容,转成多行*/
}).map(domainAndNameserver => {
val array = domainAndNameserver.split("\\|") /**拆开subnameserver跟domain*/
val subnameserver = array(1)
val domain = array(0)
(subnameserver, "\"" + domain + "\"") /**将subnameserver跟domain组成1个二元元组,注意domain用双引号引起来了,方便后续显示*/
}).rdd.reduceByKey(_+","+_) /**因为需要用到reduceByKey,所以需要把原本的dataset,转成rdd,这里将每个domain,用逗号给拼接起来,为了后续显示*/
.map(kv => {
val subnameserver = kv._1 /**取出subnameserver*/
val domainArray = kv._2 /**取出最终需要的domain array*/
val domainSize = domainArray.split(",").size /**得到domain array的size*/
(subnameserver, "[" + domainArray + "]", domainSize) /**添加中括号,组合成最终需要的结果*/
}).toDF("subnameserver", "domain_array", "domain_size") /**添加最终的schema*/
targetDF.show(false)
}
}
代码中的几乎每一行,我都进行了详细的注释,相信你一定能看懂。
这里面的难点,是要把原来「一对多」中,「多」的内容提取出来(subnameserver),变成结果需要的,另一个「一对多」中的「一」。
下面就是我调试成功后,跑出来的结果截图:
有图有真相,绝不蒙你。
3. flatMap 跟 reduceByKey
其实从 Spark 算子来解决一般的数据计算问题来看,这个例子已经不算简单了。
flatMap 跟 reduceByKey 这两个算子,我相信绝大部分同学都用过(别说你没写过word count),但有几个真的能深刻理解它们意思的?
以我多年的面试经验来看,能回答得让人满意的,寥寥无几。
从本质上来说,这个例子其实就是 word count 的一个变种。
先说 flatMap:
它的作用,就是把一个数组(容器)里面的所有元素,由原本的「一行数据」,映射成「多行数据」。
但是这里面其实有个限制:那就是这个容器,它必须是 Scala 的内部容器,不能是 Java 的。
这个算子最大的特点在于:你要是能想得到它,那你就能很快用好它;而如果你想不到它,那你就怎么着都想不到(好像句废话,但给我的感觉确实如此)。
再看 reduceByKey:
这个是我认为 Spark 的所有算子中,设计得最聪明的一个,没有之一。
非常好用,且非常高效,有没有?
它的核心意思是:所有 key 相同的数据(只能是二元组),其 value,两两合并,至于 value 怎么合并?相加、相乘、相除,还是用其他什么复杂的逻辑,随便,都行。
它最大的约束在于:对 value 合并之后的结果,其类型,必须还得跟原 value 一样。
最后
以上提供的 Spark API,是我在短时间内能想到的,最简单,但稍微有点笨拙的解决办法了。
当然,如果你的 SQL 玩的溜,说不定可以提供另一个思路的解法。