位置:首页 > 【小牛原创】Spark SQL 从入门到实战 -- spark sql 1.6版本相关api >

【小牛原创】Spark SQL 从入门到实战 -- spark sql 1.6版本相关api

作者:小牛君|发布时间:2017-06-08

小牛学堂的课程大纲最近进行了再一次升级,并且同时推出Java大数据平台开发班、Python爬虫与数据挖掘班、Spark项目班、Spark大神班、机器学习算法实战班、BI数据分析实战班, 目前这类人群凤毛麟角,导致这个行业的平均薪资极高,为此小牛学堂集合了行业的诸多大牛开设对应班级,为想学习的同学提供机会!
如果想了解详细情况,请联系 今日值班讲师 或者直接加入千人QQ群进行咨询:210992946

以下是本文正文:


spark sql 1.6版本相关api

2.1.    创建SQLContext

SQLContext是Spark SQL进行结构化数据处理的入口,可以通过它进行DataFrame的创建及SQL的执行

只需对SparkContext进行包装即可

创建代码如下:

val conf = new SparkConf().setAppName("sql").setMaster("local[*]")

  val sc = new SparkContext(conf)

  val sqlContext = new SQLContext(sc)

 

2.2.    DataFrames创建

DataFrame与RDD类似,也是一个分布式数据容器。然而DataFrame更像传统数据库的二维表格,除了数据以外,还记录数据的结构信息,即schema。同时,与Hive类似,DataFrame也支持嵌套数据类型(struct、array和map)。从API易用性的角度上 看,DataFrame API提供的是一套高层的关系操作,比函数式的RDD API要更加友好,门槛更低。由于与R和Pandas的DataFrame类似,Spark DataFrame很好地继承了传统单机数据分析的开发体验。

 

在本地有一个文件D:\hdfs\logs\person.txt,有三列,分别是id、name、age、face_value、gender,用空格分隔

1 laozhao 18 99 m

2 laoduan 20 98 f

3 laoyang 19 98 m

读取数据创建RDD

val lineRDD = sc.textFile("/hdfs/logs/person.txt").map(_.split(" "))

 

2.2.1.     通过case class RDD创建DataFrame

定义case class

case class Person(id: Long, name: String, age: Int, fv: Double, gender: String)

创建DataFrame(spark shell中已默认导入隐式转换,不需要再次导入)

val personRDD: RDD[Person] = lineRDD.map(arr => Person(arr(0).toLong, arr(1), arr(2).toInt, arr(3).toDouble, arr(4)))
//直接创建DataFrame

  val personDF1: DataFrame = sqlContext.createDataFrame(personRDD)

  //通过SQLContext类的实例的implicits对象中隐式转换创建DataFrame

  import sqlContext.implicits._

  val personDF2: DataFrame = personRDD.toDF

 

 

2.2.2.   通过seq集合创建DataFrame

//直接通过seq集合创建DataFrame

  val personDF3: DataFrame = sqlContext.createDataFrame(Array(Person(1,"zhangsan",23,98,"m"),Person(2,"lisi",23,98,"m"),Person(1,"zhangsan",23,98,"m")))

 

2.2.3.   通过StructType创建DataFrame

需要导入的类型

import org.apache.spark.sql.types._

  import org.apache.spark.sql.Row

 

val rowRdd: RDD[Row] = lineRDD.map(arr => Row(arr(0).toLong, arr(1), arr(2).toInt, arr(3).toDouble, arr(4)))

  

  //通过 StructType 直接指定每个字段的schema

  val schema: StructType = StructType(

  Array(

    StructField("id", LongType, true),

    StructField("name", StringType, true),

    StructField("age", IntegerType, true),

    StructField("fv", DoubleType, true),

    StructField("gender", StringType, true)

  )

)

  //将RDD和和schema信息关联

  val personDF: DataFrame = sqlContext.createDataFrame(rowRdd, schema)

 

 

2.2.4.   通过jdbc读取mysql数据创建DataFrame

val src_table: String = "person"

  val user: String = "root"

  val password: String = "root"

 
