Daily-Note 11-02
OVERVIEW
- Kafka: a distributed, high-performance event streaming platform.
- Docker: a platform for building, shipping, and running applications.
- Spark Shuffle: the operation that reorganizes data in a distributed environment so that records are grouped and ordered by key.
Kafka
Docker
ENTRYPOINT: unlike CMD, arguments appended to the `docker run` command are passed to the ENTRYPOINT executable as extra arguments instead of replacing it.
Spark Shuffle
Spark SQL Query Engine Deep Dive (14) – Partitioning & Ordering
Spark SQL Query Engine Deep Dive (15) – UnsafeExternalSorter & SortExec
Spark SQL Query Engine Deep Dive (16) – ShuffleExchangeExec & UnsafeShuffleWriter
The purpose of a shuffle is to reorganize the map-side output so that data is ordered/grouped by key, which makes the subsequent reduce-side aggregation possible. In certain cases, aggregation is also performed early on the map side, which reduces the amount of data transferred.
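As a minimal illustration (a sketch using the RDD API with a local SparkContext; the names here are illustrative and not taken from the posts below): reduceByKey first combines values inside each map partition (map-side aggregation) and only then shuffles the partial results by key.
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("shuffle-sketch"))
val words = sc.parallelize(Seq("a", "b", "a", "c", "b", "a"), numSlices = 2)
// Map-side combine: each map partition pre-aggregates its counts before the shuffle,
// then records with the same key are routed to the same reduce partition.
val counts = words.map(w => (w, 1)).reduceByKey(_ + _)
counts.collect().foreach(println)
sc.stop()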
Overview: after the optimized logical plan has been converted into a physical SparkPlan, the preparations phase produces the final executedPlan. During preparations the EnsureRequirements rule is applied, inserting the SparkPlans needed for a shuffle, mainly ShuffleExchangeExec, between operators that require one.
1) In ShuffleExchangeExec, the corresponding ShuffleDependency is used to build a ShuffledRowRDD, which forms the reduce side.
2) After the whole plan is built, at job-submission time getMissingParentStages() splits the DAG into ShuffleMapStages and a ResultStage along the ShuffleDependency boundaries, and submitMissingTasks() then creates the matching ShuffleMapTasks and ResultTasks for each stage. Inside a ShuffleMapTask, the ShuffleWriteProcessor's write() is called with the task's dependency to write the map output to the target partitions.
This completes the split of responsibilities between the map side and the reduce side of the shuffle; a quick example of the inserted Exchange node follows.
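A quick way to see EnsureRequirements at work (a minimal sketch assuming a local SparkSession; session and column names are illustrative): an aggregation whose child does not satisfy the required ClusteredDistribution gets an Exchange node inserted into the physical plan.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("exchange-demo").getOrCreate()
import spark.implicits._

val df = Seq(("a", 1), ("b", 2), ("a", 3)).toDF("k", "v")
// The printed physical plan contains an "Exchange hashpartitioning(k, 200)" node
// inserted between the partial and final HashAggregate operators.
df.groupBy($"k").sum("v").explain()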
ShuffleExchangeExec
ShuffleExchangeExec is the SparkPlan that performs the shuffle; the EnsureRequirements rule of the physical planning phase inserts it between SparkPlans that need a shuffle.
protected override def doExecute(): RDD[InternalRow] = {
  // Returns the same ShuffleRowRDD if this plan is used by multiple plans.
  if (cachedShuffleRDD == null) {
    cachedShuffleRDD = new ShuffledRowRDD(shuffleDependency, readMetrics)
  }
  cachedShuffleRDD
}
doExecute() builds and caches the ShuffledRowRDD; its shuffleDependency is obtained from prepareShuffleDependency().
At a high level, prepareShuffleDependency() first obtains the Partitioner (part), then uses getPartitionKeyExtractor() to get a function that extracts the partition key from each row, computes each record's partitionId from part and getPartitionKey, and finally wraps <partitionId, row> into a MutablePair for the subsequent partitioning (see the simplified sketch after the diagram below).
graph TD
A(part: Partitioner)
B(getPartitionKey: InternalRow => Any)
C(rddWithPartitionIds: MutablePair)
D(dependency: ShuffleDependency)
A-->C
B-->C
C-->D
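Read as a pipeline, the diagram amounts to the following plain-Scala sketch (it mirrors the real code quoted later in this note; rows, getPartitionKey and getPartition are illustrative stand-ins):
// Stand-ins for the child RDD's rows, the key extractor and Partitioner.getPartition.
val rows: Seq[String] = Seq("a", "b", "a", "c")
val numPartitions = 4
val getPartitionKey: String => Any = identity
val getPartition: Any => Int = key => math.abs(key.hashCode % numPartitions)

// rddWithPartitionIds: every row becomes a (partitionId, row) pair; the ShuffleDependency
// then distributes the pairs by their Int key.
val rowsWithPartitionIds: Seq[(Int, String)] =
  rows.map(row => (getPartition(getPartitionKey(row)), row))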
First, the Partitioner is matched against the newPartitioning property. newPartitioning comes from the ShuffleExchangeExec's outputPartitioning, which in turn is determined, when the ShuffleExchangeExec is created, by the required distribution of the parent SparkPlan: when the parent node's physical plan is generated, it specifies the partitioning its children must satisfy. (An example of how this surfaces in the plan follows the list below.)
In Spark v3.3.1 the Distribution types include:
- UnspecifiedDistribution: no specific distribution of the data is required.
- AllTuples: requires all records to be placed in a single partition.
- BroadcastDistribution: the data is broadcast to every node.
- ClusteredDistribution: records are hashed into the corresponding partitions according to the clustering expressions.
override def createPartitioning(numPartitions: Int): Partitioning = {
  assert(requiredNumPartitions.isEmpty || requiredNumPartitions.get == numPartitions,
    s"This ClusteredDistribution requires ${requiredNumPartitions.get} partitions, but " +
      s"the actual number of partitions is $numPartitions.")
  HashPartitioning(clustering, numPartitions) // uses the HashPartitioning strategy
}
- StatefulOpClusteredDistribution: the stricter variant required by stateful streaming operators; it is satisfied by HashPartitioning on exactly the given expressions.
- OrderedDistribution: requires ordering across adjacent partitions, but not within each partition.
override def createPartitioning(numPartitions: Int): Partitioning = {
  RangePartitioning(ordering, numPartitions) // uses the RangePartitioning strategy
}
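As a quick illustration of how a required distribution surfaces in the plan (a sketch reusing the hypothetical local spark session and toy DataFrame from the earlier example): an aggregation that requires ClusteredDistribution is planned with hashpartitioning, while a global sort, which requires OrderedDistribution, is planned with rangepartitioning.
import spark.implicits._
val df = Seq(("a", 1), ("b", 2), ("a", 3)).toDF("k", "v")

// ClusteredDistribution -> HashPartitioning: "Exchange hashpartitioning(k, 200)" in the plan
df.groupBy($"k").count().explain()

// OrderedDistribution -> RangePartitioning: "Exchange rangepartitioning(v ASC NULLS FIRST, 200)" in the plan
df.orderBy($"v").explain()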
Each partitioning moves records to the target partition through a different Partitioner implementation, which is what actually performs the shuffle. In prepareShuffleDependency(), the Partitioner is selected by matching on newPartitioning.
val part: Partitioner = newPartitioning match {
  case RoundRobinPartitioning(numPartitions) => new HashPartitioner(numPartitions)
  case HashPartitioning(_, n) =>
    new Partitioner {
      override def numPartitions: Int = n
      // For HashPartitioning, the partitioning key is already a valid partition ID, as we use
      // `HashPartitioning.partitionIdExpression` to produce partitioning key.
      override def getPartition(key: Any): Int = key.asInstanceOf[Int]
    }
  case RangePartitioning(sortingExpressions, numPartitions) =>
    // Extract only fields used for sorting to avoid collecting large fields that does not
    // affect sorting result when deciding partition bounds in RangePartitioner
    val rddForSampling = rdd.mapPartitionsInternal { iter =>
      val projection =
        UnsafeProjection.create(sortingExpressions.map(_.child), outputAttributes)
      val mutablePair = new MutablePair[InternalRow, Null]()
      // Internally, RangePartitioner runs a job on the RDD that samples keys to compute
      // partition bounds. To get accurate samples, we need to copy the mutable keys.
      iter.map(row => mutablePair.update(projection(row).copy(), null))
    }
    // Construct ordering on extracted sort key.
    val orderingAttributes = sortingExpressions.zipWithIndex.map { case (ord, i) =>
      ord.copy(child = BoundReference(i, ord.dataType, ord.nullable))
    }
    implicit val ordering = new LazilyGeneratedOrdering(orderingAttributes)
    new RangePartitioner(
      numPartitions,
      rddForSampling,
      ascending = true,
      samplePointsPerPartitionHint = SQLConf.get.rangeExchangeSampleSizePerPartition)
  case SinglePartition =>
    new Partitioner {
      override def numPartitions: Int = 1
      override def getPartition(key: Any): Int = 0
    }
  case _ => throw new IllegalStateException(s"Exchange not implemented for $newPartitioning")
  // TODO: Handle BroadcastPartitioning.
}
Each partitioning fulfils its requirement with a different Partitioner.
The Partitioners mainly include:
- RoundRobinPartitioning: backed by a plain HashPartitioner; records are spread evenly across partitions via an incrementing position that starts from a random partition.
- HashPartitioning: the extracted record key is already a valid partition id (produced by HashPartitioning.partitionIdExpression), so getPartition() simply returns it (see the sketch after this list).
- RangePartitioning: backed by a RangePartitioner that samples the sort keys of the RDD to compute partition bounds, so that output partitions are ordered by those keys.
- SinglePartition: every record goes to partition 0.
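For the HashPartitioning case above, the partition id is already computed on the map side by HashPartitioning.partitionIdExpression, essentially a non-negative modulo of the Murmur3 hash of the partitioning expressions. A rough DataFrame-level equivalent (assuming the toy df with column k from the earlier sketches; the column name partition_id is illustrative):
import org.apache.spark.sql.functions.{hash, lit, pmod}

// Roughly what HashPartitioning.partitionIdExpression evaluates per row.
val numShufflePartitions = 200
df.withColumn("partition_id", pmod(hash($"k"), lit(numShufflePartitions))).show()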
Next, getPartitionKeyExtractor() returns a lambda that extracts the partition key from each record; its output is the argument passed to the Partitioner's getPartition(key).
def getPartitionKeyExtractor(): InternalRow => Any = newPartitioning match {
  case RoundRobinPartitioning(numPartitions) =>
    // Distributes elements evenly across output partitions, starting from a random partition.
    // nextInt(numPartitions) implementation has a special case when bound is a power of 2,
    // which is basically taking several highest bits from the initial seed, with only a
    // minimal scrambling. Due to deterministic seed, using the generator only once,
    // and lack of scrambling, the position values for power-of-two numPartitions always
    // end up being almost the same regardless of the index. substantially scrambling the
    // seed by hashing will help. Refer to SPARK-21782 for more details.
    val partitionId = TaskContext.get().partitionId()
    var position = new XORShiftRandom(partitionId).nextInt(numPartitions)
    (row: InternalRow) => {
      // The HashPartitioner will handle the `mod` by the number of partitions
      position += 1
      position
    }
  case h: HashPartitioning =>
    val projection = UnsafeProjection.create(h.partitionIdExpression :: Nil, outputAttributes)
    row => projection(row).getInt(0)
  case RangePartitioning(sortingExpressions, _) =>
    val projection = UnsafeProjection.create(sortingExpressions.map(_.child), outputAttributes)
    row => projection(row)
  case SinglePartition => identity
  case _ => throw new IllegalStateException(s"Exchange not implemented for $newPartitioning")
}
For HashPartitioning, an UnsafeProjection built from partitionIdExpression evaluates the hash-based partition id of each record; for RangePartitioning, the projection of the sortingExpressions returns the sort key to be ordered.
Finally, a MutablePair of <partitionId, row> is built for each record.
val rddWithPartitionIds: RDD[Product2[Int, InternalRow]] = {
  // ...
  newRdd.mapPartitionsWithIndexInternal((_, iter) => {
    val getPartitionKey = getPartitionKeyExtractor()
    val mutablePair = new MutablePair[Int, InternalRow]()
    iter.map { row => mutablePair.update(part.getPartition(getPartitionKey(row)), row) }
  }, isOrderSensitive = isOrderSensitive)
}
A ShuffleDependency is then built on top of this RDD (note its PartitionIdPassthrough partitioner, sketched after the snippet).
val dependency =
  new ShuffleDependency[Int, InternalRow, InternalRow](
    rddWithPartitionIds,
    new PartitionIdPassthrough(part.numPartitions),
    serializer,
    shuffleWriterProcessor = createShuffleWriteProcessor(writeMetrics))
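Note that the dependency's partitioner here is a PartitionIdPassthrough rather than the part built above: the partition id has already been computed and stored as the key of each pair, so the dependency-level partitioner only needs to pass it through. Its behavior is roughly the following sketch:
import org.apache.spark.Partitioner

// Condensed sketch of PartitionIdPassthrough's behavior: the key of each record is
// already the target partition id, so getPartition simply returns it unchanged.
class PartitionIdPassthroughSketch(override val numPartitions: Int) extends Partitioner {
  override def getPartition(key: Any): Int = key.asInstanceOf[Int]
}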
SortShuffleManager
SortShuffleManager mainly implements three methods: registerShuffle, getReader, and getWriter.
registerShuffle returns a ShuffleHandle that is stored in the dependency; when a task is deserialized, the handle is passed back in to obtain the matching writer. The conditions behind the three handle types are summarized in the sketch after the snippet.
override def registerShuffle[K, V, C](
    shuffleId: Int,
    dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
  if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
    // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
    // need map-side aggregation, then write numPartitions files directly and just concatenate
    // them at the end. This avoids doing serialization and deserialization twice to merge
    // together the spilled files, which would happen with the normal code path. The downside is
    // having multiple files open at a time and thus more memory allocated to buffers.
    new BypassMergeSortShuffleHandle[K, V](
      shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
    // Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:
    new SerializedShuffleHandle[K, V](
      shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  } else {
    // Otherwise, buffer map outputs in a deserialized form:
    new BaseShuffleHandle(shuffleId, dependency)
  }
}
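A condensed sketch (not the actual Spark source) of the conditions behind the three handles, based on SortShuffleWriter.shouldBypassMergeSort and SortShuffleManager.canUseSerializedShuffle; the parameter names are illustrative stand-ins for values read from the ShuffleDependency and SparkConf:
def chooseHandle(
    numPartitions: Int,
    hasMapSideCombine: Boolean,
    serializerSupportsRelocation: Boolean,
    bypassMergeThreshold: Int = 200 /* spark.shuffle.sort.bypassMergeThreshold */): String = {
  if (!hasMapSideCombine && numPartitions <= bypassMergeThreshold) {
    "BypassMergeSortShuffleHandle" // one temp file per partition, concatenated at the end
  } else if (serializerSupportsRelocation && !hasMapSideCombine && numPartitions <= (1 << 24)) {
    "SerializedShuffleHandle"      // Tungsten serialized path -> UnsafeShuffleWriter
  } else {
    "BaseShuffleHandle"            // generic sort-based path -> SortShuffleWriter
  }
}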
getWriter() returns the writer matching the handle; the writer is then used to write the RDD's computed record Iterator out to the shuffle files.
override def getWriter[K, V](
    handle: ShuffleHandle,
    mapId: Long,
    context: TaskContext,
    metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] = {
  val mapTaskIds = taskIdMapsForShuffle.computeIfAbsent(
    handle.shuffleId, _ => new OpenHashSet[Long](16))
  mapTaskIds.synchronized { mapTaskIds.add(mapId) }
  val env = SparkEnv.get
  handle match {
    case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
      new UnsafeShuffleWriter(
        env.blockManager,
        context.taskMemoryManager(),
        unsafeShuffleHandle,
        mapId,
        context,
        env.conf,
        metrics,
        shuffleExecutorComponents)
    case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
      new BypassMergeSortShuffleWriter(
        env.blockManager,
        bypassMergeSortHandle,
        mapId,
        env.conf,
        metrics,
        shuffleExecutorComponents)
    case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
      new SortShuffleWriter(other, mapId, context, shuffleExecutorComponents)
  }
}
In BypassMergeSortShuffleWriter, write() routes each record to its target partition file using the dependency's Partitioner, which was set up in ShuffleExchangeExec.
while (records.hasNext()) {
  final Product2<K, V> record = records.next();
  final K key = record._1(); // key
  // partitioner
  partitionWriters[partitioner.getPartition(key)].write(key, record._2());
}
getReader() first looks up the locations of the map output for the requested partitions via mapOutputTracker, then builds a BlockStoreShuffleReader to fetch the shuffled map output.
override def getReader[K, C](
    handle: ShuffleHandle,
    startMapIndex: Int,
    endMapIndex: Int,
    startPartition: Int,
    endPartition: Int,
    context: TaskContext,
    metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
  val baseShuffleHandle = handle.asInstanceOf[BaseShuffleHandle[K, _, C]]
  val (blocksByAddress, canEnableBatchFetch) =
    if (baseShuffleHandle.dependency.isShuffleMergeFinalizedMarked) {
      val res = SparkEnv.get.mapOutputTracker.getPushBasedShuffleMapSizesByExecutorId(
        handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
      (res.iter, res.enableBatchFetch)
    } else {
      val address = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
        handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
      (address, true)
    }
  new BlockStoreShuffleReader(
    handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, context, metrics,
    shouldBatchFetch =
      canEnableBatchFetch && canUseBatchFetch(startPartition, endPartition, context))
}
Writer & Reader CALL
In ShuffleMapTask, the map output is written out by calling
dep.shuffleWriterProcessor.write(rdd, dep, mapId, context, partition)
and inside it, the ShuffleWriteProcessor calls
writer = manager.getWriter[Any, Any](
  dep.shuffleHandle,
  mapId,
  context,
  createMetricsReporter(context))
to obtain the corresponding writer.
In ShuffledRowRDD, compute() obtains the corresponding reader and reads the map output of the specified partitions.
val reader = split.asInstanceOf[ShuffledRowRDDPartition].spec match {
  case CoalescedPartitionSpec(startReducerIndex, endReducerIndex, _) =>
    SparkEnv.get.shuffleManager.getReader(
      dependency.shuffleHandle,
      startReducerIndex,
      endReducerIndex,
      context,
      sqlMetricsReporter)
  case PartialReducerPartitionSpec(reducerIndex, startMapIndex, endMapIndex, _) =>
    SparkEnv.get.shuffleManager.getReader(
      dependency.shuffleHandle,
      startMapIndex,
      endMapIndex,
      reducerIndex,
      reducerIndex + 1,
      context,
      sqlMetricsReporter)
  case PartialMapperPartitionSpec(mapIndex, startReducerIndex, endReducerIndex) =>
    SparkEnv.get.shuffleManager.getReader(
      dependency.shuffleHandle,
      mapIndex,
      mapIndex + 1,
      startReducerIndex,
      endReducerIndex,
      context,
      sqlMetricsReporter)
  case CoalescedMapperPartitionSpec(startMapIndex, endMapIndex, numReducers) =>
    SparkEnv.get.shuffleManager.getReader(
      dependency.shuffleHandle,
      startMapIndex,
      endMapIndex,
      0,
      numReducers,
      context,
      sqlMetricsReporter)
}
// map(_._2) drops the partition-id key and keeps only the row values
reader.read().asInstanceOf[Iterator[Product2[Int, InternalRow]]].map(_._2)