首家大数据培训挂牌机构 股票代码:837906 | EN CN
【小牛原创】Spark SQL 从入门到实战 -- spark sql 1.6版本相关api
【小牛原创】Spark SQL 从入门到实战 -- 概述
Spark Streaming:大规模流式数据处理
spark RDD 相关需求
spark RDD 高级应用
Spark手册 - load&save
Spark手册 - debug
Spark手册 - cache&checkpoint
Spark手册 - RDD Action API
Spark手册 - Partitioner源码
Spark手册 - RDD Transformation API
Spark手册 - RDD的依赖关系
Spark手册 - RDD入门
Spark手册 - 远程debug
Spark手册 - 在IDEA中编写WordCount程序(3)
Spark手册 - 在IDEA中编写WordCount程序(2)
Spark手册 - 在IDEA中编写WordCount程序(1)
Spark手册 - 执行Spark程序
Spark手册 - 集群安装
20页PPT|视频类网站大数据生态 Spark在爱奇艺的应用实践
Spark机器学习入门实例——大数据集(30+g)二分类
Spark官方文档中文翻译:Spark SQL 之 Data Sources
使用Spark MLlib来训练并服务于自然语言处理模型
Spark知识体系完整解读
案例 :Spark应用案例现场分享(IBM Datapalooza)
最全的Spark基础知识解答
Spark在GrowingIO数据无埋点全量采集场景下的实践
Apache Spark探秘:三种分布式部署方式比较
Apache Spark探秘:多进程模型还是多线程模型?
Apache Spark探秘:实现Map-side Join和Reduce-side Join
Apache Spark探秘:利用Intellij IDEA构建开发环境
spark on yarn的技术挑战
Apache Spark学习:将Spark部署到Hadoop 2.2.0上
Hadoop与Spark常用配置参数总结
基于Spark Mllib,SparkSQL的电影推荐系统
spark作业调优秘籍,解数据倾斜之痛
Spark入门必学:预测泰坦尼克号上的生还情况
小牛学堂浅谈基于Spark大数据平台日志审计系统的设计与实现
【Hadoop Summit Tokyo 2016】使用基于Lambda架构的Spark的近实时的网络异常检测和流量分析
Spark编程环境搭建经验分享
Spark技术在京东智能供应链预测的应用
spark中textFile、groupByKey、collect、flatMap、map结合小案例
Spark中DataFrame的schema讲解
深度剖析Spark分布式执行原理
【Spark Summit East 2017】从容器化Spark负载中获取的经验
内存分析技术哪家强?Spark占几何
Spark系列之一:Spark,一种快速数据分析替代方案
6种最常见的Hadoop和Spark项目
Hadoop vs Spark
Hadoop与Spark常用配置参数总结
Spark RPC通信层设计原理分析
Spark Standalone架构设计要点分析
Spark UnifiedMemoryManager内存管理模型分析
网易的Spark技术分享

Spark手册 - RDD的依赖关系

于2017-06-16由小牛君创建

分享到:


1.1.   RDD的依赖关系

RDD和它依赖的父RDD(s)的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)。

image.png

 

1.1.1.   窄依赖

窄依赖指的是每一个父RDD的Partition最多被子RDD的一个Partition使用

总结:窄依赖我们形象的比喻为独生子女

1.1.2.   宽依赖

宽依赖指的是多个子RDD的Partition会依赖同一个父RDD的Partition

总结:窄依赖我们形象的比喻为超生

1.1.3.   Lineage