//创建一个RDD,这个RDD以后从MySQL中读取数据

  val personDF: DataFrame = sqlContext.read.format("jdbc").options(Map("url" -> "jdbc:mysql://192.168.40.1:3306/bigdata", "driver" -> "com.mysql.jdbc.Driver", "dbtable" -> src_table, "user" -> user, "password" -> password)).load()

personDF.printSchema()

 

 

2.2.5.   读取json文件创建DataFrame

val dataFrame: DataFrame = sqlContext.read.json("/hdfs/logs/json")

 

 

2.2.6.   读取parquet文件创建DataFrame

val dataFrame: DataFrame = sqlContext.read.parquet("/hdfs/logs/parquet")

 

 

 

2.3.    DataFrames数据保存

2.3.1.   保存到mysql

val result: DataFrame = sqlContext.sql("select id, name, face_value fv from t_person order by fv desc")

  

  val prop = new Properties()

prop.put("user", user)

prop.put("password", password)

  //触发action,把输出存储到MySQL中,write是一个Action,将数据追加到数据库

  result.write.mode("append").jdbc("jdbc:mysql://192.168.40.1:3306/bigdata", s"bigdata.$dest_table", prop)

 

 

2.3.2.   保存为json文件

result.repartition(1).write.mode(SaveMode.Append).json("/hdfs/logs/json")

 

 

2.3.3.   保存为parquet文件

result.repartition(1).write.mode(SaveMode.Append).parquet("/hdfs/logs/parquet")

 

 

 

 

2.4.    DataFrame常用操作

2.4.1.     DSL风格语法

//查看DataFrame中的内容

personDF.show

 

//查看DataFrame部分列中的内容

personDF.select(personDF.col("name")).show

personDF.select(col("name"), col("age")).show

personDF.select("name").show

 

//打印DataFrame的Schema信息

personDF.printSchema

 

//查询所有的name和age,并将age+1

personDF.select(col("id"), col("name"), col("age") + 1).show

personDF.select(personDF("id"), personDF("name"), personDF("age") + 1).show

 

//过滤age大于等于18的

personDF.filter(col("age") >= 18).show

 

//按年龄进行分组并统计相同年龄的人数

personDF.groupBy("age").count().show()

 

2.4.2.     SQL风格语法

如果想使用SQL风格的语法,需要将DataFrame注册成表

personDF.registerTempTable("t_person")

 

//查询年龄最大的前两名

sqlContext.sql("select * from t_person order by age desc limit 2").show

//查询 t_person表按照fv倒序,age正序

SELECT * FROM t_person ORDER BY fv DESC,age ASC

//显示表的Schema信息

sqlContext.sql("desc t_person").show

 

2.5. 自定义udf函数

示例:

sqlContext.udf.register("en2cn", (en: String) => {

  en match {

    case "cn" => "中国"

    case "jp" => "小日本"

    case "us" => "美国佬"

    case _ => "火星人"

  }

})

使用

//执行SQL

  val result: DataFrame = sqlContext.sql("SELECT name, en2cn(nation) FROM t_user")

result.show()

 

 

2.6. 案例

2.6.1.   读取mysql数据处理后写回到mysql中

源数据表内容

 

目标表结构

 

 

完整代码:

package com.edu360

  

  import java.util.Properties

  

  import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}

  import org.apache.spark.{SparkConf, SparkContext}

  

  object MySqlDataSource {

  

  def main(args: Array[String]): Unit = {

    val src_table: String = args(0)

    val dest_table: String = args(1)

    val user: String = "root"

    val password: String = "root"

  

    val conf = new SparkConf().setAppName("MySQLDataSorce").setMaster("local[*]")

    val sc = new SparkContext(conf)

    val sqlContext = new SQLContext(sc)

    sc.setLogLevel("WARN")

  

    //创建一个RDD,这个RDD以后从MySQL中读取数据

    val personDF: DataFrame = sqlContext.read.format("jdbc").options(Map("url" -> "jdbc:mysql://192.168.40.1:3306/bigdata", "driver" -> "com.mysql.jdbc.Driver", "dbtable" -> src_table, "user" -> user, "password" -> password)).load()

  

    //创建临时表

    personDF.registerTempTable("t_person")

    //执行sql,sql算子是Transformation

    val result: DataFrame = sqlContext.sql("select id, name, face_value fv from t_person order by fv desc")

  

    //触发action,把输出存储到MySQL中,write是一个Action

    val prop = new Properties()

    prop.put("user", user)

    prop.put("password", password)

    //将数据追加到数据库

    result.write.mode("append").jdbc("jdbc:mysql://192.168.40.1:3306/bigdata", s"bigdata.$dest_table", prop)

  

    sc.stop()

  }

}

 

 

 

