位置:首页 > spark RDD 相关需求 >

spark RDD 相关需求

作者:小牛君|发布时间:2017-06-16

小牛学堂的课程大纲最近进行了再一次升级,并且同时推出Java大数据平台开发班、Python爬虫与数据挖掘班、Spark项目班、Spark大神班、机器学习算法实战班、BI数据分析实战班, 目前这类人群凤毛麟角,导致这个行业的平均薪资极高,为此小牛学堂集合了行业的诸多大牛开设对应班级,为想学习的同学提供机会!
如果想了解详细情况,请联系 今日值班讲师 或者直接加入千人QQ群进行咨询:Spark大数据交流学习群613807316

以下是本文正文:


1.  spark RDD 相关需求

1.1.   选出最受欢迎的老师

小牛学堂官网有学科,每个学科有很多老师授课,选出每个学科中被访问量最多的那个老师

数据样本

http://www.xiaoniu.com/bigdata/laozhao

http://www.xiaoniu.com/bigdata/laozhao

http://www.xiaoniu.com/bigdata/laozhao

http://www.xiaoniu.com/bigdata/laozhao

http://www.xiaoniu.com/bigdata/laozhao

http://www.xiaoniu.com/bigdata/laoduan

http://www.xiaoniu.com/bigdata/laoduan

http://www.xiaoniu.com/javaEE/xiaoxu

http://www.xiaoniu.com/javaEE/xiaoxu

http://www.xiaoniu.com/javaEE/laoyang

http://www.xiaoniu.com/javaEE/laoyang

http://www.xiaoniu.com/javaEE/laoyang

代码实现

val sc = new SparkContext(new SparkConf().setAppName("FavoriteTeacher").setMaster("local[*]"))
sc.setLogLevel(
"WARN")
 
val lines: RDD[String] = sc.textFile("/hdfs/logs/access.log")
 
val subjectTeacherTuple: RDD[(String, String)] = lines.map(line => {
 
//http://www.xiaoniu.com/bigdata/laozhao
 
val r: Regex = "http://.+/(.+)/(.+)".r
 
val r(subjct, teacher) = line
  (subjct, teacher)
})
 
val subjects = subjectTeacherTuple.map(_._1).distinct().collect()
 
val values = subjectTeacherTuple
  .map((_,
1))
  .reduceByKey(_ + _)
  .map(x => (x._1._1, (x._1._2, x._2)))
  .groupByKey()
  .mapValues(_.toList.sortBy(_._2).reverse.take(
1)(0))
  .foreach(println)
sc.stop()
 

 

res:

(javaEE,(laoyang,249000))

(bigdata,(laozhao,419000))

 

1.2.   取各栏目粉丝量最多的用户

有一个微博网站有很多栏目,每一个栏目都有几千万用户,每个用户会有很多的粉丝

要求取出各栏目粉丝量最多的用户

样本数据

医疗 user1 user8397

艺术 user5 user6766

教育 user7 user9601

艺术 user5 user8036

法律 user7 user3953

艺术 user1 user1888

教育 user2 user5185

法律 user1 user5051

音乐 user6 user4751

法律 user6 user1204

法律 user7 user9517

教育 user3 user898

医疗 user6 user9067

音乐 user3 user2010

法律 user1 user6622

代码实现

对于选出最受欢迎的老师的需求,我们只需要预聚合后,进行groupBy,

然后对每一个key里面的value用scala集合的方法排序即可。

但对于该需求,预聚合后,每个分区仍然有几千万的用户需要排序统计

如果进行groupBy,一个分区内的数据会全部转成内存中的数组,有内存溢出的危险

我们可以利用repartitionAndSortWithinPartitions按照key进行排序,并根据栏目指定好分区

这样排序后,只需要取出每个分区里的第一个元素,就是每个栏目粉丝量最多的用户

 

val sc = new SparkContext(new SparkConf().setAppName("FavoriteUser").setMaster("local[*]"))
sc.setLogLevel(
"WARN")
 
val lines: RDD[String] = sc.textFile("/hdfs/logs/weibo.log")
 
val columnUserTuple: RDD[(String, String)] = lines.map(line => {
 
//医疗 user7 user7051
 
val split: Array[String] = line.split("\\s+")
  (split(
0), split(1))
})
 
val columns = columnUserTuple.map(_._1).distinct().collect()
 
//申明一个Ordering[(String,Int)]类型的隐式值,用于修改默认的排序规则
 
implicit val sortkey = Ordering[(String,Int)].on[(String,Int)](x=>(x._1,-x._2))
 
val values = columnUserTuple
  .map((_,
1))
  .reduceByKey(_ + _)
  .map(x => ((x._1._1, x._2), x._1._2))
 
