温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Hadoop中数据倾斜的示例分析

发布时间:2021-12-08 10:08:33 来源:亿速云 阅读:147 作者:小新 栏目:云计算

这篇文章给大家分享的是有关Hadoop中数据倾斜的示例分析的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。

数据分布:

    正常的数据分布理论上都是倾斜的,就是我们所说的20-80原理:80%的财富集中在20%的人手中, 80%的用户只使用20%的功能 , 20%的用户贡献了80%的访问量 , 不同的数据字段可能的数据倾斜一般有两种情况:

一种是唯一值非常少,极少数值有非常多的记录值(唯一值少于几千)

一种是唯一值比较多,这个字段的某些值有远远多于其他值的记录数,但是它的占比也小于百分之一或千分之一

分区:

常见的mapreduce分区方式为hash 和range ,

hash partition 的好处是比较弹性,跟数据类型无关,实现简单(设定reduce个数就好,一般不需要自己实现)

range partition 需要实现者自己了解数据分布, 有时候需要手工做sample取样. 同时也不够弹性, 表现在几个方面,1. 对同一个表的不同字段都需要实现不同的range partition,  对于时间这种字段根据查询类型的不同或者过滤条件的不同切分range 的大小都不一定.

2 .有时候可能设计使用多个字段组合的情况, 这时候又不能使用之前单个字段的partition 类, 并且多个字段组合之间有可能有隐含的联系,比如出生日期和星座,商品和季节.

3. 手工做sample 非常耗时间,需要使用者对查询使用的数据集的分布有领域知识.

4. 分配方式是死的,reduce 个数是确定的,一旦某种情况下发生倾斜,调整参数

其他的分区类型还有hbase 的hregionpartitioner  或者totalorder partitioner  等.

能够想到的关于数据倾斜的一些解决方式(欢迎补充,尤其是有没有做搜索或者数据挖掘的朋友有碰到类似问题):

1. 增加reduce 的jvm内存

2. 增加reduce 个数

3. customer partition

4. 其他优化的讨论.

5. reduce sort merge排序算法的讨论

6. 正在实现中的hive skewed join.

7. pipeline

8. distinct

9. index 尤其是bitmap index

方式1:既然reduce 本身的计算需要以合适的内存作为支持,在硬件环境容许的情况下,增加reduce 的内存大小显然有改善数据倾斜的可能,这种方式尤其适合数据分布第一种情况,单个值有大量记录, 这种值的所有纪录已经超过了分配给reduce 的内存,无论你怎么样分区这种情况都不会改变. 当然这种情况的限制也非常明显, 1.内存的限制存在,2.可能会对集群其他任务的运行产生不稳定的影响.

方式2:  这个对于数据分布第二种情况有效,唯一值较多,单个唯一值的记录数不会超过分配给reduce 的内存. 如果发生了偶尔的数据倾斜情况,增加reduce 个数可以缓解偶然情况下的某些reduce 不小心分配了多个较多记录数的情况. 但是对于第一种数据分布无效.

方式3: 一种情况是某个领域知识告诉你数据分布的显著类型,比如hadoop definitive guide 里面的温度问题,一个固定的组合(观测站点的位置和温度) 的分布是固定的, 对于特定的查询如果前面两种方式都没用,实现自己的partitioner 也许是一个好的方式.

方式4: 目前有的一些针对数据倾斜的优化比如pig 的skewed join

http://pig.apache.org/docs/r0.7.0/piglatin_ref1.html#Skewed+Joins

pig 文档上面说是根据数据输入的统计信息来确定分区(也就是range partition?),另外不清楚这个行为是否是动态运行时候才决定的,也就是运行之前有一步pig 自动做sample 的工作,因为pig 是没有统计信息这一说的.

hive 中的group by

<property> 
  <name>hive.groupby.skewindata</name> 
  <value>false</value> 
  <description>Whether there is skew in data to optimize group by queries</description> 
</property> 
<property> 
  <name>hive.optimize.groupby</name> 
  <value>true</value> 
  <description>Whether to enable the bucketed group by from bucketed partitions / tables.</description> 