打jar包并提交集群

/usr/local/app/spark-1.6.3-bin-hadoop2.6/bin/spark-submit \

--class com.edu360.MySqlDataSource \

--master spark://mini1:7077 \

--driver-class-path /root/mysql-connector-java-5.1.38.jar \

/root/spark-sql.jar person person_bak

因为有在driver端操作数据库,则必须通过--driver-class-path参数指定外部jar包

若打好的jar中没有mysql的驱动jar包,则可能还需通过--jars指定外部jar包

/usr/local/app/spark-1.6.3-bin-hadoop2.6/bin/spark-submit \

--class com.edu360.MySqlDataSource \

--master spark://mini1:7077 \

--jars /root/mysql-connector-java-5.1.38.jar \

--driver-class-path /root/mysql-connector-java-5.1.38.jar \

/root/original-spark-sql-1.0-SNAPSHOT.jar person person_bak

 

具体过程:

 

 

2.7. spark sql 2.X api

spark 2.X对RDD进行优化,并将RDD和DataFrame的API统一抽象成dataset

DataFrame就是Dataset[Row]

RDD则是Dataset.rdd

总之Dataset包括DataFrame和RDD

2.7.1.   构建spark seesion

//spark2.x的SQL执行的入口是SparkSession,是单例的

val spark: SparkSession = SparkSession.builder().appName("DataSetWithSchema").master("local[*]").getOrCreate()

 

2.7.2.   获取用于创建dataset的DataFrameReader

val read: DataFrameReader = spark.read

 

2.7.3.   获取用于创建RDD的SparkContext

val sc: SparkContext = spark.sparkContext

 

2.7.4.   将rdd转化为DataFrame

通过StructType

//管理schema

  val schema = StructType(

  List(

    StructField("id", LongType),

    StructField("name", StringType),

    StructField("age", IntegerType),

    StructField("fv", DoubleType)

  )

)

  val personDF: DataFrame = spark.createDataFrame(rowRdd, schema)

 

 

通过case class

case class Person(id: Long, name: String, age: Int, fv: Double)
val personRDD: RDD[Person] = sc

  .textFile("/hdfs/logs/person.txt")

  .map(line => {

    val fields = line.split(",")

    Person(fields(0).toLong, fields(1), fields(2).toInt, fields(3).toDouble)

  })

  val personDF: DataFrame = spark.createDataFrame(personRDD)

 

 

2.7.5.   将DataFrame注册成view并执行sql语句

//注册视图

  personDF.createTempView("t_person")

  val result: DataFrame = spark.sql("SELECT * FROM t_person order by fv desc, age asc")

 

result.show()

 

DataFrame的dsl风格

import spark.implicits._

personDF.select($"id", $"name", $"age", $"fv").orderBy($"fv" desc,$"age" asc).show

 

 

2.7.6.   对Dataset进行rdd相关操作

//读取数据创建Dataset

  val lines: Dataset[String] = spark.read.textFile("/hdfs/wordcount/in/*")

  //导入隐式转换

  import spark.implicits._

  val words: Dataset[String] = lines.flatMap(_.split(" "))

words.show()

 

注意:必须导入SparkSession实例内部的隐式转换

 

 

 

 

2.7.7.   Dataset的DSL风格

//导入一下sql的聚合函数

  import org.apache.spark.sql.functions._

  val result: DataFrame = words.groupBy($"value" as "word").agg(count("*") as "counts").sort($"counts" desc)

result.show()

 

 

注:

$方法是一个隐式转换

implicit class StringToColumn(val sc: StringContext) {

  def $(args: Any*): ColumnName = {

    new ColumnName(sc.s(args: _*))

  }

}

 

 

 

 

 


了解更多详情请联系 今日值班讲师 或者直接加入千人QQ群进行咨询:210992946

分享到: