首家大数据培训挂牌机构 股票代码:837906 | EN CN
【小牛原创】Spark SQL 从入门到实战 -- spark sql 1.6版本相关api
【小牛原创】Spark SQL 从入门到实战 -- 概述
Spark Streaming:大规模流式数据处理
spark RDD 相关需求
spark RDD 高级应用
Spark手册 - load&save
Spark手册 - debug
Spark手册 - cache&checkpoint
Spark手册 - RDD Action API
Spark手册 - Partitioner源码
Spark手册 - RDD Transformation API
Spark手册 - RDD的依赖关系
Spark手册 - RDD入门
Spark手册 - 远程debug
Spark手册 - 在IDEA中编写WordCount程序(3)
Spark手册 - 在IDEA中编写WordCount程序(2)
Spark手册 - 在IDEA中编写WordCount程序(1)
Spark手册 - 执行Spark程序
Spark手册 - 集群安装
20页PPT|视频类网站大数据生态 Spark在爱奇艺的应用实践
Spark机器学习入门实例——大数据集(30+g)二分类
Spark官方文档中文翻译:Spark SQL 之 Data Sources
使用Spark MLlib来训练并服务于自然语言处理模型
Spark知识体系完整解读
案例 :Spark应用案例现场分享(IBM Datapalooza)
最全的Spark基础知识解答
Spark在GrowingIO数据无埋点全量采集场景下的实践
Apache Spark探秘:三种分布式部署方式比较
Apache Spark探秘:多进程模型还是多线程模型?
Apache Spark探秘:实现Map-side Join和Reduce-side Join
Apache Spark探秘:利用Intellij IDEA构建开发环境
spark on yarn的技术挑战
Apache Spark学习:将Spark部署到Hadoop 2.2.0上
Hadoop与Spark常用配置参数总结
基于Spark Mllib,SparkSQL的电影推荐系统
spark作业调优秘籍,解数据倾斜之痛
Spark入门必学:预测泰坦尼克号上的生还情况
小牛学堂浅谈基于Spark大数据平台日志审计系统的设计与实现
【Hadoop Summit Tokyo 2016】使用基于Lambda架构的Spark的近实时的网络异常检测和流量分析
Spark编程环境搭建经验分享
Spark技术在京东智能供应链预测的应用
spark中textFile、groupByKey、collect、flatMap、map结合小案例
Spark中DataFrame的schema讲解
深度剖析Spark分布式执行原理
【Spark Summit East 2017】从容器化Spark负载中获取的经验
内存分析技术哪家强?Spark占几何
Spark系列之一:Spark,一种快速数据分析替代方案
6种最常见的Hadoop和Spark项目
Hadoop vs Spark
Hadoop与Spark常用配置参数总结
Spark RPC通信层设计原理分析
Spark Standalone架构设计要点分析
Spark UnifiedMemoryManager内存管理模型分析
网易的Spark技术分享

spark RDD 相关需求

于2017-06-16由小牛君创建

分享到:


1.  spark RDD 相关需求

1.1.   选出最受欢迎的老师

小牛学堂官网有学科,每个学科有很多老师授课,选出每个学科中被访问量最多的那个老师

数据样本

http://www.xiaoniu.com/bigdata/laozhao

http://www.xiaoniu.com/bigdata/laozhao

http://www.xiaoniu.com/bigdata/laozhao

http://www.xiaoniu.com/bigdata/laozhao

http://www.xiaoniu.com/bigdata/laozhao

http://www.xiaoniu.com/bigdata/laoduan

http://www.xiaoniu.com/bigdata/laoduan

http://www.xiaoniu.com/javaEE/xiaoxu

http://www.xiaoniu.com/javaEE/xiaoxu

http://www.xiaoniu.com/javaEE/laoyang

http://www.xiaoniu.com/javaEE/laoyang

http://www.xiaoniu.com/javaEE/laoyang

代码实现

val sc = new SparkContext(new SparkConf().setAppName("FavoriteTeacher").setMaster("local[*]"))
sc.setLogLevel(
"WARN")
 
val lines: RDD[String] = sc.textFile("/hdfs/logs/access.log")
 
val subjectTeacherTuple: RDD[(String, String)] = lines.map(line => {
 
//http://www.xiaoniu.com/bigdata/laozhao
 
val r: Regex = "http://.+/(.+)/(.+)".r
 
val r(subjct, teacher) = line
  (subjct, teacher)
})
 