//按照指定的分区器进行shuffle分区,并对key进行排序,由于上面申明Ordering[(String,Int)]类型的隐式值,原始的排序将会被修改
 
.repartitionAndSortWithinPartitions(new Partitioner {
   
private val rules: Map[String, Int] = columns.zipWithIndex.toMap
   
override def numPartitions: Int = rules.size + 1
   
override def getPartition(key: Any): Int = {
     
val column = key.asInstanceOf[(String,Int)]._1
     
rules.getOrElse(column, -1) + 1
   
}
  })
 
//由于每个分区都是粉丝数量最多的用户,所以只需取出每个分区第一个元素及为该栏目粉丝量最多的用户
 
.mapPartitions(it => {
   
if (it.hasNext) Iterator.single(it.next())
   
else it
  })
  .foreach(println)
sc.stop()
 

 

res:

((艺术,2926),user4)

((医疗,2932),user3)

((教育,2898),user7)

((音乐,2927),user5)

((法律,2947),user3)

 

若已经明确要统计栏目,也可以采用如下简化的方式实现

val sc = new SparkContext(new SparkConf().setAppName("FavoriteUser").setMaster("local[*]"))
sc.setLogLevel(
"WARN")
 
val lines: RDD[String] = sc.textFile("/hdfs/logs/weibo.log")
 
val columnUserAndCounts: RDD[((String, String), Int)] = lines.map(line => {
 
//医疗 user7 user7051
 
val split: Array[String] = line.split("\\s+")
 
val column: String = split(0)
 
val user: String = split(1)
  ((column, user),
1)
})
  .reduceByKey(_ + _)
 
 
val columns = Array("教育", "法律", "医疗", "艺术", "音乐")
 
for (column <- columns) {
  columnUserAndCounts.filter(t => column.equals(t._1._1))
    .top(
1)(Ordering[Int].on(_._2))
    .map {
case ((column, user), count) => s"$column $user $count" }
    .foreach(println)
}
sc.stop()
 

 

res:

教育 user7 2898

法律 user3 2947

医疗 user3 2932

艺术 user4 2926

音乐 user5 2927

 

 

 

1.3.   获取各手机停留时间最长的基站坐标和停留时间

样本数据

基站ID,经度,纬度,信号类型

9F36407EAD0629FC166F14DDE7970F68,116.304864,40.050645,6

CC0710CC94ECC657A8561DE549D940E0,116.303955,40.041935,6

16030401EAFB68F1E3CDF819735E1C66,116.296302,40.032296,6

 

手机号,时间,基站ID,进出类型(1表示进入基站范围,0表示离开基站范围)

18611132889,20160327075000,9F36407EAD0629FC166F14DDE7970F68,1

18611132889,20160327081000,9F36407EAD0629FC166F14DDE7970F68,0

18611132889,20160327081100,CC0710CC94ECC657A8561DE549D940E0,1

18611132889,20160327082000,CC0710CC94ECC657A8561DE549D940E0,0

18688888888,20160327082400,16030401EAFB68F1E3CDF819735E1C66,1

18611132889,20160327082500,16030401EAFB68F1E3CDF819735E1C66,1

18688888888,20160327170000,16030401EAFB68F1E3CDF819735E1C66,0

18611132889,20160327180000,16030401EAFB68F1E3CDF819735E1C66,0

18688888888,20160327171000,CC0710CC94ECC657A8561DE549D940E0,1

18688888888,20160327171600,CC0710CC94ECC657A8561DE549D940E0,0

18611132889,20160327180500,CC0710CC94ECC657A8561DE549D940E0,1

18611132889,20160327181500,CC0710CC94ECC657A8561DE549D940E0,0

18611132889,20160327182000,9F36407EAD0629FC166F14DDE7970F68,1

18611132889,20160327230000,9F36407EAD0629FC166F14DDE7970F68,0

 

代码实现

val sc = new SparkContext(new SparkConf().setAppName("UserLocation").setMaster("local[*]"))
sc.setLogLevel(
"WARN")
 
val lines: RDD[String] = sc.textFile("/hdfs/loc/data.txt")
 
val bsAndPhoneMaxStatTime = lines.map(line => {
 
val fields = line.split(",")
 
var time = fields(1).toLong
 
if (fields(3).equals("1")) time = -time
  ((fields(
2), fields(0)), time)
})
  .reduceByKey(_+_)
  .map{
case ((baseid,phone),statTime)=>(phone,(baseid,statTime))}
  .groupByKey()
  .mapValues(t=>t.toList.sortBy(_._2).reverse.take(
1)(0))
  .map{
case (phone,(baseid,statTime))=>(baseid,(phone,statTime))}
 
 
//加载基站数据
 
val baseInfo = sc.textFile("/hdfs/loc/location.txt")
 
