?Spark架构与原理系列:Spark 事件监听机制

Spark中不同组件间的通信使用Rpc进行远程通信(Driver, executor,client)。而在同一组件内,spark还有事件监听机制,如spark中各种指标的采集主要就是通过事件监听机制获取的。

在SparkContext进行初始化的过程中,第一个初始化的内部组件就是LiveListenerBus。

_listenerBus* = new LiveListenerBus(*_conf*)

Spark 事件总线

Spark中的事件总线采用监听器模式设计,其大致流程可以用下面的简图表示。

ListenerBus特征是Spark内所有事件总线实现的基类,下图示出ListenerBus的一部分继承结构。

在Spark中定义了一个特质ListenerBus主要用于接收事件并提交到对应的事件监听器, 添加和移除事件监听器。

`private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {`

`private[this] val *listenersPlusTimers* = new CopyOnWriteArrayList[(L, Option[Timer])]`
/**
   * Add a listener to listen events. This method is thread-safe and can be called in any thread.
   */

  final def addListener(listener: L): Unit = {
    listenersPlusTimers.add((listener, getTimer(listener)))
  }

  /**
   * Remove a listener and it won't receive any events. This method is thread-safe and can be called
   * in any thread.
   */

  final def removeListener(listener: L): Unit = {
    listenersPlusTimers.asScala.find(_._1 eq listener).foreach { listenerAndTimer =>
      listenersPlusTimers.remove(listenerAndTimer)
    }
  }

def postToAll(event: E): Unit = {
    // JavaConverters can create a JIterableWrapper if we use asScala.
    // However, this method will be called frequently. To avoid the wrapper cost, here we use
    // Java Iterator directly.
    val iter = listenersPlusTimers.iterator
    while (iter.hasNext) 
{
      val listenerAndMaybeTimer = iter.next()
      val listener = listenerAndMaybeTimer._1
      val maybeTimer = listenerAndMaybeTimer._2
      val maybeTimerContext = if (maybeTimer.isDefined) {
        maybeTimer.get.time()
      } else {
        null
      }
      try {
        doPostEvent(listener, event)
        if (Thread.interrupted()) {
          // We want to throw the InterruptedException right away so we can associate the interrupt
          // with this listener, as opposed to waiting for a queue.take() etc. to detect it.
          throw new InterruptedException()
        }
      } catch {
        case ie: InterruptedException =>
          logError(s"Interrupted while posting to ${Utils.getFormattedClassName(listener)}.  " +
            s"Removing that listener.", ie)
          removeListenerOnError(listener)
        case NonFatal(e) if !isIgnorableException(e) =>
          logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
      } finally {
        if (maybeTimerContext != null) {
          maybeTimerContext.stop()
        }
      }
    }
  }

protected def doPostEvent(listener: L, event: E): Unit

可以看出在ListenerBus中定义了向所有的事件监听器进行提交事件,但是具体的提交事件的方法是有待子类进行实现。

  • SparkListenerBus: 主要用于提交SparkListenerEvent 类型事件提交到监听器。

  • StreamingListenerBus: 主要是将StreamingListenerEvent提交到监听器

  • LiveListenerBus: 主要采用异步线程提交SparkListenerEvent 类型事件提交到监听器。

在SparkListenerBus 中实现了ListenerBus 的doPost的方法,其中实现可以将stageSubmitted,stageCompleted,jobStart,metricsUpdate ... 事件的添加。

在SparkListenerBus中,当事件需要通知监听器时会调用postToAll遍历所有的监听器,并调用SparkListenerBus的doPost方法对事件类型进行匹配后调用不同的监听器,整个过程是同步的调用,在监听器较多的情况下比较耗时。

AsyncEventQueue 异步监听器

在SparkListenerBus的实现类AsyncEventQueue中,提供了异步事件队列机制,它也是SparkContext中的事件总线LiveListenerBus的基础。

private class AsyncEventQueue(
    val name: String,
    conf: SparkConf,
    metrics: LiveListenerBusMetrics,
    bus: LiveListenerBus)
  extends SparkListenerBus
  with Logging {

  import AsyncEventQueue._

  // Cap the capacity of the queue so we get an explicit error (rather than an OOM exception) if
  // it's perpetually being added to more quickly than it's being drained.
  // 容量为1w的事件队列
  private val eventQueue= new LinkedBlockingQueue[SparkListenerEvent](
    conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))

  // Keep the event count separately, so that waitUntilEmpty() can be implemented properly;
  // this allows that method to return only when the events in the queue have been fully
  // processed (instead of just dequeued).
  private val eventCount= new AtomicLong()

/** A counter for dropped events. It will be reset every time we log it. */
private val droppedEventsCounter= new AtomicLong(0L)

/** When`droppedEventsCounter`was logged last time in milliseconds. */
@volatile private var lastReportTimestamp= 0L

  private val logDroppedEvent= new AtomicBoolean(false)

  private var sc: SparkContext = null

  private val started= new AtomicBoolean(false)
  private val stopped= new AtomicBoolean(false)

  private val droppedEvents= metrics.metricRegistry.counter(s"queue.$name.numDroppedEvents")
  private val processingTime= metrics.metricRegistry.timer(s"queue.$name.listenerProcessingTime")

  // Remove the queue size gauge first, in case it was created by a previous incarnation of
  // this queue that was removed from the listener bus.
  metrics.metricRegistry.remove(s"queue.$name.size")
  metrics.metricRegistry.register(s"queue.$name.size", new Gauge[Int] {
    override def getValue: Int =eventQueue.size()
  })

  private val dispatchThread= new Thread(s"spark-listener-group-$name") {
    setDaemon(true)
    override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
      dispatch()
    }
  }

该类的构造参数有四个,分别是队列名,SparkConf, LiveListenerBusMetrics和LiveListenerBus

  • eventQueue是一个存储SparkListenerEvent事件的阻塞队列LinkedBlockingQueue。它的大小是通过配置参数spark.scheduler.listenerbus.eventqueue.capacity来设置的,默认值10000。如果不设置阻塞队列的大小,那么默认值会是Integer.MAX_VALUE,有OOM的风险。

  • eventCount则是当前待处理事件的计数。因为事件从队列中弹出不代表已经处理完成,所以不能直接用队列的实际大小来表示。它是AtomicLong类型的,以保证修改的原子性。

  • started、stopped属性

这两个属性分别用来标记队列的启动与停止状态。

  • dispatchThread属性

dispatchThread是将队列中的事件分发到各监听器的守护线程,实际上调用了dispatch()方法。而Utils.tryOrStopSparkContext()方法的作用在于执行代码块时如果抛出异常,就另外起一个线程关闭SparkContext。

private def dispatch(): Unit = LiveListenerBus.withinListenerThread.withValue(true) {
  var next: SparkListenerEvent =eventQueue.take()
  while (next !=POISON_PILL) {
    val ctx =processingTime.time()
    try {
      super.postToAll(next)
    } finally {
      ctx.stop()
    }
eventCount.decrementAndGet()
    next =eventQueue.take()
  }
eventCount.decrementAndGet()
}

可见,该方法循环地从事件队列中取出事件,并调用父类ListenerBus特征的postToAll()方法。那么事件是如何入队的。

def post(event: SparkListenerEvent): Unit = {
  if (stopped.get()) {
    return
  }

eventCount.incrementAndGet()
  if (eventQueue.offer(event)) {
    return
  }

eventCount.decrementAndGet()
droppedEvents.inc()
droppedEventsCounter.incrementAndGet()
  if (logDroppedEvent.compareAndSet(falsetrue)) {
    // Only log the following message once to avoid duplicated annoying logs.
    logError(s"Dropping event from queue$name. " +
      "This likely means one of the listeners is too slow and cannot keep up with " +
      "the rate at which tasks are being started by the scheduler.")
  }
  logTrace(s"Dropping event$event")

  val droppedCount =droppedEventsCounter.get
  if (droppedCount > 0) {
    // Don't log too frequently
    if (System.currentTimeMillis() -lastReportTimestamp>= 60 * 1000) {
      // There may be multiple threads trying to decrease droppedEventsCounter.
      // Use "compareAndSet" to make sure only one thread can win.
      // And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and
      // then that thread will update it.
      if (droppedEventsCounter.compareAndSet(droppedCount, 0)) {
        val prevLastReportTimestamp =lastReportTimestamp
        lastReportTimestamp= System.currentTimeMillis()
        val previous = new java.util.Date(prevLastReportTimestamp)
        logWarning(s"Dropped$droppedCount events from$name since$previous.")
      }
    }
  }
}

该方法首先检查队列是否已经停止。如果是运行状态,就试图将事件event入队。若offer()方法返回false,表示队列已满,将丢弃事件的计数器自增,并标记有事件被丢弃。最后,若当前的时间戳与上一次输出droppedEventsCounter值的间隔大于1分钟,就在日志里输出它的值。

理解了AsyncEventQueue的细节之后,我们就可以进一步来看LiveListenerBus的实现了。

LiveListenerBus 的实现

LiveListenerBus内部用到了AsyncEventQueue作为核心。

