免费注册
大数据中的数据倾斜及解决办法

大数据中的数据倾斜及解决办法

作者: 速优云数据平台架构师
阅读数:290
更新时间:2024-02-23 12:34:50
大数据中的数据倾斜及解决办法
id="%E4%B8%80%E4%BB%80%E4%B9%88%E6%98%AF%E6%95%B0%E6%8D%AE%E5%80%BE%E6%96%9C" class="code-line" dir="auto" data-line="0">一、什么是数据倾斜

数据倾斜指mapreduce计算架构或flink等流式计算平台下,在进行计算时,发生的数据操作卡在某一个子计算任务而导致整个任务被卡住的现象。例如写hive SQL计算指标时,发现数据从1%到99%很快,确一直卡在99%很长一段时间而无法完成任务。

实际上,发生这样的原因是,在计算一条SQL的时候,某个任务被分配的key太多了。从而导致,其他任务很快完成任务,但是某个任务确一直在计算,造成『一人累死,其他人闲死』的情况。

二、MapReduce原理

什么是map任务?什么是recude任务呢?

我们知道hive的底层是通过HDFS将文件数据存在磁盘上,数据以key、value的形式存储,而map操作相当于将把key、value键值对读取出来,重新组合。然后将整理好的数据,交给recude,reduce进行聚合计算。

举个例子,如下数据:(HDFS文件存储数据时,存在行存储和列存储两种,下面以列存储为例)

HDFS中存储文件的格式如下:

(0,"hive spark hive hbase" ) (21,"hadoop hive spark hive" ) (39,"sqoop flume scala scala" )

map操作将对HDFS文件进行分隔,并将每一行分成一个的<key,value>值(初始的k值是根据偏移量来的),然后在将<key,value>值,转成如下的格式,并存入缓存中:

(hive,1),(spark,1),(hive,1),(hbase,1);
(hadoop,1),(hive,1),(spark,1), (hive,1);
(sqoop,1),(flume,1),(scala,1), (scala,1);

map操作将不同分区中的key值进行整合排序,存放到一个集合当中,格式如下:

(hive,[1,1,1,1]),(spark,[1,1]),(hbase,[1]),(hadoop,[1]),(sqoop,[1]),(flume,[1]),(scala,[1,1])

随后,Reduce任务先处理多个Map任务的输出结果,再根据分区将其分配到不同的Reduce节点上(这个过程就是shuffle);Reduce任务对多个Map的输出结果进行合并、排序、计算,生成新的 (k,v)  值,具体如下:

(hive,4),(spark,2), (scala,2) ,(hbase,1),(hadoop,1),(sqoop,1),(flume,1)

Reduce任务会将上一步输出的<k,v>写到HDFS中,生成文件。

三、数据倾斜解决方案

数据倾斜一般都发生在reduce阶段。

reduce阶段最容易发生倾斜的操作是join和count distinct

下面列举几个常见的hive数据倾斜场景,和其对应解决方案:

3.1 空值数据倾斜优化

在上报的日志信息中,通常会出现信息丢失的情况,如果用上报缺失的字段去关联相关字段时就会出现数据倾斜的问题。

案例:日志中的user_id上报缺失,如果取其中的user_id和用户表的user_id进行关联的时候就会出现数据倾斜。

解决办法: 数据倾斜主要原因是join的key值发生倾斜,key值包含很多空值或是异常值,通常的做法是,对空值或者异常值赋一个随机的值来分散key。

select * from log a left join user b on case when (a.user_id is null or a.user_id = '-' or user_id='0')
--空值和异常值处理 then concat('sql_hive',rand()) else a.user_id end = b.user_id

3.2 大表和小表的优化

在hive SQL的join操作中,mr过程是按照join的key进行分发,而在join左边的表的数据会首先被读进内存,如果左边表的key相对分散,读入内存的数据会比较小,join任务就会执行的比较快;如果左边的key比较集中并且数据量比较大时,数据倾斜就会比较严重,执行时间将会增加。

经验总结:为了能够避免数据倾斜现象,通常将数据量小的表放在join的左边,此外,还需要使用map join让小维度的表先进内存,在map完成reduce操作。

SQL操作:/*+MAPJOIN(smallTable)*/

Set hive.auto.convert.join=true Set hive.mapjoin.smalltable.filesize=25000000 select /*+MAPJOIN(b)*/ a.key,a.value from a join b on a.key = b.key -- a为大表,b为小表

上面操作主要是将小表全部读入内存中,在map阶段大表的每个map和小表进行匹配,节省了reduce阶段的时间,提高了数据执行效率。

