Spark Handbook - cache & checkpoint

Created by 小牛君 on 2017-06-16

1.  cache&checkpoint

1.1. persist, cache, unpersist

An RDD can cache the results of earlier computation through the persist or cache method. However, the data is not cached at the moment either method is called; it is only when a later action is triggered that the RDD is actually stored in the memory of the compute nodes, where it becomes available for reuse.
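A minimal sketch of that lazy behavior (the variable name and dataset are illustrative only, assuming an interactive spark-shell with sc available):

val data = sc.parallelize(1 to 1000000).map(_ * 2)
data.cache()          // only marks the RDD for caching; nothing is stored yet
data.getStorageLevel  // already reports StorageLevel(memory, deserialized, 1 replicas)
data.count()          // first action: computes the partitions and stores them in memory
data.count()          // later actions reuse the cached blocks instead of recomputing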

All available storage levels are listed below:

(Image: table of the available storage levels; they correspond to the StorageLevel constants shown in the source excerpt below.)

val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)
c.getStorageLevel
res2: org.apache.spark.storage.StorageLevel = StorageLevel(1 replicas)
c.cache
c.getStorageLevel
res4: org.apache.spark.storage.StorageLevel = StorageLevel(memory, deserialized, 1 replicas)

val d = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)
d.persist(org.apache.spark.storage.StorageLevel.DISK_ONLY_2)
d.getStorageLevel
res10: org.apache.spark.storage.StorageLevel = StorageLevel(disk, 2 replicas)

val e = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)
e.persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_2)
e.getStorageLevel
res12: org.apache.spark.storage.StorageLevel = StorageLevel(disk, memory, deserialized, 2 replicas)

// The storage level of an already-persisted RDD cannot be changed directly;
// unpersist it first, then persist with the new level.
c.unpersist(true)
c.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_SER)
c.getStorageLevel
res23: org.apache.spark.storage.StorageLevel = StorageLevel(memory, 1 replicas)

// From RDD.scala: cache is simply persist with the default MEMORY_ONLY level.
def cache(): this.type = persist()
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
 

// StorageLevel.scala (excerpt)
@DeveloperApi
class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int = 1)
  extends Externalizable {
  // ... (class body omitted)
}

object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
  // ... (remaining members omitted)
}
 

 

val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)
c.getStorageLevel
res78: org.apache.spark.storage.StorageLevel = StorageLevel(1 replicas)

c.cache
c.getStorageLevel
res80: org.apache.spark.storage.StorageLevel = StorageLevel(memory, deserialized, 1 replicas)

c.unpersist(true)
c.getStorageLevel
res82: org.apache.spark.storage.StorageLevel = StorageLevel(1 replicas)

c.persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_2)
c.getStorageLevel
res84: org.apache.spark.storage.StorageLevel = StorageLevel(disk, memory, deserialized, 2 replicas)

// From RDD.scala: unpersist removes the RDD's blocks and resets its storage level to NONE.
def unpersist(blocking: Boolean = true): this.type = {
  logInfo("Removing RDD " + id + " from persistence list")
  sc.unpersistRDD(id, blocking)
  storageLevel = StorageLevel.NONE
  this
}
 

 

 

1.2.   checkpoint

// The checkpoint directory is created automatically when it is set.
sc.setCheckpointDir("check_point_home")

val a = sc.parallelize(1 to 4)
a.checkpoint
a.count()

val rdd1 = a.map("a" * _)
rdd1.checkpoint
rdd1.count()

val rdd2 = rdd1.map((_, 1))
rdd2.checkpoint()
rdd2.count()

val rdd3 = rdd2.reduceByKey(_ + _)
rdd3.count()
 
 

In testing, once rdd2.count() had completed, the upstream data could be deleted and rdd3.count() still returned results: rdd3 only depends on rdd2's checkpointed files, not on the original lineage.
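One way to see why this works (a sketch against the same spark-shell setup; the variable b is purely illustrative): once the checkpoint has been materialized, the RDD's lineage is re-rooted at the checkpoint files, which toDebugString makes visible.

val b = sc.parallelize(1 to 4).map(x => ("b" * x, 1))
b.checkpoint()
b.toDebugString   // lineage is still rooted at the original ParallelCollectionRDD
b.count()         // the action materializes the checkpoint files
b.toDebugString   // lineage is now rooted at a ReliableCheckpointRDD, so the
                  // original source is no longer needed to recompute b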

sc.setCheckpointDir("checkpoint")

val rdd = sc.textFile("/hdfs/logs/words.txt").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
rdd.checkpoint

rdd.isCheckpointed
res18: Boolean = false

rdd.count
res19: Long = 5

rdd.isCheckpointed
res20: Boolean = true

rdd.getCheckpointFile
res21: Option[String] = Some(file:/D:/jdk/spark-2.1.0-bin-hadoop2.6/bin/checkpoint/3eb91f62-90c0-463b-a776-9a8a86bcbea9/rdd-25)

 

/**
 * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
 * directory set with `SparkContext#setCheckpointDir` and all references to its parent
 * RDDs will be removed. This function must be called before any job has been
 * executed on this RDD. It is strongly recommended that this RDD is persisted in
 * memory, otherwise saving it on a file will require recomputation.
 */
def checkpoint(): Unit = RDDCheckpointData.synchronized {
  if (context.checkpointDir.isEmpty) {
    throw new SparkException("Checkpoint directory has not been set in the SparkContext")
  } else if (checkpointData.isEmpty) {
    checkpointData = Some(new ReliableRDDCheckpointData(this))
  }
}

The doc comment on checkpoint can be read as follows:

Mark this RDD for checkpointing.

It will be saved to the checkpoint directory set with SparkContext#setCheckpointDir.

All references to its parent RDDs will be removed.

This method must be called on the RDD before any job has been executed on it.

It is strongly recommended to persist this RDD in memory; otherwise writing it to a file will require recomputing it.

 

From this we learn that when calling the checkpoint method it is best to also cache the RDD; otherwise the checkpoint itself will launch an extra job that recomputes the RDD from scratch.
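A minimal sketch of that recommendation (assuming the same spark-shell environment and input file as the example above; words is an illustrative name):

sc.setCheckpointDir("checkpoint")

val words = sc.textFile("/hdfs/logs/words.txt")
              .flatMap(_.split(" "))
              .map((_, 1))
              .reduceByKey(_ + _)

words.cache()        // keep the computed partitions in memory
words.checkpoint()   // mark for checkpointing; nothing is written yet
words.count()        // the action computes and caches the RDD; the follow-up
                     // checkpoint job then writes the cached blocks to the
                     // checkpoint directory instead of recomputing the lineage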