Shuffle Write 竣事后,可从每个 ShuffleMapTask 的 MapStatus 中统计获得按原打算执行时 Stage 2 各 Partition 的数据量以及 Stage 2 必要读取的总数据量。(一样平常来说,Partition 是 RDD 的属性而非 Stage 的属性,本文为了利便,不区分 Stage 与 RDD。可以简朴以为一个 Stage 只有一个 RDD,此时 Stage 与 RDD 在本文接头范畴内等价)
假如个中一个 Stage 的数据量较小,得当行使 BroadcastJoin,无须继承执行 Stage 2 的 Shuffle Read。相反,可操作 Stage 0 与 Stage 1 的数据举办 BroadcastJoin,如下图所示

Spark SQL Auto BroadcastJoin
详细做法是
- 将 Stage 1 所有 Shuffle Write 功效广播出去
- 启动 Stage 2,Partition 个数与 Stage 0 一样,都为 3
- 每个 Stage 2 每个 Task 读取 Stage 0 每个 Task 的 Shuffle Write 数据,同时与广播获得的 Stage 1 的全量数据举办 Join
注:广播数据存于每个 Executor 中,其上全部 Task 共享,无须为每个 Task 广播一份数据。上图中,为了更清楚展示为什么可以或许直接 Join 而将 Stage 2 每个 Task 方框内都安排了一份 Stage 1 的全量数据
固然 Shuffle Write 已完成,将后续的 SortMergeJoin 改为 Broadcast 如故能晋升执行服从
- SortMergeJoin 必要在 Shuffle Read 时对来自 Stage 0 与 Stage 1 的数据举办 Merge Sort,而且也许必要 Spill 到磁盘,开销较大
- SortMergeJoin 时,Stage 2 的全部 Task 必要取 Stage 0 与 Stage 1 的全部 Task 的输出数据(假若有它要的数据 ),会造成大量的收集毗连。且当 Stage 2 的 Task 较多时,会造成大量的磁盘随机读操纵,服从不高,且影响沟通呆板上其余 Job 的执行服从
- SortMergeJoin 时,Stage 2 每个 Task 必要从险些全部 Stage 0 与 Stage 1 的 Task 取数据,无法很好操作 Locality
- Stage 2 改用 Broadcast,每个 Task 直接读取 Stage 0 的每个 Task 的数据(一对一),可很好操作 Locality 特征。最亏得 Stage 0 行使的 Executor 上直接启动 Stage 2 的 Task。假如 Stage 0 的 Shuffle Write 数据并未 Spill 而是在内存中,则 Stage 2 的 Task 可直接读取内存中的数据,服从很是高。假若有 Spill,那可直接从当地文件中读取数据,且是次序读取,服从远比通过收集随机读数据服从高
3.5 行使与优化要领
该特征的行使方法如下
- 当 spark.sql.adaptive.enabled 与 spark.sql.adaptive.join.enabled 都配置为 true 时,开启 Adaptive Execution 的动态调解 Join 成果
- spark.sql.adaptiveBroadcastJoinThreshold 配置了 SortMergeJoin 转 BroadcastJoin 的阈值。假如不配置该参数,该阈值与 spark.sql.autoBroadcastJoinThreshold 的值相称
- 除了本文所述 SortMergeJoin 转 BroadcastJoin,Adaptive Execution 还可提供其余 Join 优化计策。部门优化计策也许会必要增进 Shuffle。spark.sql.adaptive.allowAdditionalShuffle 参数抉择了是否应承为了优化 Join 而增进 Shuffle。其默认值为 false
4 自动处理赏罚数据倾斜
4.1 办理数据倾斜典范方案
《Spark机能优化之道——办理Spark数据倾斜(Data Skew)的N种姿势》一文报告了数据倾斜的危害,发生缘故起因,以及典范办理要领
- 担保文件可 Split 从而停止读 HDFS 时数据倾斜
- 担保 Kafka 各 Partition 数据平衡从而停止读 Kafka 引起的数据倾斜
- 调解并行度或自界说 Partitioner 从而分手分派给统一 Task 的大量差异 Key
- 行使 BroadcastJoin 取代 ReduceJoin 消除 Shuffle 从而停止 Shuffle 引起的数据倾斜
- 对倾斜 Key 行使随机前缀或后缀从而分手大量倾斜 Key,同时将参加 Join 的小表扩容,从而担保 Join 功效的正确性
4.2 自动办理数据倾斜
今朝 Adaptive Execution 可办理 Join 时数据倾斜题目。其思绪可领略为将部门倾斜的 Partition (倾斜的判定尺度为该 Partition 数据是全部 Partition Shuffle Write 中位数的 N 倍) 举办单独处理赏罚,相同于 BroadcastJoin,如下图所示

Spark SQL resolve joinm skew
在上图中,阁下双方别离是参加 Join 的 Stage 0 与 Stage 1 (现实应该是两个 RDD 举办 Join,但犹如上文所述,这里不区分 RDD 与 Stage),中间是获取 Join 功效的 Stage 2
明明 Partition 0 的数据量较大,这里假设 Partition 0 切合“倾斜”的前提,其余 4 个 Partition 未倾斜
以 Partition 对应的 Task 2 为例,它需获取 Stage 0 的三个 Task 中全部属于 Partition 2 的数据,并行使 MergeSort 排序。同时获取 Stage 1 的两个 Task 中全部属于 Partition 2 的数据并行使 MergeSort 排序。然后对二者举办 SortMergeJoin
(编辑:湖南网)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|