
Spark RDD Advanced Applications

Author: 小牛君 | Published: 2017-06-16



1.  Spark RDD Advanced Applications

1.1.  Reading a database directly with JdbcRDD

The bigdata database contains a members table with the following data:

(screenshot of the members table)

Code:

import java.sql.{DriverManager, Timestamp}
import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("JdbcRDDDemo").setMaster("local[*]"))
sc.setLogLevel("WARN")

val jdbcRDD = new JdbcRDD(
  sc,
  () => DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "root"),
  "SELECT * FROM members where id >= ? AND id <= ?",
  2003, 2009, 3,
  r => {
    val id = r.getInt(1)
    val name = r.getString(2)
    val area: String = r.getString(3)
    val regdate: Timestamp = r.getTimestamp(4)
    val lastdate: Timestamp = r.getTimestamp(5)
    s"$id $name $area $regdate $lastdate"
  }
)
jdbcRDD.foreach(println)
sc.stop()
 

res:

2003 Linda7 深圳 2003-04-11 00:00:00.0 2004-04-25 00:00:00.0
2005 bibalily 广州 2002-04-11 00:00:00.0 2004-04-25 00:00:00.0
2004 Liz 上海 2013-10-11 00:00:00.0 2015-06-12 00:00:00.0
2007 蒋艳铮 上海 2005-04-11 00:00:00.0 2007-04-02 00:00:00.0
2006 加斐 深圳 2012-04-11 00:00:00.0 2016-05-03 00:00:00.0
2008 张渠 北京 2000-04-11 00:00:00.0 2004-04-25 00:00:00.0
2009 骆嫣 上海 2006-04-11 00:00:00.0 2007-04-25 00:00:00.0

class JdbcRDD[T: ClassTag](
    sc: SparkContext,
    getConnection: () => Connection,
    sql: String,
    lowerBound: Long,
    upperBound: Long,
    numPartitions: Int,
    mapRow: (ResultSet) => T = JdbcRDD.resultSetToObjectArray _)
 

 

sc: SparkContext, the SparkContext to use.
getConnection: () => Connection, a function that obtains the database connection.
sql: String, the SQL statement used to read the data; it must contain exactly two ? placeholders, which are filled from lowerBound and upperBound.
lowerBound: Long, the starting id.
upperBound: Long, the ending id.
numPartitions: Int, the number of partitions.
mapRow: (ResultSet) => T = JdbcRDD.resultSetToObjectArray _, converts one row of the result set into the desired type; by default the row is converted to an array (a sketch of such a default mapper follows).
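For reference, a row mapper equivalent to that documented default behaviour (turning one row into an array of its column values) could look like the sketch below. This is our own illustration with a hypothetical name rowToObjectArray, not Spark's source code:

import java.sql.ResultSet

// Convert one ResultSet row into an array of its column values,
// mirroring what the default mapper is described to do above.
def rowToObjectArray(rs: ResultSet): Array[AnyRef] =
  Array.tabulate[AnyRef](rs.getMetaData.getColumnCount)(i => rs.getObject(i + 1))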

JdbcRDD uses lowerBound, upperBound, and numPartitions to compute the id range handled by each partition.

For example, with lowerBound=1, upperBound=9, numPartitions=3:

partition 0 handles ids 1~3, partition 1 handles ids 4~6, and partition 2 handles ids 7~9.
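As a rough sketch of that range computation (a hypothetical helper of our own, assuming contiguous integer-division ranges as in the example above; not Spark's source):

// Split [lowerBound, upperBound] into numPartitions contiguous id ranges.
def partitionRanges(lowerBound: Long, upperBound: Long, numPartitions: Int): Seq[(Long, Long)] = {
  val length = upperBound - lowerBound + 1
  (0 until numPartitions).map { i =>
    val start = lowerBound + i * length / numPartitions
    val end   = lowerBound + (i + 1) * length / numPartitions - 1
    (start, end)
  }
}

// partitionRanges(1, 9, 3) returns Vector((1,3), (4,6), (7,9)), matching the description above.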

 

 

1.2.  Reading HDFS files with Spark and obtaining the input splits

Only HadoopRDD and NewHadoopRDD provide the mapPartitionsWithInputSplit method.

First read the HDFS files with hadoopFile (or newAPIHadoopFile) to create a HadoopRDD (or NewHadoopRDD).

Then call mapPartitionsWithInputSplit to get, for each partition, the iterator over its records together with the HDFS input split that partition corresponds to.

 

New Hadoop API approach:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.{FileSplit, TextInputFormat}
import org.apache.spark.rdd.NewHadoopRDD

sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat]("/hdfs/wordcount/in/*")
  .asInstanceOf[NewHadoopRDD[LongWritable, Text]]
  .mapPartitionsWithInputSplit((split, it) => {
    val filename: String = split.asInstanceOf[FileSplit].getPath.getName
    it.map(x => (filename, x._2))
  })
  .foreach(println)
res:
(words2.txt,hello world)
(words.txt,hello world)
(words3.txt,hello world)
(words.txt,hello you giles)
(words2.txt,you ahe jjio)
(words.txt,hello me)
(words3.txt,hello you)
(words2.txt,hello me)
(words.txt,me you me to)
(words3.txt,hello me)
(words2.txt,me you me to)
(words3.txt,me you me to)
(words3.txt,hjk fhg)
@DeveloperApi
def mapPartitionsWithInputSplit[U: ClassTag](
    f: (InputSplit, Iterator[(K, V)]) => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U] = {
  new NewHadoopMapPartitionsWithSplitRDD(this, f, preservesPartitioning)
}
 

 

Old Hadoop API approach:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileSplit, TextInputFormat}
import org.apache.spark.rdd.{HadoopRDD, RDD}

val rdd: RDD[(String, Text)] = sc.hadoopFile("/hdfs/wordcount/in/*", classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
  .asInstanceOf[HadoopRDD[LongWritable, Text]]
  .mapPartitionsWithInputSplit((split, it) => {
    val filename: String = split.asInstanceOf[FileSplit].getPath.getName
    it.map(x => (filename, x._2))
  })
rdd.foreach(println)
 

res:

(words2.txt,hello world)
(words.txt,hello world)
(words2.txt,you ahe jjio)
(words3.txt,hello world)
(words2.txt,hello me)
(words3.txt,hello you)
(words2.txt,me you me to)
(words3.txt,hello me)
(words.txt,hello you giles)
(words3.txt,me you me to)
(words.txt,hello me)
(words3.txt,hjk fhg)
(words.txt,me you me to)

@DeveloperApi
def mapPartitionsWithInputSplit[U: ClassTag](
    f: (InputSplit, Iterator[(K, V)]) => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U] = {
  new HadoopMapPartitionsWithSplitRDD(this, f, preservesPartitioning)
}

 

1.3.  Implementing a MapReduce-style wordcount with Spark

object MapreduceWordcount {
  def main(args: Array[String]): Unit = {
    import org.apache.spark._
    val sc: SparkContext = new SparkContext(new SparkConf().setAppName("wordcount").setMaster("local[*]"))
    sc.setLogLevel("WARN")

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.TextInputFormat
    import org.apache.spark.rdd.HadoopRDD

    import scala.collection.mutable.ArrayBuffer

    // Map phase: split each line into words and emit (word, 1).
    def map(k: LongWritable, v: Text, collect: ArrayBuffer[(String, Int)]) = {
      for (word <- v.toString.split("\\s+"))
        collect += ((word, 1))
    }

    // Reduce phase: sum all counts for one key.
    def reduce(key: String, value: Iterator[Int], collect: ArrayBuffer[(String, Int)]) = {
      collect += ((key, value.sum))
    }

    val rdd = sc.hadoopFile("/hdfs/wordcount/in1/*", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], 2)
      .asInstanceOf[HadoopRDD[LongWritable, Text]]
      .mapPartitionsWithInputSplit((split, it) => {
        val collect: ArrayBuffer[(String, Int)] = ArrayBuffer[(String, Int)]()
        it.foreach(kv => map(kv._1, kv._2, collect))
        collect.toIterator
      })
      // Shuffle: repartition by key and sort within each partition, like the MapReduce shuffle.
      .repartitionAndSortWithinPartitions(new HashPartitioner(2))
      .mapPartitions(it => {
        val collect: ArrayBuffer[(String, Int)] = ArrayBuffer[(String, Int)]()
        var lastKey: String = ""
        var values: ArrayBuffer[Int] = ArrayBuffer[Int]()
        // Keys arrive sorted, so a key change means the previous group is complete.
        for ((currKey, value) <- it) {
          if (!currKey.equals(lastKey)) {
            if (values.length != 0)
              reduce(lastKey, values.toIterator, collect)
            values.clear()
          }
          values += value
          lastKey = currKey
        }
        if (values.length != 0) reduce(lastKey, values.toIterator, collect)
        collect.toIterator
      })
    rdd.foreach(println)
  }
}
 

 

 

1.4.  Broadcast variables


 

Each executor process may run multiple task threads. If we declare a variable in the driver and use it directly inside RDD operator code, the variable is copied into every task thread.

If we instead broadcast the variable with the broadcast method, it is copied only once per executor process.

The method is defined on SparkContext: def broadcast[T: ClassTag](value: T): Broadcast[T]

Sample code:

package com.edu360.rdd.ip

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer

object BroadcastDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("IpLocation").setMaster("local[*]"))
    sc.setLogLevel("WARN")

    val nonBroadcastVariable = "nonBroadcastVariable"
    val broadcastVariable: Broadcast[String] = sc.broadcast("broadcastVariable")

    val rdd: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4))
    rdd.map(x => {
      if (x % 2 == 0) (x, nonBroadcastVariable)
      else (x, broadcastVariable.value)
    })
      .foreach(println)

    val ints1: ArrayBuffer[Int] = new ArrayBuffer[Int]()
    println("Ordinary driver variable before modification in an RDD operator: " + ints1)
    rdd.foreach(ints1 += _)
    println("Ordinary driver variable after modification in an RDD operator: " + ints1)

    val broadcastInts: Broadcast[ArrayBuffer[Int]] = sc.broadcast(new ArrayBuffer[Int]())
    println("Broadcast variable before modification in an RDD operator: " + broadcastInts.value)
    rdd.foreach(broadcastInts.value += _)
    println("Broadcast variable after modification in an RDD operator: " + broadcastInts.value)

    sc.stop()
  }
}
 

 

res:

(2,nonBroadcastVariable)
(1,broadcastVariable)
(4,nonBroadcastVariable)
(3,broadcastVariable)
Ordinary driver variable before modification in an RDD operator: ArrayBuffer()
Ordinary driver variable after modification in an RDD operator: ArrayBuffer()
Broadcast variable before modification in an RDD operator: ArrayBuffer()
Broadcast variable after modification in an RDD operator: ArrayBuffer(1, 2, 3, 4)

The results above show that both ordinary variables and broadcast variables can be used directly inside RDD operators.

However, modifications made to an ordinary variable inside an RDD operator are not visible on the driver side: the driver only dispatches task instructions to the executor processes, and the driver and the executors run in different JVMs, so a variable modified inside an executor is not reflected back to the driver.

The broadcast ArrayBuffer modified inside the RDD operator is visible in the driver here, but only because local[*] mode runs the driver and the executors in the same JVM; do not rely on this behaviour on a real cluster.

In practice, prefer collect to gather the data of each partition back to the driver, as shown below.
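A minimal sketch, reusing the rdd from the sample code above: instead of mutating a driver-side buffer inside an operator, bring the data back explicitly with collect.

// Gather the data of all partitions back to the driver.
val collected: Array[Int] = rdd.collect()
println("Values gathered on the driver via collect: " + collected.mkString(", "))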

 

 

A comprehensive broadcast-variable example

 