</property>

<property> 
  <name>hive.mapjoin.followby.map.aggr.hash.percentmemory</name> 
  <value>0.3</value> 
  <description>Portion of total memory to be used by map-side grup aggregation hash table, when this group by is followed by map join</description> 
</property> 
<property> 
  <name>hive.groupby.mapaggr.checkinterval</name> 
  <value>100000</value> 
  <description>Number of rows after which size of the grouping keys/aggregation classes is performed</description> 
</property>

其中最后一个参数hive.groupby.mapaggr.checkinterval 的思路跟in-memory combiner 相似, in-memeory combiner  是发生在mapper 端sort 之前,而不是现在的combiner发生在mapper sort 之后甚至在写入磁盘之后重新读磁盘然后排序合并. in-memeory combiner 最早好像是《Data-Intensive Text Processing with MapReduce》,mapr 去年的介绍ppt 里面好像提到它们也有这个优化. mapper 端减少数据的机会比reduce 端的要大,所以一般不会看到reduce 端的combiner 的讨论,但是这种思路也有,比如google tenzing 的join 讨论里面有一个prev-next 的小优化就是基于reduce 端的combiner, 但那个前提是基于block shuffle 实现的基础上,数据已经排过序了,所以join 时候前一条数据跟后一条数据相同的概率很大.

hive 中的skewed join :  之前的文章已经介绍过两表join 中hive 的几个优化,其中的skewed join 的类似思路就是上面介绍的skewed 的第二种:增加reduce 的个数,hive 中是通过判断阈值如果大于一个reduce 需要处理的数据量,重新起额外的task 来处理这些超额的reduce 本身需要处理的数据, 这是一种较晚的补救措施,本身hive 开始分区的时候已经倾斜(partition 的方式不合理), 当运行的时候通过运行时监控reduce 发现倾斜的特殊key 然后额外的起task 去处理,效果比较一般,感兴趣的同学可以参考HIVE-3086 里面我和facebook 团队对这种优化思路的讨论. 第六节我会讨论一下我所认为的思路和facebook 正在做的思路之间的差别.

方式5 :  reduce 分配的内存远小于处理的数据量时,会产生multi-pass sort 的情况是瓶颈,那么就要问

1. 这种排序是有必要的嘛?

2. 是否有其他排序算法或优化可以根据特定情况降低他瓶颈的阈值?

3. map reduce 适合处理这种情况嘛?

关于问题1. 如果是group by , 那么对于数据分布情况1 ,hash 比sort 好非常多,即使某一个reduce 比其他reduce 处理多的多的数据,hash 的计算方式也不会差距太大.

问题2. 一个是如果实现block shuffle 肯定会极大的减少排序本身的成本, 另外,如果分区之后的reduce 不是使用copy –> sort-merge –> reduce 的计算方式, 在copy 之后将每个block 的头部信息保存在内存中,不用sort – merge 也可以直接计算reduce, 只不过这时候变成了随机访问,而不是现在的sort-merge 之后的顺序访问. block shuffle 的实现有两种类型,一种是当hadoop 中真正有了列数据格式的时候,数据有更大的机会已经排过序并且按照block 来切分,一般block 为1M ( 可以关注avro-806 )  , 这时候的mapper 什么都不做,甚至连计算分区的开销都小了很多倍,直接进入reduce 最后一步,第二种类型为没有列数据格式的支持,需要mapper 排序得到之后的block 的最大最小值,reduce 端在内存中保存最大最小值,copy  完成后直接用这个值来做随机读然后进行reduce. ( block shuffle  的实现可以关注 MAPREDUCE-4039 , hash 计算可以关注 MAPREDUCE-1639)

