Communication between different Spark components (driver, executors, client) happens remotely over RPC. Within a single component, Spark additionally uses an event-listening mechanism; for example, most of Spark's internal metrics are collected through event listeners.
During SparkContext initialization, the very first internal component to be created is the LiveListenerBus:
_listenerBus = new LiveListenerBus(_conf)
The Spark event bus
Spark's event bus follows the listener pattern: producers post events to a bus, and the bus forwards each event to every registered listener.
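Since the pattern itself is small, here is a minimal, illustrative sketch of it. The names SimpleBus and Listener are made up for this sketch and are not Spark APIs.

// Minimal sketch of the listener pattern; names here are illustrative, not Spark APIs.
trait Listener[E] {
  def onEvent(event: E): Unit
}

class SimpleBus[E] {
  private var listeners = List.empty[Listener[E]]

  // Register a listener with the bus.
  def addListener(l: Listener[E]): Unit = listeners ::= l

  // Forward one event to every registered listener.
  def post(event: E): Unit = listeners.foreach(_.onEvent(event))
}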

The ListenerBus trait is the base of all event-bus implementations in Spark; its main descendants include SparkListenerBus, StreamingListenerBus, and the AsyncEventQueue used by LiveListenerBus.

Spark defines the trait ListenerBus to receive events and deliver them to the corresponding listeners, and to add and remove those listeners:
private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {

  private[this] val listenersPlusTimers = new CopyOnWriteArrayList[(L, Option[Timer])]

  /**
   * Add a listener to listen events. This method is thread-safe and can be called in any thread.
   */
  final def addListener(listener: L): Unit = {
    listenersPlusTimers.add((listener, getTimer(listener)))
  }

  /**
   * Remove a listener and it won't receive any events. This method is thread-safe and can be called
   * in any thread.
   */
  final def removeListener(listener: L): Unit = {
    listenersPlusTimers.asScala.find(_._1 eq listener).foreach { listenerAndTimer =>
      listenersPlusTimers.remove(listenerAndTimer)
    }
  }

  def postToAll(event: E): Unit = {
    // JavaConverters can create a JIterableWrapper if we use asScala.
    // However, this method will be called frequently. To avoid the wrapper cost, here we use
    // Java Iterator directly.
    val iter = listenersPlusTimers.iterator
    while (iter.hasNext) {
      val listenerAndMaybeTimer = iter.next()
      val listener = listenerAndMaybeTimer._1
      val maybeTimer = listenerAndMaybeTimer._2
      val maybeTimerContext = if (maybeTimer.isDefined) {
        maybeTimer.get.time()
      } else {
        null
      }
      try {
        doPostEvent(listener, event)
        if (Thread.interrupted()) {
          // We want to throw the InterruptedException right away so we can associate the interrupt
          // with this listener, as opposed to waiting for a queue.take() etc. to detect it.
          throw new InterruptedException()
        }
      } catch {
        case ie: InterruptedException =>
          logError(s"Interrupted while posting to ${Utils.getFormattedClassName(listener)}. " +
            s"Removing that listener.", ie)
          removeListenerOnError(listener)
        case NonFatal(e) if !isIgnorableException(e) =>
          logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
      } finally {
        if (maybeTimerContext != null) {
          maybeTimerContext.stop()
        }
      }
    }
  }

  protected def doPostEvent(listener: L, event: E): Unit
}
As you can see, ListenerBus defines how an event is posted to all registered listeners, while the actual delivery of a single event to a single listener (doPostEvent) is left to subclasses:
SparkListenerBus: posts SparkListenerEvent events to SparkListener-type listeners.
StreamingListenerBus: posts StreamingListenerEvent events to StreamingListener-type listeners.
LiveListenerBus: posts SparkListenerEvent events to its listeners asynchronously, on dedicated threads.
SparkListenerBus implements ListenerBus's doPostEvent() method, which pattern-matches on the event type (stageSubmitted, stageCompleted, jobStart, metricsUpdate, ...) and invokes the corresponding callback, as the abridged snippet below shows.
In SparkListenerBus, when an event needs to reach the listeners, postToAll() iterates over every registered listener and doPostEvent() routes the event to the right callback of each one. This whole call chain is synchronous, which becomes expensive when many listeners are registered.
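Abridged from SparkListenerBus (most cases elided), doPostEvent() is essentially one big pattern match:

// Abridged: SparkListenerBus.doPostEvent dispatches on the concrete event type
// and invokes the matching callback on the listener (most cases elided).
protected override def doPostEvent(
    listener: SparkListenerInterface,
    event: SparkListenerEvent): Unit = {
  event match {
    case stageSubmitted: SparkListenerStageSubmitted =>
      listener.onStageSubmitted(stageSubmitted)
    case stageCompleted: SparkListenerStageCompleted =>
      listener.onStageCompleted(stageCompleted)
    case jobStart: SparkListenerJobStart =>
      listener.onJobStart(jobStart)
    case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
      listener.onExecutorMetricsUpdate(metricsUpdate)
    case _ =>
      listener.onOtherEvent(event)
  }
}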
AsyncEventQueue: the asynchronous event queue
AsyncEventQueue, a subclass of SparkListenerBus, provides the asynchronous event-queue mechanism; it is also the building block of SparkContext's event bus, LiveListenerBus.
private class AsyncEventQueue(
    val name: String,
    conf: SparkConf,
    metrics: LiveListenerBusMetrics,
    bus: LiveListenerBus)
  extends SparkListenerBus
  with Logging {

  import AsyncEventQueue._

  // Cap the capacity of the queue so we get an explicit error (rather than an OOM exception) if
  // it's perpetually being added to more quickly than it's being drained.
  // An event queue capped at 10000 entries by default.
  private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](
    conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))

  // Keep the event count separately, so that waitUntilEmpty() can be implemented properly;
  // this allows that method to return only when the events in the queue have been fully
  // processed (instead of just dequeued).
  private val eventCount = new AtomicLong()

  /** A counter for dropped events. It will be reset every time we log it. */
  private val droppedEventsCounter = new AtomicLong(0L)

  /** When `droppedEventsCounter` was logged last time in milliseconds. */
  @volatile private var lastReportTimestamp = 0L

  private val logDroppedEvent = new AtomicBoolean(false)

  private var sc: SparkContext = null

  private val started = new AtomicBoolean(false)
  private val stopped = new AtomicBoolean(false)

  private val droppedEvents = metrics.metricRegistry.counter(s"queue.$name.numDroppedEvents")
  private val processingTime = metrics.metricRegistry.timer(s"queue.$name.listenerProcessingTime")

  // Remove the queue size gauge first, in case it was created by a previous incarnation of
  // this queue that was removed from the listener bus.
  metrics.metricRegistry.remove(s"queue.$name.size")
  metrics.metricRegistry.register(s"queue.$name.size", new Gauge[Int] {
    override def getValue: Int = eventQueue.size()
  })

  private val dispatchThread = new Thread(s"spark-listener-group-$name") {
    setDaemon(true)
    override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
      dispatch()
    }
  }
The class takes four constructor parameters: the queue name, a SparkConf, a LiveListenerBusMetrics, and the owning LiveListenerBus.
eventQueue is a LinkedBlockingQueue holding SparkListenerEvent events. Its capacity is set by the configuration parameter spark.scheduler.listenerbus.eventqueue.capacity, which defaults to 10000. Left unbounded, a LinkedBlockingQueue's capacity would be Integer.MAX_VALUE, which risks an OOM.
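For instance, if listeners fall behind and events are dropped, the capacity can be raised when building the SparkConf. A sketch; the configuration key is real, but the app name and chosen value are arbitrary:

// Sketch: raise the event queue capacity from the default 10000.
val conf = new SparkConf()
  .setAppName("listener-bus-demo") // hypothetical app name
  .set("spark.scheduler.listenerbus.eventqueue.capacity", "20000")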
eventCount tracks the number of events still pending. An event leaving the queue does not mean it has been processed, so the queue's actual size cannot be used directly. It is an AtomicLong so that updates stay atomic.
The started and stopped flags
These two flags mark whether the queue has been started and whether it has been stopped.
The dispatchThread field
dispatchThread is the daemon thread that drains the queue and delivers events to the listeners; its body simply calls dispatch(). Wrapping the body in Utils.tryOrStopSparkContext() means that if the code throws, a separate thread is spawned to shut down the SparkContext.
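The thread is started by the queue's start() method, and stop() enqueues a POISON_PILL sentinel that tells the dispatch loop (shown next) to exit. Both are abridged below from AsyncEventQueue and its companion object:

// Abridged from AsyncEventQueue. POISON_PILL is a sentinel event defined in the
// companion object; stop() enqueues it so the dispatch loop can exit cleanly.
val POISON_PILL = new SparkListenerEvent {}

private[scheduler] def start(sc: SparkContext): Unit = {
  if (started.compareAndSet(false, true)) {
    this.sc = sc
    dispatchThread.start()
  } else {
    throw new IllegalStateException(s"$name already started!")
  }
}

private[scheduler] def stop(): Unit = {
  if (!started.get()) {
    throw new IllegalStateException(s"Attempted to stop $name that has not yet started!")
  }
  if (stopped.compareAndSet(false, true)) {
    eventCount.incrementAndGet()
    eventQueue.put(POISON_PILL)
  }
  // The dispatch thread may be stopping itself as part of error handling; don't join then.
  if (Thread.currentThread() != dispatchThread) {
    dispatchThread.join()
  }
}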
private def dispatch(): Unit = LiveListenerBus.withinListenerThread.withValue(true) {
  var next: SparkListenerEvent = eventQueue.take()
  while (next != POISON_PILL) {
    val ctx = processingTime.time()
    try {
      super.postToAll(next)
    } finally {
      ctx.stop()
    }
    eventCount.decrementAndGet()
    next = eventQueue.take()
  }
  eventCount.decrementAndGet()
}
So the method loops, taking events off the queue and handing each one to the postToAll() method inherited from the ListenerBus trait, until it takes the POISON_PILL enqueued by stop(). How, then, do events get into the queue?
def post(event: SparkListenerEvent): Unit = {
  if (stopped.get()) {
    return
  }

  eventCount.incrementAndGet()
  if (eventQueue.offer(event)) {
    return
  }

  eventCount.decrementAndGet()
  droppedEvents.inc()
  droppedEventsCounter.incrementAndGet()
  if (logDroppedEvent.compareAndSet(false, true)) {
    // Only log the following message once to avoid duplicated annoying logs.
    logError(s"Dropping event from queue $name. " +
      "This likely means one of the listeners is too slow and cannot keep up with " +
      "the rate at which tasks are being started by the scheduler.")
  }
  logTrace(s"Dropping event $event")

  val droppedCount = droppedEventsCounter.get
  if (droppedCount > 0) {
    // Don't log too frequently
    if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
      // There may be multiple threads trying to decrease droppedEventsCounter.
      // Use "compareAndSet" to make sure only one thread can win.
      // And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and
      // then that thread will update it.
      if (droppedEventsCounter.compareAndSet(droppedCount, 0)) {
        val prevLastReportTimestamp = lastReportTimestamp
        lastReportTimestamp = System.currentTimeMillis()
        val previous = new java.util.Date(prevLastReportTimestamp)
        logWarning(s"Dropped $droppedCount events from $name since $previous.")
      }
    }
  }
}
post() first checks whether the queue has been stopped. If the queue is still running, it tries to enqueue the event with offer(). If offer() returns false, the queue is full: the dropped-event counters are incremented and the first drop is logged as an error. Finally, if more than a minute has passed since droppedEventsCounter was last reported, its current value is logged and the counter is reset.
With the details of AsyncEventQueue understood, we can move on to the implementation of LiveListenerBus.
The LiveListenerBus implementation
LiveListenerBus uses AsyncEventQueue as its core:
private[spark] class LiveListenerBus(conf: SparkConf) {

  import LiveListenerBus._

  private var sparkContext: SparkContext = _

  private[spark] val metrics = new LiveListenerBusMetrics(conf)

  // Indicate if `start()` is called
  private val started = new AtomicBoolean(false)
  // Indicate if `stop()` is called
  private val stopped = new AtomicBoolean(false)

  private val queues = new CopyOnWriteArrayList[AsyncEventQueue]()

  // Visible for testing.
  @volatile private[scheduler] var queuedEvents = new mutable.ListBuffer[SparkListenerEvent]()
Compared with what we have seen so far, the new pieces are queues and queuedEvents.
The queues field
queues holds the list of AsyncEventQueue instances, which means a LiveListenerBus manages multiple event queues. A CopyOnWriteArrayList keeps the list thread-safe.
The queuedEvents field
queuedEvents buffers SparkListenerEvents that may arrive before the LiveListenerBus has started. Once the bus starts, the buffered events are delivered first.
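This buffering is visible in start(), abridged below: every queue is started, the buffered events are replayed into each one, and queuedEvents is then set to null so later posts bypass the buffer.

// Abridged from LiveListenerBus.start(): start every queue, replay buffered
// events into each one, then drop the buffer.
def start(sc: SparkContext, metricsSystem: MetricsSystem): Unit = synchronized {
  if (!started.compareAndSet(false, true)) {
    throw new IllegalStateException("LiveListenerBus already started.")
  }
  this.sparkContext = sc
  queues.asScala.foreach { q =>
    q.start(sc)
    queuedEvents.foreach(q.post)
  }
  queuedEvents = null
  metricsSystem.registerSource(metrics)
}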
As an event bus, LiveListenerBus must also offer listener registration and event delivery; both are implemented on top of AsyncEventQueue:
private[spark] def addToQueue(
    listener: SparkListenerInterface,
    queue: String): Unit = synchronized {
  if (stopped.get()) {
    throw new IllegalStateException("LiveListenerBus is stopped.")
  }

  queues.asScala.find(_.name == queue) match {
    case Some(queue) =>
      queue.addListener(listener)

    case None =>
      val newQueue = new AsyncEventQueue(queue, conf, metrics, this)
      newQueue.addListener(listener)
      if (started.get()) {
        newQueue.start(sparkContext)
      }
      queues.add(newQueue)
  }
}
This method registers listener with the queue named queue. It searches queues for a matching name: if the queue already exists, the listener is registered on it via addListener(), inherited from ListenerBus. Otherwise, a new AsyncEventQueue is created first, the listener is registered on it, the new queue is started if the bus is already running, and it is appended to queues.
def addToSharedQueue(listener: SparkListenerInterface): Unit = {
  addToQueue(listener, SHARED_QUEUE)
}

def addToManagementQueue(listener: SparkListenerInterface): Unit = {
  addToQueue(listener, EXECUTOR_MANAGEMENT_QUEUE)
}

def addToStatusQueue(listener: SparkListenerInterface): Unit = {
  addToQueue(listener, APP_STATUS_QUEUE)
}

def addToEventLogQueue(listener: SparkListenerInterface): Unit = {
  addToQueue(listener, EVENT_LOG_QUEUE)
}
LiveListenerBus thus provides four additional methods that register a listener directly with one of the four built-in queues, whose names are defined in the companion object.
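The companion object, abridged below, defines those queue names, along with the withinListenerThread flag used by dispatch():

// Abridged from the LiveListenerBus companion object.
private[spark] object LiveListenerBus {
  // Allows for Context to check whether stop() call is made within listener thread
  val withinListenerThread: DynamicVariable[Boolean] = new DynamicVariable[Boolean](false)

  // Names of the four built-in queues.
  private[scheduler] val SHARED_QUEUE = "shared"
  private[scheduler] val APP_STATUS_QUEUE = "appStatus"
  private[scheduler] val EXECUTOR_MANAGEMENT_QUEUE = "executorManagement"
  private[scheduler] val EVENT_LOG_QUEUE = "eventLog"
}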
def post(event: SparkListenerEvent): Unit = {
  if (stopped.get()) {
    return
  }

  metrics.numEventsPosted.inc()

  // If the event buffer is null, the bus has already started, so skip the
  // synchronization and post directly to the queues.
  if (queuedEvents == null) {
    postToQueues(event)
    return
  }

  // Otherwise, synchronize while checking the started flag so the thread
  // calling start() is guaranteed to see this event.
  synchronized {
    if (!started.get()) {
      queuedEvents += event
      return
    }
  }

  // The bus started between the two checks; post directly to the queues.
  postToQueues(event)
}

private def postToQueues(event: SparkListenerEvent): Unit = {
  val it = queues.iterator()
  while (it.hasNext()) {
    it.next().post(event)
  }
}
post() checks whether queuedEvents still holds buffered events and whether the bus has started yet. Actual delivery goes through postToQueues(), which hands the event to every queue; each AsyncEventQueue then takes care of delivering it to its own listeners.
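Shutdown mirrors this structure. stop(), abridged below, marks the bus stopped and then stops each AsyncEventQueue, which drains itself via its poison pill:

// Abridged from LiveListenerBus.stop(): mark the bus stopped, then stop and
// clear every queue (each queue exits via its POISON_PILL).
def stop(): Unit = {
  if (!started.get()) {
    throw new IllegalStateException(s"Attempted to stop bus that has not yet started!")
  }
  if (!stopped.compareAndSet(false, true)) {
    return
  }
  synchronized {
    queues.asScala.foreach(_.stop())
    queues.clear()
  }
}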
Writing a custom listener
class MySparkAppListener(val sparkConf: SparkConf) extends SparkListener with Logging {

  override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
    // appId is an Option[String], so unwrap it before logging.
    val appId = applicationStart.appId
    logInfo(appId.getOrElse("unknown app id"))
  }

  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
    logInfo("app end time " + applicationEnd.time)
  }
}

object MyObject {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
    sparkConf.set("spark.extraListeners", "org.apache.spark.MySparkAppListener")
    val sc = new SparkContext(sparkConf)
    // ...
  }
}
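Instead of the spark.extraListeners configuration, the listener can also be registered programmatically; SparkContext.addSparkListener places it on the shared queue. A sketch (the app name is arbitrary):

// Sketch: programmatic registration via SparkContext.addSparkListener,
// which adds the listener to the shared queue.
val conf = new SparkConf().setAppName("listener-demo") // hypothetical app name
val sc = new SparkContext(conf)
sc.addSparkListener(new MySparkAppListener(sc.getConf))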
Taking JobSubmitted in DAGScheduler as an example, the end-to-end flow is:
1. DAGScheduler handles the JobSubmitted message in handleJobSubmitted(); before calling submitStage(), it posts a SparkListenerJobStart event to the event bus (see the abridged snippet after this list).
2. Inside LiveListenerBus, the SparkListenerJobStart event is pushed into every registered queue (the internal queue of each AsyncEventQueue).
3. Meanwhile, each AsyncEventQueue's queue is drained by its own thread, which keeps taking events and delivering them to all listeners attached to that queue.
4. Each listener handles the event according to its concrete type.
That is the overall flow of event dispatch and handling.
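The posting in step 1 looks like this in DAGScheduler.handleJobSubmitted (abridged):

// Abridged from DAGScheduler.handleJobSubmitted: the SparkListenerJobStart
// event is posted to the bus just before the final stage is submitted.
listenerBus.post(
  SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
submitStage(finalStage)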
One question remains: how do listeners get registered with the queues inside the bus?
Take the listener bus used by DAGScheduler as an example. It is initialized in SparkContext, which registers the various listeners with different queues by calling addToEventLogQueue, addToStatusQueue, addToManagementQueue, and addToSharedQueue.
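Schematically, the wiring in SparkContext looks like the sketch below. This is simplified and illustrative: the variable names are made up, and the exact call sites vary across Spark versions.

// Simplified, illustrative sketch of how SparkContext wires listeners onto
// the bus before starting it; variable names here are not exact Spark fields.
listenerBus.addToStatusQueue(appStatusListener)           // feeds the Web UI
listenerBus.addToEventLogQueue(eventLoggingListener)      // writes the event log
listenerBus.addToManagementQueue(executorAllocListener)   // dynamic allocation
listenerBus.addToSharedQueue(userListener)                // e.g. spark.extraListeners
listenerBus.start(this, env.metricsSystem)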