val subjects = subjectTeacherTuple.map(_._1).distinct().collect()
 
val values = subjectTeacherTuple
  .map((_,
1))
  .reduceByKey(_ + _)
  .map(x => (x._1._1, (x._1._2, x._2)))
  .groupByKey()
  .mapValues(_.toList.sortBy(_._2).reverse.take(
1)(0))
  .foreach(println)
sc.stop()
 

 

res:

(javaEE,(laoyang,249000))

(bigdata,(laozhao,419000))

 

1.2.   取各栏目粉丝量最多的用户

有一个微博网站有很多栏目,每一个栏目都有几千万用户,每个用户会有很多的粉丝

要求取出各栏目粉丝量最多的用户

样本数据

医疗 user1 user8397

艺术 user5 user6766

教育 user7 user9601

艺术 user5 user8036

法律 user7 user3953

艺术 user1 user1888

教育 user2 user5185

法律 user1 user5051

音乐 user6 user4751

法律 user6 user1204

法律 user7 user9517

教育 user3 user898

医疗 user6 user9067

音乐 user3 user2010

法律 user1 user6622

代码实现

对于选出最受欢迎的老师的需求,我们只需要预聚合后,进行groupBy,

然后对每一个key里面的value用scala集合的方法排序即可。

但对于该需求,预聚合后,每个分区仍然有几千万的用户需要排序统计

如果进行groupBy,一个分区内的数据会全部转成内存中的数组,有内存溢出的危险

我们可以利用repartitionAndSortWithinPartitions按照key进行排序,并根据栏目指定好分区

这样排序后,只需要取出每个分区里的第一个元素,就是每个栏目粉丝量最多的用户

 

val sc = new SparkContext(new SparkConf().setAppName("FavoriteUser").setMaster("local[*]"))
sc.setLogLevel(
"WARN")
 
val lines: RDD[String] = sc.textFile("/hdfs/logs/weibo.log")
 
val columnUserTuple: RDD[(String, String)] = lines.map(line => {
 
//医疗 user7 user7051
 
val split: Array[String] = line.split("\\s+")
  (split(
0), split(1))
})
 
val columns = columnUserTuple.map(_._1).distinct().collect()
 
//申明一个Ordering[(String,Int)]类型的隐式值,用于修改默认的排序规则
 
implicit val sortkey = Ordering[(String,Int)].on[(String,Int)](x=>(x._1,-x._2))
 
val values = columnUserTuple
  .map((_,
1))
  .reduceByKey(_ + _)
  .map(x => ((x._1._1, x._2), x._1._2))
 
//按照指定的分区器进行shuffle分区,并对key进行排序,由于上面申明Ordering[(String,Int)]类型的隐式值,原始的排序将会被修改
 
.repartitionAndSortWithinPartitions(new Partitioner {
   
private val rules: Map[String, Int] = columns.zipWithIndex.toMap
   
override def numPartitions: Int = rules.size + 1
   
override def getPartition(key: Any): Int = {
     
val column = key.asInstanceOf[(String,Int)]._1
     
rules.getOrElse(column, -1) + 1
   
}
  })
 
//由于每个分区都是粉丝数量最多的用户,所以只需取出每个分区第一个元素及为该栏目粉丝量最多的用户
 
.mapPartitions(it => {
   
if (it.hasNext) Iterator.single(it.next())
   
else it
  })
  .foreach(println)
sc.stop()
 

 

res:

((艺术,2926),user4)

((医疗,2932),user3)

((教育,2898),user7)

((音乐,2927),user5)

((法律,2947),user3)

 

若已经明确要统计栏目,也可以采用如下简化的方式实现

val sc = new SparkContext(new SparkConf().setAppName("FavoriteUser").setMaster("local[*]"))
sc.setLogLevel(
"WARN")
 
val lines: RDD[String] = sc.textFile("/hdfs/logs/weibo.log")
 
val columnUserAndCounts: RDD[((String, String), Int)] = lines.map(line => {
 
//医疗 user7 user7051
 
val split: Array[String] = line.split("\\s+")
 
val column: String = split(0)
 
val user: String = split(1)
  ((column, user),
1)
})
  .reduceByKey(_ + _)
 
 
