【小牛原创】Spark SQL 从入门到实战 -- spark sql 1.6版本相关api
【小牛原创】Spark SQL 从入门到实战 -- 概述
Spark Streaming:大规模流式数据处理
spark RDD 相关需求
spark RDD 高级应用
Spark手册 - load&save
Spark手册 - debug
Spark手册 - cache&checkpoint
Spark手册 - RDD Action API
Spark手册 - Partitioner源码
Spark手册 - RDD Transformation API
Spark手册 - RDD的依赖关系
Spark手册 - RDD入门
Spark手册 - 远程debug
Spark手册 - 在IDEA中编写WordCount程序(3)
Spark手册 - 在IDEA中编写WordCount程序(2)
Spark手册 - 在IDEA中编写WordCount程序(1)
Spark手册 - 执行Spark程序
Spark手册 - 集群安装
20页PPT|视频类网站大数据生态 Spark在爱奇艺的应用实践
Spark机器学习入门实例——大数据集(30+g)二分类
Spark官方文档中文翻译:Spark SQL 之 Data Sources
使用Spark MLlib来训练并服务于自然语言处理模型
Spark知识体系完整解读
案例 :Spark应用案例现场分享(IBM Datapalooza)
最全的Spark基础知识解答
Spark在GrowingIO数据无埋点全量采集场景下的实践
Apache Spark探秘:三种分布式部署方式比较
Apache Spark探秘:多进程模型还是多线程模型?
Apache Spark探秘:实现Map-side Join和Reduce-side Join
Apache Spark探秘:利用Intellij IDEA构建开发环境
spark on yarn的技术挑战
Apache Spark学习:将Spark部署到Hadoop 2.2.0上
Hadoop与Spark常用配置参数总结
基于Spark Mllib,SparkSQL的电影推荐系统
spark作业调优秘籍,解数据倾斜之痛
Spark入门必学:预测泰坦尼克号上的生还情况
小牛学堂浅谈基于Spark大数据平台日志审计系统的设计与实现
【Hadoop Summit Tokyo 2016】使用基于Lambda架构的Spark的近实时的网络异常检测和流量分析
Spark编程环境搭建经验分享
Spark技术在京东智能供应链预测的应用
spark中textFile、groupByKey、collect、flatMap、map结合小案例
Spark中DataFrame的schema讲解
深度剖析Spark分布式执行原理
【Spark Summit East 2017】从容器化Spark负载中获取的经验
内存分析技术哪家强?Spark占几何
Spark系列之一:Spark,一种快速数据分析替代方案
6种最常见的Hadoop和Spark项目
Hadoop vs Spark
Hadoop与Spark常用配置参数总结
Spark RPC通信层设计原理分析
Spark Standalone架构设计要点分析
Spark UnifiedMemoryManager内存管理模型分析
网易的Spark技术分享

Apache Spark探秘:实现Map-side Join和Reduce-side Join

于2017-03-29由小牛君创建

分享到:



在大数据处理场景中,多表Join是非常常见的一类运算。为了便于求解,通常会将多表join问题转为多个两表连接问题。两表Join的实现算法非常多,一般我们会根据两表的数据特点选取不同的join算法,其中,最常用的两个算法是map-side join和reduce-side join。本文将介绍如何在apache spark中实现这两种算法。

(1)Map-side Join

Map-side Join使用场景是一个大表和一个小表的连接操作,其中,“小表”是指文件足够小,可以加载到内存中。该算法可以将join算子执行在Map端,无需经历shuffle和reduce等阶段,因此效率非常高。

在Hadoop MapReduce中, map-side join是借助DistributedCache实现的。DistributedCache可以帮我们将小文件分发到各个节点的Task工作目录下,这样,我们只需在程序中将文件加载到内存中(比如保存到Map数据结构中),然后借助Mapper的迭代机制,遍历另一个大表中的每一条记录,并查找是否在小表中,如果在则输出,否则跳过。

