【小牛原创】Spark SQL 从入门到实战 -- spark sql 1.6版本相关api
【小牛原创】Spark SQL 从入门到实战 -- 概述
Spark Streaming:大规模流式数据处理
spark RDD 相关需求
spark RDD 高级应用
Spark手册 - load&save
Spark手册 - debug
Spark手册 - cache&checkpoint
Spark手册 - RDD Action API
Spark手册 - Partitioner源码
Spark手册 - RDD Transformation API
Spark手册 - RDD的依赖关系
Spark手册 - RDD入门
Spark手册 - 远程debug
Spark手册 - 在IDEA中编写WordCount程序(3)
Spark手册 - 在IDEA中编写WordCount程序(2)
Spark手册 - 在IDEA中编写WordCount程序(1)
Spark手册 - 执行Spark程序
Spark手册 - 集群安装
20页PPT|视频类网站大数据生态 Spark在爱奇艺的应用实践
Spark机器学习入门实例——大数据集(30+g)二分类
Spark官方文档中文翻译:Spark SQL 之 Data Sources
使用Spark MLlib来训练并服务于自然语言处理模型
Spark知识体系完整解读
案例 :Spark应用案例现场分享(IBM Datapalooza)
最全的Spark基础知识解答
Spark在GrowingIO数据无埋点全量采集场景下的实践
Apache Spark探秘:三种分布式部署方式比较
Apache Spark探秘:多进程模型还是多线程模型?
Apache Spark探秘:实现Map-side Join和Reduce-side Join
Apache Spark探秘:利用Intellij IDEA构建开发环境
spark on yarn的技术挑战
Apache Spark学习:将Spark部署到Hadoop 2.2.0上
Hadoop与Spark常用配置参数总结
基于Spark Mllib,SparkSQL的电影推荐系统
spark作业调优秘籍,解数据倾斜之痛
Spark入门必学:预测泰坦尼克号上的生还情况
小牛学堂浅谈基于Spark大数据平台日志审计系统的设计与实现
【Hadoop Summit Tokyo 2016】使用基于Lambda架构的Spark的近实时的网络异常检测和流量分析
Spark编程环境搭建经验分享
Spark技术在京东智能供应链预测的应用
spark中textFile、groupByKey、collect、flatMap、map结合小案例
Spark中DataFrame的schema讲解
深度剖析Spark分布式执行原理
【Spark Summit East 2017】从容器化Spark负载中获取的经验
内存分析技术哪家强?Spark占几何
Spark系列之一:Spark,一种快速数据分析替代方案
6种最常见的Hadoop和Spark项目
Hadoop vs Spark
Hadoop与Spark常用配置参数总结
Spark RPC通信层设计原理分析
Spark Standalone架构设计要点分析
Spark UnifiedMemoryManager内存管理模型分析
网易的Spark技术分享

Spark RPC通信层设计原理分析

于2017-03-14由小牛君创建

分享到:


Spark将RPC通信层设计的非常巧妙,融合了各种设计/架构模式,将一个分布式集群系统的通信层细节完全屏蔽,这样在上层的计算框架的设计中能够获得很好的灵活性。同时,如果上层想要增加各种新的特性,或者对来自不同企业或组织的程序员贡献的特性,也能够很容易地增加进来,可以避开复杂的通信层而将注意力集中在上层计算框架的处理和优化上,入手难度非常小。另外,对上层计算框架中的各个核心组件的开发、功能增强,以及Bug修复等都会变得更加容易。

Spark RPC层设计概览

Spark RPC层是基于优秀的网络通信框架Netty设计开发的,同时获得了Netty所具有的网络通信的可靠性和高效性。我们先把Spark中与RPC相关的一些类的关系梳理一下,为了能够更直观地表达RPC的设计,我们先从类的设计来看,如下图所示:
SparkRPC-ClassDiagram
通过上图,可以清晰地将RPC设计分离出来,能够对RPC层有一个整体的印象。了解Spark RPC层的几个核心的概念(我们通过Spark源码中对应的类名来标识),能够更好地理解设计:

  • RpcEndpoint

RpcEndpoint定义了RPC通信过程中的通信端对象,除了具有管理一个RpcEndpoint生命周期的操作(constructor -> onStart -> receive* -> onStop),并给出了通信过程中一个RpcEndpoint所具有的基于事件驱动的行为(连接、断开、网络异常),实际上对于Spark框架来说主要是接收消息并处理,具体可以看对应特质RpcEndpoint的代码定义,如下所示:

