Spark source code analysis: when is Master's onStart() method called?

The life cycle of Master, like that of any RpcEndpoint, is: constructor -> onStart -> receive* -> onStop. Yet Master's main method never calls onStart directly, so when is onStart called?

The answer lies in Spark's underlying Netty-based RPC layer.

In the main method of Master:

 val rpcEnv: RpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)

Calling RpcEnv.create() ends up constructing a NettyRpcEnv, and that object is what gets returned.
Let's take a look at the source code of NettyRpcEnv:

private[netty] class NettyRpcEnv(
                                    val conf: SparkConf,
                                    javaSerializerInstance: JavaSerializerInstance,
                                    host: String,
                                    securityManager: SecurityManager) extends RpcEnv(conf) with Logging{

    private[netty] val transportConf = SparkTransportConf.fromSparkConf(
        conf.clone.set("spark.rpc.io.numConnectionsPerPeer", "1"),
        "rpc",
        conf.getInt("spark.rpc.io.threads", 0))
    // Create the message dispatcher. It gives each registered endpoint an Inbox, which lets messages be processed asynchronously
    private val dispatcher: Dispatcher = new Dispatcher(this)
    ......
} 

Let's take a look at the source code of Dispatcher:

/**
 * A message dispatcher, responsible for routing RPC messages to the appropriate endpoint(s).
 */
private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {
  // Endpoint data: endpoint name, endpoint, EndpointRef
  private class EndpointData(
      val name: String,
      val endpoint: RpcEndpoint,
      val ref: NettyRpcEndpointRef) {
    // Inbox for each Endpoint
    val inbox: Inbox = new Inbox(ref, endpoint)
  }
  // Maps endpoint name to its endpoint data, so the data can be looked up or removed quickly by name
  private val endpoints: ConcurrentMap[String, EndpointData] =
    new ConcurrentHashMap[String, EndpointData]
  // Maps each endpoint instance to its endpoint ref
  private val endpointRefs: ConcurrentMap[RpcEndpoint, RpcEndpointRef] =
    new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]

  // Track the receivers whose inboxes may contain messages.
  // This is a blocking queue: only EndpointData whose Inbox may hold messages is added to it
  private val receivers = new LinkedBlockingQueue[EndpointData]

  /**
   * True if the dispatcher has been stopped. Once stopped, all messages posted will be bounced
   * immediately.
   */
  @GuardedBy("this")
  private var stopped = false

  def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
    val addr = RpcEndpointAddress(nettyEnv.address, name)
    val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
    synchronized {
      if (stopped) {
        throw new IllegalStateException("RpcEnv has been stopped")
      }
      // Store the endpoint data under its name; fail if the name is already taken
      if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
        throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
      }
      // Fetch the EndpointData that was just put into the map
      val data = endpoints.get(name)
      // Record the mapping from RpcEndpoint to RpcEndpointRef
      endpointRefs.put(data.endpoint, data.ref)
      // Put the endpoint data in the blocking queue so a MessageLoop picks it up
      receivers.offer(data)  // for the OnStart message
    }
    endpointRef
  }

  ......

  /** Thread pool used for dispatching messages. */
  private val threadpool: ThreadPoolExecutor = {
    // Size of thread pool
    val numThreads = nettyEnv.conf.getInt("spark.rpc.netty.dispatcher.numThreads",
      math.max(2, Runtime.getRuntime.availableProcessors()))
    // Create a fixed-size thread pool whose threads are daemon threads
    val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop")
    // Start one MessageLoop per thread
    for (i <- 0 until numThreads) {
      pool.execute(new MessageLoop)
    }
    // Return the thread pool
    pool
  }

  /** Message loop used for dispatching messages. */
  private class MessageLoop extends Runnable {
    override def run(): Unit = {
      try {
        while (true) {
          try {
            // Take an EndpointData from the blocking queue (blocks while the queue is empty)
            val data = receivers.take()
            if (data == PoisonPill) {
              // Put PoisonPill back so that other MessageLoops can see it.
              receivers.offer(PoisonPill)
              return
            }
            // Process the messages in this endpoint's inbox
            data.inbox.process(Dispatcher.this)
          } catch {
            case NonFatal(e) => logError(e.getMessage, e)
          }
        }
      } catch {
        case ie: InterruptedException => // exit
      }
    }
  }

  /** A poison endpoint that indicates MessageLoop should exit its message loop. */
  private val PoisonPill = new EndpointData(null, null, null)
}
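The MessageLoop and PoisonPill pattern in the listing above can be tried in isolation. The following is only a minimal sketch, not Spark code: PoisonPillSketch, Work and runLoops are hypothetical names, plain threads stand in for the dispatcher's thread pool, and the error handling of the real loop is left out.

```scala
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicInteger

object PoisonPillSketch {
  // Hypothetical stand-in for EndpointData: just a named unit of work.
  final class Work(val name: String)

  // Sentinel object, recognised by reference like PoisonPill in Dispatcher.
  private val PoisonPill = new Work(null)

  /** Starts `numThreads` loops, feeds them `items`, shuts them down,
    * and returns how many items were processed. */
  def runLoops(numThreads: Int, items: Seq[String]): Int = {
    val receivers = new LinkedBlockingQueue[Work]
    val processed = new AtomicInteger(0)

    val threads = (0 until numThreads).map { i =>
      val t = new Thread(new Runnable {
        override def run(): Unit = {
          var running = true
          while (running) {
            val data = receivers.take() // blocks until something is queued
            if (data eq PoisonPill) {
              // Put the pill back so the other loops also see it, then exit.
              receivers.offer(PoisonPill)
              running = false
            } else {
              processed.incrementAndGet()
            }
          }
        }
      }, s"sketch-loop-$i")
      t.setDaemon(true)
      t.start()
      t
    }

    items.foreach(name => receivers.offer(new Work(name)))
    // The pill is queued after all work, so FIFO order means the work is taken first.
    receivers.offer(PoisonPill)
    threads.foreach(_.join())
    processed.get()
  }
}
```

Calling PoisonPillSketch.runLoops(3, Seq("a", "b", "c", "d")) should return 4: every loop blocks in take() until work arrives, and a single pill is enough to stop all three loops because each one re-offers it before exiting.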

In the Dispatcher source, the initializer of the threadpool field runs when the Dispatcher is constructed: it starts the MessageLoop threads, and each loop eventually calls data.inbox.process(Dispatcher.this).
Here is the source code of that method:

    /**
     * Process stored messages.
     */
    def process(dispatcher: Dispatcher): Unit = {
        var message: InboxMessage = null
        inbox.synchronized {
            if (!enableConcurrent && numActiveThreads != 0) {
                return
            }
            message = messages.poll()
            if (message != null) {
                numActiveThreads += 1
            } else {
                return
            }
        }
        while (true) {
            safelyCall(endpoint) {
                message match {
                    ......
                    case OnStart =>
                        // Call the onStart method of RpcEndpoint
                        endpoint.onStart()
                        if (!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]) {
                            inbox.synchronized {
                                if (!stopped) {
                                    enableConcurrent = true
                                }
                            }
                        }
                    
                    ......
                }
            }
            
            ......
        }
    }

Creating an EndpointData also creates its Inbox, and the Inbox constructor does the following:

private[netty] class Inbox(
                              val endpointRef: NettyRpcEndpointRef,
                              val endpoint: RpcEndpoint)
    extends Logging {
    ......
    // OnStart should be the first message to process
    inbox.synchronized {
        // Enqueue OnStart as soon as the Inbox is constructed
        messages.add(OnStart)
    }

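Master itself is registered through rpcEnv.setupEndpoint(), which in NettyRpcEnv delegates to dispatcher.registerRpcEndpoint(). Because an OnStart message is placed in the Inbox as soon as the Inbox is created, and registerRpcEndpoint() immediately offers the EndpointData to the receivers queue, a MessageLoop thread picks it up and calls inbox.process(), where the pattern match hits the OnStart case.
That is when the onStart() method of Master is called.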
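The whole chain can be modelled in a few lines. This is a simplified sketch under stated assumptions rather than Spark's implementation: OnStartSketch, Msg, Text, RecordingEndpoint, MiniInbox and demo are hypothetical names, there is one loop thread instead of a pool, and the concurrency guards of the real Inbox are omitted.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ListBuffer

object OnStartSketch {
  // Hypothetical, simplified message types; the real InboxMessage hierarchy has more cases.
  sealed trait Msg
  case object OnStart extends Msg
  final case class Text(body: String) extends Msg

  // Hypothetical endpoint that records the order of its callbacks.
  final class RecordingEndpoint {
    val calls = ListBuffer.empty[String]
    def onStart(): Unit = calls += "onStart"
    def receive(body: String): Unit = calls += s"receive:$body"
  }

  // Like Inbox: the constructor itself enqueues OnStart, so it is always the first message.
  final class MiniInbox(endpoint: RecordingEndpoint) {
    private val messages = new java.util.LinkedList[Msg]()
    messages.add(OnStart)

    def post(m: Msg): Unit = synchronized { messages.add(m) }

    // Drain the queued messages and pattern-match each one.
    def process(): Unit = synchronized {
      var m = messages.poll()
      while (m != null) {
        m match {
          case OnStart    => endpoint.onStart()
          case Text(body) => endpoint.receive(body)
        }
        m = messages.poll()
      }
    }
  }

  /** Registers one endpoint, posts one message, runs one loop thread,
    * and returns the order in which the callbacks fired. */
  def demo(): List[String] = {
    val receivers = new LinkedBlockingQueue[MiniInbox]
    val endpoint = new RecordingEndpoint

    // "Registration": creating the inbox queues OnStart; offering it hands it to a loop.
    val inbox = new MiniInbox(endpoint)
    receivers.offer(inbox)
    inbox.post(Text("hello"))

    // The loop thread, not the registering code, is what ends up calling onStart().
    val loop = new Thread(new Runnable {
      override def run(): Unit = receivers.take().process()
    })
    loop.start()
    loop.join()
    endpoint.calls.toList
  }
}
```

OnStartSketch.demo() returns List("onStart", "receive:hello"). Nothing in demo() calls endpoint.onStart() directly; the call happens on the loop thread while it drains the inbox, which mirrors why Master's main method contains no explicit onStart call.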


Tags: Netty Spark IE

Posted on Fri, 07 Feb 2020 08:25:08 -0800 by jib