终极Marketing-for-Engineers工具资源大全:100+免费营销工具的详细评测与使用指南
2026/5/15 4:24:07
Spark Shuffle 是连接不同 Stage 的关键环节,也是 Spark 作业中最容易产生性能瓶颈的地方之一。它涉及大量磁盘 I/O、网络传输和内存使用。优化 Shuffle 对提升作业性能和稳定性至关重要。以下是一些关键的 Spark Shuffle 优化策略:
核心目标:减少 Shuffle 数据量、降低 I/O 开销、提升网络传输效率、优化内存使用、处理数据倾斜。
主要优化策略:
combineByKey,reduceByKey,aggregateByKey)。这能显著减少需要传输的键值对数量。优先使用reduceByKey/aggregateByKey而不是groupByKey。reduceByKey优于groupByKey+reduce;treeReduce/treeAggregate在聚合深度大时优于reduce/aggregate(减少网络轮次)。filter过滤掉不需要参与 Shuffle 的数据。distinct:distinct会触发 Shuffle。考虑是否可以用其他方式(如 Map 端去重)或在更小的数据集上使用。broadcast将其分发到所有 Executor,避免大表 Shuffle。spark.sql.autoBroadcastJoinThreshold控制自动广播的阈值。spark.shuffle.file.buffer:增加 Shuffle 写过程中每个分区文件的内存缓冲区大小(默认 32K)。增大此值(如 64K, 128K)可以减少磁盘 I/O 次数,但会增加内存压力。需权衡。spark.shuffle.spill.diskWriteBufferSize:增大溢出到磁盘时使用的缓冲区大小(默认 1024K),同样减少写磁盘次数。spark.shuffle.unsafe.file.output.buffer:对于使用 Tungsten 的 Shuffle,设置这个直接内存缓冲区大小(默认 32K),作用类似spark.shuffle.file.buffer。spark.shuffle.spill:确保spark.shuffle.memoryFraction或spark.memory.fraction设置合理,为 Shuffle 分配足够内存,减少溢出次数。监控 GC 和溢出情况。sort):Spark 1.2+ 的默认方式。对每个分区排序并合并小文件。通常最稳定高效。tungsten-sort):基于 Project Tungsten,使用堆外内存和更高效的编码。在 Spark 1.4+ 可用,有时性能更好(尤其处理原始类型时)。通常当spark.shuffle.manager=sort且满足条件(序列化器支持重定位、非聚合 Shuffle 等)时会自动使用。spark.shuffle.consolidateFiles):(在较新 Spark 版本中已被优化或默认行为替代)在老版本中启用此选项可以让多个 Reduce Task 共享同一个 Mapper 输出的合并文件,减少小文件数量。新版本 Sort Shuffle 本身已优化文件数量。spark.reducer.maxSizeInFlight:控制每次 Reduce Task 从远程 Executor 拉取数据的最大大小(默认 48M)。增大此值(如 96M)可以提高吞吐量,但会增加内存使用。需监控网络和内存。spark.shuffle.io.maxRetries和spark.shuffle.io.retryWait:网络不稳定时,增加重试次数和等待时间以避免 Fetch Failed 错误。但过度重试会拖慢作业。spark.shuffle.io.numConnectionsPerPeer:如果集群节点很多且网络是瓶颈,适当增加此值(默认 1)可以提升并发连接数。spark.shuffle.compress:(默认开启)压缩 Shuffle 数据(写和读)。使用高效的压缩算法:spark.io.compression.codec:推荐lz4(速度快)或zstd(压缩率高,速度也不错)。snappy是默认值,也是不错的选择。避免使用低效的lzf。spark.shuffle.service.enabled:启用 External Shuffle Service。这允许 Executor 在退出后(如动态资源分配下)Shuffle 文件仍能被访问,提高稳定性。通常在生产环境推荐启用。spark.sql.shuffle.partitions(SQL/DataFrame/Dataset):控制 Shuffle 后(如 Join, Aggregation)的分区数(默认 200)。这是最重要的优化点之一。repartition/coalesce:在 RDD 操作中显式控制分区数。spark.sql.shuffle.partitions,让倾斜 Key 分散到更多分区(对于单个 Key 数据量特别大的情况效果有限)。skew join优化 (Spark 3.0+ AQE):自适应查询执行 (AQE) 能自动检测倾斜 Join 并将倾斜的分区分裂成更小的子分区进行处理。强烈推荐启用 AQE (spark.sql.adaptive.enabled=true)。reduceByKey比groupByKey更能容忍一定程度的倾斜(因为 Map 端合并了)。对于 Join 倾斜,考虑广播小表或使用SortMergeJoin/ShuffleHashJoin的替代方案。spark.sql.adaptive.enabled=true):Spark 3.0+ 的核心优化特性。SortMergeJoin切换为BroadcastJoin(如果发现小表符合广播条件)。spark.sql.adaptive.coalescePartitions.enabled,spark.sql.adaptive.coalescePartitions.minPartitionNum,spark.sql.adaptive.advisoryPartitionSizeInBytes,spark.sql.adaptive.skewJoin.enabled,spark.sql.adaptive.skewJoin.skewedPartitionFactor,spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes等。强烈建议在生产环境中启用并适当配置 AQE。spark.local.dir) 配置到 SSD 磁盘上能极大提升 Shuffle 写/读的 I/O 性能。这是非常有效的优化。spark.executor.memory),特别是当spark.memory.fraction分配给 Storage 和 Execution(包含 Shuffle)时。减少溢出到磁盘的次数。优化流程建议:
Spill (Memory),Spill (Disk))。Shuffle Write Time/Shuffle Read Time。| 类别 | 优化建议 |
|---|---|
| 算子选择 | 使用reduceByKey代替groupByKey,选择合适的 Join |
| 分区策略 | 控制合理的并发度、分区数,避免极端数据倾斜 |
| 参数调优 | 内存、缓冲区、网络传输参数细致设置 |
| 数据倾斜 | 通过打散、随机 key、局部聚合等方式规避热点 key |
| AQE | 开启 Spark SQL 的自适应执行,自动处理 Join/倾斜问题 |
| 文件合并 | 启用 consolidateFiles 降低磁盘负担 |