private[spark] class LiveListenerBus(conf: SparkConf) {

  import LiveListenerBus._

  private var sparkContext: SparkContext = _

  private[spark] val metrics= new LiveListenerBusMetrics(conf)

  // Indicate if `start()` is called
  private val started= new AtomicBoolean(false)
  // Indicate if `stop()` is called
  private val stopped= new AtomicBoolean(false)

  private val queues= new CopyOnWriteArrayList[AsyncEventQueue]()

  // Visible for testing.
  @volatile private[scheduler] var queuedEvents= new mutable.ListBuffer[SparkListenerEvent]()

可以看出多出来的主要是queues与queuedEvents。

  • queues属性

queues维护一个AsyncEventQueue的列表,也就是说LiveListenerBus中会有多个事件队列。它采用CopyOnWriteArrayList来保证线程安全性。

  • queuedEvents属性

queuedEvents维护一个SparkListenerEvent的列表,它的用途是在LiveListenerBus启动成功之前,缓存可能已经收到的事件。在启动之后,这些缓存的事件会首先投递出去。

LiveListenerBus作为一个事件总线,也必须提供监听器注册、事件投递等功能,这些都是在AsyncEventQueue基础之上实现的。

 private[spark] def addToQueue(
      listener: SparkListenerInterface,
      queue: String
): Unit 
= synchronized {
    if (stopped.get()) {
      throw new IllegalStateException("LiveListenerBus is stopped.")
    }

    queues.asScala.find(_.name == queue) match {
      case Some(queue=>
        queue.addListener(listener)

      case None =>
        val newQueue = new AsyncEventQueue(queue, conf, metrics, this)
        newQueue.addListener(listener)
        if (started.get()) {
          newQueue.start(sparkContext)
        }
        queues.add(newQueue)
    }
  }

该方法将监听器listener注册到名为queue的队列中。它会在queues列表中寻找符合条件的队列,如果该队列已经存在,就调用父类ListenerBus的addListener()方法直接注册监听器。反之,就先创建一个AsyncEventQueue,注册监听器到新的队列中。

def addToSharedQueue(listener: SparkListenerInterface): Unit = {
    addToQueue(listener, SHARED_QUEUE)
  }

  def addToManagementQueue(listener: SparkListenerInterface): Unit = {
    addToQueue(listener, EXECUTOR_MANAGEMENT_QUEUE)
  }

  def addToStatusQueue(listener: SparkListenerInterface): Unit = {
    addToQueue(listener, APP_STATUS_QUEUE)
  }

  def addToEventLogQueue(listener: SparkListenerInterface): Unit = {
    addToQueue(listener, EVENT_LOG_QUEUE)
  }

LiveListenerBus还提供了另外4种直接注册监听器的方法,分别对应内置的4个队列,其名称在伴生对象中有定义。

def post(event: SparkListenerEvent): Unit = {
    if (stopped.get()) {
      return
    }
    metrics.numEventsPosted.inc()

    if (queuedEvents == null) {
      postToQueues(event)
      return
    }

    synchronized {
      if (!started.get()) {
        queuedEvents += event
        return
      }
    }

    postToQueues(event)
  }

  private def postToQueues(event: SparkListenerEvent): Unit = {
    val it = queues.iterator()
    while (it.hasNext()) {
      it.next().post(event)
    }
  }

post()方法会检查queuedEvents中有无缓存的事件,以及事件总线是否还没有启动。投递时会调用postToQueues()方法,将事件发送给所有队列,由AsyncEventQueue来完成投递到监听器的工作。

自定义监听器

class MySparkAppListener(val sparkConf: SparkConf) extends SparkListener with Logging{

  override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
    val appId = applicationStart.appId
    logInfo(appId)
  }

  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
    logInfo("app end time " + applicationEnd.time)
  }

}

object MyObject {
    def main(args : Array[String]) : Unit = {
        val sparkConf=new SparkConf()

        sparkConf.set("spark.extraListeners","org.apache.spark.MySparkAppListener")

        val sc = new SparkContext(sparkConf)
        .....
    }
}

以DAGScheduler中的JobSubmitted为例,梳理下整个过程:

1.在DAGScheduler处理JobSubmitted消息的函数在handleJobSubmitted中,在submitStage之前,会通过消息总线将SparkListenerJobStart监控事件发送到消息总线。

2.在LiveListenerBus类内部,会将SparkListenerJobStart事件依次塞入到所有多列中(上图中的AsyncEventQueue中的Queue)。

3.与此同时,每个AsyncEventQueue中的Queue对应一个Thread,该线程将持续从队列中取出监听事件,将该事件发送给与该列队相连的所有事件监听器。

4.各个事件监听器根据不同的event类型,进行对应的处理。

以上就是事件响应处理的整体流程。

此外,还有一个问题是:监听器是怎么注册到消息总线内部的队列的?

以DAGScheduler中的ListenerBus为例,这个listenerbus是在SparkContext中初始化的,并且通过调用addToEventLogQueue,addToStatusQueue,addToManagementQueue,addToSharedQueue函数将各个监听器加入到不同的队列中去。

请使用浏览器的分享功能分享到微信等