# 【Spark四十五】RDD算子逻辑执行图第五一部分

【Spark四十五】RDD算子逻辑执行图第五部分

1. coalesce（联合，合并，接合，发音cola-les）

2. repartition

## 1.coalesce

1. 示例代码

```package spark.examples

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.SparkContext._

object SparkRDDCoalesce {

def main(args : Array[String]) {
val conf = new SparkConf().setAppName("SparkRDDDistinct").setMaster("local");
val sc = new SparkContext(conf);
val rdd1 = sc.parallelize(List(1,8,2,1,4,2,7,6,2,3,1,19,21, 66,74,22,21,72,78,102), 8)
val pairs = rdd1.coalesce(3, true);
pairs.saveAsTextFile("file:///D:/coalesce-0-" + System.currentTimeMillis());
val pairs2 = rdd1.coalesce(3, false);
pairs2.saveAsTextFile("file:///D:/coalesce-1-" + System.currentTimeMillis());

println(pairs.toDebugString)
}

}
```

1.1 依赖关系

```(3) MappedRDD[4] at coalesce at SparkRDDCoalesce.scala:12 []
|  CoalescedRDD[3] at coalesce at SparkRDDCoalesce.scala:12 []
|  ShuffledRDD[2] at coalesce at SparkRDDCoalesce.scala:12 []
+-(8) MapPartitionsRDD[1] at coalesce at SparkRDDCoalesce.scala:12 []
|  ParallelCollectionRDD[0] at parallelize at SparkRDDCoalesce.scala:11 []```

1.2 计算结果

1.2.1 shuffle为true

part-00000

4
7
6
1
21
21
78

part-00001

1
2
2
19
66
102

part-00002

8
1
2
3
74
22
72

1.2.2 shuffle为false

part-00000

1
8
2
1
4
part-00001

2
7
6
2
3
1
19

part-00002

21
66
74
22
21
72
78
102

2. RDD依赖图

3.源代码

``` /**
* Return a new RDD that is reduced into `numPartitions` partitions.
*
* This results in a narrow dependency, e.g. if you go from 1000 partitions
* to 100 partitions, there will not be a shuffle, instead each of the 100
* new partitions will claim 10 of the current partitions.
*
* However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
* this may result in your computation taking place on fewer nodes than
* you like (e.g. one node in the case of numPartitions = 1). To avoid this,
* you can pass shuffle = true. This will add a shuffle step, but means the
* current upstream partitions will be executed in parallel (per whatever
* the current partitioning is).
*
* Note: With shuffle = true, you can actually coalesce to a larger number
* of partitions. This is useful if you have a small number of partitions,
* say 100, potentially with a few partitions being abnormally large. Calling
* coalesce(1000, shuffle = true) will result in 1000 partitions with the
* data distributed using a hash partitioner.
*/
def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null)
: RDD[T] = {
if (shuffle) {
/** Distributes elements evenly across output partitions, starting from a random partition. */
val distributePartition = (index: Int, items: Iterator[T]) => {
var position = (new Random(index)).nextInt(numPartitions)
items.map { t => ///将items转换为（递增的Key，item）形式
// Note that the hash code of the key will just be the key itself. The HashPartitioner
// will mod it with the number of total partitions.
position = position + 1 ///整数的hashCode为其本身？是的，参见Java的Integer#hashCode方法
(position, t)
}
} : Iterator[(Int, T)]

// include a shuffle step so that our upstream tasks are still distributed
new CoalescedRDD(
new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
new HashPartitioner(numPartitions)),
numPartitions).values
} else { ///如果shuffle，则直接构造CoalescedRDD
new CoalescedRDD(this, numPartitions)
}
}```

## 2. repartition

```  /**
* Return a new RDD that has exactly numPartitions partitions.
*
* Can increase or decrease the level of parallelism in this RDD. Internally, this uses
* a shuffle to redistribute data.
*
* If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
* which can avoid performing a shuffle.
*/
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = {
coalesce(numPartitions, shuffle = true)
}```

可见repartition使用了shuffle为true的coalesce，主要用于对partition进行扩容(扩大partition)，如果是窄化partition，考虑使用coalesce以避免使用shuffle(言外之意，是使用shuffle为false版本的coalesce）