Spark 3 AQE: A Source-Code Walkthrough of Automatic Shuffle Partition Coalescing

Branch: spark-3.0
Corrections and feedback are very welcome!
Relevant configuration parameters:
spark.sql.adaptive.enabled: whether AQE is enabled
spark.sql.adaptive.coalescePartitions.enabled: whether partition coalescing is enabled
spark.sql.adaptive.coalescePartitions.minPartitionNum: the minimum number of partitions after coalescing, referred to below as minPartitionNum
spark.sql.adaptive.advisoryPartitionSizeInBytes: the advisory partition size suggested by the developer, referred to below as advisoryPartitionSizeInBytes
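
For orientation, here is a minimal sketch of turning these switches on when building a SparkSession (the app name and values are illustrative, not tuning advice):

    import org.apache.spark.sql.SparkSession

    // Minimal sketch: enable AQE and shuffle partition coalescing.
    val spark = SparkSession.builder()
      .appName("aqe-coalesce-demo")
      .config("spark.sql.adaptive.enabled", "true")
      .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
      .config("spark.sql.adaptive.coalescePartitions.minPartitionNum", "1")
      .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64m")
      .getOrCreate()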

I. Code Entry Point

Class: AdaptiveSparkPlanExec.scala
Entry point: queryStageOptimizerRules -> CoalesceShufflePartitions -> coalescePartitions

  @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
    ReuseAdaptiveSubquery(conf, context.subqueryCache),
    CoalesceShufflePartitions(context.session),
    // The following two rules need to make use of 'CustomShuffleReaderExec.partitionSpecs'
    // added by `CoalesceShufflePartitions`. So they must be executed after it.
    OptimizeSkewedJoin(conf),
    OptimizeLocalShuffleReader(conf)
  )

II. Flow

1. Pre-checks

Check whether partition coalescing is enabled, and whether all leaf nodes are query stages (if they are not, coalescing would break the assumption that all children of a SparkPlan have the same number of output partitions):

    if (!conf.coalesceShufflePartitionsEnabled) {
      return plan
    }
    if (!plan.collectLeaves().forall(_.isInstanceOf[QueryStageExec])
        || plan.find(_.isInstanceOf[CustomShuffleReaderExec]).isDefined) {
      // If not all leaf nodes are query stages, it's not safe to reduce the number of
      // shuffle partitions, because we may break the assumption that all children of a spark plan
      // have same number of output partitions.
      return plan
    }

2. Collecting Shuffle Stages

Collect every ShuffleQueryStageExec node in the plan tree; these are the stages the coalescing step operates on:

    def collectShuffleStages(plan: SparkPlan): Seq[ShuffleQueryStageExec] = plan match {
      case stage: ShuffleQueryStageExec => Seq(stage)
      case _ => plan.children.flatMap(collectShuffleStages)
    }

    val shuffleStages = collectShuffleStages(plan)

3. Execution

First, check whether these shuffles are allowed to change their partition number at all; if any of them is not, the plan is returned unchanged:

    if (!shuffleStages.forall(_.shuffle.canChangeNumPartitions)) {
      plan
    } else {
      // ... (the coalescing logic below)
    }

Next, collect each stage's map output statistics; mapStats is an Option, so stages without statistics available are simply skipped by the flatMap:

    val validMetrics = shuffleStages.flatMap(_.mapStats)

When multiple shuffles are involved, they can only be coalesced together if every shuffle has the same number of pre-shuffle partitions:

    val distinctNumPreShufflePartitions =
      validMetrics.map(stats => stats.bytesByPartitionId.length).distinct
    if (validMetrics.nonEmpty && distinctNumPreShufflePartitions.length == 1) {
      // ...
    }

Take the configured minimum partition number; if it is not set, fall back to Spark's default parallelism (to avoid a performance regression after heavy coalescing):

    val minPartitionNum = conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM)
      .getOrElse(session.sparkContext.defaultParallelism)

Then we reach the call into the method that does the real work. Its parameters:
validMetrics.toArray: the shuffle IDs and the size of each partition
advisoryTargetSize: the developer-advised partition size (default 64 MB)
minNumPartitions: the minimum number of partitions

    val partitionSpecs = ShufflePartitionsUtil.coalescePartitions(
      validMetrics.toArray,
      advisoryTargetSize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES),
      minNumPartitions = minPartitionNum)

4. coalescePartitions: where the coalescing actually happens

First, advisoryTargetSize may be re-derived. For example, if the total input size is 1000 MB and minNumPartitions is 10, but advisoryTargetSize is set to 200 MB, the computation below discards the 200 MB setting and uses maxTargetSize (100 MB) instead, so that the result still has at least minNumPartitions partitions:

    // Total size of all shuffle partitions.
    val totalPostShuffleInputSize = mapOutputStatistics.map(_.bytesByPartitionId.sum).sum
    // Upper bound for the target size, derived from minNumPartitions (floored at 16 bytes).
    val maxTargetSize = math.max(
      math.ceil(totalPostShuffleInputSize / minNumPartitions.toDouble).toLong, 16)
    // The effective target size (this is what rules out the 200 MB setting above).
    val targetSize = math.min(maxTargetSize, advisoryTargetSize)
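
Plugging the numbers from the example above into these formulas, as a standalone, runnable sketch:

    // Standalone sketch: targetSize for 1000 MB of input, minNumPartitions = 10,
    // and an advisory size of 200 MB.
    val totalPostShuffleInputSize = 1000L << 20  // 1000 MB
    val minNumPartitions = 10
    val advisoryTargetSize = 200L << 20          // 200 MB

    val maxTargetSize = math.max(
      math.ceil(totalPostShuffleInputSize / minNumPartitions.toDouble).toLong, 16)
    val targetSize = math.min(maxTargetSize, advisoryTargetSize)
    // maxTargetSize = 100 MB, so targetSize = 100 MB and the 200 MB setting is discarded.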

Next comes the coalescing loop itself: for each partition index, sum its size across all shuffles, then pack adjacent partitions together until adding the next one would exceed targetSize.

    while (i < numPartitions) {
      // Compute the total size of the i-th shuffle partition across all shuffles.
      var totalSizeOfCurrentPartition = 0L
      var j = 0
      while (j < mapOutputStatistics.length) {
        totalSizeOfCurrentPartition += mapOutputStatistics(j).bytesByPartitionId(i)
        j += 1
      }

      // If including `totalSizeOfCurrentPartition` would exceed the target size,
      // close the current group and start a new coalesced partition.
      if (i > latestSplitPoint && coalescedSize + totalSizeOfCurrentPartition > targetSize) {
        partitionSpecs += CoalescedPartitionSpec(latestSplitPoint, i)
        latestSplitPoint = i
        // Reset the running size to the size of the current partition.
        coalescedSize = totalSizeOfCurrentPartition
      } else {
        coalescedSize += totalSizeOfCurrentPartition
      }
      i += 1
    }

Finally, close the last group and return the coalesced partition specs:

    partitionSpecs += CoalescedPartitionSpec(latestSplitPoint, numPartitions)

    partitionSpecs

The official example from the doc comment (honestly, I have not yet managed to verify this one myself):

   * For example, we have two shuffles with the following partition size statistics:
   *  - shuffle 1 (5 partitions): [100 MiB, 20 MiB, 100 MiB, 10 MiB, 30 MiB]
   *  - shuffle 2 (5 partitions): [10 MiB,  10 MiB, 70 MiB,  5 MiB, 5 MiB]
   * Assuming the target size is 128 MiB, we will have 4 coalesced partitions, which are:
   *  - coalesced partition 0: shuffle partition 0 (size 110 MiB)
   *  - coalesced partition 1: shuffle partition 1 (size 30 MiB)
   *  - coalesced partition 2: shuffle partition 2 (size 170 MiB)
   *  - coalesced partition 3: shuffle partition 3 and 4 (size 50 MiB)
   *
   *  @return A sequence of [[CoalescedPartitionSpec]]s. For example, if partitions [0, 1, 2, 3, 4]
   *          split at indices [0, 2, 3], the returned partition specs will be:
   *          CoalescedPartitionSpec(0, 2), CoalescedPartitionSpec(2, 3) and
   *          CoalescedPartitionSpec(3, 5).
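
For what it's worth, running the packing loop on these numbers does reproduce the four coalesced partitions. Below is a self-contained sketch of that loop; plain (start, end) tuples stand in for CoalescedPartitionSpec, since this runs outside the Spark code base:

    // The two shuffles from the example, partition sizes in bytes.
    val bytesByPartitionId = Seq(
      Array(100L, 20L, 100L, 10L, 30L).map(_ << 20), // shuffle 1
      Array(10L, 10L, 70L, 5L, 5L).map(_ << 20))     // shuffle 2
    val targetSize = 128L << 20
    val numPartitions = bytesByPartitionId.head.length

    val specs = scala.collection.mutable.ArrayBuffer.empty[(Int, Int)]
    var latestSplitPoint = 0
    var coalescedSize = 0L
    for (i <- 0 until numPartitions) {
      // Total size of the i-th partition across all shuffles.
      val totalSizeOfCurrentPartition = bytesByPartitionId.map(_(i)).sum
      if (i > latestSplitPoint && coalescedSize + totalSizeOfCurrentPartition > targetSize) {
        specs += ((latestSplitPoint, i))
        latestSplitPoint = i
        coalescedSize = totalSizeOfCurrentPartition
      } else {
        coalescedSize += totalSizeOfCurrentPartition
      }
    }
    specs += ((latestSplitPoint, numPartitions))
    // specs: (0,1), (1,2), (2,3), (3,5) -> 110 MiB, 30 MiB, 170 MiB, 50 MiB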

III. Closing Notes

Reading the source side by side with the corresponding official unit tests makes it much faster to understand.
The test class for partition coalescing is ShufflePartitionsUtilSuite.
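
As a hedged sketch of what such a check looks like: the snippet below only compiles inside the Spark source tree (MapOutputStatistics and ShufflePartitionsUtil are package-private), and the constructor and call signatures are assumed from the 3.0 call site shown earlier.

    import org.apache.spark.MapOutputStatistics
    import org.apache.spark.sql.execution.adaptive.ShufflePartitionsUtil

    // The two shuffles from the official example, sizes in bytes.
    val stats = Array(
      new MapOutputStatistics(0, Array(100L, 20L, 100L, 10L, 30L).map(_ << 20)),
      new MapOutputStatistics(1, Array(10L, 10L, 70L, 5L, 5L).map(_ << 20)))

    val specs = ShufflePartitionsUtil.coalescePartitions(
      stats,
      advisoryTargetSize = 128L << 20,
      minNumPartitions = 1)
    // Expected boundaries: (0,1), (1,2), (2,3), (3,5), matching the example.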

One last plug: I strongly recommend Wu Lei's Spark course on Geektime. It is really, really good; I learned a lot from it, and the comment section under each lesson is full of useful discussion too. If your work touches Spark, you can pick it up without hesitation!