RDD只支持粗粒度转换,即在大量记录上执行的单个操作。将创建RDD的一系列Lineage(即血统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。

1.2.   DAG的生成

DAG(Directed Acyclic Graph)叫做有向无环图,原始的RDD通过一系列的转换就就形成了DAG,根据RDD之间的依赖关系的不同将DAG划分成不同的Stage,对于窄依赖,partition的转换处理在Stage中完成计算。对于宽依赖,由于有Shuffle的存在,只能在parent RDD处理完成后,才能开始接下来的计算,因此宽依赖是划分Stage的依据。

 

 

1.2.1.   wordcountstage划分

代码:

val   rdd=sc.textFile("/hdfs/wordcount/in/*").flatMap(_.split("   ")).map((_,1)).reduceByKey(_+_)

rdd.saveAsTextFile("/hdfs/wordcount/out")

 

stage划分情况:

image.png

上图每一个点代表一个rdd,具体rdd依赖关系如下:

scala> rdd.toDebugString

res7: String =

(3) ShuffledRDD[54] at reduceByKey at   <console>:24 []

 +-(3) MapPartitionsRDD[53] at map at   <console>:24 []

      |  MapPartitionsRDD[52] at   flatMap at <console>:24 []

      |  /hdfs/wordcount/in/*   MapPartitionsRDD[51] at textFile at <console>:24 []

      |  /hdfs/wordcount/in/*   HadoopRDD[50] at textFile at <console>:24 []

saveAsTextFile也会产生一个MapPartitionsRDD,则共产生6RDD

 

1.2.2.   复杂stage划分示例

image.png

代码:

val a=sc.parallelize(Array("hello"->7,"world"->3,"apple"->4,"hello"->8,"world"->2,"apple"->9,"hello"->4,"world"->32,"apple"->10),3)
 
val b=a.groupByKey()
b.foreachPartition(x=>println(x.toList))
res:
List((world,CompactBuffer(3, 2, 32)))
List((apple,CompactBuffer(4, 9, 10)))
List((hello,CompactBuffer(7, 8, 4)))

 
val c=sc.parallelize(Array("hello","world","apple","hello","world","apple"),2)
 
val d=c.map(x=>(x,x.length))
 
val e=sc.parallelize(Array("hello"->2,"world"->8,"apple"->3,"hello"->2,"world"->8,"apple"->3),2)
 
val f=d union e
f.foreachPartition(x=>println(x.toList))
List((hello,2), (world,8), (apple,3))
List((hello,5), (world,5), (apple,5))
List((hello,5), (world,5), (apple,5))
List((hello,2), (world,8), (apple,3))

 
val g=b join f
g.foreachPartition(x=>println(x.toList))
 

List((apple,(CompactBuffer(4, 9, 10),5)),   (apple,(CompactBuffer(4, 9, 10),5)), (

apple,(CompactBuffer(4, 9, 10),3)),   (apple,(CompactBuffer(4, 9, 10),3)))

List((world,(CompactBuffer(3, 2, 32),5)),   (world,(CompactBuffer(3, 2, 32),5)), (

world,(CompactBuffer(3, 2, 32),8)),   (world,(CompactBuffer(3, 2, 32),8)))

List((hello,(CompactBuffer(7, 8, 4),5)),   (hello,(CompactBuffer(7, 8, 4),5)), (he

llo,(CompactBuffer(7, 8, 4),2)),   (hello,(CompactBuffer(7, 8, 4),2)))

 

scala> g.toDebugString

res3: String =

(3) MapPartitionsRDD[8] at join at   <console>:36 []

 |    MapPartitionsRDD[7] at join at <console>:36 []

 |    CoGroupedRDD[6] at join at <console>:36 []

 |    ShuffledRDD[1] at groupByKey at <console>:26 []

 +-(3) ParallelCollectionRDD[0] at   parallelize at <console>:24 []

 +-(4) UnionRDD[5] at union at   <console>:30 []

      |  MapPartitionsRDD[3] at map at   <console>:26 []

      |  ParallelCollectionRDD[2] at   parallelize at <console>:24 []

      |  ParallelCollectionRDD[4] at   parallelize at <console>:24 []

image.png

 

1.2.3.   特殊joinstage划分

示例代码:

val   a=sc.parallelize(Array("hello"->7,"world"->3,"hello"->7,"world"->3),2).groupByKey()

val   b=sc.parallelize(Array("hello"->2,"world"->3,"hello"->6,"world"->5),2).groupByKey()

 

val rdd=a join b

rdd.foreachPartition(x=>println(x.toList))

 

res:

List()

List((hello,(CompactBuffer(7,   7),CompactBuffer(2, 6))), (apple,(CompactBuffer(3,

 3),CompactBuffer(3, 5))))

image.png

总结:join并不一定会走shuffle过程

 

 

 

1.3.   spark执行流程

image.png