
Spark Handbook - cache&checkpoint

Author: 小牛君 | Published: 2017-06-16



1.  cache&checkpoint

1.1.   persist, cache, unpersist

Calling persist or cache on an RDD marks its computed results for caching, but nothing is cached at the moment either method is called. Only when a later action triggers computation is the RDD materialized in the memory of the compute nodes, where it then stays available for reuse.
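For example, here is a minimal sketch (assuming a running SparkContext named sc, as in the spark-shell transcripts below) of how nothing is materialized until the first action:

val words = sc.parallelize(List("Gnu", "Cat", "Rat"))
val pairs = words.map((_, 1))
pairs.cache()    // only marks the RDD for caching; no computation happens here
pairs.count()    // first action: computes the RDD and fills the cache
pairs.collect()  // served from the cached blocks, no recomputation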

All of the available storage levels are listed below:

Storage level            Disk  Memory  Off-heap  Deserialized  Replicas
NONE                     -     -       -         -             1
DISK_ONLY                yes   -       -         -             1
DISK_ONLY_2              yes   -       -         -             2
MEMORY_ONLY              -     yes     -         yes           1
MEMORY_ONLY_2            -     yes     -         yes           2
MEMORY_ONLY_SER          -     yes     -         -             1
MEMORY_ONLY_SER_2        -     yes     -         -             2
MEMORY_AND_DISK          yes   yes     -         yes           1
MEMORY_AND_DISK_2        yes   yes     -         yes           2
MEMORY_AND_DISK_SER      yes   yes     -         -             1
MEMORY_AND_DISK_SER_2    yes   yes     -         -             2
OFF_HEAP                 yes   yes     yes       -             1

 

val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)
c.getStorageLevel
res2: org.apache.spark.storage.StorageLevel = StorageLevel(1 replicas)
c.cache
c.getStorageLevel
res4: org.apache.spark.storage.StorageLevel = StorageLevel(memory, deserialized, 1 replicas)
 
 
val d = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)
d.persist(org.apache.spark.storage.StorageLevel.DISK_ONLY_2)
d.getStorageLevel
res10: org.apache.spark.storage.StorageLevel = StorageLevel(disk, 2 replicas)
 
 
val e = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)
e.persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_2)
e.getStorageLevel
res12: org.apache.spark.storage.StorageLevel = StorageLevel(disk, memory, deserialized, 2 replicas)
 
c.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_SER)

c.getStorageLevel
res23: org.apache.spark.storage.StorageLevel = StorageLevel(memory, 1 replicas)

(Note: Spark refuses to change the storage level of an RDD once one has been assigned; persist then throws an UnsupportedOperationException. This call presumably follows an unpersist in an elided REPL step; the unpersist transcript below shows the full sequence.)
 
 
def cache(): this.type = persist()
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
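These two definitions come straight from the RDD source: cache() is nothing more than shorthand for persist(StorageLevel.MEMORY_ONLY), so the two calls are interchangeable whenever the default level is wanted.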
 

@DeveloperApi
class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int = 1)
  extends Externalizable {
  // ... (accessors and serialization logic omitted)
}

object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
  // ... (helper constructors omitted)
}
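As a small usage sketch (again assuming a live SparkContext sc): the serialized levels store each partition as a byte array, trading extra CPU for a smaller memory footprint, which often pays off for large datasets.

import org.apache.spark.storage.StorageLevel

val big = sc.parallelize(1 to 10000000)
big.persist(StorageLevel.MEMORY_ONLY_SER)  // stored as serialized bytes: more CPU, less memory
big.count()                                // materializes the cache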
 

 

val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)
c.getStorageLevel
res78: org.apache.spark.storage.StorageLevel = StorageLevel(1 replicas)

c.cache
c.getStorageLevel
res80: org.apache.spark.storage.StorageLevel = StorageLevel(memory, deserialized, 1 replicas)

c.unpersist(true)
c.getStorageLevel
res82: org.apache.spark.storage.StorageLevel = StorageLevel(1 replicas)

c.persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_2)
c.getStorageLevel
res84: org.apache.spark.storage.StorageLevel = StorageLevel(disk, memory, deserialized, 2 replicas)

def unpersist(blocking: Boolean = true): this.type = {
  logInfo("Removing RDD " + id + " from persistence list")
  sc.unpersistRDD(id, blocking)
  storageLevel = StorageLevel.NONE
  this
}
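As the signature shows, blocking defaults to true. A quick sketch of the difference:

c.unpersist()                  // blocking = true: waits until all cached blocks are removed
c.unpersist(blocking = false)  // returns immediately; the blocks are removed asynchronously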
 

 

 

1.2.   checkpoint

// Setting the checkpoint directory creates it automatically if it does not already exist
sc.setCheckpointDir("check_point_home")
 
val a = sc.parallelize(1 to 4)
a.checkpoint     // mark a for checkpointing
a.count()        // the action triggers the actual checkpoint job

val rdd1 = a.map("a" * _)    // "a" * n repeats the string n times
rdd1.checkpoint
rdd1.count()

val rdd2 = rdd1.map((_, 1))
rdd2.checkpoint()
rdd2.count()

val rdd3 = rdd2.reduceByKey(_ + _)
rdd3.count()
 
 

Testing shows that once rdd2.count() has run (so rdd2 has been checkpointed), the original data source can be deleted and rdd3.count() still returns the data: rdd3 is recomputed from the checkpoint files rather than from the source.
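One way to observe this lineage truncation yourself is toDebugString; a minimal sketch on a fresh RDD (the exact output differs across Spark versions):

val x = sc.parallelize(1 to 4).map(_ * 2)
x.checkpoint
x.toDebugString  // the lineage still reaches back to the original parallelize
x.count()        // the action also triggers the checkpoint job
x.toDebugString  // now based on a ReliableCheckpointRDD; the parent lineage is gone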

sc.setCheckpointDir("checkpoint")

val rdd =   sc.textFile("/hdfs/logs/words.txt").flatMap(_.split("   ")).map((_, 1)).reduceByKey(_+_)

rdd.checkpoint

 

rdd.isCheckpointed

res18: Boolean = false

 

rdd.count

res19: Long = 5

 

rdd.isCheckpointed

res20: Boolean = true

 

rdd.getCheckpointFile

res21:Option[String]=Some(file:/D:/jdk/spark-2.1.0-bin-hadoop2.6/bin/checkpoint/3eb91f62-90c0-463b-a776-9a8a86bcbea9/rdd-25)

 

/**
 * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
 * directory set with `SparkContext#setCheckpointDir` and all references to its parent
 * RDDs will be removed. This function must be called before any job has been
 * executed on this RDD. It is strongly recommended that this RDD is persisted in
 * memory, otherwise saving it on a file will require recomputation.
 */
def checkpoint(): Unit = RDDCheckpointData.synchronized {
  if (context.checkpointDir.isEmpty) {
    throw new SparkException("Checkpoint directory has not been set in the SparkContext")
  } else if (checkpointData.isEmpty) {
    checkpointData = Some(new ReliableRDDCheckpointData(this))
  }
}

The source comment on checkpoint can be paraphrased as follows:

It marks this RDD for checkpointing. The RDD will be saved to a file inside the checkpoint directory set via SparkContext#setCheckpointDir.

All references to its parent RDDs are removed, truncating the lineage.

The method must be called before any job has been executed on this RDD.

It is strongly recommended to also persist this RDD in memory; otherwise, saving it to a file will require recomputing it.

 

From this we learn that when calling checkpoint, it is best to cache the RDD at the same time. The checkpoint files are written by a separate job that runs after the action, so without caching, the RDD's whole lineage is computed a second time just to produce the checkpoint.
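A minimal sketch of the recommended pattern (assuming a live SparkContext sc; the Thread.sleep merely simulates an expensive computation):

sc.setCheckpointDir("check_point_home")

val rdd = sc.parallelize(1 to 4).map { x => Thread.sleep(1000); x * 2 }  // simulated expensive work
rdd.cache()       // keep the computed partitions in memory
rdd.checkpoint()  // mark for checkpointing; must be called before the first action on rdd
rdd.count()       // one job computes rdd; the follow-up checkpoint job reads the cached blocks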

 