问题3 . map reduce 只有两个函数,一个map 一个 reduce, 一旦发生数据倾斜就是partition 失效了,对于join 的例子,某一个key 分配了过多的记录数,对于只有一次partittion的机会,分配错了数据倾斜的伤害就已经造成了,这种情况很难调试,但是如果你是基于map-reduce-reduce 的方式计算,那么对于同一个key 不需要分配到同一个reduce 中,在第一个reduce 中得到的结果可以在第二个reduce 才汇总去重,第二个reduce 不需要sort – merge 的步骤,因为前一个reduce 已经排过序了,中间的reduce 处理的数据不用关心partition 怎么分,处理的数据量都是一样大,而第二个reduce 又不使用sort-merge 来排序,不会遇到现在的内存大小的问题,对于skewed join 这种情况瓶颈自然小很多.

方式6:  目前hive 有几个正在开发中的处理skewed join 情况的jira case,  HIVE-3086 , HIVE-3286 ,HIVE-3026 . 简单介绍一下就是facebook 希望通过手工处理提前枚举的方式列出单个倾斜的值,在join 的时候将这些值特殊列出当作map join 来处理,对于其他值使用原来的方式. 我个人觉得这太不伸缩了,值本身没有考虑应用过滤条件和优化方式之后的数据量大小问题,他们提前列出的值都是基于整个分区的. join key 如果为组合key 的情况也应该没有考虑,对metastore 的储存问题有限制,对输入的大表和小表都会scan 两次( 一次处理非skew key , 一次处理skew key 做map join), 对输出表也会scan 两次(将两个结果进行merge) , skew key 必须提前手工列出这又存在额外维护的成本,目前因为还没有完整的开发完到能够投入生产的情况,所以等所有特性处理完了有了文档在看看这个处理方式是否有效,我个人认为的思路应该是接着bucked map join 的思路往下走,只不过不用提前处理cluster key 的问题, 这时候cluster key 的选择应该是join key + 某个能分散join key 的列, 这等于将大表的同一个key 的值分散到了多个不同的reduce 中,而小表的join key 也必须cluster 到跟大表对应的同一个key , join 中对于数据分布第二种情况不用太难,增加reduce 个数就好,主要是第一种,需要大表的join key 能够分散,对于同样join key 的小表又能够匹配到所有大表中的记录. 这种思路就是不用扫描大表两遍或者结果输出表,不需要提前手工处理,数据是动态sample 的应用了过滤条件之后的数据,而不是提前基于统计数据的不准确结果. 这个基本思路跟tenzing 里面描述的distributed hash join 是一样的,想办法切成合适的大小然后用hash 和 map join .

方式7: 当同时出现join 和group 的时候, 那么这两个操作应该是以pipeline (管道) 的方式执行. 在join 的时候就可以直接使用group 的操作符减少大量的数据,而不是等待join 完成,然后写入磁盘,group 又读取磁盘做group操作. HIVE-2206 正在做这个优化. hive 里面是没有pipeline 这个概念的. 像是cloudera 的crunch 或者twitter 的Scalding 都是有这种概念的.

方式8: distinct 本身就是group by 的一种简写,我原先以为count(distinct x)这种跟group by 是一样的,但是发现hive 里面distinct 明显比group by 要慢,可能跟group by 会有map 端的combiner有关, 另外观察到hive 在预估count(distinct x) 的reduce 个数比group by 的个数要少 , 所以hive 中使用count(distinct x) , 要么尽量把reduce 个数设置大,直接设置reduce 个数或者hive.exec.reducers.bytes.per.reducer 调小,我个人比较喜欢调后面一个,hive 目前的reduce 个数没有统计信息的情况下就是用map端输入之前的数值, 如果你是join 之后还用count(distinct x) 的话,这个默认值一般都会悲剧,如果有where 条件并能过滤一定数量的数据,那么默认reduce 个数可能就还好一点. 不管怎样,多浪费一点reduce slot 总比等十几甚至几十分钟要好, 或者转换成group by 的写法也不错,写成group by 的时候distributed by 也很有帮助.

方式9: hive 中的index 就是物化视图,对于group by 和distinct 的情况等于变成了map 端在做计算,自然不存在倾斜. 尤其是bitmap index , 对于唯一值比较少的列优势更大,不过index 麻烦的地方在于需要判断你的sql 是不是常用sql , 另外如果create index 的时候没有选你查询的时候用的字段,这个index 是不能用的( hive 中是永远不可能有DBMS中的用index 去lookup 或者join 原始表这种概念的)