01 private[spark] trait RpcEndpoint {
02  
03   val rpcEnv: RpcEnv
04  
05   final def self: RpcEndpointRef = {
06     require(rpcEnv != null, "rpcEnv has not been initialized")
07     rpcEnv.endpointRef(this)
08   }
09  
10   def receive: PartialFunction[Any, Unit] = {
11     case _ => throw new SparkException(self + " does not implement 'receive'")
12   }
13  
14   def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
15     case _ => context.sendFailure(new SparkException(self + " won't reply anything"))
16   }
17  
18   def onError(cause: Throwable): Unit = {
19     // By default, throw e and let RpcEnv handle it
20     throw cause
21   }
22  
23   def onConnected(remoteAddress: RpcAddress): Unit = {
24     // By default, do nothing.
25   }
26  
27   def onDisconnected(remoteAddress: RpcAddress): Unit = {
28     // By default, do nothing.
29   }
30  
31   def onNetworkError(cause: Throwable, remoteAddress: RpcAddress): Unit = {
32     // By default, do nothing.
33   }
34  
35   def onStart(): Unit = {
36     // By default, do nothing.
37   }
38  
39   def onStop(): Unit = {
40     // By default, do nothing.
41   }
42  
43   final def stop(): Unit = {
44     val _self = self
45     if (_self != null) {
46       rpcEnv.stop(_self)
47     }
48   }
49 }

通过上面的receive方法,接收由RpcEndpointRef.send方法发送的消息,该类消息不需要进行响应消息(Reply),而只是在RpcEndpoint端进行处理。通过receiveAndReply方法,接收由RpcEndpointRef.ask发送的消息,RpcEndpoint端处理完消息后,需要给调用RpcEndpointRef.ask的通信端响应消息(Reply)。

  • RpcEndpointRef

RpcEndpointRef是一个对RpcEndpoint的远程引用对象,通过它可以向远程的RpcEndpoint端发送消息以进行通信。RpcEndpointRef特质的定义,代码如下所示:

01 private[spark] abstract class RpcEndpointRef(conf: SparkConf)  extends Serializable with Logging {
02  
03   private[this] val maxRetries = RpcUtils.numRetries(conf)
04   private[this] val retryWaitMs = RpcUtils.retryWaitMs(conf)
05   private[this] val defaultAskTimeout = RpcUtils.askRpcTimeout(conf)
06  
07   def address: RpcAddress
08  
09   def name: String
10  
11   def send(message: Any): Unit
12  
13   def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]
14  
15   def ask[T: ClassTag](message: Any): Future[T] = ask(message, defaultAskTimeout)
16  
17   def askWithRetry[T: ClassTag](message: Any): T = askWithRetry(message, defaultAskTimeout)
18  
19   def askWithRetry[T: ClassTag](message: Any, timeout: RpcTimeout): T = {
20     ... ...
21   }
22  
23 }

上面代码中,send方法发送消息后不等待响应,亦即Send-and-forget,Spark中基于Netty实现,实现在NettyRpcEndpointRef中,如下所示:

1 override def send(message: Any): Unit = {
2   require(message != null, "Message is null")
3   nettyEnv.send(RequestMessage(nettyEnv.address, this, message))
4 }

可见,它是通过NettyRpcEnv来发送RequestMessage消息,并将当前NettyRpcEndpointRef封装到RequestMessage消息对象中发送出去,通信对端通过该NettyRpcEndpointRef能够识别出消息来源。
而ask方法发送消息后需要等待通信对端给予响应,通过Future来异步获取响应结果,也是在NettyRpcEndpointRef中实现,如下所示:

1 override def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T] = {
2   nettyEnv.ask(RequestMessage(nettyEnv.address, this, message), timeout)
3 }

类似的,也是通过NettyRpcEnv来发送一个RequestMessage消息。

  • RpcEnv

一个RpcEnv是一个RPC环境对象,它负责管理RpcEndpoint的注册,以及如何从一个RpcEndpoint获取到一个RpcEndpointRef。RpcEndpoint是一个通信端,例如Spark集群中的Master,或Worker,都是一个RpcEndpoint。但是,如果想要与一个RpcEndpoint端进行通信,一定需要获取到该RpcEndpoint一个RpcEndpointRef,而获取该RpcEndpointRef只能通过一个RpcEnv环境对象来获取。所以说,一个RpcEnv对象才是RPC通信过程中的“指挥官”,在RpcEnv类中,有一个核心的方法:
def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef
通过上面方法,可以注册一个RpcEndpoint到RpcEnv环境对象中,有RpcEnv来管理RpcEndpoint到RpcEndpointRef的绑定关系。在注册RpcEndpoint时,每个RpcEndpoint都需要有一个唯一的名称。
Spark中基于Netty实现通信,所以对应的RpcEnv实现为NettyRpcEnv,上面方法的实现,如下所示:

