
Spark Manual - RDD Action API

Author: 小牛君 | Published: 2017-06-16




1.  RDD Action API

Action / Meaning

reduce(func)
Aggregates all elements of the RDD with the function func; func must be commutative and associative so that the result can be computed correctly in parallel.

collect()
Returns all elements of the dataset to the driver program as an array.

count()
Returns the number of elements in the RDD.

first()
Returns the first element of the RDD (similar to take(1)).

take(n)
Returns an array of the first n elements of the dataset.

takeSample(withReplacement, num, [seed])
Returns an array of num elements sampled randomly from the dataset; withReplacement controls whether the sample is drawn with or without replacement, and seed specifies the seed of the random number generator.

takeOrdered(n, [ordering])
Returns the first n elements of the RDD, sorted by their natural order or by a custom ordering.

saveAsTextFile(path)
Writes the elements of the dataset as a text file to HDFS or another supported file system; Spark calls toString on each element to turn it into a line of text in the file.

saveAsSequenceFile(path)
Writes the elements of the dataset as a Hadoop SequenceFile to the given path on HDFS or another Hadoop-supported file system.

saveAsObjectFile(path)
Serializes the elements of the RDD and writes them as a SequenceFile, using NullWritable as the key and the serialized elements as the value.

countByKey()
For an RDD of (K, V) pairs, returns a (K, Long) map with the number of elements for each key.

foreach(func)
Runs the function func on each element of the dataset, usually for its side effects (for example updating an accumulator or writing to external storage).

 

 

 

1.1.   collect

Collects the results as an array.

def collect(): Array[T]

def collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U]

val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)
c.collect
res48: Array[String] = Array(Gnu, Cat, Rat, Dog, Gnu, Rat)
def collect(): Array[T] = withScope {
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}
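The second collect signature listed above takes a partial function and is a transformation rather than an action: it keeps only the elements the partial function is defined for and maps them to its results. A minimal sketch, reusing the RDD c from the example above:

c.collect { case s if s.startsWith("G") => s.toUpperCase }.collect
// expected: Array(GNU, GNU), since only the two "Gnu" elements match the partial function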

 

 

1.2.   collectAsMap [Pair]

For a key-value RDD, collects the results as an array and then converts the array into a Map.

val a = sc.parallelize(List(1, 2, 1, 3), 1)
val b = a.zip(a)
b.collectAsMap

res49: scala.collection.Map[Int,Int] = Map(2 -> 2, 1 -> 1, 3 -> 3)

def collectAsMap(): Map[K, V] = self.withScope {
  val data = self.collect()
  val map = new mutable.HashMap[K, V]
  map.sizeHint(data.length)
  data.foreach { pair => map.put(pair._1, pair._2) }
  map
}
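Because the source above simply puts every pair into a mutable HashMap, duplicate keys collapse to a single entry (the value collected last wins). A minimal sketch with illustrative data:

val dup = sc.parallelize(List((1, "a"), (1, "b"), (2, "c")), 1)
dup.collectAsMap   // contains 1 -> "b" and 2 -> "c"; the earlier (1, "a") is overwritten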

 

 

1.3.   count,countApprox

def countApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble]

 

count returns the total number of elements in the RDD.

countApprox returns, within the given timeout, an approximate range for that total.

d.count
res2: Long = 4

def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

d.countApprox(1).getFinalValue()
res2: org.apache.spark.partial.BoundedDouble = [2.000, 2.000]

def countApprox(
    timeout: Long,
    confidence: Double = 0.95): PartialResult[BoundedDouble] = withScope {
  require(0.0 <= confidence && confidence <= 1.0, s"confidence ($confidence) must be in [0,1]")
  val countElements: (TaskContext, Iterator[T]) => Long = { (ctx, iter) =>
    var result = 0L
    while (iter.hasNext) {
      result += 1L
      iter.next()
    }
    result
  }
  val evaluator = new CountEvaluator(partitions.length, confidence)
  sc.runApproximateJob(this, countElements, evaluator, timeout)
}
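A minimal usage sketch (the RDD below is illustrative, not taken from the original example): countApprox launches an approximate job and returns a PartialResult; reading initialValue after the timeout gives whatever bound is available at that point, while getFinalValue() blocks until the exact count is known.

val big = sc.parallelize(1 to 1000000, 8)
val partial = big.countApprox(timeout = 100, confidence = 0.95)  // wait at most 100 ms
println(partial.initialValue)    // the bound available when the timeout expires
println(partial.getFinalValue()) // blocks until the exact count is computed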
 

 

 

 

1.4.   countByKey,countByValue

countByKey counts how many times each key appears.

countByValue counts how many times each distinct element (the complete record) appears.

val c = sc.parallelize(List((3, "Gnu"), (3, "Yak"), (5, "Mouse"), (3, "Dog")), 2)
c.countByKey
res52: scala.collection.Map[Int,Long] = Map(3 -> 3, 5 -> 1)
def countByKey(): Map[K, Long] = self.withScope {
  self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap
}

val c = sc.parallelize(List((3, "Gnu"), (3, "Yak"), (5, "Mouse"), (3, "Dog")), 2)
c.countByValue()
res53: scala.collection.Map[(Int, String),Long] = Map((5,Mouse) -> 1, (3,Gnu) -> 1, (3,Dog) -> 1, (3,Yak) -> 1)

val c = sc.parallelize(List("hello world hello me", "hello you hello me", "good good very good", "haha"), 2)
c.flatMap(_.split(" ")).countByValue()
res55: scala.collection.Map[String,Long] = Map(good -> 3, world -> 1, you -> 1, me -> 2, haha -> 1, hello -> 4, very -> 1)

def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long] = withScope {
  map(value => (value, null)).countByKey()
}

 

1.5.   take,first,max,min

take returns the first n elements.

first returns the first element.

max returns the largest element (according to the implicit ordering).

min returns the smallest element (according to the implicit ordering).

val b = sc.parallelize(List("dog", "cat", "ape", "salmon", "gnu"), 2)
b.take(2)
res56: Array[String] = Array(dog, cat)
b.max
res13: String = salmon
b.first
res57: String = dog
b.min
res58: String = ape

def first(): T = withScope {
  take(1) match {
    case Array(t) => t
    case _ => throw new UnsupportedOperationException("empty collection")
  }
}

def max()(implicit ord: Ordering[T]): T = withScope {
  this.reduce(ord.max)
}

def min()(implicit ord: Ordering[T]): T = withScope {
  this.reduce(ord.min)
}
def take(num: Int): Array[T] = withScope {
  val scaleUpFactor = Math.max(conf.getInt("spark.rdd.limit.scaleUpFactor", 4), 2)
  if (num == 0) {
    new Array[T](0)
  } else {
    val buf = new ArrayBuffer[T]
    val totalParts = this.partitions.length
    var partsScanned = 0
    while (buf.size < num && partsScanned < totalParts) {
      // The number of partitions to try in this iteration. It is ok for this number to be
      // greater than totalParts because we actually cap it at totalParts in runJob.
      var numPartsToTry = 1L
      if (partsScanned > 0) {
        // If we didn't find any rows after the previous iteration, quadruple and retry.
        // Otherwise, interpolate the number of partitions we need to try, but overestimate
        // it by 50%. We also cap the estimation in the end.
        if (buf.isEmpty) {
          numPartsToTry = partsScanned * scaleUpFactor
        } else {
          // the left side of max is >=1 whenever partsScanned >= 2
          numPartsToTry = Math.max((1.5 * num * partsScanned / buf.size).toInt - partsScanned, 1)
          numPartsToTry = Math.min(numPartsToTry, partsScanned * scaleUpFactor)
        }
      }

      val left = num - buf.size
      val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
      val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p)

      res.foreach(buf ++= _.take(num - buf.size))
      partsScanned += p.size
    }

    buf.toArray
  }
}
 

 

 

1.6.   takeOrdered,top

Returns the first n elements, in ascending order for takeOrdered and in descending order for top (both according to the implicit ordering).

def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]

val c = sc.parallelize(Array(6, 9, 4, 7, 5, 8), 2)

c.top(2)

res28: Array[Int] = Array(9, 8)

 

def top(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
  takeOrdered(num)(ord.reverse)
}

val b = sc.parallelize(List("dog", "cat", "ape", "salmon", "gnu"), 2)

b.takeOrdered(2)

res19: Array[String] = Array(ape, cat)

 
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
  if (num == 0) {
    Array.empty
  } else {
    val mapRDDs = mapPartitions { items =>
      // queue holds at most num elements; using ord.reverse makes it retain
      // only the num smallest elements of this partition under ord
      val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
      // take the first num elements of this partition according to ord
      queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
      Iterator.single(queue)
    }
    if (mapRDDs.partitions.length == 0) {
      Array.empty
    } else {
      mapRDDs.reduce { (queue1, queue2) =>
        queue1 ++= queue2
        queue1
      }.toArray.sorted(ord)
    }
  }
}
 

 

class BoundedPriorityQueue[A](maxSize: Int)(implicit ord: Ordering[A])

BoundedPriorityQueue is a bounded collection that keeps at most maxSize elements according to its ordering. takeOrdered passes ord.reverse, so each partition's queue retains only the num smallest elements under ord.

util.collection.Utils.takeOrdered(items, num)(ord) takes the first num elements of the partition according to ord, and these elements are put into the partition's BoundedPriorityQueue.

mapRDDs.reduce then merges all the per-partition BoundedPriorityQueues. Because a BoundedPriorityQueue never holds more than num elements, the merged queue still contains only the overall first num elements; finally they are sorted by ord and returned as an array, as sketched below.
 

1.7.   foreach,foreachPartition

foreach applies a function to every element of the RDD.

foreachPartition applies a function to every partition; the function receives an iterator over that partition's elements.

def foreachPartition(f: Iterator[T] => Unit): Unit = withScope {
  val cleanF = sc.clean(f)
  sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
}

def foreach(f: T => Unit): Unit = withScope {
  val cleanF = sc.clean(f)
  sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
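A minimal usage sketch (the RDD and the println side effects are illustrative; in cluster mode the output appears in the executor logs, not on the driver): foreach calls the function once per element, while foreachPartition calls it once per partition with an iterator, which is the natural place to set up per-partition resources such as a database connection.

val nums = sc.parallelize(1 to 10, 2)

nums.foreach(n => println(n))            // invoked once for every element

nums.foreachPartition { iter =>
  // set up expensive per-partition resources here (e.g. a connection), then consume the iterator
  var count = 0
  iter.foreach(_ => count += 1)
  println(s"elements in this partition: $count")
}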

 

 

1.8.   lookup

Returns all values associated with the given key.

scala> val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2).keyBy(_.length)

scala> a.collect
res8: Array[(Int, String)] = Array((3,dog), (5,tiger), (4,lion), (3,cat), (7,panther), (5,eagle))

scala> a.lookup(5)
res10: Seq[String] = WrappedArray(tiger, eagle)

def lookup(key: K): Seq[V] = self.withScope {
  self.partitioner match {
    case Some(p) =>
      val index = p.getPartition(key)
      val process = (it: Iterator[(K, V)]) => {
        val buf = new ArrayBuffer[V]
        for (pair <- it if pair._1 == key) {
          buf += pair._2
        }
        buf
      } : Seq[V]
      val res = self.context.runJob(self, process, Array(index))
      res(0)
    case None =>
      self.filter(_._1 == key).map(_._2).collect()
  }
}
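As the source shows, when the RDD has a known partitioner, lookup runs a job only on the partition that owns the key instead of filtering every partition. A minimal sketch (the data and names are illustrative):

import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(List((1, "a"), (2, "b"), (2, "bb"), (3, "c")), 4)
val partitioned = pairs.partitionBy(new HashPartitioner(4)).cache()
partitioned.lookup(2)   // scans a single partition and returns its values for key 2, e.g. Seq(b, bb)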
 

 

1.9.   aggregate

Aggregates the elements of the RDD: seqOp folds the elements of each partition starting from zeroValue, and combOp then merges the per-partition results (zeroValue is also the initial value of that final merge).

val z = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)
val aggregate: Int = z.aggregate(0)(math.max(_, _), _ + _)
aggregate: Int = 9
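Why the result is 9: with two partitions, parallelize splits the list into (1, 2, 3) and (4, 5, 6). The seqOp math.max folds each partition starting from zeroValue = 0, producing 3 and 6; the combOp _ + _ then merges these partition results into the zero value on the driver, 0 + 3 + 6 = 9. With a single partition the result would instead be 0 + max(0, 1, ..., 6) = 6.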
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
  // Clone the zero value since we will also be serializing it as part of tasks
  var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())
  val cleanSeqOp = sc.clean(seqOp)
  val cleanCombOp = sc.clean(combOp)
  val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
  val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
  sc.runJob(this, aggregatePartition, mergeResult)
  jobResult
}
 

 

1.10. reduce

Aggregates all elements of the RDD with the given function, which must be commutative and associative; unlike fold and aggregate there is no zero value, and an empty RDD raises an exception.

val a = sc.parallelize(1 to 100, 3)
a.reduce(_ + _)
res27: Int = 5050
def reduce(f: (T, T) => T): T = withScope {
  val cleanF = sc.clean(f)
  val reducePartition: Iterator[T] => Option[T] = iter => {
    if (iter.hasNext) {
      Some(iter.reduceLeft(cleanF))
    } else {
      None
    }
  }
  var jobResult: Option[T] = None
  val mergeResult = (index: Int, taskResult: Option[T]) => {
    if (taskResult.isDefined) {
      jobResult = jobResult match {
        case Some(value) => Some(f(value, taskResult.get))
        case None => taskResult
      }
    }
  }
  sc.runJob(this, reducePartition, mergeResult)
  // Get the final result out of our Option, or throw an exception if the RDD was empty
  jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}
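As the last line of the source shows, calling reduce on an empty RDD throws UnsupportedOperationException; when a sensible zero value exists, fold (next section) avoids this. A minimal sketch (the RDD is illustrative):

val empty = sc.parallelize(Seq.empty[Int], 1)
// empty.reduce(_ + _)   // would throw UnsupportedOperationException: empty collection
empty.fold(0)(_ + _)     // returns 0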
 

 

1.11. fold

Aggregates the elements starting from the given zero value; note that zeroValue is applied once within each partition and once more when the partition results are merged on the driver.

def fold(zeroValue: T)(op: (T, T) => T): T

val a = sc.parallelize(List(1, 2, 3), 3)
a.fold(0)(_ + _)
res59: Int = 6

def fold(zeroValue: T)(op: (T, T) => T): T = withScope {
  // Clone the zero value since we will also be serializing it as part of tasks
  var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
  val cleanOp = sc.clean(op)
  val foldPartition = (iter: Iterator[T]) => iter.fold(zeroValue)(cleanOp)
  val mergeResult = (index: Int, taskResult: T) => jobResult = op(jobResult, taskResult)
  sc.runJob(this, foldPartition, mergeResult)
  jobResult
}
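Note in the source that zeroValue is used twice: iter.fold(zeroValue) applies it once inside every partition, and jobResult also starts from a clone of it on the driver. A minimal sketch of the consequence, reusing the three-partition RDD a from the example above (one element per partition):

a.fold(0)(_ + _)   // 6, as above
a.fold(1)(_ + _)   // 10, not 7: 1 on the driver + (1 + 1) + (1 + 2) + (1 + 3)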
 

 

1.12. sum

Sums the elements of a numeric RDD; as the source below shows, it is simply fold(0.0)(_ + _).

val x = sc.parallelize(List(1.0, 2.0, 3.0, 5.0, 20.0, 19.02, 19.29, 11.09, 21.0), 2)
x.sum

def sum(): Double = self.withScope {
  self.fold(0.0)(_ + _)
}

 

 



