# [Spark 43] RDD Operator Logical Execution Graphs, Part 3


1. intersection

2. join

## 1. intersection

1. Example code

```
package spark.examples

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

object SparkRDDIntersection {

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SparkRDDIntersection").setMaster("local")
    val sc = new SparkContext(conf)
    val rdd1 = sc.parallelize(List(1, 8, 2, 1, 4, 2, 7, 6, 2, 3, 3, 1), 3)
    val rdd2 = sc.parallelize(List(1, 8, 7, 9, 6, 2, 1), 2)
    val pairs = rdd1.intersection(rdd2)

    pairs.saveAsTextFile("file:///D:/intersection" + System.currentTimeMillis())

    println(pairs.toDebugString)
  }

}
```

1.1 The RDD lineage (from toDebugString):

```
(3) MappedRDD[7] at intersection at SparkRDDIntersection.scala:13 []
 |  FilteredRDD[6] at intersection at SparkRDDIntersection.scala:13 []
 |  MappedValuesRDD[5] at intersection at SparkRDDIntersection.scala:13 []
 |  CoGroupedRDD[4] at intersection at SparkRDDIntersection.scala:13 []
 +-(3) MappedRDD[2] at intersection at SparkRDDIntersection.scala:13 []
 |  |  ParallelCollectionRDD[0] at parallelize at SparkRDDIntersection.scala:11 []
 +-(2) MappedRDD[3] at intersection at SparkRDDIntersection.scala:13 []
    |  ParallelCollectionRDD[1] at parallelize at SparkRDDIntersection.scala:12 []
```

Reading bottom-up: MappedRDD[2] and MappedRDD[3] come from map(v => (v, null)) applied to the two inputs; CoGroupedRDD[4] plus MappedValuesRDD[5] are produced by cogroup (the shuffle that gathers the per-key groups from both sides); FilteredRDD[6] is the filter step; and the final MappedRDD[7] is keys, which strips the null values.

1.2 Output:

part-00000: 6

part-00001: 1, 7

part-00002: 8, 2
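
Which file each value lands in is decided by the shuffle: cogroup hash-partitions the keys across the three output partitions. Below is a minimal sketch of that assignment, assuming Spark's default HashPartitioner (partition = key.hashCode modulo the number of partitions, made non-negative); IntersectionPartitioning is an illustrative name, not part of the example above:

```
// Sketch: reproduce which part file each result value lands in, assuming
// Spark's default HashPartitioner. For Int keys, hashCode is the value itself.
object IntersectionPartitioning {
  def main(args: Array[String]) {
    val numPartitions = 3
    for (v <- List(6, 1, 7, 8, 2)) {
      // Non-negative modulo, as HashPartitioner computes it
      val p = ((v.hashCode % numPartitions) + numPartitions) % numPartitions
      println(f"$v -> part-$p%05d")
    }
    // 6 -> part-00000; 1 and 7 -> part-00001; 8 and 2 -> part-00002,
    // matching the output files above.
  }
}
```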

2. RDD dependency graph (figure omitted)

3. Source code of intersection

```
/**
 * Return the intersection of this RDD and another one. The output will not contain any duplicate
 * elements, even if the input RDDs did.
 *
 * Note that this method performs a shuffle internally.
 */
def intersection(other: RDD[T]): RDD[T] = {
  this.map(v => (v, null)).cogroup(other.map(v => (v, null)))
      .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
      .keys
}
```

3.1 The intersection operator is built on cogroup: values with the same key are first gathered into per-key groups, and then keys that do not appear in both RDDs are filtered out.

3.2 Even if an input RDD contains duplicate elements, they are eliminated: cogroup produces one record per distinct key, so each common element appears only once in the result. The sketch below walks through the same steps.
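
The pipeline can be written out explicitly. This is a minimal sketch mirroring the Spark source above; myIntersection is an illustrative name, not Spark API:

```
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

// Sketch: intersection expressed step by step via cogroup.
def myIntersection(left: RDD[Int], right: RDD[Int]): RDD[Int] = {
  val leftPairs  = left.map(v => (v, null))   // 1 becomes (1, null), etc.
  val rightPairs = right.map(v => (v, null))
  leftPairs.cogroup(rightPairs)               // (key, (leftGroup, rightGroup)) after the shuffle
    .filter { case (_, (lg, rg)) => lg.nonEmpty && rg.nonEmpty } // keep keys present in both RDDs
    .keys                                     // one record per distinct key, so no duplicates
}
```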

## 2. join

1. Example code:

```
package spark.examples

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

object SparkRDDJoin {

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SparkRDDJoin").setMaster("local")
    val sc = new SparkContext(conf)

    // The first argument is the collection, the second is the number of partitions
    val rdd1 = sc.parallelize(List((1, 2), (2, 3), (3, 4), (4, 5), (5, 6)), 3)
    val rdd2 = sc.parallelize(List((3, 6), (2, 8)), 2)

    // The RDDs being joined must consist of key/value pairs
    val pairs = rdd1.join(rdd2)

    pairs.saveAsTextFile("file:///D:/join" + System.currentTimeMillis())

    println(pairs.toDebugString)
  }

}
```

1.1 The RDD lineage (from toDebugString):

```
(3) FlatMappedValuesRDD[4] at join at SparkRDDJoin.scala:17 []
 |  MappedValuesRDD[3] at join at SparkRDDJoin.scala:17 []
 |  CoGroupedRDD[2] at join at SparkRDDJoin.scala:17 []
 +-(3) ParallelCollectionRDD[0] at parallelize at SparkRDDJoin.scala:13 []
 +-(2) ParallelCollectionRDD[1] at parallelize at SparkRDDJoin.scala:14 []
```

Here CoGroupedRDD[2] and MappedValuesRDD[3] are produced by cogroup, and FlatMappedValuesRDD[4] is the flatMapValues step that expands each key's pair of groups into (v, w) tuples.

1.2 Output:

part-00000: (3,(4,6))

part-00001: (empty)

part-00002: (2,(3,8))

As with intersection, the keys are hash-partitioned across the three output partitions: key 3 maps to partition 3 % 3 = 0 and key 2 to partition 2 % 3 = 2, so part-00001 stays empty.

2. RDD dependency graph (figure omitted)

3. Source code of join

```
/**
 * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
 * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
 * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
 */
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
  this.cogroup(other, partitioner).flatMapValues( pair =>
    for (v <- pair._1; w <- pair._2) yield (v, w)
  )
}
```

1. As the source code shows, the process depicted in the diagram is correct: for a given key, if RDD1 contains m pairs (K, V) and RDD2 contains n pairs (K, V'), the result contains m*n pairs (K, (V, V')), because the for-comprehension over the two cogrouped groups yields their Cartesian product. A sketch demonstrating this follows below.
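
To make the m*n behavior concrete, here is a minimal sketch with hypothetical data in local mode: key 1 has m = 2 values on the left and n = 2 on the right, so the join emits 2 * 2 = 4 records for it:

```
package spark.examples

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

// Sketch: join produces the per-key Cartesian product of values.
object SparkRDDJoinCartesian {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SparkRDDJoinCartesian").setMaster("local")
    val sc = new SparkContext(conf)
    val left  = sc.parallelize(List((1, "a"), (1, "b")))
    val right = sc.parallelize(List((1, 10), (1, 20)))
    left.join(right).collect().foreach(println)
    // Prints, in some order:
    // (1,(a,10)) (1,(a,20)) (1,(b,10)) (1,(b,20))
    sc.stop()
  }
}
```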