
Spark Handbook - load&save

Author: 小牛君 | Published: 2017-06-16




1.  load&save

1.1.   Creating an RDD from an in-memory collection

val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)
c.collect

res0: Array[String] = Array(Gnu, Cat, Rat, Dog, Gnu, Rat)
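The second argument to parallelize sets the number of partitions (slices). A quick way to verify this in the shell, as a minimal sketch continuing from the two-slice RDD c above (the exact grouping shown is only indicative):

c.partitions.length        // the 2 slices requested above
c.glom.collect             // glom gathers each partition into an array
// e.g. Array(Array(Gnu, Cat, Rat), Array(Dog, Gnu, Rat))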

def parallelize[T: ClassTag](
    seq: Seq[T],
    numSlices: Int = defaultParallelism): RDD[T] = withScope {
  assertNotStopped()
  new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
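When numSlices is omitted it falls back to defaultParallelism, which is derived from the scheduler configuration (spark.default.parallelism, or the number of available cores); the empty Map passed to ParallelCollectionRDD is the per-partition location-preference map. A small sketch for checking the default in the shell:

sc.defaultParallelism
val d = sc.parallelize(1 to 100)   // no numSlices given, so defaultParallelism slices are used
d.partitions.length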
 

 

val rdd: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4))
rdd.collect

def makeRDD[T: ClassTag](
    seq: Seq[T],
    numSlices: Int = defaultParallelism): RDD[T] = withScope {
  parallelize(seq, numSlices)
}
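makeRDD is simply an alias that forwards to parallelize. SparkContext also offers a makeRDD overload that takes per-element preferred locations (the map that parallelize leaves empty); a minimal sketch, with made-up host names:

// Each element is paired with the hosts on which its partition should preferably run.
val located = sc.makeRDD(Seq(
  ("record-1", Seq("host-a")),
  ("record-2", Seq("host-b"))))
located.preferredLocations(located.partitions(0))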

 

1.2.   textFile&saveAsTextFile

val rdd = sc.textFile("/hdfs/wordcount/in/words.txt")
  .flatMap(_.split("\\s+"))
  .map((_, 1))
  .reduceByKey(_ + _)
rdd.saveAsTextFile("/hdfs/wordcount/out")
rdd.saveAsTextFile("/hdfs/wordcount/out2", classOf[org.apache.hadoop.io.compress.GzipCodec])
def textFile(
    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
    minPartitions).map(pair => pair._2.toString).setName(path)
}
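minPartitions is only a hint passed down to the Hadoop InputFormat; the actual partition count depends on how the input can be split (a gzipped file, for instance, cannot be split). A small sketch reusing the input path above:

val words = sc.textFile("/hdfs/wordcount/in/words.txt", 4)
words.partitions.length   // usually >= 4 for a splittable text file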
def saveAsTextFile(path: String): Unit = withScope {
  val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
  val textClassTag = implicitly[ClassTag[Text]]
  val r = this.mapPartitions { iter =>
    val text = new Text()
    iter.map { x =>
      text.set(x.toString)
      (NullWritable.get(), text)
    }
  }
  RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
    .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
}

def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit = withScope {
  val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
  val textClassTag = implicitly[ClassTag[Text]]
  val r = this.mapPartitions { iter =>
    val text = new Text()
    iter.map { x =>
      text.set(x.toString)
      (NullWritable.get(), text)
    }
  }
  RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
    .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path, codec)
}
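As the source shows, saveAsTextFile writes each element's toString as one line per record, one part file per partition, and the second overload merely adds a compression codec. Both outputs can be read back with textFile, since Hadoop decompresses .gz part files transparently; a minimal sketch reusing the paths above:

val plain   = sc.textFile("/hdfs/wordcount/out")
val gzipped = sc.textFile("/hdfs/wordcount/out2")   // .gz part files are decompressed on read
gzipped.collect                                     // tuples come back as strings like "(cat,3)"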
 

 

1.3.   saveAsObjectFile&objectFile

val x = sc.parallelize(1 to 10, 3)
x.saveAsObjectFile("/hdfs/obj")

val y = sc.objectFile[Int]("/hdfs/obj")
y.collect
res25: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
 
def saveAsObjectFile(path: String): Unit = withScope {
  this.mapPartitions(iter => iter.grouped(10).map(_.toArray))
    .map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))))
    .saveAsSequenceFile(path)
}

def objectFile[T: ClassTag](
    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[T] = withScope {
  assertNotStopped()
  sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minPartitions)
    .flatMap(x => Utils.deserialize[Array[T]](x._2.getBytes, Utils.getContextOrSparkClassLoader))
}
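saveAsObjectFile batches elements in groups of ten, Java-serializes each batch, and stores the batches as a SequenceFile of NullWritable/BytesWritable pairs, so the element type must be serializable. Unlike saveAsTextFile it preserves the element type on read-back; a small sketch with a hypothetical output path:

val pairs = sc.parallelize(Seq(("cat", 2), ("dog", 1)))
pairs.saveAsObjectFile("/hdfs/obj/pairs")                    // hypothetical path
val restored = sc.objectFile[(String, Int)]("/hdfs/obj/pairs")
restored.collect                                             // still typed as (String, Int)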
 

 

1.4.   saveAsSequenceFile&sequenceFile

val v = sc.parallelize(Array(("owl",3), ("gnu",4), ("dog",1), ("cat",2), ("ant",5)), 2)
v.saveAsSequenceFile("/hdfs/obj/seq_file")

val y = sc.sequenceFile[String,Int]("/hdfs/obj/seq_file")
y.collect
res31: Array[(String, Int)] = Array((owl,3), (gnu,4), (dog,1), (cat,2), (ant,5))
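saveAsSequenceFile also takes an optional compression codec (see the codec parameter in the source below); a minimal sketch writing a compressed copy to a made-up path:

v.saveAsSequenceFile("/hdfs/obj/seq_file_gz",
  Some(classOf[org.apache.hadoop.io.compress.GzipCodec]))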
Source:
def saveAsSequenceFile(
    path: String,
    codec: Option[Class[_ <: CompressionCodec]] = None): Unit = self.withScope {
  def anyToWritable[U <% Writable](u: U): Writable = u

  val convertKey = self.keyClass != keyWritableClass
  val convertValue = self.valueClass != valueWritableClass

  logInfo("Saving as sequence file of type (" + keyWritableClass.getSimpleName + "," +
    valueWritableClass.getSimpleName + ")")
  val format = classOf[SequenceFileOutputFormat[Writable, Writable]]
  val jobConf = new JobConf(self.context.hadoopConfiguration)
  if (!convertKey && !convertValue) {
    self.saveAsHadoopFile(path, keyWritableClass, valueWritableClass, format, jobConf, codec)
  } else if (!convertKey && convertValue) {
    self.map(x => (x._1, anyToWritable(x._2))).saveAsHadoopFile(
      path, keyWritableClass, valueWritableClass, format, jobConf, codec)
  } else if (convertKey && !convertValue) {
    self.map(x => (anyToWritable(x._1), x._2)).saveAsHadoopFile(
      path, keyWritableClass, valueWritableClass, format, jobConf, codec)
  } else if (convertKey && convertValue) {
    self.map(x => (anyToWritable(x._1), anyToWritable(x._2))).saveAsHadoopFile(
      path, keyWritableClass, valueWritableClass, format, jobConf, codec)
  }
}
 
def sequenceFile[K, V]
    (path: String, minPartitions: Int = defaultMinPartitions)
    (implicit km: ClassTag[K], vm: ClassTag[V],
      kcf: () => WritableConverter[K], vcf: () => WritableConverter[V]): RDD[(K, V)] = {
  withScope {
    assertNotStopped()
    val kc = clean(kcf)()
    val vc = clean(vcf)()
    val format = classOf[SequenceFileInputFormat[Writable, Writable]]
    val writables = hadoopFile(path, format,
      kc.writableClass(km).asInstanceOf[Class[Writable]],
      vc.writableClass(vm).asInstanceOf[Class[Writable]], minPartitions)
    writables.map { case (k, v) => (kc.convert(k), vc.convert(v)) }
  }
}
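The implicit WritableConverters are what allow sequenceFile[String, Int] to map Text/IntWritable records back to plain Scala types. The Writable classes can also be named explicitly and converted by hand; a minimal sketch reusing the path above:

import org.apache.hadoop.io.{IntWritable, Text}

val z = sc.sequenceFile("/hdfs/obj/seq_file", classOf[Text], classOf[IntWritable])
  .map { case (k, v) => (k.toString, v.get) }   // convert right away: Hadoop may reuse Writable objects
z.collect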
 



