Daily-Note 10-24
OVERVIEW
- PyTorch, a machine learning framework.
- Neural network, a nonlinear parametric model built from neurons.
- SparkPlanner, which converts a Logical Plan into a Physical Plan.
- Spark Expression,
PyTorch
TUTORIAL 1: INTRODUCTION TO PYTORCH
fsdl-text-recognizer-2021-labs
Neural Network
BP (Back Propagation)
SparkPlanner
Spark SQL Query Engine Deep Dive (7) – Spark Planner Overview
QueryPlan: QueryPlan extends TreeNode and adds the properties and traversal methods shared by logical and physical plans.
LogicalPlan: the base class of logical plan nodes such as Aggregate/Sort/Join; it extends QueryPlan.
SparkPlan: the base class of physical operators such as HashAggregateExec and ShuffleExchangeExec; like LogicalPlan, it extends QueryPlan.
QueryExecution holds the main stages a query goes through: Unresolved Logical Plan / Analyzed Logical Plan / Optimized Logical Plan / Physical Plan, exposed on QueryExecution as the logical / analyzed / optimizedPlan / sparkPlan properties respectively.
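A quick way to look at these stages for a concrete query is the queryExecution handle on a Dataset; a minimal sketch, assuming a spark-shell style session where spark is an existing SparkSession:
import spark.implicits._

// A toy aggregation query; inspect each planning stage on its QueryExecution.
val df = spark.range(100).groupBy(($"id" % 10).as("k")).count()

df.queryExecution.logical        // initial (unresolved) logical plan
df.queryExecution.analyzed       // analyzed logical plan
df.queryExecution.optimizedPlan  // optimized logical plan
df.queryExecution.sparkPlan      // physical plan picked by SparkPlanner
df.queryExecution.executedPlan   // physical plan after preparation rules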
The QueryExecution.createSparkPlan() method converts the optimizedPlan into the sparkPlan.
/**
 * Transform a [[LogicalPlan]] into a [[SparkPlan]].
 *
 * Note that the returned physical plan still needs to be prepared for execution.
 */
def createSparkPlan(
    sparkSession: SparkSession,
    planner: SparkPlanner,
    plan: LogicalPlan): SparkPlan = {
  // TODO: We use next(), i.e. take the first plan returned by the planner, here for now,
  //       but we will implement to choose the best plan.
  planner.plan(ReturnAnswer(plan)).next()
}
Here the plan() method applies the configured strategies to convert the optimized plan, recursing into the planLater placeholders.
def plan(plan: LogicalPlan): Iterator[PhysicalPlan] = {
  // Obviously a lot to do here still...

  // Collect physical plan candidates.
  val candidates = strategies.iterator.flatMap(_(plan))

  // The candidates may contain placeholders marked as [[planLater]],
  // so try to replace them by their child plans.
  val plans = candidates.flatMap { candidate =>
    val placeholders = collectPlaceholders(candidate)

    if (placeholders.isEmpty) {
      // Take the candidate as is because it does not contain placeholders.
      Iterator(candidate)
    } else {
      // Plan the logical plan marked as [[planLater]] and replace the placeholders.
      placeholders.iterator.foldLeft(Iterator(candidate)) {
        case (candidatesWithPlaceholders, (placeholder, logicalPlan)) =>
          // Plan the logical plan for the placeholder.
          val childPlans = this.plan(logicalPlan)

          candidatesWithPlaceholders.flatMap { candidateWithPlaceholders =>
            childPlans.map { childPlan =>
              // Replace the placeholder by the child plan
              candidateWithPlaceholders.transformUp {
                case p if p.eq(placeholder) => childPlan
              }
            }
          }
      }
    }
  }

  // Prune bad plans and return the remaining candidates.
  val pruned = prunePlans(plans)
  assert(pruned.hasNext, s"No plan for $plan")
  pruned
}
The strategies provided by SparkPlanner are listed below:
override def strategies: Seq[Strategy] =
  experimentalMethods.extraStrategies ++
    extraPlanningStrategies ++ (
    LogicalQueryStageStrategy ::
    PythonEvals ::
    new DataSourceV2Strategy(session) ::
    FileSourceStrategy ::
    DataSourceStrategy ::
    SpecialLimits ::
    Aggregation ::
    Window ::
    JoinSelection ::
    InMemoryScans ::
    BasicOperators :: Nil)
The strategies include Join, Aggregation, and others, each responsible for converting the corresponding logical plan nodes.
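The first two entries, experimentalMethods.extraStrategies and extraPlanningStrategies, are the extension points for user strategies. A minimal sketch of registering one (MyNoopStrategy is a made-up name; the rest is the public Spark API):
import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

// A strategy that matches nothing, so planning always falls through to the
// built-in strategies listed above.
object MyNoopStrategy extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
}

val spark = SparkSession.builder().master("local[*]").appName("planner-demo").getOrCreate()
spark.experimental.extraStrategies = MyNoopStrategy :: Nil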
Aggregation
When the Aggregation strategy is applied to a logical plan, it first matches the PhysicalAggregation pattern and then distinguishes whether the aggregate expressions are AggregateExpression or PythonUDF.
1) In the AggregateExpression case, when there is no DISTINCT aggregate, AggUtils.planAggregateWithoutDistinct() is called directly; it returns the corresponding HashAggregateExec operator structure, which replaces the aggregation node in the original logical plan.
case PhysicalAggregation(groupingExpressions, aggExpressions, resultExpressions, child)
    if aggExpressions.forall(expr => expr.isInstanceOf[AggregateExpression]) =>
  // ...
  val aggregateOperator =
    if (functionsWithDistinct.isEmpty) {
      AggUtils.planAggregateWithoutDistinct(
        normalizedGroupingExpressions,
        aggregateExpressions,
        resultExpressions,
        planLater(child))
    } else {
      // ...
    }

  aggregateOperator
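A minimal sketch of this branch in action, assuming an existing SparkSession named spark: for a plain groupBy aggregation with no DISTINCT, the resulting sparkPlan typically shows the partial/final HashAggregateExec pair emitted by planAggregateWithoutDistinct().
// Aggregation without DISTINCT: expect a partial HashAggregateExec feeding a
// final HashAggregateExec across an exchange.
val agg = spark.range(1000)
  .selectExpr("id % 10 AS k", "id AS v")
  .groupBy("k")
  .sum("v")
println(agg.queryExecution.sparkPlan)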
Join
Spark Join Strategy: Spark provides multiple join strategies, including the common Hash, Sort, and NestedLoop variants.
JoinSelection chooses the concrete join strategy. Joins fall into equi-joins and non-equi-joins: the former use equality (=) as the join condition, while the latter use <>, <, >, and so on.
object JoinSelection extends Strategy
  with PredicateHelper
  with JoinSelectionHelper {

  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {

    // If it is an equi-join, we first look at the join hints w.r.t. the following order:
    //   1. broadcast hint: pick broadcast hash join if the join type is supported. If both sides
    //      have the broadcast hints, choose the smaller side (based on stats) to broadcast.
    //   2. sort merge hint: pick sort merge join if join keys are sortable.
    //   3. shuffle hash hint: We pick shuffle hash join if the join type is supported. If both
    //      sides have the shuffle hash hints, choose the smaller side (based on stats) as the
    //      build side.
    //   4. shuffle replicate NL hint: pick cartesian product if join type is inner like.
    //
    // If there is no hint or the hints are not applicable, we follow these rules one by one:
    //   1. Pick broadcast hash join if one side is small enough to broadcast, and the join type
    //      is supported. If both sides are small, choose the smaller side (based on stats)
    //      to broadcast.
    //   2. Pick shuffle hash join if one side is small enough to build local hash map, and is
    //      much smaller than the other side, and `spark.sql.join.preferSortMergeJoin` is false.
    //   3. Pick sort merge join if the join keys are sortable.
    //   4. Pick cartesian product if join type is inner like.
    //   5. Pick broadcast nested loop join as the final solution. It may OOM but we don't have
    //      other choice.
    case j @ ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, nonEquiCond, left, right, hint) =>
      // ...

    // If it is not an equi-join, we first look at the join hints w.r.t. the following order:
    //   1. broadcast hint: pick broadcast nested loop join. If both sides have the broadcast
    //      hints, choose the smaller side (based on stats) to broadcast for inner and full joins,
    //      choose the left side for right join, and choose right side for left join.
    //   2. shuffle replicate NL hint: pick cartesian product if join type is inner like.
    //
    // If there is no hint or the hints are not applicable, we follow these rules one by one:
    //   1. Pick broadcast nested loop join if one side is small enough to broadcast. If only left
    //      side is broadcast-able and it's left join, or only right side is broadcast-able and
    //      it's right join, we skip this rule. If both sides are small, broadcasts the smaller
    //      side for inner and full joins, broadcasts the left side for right join, and broadcasts
    //      right side for left join.
    //   2. Pick cartesian product if join type is inner like.
    //   3. Pick broadcast nested loop join as the final solution. It may OOM but we don't have
    //      other choice. It broadcasts the smaller side for inner and full joins, broadcasts the
    //      left side for right join, and broadcasts right side for left join.
    case logical.Join(left, right, joinType, condition, hint) =>
      // ...
  }
1) When the join is an equi-join:
createBroadcastHashJoin(true)
  .orElse { if (hintToSortMergeJoin(hint)) createSortMergeJoin() else None }
  .orElse(createShuffleHashJoin(true))
  .orElse { if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else None }
  .getOrElse(createJoinWithoutHint())
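For reference, the hints consulted by this chain can be attached with Dataset.hint() or SQL hint comments; a small sketch, assuming an existing SparkSession named spark (the hint names are the documented Spark 3.x join hints):
val t1 = spark.range(1000).toDF("k")
val t2 = spark.range(1000).toDF("k")

t1.join(t2.hint("broadcast"), "k").explain()             // broadcast hint -> BroadcastHashJoinExec
t1.join(t2.hint("merge"), "k").explain()                 // sort merge hint -> SortMergeJoinExec
t1.join(t2.hint("shuffle_hash"), "k").explain()          // shuffle hash hint -> ShuffledHashJoinExec
t1.join(t2.hint("shuffle_replicate_nl"), "k").explain()  // -> CartesianProductExec (inner joins only)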
createBroadcastHashJoin() checks whether the left or right side of the join carries a user-specified broadcast hint; if so, it returns the buildSide and builds the corresponding BroadcastHashJoinExec.
def createBroadcastHashJoin(onlyLookingAtHint: Boolean) = {
  getBroadcastBuildSide(left, right, joinType, hint, onlyLookingAtHint, conf).map {
    buildSide =>
      Seq(joins.BroadcastHashJoinExec(
        leftKeys,
        rightKeys,
        joinType,
        buildSide,
        nonEquiCond,
        planLater(left),
        planLater(right)))
  }
}
If there is no such hint, the remaining checks run in turn: whether a sort merge hint or a shuffle replicate NL hint is present, and so on. Finally, if none of these steps manages to build a JoinExec, createJoinWithoutHint() is called to create one according to the default logic.
def createJoinWithoutHint() = {
  createBroadcastHashJoin(false)        // try broadcast hash join without looking at hints
    .orElse {
      if (!conf.preferSortMergeJoin) {  // only considered when sort merge join is not preferred
        createShuffleHashJoin(false)
      } else {
        None
      }
    }
    .orElse(createSortMergeJoin())
    .orElse(createCartesianProduct())
    .getOrElse {
      // This join could be very slow or OOM
      val buildSide = getSmallerSide(left, right)
      Seq(joins.BroadcastNestedLoopJoinExec(  // broadcast nested loop join is the last resort
        planLater(left), planLater(right), buildSide, joinType, nonEquiCond))
    }
}
When the user specifies no hint, a join strategy is chosen in the fallback order BroadcastHashJoin / ShuffleHashJoin (requires preferSortMergeJoin to be false) / SortMergeJoin / CartesianProduct / BroadcastNestedLoopJoin.
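A small sketch of this no-hint fallback order, again assuming an existing SparkSession named spark: with one side under the broadcast threshold the planner should pick BroadcastHashJoinExec; disabling the threshold makes it fall through to SortMergeJoinExec (preferSortMergeJoin is true by default):
val dim  = spark.range(1000).toDF("k")      // small side
val fact = spark.range(1000000).toDF("k")   // large side

fact.join(dim, "k").explain()               // expect BroadcastHashJoinExec

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1L)  // turn off size-based broadcasting
fact.join(dim, "k").explain()               // expect SortMergeJoinExec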
Broadcast Hash Join: the most preferred strategy, but it requires the relation size to be below autoBroadcastJoinThreshold.
// joins.scala
val buildLeft = if (hintOnly) {
  hintToBroadcastLeft(hint)
} else {
  canBroadcastBySize(left, conf) && !hintToNotBroadcastLeft(hint)
}

def canBroadcastBySize(plan: LogicalPlan, conf: SQLConf): Boolean = {
  plan.stats.sizeInBytes >= 0 &&
    plan.stats.sizeInBytes <= conf.autoBroadcastJoinThreshold
}
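The threshold read here is the usual autoBroadcastJoinThreshold configuration, and plan.stats.sizeInBytes comes from the plan's statistics; a short sketch of the related knobs, assuming an existing SparkSession named spark (the table name is made up):
spark.conf.get("spark.sql.autoBroadcastJoinThreshold")                     // 10 MB by default, -1 disables it
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 64L * 1024 * 1024)  // e.g. raise to 64 MB

// Table-level statistics behind plan.stats can be refreshed explicitly.
spark.sql("ANALYZE TABLE some_db.some_table COMPUTE STATISTICS")           // hypothetical table name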
ShuffleHashJoin: the constraint is that the relation size must be below autoBroadcastJoinThreshold multiplied by numShufflePartitions, so that each partition's hash map stays small enough to build locally.
private def canBuildLocalHashMapBySize(plan: LogicalPlan, conf: SQLConf): Boolean = {
  plan.stats.sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions
}
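Note that the no-hint path only reaches this check when sort merge join is not preferred; a one-line sketch of that switch, assuming an existing SparkSession named spark:
// Allow the no-hint path to consider shuffle hash join before sort merge join.
spark.conf.set("spark.sql.join.preferSortMergeJoin", false)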
SortMergeJoin: requires the join key types to be sortable.
CartesianProduct: a cartesian product join; its performance is very poor.
BroadcastNestedLoopJoin: the fallback strategy; the nested-loop join also performs very poorly and may OOM.
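A small sketch of these two fallback operators, assuming an existing SparkSession named spark: a join without equi-join keys cannot use the hash or sort based strategies, so JoinSelection ends up with BroadcastNestedLoopJoinExec (when one side is broadcastable) or CartesianProductExec, depending on sizes and join type:
val a = spark.range(10000).toDF("x")
val b = spark.range(10000).toDF("y")

a.join(b, a("x") < b("y")).explain()  // non-equi condition: nested loop / cartesian territory
a.crossJoin(b).explain()              // explicit cartesian product request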