val columns = Array("教育", "法律", "医疗", "艺术", "音乐")
 
for (column <- columns) {
  columnUserAndCounts.filter(t => column.equals(t._1._1))
    .top(
1)(Ordering[Int].on(_._2))
    .map {
case ((column, user), count) => s"$column $user $count" }
    .foreach(println)
}
sc.stop()
 

 

res:

教育 user7 2898

法律 user3 2947

医疗 user3 2932

艺术 user4 2926

音乐 user5 2927

 

 

 

1.3.   获取各手机停留时间最长的基站坐标和停留时间

样本数据

基站ID,经度,纬度,信号类型

9F36407EAD0629FC166F14DDE7970F68,116.304864,40.050645,6

CC0710CC94ECC657A8561DE549D940E0,116.303955,40.041935,6

16030401EAFB68F1E3CDF819735E1C66,116.296302,40.032296,6

 

手机号,时间,基站ID,进出类型(1表示进入基站范围,0表示离开基站范围)

18611132889,20160327075000,9F36407EAD0629FC166F14DDE7970F68,1

18611132889,20160327081000,9F36407EAD0629FC166F14DDE7970F68,0

18611132889,20160327081100,CC0710CC94ECC657A8561DE549D940E0,1

18611132889,20160327082000,CC0710CC94ECC657A8561DE549D940E0,0

18688888888,20160327082400,16030401EAFB68F1E3CDF819735E1C66,1

18611132889,20160327082500,16030401EAFB68F1E3CDF819735E1C66,1

18688888888,20160327170000,16030401EAFB68F1E3CDF819735E1C66,0

18611132889,20160327180000,16030401EAFB68F1E3CDF819735E1C66,0

18688888888,20160327171000,CC0710CC94ECC657A8561DE549D940E0,1

18688888888,20160327171600,CC0710CC94ECC657A8561DE549D940E0,0

18611132889,20160327180500,CC0710CC94ECC657A8561DE549D940E0,1

18611132889,20160327181500,CC0710CC94ECC657A8561DE549D940E0,0

18611132889,20160327182000,9F36407EAD0629FC166F14DDE7970F68,1

18611132889,20160327230000,9F36407EAD0629FC166F14DDE7970F68,0

 

代码实现

val sc = new SparkContext(new SparkConf().setAppName("UserLocation").setMaster("local[*]"))
sc.setLogLevel(
"WARN")
 
val lines: RDD[String] = sc.textFile("/hdfs/loc/data.txt")
 
val bsAndPhoneMaxStatTime = lines.map(line => {
 
val fields = line.split(",")
 
var time = fields(1).toLong
 
if (fields(3).equals("1")) time = -time
  ((fields(
2), fields(0)), time)
})
  .reduceByKey(_+_)
  .map{
case ((baseid,phone),statTime)=>(phone,(baseid,statTime))}
  .groupByKey()
  .mapValues(t=>t.toList.sortBy(_._2).reverse.take(
1)(0))
  .map{
case (phone,(baseid,statTime))=>(baseid,(phone,statTime))}
 
 
//加载基站数据
 
val baseInfo = sc.textFile("/hdfs/loc/location.txt")
 
val baseIdAndLocation: RDD[(String, (String, String))] = baseInfo.map(info => {
 
val fields = info.split(",")
  (fields(
0), (fields(1), fields(2)))
})
 
val result = bsAndPhoneMaxStatTime.join(baseIdAndLocation)
    .values
    .map{
case ((phone,stattime),(x,y))=>s"$phone $x $y $stattime"}
result.foreach(println)
sc.stop()
 

 

res:

18611132889 116.296302   40.032296 97500

18688888888 116.296302   40.032296 87600

 

1.4.   分组topn的终极优化实现

上面的问题实际都是分组topn问题,直接groupBy的话,普通数据量可以满足要求,但组内数据过多就可能内存溢出。

后来通过先排序,然后按照指定规则分区后,取分区内的第一个元素,解决了内存溢出的问题。

但实际操作发现,当每个分区内部数据很大很大时,shuffle过程很慢很慢,虽然最终没有内存溢出,但这并不是最优的实现。

spark能否实现mapreducecombine呢?如果可以的话,每个分区都计算自己的topn,这样shuffle过程的数据量将大大减少。最终运行效率也会变得很快。