在Apache Spark中,同样存在类似于DistributedCache的功能,称为“广播变量”(Broadcast variable)。其实现原理与DistributedCache非常类似,但提供了更多的数据/文件广播算法,包括高效的P2P算法,该算法在节点数目非常多的场景下,效率远远好于DistributedCache这种基于HDFS共享存储的方式,具体比较可参考“Performance and Scalability of Broadcast in Spark”。使用MapReduce DistributedCache时,用户需要显示地使用File API编写程序从本地读取小表数据,而Spark则不用,它借助Scala语言强大的函数闭包特性,可以隐藏数据/文件广播过程,让用户编写程序更加简单。

假设两个文件,一小一大,且格式类似为:

Key,value,value

Key,value,value

则利用Spark实现map-side的算法如下:

    var table1 = sc.textFile(args(1))
    var table2 = sc.textFile(args(2))

    // table1 is smaller, so broadcast it as a map<String, String>
    var pairs = table1.map { x =>
      var pos = x.indexOf(',')
      (x.substring(0, pos), x.substring(pos + 1))
    }.collectAsMap
    var broadCastMap = sc.broadcast(pairs) //save table1 as map, and broadcast it

    // table2 join table1 in map side
    var result = table2.map { x =>
      var pos = x.indexOf(',')
      (x.substring(0, pos), x.substring(pos + 1))
    }.mapPartitions({ iter =>
      var m = broadCastMap.value
      for{
        (key, value) <- iter
        if(m.contains(key))
      } yield (key, (value, m.get(key).getOrElse("")))
    })

    result.saveAsTextFile(args(3)) //save result to local file or HDFS

(2)Reduce-side Join

当两个文件/目录中的数据非常大,难以将某一个存放到内存中时,Reduce-side Join是一种解决思路。该算法需要通过Map和Reduce两个阶段完成,在Map阶段,将key相同的记录划分给同一个Reduce Task(需标记每条记录的来源,便于在Reduce阶段合并),在Reduce阶段,对key相同的进行合并。

Spark提供了Join算子,可以直接通过该算子实现reduce-side join,但要求RDD中的记录必须是pair,即RDD[KEY, VALUE],同样前一个例利用Reduce-side join实现如下:

    var table1 = sc.textFile(args(1))
    var table2 = sc.textFile(args(2))

    var pairs = table1.map{x =>
      var pos = x.indexOf(',')
      (x.substring(0, pos), x.substring(pos + 1))
    }

    var result = table2.map{x =>
      var pos = x.indexOf(',')
      (x.substring(0, pos), x.substring(pos + 1))
    }.join(pairs)

    result.saveAsTextFile(args(3))

(3)总结

本文介绍了Spark中map-side join和reduce-side join的编程思路,希望对大家有借鉴意义。但需要注意的是,在使用这两种算法处理较大规模的数据时,通常需要对多个参数进行调优,否则可能会产生OOM问题。通常需要调优的相关参数包括,map端数据输出buffer大小,reduce端数据分组方法(基于map还是基于sort),等等。

(4)两个问题

问题1:如果在map-side join中,不使用以下语句对文件1进行广播,

var broadCastMap = sc.broadcast(pairs)
也可以在后面程序中直接使用变量pairs存储的数据进行join,这两种方式有什么异同,性能会有何不同?
问题2:将map-side join中的以下语句:
    mapPartitions({ iter =>
      var m = broadCastMap.value
      for{
        (key, value) <- iter
        if(m.contains(key))
      } yield (key, (value, m.get(key).getOrElse("")))

改为:

var m = broadCastMap.value //这一句放在var table2 = sc.textFile(args(2))后面
map {case (key, value) =>
  if(m.contains(key)) (key, (value, m.get(key).getOrElse("")))
}

最终结果是有问题的,为什么? 本文两个示例程序可以从百度网盘上下载,地址为Spark-Join-Exmaple