val baseIdAndLocation: RDD[(String, (String, String))] = baseInfo.map(info => {
 
val fields = info.split(",")
  (fields(
0), (fields(1), fields(2)))
})
 
val result = bsAndPhoneMaxStatTime.join(baseIdAndLocation)
    .values
    .map{
case ((phone,stattime),(x,y))=>s"$phone $x $y $stattime"}
result.foreach(println)
sc.stop()
 

 

res:

18611132889 116.296302   40.032296 97500

18688888888 116.296302   40.032296 87600

 

1.4.   分组topn的终极优化实现

上面的问题实际都是分组topn问题,直接groupBy的话,普通数据量可以满足要求,但组内数据过多就可能内存溢出。

后来通过先排序,然后按照指定规则分区后,取分区内的第一个元素,解决了内存溢出的问题。

但实际操作发现,当每个分区内部数据很大很大时,shuffle过程很慢很慢,虽然最终没有内存溢出,但这并不是最优的实现。

spark能否实现mapreducecombine呢?如果可以的话,每个分区都计算自己的topn,这样shuffle过程的数据量将大大减少。最终运行效率也会变得很快。

实际上spark是可以实现combine的,我们以各栏目粉丝量最多的一个用户的问题为例

具体实现代码如下:

package com.edu360
 
 
import org.apache.spark.rdd.RDD
 
import org.apache.spark.{SparkConf, SparkContext}
 
 
import scala.collection.mutable
 
 
object FavoriteUser3 {
 
 
def main(args: Array[String]): Unit = {
   
val sc = new SparkContext(new SparkConf().setAppName("FavoriteUser").setMaster("local[*]"))
    sc.setLogLevel(
"WARN")
   
val lines: RDD[String] = sc.textFile("/hdfs/logs/weibo.log")
    lines.map(line => {
     
//医疗 user7 user7051
     
val split: Array[String] = line.split("\\s+")
      (split(
0), split(1))
    }).map((_,
1))
     
//计算出各栏目每个用户的粉丝数
     
.reduceByKey(_ + _)
     
//模拟mapreduce的combine对每个分区内部进行聚合,计算各栏目的topn
     
.mapPartitions(it => {
       
val columntoUserFanscount = new mutable.HashMap[String, mutable.TreeSet[(String, Int)]]()
        it.foreach {
case ((column, user), fanscount) =>
         
val userFansCount: mutable.TreeSet[(String, Int)] = columntoUserFanscount.getOrElse(column, {
           
val newUserFansCount: mutable.TreeSet[(String, Int)] = new mutable.TreeSet[(String, Int)]()(Ordering[(Int, String)].on(x => (-x._2, x._1)))
            columntoUserFanscount.put(column, newUserFansCount)
            newUserFansCount
          })
          userFansCount.add((user, fanscount))
         
if (userFansCount.size > 1)
            userFansCount.remove(userFansCount.last)
        }
        columntoUserFanscount.toIterator
      })
     
//合并每一个栏目下面的TreeSet
     
.reduceByKey((set1, set2) => {
        set2.foreach(set1.add(_))
       
if (set1.size > 1)
          set1.remove(set1.last)
        set1
      })
     
//获取treeset第一个元素
     
.mapValues(_.head)
      .foreach(println)
    sc.stop()
  }
}
 

 

res:

(医疗,(user3,2932))

(法律,(user3,2947))

(艺术,(user4,2926))

(教育,(user7,2898))

(音乐,(user5,2927))

 

自己实现combine还是比较麻烦的,其实spark提供的合并算子直接支持combinereduce聚合,例如:

combineByKeyaggregateByKeycombinereduce可以使用不同的逻辑

具体实现代码如下:

package com.edu360.rdd
 
 
import org.apache.spark.rdd.RDD
 
import org.apache.spark.{SparkConf, SparkContext}
 
 
import scala.collection.mutable
 
 
object FavoriteUser4 {
 
 
def main(args: Array[String]): Unit = {
   
val sc = new SparkContext(new SparkConf().setAppName("FavoriteUser").setMaster("local[*]"))
    sc.setLogLevel(
"WARN")
   
val lines: RDD[String] = sc.textFile("/hdfs/logs/weibo.log")
    lines.map(line => {
     
//医疗 user7 user7051
     
val split: Array[String] = line.split("\\s+")
      (split(
0), split(1))
    }).map((_,
1))
     
//计算出各栏目每个用户的粉丝数
     
.reduceByKey(_ + _)
      .map {
case ((column, user), fanscount) => (column, (user, fanscount)) }
     
//aggregateByKey第一个柯林化参数产生一个初始值,通过Ordering。on对(String, Int)类型元组的排序规则进行修改
      //第二个柯林化参数,的第一个参数相当于mapreduce的combine,第二个参数是相当于reduce,是对所有分区结果的合并
     
.aggregateByKey(mutable.TreeSet[(String, Int)]()(Ordering[(Int, String)].on(x => (-x._2, x._1))))((set, userFanscount) => {
        set.add(userFanscount)
       
if (set.size > 1) set.remove(set.last)
        set
      }, (set1, set2) => {
        set2.foreach(set1.add(_))
       
if (set1.size > 1)
          set1.remove(set1.last)
        set1
      })
     
//获取treeset第一个元素
     
.mapValues(_.head)
      .foreach(println)
 
    sc.stop()
  }
}
 

 

