
Spark Handbook - RDD Transformation API

Author: 小牛君 | Published: 2017-06-16



1.  RDD Transformation API

All transformations on RDDs are lazily evaluated: they do not compute their results right away. Instead, they only remember the transformations applied to the underlying dataset (for example, a file). The transformations are actually executed only when an action requires a result to be returned to the Driver. This design lets Spark run more efficiently.

In essence, a transformation wraps one RDD in another RDD, possibly through several layers of wrapping.
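Below is a minimal sketch of this lazy behaviour, assuming a SparkContext named sc (as in spark-shell running in local mode): the map call only records the lineage, and nothing is computed until the collect action runs.

// Minimal sketch of lazy evaluation; assumes a SparkContext named sc (local mode).
val nums    = sc.parallelize(1 to 5)
val doubled = nums.map { x => println(s"mapping $x"); x * 2 }  // only records the lineage
println("nothing has been mapped yet")                         // no "mapping ..." output so far
val result  = doubled.collect()                                // action: the map runs now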

The commonly used transformations and their meanings are listed below (a short usage sketch follows the list).

map(func)
    Returns a new RDD formed by passing each element of the source through the function func.

filter(func)
    Returns a new RDD formed by selecting those elements of the source for which func returns true.

flatMap(func)
    Similar to map, but each input element can be mapped to zero or more output elements (so func should return a sequence rather than a single element).

mapPartitions(func)
    Similar to map, but runs independently on each partition of the RDD, so when called on an RDD of type T, func must be of type Iterator[T] => Iterator[U].

mapPartitionsWithIndex(func)
    Similar to mapPartitions, but func also receives an integer parameter giving the partition index, so when called on an RDD of type T, func must be of type (Int, Iterator[T]) => Iterator[U].

sample(withReplacement, fraction, seed)
    Samples the data at the rate given by fraction; withReplacement controls whether sampling is done with replacement, and seed specifies the random number generator seed.

union(otherDataset)
    Returns a new RDD containing the union of the source RDD and the argument RDD.

intersection(otherDataset)
    Returns a new RDD containing the intersection of the source RDD and the argument RDD.

distinct([numPartitions])
    Returns a new RDD containing the distinct elements of the source RDD.

groupByKey([numTasks|partitioner])
    Called on an RDD of (K, V) pairs, returns an RDD of (K, Iterable[V]) pairs.

reduceByKey(func, [numTasks]) / reduceByKey([partitioner], func)
    Called on an RDD of (K, V) pairs, returns an RDD of (K, V) pairs in which the values for each key are aggregated with the given reduce function func. As with groupByKey, the number of reduce tasks can be set through the optional second argument.

sortByKey([ascending], [numPartitions])
    Called on an RDD of (K, V) pairs where K implements the Ordered interface, returns an RDD of (K, V) pairs sorted by key; ascending controls whether the sort is ascending.

sortBy(func, [ascending], [numTasks])
    Similar to sortByKey, but more flexible (sorts by the value produced by func).

join(otherDataset, [numTasks])
    Called on RDDs of types (K, V) and (K, W), returns an RDD of (K, (V, W)) pairs with all pairs of elements for each matching key.

cogroup(otherDataset1[, otherDataset2[, otherDataset3]], [numTasks|partitioner])
    Called on RDDs of types (K, V) and (K, W), returns an RDD of type (K, (Iterable<V>, Iterable<W>)).

cartesian(otherDataset)
    Cartesian product of the two RDDs.

aggregateByKey(zeroValue, [numPartitions|partitioner])(seqOp, combOp)
    Operates on a key-value RDD[(K, V)]. Within each partition, for each key, seqOp is first called with zeroValue as its first argument and the first element as its second; iteration then continues over the remaining elements, each time passing the previous result as seqOp's first argument and the current element as its second, which yields that partition's result. The per-partition results are then merged with combOp.

pipe(command, [envVars])
    Pipes the RDD through an external script or command (for example a Perl or bash script).

coalesce(numPartitions, [shuffle])
    Changes the number of partitions of the RDD; the optional shuffle flag controls whether a shuffle is performed.

repartition(numPartitions)
    Repartitions the RDD to numPartitions partitions using a HashPartitioner.

repartitionAndSortWithinPartitions(partitioner)
    Repartitions the RDD according to the given partitioner and sorts records by key within each partition; an implicit value of type Ordering[K] must be in scope.
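As a quick orientation, here is a minimal sketch chaining a few of the transformations listed above; it assumes a SparkContext named sc, as in spark-shell.

// A few of the transformations above, chained; assumes a SparkContext named sc.
val lines = sc.parallelize(Seq("spark makes rdds", "rdds are lazy"), 2)
val words = lines.flatMap(_.split(" "))      // flatMap: one line -> many words
val longW = words.filter(_.length > 3)       // filter: keep words longer than 3 characters
val pairs = longW.map(w => (w, 1))           // map: word -> (word, 1)
val freq  = pairs.reduceByKey(_ + _)         // reduceByKey: sum the counts per word
freq.collect()                               // action: triggers the whole pipeline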

 

1.1.   ShuffledRDD

A ShuffledRDD represents data that must be transferred across the network through the shuffle.

Creating a ShuffledRDD requires a partitioner; a serializer, an aggregator (the combine functions), and whether to perform map-side combining can also be specified:

new ShuffledRDD[K, V, C](self, partitioner)
  .setSerializer(serializer)
  .setAggregator(aggregator)
  .setMapSideCombine(mapSideCombine)
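As a quick illustration (a minimal sketch assuming a SparkContext named sc), the ShuffledRDD created by a shuffling operator such as reduceByKey can be seen in the lineage printed by toDebugString:

// Pair operators that need a shuffle materialize as a ShuffledRDD in the lineage.
val counts = sc.parallelize(Seq("a", "b", "a"), 2).map((_, 1)).reduceByKey(_ + _)
println(counts.toDebugString)   // the top of the lineage shows a ShuffledRDD[...]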
 
 

1.1.1.   combineByKeyWithClassTag source code

The combineByKey and groupByKey families of operators discussed below are all implemented on top of the combineByKeyWithClassTag method.

@Experimental
def combineByKeyWithClassTag[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
  require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
  if (keyClass.isArray) {
    if (mapSideCombine) {
      throw new SparkException("Cannot use map-side combining with array keys.")
    }
    if (partitioner.isInstanceOf[HashPartitioner]) {
      throw new SparkException("HashPartitioner cannot partition array keys.")
    }
  }
  val aggregator = new Aggregator[K, V, C](
    self.context.clean(createCombiner),
    self.context.clean(mergeValue),
    self.context.clean(mergeCombiners))
  // If the RDD is already partitioned by the requested partitioner, no ShuffledRDD is
  // created; values are simply combined within each existing partition.
  if (self.partitioner == Some(partitioner)) {
    self.mapPartitions(iter => {
      val context = TaskContext.get()
      new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
    }, preservesPartitioning = true)
  } else {
    // Otherwise build a ShuffledRDD according to the given partitioner.
    new ShuffledRDD[K, V, C](self, partitioner)
      .setSerializer(serializer)
      .setAggregator(aggregator)
      .setMapSideCombine(mapSideCombine)
  }
}
@DeveloperApi
case class Aggregator[K, V, C] (
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C) {

  def combineValuesByKey(
      iter: Iterator[_ <: Product2[K, V]],
      context: TaskContext): Iterator[(K, C)] = {
    val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
    combiners.insertAll(iter)
    updateMetrics(context, combiners)
    combiners.iterator
  }

  def combineCombinersByKey(
      iter: Iterator[_ <: Product2[K, C]],
      context: TaskContext): Iterator[(K, C)] = {
    val combiners = new ExternalAppendOnlyMap[K, C, C](identity, mergeCombiners, mergeCombiners)
    combiners.insertAll(iter)
    updateMetrics(context, combiners)
    combiners.iterator
  }

  /** Update task metrics after populating the external map. */
  private def updateMetrics(context: TaskContext, map: ExternalAppendOnlyMap[_, _, _]): Unit = {
    Option(context).foreach { c =>
      c.taskMetrics().incMemoryBytesSpilled(map.memoryBytesSpilled)
      c.taskMetrics().incDiskBytesSpilled(map.diskBytesSpilled)
      c.taskMetrics().incPeakExecutionMemory(map.peakMemoryUsedBytes)
    }
  }
}
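The self.partitioner == Some(partitioner) branch above is worth illustrating. In the following minimal sketch (assuming a SparkContext named sc), the RDD has already been partitioned by the same partitioner, so the values are combined within the existing partitions and no additional shuffle is needed:

// An RDD already partitioned by the same partitioner is aggregated in place,
// without building a second ShuffledRDD; only partitionBy shuffles here.
import org.apache.spark.HashPartitioner

val p      = new HashPartitioner(4)
val pairs  = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val parted = pairs.partitionBy(p)          // one shuffle
val sums   = parted.reduceByKey(p, _ + _)  // same partitioner: combined within partitions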
 

 

1.1.2.   combineByKey[Pair]

createCombiner produces the initial combined value from the first element encountered (for a given key) in each partition.

mergeValue iteratively merges the remaining elements within each partition into that combined value.

mergeCombiners merges the combined results of all partitions.

def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]

def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]

def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializerClass: String = null): RDD[(K, C)]

val a = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
val b = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)
val c = b.zip(a)

scala> c.collect
res0: Array[(Int, String)] = Array((1,dog), (1,cat), (2,gnu), (2,salmon), (2,rabbit), (1,turkey), (2,wolf), (2,bear), (2,bee))

val d = c.combineByKey(List(_), (x:List[String], y:String) => y :: x, (x:List[String], y:List[String]) => x ::: y)

scala> d.foreach(println)
(2,List(gnu, rabbit, salmon, bee, bear, wolf))
(1,List(cat, dog, turkey))

def combineByKey[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null): RDD[(K, C)] = self.withScope {
  combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners,
    partitioner, mapSideCombine, serializer)(null)
}

def combineByKey[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
  combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
}

@Experimental
def combineByKeyWithClassTag[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
  combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self))
}

def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
  val rdds = (Seq(rdd) ++ others)
  val hasPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 0))
  if (hasPartitioner.nonEmpty) {
    hasPartitioner.maxBy(_.partitions.length).partitioner.get
  } else {
    if (rdd.context.conf.contains("spark.default.parallelism")) {
      new HashPartitioner(rdd.context.defaultParallelism)
    } else {
      new HashPartitioner(rdds.map(_.partitions.length).max)
    }
  }
}
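In other words, when no partitioner is passed explicitly, defaultPartitioner first reuses an existing partitioner from the input RDDs, then falls back to spark.default.parallelism if it is set, and finally to the largest partition count among the inputs. A minimal sketch of the last case, assuming a SparkContext named sc and that spark.default.parallelism has not been set explicitly:

// With no upstream partitioner and spark.default.parallelism unset in the conf, the
// fallback is a HashPartitioner sized to the largest input RDD (8 partitions here).
val x = sc.parallelize(1 to 100, 8).map(i => (i % 10, i))
val y = x.combineByKey(List(_), (l: List[Int], v: Int) => v :: l,
                       (a: List[Int], b: List[Int]) => a ::: b)
println(y.partitions.length)   // expected: 8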
 

 

1.1.3.   aggregateByKey

Within each partition, the values of each key are folded with seqOp, starting from zeroValue as the initial value; the per-partition results are then merged with combOp.

def aggregateByKey[U](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U)(implicit arg0: ClassTag[U]): RDD[(K, U)]

def aggregateByKey[U](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U, combOp: (U, U) => U)(implicit arg0: ClassTag[U]): RDD[(K, U)]

def aggregateByKey[U](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, combOp: (U, U) => U)(implicit arg0: ClassTag[U]): RDD[(K, U)]

val pairRDD = sc.parallelize(List(("cat", 2), ("cat", 5), ("mouse", 4), ("cat", 12), ("dog", 12), ("mouse", 2)), 2)

pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect
res6: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6))

pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collect
res7: Array[(String, Int)] = Array((dog,100), (cat,200), (mouse,200))

def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,
    combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
  // Serialize the zero value to a byte array so that we can get a new clone of it on each key
  val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
  val zeroArray = new Array[Byte](zeroBuffer.limit)
  zeroBuffer.get(zeroArray)

  lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
  val createZero = () => cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray))

  // We will clean the combiner closure later in `combineByKey`
  val cleanedSeqOp = self.context.clean(seqOp)
  combineByKeyWithClassTag[U]((v: V) => cleanedSeqOp(createZero(), v),
    cleanedSeqOp, combOp, partitioner)
}
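As the final line of the source shows, aggregateByKey is just combineByKey with a createCombiner that folds the first value into a fresh copy of zeroValue. A minimal sketch of that equivalence, assuming a SparkContext named sc:

// aggregateByKey(zero)(seqOp, combOp) expressed through combineByKey, mirroring the
// source above: createCombiner = v => seqOp(zeroValue, v).
val pairs = sc.parallelize(Seq(("cat", 2), ("cat", 5), ("mouse", 4)), 2)

val viaAggregate = pairs.aggregateByKey(0)(math.max(_, _), _ + _)
val viaCombine   = pairs.combineByKey(
  (v: Int) => math.max(0, v),             // createCombiner = seqOp(zeroValue, v)
  (acc: Int, v: Int) => math.max(acc, v), // mergeValue     = seqOp
  (a: Int, b: Int) => a + b)              // mergeCombiners = combOp

viaAggregate.collect().toSet == viaCombine.collect().toSet   // true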
 

 

 

1.1.4.   foldByKey [Pair]

Within each partition, the values of each key are folded with func, starting from zeroValue as the initial value; the per-partition results are then merged again with func.

def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]

def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]

def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]

val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val b = a.map(x => (x.length, x))

b.collect
res1: Array[(Int, String)] = Array((3,dog), (5,tiger), (4,lion), (3,cat), (7,panther), (5,eagle))

b.foldByKey("")(_ + _).collect
res85: Array[(Int, String)] = Array((4,lion), (3,dogcat), (7,panther), (5,tigereagle))

def foldByKey(
    zeroValue: V,
    partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
  // Serialize the zero value to a byte array so that we can get a new clone of it on each key
  val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
  val zeroArray = new Array[Byte](zeroBuffer.limit)
  zeroBuffer.get(zeroArray)

  // When deserializing, use a lazy val to create just one instance of the serializer per task
  lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
  val createZero = () => cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))

  val cleanedFunc = self.context.clean(func)
  combineByKeyWithClassTag[V]((v: V) => cleanedFunc(createZero(), v),
    cleanedFunc, cleanedFunc, partitioner)
}
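When zeroValue is the identity element of func (the empty string for string concatenation here), foldByKey produces the same result as reduceByKey, which is covered in the next section. A minimal sketch, assuming a SparkContext named sc:

// With an identity zero value, foldByKey and reduceByKey agree (compare res85 above
// with res87 in the next section).
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val b = a.map(x => (x.length, x))
b.foldByKey("")(_ + _).collect().toSet == b.reduceByKey(_ + _).collect().toSet   // true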

 

1.1.5.   reduceByKey [Pair]

Within each partition, the values of each key are folded with func; the per-partition results are then merged again with func.

def reduceByKey(func: (V, V) => V): RDD[(K, V)]

def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]

val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2)
val b = a.map(x => (x.length, x))
b.reduceByKey(_ + _).collect
res86: Array[(Int, String)] = Array((3,dogcatowlgnuant))

val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val b = a.map(x => (x.length, x))
b.reduceByKey(_ + _).collect
res87: Array[(Int, String)] = Array((4,lion), (3,dogcat), (7,panther), (5,tigereagle))

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
  combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}

 