其他建议:

网上能找到的另外一份很好的描述数据倾斜的资料是

http://nuage.cs.washington.edu/pubs/opencirrus2011.pdf

里面的map side skew 和expensive record 都不是关系型计算中的问题,所以不是这篇文章关注点. 对于关系型计算,其中数据倾斜影响最大的地方在reduce 的sort. 这篇文章里面最后总结的5点好的建议值得参考,

其中第三条需要你知道应用combiner 和特殊优化方式是否带来了性能的提升,hive 的map aggr 在数据分布情况1效果会比较好,数据分布情况2效果就不大,还有combiner 应用的时候是消耗了系统资源的,确认这种消耗是否值得而不是任何情况下都使用combiner. 

对于第四点关系型计算中map 倾斜情况不太常见. 一种可以举出来的例子是分区不合理,或者hive 中的cluster by 的key 选择不合理(都是使用目录的方式分区, 目录是最小处理单元了).

  • Use domain knowledge when choosing the 
    map output partitioning scheme if the reduce operation is 
    expensive: Range partition or some other form of explicit 
    partition may be better than the default hash-partition

  • Try different partitioning schemes on sample 
    workloads or collect the data distribution at the reduce input 
    if a MapReduce job is expected to run several times

  • Implement a combiner to reduce the amount 
    of data going into the reduce-phase and, as such, significantly 
    dampen the effects of any type of reduce-skew

  • Use a pre-processing MapReduce job that 
    extracts properties of the input data in the case of a longruning, 
    skew-prone map phase. Appropriately partitioning the 
    data before the real application runs can significantly reduce 
    skew problems in the map phase.

  • Best Practice 5. Design algorithms whose runtime depends 
    only on the amount of input data and not the data distribution.

    另外一份是淘宝的数据倾斜总结:

    http://www.alidata.org/archives/2109

    不过我个人觉得帮助不是太大,里面第一个解决方式空值产生的影响第一个Union All 的方式个人是极力反对的,同一个表尤其是大表扫描两遍这额外的成本跟收益太不匹配,不推荐,第二个将特殊值变成random 的方式, 这个产生的结果是正确的嘛? 尤其是在各种情况下输出结果是正确的嘛?里面背景好像是那个小表users 的主键为userid, 然后userid 又是join key , 而且还不为空? 不太推荐,背景条件和输出的正确性与否存疑.

    第二个数据类型不同的问题我觉得跟HIVE-3445 都算是数据建模的问题,提前修改好是一样的.

    第三个是因为淘宝的hadoop 版本中没有map side hash aggr 的参数吧. 而且写成distinct 还多了一个MR 步骤,不太推荐.

     

    数据倾斜在MPP 中也是一个课题,这也设计到一个数据重分配的问题,但是相对于MPP 中有比较成熟的机制,一个是mpp 在处理数据初始分布的时候总是会指定segmented by 或者distributed by 这种显示分配到不同物理机器上的建表语句. 还有就是统计信息会帮助执行引擎选择合适的重新分布.但是统计信息也不是万能的,比如

    1:统计信息的粒度和更新问题.

    2: 应用了过滤条件之后的数据也许不符合原始期望的数据分布.

    3: 统计信息是基于采样的,总于真实所有数据存在误差.

    4: 统计信息是基于partittion 的, 对于查询没有涉及到partition 字段的切分就不能使用各partition 只和来表示总体的统计信息.

    5. 临时表或者多步骤查询的中间过程数据没有统计信息的情况.

    6. 各种其他的算法优化比如in-mapper combiner 或者google Tenzing 的prev – next combine 都会影响统计信息对于算法选择的不同.

感谢各位的阅读!关于“Hadoop中数据倾斜的示例分析”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,让大家可以学到更多知识,如果觉得文章不错,可以把它分享出去让更多的人看到吧!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI