HUAHUA

Daily-Note 11-02

OVERVIEW

  1. Kafka, a distributed, high-performance event streaming platform.
  2. Docker, a platform for building, shipping, and running applications.
  3. Spark Shuffle, the operation that reorganizes data across a cluster so it is grouped/ordered by key.

Kafka

Kafka introduction

Kafka documentation

Docker

docker

Docker 从入门到实践

ENTRYPOINT, compared with the CMD instruction, allows arguments given after docker run to be appended to the entrypoint command instead of replacing it.
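
A quick sketch to illustrate the difference (the image tag and commands below are illustrative, not from the note):

FROM alpine:3.18
ENTRYPOINT ["echo", "hello"]
CMD ["default"]

# docker run <image>        -> prints "hello default" (CMD supplies default arguments)
# docker run <image> world  -> prints "hello world"   (run arguments replace CMD; ENTRYPOINT stays)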

Spark Shuffle

Spark SQL Query Engine Deep Dive (14) – Partitioning & Ordering

Spark SQL Query Engine Deep Dive (15) – UnsafeExternalSorter & SortExec

Spark SQL Query Engine Deep Dive (16) – ShuffleExchangeExec & UnsafeShuffleWriter

The purpose of the shuffle is to reorganize the map-side output so that records are grouped/ordered by key, making it convenient for the reduce side to aggregate. In certain cases aggregation is also performed early on the map side (map-side combine), which reduces the amount of data transferred.
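
A minimal RDD sketch of map-side combine (assumes an existing SparkContext sc; the sample data is illustrative): reduceByKey pre-aggregates each map task's local records before the shuffle, while groupByKey ships every record across the network.

val pairs = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)))

// Map-side combine: each map task locally sums its ("a", _) records first,
// so less data is written to the shuffle files.
val combined = pairs.reduceByKey(_ + _)

// No map-side combine: every (key, 1) record is shuffled, then summed on the reduce side.
val grouped = pairs.groupByKey().mapValues(_.sum)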

Overview: after the optimized logical plan has been converted into a physical SparkPlan, the preparations phase produces the final executedPlan. During preparations the EnsureRequirements rule is applied, which inserts the SparkPlans needed for shuffling between operators that require it, mainly ShuffleExchangeExec.

1) In ShuffleExchangeExec, the corresponding ShuffleDependency is used to build a ShuffledRowRDD, which forms the reduce side.

2) After the whole plan is built, at job-submission time getMissingParentStages() splits the job into ShuffleMapStages and a ResultStage along ShuffleDependency boundaries, and the subsequent submitMissingTasks() creates the corresponding ShuffleMapTasks and ResultTasks for each stage. In a ShuffleMapTask, the ShuffleWriteProcessor's write() is called with the task's dependency to write the map output to the target partitions.

This is how the map and reduce responsibilities on the two sides of the shuffle are divided.
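
A hedged sketch of how this shows up in a physical plan (assumes a SparkSession named spark and the default spark.sql.shuffle.partitions = 200; the column name k is illustrative):

import org.apache.spark.sql.functions.col

// A groupBy requires a ClusteredDistribution on k, so EnsureRequirements inserts
// a ShuffleExchangeExec (printed as "Exchange") between the partial and final aggregation.
val df = spark.range(100).withColumn("k", col("id") % 10)
df.groupBy("k").count().explain()

// The printed physical plan contains a line similar to:
//   +- Exchange hashpartitioning(k#.., 200), ENSURE_REQUIREMENTS, ...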

ShuffleExchangeExec

ShuffleExchangeExec is the SparkPlan that performs the shuffle; the EnsureRequirements physical-plan rule inserts it between SparkPlans that require a shuffle.

protected override def doExecute(): RDD[InternalRow] = {
  // Returns the same ShuffleRowRDD if this plan is used by multiple plans.
  if (cachedShuffleRDD == null) {
    cachedShuffleRDD = new ShuffledRowRDD(shuffleDependency, readMetrics)
  }
  cachedShuffleRDD
}

doExecute() builds and caches the ShuffledRowRDD; the shuffleDependency it uses is obtained from prepareShuffleDependency().

At a high level, prepareShuffleDependency() first obtains the corresponding Partitioner (part), then uses getPartitionKeyExtractor() to build a function that extracts the partition key from each row, computes each record's partitionId from part and getPartitionKey, and finally produces MutablePairs of (partitionId, row) that drive the subsequent partitioning.

graph TD
  A(part:Partitioner)
  B(getPartitionKey: InternalRow => Any)
  C(rddWithPartitionIds:MutablePair)
  D(dependency:ShuffleDependency)
  A-->C
  B-->C
  C-->D

First, the Partitioner is chosen by matching on the newPartitioning property. newPartitioning comes from the ShuffleExchangeExec's outputPartitioning, which is determined when the ShuffleExchangeExec is created from the distribution required by the parent SparkPlan (requiredChildDistribution); that is, when the parent node's physical plan is generated, it specifies the partitioning its child must satisfy.

In Spark v3.3.1 the Distribution hierarchy includes the following (a short usage sketch follows the list):

  • UnspecifiedDistribution, no particular distribution is required.

  • AllTuples, all records must end up in a single partition.

  • BroadcastDistribution, the entire data set is broadcast to every node.

  • ClusteredDistribution, records are hashed into partitions by the clustering expressions, so records with the same clustering key land in the same partition.

    override def createPartitioning(numPartitions: Int): Partitioning = {
      assert(requiredNumPartitions.isEmpty || requiredNumPartitions.get == numPartitions,
        s"This ClusteredDistribution requires ${requiredNumPartitions.get} partitions, but " +
          s"the actual number of partitions is $numPartitions.")
      HashPartitioning(clustering, numPartitions) // uses the HashPartitioning strategy
    }
  • StatefulOpClusteredDistribution

  • OrderedDistribution, requires records to be ordered across adjacent partitions, but does not require records inside a partition to be ordered.

    override def createPartitioning(numPartitions: Int): Partitioning = {
      RangePartitioning(ordering, numPartitions) // uses the RangePartitioning strategy
    }
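
A hedged usage sketch (assumes a SparkSession named spark and the default shuffle partitions): a global sort requires an OrderedDistribution, so the inserted exchange is range partitioned.

// A global orderBy imposes an OrderedDistribution on the child, satisfied by
// RangePartitioning; compare with the hashpartitioning exchange of the groupBy above.
spark.range(100).orderBy("id").explain()

// The physical plan contains a line similar to:
//   +- Exchange rangepartitioning(id#.. ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, ...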
    

Each distribution uses a different Partitioning implementation to move records into the required partitions, i.e. to realize the shuffle. In prepareShuffleDependency(), the corresponding Partitioner is obtained by matching on newPartitioning:

val part: Partitioner = newPartitioning match {
  case RoundRobinPartitioning(numPartitions) => new HashPartitioner(numPartitions)
  case HashPartitioning(_, n) =>
    new Partitioner {
      override def numPartitions: Int = n
      // For HashPartitioning, the partitioning key is already a valid partition ID, as we use
      // `HashPartitioning.partitionIdExpression` to produce partitioning key.
      override def getPartition(key: Any): Int = key.asInstanceOf[Int]
    }
  case RangePartitioning(sortingExpressions, numPartitions) =>
    // Extract only fields used for sorting to avoid collecting large fields that does not
    // affect sorting result when deciding partition bounds in RangePartitioner
    val rddForSampling = rdd.mapPartitionsInternal { iter =>
      val projection =
        UnsafeProjection.create(sortingExpressions.map(_.child), outputAttributes)
      val mutablePair = new MutablePair[InternalRow, Null]()
      // Internally, RangePartitioner runs a job on the RDD that samples keys to compute
      // partition bounds. To get accurate samples, we need to copy the mutable keys.
      iter.map(row => mutablePair.update(projection(row).copy(), null))
    }
    // Construct ordering on extracted sort key.
    val orderingAttributes = sortingExpressions.zipWithIndex.map { case (ord, i) =>
      ord.copy(child = BoundReference(i, ord.dataType, ord.nullable))
    }
    implicit val ordering = new LazilyGeneratedOrdering(orderingAttributes)
    new RangePartitioner(
      numPartitions,
      rddForSampling,
      ascending = true,
      samplePointsPerPartitionHint = SQLConf.get.rangeExchangeSampleSizePerPartition)
  case SinglePartition =>
    new Partitioner {
      override def numPartitions: Int = 1
      override def getPartition(key: Any): Int = 0
    }
  case _ => throw new IllegalStateException(s"Exchange not implemented for $newPartitioning")
  // TODO: Handle BroadcastPartitioning.
}

Each Partitioning is backed by a Partitioner that implements its partitioning requirement.

The main partitionings handled here are:

  • RoundRobinPartitioning
  • HashPartitioning, records are routed to partitions by hashing the record key; the partition key extracted for HashPartitioning has already been turned into a valid partition id (via HashPartitioning.partitionIdExpression), so getPartition() simply returns it.
  • RangePartitioning
  • SinglePartition

Next, getPartitionKeyExtractor() returns a lambda that extracts the partition key from a record; its result is used as the argument to the Partitioner's getPartition(key).

def getPartitionKeyExtractor(): InternalRow => Any = newPartitioning match {
  case RoundRobinPartitioning(numPartitions) =>
    // Distributes elements evenly across output partitions, starting from a random partition.
    // nextInt(numPartitions) implementation has a special case when bound is a power of 2,
    // which is basically taking several highest bits from the initial seed, with only a
    // minimal scrambling. Due to deterministic seed, using the generator only once,
    // and lack of scrambling, the position values for power-of-two numPartitions always
    // end up being almost the same regardless of the index. substantially scrambling the
    // seed by hashing will help. Refer to SPARK-21782 for more details.
    val partitionId = TaskContext.get().partitionId()
    var position = new XORShiftRandom(partitionId).nextInt(numPartitions)
    (row: InternalRow) => {
      // The HashPartitioner will handle the `mod` by the number of partitions
      position += 1
      position
    }
  case h: HashPartitioning =>
    val projection = UnsafeProjection.create(h.partitionIdExpression :: Nil, outputAttributes)
    row => projection(row).getInt(0)
  case RangePartitioning(sortingExpressions, _) =>
    val projection = UnsafeProjection.create(sortingExpressions.map(_.child), outputAttributes)
    row => projection(row)
  case SinglePartition => identity
  case _ => throw new IllegalStateException(s"Exchange not implemented for $newPartitioning")
}

For HashPartitioning, an UnsafeProjection built from partitionIdExpression computes the record's partition id (hash). For RangePartitioning, the sortingExpressions are projected out to obtain the keys that need to be ordered.
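
A hedged sketch of what partitionIdExpression evaluates (assumes a SparkSession named spark): it is essentially pmod(murmur3hash(keys), numPartitions), which is why the extracted key is already a valid partition id.

import org.apache.spark.sql.functions.expr

// The SQL hash() function is a Murmur3 hash, so pmod(hash(id), 200) mirrors what
// HashPartitioning.partitionIdExpression produces for 200 partitions.
spark.range(5)
  .withColumn("partitionId", expr("pmod(hash(id), 200)"))
  .show()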

Finally, a MutablePair of <partitionId, row> is built for each record.

val rddWithPartitionIds: RDD[Product2[Int, InternalRow]] = {
 // ... 
  newRdd.mapPartitionsWithIndexInternal((_, iter) => {
    val getPartitionKey = getPartitionKeyExtractor()
    val mutablePair = new MutablePair[Int, InternalRow]()
    iter.map { row => mutablePair.update(part.getPartition(getPartitionKey(row)), row) }
  }, isOrderSensitive = isOrderSensitive)
}

The ShuffleDependency is then constructed on top of this RDD.

val dependency =
  new ShuffleDependency[Int, InternalRow, InternalRow](
    rddWithPartitionIds,
    new PartitionIdPassthrough(part.numPartitions),
    serializer,
    shuffleWriterProcessor = createShuffleWriteProcessor(writeMetrics))

SortShuffleManager

SortShuffleManager mainly implements three methods: registerShuffle / getReader / getWriter.

registerShuffle returns a ShuffleHandle that is stored in the dependency; when a task is deserialized, the handle is passed back in to obtain the corresponding writer.

override def registerShuffle[K, V, C](
    shuffleId: Int,
    dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
  if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
    // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
    // need map-side aggregation, then write numPartitions files directly and just concatenate
    // them at the end. This avoids doing serialization and deserialization twice to merge
    // together the spilled files, which would happen with the normal code path. The downside is
    // having multiple files open at a time and thus more memory allocated to buffers.
    new BypassMergeSortShuffleHandle[K, V](
      shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
    // Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:
    new SerializedShuffleHandle[K, V](
      shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  } else {
    // Otherwise, buffer map outputs in a deserialized form:
    new BaseShuffleHandle(shuffleId, dependency)
  }
}

getWriter() returns the writer matching the handle; the writer writes the iterator of records computed by the RDD out to the shuffle files.

override def getWriter[K, V](
    handle: ShuffleHandle,
    mapId: Long,
    context: TaskContext,
    metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] = {
  val mapTaskIds = taskIdMapsForShuffle.computeIfAbsent(
    handle.shuffleId, _ => new OpenHashSet[Long](16))
  mapTaskIds.synchronized { mapTaskIds.add(mapId) }
  val env = SparkEnv.get
  handle match {
    case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
      new UnsafeShuffleWriter(
        env.blockManager,
        context.taskMemoryManager(),
        unsafeShuffleHandle,
        mapId,
        context,
        env.conf,
        metrics,
        shuffleExecutorComponents)
    case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
      new BypassMergeSortShuffleWriter(
        env.blockManager,
        bypassMergeSortHandle,
        mapId,
        env.conf,
        metrics,
        shuffleExecutorComponents)
    case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
      new SortShuffleWriter(other, mapId, context, shuffleExecutorComponents)
  }
}

For a BypassMergeSortShuffleHandle, the BypassMergeSortShuffleWriter's write() uses the Partitioner built in ShuffleExchangeExec (carried by the dependency) to write each record out to its target partition file.

while (records.hasNext()) {
  final Product2<K, V> record = records.next();
  final K key = record._1(); // the partition key of the record
  // the Partitioner picks the partition; the record is written to that partition's file
  partitionWriters[partitioner.getPartition(key)].write(key, record._2());
}

getReader() obtains the locations of the requested map output partitions from the mapOutputTracker, then builds a BlockStoreShuffleReader that fetches the shuffled map output.

override def getReader[K, C](
    handle: ShuffleHandle,
    startMapIndex: Int,
    endMapIndex: Int,
    startPartition: Int,
    endPartition: Int,
    context: TaskContext,
    metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
  val baseShuffleHandle = handle.asInstanceOf[BaseShuffleHandle[K, _, C]]
  val (blocksByAddress, canEnableBatchFetch) =
    if (baseShuffleHandle.dependency.isShuffleMergeFinalizedMarked) {
      val res = SparkEnv.get.mapOutputTracker.getPushBasedShuffleMapSizesByExecutorId(
        handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
      (res.iter, res.enableBatchFetch)
    } else {
      val address = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
        handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
      (address, true)
    }
  new BlockStoreShuffleReader(
    handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, context, metrics,
    shouldBatchFetch =
      canEnableBatchFetch && canUseBatchFetch(startPartition, endPartition, context))
}

Writer & Reader CALL

In ShuffleMapTask, the call

dep.shuffleWriterProcessor.write(rdd, dep, mapId, context, partition)

writes out the map output; internally the ShuffleWriteProcessor calls

writer = manager.getWriter[Any, Any](
  dep.shuffleHandle,
  mapId,
  context,
  createMetricsReporter(context))

to obtain the corresponding writer.

In ShuffledRowRDD, compute() obtains the corresponding reader and reads the map output of the specified partitions.

val reader = split.asInstanceOf[ShuffledRowRDDPartition].spec match {
  case CoalescedPartitionSpec(startReducerIndex, endReducerIndex, _) =>
    SparkEnv.get.shuffleManager.getReader(
      dependency.shuffleHandle,
      startReducerIndex,
      endReducerIndex,
      context,
      sqlMetricsReporter)

  case PartialReducerPartitionSpec(reducerIndex, startMapIndex, endMapIndex, _) =>
    SparkEnv.get.shuffleManager.getReader(
      dependency.shuffleHandle,
      startMapIndex,
      endMapIndex,
      reducerIndex,
      reducerIndex + 1,
      context,
      sqlMetricsReporter)

  case PartialMapperPartitionSpec(mapIndex, startReducerIndex, endReducerIndex) =>
    SparkEnv.get.shuffleManager.getReader(
      dependency.shuffleHandle,
      mapIndex,
      mapIndex + 1,
      startReducerIndex,
      endReducerIndex,
      context,
      sqlMetricsReporter)

  case CoalescedMapperPartitionSpec(startMapIndex, endMapIndex, numReducers) =>
    SparkEnv.get.shuffleManager.getReader(
      dependency.shuffleHandle,
      startMapIndex,
      endMapIndex,
      0,
      numReducers,
      context,
      sqlMetricsReporter)
}
// map(_._2): keep only the row; the key is the partition id
reader.read().asInstanceOf[Iterator[Product2[Int, InternalRow]]].map(_._2)

ExternalSorter

TimSort 博客园

TimSort Wiki