SparkContext Source Code Analysis

Summary: SparkContext source code analysis. More resources: GitHub: https://github.com/opensourceteams/spark-scala-maven CSDN (video collection, watch online): https://blog.

More resources

YouTube video share

bilibili video share

Documentation

Main entry point for Spark functionality.
A SparkContext represents the connection to a Spark cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
Only one SparkContext may be active per JVM. You must stop() the active SparkContext before creating a new one.
This limitation may eventually be removed; see SPARK-2243 for more details.

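Key points

  • SparkContext is the main entry point for Spark functionality.
  • A SparkContext represents the connection to one Spark cluster.
  • Through it you can create RDDs, accumulators and broadcast variables on that cluster.
  • Only one SparkContext may be active per JVM.
  • You must stop the active SparkContext before creating a new one; this limitation may eventually be removed, see SPARK-2243 for details.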
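The "one active context per JVM" rule can be pictured with a small standalone model. This is only a sketch under stated assumptions: `ContextGuard`, `ToyContext` and their methods are hypothetical names invented for illustration, not Spark classes, and the bookkeeping inside the real SparkContext is more involved.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical model of a JVM-wide "single active context" slot; not Spark code.
object ContextGuard {
  // Hypothetical stand-in for a context object.
  final class ToyContext(val appName: String) {
    // Release the JVM-wide slot so that another context may be created.
    def stop(): Unit = ContextGuard.clear(this)
  }

  private val active = new AtomicReference[ToyContext](null)

  // Create a context, failing if another one is still active.
  def create(appName: String): ToyContext = {
    val ctx = new ToyContext(appName)
    if (!active.compareAndSet(null, ctx)) {
      val current = Option(active.get).map(_.appName).getOrElse("unknown")
      throw new IllegalStateException(
        s"Only one context may be active per JVM; stop '$current' first")
    }
    ctx
  }

  // Clear the slot only if `ctx` is the context currently registered.
  def clear(ctx: ToyContext): Unit = {
    active.compareAndSet(ctx, null)
    ()
  }

  def isActive: Boolean = active.get != null
}
```

Creating a second `ToyContext` while the first is active throws; after `stop()` on the first one, creation succeeds again.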

SparkContext architecture diagram

XMind file download

https://github.com/opensourceteams/spark-scala-maven/blob/master/md/images/spark/SparkContext.xmind

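Configuration

Configurable settings

  • spark.jars = paths of JAR files (comma-separated list)
  • spark.files = paths of files (comma-separated list)
  • spark.eventLog.dir=/tmp/spark-events // event log directory
  • spark.eventLog.compress=false // whether to compress event logs
  • spark.shuffle.manager=sort // selects the shuffle manager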
// Let the user specify short names for shuffle managers
   val shortShuffleMgrNames = Map(
     "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
     "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager",
     "tungsten-sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
   val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
   val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
   val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
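The short-name lookup in the excerpt above can be tried in isolation. The following is a minimal sketch, assuming a plain `Option[String]` in place of SparkConf; `ShortNameResolver` and `resolve` are hypothetical names, and the step that instantiates the class by reflection is left out.

```scala
// Hypothetical standalone model of the alias lookup; not Spark code.
object ShortNameResolver {
  // Short aliases copied from the excerpt above.
  val shortShuffleMgrNames: Map[String, String] = Map(
    "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
    "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager",
    "tungsten-sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")

  // Known aliases are matched case-insensitively; any other value is passed
  // through unchanged and treated as a fully qualified class name.
  def resolve(configured: Option[String]): String = {
    val name = configured.getOrElse("sort")
    shortShuffleMgrNames.getOrElse(name.toLowerCase, name)
  }
}
```

For example, `ShortNameResolver.resolve(Some("HASH"))` yields the HashShuffleManager class name, while a value such as `com.example.MyShuffleManager` (a made-up class name) is returned as given.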
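  • spark.memory.useLegacyMode=true // selects the memory manager: true uses StaticMemoryManager, false uses UnifiedMemoryManager (note: the released Spark 1.6 configuration documentation lists false as the default, unlike this excerpt)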
      val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", true)
      val memoryManager: MemoryManager =
        if (useLegacyMemoryManager) {
          new StaticMemoryManager(conf, numUsableCores)
        } else {
          UnifiedMemoryManager(conf, numUsableCores)
        }
    
  • spark.ui.showConsoleProgress=true // whether to show progress information in the console
  • spark.ui.enabled=true // whether to enable the SparkUI
  • spark.executor.memory= // Spark executor memory
  • SPARK_EXECUTOR_MEMORY= // Spark executor memory (environment variable)
  • SPARK_MEM= // Spark executor memory (environment variable; its value passes through warnSparkMem)
 ```scala
  /**
   * Lookup order; the first value found wins and later sources are ignored:
   *   spark.executor.memory > SPARK_EXECUTOR_MEMORY > SPARK_MEM
   * A value without a unit suffix is interpreted as bytes; the default is 1024 MB.
   */
  _executorMemory = _conf.getOption("spark.executor.memory")
      .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
      .orElse(Option(System.getenv("SPARK_MEM"))
      .map(warnSparkMem))
      .map(Utils.memoryStringToMb)
      .getOrElse(1024)
 ```
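The lookup order for executor memory can be reproduced without Spark on the classpath. The sketch below makes several assumptions: plain `Map`s stand in for SparkConf and the process environment, `ExecutorMemory` is a hypothetical name, and its `memoryStringToMb` is a simplified stand-in for `Utils.memoryStringToMb` that only understands an optional k/m/g/t suffix.

```scala
// Hypothetical standalone model of the executor memory lookup; not Spark code.
object ExecutorMemory {
  // Simplified stand-in for Utils.memoryStringToMb (assumption: the real helper
  // accepts more formats). A bare number is read as bytes.
  def memoryStringToMb(s: String): Int = {
    val (digits, unit) = s.trim.toLowerCase.span(_.isDigit)
    val n = digits.toLong // throws NumberFormatException when no digits are present
    val bytes = unit match {
      case "" => n
      case "k" => n * 1024L
      case "m" => n * 1024L * 1024L
      case "g" => n * 1024L * 1024L * 1024L
      case "t" => n * 1024L * 1024L * 1024L * 1024L
      case other => throw new NumberFormatException(s"Unknown unit '$other' in '$s'")
    }
    (bytes / 1024L / 1024L).toInt
  }

  // First source that is set wins: the config key, then the two environment
  // variables, then the 1024 MB default.
  def resolveMb(conf: Map[String, String], env: Map[String, String]): Int =
    conf.get("spark.executor.memory")
      .orElse(env.get("SPARK_EXECUTOR_MEMORY"))
      .orElse(env.get("SPARK_MEM"))
      .map(memoryStringToMb)
      .getOrElse(1024)
}
```

With `spark.executor.memory` set to `2g` and `SPARK_EXECUTOR_MEMORY` set to `512m`, the config value wins and the result is 2048 MB; with nothing set, the result is the 1024 MB default.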
  • spark.scheduler.mode=FIFO // scheduling mode of TaskSchedulerImpl; one of FIFO, FAIR, NONE
/**
*  "FAIR" and "FIFO" determines which policy is used
*    to order tasks amongst a Schedulable's sub-queues
*  "NONE" is used when the a Schedulable has no sub-queues.
*/
object SchedulingMode extends Enumeration {

 type SchedulingMode = Value
 val FAIR, FIFO, NONE = Value
}
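  • spark.cores.max=2 // maximum total number of CPU cores the application may use across the cluster

  • spark.executor.extraJavaOptions= // extra JVM options used when launching executors

  • spark.executor.extraClassPath= // extra classpath entries for executors

  • spark.executor.extraLibraryPath= // library path for executors

  • spark.executor.cores= // number of cores allocated to each executor

  • spark.rpc.lookupTimeout="120s" // timeout for RPC remote endpoint lookup

  • spark.network.timeout="120s" // fallback used when spark.rpc.lookupTimeout is not set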

 /** Returns the default Spark timeout to use for RPC remote endpoint lookup. */
 private[spark] def lookupRpcTimeout(conf: SparkConf): RpcTimeout = {
   RpcTimeout(conf, Seq("spark.rpc.lookupTimeout", "spark.network.timeout"), "120s")
 }
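The prioritized key list passed to `RpcTimeout` suggests a "first key that is set wins, otherwise use the default" lookup. The following is a rough sketch of that idea, assuming a plain `Map` in place of SparkConf; `TimeoutLookup` and `firstConfigured` are hypothetical names and the value is kept as a string rather than parsed into a duration.

```scala
// Hypothetical standalone model of a prioritized config lookup; not Spark code.
object TimeoutLookup {
  // Key order copied from the lookupRpcTimeout excerpt above.
  val lookupKeys: Seq[String] = Seq("spark.rpc.lookupTimeout", "spark.network.timeout")

  // Returns the first key in `keys` that is set, paired with its value;
  // when none is set, returns None paired with the default value.
  def firstConfigured(
      conf: Map[String, String],
      keys: Seq[String],
      default: String): (Option[String], String) =
    keys.collectFirst { case k if conf.contains(k) => (Option(k), conf(k)) }
      .getOrElse((Option.empty[String], default))
}
```

With only `spark.network.timeout` set to `30s`, the lookup falls through the first key and returns the second key with `30s`; with neither key set it returns the `120s` default.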

Settings applied by Spark itself

  • spark.driver.host = Utils.localHostName()
  • spark.driver.port = 0
  • spark.executor.id = driver

Main content

Create the job progress listener

 _jobProgressListener = new JobProgressListener(_conf)
 listenerBus.addListener(jobProgressListener)

Create SparkEnv

_env = createSparkEnv(_conf, isLocal, listenerBus)
SparkEnv.set(_env)
  • Create the driver environment (DriverEnv)
 SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master))
  • The default is spark.rpc = org.apache.spark.rpc.netty.NettyRpcEnvFactory (short name "netty")
  private def getRpcEnvFactory(conf: SparkConf): RpcEnvFactory = {
    val rpcEnvNames = Map(
      "akka" -> "org.apache.spark.rpc.akka.AkkaRpcEnvFactory",
      "netty" -> "org.apache.spark.rpc.netty.NettyRpcEnvFactory")
    val rpcEnvName = conf.get("spark.rpc", "netty")
    val rpcEnvFactoryClassName = rpcEnvNames.getOrElse(rpcEnvName.toLowerCase, rpcEnvName)
    Utils.classForName(rpcEnvFactoryClassName).newInstance().asInstanceOf[RpcEnvFactory]
  }
  • Create and start the NettyRpcEnv; this starts 'sparkDriver'
def create(config: RpcEnvConfig): RpcEnv = {
   val sparkConf = config.conf
   // Use JavaSerializerInstance in multiple threads is safe. However, if we plan to support
   // KryoSerializer in future, we have to use ThreadLocal to store SerializerInstance
   val javaSerializerInstance =
     new JavaSerializer(sparkConf).newInstance().asInstanceOf[JavaSerializerInstance]
   val nettyEnv =
     new NettyRpcEnv(sparkConf, javaSerializerInstance, config.host, config.securityManager)
   if (!config.clientMode) {
     val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
       nettyEnv.startServer(actualPort)
       (nettyEnv, nettyEnv.address.port)
     }
     try {
       Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
     } catch {
       case NonFatal(e) =>
         nettyEnv.shutdown()
         throw e
     }
   }
   nettyEnv
 }
  • Create and start the ActorSystem; this starts 'sparkDriverActorSystem'
 /**
  * Creates an ActorSystem ready for remoting, with various Spark features. Returns both the
  * ActorSystem itself and its port (which is hard to get from Akka).
  *
  * Note: the `name` parameter is important, as even if a client sends a message to right
  * host + port, if the system name is incorrect, Akka will drop the message.
  *
  * If indestructible is set to true, the Actor System will continue running in the event
  * of a fatal exception. This is used by [[org.apache.spark.executor.Executor]].
  */
 def createActorSystem(
     name: String,
     host: String,
     port: Int,
     conf: SparkConf,
     securityManager: SecurityManager): (ActorSystem, Int) = {
   val startService: Int => (ActorSystem, Int) = { actualPort =>
     doCreateActorSystem(name, host, actualPort, conf, securityManager)
   }
   Utils.startServiceOnPort(port, startService, conf, name)
 }
  • Specify the Spark serializer (default: org.apache.spark.serializer.JavaSerializer)

val serializer = instantiateClassFromConf[Serializer](
  "spark.serializer", "org.apache.spark.serializer.JavaSerializer")
logDebug(s"Using serializer: ${serializer.getClass}")

val closureSerializer = instantiateClassFromConf[Serializer](
  "spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer")

  • Instantiate the MapOutputTracker (MapOutputTrackerMaster on the driver, MapOutputTrackerWorker otherwise)
    ```scala
    val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
    val mapOutputTracker = if (isDriver) {
      new MapOutputTrackerMaster(conf, broadcastManager, isLocal)
    } else {
      new MapOutputTrackerWorker(conf)
    }
    ```
  • Register the MapOutputTracker endpoint, obtaining a NettyRpcEndpointRef (used for communication and for tracking map output information)
def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
   val addr = RpcEndpointAddress(nettyEnv.address, name)
   val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
   synchronized {
     if (stopped) {
       throw new IllegalStateException("RpcEnv has been stopped")
     }
     if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
       throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
     }
     val data = endpoints.get(name)
     endpointRefs.put(data.endpoint, data.ref)
     receivers.offer(data)  // for the OnStart message
   }
   endpointRef
 }
  • Instantiate the ShuffleManager
// Let the user specify short names for shuffle managers
   val shortShuffleMgrNames = Map(
     "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
     "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager",
     "tungsten-sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
   val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
   val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
   val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
  • Instantiate the memory manager

val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", true)
val memoryManager: MemoryManager =
  if (useLegacyMemoryManager) {
    new StaticMemoryManager(conf, numUsableCores)
  } else {
    UnifiedMemoryManager(conf, numUsableCores)
  }

  • Register the BlockManagerMaster endpoint, obtaining a NettyRpcEndpointRef (for communication)
  ```scala
    val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
      BlockManagerMaster.DRIVER_ENDPOINT_NAME,
      new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
      conf, isDriver)
  ```
  • Instantiate the BlockManager and the CacheManager
   // NB: blockManager is not valid until initialize() is called later.
   val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
     serializer, conf, memoryManager, mapOutputTracker, shuffleManager,
     blockTransferService, securityManager, numUsableCores)

   val cacheManager = new CacheManager(blockManager)
  • Create the metrics system
val metricsSystem = if (isDriver) {
     // Don't start metrics system right now for Driver.
     // We need to wait for the task scheduler to give us an app ID.
     // Then we can start the metrics system.
     MetricsSystem.createMetricsSystem("driver", conf, securityManager)
   } else {
     // We need to set the executor ID before the MetricsSystem is created because sources and
     // sinks specified in the metrics configuration file will want to incorporate this executor's
     // ID into the metrics they report.
     conf.set("spark.executor.id", executorId)
     val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
     ms.start()
     ms
   }
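  • Set the sparkFiles directory: a temporary directory on the driver; on an executor in distributed mode, the executor's current working directory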
 // Set the sparkFiles directory, used when downloading dependencies.  In local mode,
   // this is a temporary directory; in distributed mode, this is the executor's current working
   // directory.
   val sparkFilesDir: String = if (isDriver) {
     Utils.createTempDir(Utils.getLocalDir(conf), "userFiles").getAbsolutePath
   } else {
     "."
   }
  • Register the OutputCommitCoordinator endpoint, obtaining a NettyRpcEndpointRef (for communication)
val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
     new OutputCommitCoordinator(conf, isDriver)
   }
   val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator",
     new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
   outputCommitCoordinator.coordinatorRef = Some(outputCommitCoordinatorRef)
  • Construct the SparkEnv and return it
 val envInstance = new SparkEnv(
      executorId,
      rpcEnv,
      actorSystem,
      serializer,
      closureSerializer,
      cacheManager,
      mapOutputTracker,
      shuffleManager,
      broadcastManager,
      blockTransferService,
      blockManager,
      securityManager,
      sparkFilesDir,
      metricsSystem,
      memoryManager,
      outputCommitCoordinator,
      conf)

    // Add a reference to tmp dir created by driver, we will delete this tmp dir when stop() is
    // called, and we only need to do it for driver. Because driver may run as a service, and if we
    // don't delete this tmp dir when sc is stopped, then will create too many tmp dirs.
    if (isDriver) {
      envInstance.driverTmpDirToDelete = Some(sparkFilesDir)
    }

    envInstance

Create SparkUI

   _ui =
     if (conf.getBoolean("spark.ui.enabled", true)) {
       Some(SparkUI.createLiveUI(this, _conf, listenerBus, _jobProgressListener,
         _env.securityManager, appName, startTime = startTime))
     } else {
       // For tests, do not enable the UI
       None
     }
   // Bind the UI before starting the task scheduler to communicate
   // the bound port to the cluster manager properly
   _ui.foreach(_.bind())

Register the heartbeat receiver

   // We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
   // retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
   _heartbeatReceiver = env.rpcEnv.setupEndpoint(
     HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))

Create and start the schedulers (TaskScheduler, DAGScheduler)

   // Create and start the scheduler
   val (sched, ts) = SparkContext.createTaskScheduler(this, master)
   _schedulerBackend = sched
   _taskScheduler = ts
   _dagScheduler = new DAGScheduler(this)
   _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

   // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
   // constructor
   _taskScheduler.start()
  • org.apache.spark.scheduler.TaskSchedulerImpl documentation
/**
 * Schedules tasks for multiple types of clusters by acting through a SchedulerBackend.
 * It can also work with a local setup by using a LocalBackend and setting isLocal to true.
 * It handles common logic, like determining a scheduling order across jobs, waking up to launch
 * speculative tasks, etc.
 *
 * Clients should first call initialize() and start(), then submit task sets through the
 * runTasks method.
 *
 * THREADING: SchedulerBackends and task-submitting clients can call this class from multiple
 * threads, so it needs locks in public API methods to maintain its state. In addition, some
 * SchedulerBackends synchronize on themselves when they want to send events here, and then
 * acquire a lock on us, so we need to make sure that we don't try to lock the backend while
 * we are holding a lock on ourselves.
 */
  • In standalone mode, a TaskSchedulerImpl is created and initialized with a SparkDeploySchedulerBackend as its backend
     case SPARK_REGEX(sparkUrl) =>
       val scheduler = new TaskSchedulerImpl(sc)
       val masterUrls = sparkUrl.split(",").map("spark://" + _)
       val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
       scheduler.initialize(backend)
       (backend, scheduler)
 def initialize(backend: SchedulerBackend) {
   this.backend = backend
   // temporarily set rootPool name to empty
   rootPool = new Pool("", schedulingMode, 0, 0)
   schedulableBuilder = {
     schedulingMode match {
       case SchedulingMode.FIFO =>
         new FIFOSchedulableBuilder(rootPool)
       case SchedulingMode.FAIR =>
         new FairSchedulableBuilder(rootPool, conf)
     }
   }
   schedulableBuilder.buildPools()
 }
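As quoted, `initialize()` matches only on FIFO and FAIR, so a NONE mode would fall through the match. The sketch below models the path from the configured mode name to the builder choice. It is not the Spark implementation: `SchedulingSketch`, `parseMode` and `builderFor` are hypothetical names, upper-casing the input is an assumption, and builder classes are represented by their names as strings.

```scala
// Hypothetical standalone model of scheduling mode selection; not Spark code.
object SchedulingSketch {
  // Mirrors the SchedulingMode enumeration quoted earlier in this article.
  object SchedulingMode extends Enumeration {
    type SchedulingMode = Value
    val FAIR, FIFO, NONE = Value
  }

  // Parse a configured mode name, defaulting to FIFO. Enumeration.withName
  // throws NoSuchElementException for names that are not defined.
  def parseMode(configured: Option[String]): SchedulingMode.Value =
    SchedulingMode.withName(configured.getOrElse("FIFO").toUpperCase)

  // Choose a builder by mode. Unlike the quoted match, the unsupported case is
  // made explicit here instead of surfacing as a MatchError.
  def builderFor(mode: SchedulingMode.Value): Either[String, String] = mode match {
    case SchedulingMode.FIFO => Right("FIFOSchedulableBuilder")
    case SchedulingMode.FAIR => Right("FairSchedulableBuilder")
    case other => Left(s"Unsupported scheduling mode: $other")
  }
}
```

Returning `Either` is a design choice of this sketch only; it makes the missing NONE branch visible to the caller.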

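Starting the task scheduler

  • The task scheduler is started via _taskScheduler.start()
// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
   // constructor
   _taskScheduler.start()
  • TaskSchedulerImpl.start() calls backend.start(), which in standalone mode is SparkDeploySchedulerBackend.start()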
  override def start() {
    backend.start()

    if (!isLocal && conf.getBoolean("spark.speculation", false)) {
      logInfo("Starting speculative execution thread")
      speculationScheduler.scheduleAtFixedRate(new Runnable {
        override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
          checkSpeculatableTasks()
        }
      }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
    }
  }
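  • SparkDeploySchedulerBackend.start() first calls super.start(), i.e. CoarseGrainedSchedulerBackend.start(), which registers the [CoarseGrainedScheduler] endpoint via registerRpcEndpoint (for communication)
  • It then builds an ApplicationDescription whose command is org.apache.spark.executor.CoarseGrainedExecutorBackend
  • Finally it starts the AppClient, which registers the [AppClient] endpoint via registerRpcEndpoint (for communication)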
override def start() {
    super.start()
    launcherBackend.connect()

    // The endpoint for executors to talk to us
    val driverUrl = rpcEnv.uriOf(SparkEnv.driverActorSystemName,
      RpcAddress(sc.conf.get("spark.driver.host"), sc.conf.get("spark.driver.port").toInt),
      CoarseGrainedSchedulerBackend.ENDPOINT_NAME)
    val args = Seq(
      "--driver-url", driverUrl,
      "--executor-id", "{{EXECUTOR_ID}}",
      "--hostname", "{{HOSTNAME}}",
      "--cores", "{{CORES}}",
      "--app-id", "{{APP_ID}}",
      "--worker-url", "{{WORKER_URL}}")
    val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
      .map(Utils.splitCommandString).getOrElse(Seq.empty)
    val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
      .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
    val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath")
      .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)

    // When testing, expose the parent class path to the child. This is processed by
    // compute-classpath.{cmd,sh} and makes all needed jars available to child processes
    // when the assembly is built with the "*-provided" profiles enabled.
    val testingClassPath =
      if (sys.props.contains("spark.testing")) {
        sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq
      } else {
        Nil
      }

    // Start executors with a few necessary configs for registering with the scheduler
    val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
    val javaOpts = sparkJavaOpts ++ extraJavaOpts
    val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
      args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
    val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
    val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
    val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory,
      command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor)
    client = new AppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
    client.start()
    launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
    waitForRegistration()
    launcherBackend.setState(SparkAppHandle.State.RUNNING)
  }
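The classpath and library path settings in `start()` share one pattern: read an optional value, split it on the platform path separator, and fall back to an empty list. A minimal sketch of that pattern follows, assuming a plain `Map` in place of SparkConf; `ExecutorCommandSketch` and `pathEntries` are hypothetical names.

```scala
import java.io.File

// Hypothetical standalone model of the optional path-list parsing; not Spark code.
object ExecutorCommandSketch {
  // Split an optional path-list setting into its entries.
  // An unset key yields an empty list rather than an error.
  def pathEntries(conf: Map[String, String], key: String): Seq[String] =
    conf.get(key).map(_.split(File.pathSeparator).toSeq).getOrElse(Nil)
}
```

On a Unix-like system, where the path separator is `:`, a value of `a.jar:b.jar` for `spark.executor.extraClassPath` yields the two entries `a.jar` and `b.jar`.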
  • CoarseGrainedSchedulerBackend documentation
/**
* A scheduler backend that waits for coarse grained executors to connect to it through Akka.
* This backend holds onto each executor for the duration of the Spark job rather than relinquishing
* executors whenever a task is done and asking the scheduler to launch a new executor for
* each new task. Executors may be launched in a variety of ways, such as Mesos tasks for the
* coarse-grained Mesos mode or standalone processes for Spark's standalone deploy mode
* (spark.deploy.*).
*/
private[spark]
class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)
 extends ExecutorAllocationClient with SchedulerBackend with Logging
{
  • ClientEndpoint.onStart() is invoked; it registers with all masters asynchronously by sending each master a RegisterApplication message
override def onStart(): Unit = {
     try {
       registerWithMaster(1)
     } catch {
       case e: Exception =>
         logWarning("Failed to connect to master", e)
         markDisconnected()
         stop()
     }
   }
 /**
    * Register with all masters asynchronously. It will call `registerWithMaster` every
    * REGISTRATION_TIMEOUT_SECONDS seconds until exceeding REGISTRATION_RETRIES times.
    * Once we connect to a master successfully, all scheduling work and Futures will be cancelled.
    *
    * nthRetry means this is the nth attempt to register with master.
    */
   private def registerWithMaster(nthRetry: Int) {
     registerMasterFutures.set(tryRegisterAllMasters())
     registrationRetryTimer.set(registrationRetryThread.scheduleAtFixedRate(new Runnable {
       override def run(): Unit = {
         if (registered.get) {
           registerMasterFutures.get.foreach(_.cancel(true))
           registerMasterThreadPool.shutdownNow()
         } else if (nthRetry >= REGISTRATION_RETRIES) {
           markDead("All masters are unresponsive! Giving up.")
         } else {
           registerMasterFutures.get.foreach(_.cancel(true))
           registerWithMaster(nthRetry + 1)
         }
       }
     }, REGISTRATION_TIMEOUT_SECONDS, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS))
   }
   /**
    *  Register with all masters asynchronously and returns an array `Future`s for cancellation.
    */
   private def tryRegisterAllMasters(): Array[JFuture[_]] = {
     for (masterAddress <- masterRpcAddresses) yield {
       registerMasterThreadPool.submit(new Runnable {
         override def run(): Unit = try {
           if (registered.get) {
             return
           }
           logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
           val masterRef =
             rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
           masterRef.send(RegisterApplication(appDescription, self))
         } catch {
           case ie: InterruptedException => // Cancelled
           case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
         }
       })
     }
   }
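The control flow of `registerWithMaster` (check at each timeout whether registration succeeded, retry with an incremented counter, give up once the retry limit is reached) can be modelled deterministically, without threads or timers. This is a sketch under stated assumptions: `RegistrationRetrySketch` and its members are hypothetical names, the function `masterResponds` replaces real network traffic, and `maxRetries` plays the role of REGISTRATION_RETRIES with a caller-chosen value.

```scala
import scala.annotation.tailrec

// Hypothetical, timer-free model of the registration retry loop; not Spark code.
object RegistrationRetrySketch {
  sealed trait Outcome
  final case class Registered(attempt: Int) extends Outcome
  case object GaveUp extends Outcome

  // `masterResponds(n)` says whether the n-th attempt (1-based) is acknowledged
  // before the next timeout check fires.
  @tailrec
  def register(masterResponds: Int => Boolean, maxRetries: Int, nthRetry: Int = 1): Outcome =
    if (masterResponds(nthRetry)) Registered(nthRetry)  // registered: stop retrying
    else if (nthRetry >= maxRetries) GaveUp              // corresponds to markDead(...)
    else register(masterResponds, maxRetries, nthRetry + 1)
}
```

With `maxRetries = 3`, a master that answers on the second attempt gives `Registered(2)`, while a master that never answers gives `GaveUp` after exactly three attempts.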

org.apache.spark.rpc.netty.Dispatcher

Class documentation

/**
* A message dispatcher, responsible for routing RPC messages to the appropriate endpoint(s).
*/
private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {

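Registering an RPC endpoint (key to communication and to the onStart callback)

/**
 * Constructing new EndpointData(name, endpoint, endpointRef) executes
 *   val inbox = new Inbox(ref, endpoint)
 * and constructing an Inbox runs the following, so OnStart is always the first message queued: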
 // OnStart should be the first message to process
  inbox.synchronized {
    messages.add(OnStart)
  }
 */
def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
    val addr = RpcEndpointAddress(nettyEnv.address, name)
    val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
    synchronized {
      if (stopped) {
        throw new IllegalStateException("RpcEnv has been stopped")
      }
      if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
        throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
      }
      val data = endpoints.get(name)
      endpointRefs.put(data.endpoint, data.ref)
      receivers.offer(data)  // for the OnStart message
    }
    endpointRef
  }
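The registration method above relies on `putIfAbsent` to reject duplicate endpoint names and on a queue to schedule the first delivery. The following is a simplified sketch of just those two steps, using only JDK collections. `ToyDispatcher` is a hypothetical class, endpoints are reduced to a name plus a description string, and the stopped-state check and endpoint reference are omitted.

```scala
import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}

// Hypothetical, simplified model of endpoint registration; not Spark's Dispatcher.
final class ToyDispatcher {
  private val endpoints = new ConcurrentHashMap[String, String]() // name -> description
  private val receivers = new LinkedBlockingQueue[String]()       // names awaiting delivery

  // Register an endpoint name once; a second registration under the same name fails.
  def register(name: String, description: String): Unit = {
    if (endpoints.putIfAbsent(name, description) != null) {
      throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
    }
    receivers.offer(name) // queue the endpoint so its first message can be delivered
    ()
  }

  // Number of endpoints queued for their first delivery.
  def pendingStartups: Int = receivers.size
}
```

Registering the same name twice throws `IllegalArgumentException` and leaves the queue unchanged, because the duplicate is rejected before anything is enqueued.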

Inbox

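An inbox that stores messages for an [[RpcEndpoint]] and posts messages to it thread-safely.

/**
 * When an Inbox is instantiated it runs the code below, which enqueues OnStart as the first message
 * (so every RpcEndpoint implementation receives OnStart first when it is registered).
 * When process() handles the OnStart message, it calls back the endpoint's onStart() method.
 */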
 // OnStart should be the first message to process
 inbox.synchronized {
   messages.add(OnStart)
 }

  /**
   * Process stored messages.
   */
  def process(dispatcher: Dispatcher): Unit = {
	  ......
case OnStart =>
            endpoint.onStart()
            if (!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]) {
              inbox.synchronized {
                if (!stopped) {
                  enableConcurrent = true
                }
              }
            }
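The "OnStart is always the first message" behaviour can be illustrated with a toy inbox. This is a sketch only: `InboxSketch` and everything inside it are hypothetical stand-ins, the message set is reduced to `OnStart` plus a single `Rpc` case, and the concurrency switch (`enableConcurrent`) from the excerpt is left out.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of an inbox; not Spark's Inbox.
object InboxSketch {
  sealed trait Message
  case object OnStart extends Message
  final case class Rpc(body: String) extends Message

  trait Endpoint {
    def onStart(): Unit
    def receive(body: String): Unit
  }

  final class Inbox(endpoint: Endpoint) {
    // OnStart is enqueued at construction time, so it precedes any posted message.
    private val messages = mutable.Queue[Message](OnStart)

    def post(message: Message): Unit = synchronized { messages.enqueue(message) }

    private def poll(): Option[Message] = synchronized {
      if (messages.isEmpty) None else Some(messages.dequeue())
    }

    // Drain the queue in order, dispatching each message to the endpoint.
    def process(): Unit = {
      var next = poll()
      while (next.isDefined) {
        next.get match {
          case OnStart => endpoint.onStart()
          case Rpc(body) => endpoint.receive(body)
        }
        next = poll()
      }
    }
  }
}
```

Even when a message is posted before `process()` runs for the first time, the endpoint sees `onStart()` before that message.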

Entry code block (around line 400 of the SparkContext source)

try {
    _conf = config.clone()
    _conf.validateSettings()

    if (!_conf.contains("spark.master")) {
      throw new SparkException("A master URL must be set in your configuration")
    }
    if (!_conf.contains("spark.app.name")) {
      throw new SparkException("An application name must be set in your configuration")
    }

    // System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster
    // yarn-standalone is deprecated, but still supported
    if ((master == "yarn-cluster" || master == "yarn-standalone") &&
        !_conf.contains("spark.yarn.app.id")) {
      throw new SparkException("Detected yarn-cluster mode, but isn't running on a cluster. " +
        "Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.")
    }

    if (_conf.getBoolean("spark.logConf", false)) {
      logInfo("Spark configuration:\n" + _conf.toDebugString)
    }

    // Set Spark driver host and port system properties
    _conf.setIfMissing("spark.driver.host", Utils.localHostName())
    _conf.setIfMissing("spark.driver.port", "0")

    _conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)

    _jars = _conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten
    _files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.size != 0))
      .toSeq.flatten

    _eventLogDir =
      if (isEventLogEnabled) {
        val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
          .stripSuffix("/")
        Some(Utils.resolveURI(unresolvedDir))
      } else {
        None
      }

    _eventLogCodec = {
      val compress = _conf.getBoolean("spark.eventLog.compress", false)
      if (compress && isEventLogEnabled) {
        Some(CompressionCodec.getCodecName(_conf)).map(CompressionCodec.getShortName)
      } else {
        None
      }
    }

    _conf.set("spark.externalBlockStore.folderName", externalBlockStoreFolderName)

    if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")

    // "_jobProgressListener" should be set up before creating SparkEnv because when creating
    // "SparkEnv", some messages will be posted to "listenerBus" and we should not miss them.
    _jobProgressListener = new JobProgressListener(_conf)
    listenerBus.addListener(jobProgressListener)

    // Create the Spark execution environment (cache, map output tracker, etc)
    _env = createSparkEnv(_conf, isLocal, listenerBus)
    SparkEnv.set(_env)

    // If running the REPL, register the repl's output dir with the file server.
    _conf.getOption("spark.repl.class.outputDir").foreach { path =>
      val replUri = _env.rpcEnv.fileServer.addDirectory("/classes", new File(path))
      _conf.set("spark.repl.class.uri", replUri)
    }

    _metadataCleaner = new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, _conf)

    _statusTracker = new SparkStatusTracker(this)

    _progressBar =
      if (_conf.getBoolean("spark.ui.showConsoleProgress", true) && !log.isInfoEnabled) {
        Some(new ConsoleProgressBar(this))
      } else {
        None
      }

    _ui =
      if (conf.getBoolean("spark.ui.enabled", true)) {
        Some(SparkUI.createLiveUI(this, _conf, listenerBus, _jobProgressListener,
          _env.securityManager, appName, startTime = startTime))
      } else {
        // For tests, do not enable the UI
        None
      }
    // Bind the UI before starting the task scheduler to communicate
    // the bound port to the cluster manager properly
    _ui.foreach(_.bind())

    _hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)

    // Add each JAR given through the constructor
    if (jars != null) {
      jars.foreach(addJar)
    }

    if (files != null) {
      files.foreach(addFile)
    }

    _executorMemory = _conf.getOption("spark.executor.memory")
      .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
      .orElse(Option(System.getenv("SPARK_MEM"))
      .map(warnSparkMem))
      .map(Utils.memoryStringToMb)
      .getOrElse(1024)

    // Convert java options to env vars as a work around
    // since we can't set env vars directly in sbt.
    for { (envKey, propKey) <- Seq(("SPARK_TESTING", "spark.testing"))
      value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {
      executorEnvs(envKey) = value
    }
    Option(System.getenv("SPARK_PREPEND_CLASSES")).foreach { v =>
      executorEnvs("SPARK_PREPEND_CLASSES") = v
    }
    // The Mesos scheduler backend relies on this environment variable to set executor memory.
    // TODO: Set this only in the Mesos scheduler.
    executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"
    executorEnvs ++= _conf.getExecutorEnv
    executorEnvs("SPARK_USER") = sparkUser

    // We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
    // retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
    _heartbeatReceiver = env.rpcEnv.setupEndpoint(
      HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))

    // Create and start the scheduler
    val (sched, ts) = SparkContext.createTaskScheduler(this, master)
    _schedulerBackend = sched
    _taskScheduler = ts
    _dagScheduler = new DAGScheduler(this)
    _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

    // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
    // constructor
    _taskScheduler.start()

    _applicationId = _taskScheduler.applicationId()
    _applicationAttemptId = taskScheduler.applicationAttemptId()
    _conf.set("spark.app.id", _applicationId)
    _ui.foreach(_.setAppId(_applicationId))
    _env.blockManager.initialize(_applicationId)

    // The metrics system for Driver need to be set spark.app.id to app ID.
    // So it should start after we get app ID from the task scheduler and set spark.app.id.
    metricsSystem.start()
    // Attach the driver metrics servlet handler to the web ui after the metrics system is started.
    metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))

    _eventLogger =
      if (isEventLogEnabled) {
        val logger =
          new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
            _conf, _hadoopConfiguration)
        logger.start()
        listenerBus.addListener(logger)
        Some(logger)
      } else {
        None
      }

    // Optionally scale number of executors dynamically based on workload. Exposed for testing.
    val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
    if (!dynamicAllocationEnabled && _conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
      logWarning("Dynamic Allocation and num executors both set, thus dynamic allocation disabled.")
    }

    _executorAllocationManager =
      if (dynamicAllocationEnabled) {
        Some(new ExecutorAllocationManager(this, listenerBus, _conf))
      } else {
        None
      }
    _executorAllocationManager.foreach(_.start())

    _cleaner =
      if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
        Some(new ContextCleaner(this))
      } else {
        None
      }
    _cleaner.foreach(_.start())

    setupAndStartListenerBus()
    postEnvironmentUpdate()
    postApplicationStart()

    // Post init
    _taskScheduler.postStartHook()
    _env.metricsSystem.registerSource(_dagScheduler.metricsSource)
    _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
    _executorAllocationManager.foreach { e =>
      _env.metricsSystem.registerSource(e.executorAllocationManagerSource)
    }

    // Make sure the context is stopped if the user forgets about it. This avoids leaving
    // unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
    // is killed, though.
    _shutdownHookRef = ShutdownHookManager.addShutdownHook(
      ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
      logInfo("Invoking stop() from shutdown hook")
      stop()
    }
  } catch {
    case NonFatal(e) =>
      logError("Error initializing SparkContext.", e)
      try {
        stop()
      } catch {
        case NonFatal(inner) =>
          logError("Error stopping SparkContext after init error.", inner)
      } finally {
        throw e
      }
  }
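Near the top of the block above, `spark.jars` and `spark.files` are read as comma-separated lists with empty entries dropped. A small standalone sketch of that parsing follows, assuming an `Option[String]` in place of `_conf.getOption(...)`; `CommaListSketch` and `parseList` are hypothetical names.

```scala
// Hypothetical standalone model of the comma-separated list parsing; not Spark code.
object CommaListSketch {
  // Split on commas and drop empty entries; an unset value yields an empty list.
  def parseList(value: Option[String]): Seq[String] =
    value.toList.flatMap(_.split(",").toList).filter(_.nonEmpty)
}
```

For instance, `parseList(Some("a.jar,,b.jar,"))` keeps only `a.jar` and `b.jar`, and both `parseList(None)` and `parseList(Some(""))` return an empty list.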