1 override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
2   dispatcher.registerRpcEndpoint(name, endpoint)
3 }

调用NettyRpcEnv内部的Dispatcher对象注册一个RpcEndpoint:

01 def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
02   val addr = RpcEndpointAddress(nettyEnv.address, name)
03   val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
04   synchronized {
05     if (stopped) {
06       throw new IllegalStateException("RpcEnv has been stopped")
07     }
08     if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
09       throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
10     }
11     val data = endpoints.get(name)
12     endpointRefs.put(data.endpoint, data.ref)
13     receivers.offer(data)  // for the OnStart message
14   }
15   endpointRef
16 }

一个RpcEndpoint只能注册一次(根据RpcEndpoint的名称来检查唯一性),这样在Dispatcher内部注册并维护RpcEndpoint与RpcEndpointRef的绑定关系,通过如下两个内部结构:

1   private val endpoints = new ConcurrentHashMap[String, EndpointData]
2   private val endpointRefs = new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]
3 可以看到,一个命名唯一的RpcEndpoint在Dispatcher中对应一个EndpointData来维护其信息,该数据结构定义,如下所示:
4   private class EndpointData(
5       val name: String,
6       val endpoint: RpcEndpoint,
7       val ref: NettyRpcEndpointRef) {
8     val inbox = new Inbox(ref, endpoint)
9   }

这里,每一个命名唯一的RpcEndpoint对应一个线程安全的Inbox,所有发送给一个RpcEndpoint的消息,都由对应的Inbox将对应的消息路由给RpcEndpoint进行处理,后面我们会详细分析Inbox。

创建NettyRpcEnv环境对象

创建NettyRpcEnv对象,是一个非常重的操作,所以在框架里使用过程中要尽量避免重复创建。创建NettyRpcEnv,会创建很多用来处理底层RPC通信的线程和数据结构。具体的创建过程,如下图所示:
SparkRPC-Create-NettyRpcEnv
具体要点,描述如下:

  • 创建一个NettyRpcEnv对象对象,需要通过NettyRpcEnvFactory来创建
  • Dispatcher负责RPC消息的路由,它能够将消息路由到对应的RpcEndpoint进行处理
  • NettyStreamManager负责提供文件服务(文件、JAR文件、目录)
  • NettyRpcHandler负责处理网络IO事件,接收RPC调用请求,并通过Dispatcher派发消息
  • TransportContext负责管理网路传输上下文信息:创建MessageEncoder、MessageDecoder、TransportClientFactory、TransportServer
  • TransportServer配置并启动一个RPC Server服务

消息路由过程分析

基于Standalone模式,Spark集群具有Master和一组Worker,Worker与Master之间需要进行通信,我们以此为例,来说明基于Spark PRC层是如何实现消息的路由的。
首先看Master端实现,代码如下所示:

