《深入理解Spark:核心思想与源码分析》——3.12节Spark环境更新

简介:

本节书摘来自华章社区《深入理解Spark:核心思想与源码分析》一书中的第3章,第3.12节Spark环境更新,作者耿嘉安,更多章节内容可以访问云栖社区“华章社区”公众号查看

3.12 Spark环境更新
在SparkContext的初始化过程中,可能对其环境造成影响,所以需要更新环境,代码如下。

postEnvironmentUpdate()
postApplicationStart()

SparkContext初始化过程中,如果设置了spark.jars属性, spark.jars指定的jar包将由addJar方法加入httpFileServer的jarDir变量指定的路径下。spark.files指定的文件将由addFile方法加入httpFileServer的fileDir变量指定的路径下。见代码清单3-49。
代码清单3-49 依赖文件处理

val jars: Seq[String] =
    conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten

val files: Seq[String] =
    conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten

// Add each JAR given through the constructor
    if (jars != null) {
        jars.foreach(addJar)
    }

    if (files != null) {
        files.foreach(addFile)
}
httpFileServer的addFile和addJar方法,见代码清单3-50。
代码清单3-50 HttpFileServer提供对依赖文件的访问
def addFile(file: File) : String = {
    addFileToDir(file, fileDir)
    serverUri + "/files/" + file.getName
}

def addJar(file: File) : String = {
    addFileToDir(file, jarDir)
    serverUri + "/jars/" + file.getName
}

def addFileToDir(file: File, dir: File) : String = {
    if (file.isDirectory) {
        throw new IllegalArgumentException(s"$file cannot be a directory.")
    }
    Files.copy(file, new File(dir, file.getName))
    dir + "/" + file.getName
}

postEnvironmentUpdate的实现见代码清单3-51,其处理步骤如下:
1)通过调用SparkEnv的方法environmentDetails最终影响环境的JVM参数、Spark 属性、系统属性、classPath等,参见代码清单3-52。
2)生成事件SparkListenerEnvironmentUpdate,并post到listenerBus,此事件被Environ-mentListener监听,最终影响EnvironmentPage页面中的输出内容。
代码清单3-51 postEnvironmentUpdate的实现

private def postEnvironmentUpdate() {
    if (taskScheduler != null) {
        val schedulingMode = getSchedulingMode.toString
        val addedJarPaths = addedJars.keys.toSeq
        val addedFilePaths = addedFiles.keys.toSeq
        val environmentDetails =
            SparkEnv.environmentDetails(conf, schedulingMode, addedJarPaths, addedFilePaths)
        val environmentUpdate = SparkListenerEnvironmentUpdate(environmentDetails)
        listenerBus.post(environmentUpdate)
    }
}
代码清单3-52 environmentDetails的实现
val jvmInformation = Seq(
    ("Java Version", s"$javaVersion ($javaVendor)"),
    ("Java Home", javaHome),
    ("Scala Version", versionString)
).sorted

val schedulerMode =
    if (!conf.contains("spark.scheduler.mode")) {
        Seq(("spark.scheduler.mode", schedulingMode))
    } else {
        Seq[(String, String)]()
    }
val sparkProperties = (conf.getAll ++ schedulerMode).sorted

// System properties that are not java classpaths
val systemProperties = Utils.getSystemProperties.toSeq
val otherProperties = systemProperties.filter { case (k, _) =>
    k != "java.class.path" && !k.startsWith("spark.")
}.sorted

// Class paths including all added jars and files
val classPathEntries = javaClassPath
    .split(File.pathSeparator)
    .filterNot(_.isEmpty)
    .map((_, "System Classpath"))
val addedJarsAndFiles = (addedJars ++ addedFiles).map((_, "Added By User"))
val classPaths = (addedJarsAndFiles ++ classPathEntries).sorted

Map[String, Seq[(String, String)]](
    "JVM Information" -> jvmInformation,
    "Spark Properties" -> sparkProperties,
    "System Properties" -> otherProperties,
    "Classpath Entries" -> classPaths)
}
postApplicationStart方法很简单,只是向listenerBus发送了SparkListenerApplicationStart事件,代码如下。
listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId), startTime, sparkUser))
相关文章
|
4月前
|
分布式计算 资源调度 监控
【Spark】 Spark的基础环境 Day03
【Spark】 Spark的基础环境 Day03
36 0
【Spark】 Spark的基础环境 Day03
|
4月前
|
分布式计算 Hadoop Java
Note_Spark_Day01:Spark 基础环境
Note_Spark_Day01:Spark 基础环境
53 0
|
5月前
|
分布式计算 Java Apache
window环境下安装spark
window环境下安装spark
125 0
|
分布式计算 大数据 调度
Spark 原理_总体介绍_集群环境 | 学习笔记
快速学习 Spark 原理_总体介绍_集群环境
52 0
Spark 原理_总体介绍_集群环境 | 学习笔记
|
分布式计算 Scala Spark
Spark worker 定时更新心跳 | 学习笔记
快速学习 Spark worker 定时更新心跳
152 0
Spark worker 定时更新心跳 | 学习笔记
|
分布式计算 资源调度 Hadoop
基于mac构建大数据伪分布式学习环境(七)-部署Scala及Spark学习环境
本文主要讲解如何部署Scala与单机伪分布式Spark计算引擎
92 0
|
分布式计算 Hadoop 大数据
分布式集群环境之Spark的安装与配置(Centos7)
分布式集群环境之Spark的安装与配置(Centos7)
270 0
分布式集群环境之Spark的安装与配置(Centos7)
|
Apache 分布式计算 Spark
Apache Spark Delta Lake 事务日志实现源码分析
Apache Spark Delta Lake 事务日志实现源码分析 我们已经在这篇文章详细介绍了 Apache Spark Delta Lake 的事务日志是什么、主要用途以及如何工作的。那篇文章已经可以很好地给大家介绍 Delta Lake 的内部工作原理,原子性保证,本文为了学习的目的,带领大家从源码级别来看看 Delta Lake 事务日志的实现。
1961 0
|
分布式计算 Java Shell
Spark源码分析之Spark Shell(上)
终于开始看Spark源码了,先从最常用的spark-shell脚本开始吧。不要觉得一个启动脚本有什么东东,其实里面还是有很多知识点的。另外,从启动脚本入手,是寻找代码入口最简单的方法,很多开源框架,其实都可以通过这种方式来寻找源码入口。
901 0
|
分布式计算 Spark Hadoop
Spark MapOutputTracker源码分析
## 技能标签 - Spark ShuffleMapTask处理完成后,把MapStatus数据(BlockManagerId,[compressSize])发送给MapOutputTrackerMaster.
1663 0