位置:首页 > Spark手册 - RDD的依赖关系 >

Spark手册 - RDD的依赖关系

作者:小牛君|发布时间:2017-06-16

小牛学堂的课程大纲最近进行了再一次升级,并且同时推出Java大数据平台开发班、Python爬虫与数据挖掘班、Spark项目班、Spark大神班、机器学习算法实战班、BI数据分析实战班, 目前这类人群凤毛麟角,导致这个行业的平均薪资极高,为此小牛学堂集合了行业的诸多大牛开设对应班级,为想学习的同学提供机会!
如果想了解详细情况,请联系 今日值班讲师 或者直接加入千人QQ群进行咨询:Spark大数据交流学习群613807316

以下是本文正文:


1.1.   RDD的依赖关系

RDD和它依赖的父RDD(s)的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)。

image.png

 

1.1.1.   窄依赖

窄依赖指的是每一个父RDD的Partition最多被子RDD的一个Partition使用

总结:窄依赖我们形象的比喻为独生子女

1.1.2.   宽依赖

宽依赖指的是多个子RDD的Partition会依赖同一个父RDD的Partition

总结:窄依赖我们形象的比喻为超生

1.1.3.   Lineage

RDD只支持粗粒度转换,即在大量记录上执行的单个操作。将创建RDD的一系列Lineage(即血统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。

1.2.   DAG的生成

DAG(Directed Acyclic Graph)叫做有向无环图,原始的RDD通过一系列的转换就就形成了DAG,根据RDD之间的依赖关系的不同将DAG划分成不同的Stage,对于窄依赖,partition的转换处理在Stage中完成计算。对于宽依赖,由于有Shuffle的存在,只能在parent RDD处理完成后,才能开始接下来的计算,因此宽依赖是划分Stage的依据。

 

 

1.2.1.   wordcountstage划分

代码:

val   rdd=sc.textFile("/hdfs/wordcount/in/*").flatMap(_.split("   ")).map((_,1)).reduceByKey(_+_)

rdd.saveAsTextFile("/hdfs/wordcount/out")

 

stage划分情况:

image.png

上图每一个点代表一个rdd,具体rdd依赖关系如下:

scala> rdd.toDebugString

res7: String =

(3) ShuffledRDD[54] at reduceByKey at   <console>:24 []

 +-(3) MapPartitionsRDD[53] at map at   <console>:24 []

      |  MapPartitionsRDD[52] at   flatMap at <console>:24 []

      |  /hdfs/wordcount/in/*   MapPartitionsRDD[51] at textFile at <console>:24 []

      |  /hdfs/wordcount/in/*   HadoopRDD[50] at textFile at <console>:24 []

saveAsTextFile也会产生一个MapPartitionsRDD,则共产生6RDD

 

1.2.2.   复杂stage划分示例

image.png

代码:

val a=sc.parallelize(Array("hello"->7,"world"->3,"apple"->4,"hello"->8,"world"->2,"apple"->9,"hello"->4,"world"->32,"apple"->10),3)
 
val b=a.groupByKey()
b.foreachPartition(x=>println(x.toList))
res:
List((world,CompactBuffer(3, 2, 32)))
List((apple,CompactBuffer(4, 9, 10)))
List((hello,CompactBuffer(7, 8, 4)))

 
val c=sc.parallelize(Array("hello","world","apple","hello","world","apple"),2)
 
val d=c.map(x=>(x,x.length))
 
val e=sc.parallelize(Array("hello"->2,"world"->8,"apple"->3,"hello"->2,"world"->8,"apple"->3),2)
 
val f=d union e
f.foreachPartition(x=>println(x.toList))
List((hello,2), (world,8), (apple,3))
List((hello,5), (world,5), (apple,5))
List((hello,5), (world,5), (apple,5))
List((hello,2), (world,8), (apple,3))

 
val g=b join f
g.foreachPartition(x=>println(x.toList))
 

List((apple,(CompactBuffer(4, 9, 10),5)),   (apple,(CompactBuffer(4, 9, 10),5)), (

apple,(CompactBuffer(4, 9, 10),3)),   (apple,(CompactBuffer(4, 9, 10),3)))

List((world,(CompactBuffer(3, 2, 32),5)),   (world,(CompactBuffer(3, 2, 32),5)), (

world,(CompactBuffer(3, 2, 32),8)),   (world,(CompactBuffer(3, 2, 32),8)))

List((hello,(CompactBuffer(7, 8, 4),5)),   (hello,(CompactBuffer(7, 8, 4),5)), (he

llo,(CompactBuffer(7, 8, 4),2)),   (hello,(CompactBuffer(7, 8, 4),2)))

 

scala> g.toDebugString

res3: String =

(3) MapPartitionsRDD[8] at join at   <console>:36 []

 |    MapPartitionsRDD[7] at join at <console>:36 []

 |    CoGroupedRDD[6] at join at <console>:36 []

 |    ShuffledRDD[1] at groupByKey at <console>:26 []

 +-(3) ParallelCollectionRDD[0] at   parallelize at <console>:24 []

 +-(4) UnionRDD[5] at union at   <console>:30 []

      |  MapPartitionsRDD[3] at map at   <console>:26 []

      |  ParallelCollectionRDD[2] at   parallelize at <console>:24 []

      |  ParallelCollectionRDD[4] at   parallelize at <console>:24 []

image.png

 

1.2.3.   特殊joinstage划分

示例代码:

val   a=sc.parallelize(Array("hello"->7,"world"->3,"hello"->7,"world"->3),2).groupByKey()

val   b=sc.parallelize(Array("hello"->2,"world"->3,"hello"->6,"world"->5),2).groupByKey()

 

val rdd=a join b

rdd.foreachPartition(x=>println(x.toList))

 

res:

List()

List((hello,(CompactBuffer(7,   7),CompactBuffer(2, 6))), (apple,(CompactBuffer(3,

 3),CompactBuffer(3, 5))))

image.png

总结:join并不一定会走shuffle过程

 

 

 

1.3.   spark执行流程

image.png

 



了解更多详情请联系 今日值班讲师 或者直接加入千人QQ群进行咨询:Spark大数据交流学习群613807316

分享到: