首家大数据培训挂牌机构 股票代码:837906 | EN CN
【小牛原创】Spark SQL 从入门到实战 -- spark sql 1.6版本相关api
【小牛原创】Spark SQL 从入门到实战 -- 概述
Spark Streaming:大规模流式数据处理
spark RDD 相关需求
spark RDD 高级应用
Spark手册 - load&save
Spark手册 - debug
Spark手册 - cache&checkpoint
Spark手册 - RDD Action API
Spark手册 - Partitioner源码
Spark手册 - RDD Transformation API
Spark手册 - RDD的依赖关系
Spark手册 - RDD入门
Spark手册 - 远程debug
Spark手册 - 在IDEA中编写WordCount程序(3)
Spark手册 - 在IDEA中编写WordCount程序(2)
Spark手册 - 在IDEA中编写WordCount程序(1)
Spark手册 - 执行Spark程序
Spark手册 - 集群安装
20页PPT|视频类网站大数据生态 Spark在爱奇艺的应用实践
Spark机器学习入门实例——大数据集(30+g)二分类
Spark官方文档中文翻译:Spark SQL 之 Data Sources
使用Spark MLlib来训练并服务于自然语言处理模型
Spark知识体系完整解读
案例 :Spark应用案例现场分享(IBM Datapalooza)
最全的Spark基础知识解答
Spark在GrowingIO数据无埋点全量采集场景下的实践
Apache Spark探秘:三种分布式部署方式比较
Apache Spark探秘:多进程模型还是多线程模型?
Apache Spark探秘:实现Map-side Join和Reduce-side Join
Apache Spark探秘:利用Intellij IDEA构建开发环境
spark on yarn的技术挑战
Apache Spark学习:将Spark部署到Hadoop 2.2.0上
Hadoop与Spark常用配置参数总结
基于Spark Mllib,SparkSQL的电影推荐系统
spark作业调优秘籍,解数据倾斜之痛
Spark入门必学:预测泰坦尼克号上的生还情况
小牛学堂浅谈基于Spark大数据平台日志审计系统的设计与实现
【Hadoop Summit Tokyo 2016】使用基于Lambda架构的Spark的近实时的网络异常检测和流量分析
Spark编程环境搭建经验分享
Spark技术在京东智能供应链预测的应用
spark中textFile、groupByKey、collect、flatMap、map结合小案例
Spark中DataFrame的schema讲解
深度剖析Spark分布式执行原理
【Spark Summit East 2017】从容器化Spark负载中获取的经验
内存分析技术哪家强?Spark占几何
Spark系列之一:Spark,一种快速数据分析替代方案
6种最常见的Hadoop和Spark项目
Hadoop vs Spark
Hadoop与Spark常用配置参数总结
Spark RPC通信层设计原理分析
Spark Standalone架构设计要点分析
Spark UnifiedMemoryManager内存管理模型分析
网易的Spark技术分享

spark RDD 高级应用

于2017-06-16由小牛君创建

分享到:


1.  spark RDD 高级应用

1.1.   JdbcRDD直接读取数据库

bigdata数据库有一个members

数据如下:

image.png

代码:

val sc = new SparkContext(new SparkConf().setAppName("JdbcRDDDemo").setMaster("local[*]"))
sc.setLogLevel(
"WARN")
 
val jdbcRDD = new JdbcRDD(
  sc,
  () => DriverManager.getConnection(
"jdbc:mysql://localhost:3306/bigdata", "root", "root"),
 
"SELECT * FROM members where id >= ? AND id <= ?",
 
2003, 2009, 3,
  r => {
   
val id = r.getInt(1)
   
val name = r.getString(2)
   
val area: String = r.getString(3)
   
val regdate: Timestamp = r.getTimestamp(4)
   
val lastdate: Timestamp = r.getTimestamp(5)
   
s"$id $name $area $regdate $lastdate"
 
}
)
jdbcRDD.foreach(println)
sc.stop()
 

res:

2003 Linda7 深圳 2003-04-11 00:00:00.0 2004-04-25 00:00:00.0

2005 bibalily 广州 2002-04-11 00:00:00.0 2004-04-25 00:00:00.0

2004 Liz 上海 2013-10-11   00:00:00.0 2015-06-12 00:00:00.0

2007 蒋艳铮 上海 2005-04-11   00:00:00.0 2007-04-02 00:00:00.0

2006 加斐 深圳 2012-04-11   00:00:00.0 2016-05-03 00:00:00.0

2008 张渠 北京 2000-04-11   00:00:00.0 2004-04-25 00:00:00.0