res:

(医疗,(user3,2932))

(法律,(user3,2947))

(艺术,(user4,2926))

(教育,(user7,2898))

(音乐,(user5,2927))

 

由于本需求只是求top1,其实combinereduce的逻辑完全可以变成一样

下面是针对top1更简单的实现:

package com.edu360.rdd
 
 
import org.apache.spark.rdd.RDD
 
import org.apache.spark.{SparkConf, SparkContext}
 
 
object FavoriteUser5 {
 
 
def main(args: Array[String]): Unit = {
   
val sc = new SparkContext(new SparkConf().setAppName("FavoriteUser").setMaster("local[*]"))
    sc.setLogLevel(
"WARN")
   
val lines: RDD[String] = sc.textFile("/hdfs/logs/weibo.log")
    lines.map(line => {
     
//医疗 user7 user7051
     
val split: Array[String] = line.split("\\s+")
      (split(
0), split(1))
    }).map((_,
1))
     
//计算出各栏目每个用户的粉丝数
     
.reduceByKey(_ + _)
      .map {
case ((column, user), fanscount) => (column, (user, fanscount)) }
     
//reduceByKey会使用该函数,先对各分区的集合取top1,然后对所有分区的结果取top1
     
.reduceByKey((x, y) => {
     
if (x._2 > y._2) x else y
    })
      .foreach(println)
    sc.stop()
  }
}
 

 

res:

(医疗,(user3,2932))

(法律,(user3,2947))

(艺术,(user4,2926))

(教育,(user7,2898))

(音乐,(user5,2927))

 

 

 

1.5.   获取各手机的家庭位置和工作位置

样本数据为上一个案例,现在认为一个手机在一个基站的时间点为9:00-21:00次数多于时间点为21:00-9:00的次数则认为是工作地点

否则为家庭地点。

一个手机在多个基站为家庭地点,则取停留时间最长的基站为家庭地点,工作地点亦是

实现代码

val sc = new SparkContext(new SparkConf().setAppName("UserLocation").setMaster("local[*]"))
sc.setLogLevel(
"WARN")
 
val lines: RDD[String] = sc.textFile("/hdfs/loc/data.txt")
 
 
val baseIdAndPhoneloc: RDD[((String, String), String)] = lines.map(line => {
 
val fields = line.split(",")
 
val phone = fields(0)
 
var hour = fields(1).substring(8, 10).toInt
 
val baseId = fields(2)
 
if (hour > 9 && hour < 21) //早上9点到晚上九点多的是工作地点2,否则是家庭地点1,用正负号表示
   
((baseId, phone), 1)
 
else
   
((baseId, phone), -1)
}).reduceByKey(_ + _)
  .map {
case ((baseId, phone), count) =>
   
if (count >= 0) ((baseId, phone), "工作地址")
   
else ((baseId, phone), "家庭地址")
  }
 
 
val bsPhoneStatTime: RDD[((String, String), Long)] = lines.map(line => {
 
val fields = line.split(",")
 
val phone = fields(0)
 
var time = fields(1).toLong
 
val baseId = fields(2)
 
val tp = fields(3)
 
if (tp.equals("1")) time = -time
  ((baseId, phone), time)
}).reduceByKey(_ + _)
 
 
val baseIdPhonelocMax: RDD[(String, (String, String))] = baseIdAndPhoneloc.join(bsPhoneStatTime)
  .map {
case ((baseId, phone), (homeOrWork, statTime)) => ((phone, homeOrWork), (baseId, statTime)) }
  .groupByKey()
  .mapValues(_.toList.sortBy(_._2).reverse.head)
  .map {
case ((phone, homeOrWork), (baseId, statTime)) => (baseId, (phone, homeOrWork)) }
 
 
//加载基站数据
 
val baseInfo = sc.textFile("/hdfs/loc/location.txt")
 
val baseIdAndLocation: RDD[(String, (String, String))] = baseInfo.map(info => {
 
val fields = info.split(",")
 
val baseId = fields(0)
 
val x = fields(1)
 
今日值班讲师 或者直接加入千人QQ群进行咨询:Spark大数据交流学习群613807316

分享到: