加入收藏 | 设为首页 | 会员中心 | 我要投稿 湖南网 (https://www.hunanwang.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 教程 > 正文

Adaptive Execution 让 Spark SQL 更智能更高效

发布时间:2018-10-25 04:38:00 所属栏目:教程 来源:郭俊 Jason Guo
导读:本文转发自技能天下,原文链接 http://www.jasongj.com/spark/adaptive_execution/ 1 配景 前面《Spark SQL / Catalyst 内部道理 与 RBO》与《Spark SQL 机能优化再进一步 CBO 基于价钱的优化》先容的优化,从查询自己与方针数据的特点的角度尽也许担保了

Shuffle Write 竣事后,可从每个 ShuffleMapTask 的 MapStatus 中统计获得按原打算执行时 Stage 2 各 Partition 的数据量以及 Stage 2 必要读取的总数据量。(一样平常来说,Partition 是 RDD 的属性而非 Stage 的属性,本文为了利便,不区分 Stage 与 RDD。可以简朴以为一个 Stage 只有一个 RDD,此时 Stage 与 RDD 在本文接头范畴内等价)

假如个中一个 Stage 的数据量较小,得当行使 BroadcastJoin,无须继承执行 Stage 2 的 Shuffle Read。相反,可操作 Stage 0 与 Stage 1 的数据举办 BroadcastJoin,如下图所示

Adaptive Execution 让 Spark SQL 更智能更高效

Spark SQL Auto BroadcastJoin

详细做法是

  • 将 Stage 1 所有 Shuffle Write 功效广播出去
  • 启动 Stage 2,Partition 个数与 Stage 0 一样,都为 3
  • 每个 Stage 2 每个 Task 读取 Stage 0 每个 Task 的 Shuffle Write 数据,同时与广播获得的 Stage 1 的全量数据举办 Join

注:广播数据存于每个 Executor 中,其上全部 Task 共享,无须为每个 Task 广播一份数据。上图中,为了更清楚展示为什么可以或许直接 Join 而将 Stage 2 每个 Task 方框内都安排了一份 Stage 1 的全量数据

固然 Shuffle Write 已完成,将后续的 SortMergeJoin 改为 Broadcast 如故能晋升执行服从

  • SortMergeJoin 必要在 Shuffle Read 时对来自 Stage 0 与 Stage 1 的数据举办 Merge Sort,而且也许必要 Spill 到磁盘,开销较大
  • SortMergeJoin 时,Stage 2 的全部 Task 必要取 Stage 0 与 Stage 1 的全部 Task 的输出数据(假若有它要的数据 ),会造成大量的收集毗连。且当 Stage 2 的 Task 较多时,会造成大量的磁盘随机读操纵,服从不高,且影响沟通呆板上其余 Job 的执行服从
  • SortMergeJoin 时,Stage 2 每个 Task 必要从险些全部 Stage 0 与 Stage 1 的 Task 取数据,无法很好操作 Locality
  • Stage 2 改用 Broadcast,每个 Task 直接读取 Stage 0 的每个 Task 的数据(一对一),可很好操作 Locality 特征。最亏得 Stage 0 行使的 Executor 上直接启动 Stage 2 的 Task。假如 Stage 0 的 Shuffle Write 数据并未 Spill 而是在内存中,则 Stage 2 的 Task 可直接读取内存中的数据,服从很是高。假若有 Spill,那可直接从当地文件中读取数据,且是次序读取,服从远比通过收集随机读数据服从高

3.5 行使与优化要领

该特征的行使方法如下

  • 当 spark.sql.adaptive.enabled 与 spark.sql.adaptive.join.enabled 都配置为 true 时,开启 Adaptive Execution 的动态调解 Join 成果
  • spark.sql.adaptiveBroadcastJoinThreshold 配置了 SortMergeJoin 转 BroadcastJoin 的阈值。假如不配置该参数,该阈值与 spark.sql.autoBroadcastJoinThreshold 的值相称
  • 除了本文所述 SortMergeJoin 转 BroadcastJoin,Adaptive Execution 还可提供其余 Join 优化计策。部门优化计策也许会必要增进 Shuffle。spark.sql.adaptive.allowAdditionalShuffle 参数抉择了是否应承为了优化 Join 而增进 Shuffle。其默认值为 false

4 自动处理赏罚数据倾斜

4.1 办理数据倾斜典范方案

《Spark机能优化之道——办理Spark数据倾斜(Data Skew)的N种姿势》一文报告了数据倾斜的危害,发生缘故起因,以及典范办理要领

  • 担保文件可 Split 从而停止读 HDFS 时数据倾斜
  • 担保 Kafka 各 Partition 数据平衡从而停止读 Kafka 引起的数据倾斜
  • 调解并行度或自界说 Partitioner 从而分手分派给统一 Task 的大量差异 Key
  • 行使 BroadcastJoin 取代 ReduceJoin 消除 Shuffle 从而停止 Shuffle 引起的数据倾斜
  • 对倾斜 Key 行使随机前缀或后缀从而分手大量倾斜 Key,同时将参加 Join 的小表扩容,从而担保 Join 功效的正确性

4.2 自动办理数据倾斜

今朝 Adaptive Execution 可办理 Join 时数据倾斜题目。其思绪可领略为将部门倾斜的 Partition (倾斜的判定尺度为该 Partition 数据是全部 Partition Shuffle Write 中位数的 N 倍) 举办单独处理赏罚,相同于 BroadcastJoin,如下图所示

Adaptive Execution 让 Spark SQL 更智能更高效

Spark SQL resolve joinm skew

在上图中,阁下双方别离是参加 Join 的 Stage 0 与 Stage 1 (现实应该是两个 RDD 举办 Join,但犹如上文所述,这里不区分 RDD 与 Stage),中间是获取 Join 功效的 Stage 2

明明 Partition 0 的数据量较大,这里假设 Partition 0 切合“倾斜”的前提,其余 4 个 Partition 未倾斜

以 Partition 对应的 Task 2 为例,它需获取 Stage 0 的三个 Task 中全部属于 Partition 2 的数据,并行使 MergeSort 排序。同时获取 Stage 1 的两个 Task 中全部属于 Partition 2 的数据并行使 MergeSort 排序。然后对二者举办 SortMergeJoin

(编辑:湖南网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读