2009 骆嫣 上海 2006-04-11   00:00:00.0 2007-04-25 00:00:00.0

class JdbcRDD[T: ClassTag](
    sc: SparkContext,
    getConnection: () => Connection,
    sql:
String,
    lowerBound: Long,
    upperBound: Long,
    numPartitions: Int,
    mapRow: (ResultSet) =>
T = JdbcRDD.resultSetToObjectArray _)
 

 

sc: SparkContext,

getConnection: () => Connection,获取数据库连接的控制抽象函数

sql: String,用于读取数据的sql语句,必须有2?占位符,用于被lowerBound和upperBound注入

lowerBound: Long,起始id

upperBound: Long,结束id

numPartitions: Int,分区数量

mapRow: (ResultSet) => T = JdbcRDD.resultSetToObjectArray _,将结果集一行数据转换为指定数据格式,模式是转换为数组

JdbcRDD会根据lowerBoundupperBoundnumPartitions计算每个分区处理的分区范围

例如lowerBound=1upperBound=9numPartitions=3

0号分区处理id范围在1~3的数据,1号分区处理id范围在4~6的数据,2号分区处理id范围在7~9的数据

 

 

1.2.   spark读取hdfs文件获取切片

只有HadoopRDD NewHadoopRDD mapPartitionsWithSplit方法

先用hadoopFile读取hdfs文件创建HadoopRDD

然后调用mapPartitionsWithSplit可获取当前处理的分区的迭代器和当前分区对应的hdfs切片

 

新hadoopAPI方式
import org.apache.hadoop.io.{LongWritable, Text}
 
import org.apache.hadoop.mapreduce.lib.input.{FileSplit, TextInputFormat}
 
sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat](
"/hdfs/wordcount/in/*")
  .asInstanceOf[NewHadoopRDD[LongWritable, Text]]
  .mapPartitionsWithInputSplit((split, it) => {
   
val filename: String = split.asInstanceOf[FileSplit].getPath.getName
    it.map(x => (filename, x._2))
  })
  .foreach(println)
res:
(words2.txt,hello world)
(words.txt,hello world)
(words3.txt,hello world)
(words.txt,hello you giles)
(words2.txt,you ahe jjio)
(words.txt,hello me)
(words3.txt,hello you)
(words2.txt,hello me)
(words.txt,me you me to)
(words3.txt,hello me)
(words2.txt,me you me to)
(words3.txt,me you me to)
(words3.txt,hjk fhg)
@DeveloperApi
 
def mapPartitionsWithInputSplit[U: ClassTag](
    f: (InputSplit,
Iterator[(K, V)]) => Iterator[U],
    preservesPartitioning: Boolean =
false): RDD[U] = {
 
new NewHadoopMapPartitionsWithSplitRDD(this, f, preservesPartitioning)
}
 

 

hadoopAPI方式

import org.apache.hadoop.io.{LongWritable, Text}
 
import org.apache.hadoop.mapred.{FileSplit, TextInputFormat}
 
 
val rdd: RDD[(String, Text)] = sc.hadoopFile("/hdfs/wordcount/in/*", classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
  .asInstanceOf[HadoopRDD[LongWritable, Text]]
  .mapPartitionsWithInputSplit((split, it) => {
   
val filename: String = split.asInstanceOf[FileSplit].getPath.getName
    it.map(x => (filename, x._2))
  })
rdd.foreach(println)
 

res:

(words2.txt,hello world)

(words.txt,hello world)

(words2.txt,you ahe jjio)

(words3.txt,hello world)

(words2.txt,hello me)

(words3.txt,hello you)

(words2.txt,me you me to)

(words3.txt,hello me)

(words.txt,hello you giles)

(words3.txt,me you me to)

(words.txt,hello me)

(words3.txt,hjk fhg)

(words.txt,me you me to)

@DeveloperApi
 
def mapPartitionsWithInputSplit[U: ClassTag](
    f: (InputSplit,
Iterator[(K, V)]) => Iterator[U],
    preservesPartitioning: Boolean =
false): RDD[U] = {
 
new HadoopMapPartitionsWithSplitRDD(this, f, preservesPartitioning)
}

 

1.3.   spark模拟实现mapreduce版wordcount

object MapreduceWordcount {
 
def main(args: Array[String]): Unit = {
   
import org.apache.spark._
   
val sc: SparkContext = new SparkContext(new SparkConf().setAppName("wordcount").setMaster("local[*]"))
    sc.setLogLevel(
"WARN")
   
   
import org.apache.hadoop.io.{LongWritable, Text}
   
import org.apache.hadoop.mapred.TextInputFormat
   
import org.apache.spark.rdd.HadoopRDD
 
   
import scala.collection.mutable.ArrayBuffer
   
   
def map(k: LongWritable, v: Text, collect: ArrayBuffer[(String, Int)]) = {
     
for (word <- v.toString.split("\\s+"))
        collect += ((word,
1))
    }
   
def reduce(key: String, value: Iterator[Int], collect: ArrayBuffer[(String, Int)]) = {
      collect += ((key, value.sum))
    }
   
val rdd = sc.hadoopFile("/hdfs/wordcount/in1/*", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], 2)
      .asInstanceOf[HadoopRDD[LongWritable, Text]]
      .mapPartitionsWithInputSplit((split, it) =>{
       
val collect: ArrayBuffer[(String, Int)] = ArrayBuffer[(String, Int)]()
        it.foreach(kv => map(kv._1, kv._2, collect))
        collect.toIterator
      })
      .repartitionAndSortWithinPartitions(
new HashPartitioner(2))
      .mapPartitions(it => {
       
val collect: ArrayBuffer[(String, Int)] = ArrayBuffer[(String, Int)]()
       
var lastKey: String = ""
       
var values: ArrayBuffer[Int] = ArrayBuffer[Int]()
       
for ((currKey, value) <- it) {
         
if (!currKey.equals(lastKey)) {
           
if (values.length != 0)
              reduce(lastKey, values.toIterator, collect)
            values.clear()
          }
          values += value
          lastKey = currKey
        }
       
if (values.length != 0) reduce(lastKey, values.toIterator, collect)
        collect.toIterator
      })
    rdd.foreach(println)
 
  }
}
 

 

 

