来源:大数据技术与数仓
写在前面
数据倾斜,在MapReduce编程模型中十分常见,就是大量的相同key被分配到一个分区里,造成了个别task运行的非常慢,从而影响了整个任务的执行效率。
数据倾斜产生的根本原因是少数Worker处理的数据量远远超过其他Worker处理的数据量,因此少数Worker的运行时长远远超过其他Worker的平均运行时长,导致整个任务运行时间超长,造成任务延迟。
本文主要讨论group by,join以及count(distinct)查询导致的数据倾斜问题,希望本文对你有所帮助。
数据倾斜的原因
当我们看任务进度长时间维持在99%(或100%),查看任务监控页面就会发现只有少量(1个或几个)reduce子任务未完成。因为其处理的数据量和其他reduce差异过大,这就是数据倾斜的直接表现。
导致数据倾斜常见的原因主要包括以下几个方面:
group by倾斜
场景
group by
的key
分布不均匀。例如,在大促期间,某个店铺的单品PV量达4千万以上,店铺PV量达8千万以上,导致根据商品和店铺的PV量计算IPV时,发生数据倾斜。
解决方案
配置下面的参数与SQL语句一起提交。
set hive.groupby.skewindata=true;
数据倾斜时负载均衡,当选项设定为true,生成的查询计划会有两个MRJob。第一个MRJob 中,Map的输出结果集合会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的GroupBy Key有可能被分发到不同的Reduce中,从而达到负载均衡的目的;第二个MRJob再根据预处理的数据结果按照GroupBy Key分布到Reduce中(这个过程可以保证相同的GroupBy Key被分布到同一个Reduce中),最后完成最终的聚合操作。
join倾斜
场景
join on
的key
分布不均匀。
解决方案
大表join小表
如果join两边的表中有一张是小表,可以将join改为mapjoin来处理。
关联key中有大量NULL值
对易产生倾斜的key用单独的逻辑来处理。例如两边表的key中有大量NULL数据会导致倾斜,需要在join前先过滤掉NULL数据或补上随机数,然后再进行join。例如,某张表中,有大量未登录用户的访问记录(user_id为NULL),如果直接和用户表关联的话,会产生倾斜。这时候可以做如下处理:
SELECT *
FROM table_a a
LEFT OUTER JOIN table_b b
ON CASE WHEN a.user_id IS NULL THEN concat('dp_hive',rand() ) ELSE a.user_id END = b.user_id
;通常情况下,可能倾斜的值不是NULL,而是有意义的数据,这时候就需要对这类数据进行单独处理。
count distinct倾斜
场景
特殊值过多,常见于固定的特殊值比较多的场景,和join
中易产生倾斜的key
类似。
解决方案
先过滤特殊值,在count
结果的基础上加上特殊值的个数。或根据具体场景进行具体分析。
count distinct数据倾斜优化
DISTINCT的使用场景
数据量较小的表 数据量较大时,且使用单个DISTINCT,且GROUP BY字段没有严重的数据倾斜
案例
案例1
统计商品访问的uv
-- 优化前
SELECT sku_code
,COUNT(DISTINCT user_id) AS vst_uv
FROM dwd_log_vst_di
WHERE ds = '${cur_date}'
GROUP BY sku_code
-- 优化后
SELECT sku_code
,sum(uv) AS vst_uv
FROM (
SELECT sku_code
,user_id
,1 AS uv
FROM dwd_log_vst_di
WHERE ds = '${cur_date}'
GROUP BY sku_code
,user_id
)
GROUP BY sku_code
案例2
统计不同分组下每个标签对应的用户数
假设有一张表dwd_user_tag_df用户标签表
字段 | 注释 | 数据类型 |
---|---|---|
gid | 分组id | STRING |
user_id | 用户id | STRING |
is_tag_a | 是否是a标签,0:否,1:是 | BIGINT |
is_tag_b | 是否是b标签,0:否,1:是 | BIGINT |
is_tag_c | 是否是c标签,0:否,1:是 | BIGINT |
-- 优化前
SELECT gid
,COUNT(DISTINCT if(is_tag_a = 1,user_id,null)) AS tag_a_usr_cnt
,COUNT(DISTINCT if(is_tag_b = 1,user_id,null)) AS tag_b_usr_cnt
,COUNT(DISTINCT if(is_tag_c = 1,user_id,null)) AS tag_c_usr_cnt
FROM dwd_user_tag_df
WHERE ds = '${cur_date}'
GROUP BY gid;
-- 优化后
SELECT gid
,SUM(IF(is_a_cnt > 0 ,1,0)) AS a_cnt
,SUM(IF(is_b_cnt > 0 ,1,0)) AS b_cnt
,SUM(IF(is_c_cnt > 0 ,1,0)) AS ab_cnt
FROM (
SELECT gid
,user_id
,SUM(IF( is_tag_a = 1 ,1,0 )) AS is_a_cnt
,SUM(IF( is_tag_b = 1 ,1,0 )) AS is_b_cnt
,SUM(IF( is_tag_c = 1 ,1,0 )) AS is_c_cnt
FROM dwd_user_tag_df
WHERE ds = '${cur_date}'
GROUP BY gid
,user_id
) t
GROUP BY gid
;
案例3
数据去重统计是非常常见的一种计算方式,比如对UV指标的计算。Count(Distinct) ... Group By:计算各分组字段内不重复记录条数。由于引入Distinct,若某些值(key)记录过多,其他key的记录较少,则引发的数据分布不均匀,Count统计时会因等待较多记录的key导致数据倾斜。
一般情况下,可以通过两阶段聚合的方式进行优化
通过访问日志明细,统计dau。
常规做法
SELECT ds
,COUNT(DISTINCT user_id) AS dau
FROM dwd_log_demo
GROUP ds
优化
首先按照user_id+ds进行去重,然后在进行count
SELECT ds
,COUNT(*)
FROM (
SELECT user_id
,ds
FROM dwd_log_demo
GROUP BY user_id
,ds
) t
GROUP ds按照分区字段值ds拼接随机数,拼接一个随机数,进行两阶段去重
SELECT SPLIT_PART(rand_ds, '_',2) ds
,COUNT(DISTINCT user_id) dau
FROM (
SELECT CONCAT(ROUND(RAND(),1)*10,'_', ds) AS rand_ds
,user_id
FROM dwd_log_demo
GROUP BY CONCAT(ROUND(RAND(),1)*10,'_', ds)
,user_id
)
GROUP BY SPLIT_PART(rand_ds, '_',2)
总结
当数据量较小时,我们几乎遇不到数据倾斜等影响任务运行效率的问题,但是当数据量非常大时且又存在文中所说的使用场景,如果不对任务进行优化,那么很可能任务产出效率会很低,不能满足业务的需求。本文主要介绍引起数据倾斜的几方面原因,并对每种情况进行了说明,最后给出了count distinct数据倾斜优化的案例,希望对你有所帮助。