实际上spark是可以实现combine的,我们以各栏目粉丝量最多的一个用户的问题为例

具体实现代码如下:

package com.edu360
 
 
import org.apache.spark.rdd.RDD
 
import org.apache.spark.{SparkConf, SparkContext}
 
 
import scala.collection.mutable
 
 
object FavoriteUser3 {
 
 
def main(args: Array[String]): Unit = {
   
val sc = new SparkContext(new SparkConf().setAppName("FavoriteUser").setMaster("local[*]"))
    sc.setLogLevel(
"WARN")
   
val lines: RDD[String] = sc.textFile("/hdfs/logs/weibo.log")
    lines.map(line => {
     
//医疗 user7 user7051
     
val split: Array[String] = line.split("\\s+")
      (split(
0), split(1))
    }).map((_,
1))
     
//计算出各栏目每个用户的粉丝数
     
.reduceByKey(_ + _)
     
//模拟mapreduce的combine对每个分区内部进行聚合,计算各栏目的topn
     
.mapPartitions(it => {
       
val columntoUserFanscount = new mutable.HashMap[String, mutable.TreeSet[(String, Int)]]()
        it.foreach {
case ((column, user), fanscount) =>
         
val userFansCount: mutable.TreeSet[(String, Int)] = columntoUserFanscount.getOrElse(column, {
           
val newUserFansCount: mutable.TreeSet[(String, Int)] = new mutable.TreeSet[(String, Int)]()(Ordering[(Int, String)].on(x => (-x._2, x._1)))
            columntoUserFanscount.put(column, newUserFansCount)
            newUserFansCount
          })
          userFansCount.add((user, fanscount))
         
if (userFansCount.size > 1)
            userFansCount.remove(userFansCount.last)
        }
        columntoUserFanscount.toIterator
      })
     
//合并每一个栏目下面的TreeSet
     
.reduceByKey((set1, set2) => {
        set2.foreach(set1.add(_))
       
if (set1.size > 1)
          set1.remove(set1.last)
        set1
      })
     
//获取treeset第一个元素
     
.mapValues(_.head)
      .foreach(println)
    sc.stop()
  }
}
 

 

res:

(医疗,(user3,2932))

(法律,(user3,2947))

(艺术,(user4,2926))

(教育,(user7,2898))

(音乐,(user5,2927))

 

自己实现combine还是比较麻烦的,其实spark提供的合并算子直接支持combinereduce聚合,例如:

combineByKeyaggregateByKeycombinereduce可以使用不同的逻辑

具体实现代码如下:

package com.edu360.rdd
 
 
import org.apache.spark.rdd.RDD
 
import org.apache.spark.{SparkConf, SparkContext}
 
 
import scala.collection.mutable
 
 
object FavoriteUser4 {
 
 
def main(args: Array[String]): Unit = {
   
val sc = new SparkContext(new SparkConf().setAppName("FavoriteUser").setMaster("local[*]"))
    sc.setLogLevel(
"WARN")
   
val lines: RDD[String] = sc.textFile("/hdfs/logs/weibo.log")
    lines.map(line => {
     
//医疗 user7 user7051
     
val split: Array[String] = line.split("\\s+")
      (split(
0), split(1))
    }).map((_,
1))
     
//计算出各栏目每个用户的粉丝数
     
.reduceByKey(_ + _)
      .map {
case ((column, user), fanscount) => (column, (user, fanscount)) }
     
//aggregateByKey第一个柯林化参数产生一个初始值,通过Ordering。on对(String, Int)类型元组的排序规则进行修改
      //第二个柯林化参数,的第一个参数相当于mapreduce的combine,第二个参数是相当于reduce,是对所有分区结果的合并
     
.aggregateByKey(mutable.TreeSet[(String, Int)]()(Ordering[(Int, String)].on(x => (-x._2, x._1))))((set, userFanscount) => {
        set.add(userFanscount)
       
if (set.size > 1) set.remove(set.last)
        set
      }, (set1, set2) => {
        set2.foreach(set1.add(_))
       
if (set1.size > 1)
          set1.remove(set1.last)
        set1
      })
     
//获取treeset第一个元素
     
.mapValues(_.head)
      .foreach(println)
 
    sc.stop()
  }
}
 

 

res:

(医疗,(user3,2932))

(法律,(user3,2947))

(艺术,(user4,2926))