3.3 大表和大表的join优化

  • 情况一:Map阶段输出的key数量上,导致reduce阶段的reduce数量为1

    案例:日志表中的user_id和用户表关联,user表上有500W+条记录,把user分发到所有的map开销很大,并且map join不支持大表操作。用普通的join操作,数据倾斜会产生。

    解决办法:这类问题产生的根本原因就是数据业务特性强,两个表都是大表。因此我们可以针对性的削减业务过程,比如log表中user_id有上百万个,但是每天会员的uv不会太多,有交易的会员不会很多,有点击的会员数不会很多等,在业务过程削减冗余的数据量,避免数据倾斜的发生。

    -- 思路:先过滤出需要被使用的唯一的user_id,再计算相关指标 select /*MAPJOIN(b)*/ from log a left join ( select /*MAPJOIN(b)*/ b1.* from (select distinct user_id from log) b join user b1 on b.user_id = b1.user_id ) a1 on a.user_id = a1.user_id;
  • 情况二:Map输出key分布不均匀,商品信息表的key对应大量的value,导致数据倾斜。

    案例:商品信息表a中的信息填充到商品浏览日志表b中,使用商品id进行关联。但是某些人买商品浏览量较大,造成数据偏移。

    解决办法:热点数据和非热点数据拆分处理

    select * from ( select /*MAPJOIN(i)*/ a.id, a.time, a.amount, b.name, b.loc, b.cat from a left join (select * from a where uid in ('1001','1002')) as b -- 热点子表 on a.uid = b.uid where a.uid in('1001','1002') -- 热点数据 ) union all select a.id, a.time, a.amount, b.name, b.loc, b.cat from a left join b on a.uid = b.uid where a.uid is not in('1001','1002') -- 非热点数据

3.4 count(distinct)数据倾斜优化

场景:在多个维度的同一个度量的count distinct

案例:根据月份和性别,统计买家的1月份男顾客数,女顾客数

原始方案:

SELECT seller, COUNT(DISTINCT CASE WHEN month=1 AND sex = 'M' THEN buyer END) M01_BUYER_CNT, COUNT(DISTINCT CASE WHEN month=1 AND sex = 'F' THEN buyer END) M01_FEMALE_BUYER_CNT FROM SHOP_ORDER where log_date = '20220301' group by seller

改造方案:

把DISTINCT用到的buyer,也加到group by统计上,然后再进行业务计算

with t1 as( -- 第一步:group by统计,结果存到t1中 select buyer, seller, count(case when month=1 and sex = 'M' then buyer end) s_M01_male_buyer_cnt, count(case when month=1 and sex = 'F' then buyer end) s_M01_female_buyer_cnt from SHOP_ORDER where log_date = '20220301' group by seller, buyer ) select -- 聚合目标指标 seller, sum(case when s_M01_male_buyer_cnt>0 then 1 else 0 end) as s_M01_male_buyer_cnt, sum(case when s_M01_female_buyer_cnt>0 then 1 else 0 end) as s_M01_female_buyer_cnt from t1 group by seller

3.5 经验总结

我们平时编写时需要记住以下几点:

  • 使用分区剪裁、列剪裁,分区一定要加

  • 少用 COUNT DISTINCT,group by 代替 distinct

  • 是否存在多对多的关联

  • 连接表时使用相同的关键词,这样只会产生一个 job

  • 减少每个阶段的数据量,只选出需要的,在 join 表前就进行过滤

  • 大表放后面

  • 谓词下推:where 谓词逻辑都尽可能提前执行,减少下游处理的数据量

发表评论

评论列表

暂时没有评论,有什么想聊的?

大数据中的数据倾斜及解决办法最新资讯

分享关于大数据最新动态,数据分析模板分享,如何使用低代码构建大数据管理平台和低代码平台开发软件

降雨量怎么计算?详细步骤和公式解析

降雨量怎么计算?详细步骤和公式解析 一、降雨量的定义与重要性 1.1 降雨量的定义 降雨量是指在一定时间内,从天空降落到地面上的水的量,通常以单位面积上接收到的水的深

...
2024-07-02 00:26:32
农业信息化:解决农业领域信息不对称的关键

农业信息化:解决农业领域信息不对称的关键 一、农业信息化的概念及其重要性 1.1 农业信息化的定义 农业信息化指的是利用现代信息技术手段,对农业生产、经营、管理、服务

...
2024-07-02 00:26:20
种质库是什么意思

概述“种质库是什么意思” 种质库是一个专门用于保存和保护种质资源的设施。种质资源是指具有遗传多样性的植物、动物和微生物的遗传材料,包括种子、苗木、菌种、细胞系等

...
2024-07-02 00:27:08

速优云

让监测“简单一点”

×

☺️

销售沟通:17190186096

售前咨询:15050465281

扫码加顾问微信 -->

速优物联PerfCloud官方微信