上周写了篇关于数据倾斜的案例,很多小伙伴私下在练习的时候,发现用我之前提供的数据源验证的结果有些不太对劲,我仔细一看:shit... 是我搞错了,为了能模拟出比较好的数据倾斜效果,我其实是用了一份新的数据源。不过好在发现这个问题之后,我及时把我做验证的数据源给上传到了网盘,大家可以安心用这个数据源做验证了,截止到目前为止,我给大家提供的数据源从之前的1个大文件,增加到了4个,覆盖的数据日期也从原来的一天,覆盖到了多天,数据总量约1.5亿,后续还会持续增加(需要数据下载地址的,私信我)。
另外,项目代码的GitHub地址为:https://github.com/Anryg/internet_behavior_project
上篇文章提到给大家一个思考题,将spark默认的分区策略,根据当前业务场景的需要进行优化,解释了spark的默认分区原理(其实主流计算引擎都是一样的),但是通过默认的分区方式,并不能很好的将目标的4个key分派到4个不同的partition中,而我们解决数据倾斜的核心目的就是要减少单个partition的数据量。虽然我们用加盐和减盐的方式,让单个热点key的value数据量能够有效减少,有效的解决了数据倾斜的问题,但是如果我们能更进一步,让数根据不同key被不同的task处理,那么就会让数据处理进一步被平均化,效率也会进一步提升。因为默认的数据分区策略为HashPartitioner,又因为哈希碰撞的问题,导致了其中两个不同的key(target_ip),进入了同一个partition中。那么解决办法就是根据当前的实际业务需要,重写新的Partitioner,方式其实也特别简单,就是新写一个分区类,让其继承Partitioner抽象类,自定义数据与partition id之间的关系,示例代码如下:package com.anryg.bigdata.test.data_skew
import org.apache.spark.Partitioner
/**
* @DESC: 实现自定义的分区策略
* @Auther: Anryg
* @Date: 2022/10/13 09:52
*/
class MyPartitioner(partitionNum: Int) extends Partitioner{
override def numPartitions: Int = partitionNum //确定总分区数量
override def getPartition(key: Any): Int = {//确定数据进入分区的具体策略
val keyStr = key.toString
val keyTag = keyStr.substring(keyStr.length - 1, keyStr.length)
keyTag.toInt % partitionNum
}
}
代码特简单,由于我们的目的在于将最终结果需要的4个key(106.38.176.185、106.38.176.117、106.38.176.118、106.38.176.116)给打散,进行平均分配。最容易想到的通过hashcode来分区是被否定的,因为默认就是这种方式,不行,那我们通过观察这4个key的特征会发现,除了最后一个数字外,其他部分都一样。
那么我们就利用这最后一个不同的数字,来确定不同key跟Partition之间的关系:把最后一个数字取出来,再跟分区数取模,得出的分区ID一定是不同的。
这样,就把4个不同的key给分配到了4个不同的partition中。

分区优化后
再看看看优化前后分区数据的分布情况:


分区优化后
还是一样,别看案例中的数据量很小,在实际项目中,数据量可能是这个的数千倍、数万倍。那样,你的优化效果将会非常的明显。
你可能会说:怎么只是针对这4个key的分区策略呢,前面不是还有加盐之后的103个key以及第一次减盐后的13个key吗,他们怎么办?首先,你可以观察用默认的分区策略时,通过spark UI查看数据的分布情况怎么样,如果分布本身比较平均的话,那就可以暂时不用优化,而如果不平均,比如出现了过多的空partition现象。那么其实优化方法是一样的,那你就再自定义一个Partitioner呗,取他们key的最后两位数字,再跟你需要的分区数量取模,是不是就可以。具体怎么来实现,相信你肯定会了,不信,你试试看...
我这有一个高质量的大数据学习讨论群,不怎么扯淡、偶尔活跃气氛、大部分时间都在讨论技术,你在群里的几乎任何大数据相关的问题,都可以得到解答,要不要进来试试呢?拉你进群啊...