1. 背景介绍
数据倾斜是在大数据计算中,经常会面临一个非常棘手的问题。数据倾斜会导致 Spark 作业性能大幅下降,这远远低于我们的期望。为了确保 Spark 作业的高性能,我们需要进行数据倾斜调优。数据倾斜调优是一项复杂的任务,需要采用多种技术方案来解决不同类型的数据倾斜问题。
2. 数据倾斜的影响
Spark 任务数据倾斜是指在分布式计算过程中,数据分布不均匀,导致个别任务处理的数据量远大于其他任务。这种现象会对Spark任务的性能和稳定性产生负面影响。以下是数据倾斜所带来的影响:
- 性能下降:由于个别任务处理的数据量较大,需要更长的时间来完成。在Spark集群中,所有任务都必须完成后,整个作业才能结束。因此,处理时间较长的任务会拖慢整个作业的运行速度。
- 资源浪费:由于大部分任务较快地完成,集群中的其他节点可能会处于空闲状态,等待数据倾斜的任务完成。这种情况下,集群资源的利用率较低,导致资源浪费。
- 容易出现OOM(内存溢出)异常:数据倾斜可能导致单个任务的内存需求远超过预期。在处理大量数据时,可能触发OOM异常,导致任务失败。
- 可靠性降低:数据倾斜可能导致任务执行失败或长时间无法完成,影响整个作业的可靠性。
3. 数据倾斜的原理
数据倾斜的原理很简单:在执行shuffle操作时,需要将各节点上相同key的数据拉取到某个节点上的一个task进行处理,如按key进行聚合或join等操作。如果某个key对应的数据量特别大,就容易发生数据倾斜现象。
例如,绝大多数key可能仅对应1000条数据,但个别key却对应了100万条数据。这样的情况下,大部分task将只会处理1000条数据,并在1秒内完成。但个别task可能要处理100万条数据,需要运行一两个小时。这会导致整个Spark作业的运行时间被这些少数task拉长。因此,当出现数据倾斜时,Spark作业看起来运行得非常缓慢。甚至,由于某个task处理的数据量过大,可能会导致内存溢出。
从下面的图示中,我们可以观察到以下情况:
“hello”这个key在三个节点上对应了总共7条数据,这些数据将被拉取到同一个task中进行处理。
与此同时,“world”和“you”这两个key分别仅对应1条数据。因此,另外两个task只需分别处理1条数据。
由于第一个task需要处理的数据量更大,其运行时间将比另外两个task更长。整个stage的运行速度将由运行最慢的那个task决定。
4. 定位数据倾斜
导致数据倾斜的算子
数据倾斜仅在 shuffle 过程中发生。以下是一些常用的可能触发shuffle操作的算子:distinct
、groupByKey
、reduceByKey
、aggregateByKey
、join
、cogroup
和repartition
。当出现数据倾斜时,可能是由于代码中使用了这些算子之一所导致的。
出现数据倾斜的 Stage
为了确定数据倾斜发生在哪个stage,可以通过检查Spark作业的运行日志或者Spark Web UI来定位。如果使用yarn-client模式提交作业,可以在本地日志中查看当前运行到了哪个stage。例如,可以参考这篇文章:Hive 数据倾斜问题定位排查及解决。
如果使用yarn-cluster模式提交作业,可以通过Spark Web UI查看当前运行到了哪个stage。
此外,无论使用哪种提交模式,都可以在Spark Web UI上查看当前stage的各个task分配的数据量。通过这种方式,可以进一步确定是否是数据分配不均匀导致的数据倾斜。
如下图所示,我们可以从倒数第三列观察到每个task的运行时间。某些task执行非常快,仅需几秒钟即可完成;另一些task执行速度较慢,需要几分钟才能完成。仅从运行时间上看,我们已经可以确定出现了数据倾斜。此外,倒数第一列展示了每个task处理的数据量。可以明显看到,运行时间短的task仅需处理几百KB的数据,而运行时间长的task需要处理几千KB的数据,数据处理量相差10倍。这进一步确认了数据倾斜的发生。
在确定数据倾斜发生在哪个stage之后,我们需要根据 stage 划分原理推算出倾斜的那个 stage 对应代码中的哪一部分。这部分代码中肯定会包含一个shuffle类算子。
要精确推算 stage 与代码之间的对应关系,需要对 Spark 源码有深入理解。但在此,我们可以介绍一个相对简单且实用的推算方法:只需观察 Spark 代码中出现的 shuffle 类算子或 Spark SQL 中出现的可能导致 shuffle 的语句(如group by语句),就可以判断,以这个地方为界限,划分出了前后两个stage。
以 Spark 最基础的入门程序 WordCount 为例,说明如何用最简单的方法大致推算出一个stage对应的代码。在下面的示例中,整个代码里只有一个reduceByKey
算子会发生shuffle。因此,我们可以认为,以这个算子为界限,将划分出前后两个stage。
// 导入Spark配置类
import org.apache.spark.SparkConf
// 导入Spark上下文类
import org.apache.spark.SparkContext
// 创建Spark配置对象
val conf = new SparkConf()
// 根据配置创建Spark上下文对象
val sc = new SparkContext(conf)
// 从HDFS上读取文本文件内容,并创建一个名为lines的RDD
val lines = sc.textFile("hdfs://...")
// 将lines中的每一行用空格拆分成单词,得到一个名为words的RDD
val words = lines.flatMap(_.split(" "))
// 为每个单词创建一个键值对,其中键是单词,值是1,得到一个名为pairs的RDD
val pairs = words.map((_, 1))
// ======= stage 划分 ========
// 根据键(单词)对值(计数)进行汇总,将同一个单词的计数相加,得到一个名为wordCounts的RDD
val wordCounts = pairs.reduceByKey(_ + _)
// 收集wordCounts中的所有结果,并在控制台上逐行打印
wordCounts.collect().foreach(println(_))
- stage 0 主要负责执行从
textFile
到map
操作,以及执行shuffle write操作。我们可以简单地将shuffle write操作理解为对pairs
RDD中的数据进行分区操作,这样每个task处理的数据中,相同的key会写入同一个磁盘文件内。 - stage 1 主要负责执行从
reduceByKey
到collect
操作。在stage 1的各个task开始运行时,它们会首先执行shuffle read操作。执行shuffle read操作的task会从stage 0的各个task所在节点拉取属于自己需要处理的那些key,然后对同一个key进行全局性的聚合或join等操作。在这个例子中,这意味着对key的value值进行累加。在stage 1执行完reduceByKey
算子之后,最终的wordCounts
RDD就被计算出来了。接着,会执行collect
算子,将所有数据拉取到Driver上,供我们遍历和打印输出。
通过分析单词计数程序,我们希望能帮助大家了解最基本的stage划分原理,以及在两个stage边界处如何执行shuffle操作。这样,我们就能快速定位发生数据倾斜的stage对应代码的某个部分。
例如,在Spark Web UI或本地日志中,我们发现stage 1的某些task执行特别慢,判断stage 1出现了数据倾斜。此时,我们可以回到代码中,确定stage 1主要包括了reduceByKey
这个shuffle类算子。基本上,我们可以确定是由reduceByKey
算子导致的数据倾斜问题。
5. 出现内存溢出问题
解决内存溢出问题时,定位问题代码相对容易。我们建议直接查看yarn-client模式下本地日志的异常栈,或者通过YARN查看yarn-cluster模式下的日志中的异常栈。通常情况下,通过异常栈信息可以定位到代码中导致内存溢出的具体行数。接下来,在该行代码附近查找,通常会发现shuffle类算子。此时,很可能是这个算子导致了数据倾斜。
请注意,不能仅凭偶然发生的内存溢出就判断是否发生了数据倾斜。因为编写的代码中可能存在bug,或者数据异常偶然导致了内存溢出。因此,还是需要按照前面介绍的方法,通过Spark Web UI查看报错的那个stage的各个task的运行时间以及分配的数据量,才能确定是否确实是数据倾斜导致了内存溢出。
6. 查看导致倾斜key的分布
在确定数据倾斜发生的位置后,通常需要分析导致数据倾斜的那个执行了shuffle操作的RDD或Hive表,并检查其中key的分布情况。这主要是为了为后续选择解决方案提供依据。根据不同的key分布以及不同的shuffle算子组合,可能需要选择不同的技术方案来解决问题。此时,根据你执行的操作也不同,可以有多种方式来查看key分布:
- 如果数据倾斜是由Spark SQL中的group by或join语句导致的,可以查询SQL中涉及的表的key分布情况。
- 如果数据倾斜是由对Spark RDD执行的shuffle算子引起的,可以在Spark作业中添加代码来查看key分布,例如使用RDD.countByKey()。接着将统计出的各个key出现的次数通过collect或take操作获取到客户端并打印,这样便能查看key的分布情况。
例如,在单词计数程序中,如果确定是 stage 1 的 reduceByKey 算子导致了数据倾斜,那么应该检查执行 reduceByKey 操作的 RDD 中的 key 分布情况。在此示例中,我们关注的是 pairs RDD。如下所示,我们可以先对 pairs 进行10%的样本抽取,然后使用 countByKey 算子统计每个 key 出现的次数,最后在客户端遍历并打印样本数据中各个 key 的出现次数。
// 对 pairs RDD 中的数据进行抽样,采样率为 10%(0.1),不放回抽样(false)
val sampledPairs = pairs.sample(false, 0.1)
// 对抽样后的数据进行按 key 的计数统计
val sampledWordCounts = sampledPairs.countByKey()
// 遍历并打印样本数据中各个 key 的出现次数
sampledWordCounts.foreach(println(_))
7. 数据倾斜的解决方案
7.1 提高shuffle操作的并行度
- 适用场景:
当必须直接处理数据倾斜时,优先考虑这个方案,因为它是最简单的处理数据倾斜的方法。
- 实现思路
在对 RDD 执行 shuffle 算子时,给算子传入一个参数,如 reduceByKey(1000),该参数设置了 shuffle read task 的数量。对于 Spark SQL 中的 shuffle 类语句,如 group by、join 等,需要设置一个参数:spark.sql.shuffle.partitions,该参数代表了 shuffle read task 的并行度,其默认值为 200,对许多场景来说可能过小。
- 实现原理
增加 shuffle read task 的数量可让原本分配给一个 task 的多个 key 分配给多个 task,从而使每个 task 处理更少的数据。例如,如果原本有 5 个 key,每个 key 对应 10 条数据,这 5 个 key 都分配给一个 task,那么该 task 需要处理 50 条数据。增加了 shuffle read task 后,每个 task 只分配到一个 key,即每个 task 处理 10 条数据,自然每个 task 的执行时间会变短。
- 优点
实现相对简单,可有效缓解和减轻数据倾斜的影响。
- 缺点
仅缓解了数据倾斜,没有彻底消除问题,效果有限。
- 实践经验
该方案通常无法完全解决数据倾斜,因为在极端情况下,如某个 key 对应的数据量有 100 万,无论 task 数量增加到多少,这个 key 仍会分配到一个 task 中处理,导致数据倾斜。因此,这种方案只是在发现数据倾斜时尝试使用的第一种方法,试图用简单的方法缓解数据倾斜,或与其他方案结合使用。
7.2 使用 ETL 预处理数据
- 适用场景
这个方案适用于导致数据倾斜的原因是不均匀分布的 Hive 表。如果这个 Hive 表中的数据分布很不均匀(例如某个 key 有 100 万条数据,而其他 key 只有 10 条数据),并且业务场景需要频繁地使用 Spark 对这个 Hive 表进行分析操作,那么这个技术方案是比较合适的。
- 实现思路
可以考虑通过 Hive 进行数据预处理(例如,通过 Hive ETL 对数据按 key 进行聚合,或预先与其他表进行 join 操作),然后让 Spark 作业针对预处理后的 Hive 表进行操作。由于数据已经进行了预处理,Spark 作业中就不需要再使用原先的 shuffle 类算子进行类似操作了。
- 实现原理
这种方案从根本上解决了数据倾斜问题,因为它完全避免了在 Spark 中执行 shuffle 类算子,因此肯定不会出现数据倾斜。但是,这种方法只治标不治本,因为数据本身存在分布不均匀的问题,Hive ETL 中进行 group by 或 join 等 shuffle 操作时,仍会出现数据倾斜,导致 Hive ETL 速度较慢。我们只是将数据倾斜的问题提前到 Hive ETL 中,避免了 Spark 程序中的数据倾斜。
- 优点
实现简单便捷,效果非常好,完全避免了数据倾斜,大幅度提升了 Spark 作业的性能。
- 缺点
治标不治本,Hive ETL 中仍会出现数据倾斜。
- 实践经验
在 Java 系统与 Spark 结合使用的项目中,如果 Java 代码频繁调用 Spark 作业,且对 Spark 作业的执行性能要求较高,这种方案比较合适。将数据倾斜问题提前到上游的 Hive ETL,每天执行一次,只有那一次比较慢,之后每次 Java 调用 Spark 作业时,执行速度都会很快,提供更好的用户体验。
- 项目实践经验
美团·点评的交互式用户行为分析系统采用了这种方案。该系统主要允许用户通过 Java Web 系统提交数据分析统计任务,后端通过 Java 提交 Spark 作业进行数据分析统计。要求 Spark 作业速度快,尽量在 10 分钟以内,否则速度太慢,用户体验会受到影响。因此,项目将部分 Spark 作业的 shuffle 操作提前到 Hive ETL 中,让 Spark 直接使用预处理过的 Hive 中
7.3 过滤导致倾斜的key
- 适用场景
这个方案适合当导致倾斜的 key 只有少数几个,且对计算结果的影响不大的情况。例如,99% 的 key 对应 10 条数据,但只有一个 key 对应了 100 万数据,从而导致数据倾斜。
- 实现思路
如果判断那些少数数据量特别多的 key 对作业的执行和计算结果不是特别重要,可以直接过滤掉这些 key。在 Spark SQL 中,可以使用 where 子句过滤掉这些 key;在 Spark Core 中,可以对 RDD 执行 filter 算子过滤掉这些 key。如果需要动态判断并过滤数据量最多的 key,可以使用 sample 算子对 RDD 进行采样,计算出每个 key 的数量,然后过滤掉数据量最多的 key。
- 实现原理
过滤掉导致数据倾斜的 key 后,这些 key 就不会参与计算,自然就不会产生数据倾斜。
- 优点
实现简单,效果很好,可以完全规避数据倾斜。
- 缺点
适用场景较少,大多数情况下,导致倾斜的 key 数量较多,并不仅限于少数几个。
- 实践经验
在项目中曾使用这种方案解决数据倾斜。有一次,某一天的 Spark 作业突然出现 OOM,经调查发现,是由于 Hive 表中某个 key 的数据异常导致数据量暴增。因此采取每次执行前先进行采样,计算出样本中数据量最大的几个 key,然后在程序中将这些 key 过滤掉。
7.4 两阶段聚合
- 适用场景:
两阶段聚合是一种适用于在对RDD执行reduceByKey等聚合类shuffle算子或在Spark SQL中使用group by语句进行分组聚合的方案。它通过结合局部聚合和全局聚合的方式,提高了数据处理效率。
- 实现思路
该方案的核心是进行两阶段聚合。首先是局部聚合,为每个 key 添加一个随机数,如 10 以内的随机数。这样,原先相同的 key 变得不同,例如 (hello, 1) 变为 (1_hello, 1) 和 (2_hello, 1)。接着对添加了随机数的数据执行 reduceByKey 等聚合操作进行局部聚合。局部聚合结果变为 (1_hello, 2) 和 (2_hello, 2)。然后去除各个 key 的前缀,再次进行全局聚合操作,得到最终结果,如 (hello, 4)。
- 实现原理
通过给原本相同的 key 添加随机前缀,将原本由一个 task 处理的数据分散到多个 task 上进行局部聚合,解决单个 task 处理数据量过多的问题。去除随机前缀后,再次进行全局聚合,得到最终结果。
- 优点
对于聚合类的 shuffle 操作导致的数据倾斜,效果较好。通常可以解决数据倾斜,或至少大幅度缓解数据倾斜,将 Spark 作业性能提升数倍以上。
- 缺点
仅适用于聚合类的 shuffle 操作,适用范围相对较窄。若为 join 类的 shuffle 操作,需使用其他解决方案。
- 给RDD中的每个key都打上一个随机前缀
// 第一步,给RDD中的每个key都打上一个随机前缀。
import org.apache.spark.rdd.RDD
import scala.util.Random
// 假设已经存在一个键值对类型的RDD[(Long, Long)]
val rdd: RDD[(Long, Long)] = ???
// 为每个键添加随机前缀,将原本相同的键变为多个不同的键
val randomPrefixRdd: RDD[(String, Long)] = rdd.map {
case (key, value) =>
// 生成一个0到9之间的随机数作为前缀
val prefix = Random.nextInt(10)
// 将随机数前缀与原始键连接,形成新的键,并返回新的键值对
(s"${prefix}_$key", value)
}
JavaPairRDD<String, Long> randomPrefixRdd = rdd.mapToPair(
new PairFunction<Tuple2<Long,Long>, String, Long>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Long> call(Tuple2<Long, Long> tuple)
throws Exception {
Random random = new Random();
int prefix = random.nextInt(10);
return new Tuple2<String, Long>(prefix + "_" + tuple._1, tuple._2);
}
});
- 对打上随机前缀的key进行局部聚合
// 对带有随机前缀的键值对进行局部聚合
val localAggrRdd: RDD[(String, Long)] = randomPrefixRdd.reduceByKey((v1, v2) => v1 + v2)
JavaPairRDD<String, Long> localAggrRdd = randomPrefixRdd. reduceByKey (
new Function2<Long, Long, Long>() {
private static final long serialVersionUID = 1L;
@Override
public Long call(Long v1, Long v2) throws Exception {
return v1 + v2;
}
});
- 去除RDD中每个key的随机前缀
// 移除随机前缀,还原为原始键值对
val removedRandomPrefixRdd: RDD[(Long, Long)] = localAggrRdd.map {
case (key, value) =>
val originalKey = key.split("_")(1).toLong
(originalKey, value)
}
JavaPairRDD<Long, Long> removedRandomPrefixRdd = localAggrRdd.mapToPair(
new PairFunction<Tuple2<String,Long>, Long, Long>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<Long, Long> call(Tuple2<String, Long> tuple)
throws Exception {
long originalKey = Long.valueOf(tuple._1.split("_")[1]);
return new Tuple2<Long, Long>(originalKey, tuple._2);
}
});
- 对去除了随机前缀的RDD进行全局聚合
// 对移除随机前缀后的键值对进行全局聚合
val globalAggrRdd: RDD[(Long, Long)] = removedRandomPrefixRdd.reduceByKey(_ + _)
JavaPairRDD<Long, Long> globalAggrRdd = removedRandomPrefixRdd. reduceByKey (
new Function2<Long, Long, Long>() {
private static final long serialVersionUID = 1L;
@Override
public Long call(Long v1, Long v2) throws Exception {
return v1 + v2;
}
});
7.5 将reduce join转为map join
- 方案适用场景
此方案适用于在对 RDD 进行 join 类操作,或在 Spark SQL 中使用 join 语句时,且其中一个 RDD 或表的数据量相对较小(如几百M或一两G)。
- 方案实现思路
使用 Broadcast 变量和 map 类算子来实现 join 操作,而非使用 join 算子,从而完全避免 shuffle 操作,彻底消除数据倾斜。首先,将较小的 RDD 数据通过 collect 算子拉取到 Driver 端内存,然后创建一个 Broadcast 变量。接着,对另一个 RDD 执行 map 类算子,在算子函数内从 Broadcast 变量中获取较小 RDD 的全量数据,并将其与当前 RDD 的每条数据按连接 key 进行比对。如果连接 key 相同,则按需将两个 RDD 的数据连接起来。
- 方案实现原理
普通的 join 操作会进行 shuffle,将相同 key 的数据拉取到一个 shuffle read task 中再进行join(reduce join)。而在一个较小的 RDD 中使用 map join(广播小RDD全量数据+map算子)可以避免 shuffle 操作,因此不会产生数据倾斜。
- 方案优点
对于因 join 操作引起的数据倾斜问题,该方案效果非常好,因为完全避免了 shuffle 操作,从而消除了数据倾斜。
- 方案缺点
适用场景较有限,只适用于一个大表和一个小表的情况。由于需要广播小表,这将消耗较多内存资源,同时在 Driver 和每个 Executor 内存中都会驻留一份小 RDD 的全量数据。如果广播的 RDD 数据量较大(如10G以上),可能会导致内存溢出。因此,该方案不适合两个大表的情况。
- 代码示例:
需要注意的是下面的做法,仅仅适用于rdd1中的key没有重复,全部是唯一的场景。如果rdd1中有多个相同的key,那么就得用flatMap类的操作,在进行join的时候不能用map,而是得遍历rdd1所有数据进行join。rdd2中每条数据都可能会返回多条join后的数据。
// 首先将数据量比较小的RDD的数据,collect到Driver中来。
val rdd1Data: List[(Long, Row)] = rdd1.collect().toList
// 然后使用Spark的广播功能,将小RDD的数据转换成广播变量,这样每个Executor就只有一份RDD的数据。
// 可以尽可能节省内存空间,并且减少网络传输性能开销。
val rdd1DataBroadcast: Broadcast[List[(Long, Row)]] = sc.broadcast(rdd1Data)
// 对另外一个RDD执行map类操作,而不再是join类操作。
val joinedRdd: RDD[(String, (String, Row))] = rdd2.map { tuple =>
// 在算子函数中,通过广播变量,获取到本地Executor中的rdd1数据。
val rdd1Data: List[(Long, Row)] = rdd1DataBroadcast.value
// 可以将rdd1的数据转换为一个Map,便于后面进行join操作。
val rdd1DataMap: Map[Long, Row] = rdd1Data.toMap
// 获取当前RDD数据的key以及value。
val key: String = tuple._1.toString
val value: String = tuple._2
// 从rdd1数据Map中,根据key获取到可以join到的数据。
val rdd1Value: Row = rdd1DataMap(key.toLong)
(key, (value, rdd1Value))
}
// 首先将数据量比较小的RDD的数据, collect 到Driver中来。
List<Tuple2<Long, Row>> rdd1Data = rdd1. collect ()
// 然后使用Spark的广播功能,将小RDD的数据转换成广播变量,这样每个Executor就只有一份RDD的数据。
// 可以尽可能节省内存空间,并且减少网络传输性能开销。
final Broadcast<List<Tuple2<Long, Row>>> rdd1DataBroadcast = sc.broadcast(rdd1Data);
// 对另外一个RDD执行map类操作,而不再是join类操作。
JavaPairRDD<String, Tuple2<String, Row>> joinedRdd = rdd2.mapToPair(
new PairFunction<Tuple2<Long,String>, String, Tuple2<String, Row>>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Tuple2<String, Row>> call(Tuple2<Long, String> tuple)
throws Exception {
// 在算子函数中,通过广播变量,获取到本地Executor中的rdd1数据。
List<Tuple2<Long, Row>> rdd1Data = rdd1DataBroadcast.value();
// 可以将rdd1的数据转换为一个Map,便于后面进行join操作。
Map<Long, Row> rdd1DataMap = new HashMap<Long, Row>();
for(Tuple2<Long, Row> data : rdd1Data) {
rdd1DataMap.put(data._1, data._2);
}
// 获取当前RDD数据的key以及value。
String key = tuple._1;
String value = tuple._2;
// 从rdd1数据Map中,根据key获取到可以join到的数据。
Row rdd1Value = rdd1DataMap.get(key);
return new Tuple2<String, String>(key, new Tuple2<String, Row>(value, rdd1Value));
}
});
7.6 对key分拆join操作
适用场景:当两个数据量较大的 RDD/Hive 表进行 join 且无法采用将 reduce join 转为 map join 的方法时,若数据倾斜是由一个 RDD/Hive 表中少数几个 key 的数据量过大而另一个表中所有 key 分布较均匀所导致的,此方案较为合适。
实现思路:
- 对含有少数数据量过大的 key 的 RDD,采用 sample 算子抽取样本,统计各 key 数量,找出数据量最大的几个 key。
- 从原 RDD 中拆分出这几个 key 对应的数据,形成一个独立 RDD,并为每个 key 添加 n 以内的随机数前缀。不会导致倾斜的大部分 key 形成另一个 RDD。
- 对需要 join 的另一个 RDD,过滤出这几个倾斜 key 对应的数据,形成一个独立 RDD。将每条数据扩充为 n 条数据,这些数据依次附加一个 0~n 的前缀。不会导致倾斜的大部分 key 形成另一个 RDD。
- 将带有随机前缀的独立 RDD 与另一个扩充 n 倍的独立 RDD 进行 join,将原先相同的 key 分散成 n 份,分配到多个 task 中进行 join。
- 另外两个普通 RDD 照常 join。
- 最后使用 union 算子合并两次 join 的结果,得到最终 join 结果。
实现原理:针对少数几个导致数据倾斜的 key,将它们分拆成独立 RDD,添加随机前缀,分散到 n 份进行 join。这样,这几个 key 对应的数据不再集中在少数 task 上,而是分散到多个 task 进行 join。
优点:针对仅有少数 key 导致的数据倾斜,此方案可以有效地打散 key 进行 join,且仅需针对少数倾斜 key 对应的数据扩充 n 倍,避免占用过多内存。
缺点:若导致倾斜的 key 数量特别多,如成千上万个 key 都导致数据倾斜,此方案不适用。
- 代码示例
import org.apache.spark.rdd.RDD
import org.apache.spark.HashPartitioner
import scala.Tuple2
import org.apache.spark.sql.Row
import scala.util.Random
// 首先从包含了少数几个导致数据倾斜key的rdd1中,采样10%的样本数据。
val sampledRDD: RDD[(Long, String)] = rdd1.sample(false, 0.1)
// 对样本数据RDD统计出每个key的出现次数,并按出现次数降序排序。
// 对降序排序后的数据,取出top 1或者top 100的数据,也就是key最多的前n个数据。
// 具体取出多少个数据量最多的key,由大家自己决定,我们这里就取1个作为示范。
val mappedSampledRDD: RDD[(Long, Long)] = sampledRDD.map { case (key, value) => (key, 1L) }
val countedSampledRDD: RDD[(Long, Long)] = mappedSampledRDD.reduceByKey(_ + _)
// 将(key, count)格式的countedSampledRDD转换为(count, key)格式
val reversedSampledRDD: RDD[(Long, Long)] = countedSampledRDD.map {
case (key, count) => (count, key)
}
// 按照count降序排序,取出导致数据倾斜的最大key
val skewedUserid: Long = reversedSampledRDD.sortByKey(ascending = false).take(1).head._2
// 从rdd1中分拆出导致数据倾斜的key,形成独立的RDD
val skewedRDD: RDD[(Long, String)] = rdd1.filter { case (key, value) => key == skewedUserid }
// 从rdd1中分拆出不导致数据倾斜的普通key,形成独立的RDD。
val commonRDD: RDD[(Long, String)] = rdd1.filter { case (key, _) => key != skewedUserid }
// rdd2是那个所有key的分布相对较为均匀的rdd。
// 过滤出rdd2中与导致数据倾斜的key对应的数据,分拆成单独的rdd
val skewedRdd2: RDD[(String, Row)] = rdd2.filter { case (key, _) => key == skewedUserid }
.flatMap { case (key, row) =>
// 对每条数据扩容100倍,打上0~100的前缀
for {
i <- 0 until 100
} yield (s"$i_$key", row)
}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import scala.util.Random
// 为rdd1中分拆出来的导致倾斜的key的独立rdd中的每条数据都打上100以内的随机前缀
val prefixedSkewedRDD: RDD[(String, String)] = skewedRDD.map { case (key, value) =>
val prefix = Random.nextInt(100)
(s"${prefix}_$key", value)
}
// 将这个rdd1中分拆出来的独立rdd与上面rdd2中分拆出来的独立rdd进行join
val joinedRDD1: RDD[(Long, (String, Row))] = prefixedSkewedRDD
.join(skewedRdd2)
.map { case (_, (value1, row)) =>
val key = value1.split("_")(1).toLong
(key, (value1, row))
}
// 将rdd1中分拆出来的包含普通key的独立rdd,直接与rdd2进行join
val joinedRDD2: RDD[(Long, (String, Row))] = commonRDD.join(rdd2)
// 将倾斜key join后的结果与普通key join后的结果,union起来,得到最终的join结果
val joinedRDD: RDD[(Long, (String, Row))] = joinedRDD1.union(joinedRDD2)
// 首先从包含了少数几个导致数据倾斜key的rdd1中,采样10%的样本数据。
JavaPairRDD<Long, String> sampledRDD = rdd1.sample(false, 0.1);
// 对样本数据RDD统计出每个key的出现次数,并按出现次数降序排序。
// 对降序排序后的数据,取出top 1或者top 100的数据,也就是key最多的前n个数据。
// 具体取出多少个数据量最多的key,由大家自己决定,我们这里就取1个作为示范。
JavaPairRDD<Long, Long> mappedSampledRDD = sampledRDD.mapToPair(
new PairFunction<Tuple2<Long,String>, Long, Long>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<Long, Long> call(Tuple2<Long, String> tuple)
throws Exception {
return new Tuple2<Long, Long>(tuple._1, 1L);
}
});
JavaPairRDD<Long, Long> countedSampledRDD = mappedSampledRDD. reduceByKey (
new Function2<Long, Long, Long>() {
private static final long serialVersionUID = 1L;
@Override
public Long call(Long v1, Long v2) throws Exception {
return v1 + v2;
}
});
JavaPairRDD<Long, Long> reversedSampledRDD = countedSampledRDD.mapToPair(
new PairFunction<Tuple2<Long,Long>, Long, Long>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<Long, Long> call(Tuple2<Long, Long> tuple)
throws Exception {
return new Tuple2<Long, Long>(tuple._2, tuple._1);
}
});
final Long skewedUserid = reversedSampledRDD.sortByKey(false).take(1).get(0)._2;
// 从rdd1中分拆出导致数据倾斜的key,形成独立的RDD。
JavaPairRDD<Long, String> skewedRDD = rdd1.filter(
new Function<Tuple2<Long,String>, Boolean>() {
private static final long serialVersionUID = 1L;
@Override
public Boolean call(Tuple2<Long, String> tuple) throws Exception {
return tuple._1.equals(skewedUserid);
}
});
// 从rdd1中分拆出不导致数据倾斜的普通key,形成独立的RDD。
JavaPairRDD<Long, String> commonRDD = rdd1.filter(
new Function<Tuple2<Long,String>, Boolean>() {
private static final long serialVersionUID = 1L;
@Override
public Boolean call(Tuple2<Long, String> tuple) throws Exception {
return !tuple._1.equals(skewedUserid);
}
});
// rdd2,就是那个所有key的分布相对较为均匀的rdd。
// 这里将rdd2中,前面获取到的key对应的数据,过滤出来,分拆成单独的rdd,并对rdd中的数据使用flatMap算子都扩容100倍。
// 对扩容的每条数据,都打上0~100的前缀。
JavaPairRDD<String, Row> skewedRdd2 = rdd2.filter(
new Function<Tuple2<Long,Row>, Boolean>() {
private static final long serialVersionUID = 1L;
@Override
public Boolean call(Tuple2<Long, Row> tuple) throws Exception {
return tuple._1.equals(skewedUserid);
}
}).flatMapToPair(new PairFlatMapFunction<Tuple2<Long,Row>, String, Row>() {
private static final long serialVersionUID = 1L;
@Override
public Iterable<Tuple2<String, Row>> call(
Tuple2<Long, Row> tuple) throws Exception {
Random random = new Random();
List<Tuple2<String, Row>> list = new ArrayList<Tuple2<String, Row>>();
for(int i = 0; i < 100; i++) {
list.add(new Tuple2<String, Row>(i + "_" + tuple._1, tuple._2));
}
return list;
}
});
// 将rdd1中分拆出来的导致倾斜的key的独立rdd,每条数据都打上100以内的随机前缀。
// 然后将这个rdd1中分拆出来的独立rdd,与上面rdd2中分拆出来的独立rdd,进行join。
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD1 = skewedRDD.mapToPair(
new PairFunction<Tuple2<Long,String>, String, String>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, String> call(Tuple2<Long, String> tuple)
throws Exception {
Random random = new Random();
int prefix = random.nextInt(100);
return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2);
}
})
.join(skewedUserid2infoRDD)
.mapToPair(new PairFunction<Tuple2<String,Tuple2<String,Row>>, Long, Tuple2<String, Row>>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<Long, Tuple2<String, Row>> call(
Tuple2<String, Tuple2<String, Row>> tuple)
throws Exception {
long key = Long.valueOf(tuple._1.split("_")[1]);
return new Tuple2<Long, Tuple2<String, Row>>(key, tuple._2);
}
});
// 将rdd1中分拆出来的包含普通key的独立rdd,直接与rdd2进行join。
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD2 = commonRDD.join(rdd2);
// 将倾斜key join后的结果与普通key join后的结果,uinon起来。
// 就是最终的join结果。
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD = joinedRDD1.union(joinedRDD2);
7.7 使用随机前缀和扩容RDD进行join
- 适用场景
此方案适用于在进行 join 操作时,由于 RDD 中存在大量导致数据倾斜的 key,从而无法通过分拆 key 进行优化的情况。
实现思路
参考对key分拆join操作,首先分析 RDD/Hive 表的数据分布,找到导致数据倾斜的 RDD/Hive 表,如多个 key 对应超过 1 万条数据。
为该 RDD 的每条数据添加一个 n 以内的随机前缀。
对另一个正常的 RDD 进行扩容,将每条数据扩充为 n 条数据,并分别添加 0~n 的前缀。
对两个处理后的 RDD 进行 join。
实现原理
通过为相同的 key 添加随机前缀,将它们转换为不同的 key,这样可以将处理任务分散到多个 task 中,而不是由一个 task 处理大量相同的 key。与对key分拆join操作不同,这个方案针对有大量倾斜 key 的情况,整个 RDD 需要进行数据扩容,对内存资源要求较高。
- 优点
适用于处理 join 类型的数据倾斜,效果显著,性能提升明显。
- 缺点
该方案主要是缓解数据倾斜,并未完全避免。由于需要对整个 RDD 进行扩容,对内存资源要求较高。
- 实践经验
在一个数据需求开发过程中,发现 join 操作导致数据倾斜。优化前,作业执行时间约为 60 分钟,采用此方案优化后,执行时间缩短至 10 分钟,性能提升了 6 倍。
- 代码示例:
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import scala.util.Random
import scala.collection.mutable.ListBuffer
// 首先将其中一个 key 分布相对较为均匀的 RDD 膨胀 100 倍
val expandedRDD: RDD[(String, Row)] = rdd1.flatMap { case (key, row) =>
val list = new ListBuffer[(String, Row)]()
for (i <- 0 until 100) {
list.append((s"0_$key", row))
}
list
}
// 其次,将另一个有数据倾斜 key 的 RDD,每条数据都打上 100 以内的随机前缀
val mappedRDD: RDD[(String, String)] = rdd2.map { case (key, value) =>
val prefix = Random.nextInt(100)
(s"$prefix_$key", value)
}
// 将两个处理后的 RDD 进行 join 即可
val joinedRDD: RDD[(String, (String, Row))] = mappedRDD.join(expandedRDD)
// 首先将其中一个key分布相对较为均匀的RDD膨胀100倍。
JavaPairRDD<String, Row> expandedRDD = rdd1.flatMapToPair(
new PairFlatMapFunction<Tuple2<Long,Row>, String, Row>() {
private static final long serialVersionUID = 1L;
@Override
public Iterable<Tuple2<String, Row>> call(Tuple2<Long, Row> tuple)
throws Exception {
List<Tuple2<String, Row>> list = new ArrayList<Tuple2<String, Row>>();
for(int i = 0; i < 100; i++) {
list.add(new Tuple2<String, Row>(0 + "_" + tuple._1, tuple._2));
}
return list;
}
});
// 其次,将另一个有数据倾斜key的RDD,每条数据都打上100以内的随机前缀。
JavaPairRDD<String, String> mappedRDD = rdd2.mapToPair(
new PairFunction<Tuple2<Long,String>, String, String>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, String> call(Tuple2<Long, String> tuple)
throws Exception {
Random random = new Random();
int prefix = random.nextInt(100);
return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2);
}
});
// 将两个处理后的RDD进行join即可。
JavaPairRDD<String, Tuple2<String, Row>> joinedRDD = mappedRDD.join(expandedRDD);
8. 总结
在实践中,处理简单数据倾斜场景时,通常只需使用一种方案就能解决。然而,针对复杂的数据倾斜问题,可能需要将多种方案组合使用。在有多个数据倾斜环节的 Spark 作业中,可以先运用预处理和过滤数据的方法来缓解数据倾斜,接着提升某些 shuffle 操作的并行度以优化性能,最后针对不同的聚合或 join 操作选择合适的优化方案。只有深入理解这些方案的思路和原理,才能根据不同情况灵活运用多种方案,解决数据倾斜问题。
- 总结:
- 单一方案适用于简单数据倾斜:对于较简单的数据倾斜问题,使用一种方案就能得到有效改善。
- 复杂数据倾斜需要方案组合:在面临复杂的数据倾斜场景时,将多种解决方案结合起来使用可能更为有效。
- 预处理和过滤数据缓解数据倾斜:通过对数据进行预处理和过滤,可以在一定程度上减轻数据倾斜的影响。
- 提升 shuffle 操作并行度优化性能:调整并行度可以优化 shuffle 过程中的性能,减轻数据倾斜的影响。
- 针对聚合或 join 操作选择合适方案:根据具体场景,为不同的聚合或 join 操作选择最适合的优化方案。
- 理解方案原理以灵活运用:深入了解各种解决方案的原理和思路,以便在实际应用中更加灵活地运用这些方案,针对具体情况进行优化。