1.4.   广播变量

image.png

 

每一个executor进程中可能存在多大task线程,若我们在drive中声明变量后,直接在rdd算子代码中使用的话,会导致该变量复制到每一个task线程中。

但使用广播变量broadcast方法将变量广播出去,则该变量在一个executor进程中只复制一次

SparkContext#def broadcast[T: ClassTag](value: T): Broadcast[T]

示例代码如下:

package com.edu360.rdd.ip
 
 
import org.apache.spark.broadcast.Broadcast
 
import org.apache.spark.rdd.RDD
 
import org.apache.spark.{SparkConf, SparkContext}
 
 
import scala.collection.mutable.ArrayBuffer
 
 
object BroadcastDemo {
 
def main(args: Array[String]): Unit = {
   
val sc = new SparkContext(new SparkConf().setAppName("IpLocation").setMaster("local[*]"))
    sc.setLogLevel(
"WARN")
   
val nonBroadcastVariable = "nonBroadcastVariable"
   
val broadcastVariable: Broadcast[String] = sc.broadcast("broadcastVariable")
   
val rdd: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4))
    rdd.map(x => {
     
if (x % 2 == 0) (x, nonBroadcastVariable)
     
else (x, broadcastVariable.value)
    })
      .foreach(println)
   
val ints1: ArrayBuffer[Int] = new ArrayBuffer[Int]()
    println(
"driver中的普通变量在rdd算子中修改前:" + ints1)
    rdd.foreach(ints1 += _)
    println(
"driver中的普通变量在rdd算子中修改后:" + ints1)
   
val broadcastInts: Broadcast[ArrayBuffer[Int]] = sc.broadcast(new ArrayBuffer[Int]())
    println(
"driver中的广播变量在rdd算子中修改前:" + broadcastInts.value)
    rdd.foreach(broadcastInts.value += _)
    println(
"driver中的广播变量在rdd算子中修改后:" + broadcastInts.value)
 
    sc.stop()
  }
}
 

 

res:

(2,nonBroadcastVariable)

(1,broadcastVariable)

(4,nonBroadcastVariable)

(3,broadcastVariable)

driver中的普通变量在rdd算子中修改前:ArrayBuffer()

driver中的普通变量在rdd算子中修改后:ArrayBuffer()

driver中的广播变量在rdd算子中修改前:ArrayBuffer()

driver中的广播变量在rdd算子中修改后:ArrayBuffer(1, 2, 3, 4)

上面的运行结果表示,无论是普通变量还是广播变量都可以直接在rdd算子中使用

但普通变量在rdd算子中被修改无法在drive端获取,因为drive端只负责向executor进程下发任务指令,driveexecutor进程在不同的jvm中,executor进程中被修改的变量,无法体现到drive端。

不过广播变量在rdd算子中被修改,可以在driver中被获取。

不过实际开发中尽量使用collect来获取各分区的数据。

 

 

广播变量的综合案例