(教育,(user7,2898))

(音乐,(user5,2927))

 

由于本需求只是求top1,其实combinereduce的逻辑完全可以变成一样

下面是针对top1更简单的实现:

package com.edu360.rdd
 
 
import org.apache.spark.rdd.RDD
 
import org.apache.spark.{SparkConf, SparkContext}
 
 
object FavoriteUser5 {
 
 
def main(args: Array[String]): Unit = {
   
val sc = new SparkContext(new SparkConf().setAppName("FavoriteUser").setMaster("local[*]"))
    sc.setLogLevel(
"WARN")
   
val lines: RDD[String] = sc.textFile("/hdfs/logs/weibo.log")
    lines.map(line => {
     
//医疗 user7 user7051
     
val split: Array[String] = line.split("\\s+")
      (split(
0), split(1))
    }).map((_,
1))
     
//计算出各栏目每个用户的粉丝数
     
.reduceByKey(_ + _)
      .map {
case ((column, user), fanscount) => (column, (user, fanscount)) }
     
//reduceByKey会使用该函数,先对各分区的集合取top1,然后对所有分区的结果取top1
     
.reduceByKey((x, y) => {
     
if (x._2 > y._2) x else y
    })
      .foreach(println)
    sc.stop()
  }
}
 

 

res:

(医疗,(user3,2932))

(法律,(user3,2947))

(艺术,(user4,2926))

(教育,(user7,2898))

(音乐,(user5,2927))

 

 

 

1.5.   获取各手机的家庭位置和工作位置

样本数据为上一个案例,现在认为一个手机在一个基站的时间点为9:00-21:00次数多于时间点为21:00-9:00的次数则认为是工作地点

否则为家庭地点。

一个手机在多个基站为家庭地点,则取停留时间最长的基站为家庭地点,工作地点亦是

实现代码

val sc = new SparkContext(new SparkConf().setAppName("UserLocation").setMaster("local[*]"))
sc.setLogLevel(
"WARN")
 
val lines: RDD[String] = sc.textFile("/hdfs/loc/data.txt")
 
 
val baseIdAndPhoneloc: RDD[((String, String), String)] = lines.map(line => {
 
val fields = line.split(",")
 
val phone = fields(0)
 
var hour = fields(1).substring(8, 10).toInt
 
val baseId = fields(2)
 
if (hour > 9 && hour < 21) //早上9点到晚上九点多的是工作地点2,否则是家庭地点1,用正负号表示
   
((baseId, phone), 1)
 
else
   
((baseId, phone), -1)
}).reduceByKey(_ + _)
  .map {
case ((baseId, phone), count) =>
   
if (count >= 0) ((baseId, phone), "工作地址")
   
else ((baseId, phone), "家庭地址")
  }
 
 
val bsPhoneStatTime: RDD[((String, String), Long)] = lines.map(line => {
 
val fields = line.split(",")
 
val phone = fields(0)
 
var time = fields(1).toLong
 
val baseId = fields(2)
 
val tp = fields(3)
 
if (tp.equals("1")) time = -time
  ((baseId, phone), time)
}).reduceByKey(_ + _)
 
 
val baseIdPhonelocMax: RDD[(String, (String, String))] = baseIdAndPhoneloc.join(bsPhoneStatTime)
  .map {
case ((baseId, phone), (homeOrWork, statTime)) => ((phone, homeOrWork), (baseId, statTime)) }
  .groupByKey()
  .mapValues(_.toList.sortBy(_._2).reverse.head)
  .map {
case ((phone, homeOrWork), (baseId, statTime)) => (baseId, (phone, homeOrWork)) }
 
 
//加载基站数据
 
val baseInfo = sc.textFile("/hdfs/loc/location.txt")
 
val baseIdAndLocation: RDD[(String, (String, String))] = baseInfo.map(info => {
 
val fields = info.split(",")
 
val baseId = fields(0)
 
val x = fields(1)
 
  • 在线咨询

  • 免费热线

    免费热线400-6767088
    免费回拨
    手机请直接输入:如1392877****
    座机前加区号:如010—6974****
    入您的电话号码,点击“免费回拨”按钮稍后您将接到我们的电话,该通话对您完全免费,请放心接听!
  • 资料发放

  • 技术答疑

  • 关注微信

    小牛学堂微信公众号