
Spark Handbook - load&save

Created by 小牛君 on 2017-06-16

1.  load&save

1.1.   Creating an RDD from memory

val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)
c.collect

res0: Array[String] = Array(Gnu, Cat, Rat, Dog, Gnu, Rat)

def parallelize[T: ClassTag](
    seq: Seq[T],
    numSlices: Int = defaultParallelism): RDD[T] = withScope {
  assertNotStopped()
  new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
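
numSlices defaults to defaultParallelism, so the partition count can also be left implicit. A minimal sketch (assuming a spark-shell session, where sc is the provided SparkContext) showing the fallback:

// Without an explicit numSlices, parallelize falls back to sc.defaultParallelism.
val d = sc.parallelize(1 to 100)
d.partitions.length == sc.defaultParallelism   // Boolean = true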
 

 

val rdd: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4))
rdd.collect

def makeRDD[T: ClassTag](
    seq: Seq[T],
    numSlices: Int = defaultParallelism): RDD[T] = withScope {
  parallelize(seq, numSlices)
}
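
Since makeRDD simply delegates to parallelize, the two calls are interchangeable for in-memory collections. A minimal sketch showing how the explicit numSlices = 2 from the first example splits the data (partition contents inspected with glom):

val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)
// Two slices are created, covering elements 0-2 and 3-5 respectively.
c.partitions.length   // Int = 2
c.glom.collect        // Array(Array(Gnu, Cat, Rat), Array(Dog, Gnu, Rat))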

 

1.2.   textFile&saveAsTextFile

val rdd = sc.textFile("/hdfs/wordcount/in/words.txt").flatMap(_.split("\\s+")).map((_, 1)).reduceByKey(_ + _)
rdd.saveAsTextFile("/hdfs/wordcount/out")
rdd.saveAsTextFile("/hdfs/wordcount/out2", classOf[org.apache.hadoop.io.compress.GzipCodec])
def textFile(
    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
    minPartitions).map(pair => pair._2.toString).setName(path)
}
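
As the source shows, minPartitions is only passed on to hadoopFile as a lower bound for the number of input splits; the actual partition count depends on the file size and the input format. A minimal sketch, assuming the words.txt file from the example above exists:

// Ask for at least 4 input splits when reading the file.
val lines = sc.textFile("/hdfs/wordcount/in/words.txt", 4)
lines.partitions.length
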
def saveAsTextFile(path: String): Unit = withScope {
  val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
  val textClassTag = implicitly[ClassTag[Text]]
  val r = this.mapPartitions { iter =>
    val text = new Text()
    iter.map { x =>
      text.set(x.toString)
      (NullWritable.get(), text)
    }
  }
  RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
    .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
}
def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit = withScope {
  val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
  val textClassTag = implicitly[ClassTag[Text]]
  val r = this.mapPartitions { iter =>
    val text = new Text()
    iter.map { x =>
      text.set(x.toString)
      (NullWritable.get(), text)
    }
  }
  RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
    .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path, codec)
}
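
Because Hadoop's TextInputFormat handles compression codecs transparently, the gzip output written above can be read back with a plain textFile call. A minimal sketch:

// The .gz part files under out2 are decompressed automatically on read.
val back = sc.textFile("/hdfs/wordcount/out2")
back.take(3).foreach(println)   // tuples such as (word,1) rendered as strings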
 

 

1.3.   saveAsObjectFile&objectFile

val x = sc.parallelize(1 to 10, 3)
x.saveAsObjectFile("/hdfs/obj")

val y = sc.objectFile[Int]("/hdfs/obj")
y.collect

res25: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
 
def saveAsObjectFile(path: String): Unit = withScope {
  this.mapPartitions(iter => iter.grouped(10).map(_.toArray))
    .map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))))
    .saveAsSequenceFile(path)
}
 
def objectFile[T: ClassTag](
    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[T] = withScope {
  assertNotStopped()
  sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minPartitions)
    .flatMap(x => Utils.deserialize[Array[T]](x._2.getBytes, Utils.getContextOrSparkClassLoader))
}
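
As the source shows, saveAsObjectFile Java-serializes batches of 10 elements into a SequenceFile, so the element type must be serializable, and objectFile must be given the matching type parameter on read. A minimal sketch with a custom element type (the Person case class and the /hdfs/obj/people path are illustrative assumptions):

// Case classes are Serializable, so they can be written with saveAsObjectFile.
case class Person(name: String, age: Int)

val people = sc.parallelize(Seq(Person("ann", 30), Person("bob", 25)))
people.saveAsObjectFile("/hdfs/obj/people")

// The type parameter must match what was written, otherwise deserialization fails at runtime.
val restored = sc.objectFile[Person]("/hdfs/obj/people")
restored.collect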
 

 

1.4.   saveAsSequenceFile&sequenceFile

val v = sc.parallelize(Array(("owl", 3), ("gnu", 4), ("dog", 1), ("cat", 2), ("ant", 5)), 2)
v.saveAsSequenceFile("/hdfs/obj/hd_seq_file")

val y = sc.sequenceFile[String, Int]("/hdfs/obj/hd_seq_file")
y.collect

res31: Array[(String, Int)] = Array((owl,3), (gnu,4), (dog,1), (cat,2), (ant,5))

Source code:
def saveAsSequenceFile(
    path: String,
    codec: Option[Class[_ <: CompressionCodec]] = None): Unit = self.withScope {
  def anyToWritable[U <% Writable](u: U): Writable = u

  val convertKey = self.keyClass != keyWritableClass
  val convertValue = self.valueClass != valueWritableClass

  logInfo("Saving as sequence file of type (" + keyWritableClass.getSimpleName + "," +
    valueWritableClass.getSimpleName + ")")
  val format = classOf[SequenceFileOutputFormat[Writable, Writable]]
  val jobConf = new JobConf(self.context.hadoopConfiguration)
  if (!convertKey && !convertValue) {
    self.saveAsHadoopFile(path, keyWritableClass, valueWritableClass, format, jobConf, codec)
  } else if (!convertKey && convertValue) {
    self.map(x => (x._1, anyToWritable(x._2))).saveAsHadoopFile(
      path, keyWritableClass, valueWritableClass, format, jobConf, codec)
  } else if (convertKey && !convertValue) {
    self.map(x => (anyToWritable(x._1), x._2)).saveAsHadoopFile(
      path, keyWritableClass, valueWritableClass, format, jobConf, codec)
  } else if (convertKey && convertValue) {
    self.map(x => (anyToWritable(x._1), anyToWritable(x._2))).saveAsHadoopFile(
      path, keyWritableClass, valueWritableClass, format, jobConf, codec)
  }
}
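
The codec parameter defaults to None, so compression has to be requested explicitly. A minimal sketch that writes the pair RDD v from the example above as a gzip-compressed sequence file (the /hdfs/obj/seq_gz path is an illustrative assumption):

import org.apache.hadoop.io.compress.GzipCodec

// The Option wrapper matches the codec parameter of saveAsSequenceFile.
v.saveAsSequenceFile("/hdfs/obj/seq_gz", Some(classOf[GzipCodec]))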
 
def sequenceFile[K, V]
    (path: String, minPartitions: Int = defaultMinPartitions)
    (implicit km: ClassTag[K], vm: ClassTag[V],
     kcf: () => WritableConverter[K], vcf: () => WritableConverter[V]): RDD[(K, V)] = {
  withScope {
    assertNotStopped()
    val kc = clean(kcf)()
    val vc = clean(vcf)()
    val format = classOf[SequenceFileInputFormat[Writable, Writable]]
    val writables = hadoopFile(path, format,
      kc.writableClass(km).asInstanceOf[Class[Writable]],
      vc.writableClass(vm).asInstanceOf[Class[Writable]], minPartitions)
    writables.map { case (k, v) => (kc.convert(k), vc.convert(v)) }
  }
}
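
SparkContext also offers a sequenceFile overload that takes the key and value Writable classes explicitly and returns the raw Writables. Since Hadoop RecordReaders reuse Writable instances, the values should be converted to plain Scala types before being collected. A minimal sketch against the file written above:

import org.apache.hadoop.io.{IntWritable, Text}

// Raw Writables; convert before collect because Hadoop reuses the same objects across records.
val raw = sc.sequenceFile("/hdfs/obj/hd_seq_file", classOf[Text], classOf[IntWritable])
raw.map { case (k, v) => (k.toString, v.get) }.collect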