HUAHUA

Daily-Note 10-24

OVERVIEW

  1. PyTorch, a machine learning framework.
  2. Neural networks, nonlinear parametric models built from neurons.
  3. SparkPlanner, which converts a Logical Plan into a Physical Plan.
  4. Spark Expression

PyTorch

TUTORIAL 1: INTRODUCTION TO PYTORCH fsdl-text-recognizer-2021-labs

Neural Network

Tutorial 1: Introduction to PyTorch

BP (Back Propagation)

SparkPlanner

Spark SQL Query Engine Deep Dive (7) – Spark Planner Overview

QueryPlan: QueryPlan extends TreeNode and adds the properties and traversal methods shared by logical and physical plans.

LogicalPlan: the base class of logical plan nodes such as Aggregate, Sort, and Join. It extends QueryPlan.

SparkPlan: the base class of physical operators such as HashAggregateExec and ShuffleExchangeExec. Like LogicalPlan, it extends QueryPlan.
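The inheritance shape described above can be sketched with toy classes. This is only an illustration under simplifying assumptions: the class names mirror Spark's, but the members (`size`, `output` as a list of column names) and the concrete nodes `Relation`, `Sort`, and `ScanExec` are hypothetical stand-ins, not Spark's real definitions.

```scala
// Toy stand-ins for Spark's classes; only the inheritance shape is mirrored.
object PlanHierarchySketch {
  // TreeNode: a generic tree with children and a simple traversal.
  abstract class TreeNode[T <: TreeNode[T]] {
    def children: Seq[T]
    // Count this node plus all of its descendants.
    def size: Int = 1 + children.map(_.size).sum
  }

  // QueryPlan: adds what logical and physical plans share (here: output columns).
  abstract class QueryPlan[T <: QueryPlan[T]] extends TreeNode[T] {
    def output: Seq[String]
  }

  // The two plan families share QueryPlan but never mix in one tree.
  abstract class LogicalPlan extends QueryPlan[LogicalPlan]
  abstract class SparkPlan extends QueryPlan[SparkPlan]

  // Hypothetical logical nodes.
  case class Relation(name: String, output: Seq[String]) extends LogicalPlan {
    def children: Seq[LogicalPlan] = Nil
  }
  case class Sort(order: String, child: LogicalPlan) extends LogicalPlan {
    def children: Seq[LogicalPlan] = Seq(child)
    def output: Seq[String] = child.output
  }

  // Hypothetical physical node.
  case class ScanExec(name: String, output: Seq[String]) extends SparkPlan {
    def children: Seq[SparkPlan] = Nil
  }

  val logical: LogicalPlan = Sort("id", Relation("t", Seq("id", "name")))
}
```

Because `size` lives in TreeNode and `output` in QueryPlan, both work on `logical` without the logical nodes defining any traversal of their own.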

QueryExecution holds the main stages a query passes through: Unresolved Logical Plan, Analyzed Logical Plan, Optimized Logical Plan, Physical Plan, and so on. In QueryExecution these correspond to the logical, analyzed, optimizedPlan, and sparkPlan attributes, respectively.
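As a rough mental model, the stages can be pictured as a chain of lazy values, each computed from the previous one only when first requested. The sketch below is a toy: `ToyQueryExecution`, the string "plans", and the `log` buffer are hypothetical and exist only to show the evaluation order.

```scala
import scala.collection.mutable.ArrayBuffer

object QueryExecutionSketch {
  // Hypothetical toy: every plan is a string, and each stage records its name in `log`.
  class ToyQueryExecution(val logical: String) {
    val log: ArrayBuffer[String] = ArrayBuffer.empty[String]

    // The `result` argument is evaluated before the call, so upstream stages log first.
    private def stage(name: String, result: String): String = {
      log += name
      result
    }

    lazy val analyzed: String = stage("analyze", s"Analyzed($logical)")
    lazy val optimizedPlan: String = stage("optimize", s"Optimized($analyzed)")
    lazy val sparkPlan: String = stage("plan", s"Physical($optimizedPlan)")
  }
}
```

Asking for `sparkPlan` on a fresh instance forces `analyzed` and `optimizedPlan` first, and a second access reuses the cached values.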

QueryExecution.createSparkPlan() converts the optimized logical plan into a sparkPlan.

  /**
   * Transform a [[LogicalPlan]] into a [[SparkPlan]].
   *
   * Note that the returned physical plan still needs to be prepared for execution.
   */
  def createSparkPlan(
      sparkSession: SparkSession,
      planner: SparkPlanner,
      plan: LogicalPlan): SparkPlan = {
    // TODO: We use next(), i.e. take the first plan returned by the planner, here for now,
    //       but we will implement to choose the best plan.
    planner.plan(ReturnAnswer(plan)).next()
  }

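Here, plan() converts the optimized plan by applying the planner's strategies, then recursively plans any child that a strategy deferred with planLater.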

def plan(plan: LogicalPlan): Iterator[PhysicalPlan] = {
    // Obviously a lot to do here still...

    // Collect physical plan candidates.
    val candidates = strategies.iterator.flatMap(_(plan))

    // The candidates may contain placeholders marked as [[planLater]],
    // so try to replace them by their child plans.
    val plans = candidates.flatMap { candidate =>
      val placeholders = collectPlaceholders(candidate)

      if (placeholders.isEmpty) {
        // Take the candidate as is because it does not contain placeholders.
        Iterator(candidate)
      } else {
        // Plan the logical plan marked as [[planLater]] and replace the placeholders.
        placeholders.iterator.foldLeft(Iterator(candidate)) {
          case (candidatesWithPlaceholders, (placeholder, logicalPlan)) =>
            // Plan the logical plan for the placeholder.
            val childPlans = this.plan(logicalPlan)

            candidatesWithPlaceholders.flatMap { candidateWithPlaceholders =>
              childPlans.map { childPlan =>
                // Replace the placeholder by the child plan
                candidateWithPlaceholders.transformUp {
                  case p if p.eq(placeholder) => childPlan
                }
              }
            }
        }
      }
    }
    // ...
}
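The placeholder mechanism in plan() can be illustrated with a much smaller planner. This is a sketch under stated assumptions, not Spark's implementation: the plan nodes, the two strategies, and `fill` are hypothetical, and unlike the real method it takes only the first plan for each placeholder instead of combining every alternative.

```scala
object ToyPlannerSketch {
  // Hypothetical miniature logical and physical plans.
  sealed trait Logical
  case class Scan(table: String) extends Logical
  case class Filter(cond: String, child: Logical) extends Logical

  sealed trait Physical
  case class ScanExec(table: String) extends Physical
  case class FilterExec(cond: String, child: Physical) extends Physical
  // Placeholder standing in for a child that has not been planned yet.
  case class PlanLater(logical: Logical) extends Physical

  // A strategy returns zero or more physical candidates for the root node only.
  type Strategy = Logical => Seq[Physical]

  val filterStrategy: Strategy = {
    case Filter(c, child) => Seq(FilterExec(c, PlanLater(child)))
    case _ => Nil
  }
  val scanStrategy: Strategy = {
    case Scan(t) => Seq(ScanExec(t))
    case _ => Nil
  }
  val strategies: Seq[Strategy] = Seq(filterStrategy, scanStrategy)

  // Replace each placeholder by the first plan found for its logical child.
  def fill(p: Physical): Physical = p match {
    case PlanLater(l) => plan(l).next()
    case FilterExec(c, child) => FilterExec(c, fill(child))
    case other => other
  }

  // Collect candidates from every strategy, then resolve their placeholders.
  def plan(l: Logical): Iterator[Physical] =
    strategies.iterator.flatMap(s => s(l)).map(fill)
}
```

Each strategy only has to understand one kind of node; the recursion through `PlanLater` is what stitches the per-node results into a full physical tree.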

The strategies provided by SparkPlanner are listed below:

override def strategies: Seq[Strategy] =
  experimentalMethods.extraStrategies ++
    extraPlanningStrategies ++ (
    LogicalQueryStageStrategy ::
    PythonEvals ::
    new DataSourceV2Strategy(session) ::
    FileSourceStrategy ::
    DataSourceStrategy ::
    SpecialLimits ::
    Aggregation ::
    Window ::
    JoinSelection ::
    InMemoryScans ::
    BasicOperators :: Nil)

The strategies include JoinSelection, Aggregation, and others; each is responsible for converting the corresponding logical plan nodes.

Aggregation

Spark SQL Query Engine Deep Dive (8) – Aggregation Strategy

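When the Aggregation strategy is applied to a logical plan, it first checks whether the plan matches the PhysicalAggregation pattern, and then whether the aggregate expressions are AggregateExpression or PythonUDF instances.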

1) For AggregateExpression with no DISTINCT, it calls AggUtils.planAggregateWithoutDistinct() directly. The call returns the corresponding HashAggregateExec physical operators, which replace the aggregation node of the original logical plan.

case PhysicalAggregation(groupingExpressions, aggExpressions, resultExpressions, child)
    if aggExpressions.forall(expr => expr.isInstanceOf[AggregateExpression]) =>
  // ...
  val aggregateOperator =
    if (functionsWithDistinct.isEmpty) {
      AggUtils.planAggregateWithoutDistinct(
        normalizedGroupingExpressions,
        aggregateExpressions,
        resultExpressions,
        planLater(child))
    } else {
      // ...
    }
  aggregateOperator
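The "match a pattern, then check the expression kind in a guard" shape used above relies on Scala extractor objects. A minimal sketch follows; `ToyPhysicalAggregation`, the toy plan and expression classes, and `describe` are all hypothetical and much simpler than Spark's PhysicalAggregation.

```scala
object ExtractorSketch {
  // Hypothetical toy expressions.
  sealed trait Expr
  case class AggregateExpression(fn: String, isDistinct: Boolean) extends Expr
  case class PythonUDF(name: String) extends Expr

  // Hypothetical toy logical plans.
  sealed trait Logical
  case class Relation(name: String) extends Logical
  case class Aggregate(grouping: Seq[String], aggs: Seq[Expr], child: Logical) extends Logical

  // Extractor in the spirit of PhysicalAggregation: succeeds only on Aggregate nodes.
  object ToyPhysicalAggregation {
    def unapply(p: Logical): Option[(Seq[String], Seq[Expr], Logical)] = p match {
      case Aggregate(g, a, c) => Some((g, a, c))
      case _ => None
    }
  }

  // The guards pick a branch by expression kind, like the strategy's case clauses.
  def describe(p: Logical): String = p match {
    case ToyPhysicalAggregation(_, aggs, _)
        if aggs.forall(_.isInstanceOf[AggregateExpression]) =>
      val hasDistinct = aggs.exists {
        case AggregateExpression(_, distinct) => distinct
        case _ => false
      }
      if (hasDistinct) "aggregate with distinct" else "aggregate without distinct"
    case ToyPhysicalAggregation(_, aggs, _)
        if aggs.forall(_.isInstanceOf[PythonUDF]) =>
      "python UDF aggregate"
    case _ => "not handled by this strategy"
  }
}
```

A plan that is not an aggregation falls through to the last case, which corresponds to a strategy returning no candidates so that the planner moves on to the next one.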

Join

Spark SQL Query Engine Deep Dive (11) – Join Strategies

Spark Join Strategy: Spark provides several join strategies, including the common hash, sort, and nested-loop variants.

JoinSelection chooses the join strategy. Joins fall into two groups: equi-joins, whose condition is an equality (=) test, and non-equi-joins, whose condition uses operators such as <>, <, or >.

object JoinSelection extends Strategy
  with PredicateHelper
  with JoinSelectionHelper {

  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    // If it is an equi-join, we first look at the join hints w.r.t. the following order:
    //   1. broadcast hint: pick broadcast hash join if the join type is supported. If both sides
    //      have the broadcast hints, choose the smaller side (based on stats) to broadcast.
    //   2. sort merge hint: pick sort merge join if join keys are sortable.
    //   3. shuffle hash hint: We pick shuffle hash join if the join type is supported. If both
    //      sides have the shuffle hash hints, choose the smaller side (based on stats) as the
    //      build side.
    //   4. shuffle replicate NL hint: pick cartesian product if join type is inner like.
    //
    // If there is no hint or the hints are not applicable, we follow these rules one by one:
    //   1. Pick broadcast hash join if one side is small enough to broadcast, and the join type
    //      is supported. If both sides are small, choose the smaller side (based on stats)
    //      to broadcast.
    //   2. Pick shuffle hash join if one side is small enough to build local hash map, and is
    //      much smaller than the other side, and `spark.sql.join.preferSortMergeJoin` is false.
    //   3. Pick sort merge join if the join keys are sortable.
    //   4. Pick cartesian product if join type is inner like.
    //   5. Pick broadcast nested loop join as the final solution. It may OOM but we don't have
    //      other choice.
      case j @ ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, nonEquiCond, left, right, hint) =>
    // ...
    
    // If it is not an equi-join, we first look at the join hints w.r.t. the following order:
    //   1. broadcast hint: pick broadcast nested loop join. If both sides have the broadcast
    //      hints, choose the smaller side (based on stats) to broadcast for inner and full joins,
    //      choose the left side for right join, and choose right side for left join.
    //   2. shuffle replicate NL hint: pick cartesian product if join type is inner like.
    //
    // If there is no hint or the hints are not applicable, we follow these rules one by one:
    //   1. Pick broadcast nested loop join if one side is small enough to broadcast. If only left
    //      side is broadcast-able and it's left join, or only right side is broadcast-able and
    //      it's right join, we skip this rule. If both sides are small, broadcasts the smaller
    //      side for inner and full joins, broadcasts the left side for right join, and broadcasts
    //      right side for left join.
    //   2. Pick cartesian product if join type is inner like.
    //   3. Pick broadcast nested loop join as the final solution. It may OOM but we don't have
    //      other choice. It broadcasts the smaller side for inner and full joins, broadcasts the
    //      left side for right join, and broadcasts right side for left join.
      case logical.Join(left, right, joinType, condition, hint) =>
    // ...
  }
}

1) When the join is an equi-join:

createBroadcastHashJoin(true)
  .orElse { if (hintToSortMergeJoin(hint)) createSortMergeJoin() else None }
  .orElse(createShuffleHashJoin(true))
  .orElse { if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else None }
  .getOrElse(createJoinWithoutHint())

sql syntax: hints

createBroadcastHashJoin() checks whether the left or right side of the join carries a user-specified hint. If one does, it returns the buildSide, which is used to construct the corresponding BroadcastHashJoinExec.

def createBroadcastHashJoin(onlyLookingAtHint: Boolean) = {
  getBroadcastBuildSide(
    left, right, joinType, hint, onlyLookingAtHint, conf).map {
    buildSide =>
      Seq(joins.BroadcastHashJoinExec(
        leftKeys,
        rightKeys,
        joinType,
        buildSide,
        nonEquiCond,
        planLater(left),
        planLater(right)))
  }
}

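If there is no such hint, the remaining checks run in order, looking for a SortMerge hint, a ShuffleHash hint, and a ShuffleReplicateNL hint. If none of these steps produces a JoinExec, createJoinWithoutHint() is called to build one using the default logic.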

def createJoinWithoutHint() = {
  createBroadcastHashJoin(false) // try BroadcastHashJoin based on size, without a hint
    .orElse {
      if (!conf.preferSortMergeJoin) { // only when sort merge join is not preferred
        createShuffleHashJoin(false)
      } else {
        None
      }
    }
    .orElse(createSortMergeJoin())
    .orElse(createCartesianProduct())
    .getOrElse {
      // This join could be very slow or OOM
      val buildSide = getSmallerSide(left, right)
      Seq(joins.BroadcastNestedLoopJoinExec( // BroadcastNestedLoopJoin is the fallback
        planLater(left), planLater(right), buildSide, joinType, nonEquiCond))
    }
}

Without a user-specified hint, a join strategy is chosen in the following fallback order: BroadcastHashJoin, ShuffleHashJoin (only when preferSortMergeJoin is false), SortMergeJoin, CartesianProduct, BroadcastNestedLoopJoin.
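Both fallback chains for equi-joins rely on Option.orElse and getOrElse, where the first Some wins. The sketch below reduces each check to a boolean to make the order visible. It is an assumption-laden toy: `JoinInfo`, `when`, and the returned strategy names are hypothetical, the hint strings follow Spark SQL's hint names, and the "join type is supported" checks are left out.

```scala
object JoinChoiceSketch {
  // Hypothetical, simplified inputs: only the facts the fallback chains look at.
  case class JoinInfo(
      hints: Set[String], // "broadcast", "merge", "shuffle_hash", "shuffle_replicate_nl"
      smallEnoughToBroadcast: Boolean,
      smallEnoughForLocalHashMap: Boolean,
      keysSortable: Boolean,
      innerLike: Boolean,
      preferSortMergeJoin: Boolean = true)

  // Some(name) when the condition holds, otherwise None.
  private def when(cond: Boolean)(name: String): Option[String] =
    if (cond) Some(name) else None

  // Order used when no hint applies.
  def withoutHint(j: JoinInfo): String =
    when(j.smallEnoughToBroadcast)("BroadcastHashJoin")
      .orElse(when(!j.preferSortMergeJoin && j.smallEnoughForLocalHashMap)("ShuffleHashJoin"))
      .orElse(when(j.keysSortable)("SortMergeJoin"))
      .orElse(when(j.innerLike)("CartesianProduct"))
      .getOrElse("BroadcastNestedLoopJoin")

  // Hints are consulted first; the size-based order is the fallback.
  def choose(j: JoinInfo): String =
    when(j.hints("broadcast"))("BroadcastHashJoin")
      .orElse(when(j.hints("merge") && j.keysSortable)("SortMergeJoin"))
      .orElse(when(j.hints("shuffle_hash"))("ShuffleHashJoin"))
      .orElse(when(j.hints("shuffle_replicate_nl") && j.innerLike)("CartesianProduct"))
      .getOrElse(withoutHint(j))
}
```

With this shape, adding or reordering a rule is a one-line change, which is likely why the Spark source is written as a chain rather than nested if/else.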

Broadcast Hash Join: the highest-priority strategy, but it requires the relation size to be no larger than autoBroadcastJoinThreshold.

// joins.scala

val buildLeft = if (hintOnly) {
  hintToBroadcastLeft(hint)
} else {
  canBroadcastBySize(left, conf) && !hintToNotBroadcastLeft(hint)
}

def canBroadcastBySize(plan: LogicalPlan, conf: SQLConf): Boolean = {
  plan.stats.sizeInBytes >= 0 &&
  plan.stats.sizeInBytes <= conf.autoBroadcastJoinThreshold
}

ShuffleHashJoin: the constraint is that the relation size must be smaller than the product of autoBroadcastJoinThreshold and numShufflePartitions.

private def canBuildLocalHashMapBySize(plan: LogicalPlan, conf: SQLConf): Boolean = {
  plan.stats.sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions
}
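To get a feel for the two size limits, here is a worked example. It assumes Spark's documented defaults of 10 MB for spark.sql.autoBroadcastJoinThreshold and 200 for spark.sql.shuffle.partitions; the values are hard-coded constants here, not read from a real SQLConf, and the functions take a plain size instead of a LogicalPlan.

```scala
object JoinThresholdSketch {
  // Assumed values: Spark's documented defaults, not read from a live SQLConf.
  val autoBroadcastJoinThreshold: BigInt = BigInt(10L * 1024 * 1024) // 10 MB
  val numShufflePartitions: BigInt = BigInt(200)

  // Same comparison as canBroadcastBySize, taking a plain size in bytes.
  def canBroadcastBySize(sizeInBytes: BigInt): Boolean =
    sizeInBytes >= BigInt(0) && sizeInBytes <= autoBroadcastJoinThreshold

  // Same comparison as canBuildLocalHashMapBySize: threshold * partitions.
  def canBuildLocalHashMapBySize(sizeInBytes: BigInt): Boolean =
    sizeInBytes < autoBroadcastJoinThreshold * numShufflePartitions

  val mb: BigInt = BigInt(1024 * 1024)
}
```

Under these assumed defaults, an 8 MB relation can be broadcast, a 64 MB relation cannot but still fits the shuffle hash limit, and the shuffle hash limit itself is 10 MB × 200 = 2000 MB.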

SortMergeJoin: requires the join keys to be of a sortable type.

CartesianProduct: a Cartesian-product join performs very poorly, since it pairs every row of one side with every row of the other.

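BroadcastNestedLoopJoin: the fallback strategy. A nested-loop join performs very poorly and, as the source comment notes, may run out of memory.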

Spark Expression