01 def startRpcEnvAndEndpoint(
02     host: String,
03     port: Int,
04     webUiPort: Int,
05     conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
06   val securityMgr = new SecurityManager(conf)
07   val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
08   val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
09     new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
10   val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
11   (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
12 }

上面代码中,创建一个RpcEnv对象,通过创建一个NettyRpcEnvFactory对象来完成该RpcEnv对象的创建,实际创建了一个NettyRpcEnv对象。接着,通过setupEndpoint方法注册一个RpcEndpoint,这里Master就是一个RpcEndpoint,返回的masterEndpoint是Master的RpcEndpointRef引用对象。下面,我们看一下,发送一个BoundPortsRequest消息,具体的消息路由过程,如下图所示:
Master
上图中显示本地消息和远程消息派发的流程,最主要的区别是在接收消息时:接收消息走的是Inbox,发送消息走的是Outbox。

本地消息路由

发送一个BoundPortsRequest消息,实际走的是本地消息路由,直接放到对应的Inbox中,对应的代码处理逻辑如下所示:

01 private def postMessage(
02     endpointName: String,
03     message: InboxMessage,
04     callbackIfStopped: (Exception) => Unit): Unit = {
05   val error = synchronized {
06     val data = endpoints.get(endpointName)
07     if (stopped) {
08       Some(new RpcEnvStoppedException())
09     } else if (data == null) {
10       Some(new SparkException(s"Could not find $endpointName."))
11     } else {
12       data.inbox.post(message)
13       receivers.offer(data)
14       None
15     }
16   }
17   // We don't need to call `onStop` in the `synchronized` block
18   error.foreach(callbackIfStopped)
19 }

上面通过data.inbox派发消息,然后将消息data :EndpointData放入到receivers队列,触发Dispatcher内部的MessageLoop线程去消费,如下所示:

01 private class MessageLoop extends Runnable {
02   override def run(): Unit = {
03     try {
04       while (true) {
05         try {
06           val data = receivers.take()
07           if (data == PoisonPill) {
08             // Put PoisonPill back so that other MessageLoops can see it.
09             receivers.offer(PoisonPill)
10             return
11           }
12           data.inbox.process(Dispatcher.this)
13         } catch {
14           case NonFatal(e) => logError(e.getMessage, e)
15         }
16       }
17     } catch {
18       case ie: InterruptedException => // exit
19     }
20   }
21 }

这里,又继续调用Inbox的process方法来派发消息到指定的RpcEndpoint。通过上面的序列图,我们可以通过源码分析看到,原始消息被层层封装为一个RpcMessage ,该消息在Inbox的process方法中处理派发逻辑,如下所示:

01 case RpcMessage(_sender, content, context) =>
02        try {
03          endpoint.receiveAndReply(context).applyOrElse[Any, Unit](content, { msg =>
04            throw new SparkException(s"Unsupported message $message from ${_sender}")
05          })
06        } catch {
07          case NonFatal(e) =>
08            context.sendFailure(e)
09            // Throw the exception -- this exception will be caught by the safelyCall function.
10            // The endpoint's onError function will be called.
11            throw e
12        }

到这里,消息已经发送给对应的RpcEndpoint的receiveAndReply方法,我们这里实际上是Master实现类,这里的消息解包后为content: BoundPortsRequest,接下来应该看Master的receiveAndReply方法如何处理该本地消息,代码如下所示:

1 case BoundPortsRequest =>
2   context.reply(BoundPortsResponse(address.port, webUi.boundPort, restServerBoundPort))

可以看出,实际上上面的处理逻辑没有什么处理,只是通过BoundPortsResponse返回了几个Master端的几个端口号数据。

远程消息路由

我们都知道,Worker启动时,会向Master注册,通过该场景我们分析一下远程消息路由的过程。
先看一下Worker端向Master注册过程,如下图所示:
Worker.ask
Worker启动时,会首先获取到一个Master的RpcEndpointRef远程引用,通过该引用对象能够与Master进行RPC通信,经过上面消息派发,最终通过Netty的Channel将消息发送到远程Master端。
通过前面说明,我们知道Worker向Master注册的消息RegisterWorker应该最终会被路由到Master对应的Inbox中,然后派发给Master进行处理。下面,我们看一下Master端接收并处理消息的过程,如下图所示:
Master.receiveAndReply
上图分为两部分:一部分是从远端接收消息RegisterWorker,将接收到的消息放入到Inbox中;另一部分是触发MessageLoop线程处理该消息,进而通过调用Inbox的process方法,继续调用RpcEndpoint(Master)的receiveAndReply方法,处理消息RegisterWorker,如下所示:

01 case RegisterWorker(
02     id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) =>
03   logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
04     workerHost, workerPort, cores, Utils.megabytesToString(memory)))
05   if (state == RecoveryState.STANDBY) {
06     context.reply(MasterInStandby)
07   } else if (idToWorker.contains(id)) {
08     context.reply(RegisterWorkerFailed("Duplicate worker ID"))
09   } else {
10     val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
11       workerRef, workerWebUiUrl)
12     if (registerWorker(worker)) {
13       persistenceEngine.addWorker(worker)
14       context.reply(RegisteredWorker(self, masterWebUiUrl))
15       schedule()
16     } else {
17       val workerAddress = worker.endpoint.address
18       logWarning("Worker registration failed. Attempted to re-register worker at same " +
19         "address: " + workerAddress)
20       context.reply(RegisterWorkerFailed("Attempted to re-register worker at same address: "
21         + workerAddress))
22     }
23   }

如果Worker注册成功,则Master会通过context对象回复Worker响应:

1 context.reply(RegisteredWorker(self, masterWebUiUrl))

这样,如果一切正常,则Worker会收到RegisteredWorker响应消息,从而获取到Master的RpcEndpointRef引用对象,能够通过该引用对象与Master交互。