Shandong University Software Engineering Application and Practice: Spark (13) Code Analysis

2021SC@SDUSC

1. Executing the physical execution plan:

   After analysis and optimization, the logical plan has been converted into a physical plan, and its execution is lazy; in the end, SparkPlan's execute method is called to actually run the physical plan. Taking execution.Project as an example, its execute method is shown below:

Project and its execute method:


@DeveloperApi
case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends UnaryNode {
    override def output = projectList.map(_.toAttribute)

    @transient lazy val buildProjection = newMutableProjection(projectList, child.output)

    def execute() = child.execute().mapPartitions { iter =>
        val resuableProjection = buildProjection()
        iter.map(resuableProjection)
    }
}

        Project's execute method performs the following steps:

        1. Call child's execute method first, to ensure the input data to be projected has already been produced.

        2. Call SparkPlan's newMutableProjection to build the projection; the implementation of newMutableProjection is shown below:

SparkPlan's newMutableProjection method:


protected def newMutableProjection(
    expressions: Seq[Expression],
    inputSchema: Seq[Attribute]): () => MutableProjection = {
    log.debug(
        s"Creating MutableProj: $expressions, inputSchema: $inputSchema, codegen: $codegenEnabled")
    if (codegenEnabled) {
        GenerateMutableProjection(expressions, inputSchema)
    } else {
        () => new InterpretedMutableProjection(expressions, inputSchema)
    }
}
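
Note that newMutableProjection returns a factory of type () => MutableProjection rather than a projection itself, so each partition can build its own reusable projection inside mapPartitions. The following is a minimal, Spark-independent sketch of that pattern; Row, makeProjection and the sample data are hypothetical stand-ins written for this note, not Spark APIs:

object ProjectionFactorySketch {
  // In this sketch a "row" is just an indexed sequence of column values.
  type Row = IndexedSeq[Any]

  // Hypothetical factory: given the ordinals to keep, return a () => (Row => Row)
  // thunk, mirroring the () => MutableProjection returned by newMutableProjection.
  def makeProjection(ordinals: Seq[Int]): () => (Row => Row) =
    () => (input: Row) => ordinals.map(input(_)).toIndexedSeq

  def main(args: Array[String]): Unit = {
    val partition: Iterator[Row] = Iterator(
      IndexedSeq("alice", 20, "cs"),
      IndexedSeq("bob", 17, "math"))

    // Like Project.execute: build the projection once per partition,
    // then map every row of that partition through it.
    val buildProjection = makeProjection(Seq(0, 1))
    val reusableProjection = buildProjection()
    partition.map(reusableProjection).foreach(println)
    // Vector(alice, 20)
    // Vector(bob, 17)
  }
}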

        By default, newMutableProjection uses InterpretedMutableProjection to perform the projection; its implementation is shown in the code below. BindReferences.bindReference once again uses the transform method to bind references inside expressions, for example replacing List(name#1) with List(input[1]). The final projection is carried out by InterpretedMutableProjection's apply method. The implementation of BindReferences.bindReference follows it:

The InterpretedMutableProjection implementation in Projection.scala:


case class InterpretedMutableProjection(expressions: Seq[Expression]) extends
    MutableProjection {
    def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) =
        this(expressions.map(BindReferences.bindReference(_, inputSchema)))

    private[this] val exprArray = expressions.toArray
    private[this] var mutableRow: MutableRow = new GenericMutableRow(exprArray.size)
    def currentValue: Row = mutableRow

    override def target(row: MutableRow): MutableProjection = {
        mutableRow = row
        this
    }

    override def apply(input: Row): Row = {
        var i = 0
        while (i < exprArray.length) {
            mutableRow(i) = exprArray(i).eval(input)
            i += 1
        }
        mutableRow
    }
}

The implementation of BindReferences.bindReference:


object BindReferences extends Logging {

    def bindReference[A <: Expression](
        expression: A,
        input: Seq[Attribute],
        allowFailures: Boolean = false): A = {
        expression.transform { case a: AttributeReference =>
            attachTree(a, "Binding attribute") {
                val ordinal = input.indexWhere(_.exprId == a.exprId)
                if (ordinal == -1) {
                    if (allowFailures) {
                        a
                    } else {
                        sys.error(s"Couldn't find $a in ${input.mkString("[", ",", "]")}")
                    }
                } else {
                    BoundReference(ordinal, a.dataType, a.nullable)
                }
            }
        }.asInstanceOf[A] // Kind of a hack, but safe. TODO: Tighten return type when possible.
    }
}
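
To make the List(name#1) to List(input[1]) example concrete, here is a toy sketch of what binding does: named attribute nodes in an expression tree are rewritten into ordinal references that read directly from the input row. The expression classes below are simplified stand-ins written for this note, not catalyst's:

object BindingSketch {
  type Row = IndexedSeq[Any]

  sealed trait Expr { def eval(input: Row): Any }
  // Unresolved-style reference by name, e.g. name#1.
  case class AttrRef(name: String) extends Expr {
    def eval(input: Row): Any = sys.error(s"unbound attribute $name")
  }
  // Bound reference by position, e.g. input[1].
  case class BoundRef(ordinal: Int) extends Expr {
    def eval(input: Row): Any = input(ordinal)
  }

  // Sketch of the idea behind BindReferences.bindReference: replace every
  // AttrRef with the ordinal of the matching attribute in the input schema.
  def bind(expr: Expr, schema: Seq[String]): Expr = expr match {
    case AttrRef(name) =>
      val ordinal = schema.indexOf(name)
      if (ordinal == -1) sys.error(s"Couldn't find $name in $schema")
      else BoundRef(ordinal)
    case other => other
  }

  def main(args: Array[String]): Unit = {
    val schema = Seq("id", "name")               // column 0 = id, column 1 = name
    val bound  = bind(AttrRef("name"), schema)
    println(bound)                               // BoundRef(1), i.e. input[1]
    println(bound.eval(IndexedSeq(42, "alice"))) // alice
  }
}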

Now take execution.Filter as another example; its execute method is shown below.
Filter's execute method performs the following steps:
1) Call child's execute method first, to ensure the input data to be filtered has already been produced.
2) Call SparkPlan's newPredicate to build the filter predicate; the implementation of newPredicate is shown further below:

Filter and its execute method:


@DeveloperApi
case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode {
    override def output = child.output

    @transient lazy val conditionEvaluator = newPredicate(condition, child.output)

    def execute() = child.execute().mapPartitions { iter =>
        iter.filter(conditionEvaluator)
    }
}

        By default, newPredicate uses InterpretedPredicate to perform the filtering; its implementation is shown below. Here BindReferences.bindReference converts (age#0 >= 13) && (age#0 <= 19) into (input[0] >= 13) && (input[0] <= 19). The final filtering is carried out by InterpretedPredicate's second apply method.

SparkPlan's newPredicate method:


protected def newPredicate(
        expression: Expression, inputSchema: Seq[Attribute]): (Row) => Boolean = {
    if (codegenEnabled) {
        GeneratePredicate(expression, inputSchema)
    } else {
        InterpretedPredicate(expression, inputSchema)
    }
}

The implementation of InterpretedPredicate:


object InterpretedPredicate {
    def apply(expression: Expression, inputSchema: Seq[Attribute]): (Row => Boolean) =
        apply(BindReferences.bindReference(expression, inputSchema))

    def apply(expression: Expression): (Row => Boolean) = {
        (r: Row) => expression.eval(r).asInstanceOf[Boolean]
    }
}
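
The two apply overloads above form a small pipeline: first bind the expression to the input schema, then wrap it in a Row => Boolean closure that the iterator's filter can call directly. Below is a minimal, Spark-independent sketch of that idea, with a hardcoded age-range condition standing in for (age#0 >= 13) && (age#0 <= 19); all names and types are hypothetical simplifications:

object PredicateSketch {
  type Row = IndexedSeq[Any]

  // Sketch of InterpretedPredicate's second apply: turn an already-bound
  // boolean expression into a plain Row => Boolean closure.
  def compilePredicate(eval: Row => Any): Row => Boolean =
    (r: Row) => eval(r).asInstanceOf[Boolean]

  def main(args: Array[String]): Unit = {
    // Bound form of (age#0 >= 13) && (age#0 <= 19): age is column 0.
    val boundCondition: Row => Any =
      row => { val age = row(0).asInstanceOf[Int]; age >= 13 && age <= 19 }

    val conditionEvaluator = compilePredicate(boundCondition)

    // Like Filter.execute: keep only the rows for which the predicate holds.
    val partition: Iterator[Row] = Iterator(IndexedSeq(12), IndexedSeq(15), IndexedSeq(21))
    partition.filter(conditionEvaluator).foreach(println) // Vector(15)
  }
}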

        Both execution.Project and execution.Filter have a child, but not every SparkPlan subclass does.
        For example, execution.PhysicalRDD has no child, because execution.PhysicalRDD usually sits at the very bottom of the physical plan as a leaf node. Its implementation is as follows.

case class PhysicalRDD(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode {
    override def execute() = rdd
}

        Built on this SparkPlan execute framework as a whole, the transformations of the lower-level (child) SparkPlans are guaranteed to run before the transformation of the current SparkPlan, and in this way the SQL statement is ultimately executed.
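
This bottom-up behaviour can be seen with a toy plan tree: each node's execute first calls child.execute, so the leaf produces its data before the filter and projection transformations are applied on top of it. The node classes below are hypothetical simplifications that use plain iterators instead of RDDs:

object ExecuteTreeSketch {
  type Row = IndexedSeq[Any]

  sealed trait ToyPlan { def execute(): Iterator[Row] }

  // Leaf node: like PhysicalRDD, it has no child and simply hands back its data.
  case class ToyScan(data: Seq[Row]) extends ToyPlan {
    def execute(): Iterator[Row] = data.iterator
  }
  // Unary nodes: each one runs its child first, then applies its own transformation.
  case class ToyFilter(condition: Row => Boolean, child: ToyPlan) extends ToyPlan {
    def execute(): Iterator[Row] = child.execute().filter(condition)
  }
  case class ToyProject(ordinals: Seq[Int], child: ToyPlan) extends ToyPlan {
    def execute(): Iterator[Row] =
      child.execute().map(row => ordinals.map(row(_)).toIndexedSeq)
  }

  def main(args: Array[String]): Unit = {
    // Roughly: SELECT name FROM people WHERE age >= 13 AND age <= 19
    val plan: ToyPlan =
      ToyProject(Seq(0),
        ToyFilter(row => { val age = row(1).asInstanceOf[Int]; age >= 13 && age <= 19 },
          ToyScan(Seq(IndexedSeq("alice", 20), IndexedSeq("bob", 15)))))

    // Calling execute on the root recursively executes the children first.
    plan.execute().foreach(println) // Vector(bob)
  }
}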
