Apache Spark源码走读(七)Standalone部署方式分析&sql的解析与执行

  1. 云栖社区>
  2. 博客>
  3. 正文

Apache Spark源码走读(七)Standalone部署方式分析&sql的解析与执行

许鹏 2016-09-14 20:19:29 浏览2338
展开阅读全文

<一>Standalone部署方式分析

楔子

在Spark源码走读系列之2中曾经提到Spark能以Standalone的方式来运行cluster,但没有对Application的提交与具体运行流程做详细的分析,本文就这些问题做一个比较详细的分析,并且对在standalone模式下如何实现HA进行讲解。

没有HA的Standalone运行模式

先从比较简单的说起,所谓的没有ha是指master节点没有ha。

组成cluster的两大元素即Master和Worker。slave worker可以有1到多个,这些worker都处于active状态。

Driver Application可以运行在Cluster之内,也可以在cluster之外运行,先从简单的讲起即Driver Application独立于Cluster。那么这样的整体框架如下图所示,由driver,master和多个slave worker来共同组成整个的运行环境。

执行顺序

步骤1 运行master

$SPARK_HOME/sbin/start_master.sh

start_master.sh中最关键的一句就是

"$sbin"/spark-daemon.sh start org.apache.spark.deploy.master.Master 1 --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT

检测Master的jvm进程

root     23438     1 67 22:57 pts/0    00:00:05 /opt/java/bin/java -cp :/root/working/spark-0.9.1-bin-hadoop2/conf:/root/working/spark-0.9.1-bin-hadoop2/assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar -Dspark.akka.logLifecycleEvents=true -Djava.library.path= -Xms512m -Xmx512m org.apache.spark.deploy.master.Master --ip localhost --port 7077 --webui-port 8080

Master的日志在$SPARK_HOME/logs目录下

步骤2 运行worker,可以启动多个

./bin/spark-class org.apache.spark.deploy.worker.Worker spark://localhost:7077

worker运行时,需要注册到指定的master url,这里就是spark://localhost:7077.

Master侧收到RegisterWorker通知,其处理代码如下

case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) =>
    {
      logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
        workerHost, workerPort, cores, Utils.megabytesToString(memory)))
      if (state == RecoveryState.STANDBY) {
        // ignore, don't send response
      } else if (idToWorker.contains(id)) {
        sender ! RegisterWorkerFailed("Duplicate worker ID")
      } else {
        val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
          sender, workerUiPort, publicAddress)
        if (registerWorker(worker)) {
          persistenceEngine.addWorker(worker)
          sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
          schedule()
        } else {
          val workerAddress = worker.actor.path.address
          logWarning("Worker registration failed. Attempted to re-register worker at same " +
            "address: " + workerAddress)
          sender ! RegisterWorkerFailed("Attempted to re-register worker at same address: "
            + workerAddress)
        }
      }
    }

步骤3 运行Spark-shell

MASTER=spark://localhost:7077 $SPARK_HOME/bin/spark-shell

spark-shell属于application,有关appliation的运行日志存储在$SPARK_HOME/works目录下

spark-shell作为application,在Master侧其处理的分支是RegisterApplication,具体处理代码如下。

case RegisterApplication(description) => {
      if (state == RecoveryState.STANDBY) {
        // ignore, don't send response
      } else {
        logInfo("Registering app " + description.name)
        val app = createApplication(description, sender)
        registerApplication(app)
        logInfo("Registered app " + description.name + " with ID " + app.id)
        persistenceEngine.addApplication(app)
        sender ! RegisteredApplication(app.id, masterUrl)
        schedule()
      }
    }

每当有新的application注册到master,master都要调度schedule函数将application发送到相应的worker,在对应的worker启动相应的ExecutorBackend. 具体代码请参考Master.scala中的schedule函数,代码就不再列出。

步骤4 结果检测

/opt/java/bin/java -cp :/root/working/spark-0.9.1-bin-hadoop2/conf:/root/working/spark-0.9.1-bin-hadoop2/assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar -Dspark.akka.logLifecycleEvents=true -Djava.library.path= -Xms512m -Xmx512m org.apache.spark.deploy.master.Master --ip localhost --port 7077 --webui-port 8080
root     23752 23745 21 23:00 pts/0    00:00:25 /opt/java/bin/java -cp :/root/working/spark-0.9.1-bin-hadoop2/conf:/root/working/spark-0.9.1-bin-hadoop2/assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar -Djava.library.path= -Xms512m -Xmx512m org.apache.spark.repl.Main
root     23986 23938 25 23:02 pts/2    00:00:03 /opt/java/bin/java -cp :/root/working/spark-0.9.1-bin-hadoop2/conf:/root/working/spark-0.9.1-bin-hadoop2/assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar -Dspark.akka.logLifecycleEvents=true -Djava.library.path= -Xms512m -Xmx512m org.apache.spark.deploy.worker.Worker spark://localhost:7077
root     24047 23986 34 23:02 pts/2    00:00:04 /opt/java/bin/java -cp :/root/working/spark-0.9.1-bin-hadoop2/conf:/root/working/spark-0.9.1-bin-hadoop2/assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar -Xms512M -Xmx512M org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://spark@localhost:40053/user/CoarseGrainedScheduler 0 localhost 4 akka.tcp://sparkWorker@localhost:53568/user/Worker app-20140511230059-0000

从运行的进程之间的关系可以看出,worker和master之间的连接建立完毕之后,如果有新的driver application连接上master,master会要求worker启动相应的ExecutorBackend进程。此后若有什么Task需要运行,则会运行在这些Executor之上。可以从以下的日志信息得出此结论,当然看源码亦可。

14/05/11 23:02:36 INFO Worker: Asked to launch executor app-20140511230059-0000/0 for Spark shell
14/05/11 23:02:36 INFO ExecutorRunner: Launch command: "/opt/java/bin/java" "-cp" ":/root/working/spark-0.9.1-bin-hadoop2/conf:/root/working/spark-0.9.1-bin-hadoop2/assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar" "-Xms512M" "-Xmx512M" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "akka.tcp://spark@localhost:40053/user/CoarseGrainedScheduler" "0" "localhost" "4" "akka.tcp://sparkWorker@localhost:53568/user/Worker" "app-20140511230059-0000"

worker中启动exectuor的相关源码见worker中的receive函数,相关代码如下

case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
      if (masterUrl != activeMasterUrl) {
        logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
      } else {
        try {
          logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
          val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
            self, workerId, host,
            appDesc.sparkHome.map(userSparkHome => new File(userSparkHome)).getOrElse(sparkHome),
            workDir, akkaUrl, ExecutorState.RUNNING)
          executors(appId + "/" + execId) = manager
          manager.start()
          coresUsed += cores_
          memoryUsed += memory_
          masterLock.synchronized {
            master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
          }
        } catch {
          case e: Exception => {
            logError("Failed to launch exector %s/%d for %s".format(appId, execId, appDesc.name))
            if (executors.contains(appId + "/" + execId)) {
              executors(appId + "/" + execId).kill()
              executors -= appId + "/" + execId
            }
            masterLock.synchronized {
              master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None)
            }
          }
        }
      }

关于standalone的部署,需要详细研究的源码文件如下所列。

  • deploy/master/Master.scala
  • deploy/worker/worker.scala
  • executor/CoarseGrainedExecutorBackend.scala

查看进程之间的父子关系,请用"pstree"

使用下图来小结单Master的部署情况。

类的动态加载和反射

在谈部署Driver到Cluster上之前,我们先回顾一下java的一大特性“类的动态加载和反射机制”。本人不是一直写java代码出身,所以好多东西都是边用边学,难免挂一漏万。

所谓的反射,其实就是要解决在运行期实现类的动态加载。

来个简单的例子

package test;

public class Demo {

    public Demo() {
        System.out.println("Hi!");
    }

    @SuppressWarnings("unchecked")
    public static void main(String[] args) throws Exception {
        Class clazz = Class.forName("test.Demo");
        Demo demo = (Demo) clazz.newInstance();
    }
}

谈到这里,就自然想到了一个面试题,“谈一谈Class.forName和ClassLoader.loadClass的区别"。说到面试,我总是很没有信心,面试官都很屌的, :)。

在cluster中运行Driver Application

上一节之所以写到类的动态加载与反射都是为了谈这一节的内容奠定基础。

将Driver application部署到Cluster中,启动的时序大体如下图所示。

  •  首先启动Master,然后启动Worker
  • 使用”deploy.Client"将Driver Application提交到Cluster中
./bin/spark-class org.apache.spark.deploy.Client launch
   [client-options] \
      \
   [application-options]
  • Master在收到RegisterDriver的请求之后,会发送LaunchDriver给worker,要求worker启动一个Driver的jvm process
  • Driver Application在新生成的JVM进程中运行开始时会注册到master中,发送RegisterApplication给Master
  • Master发送LaunchExecutor给Worker,要求Worker启动执行ExecutorBackend的JVM Process
  • 一当ExecutorBackend启动完毕,Driver Application就可以将任务提交到ExecutorBackend上面执行,即LaunchTask指令

提交侧的代码,详见deploy/Client.scala

    driverArgs.cmd match {
      case "launch" =>
        // TODO: We could add an env variable here and intercept it in `sc.addJar` that would
        //       truncate filesystem paths similar to what YARN does. For now, we just require
        //       people call `addJar` assuming the jar is in the same directory.
        val env = Map[String, String]()
        System.getenv().foreach{case (k, v) => env(k) = v}

        val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"

        val classPathConf = "spark.driver.extraClassPath"
        val classPathEntries = sys.props.get(classPathConf).toSeq.flatMap { cp =>
          cp.split(java.io.File.pathSeparator)
        }

        val libraryPathConf = "spark.driver.extraLibraryPath"
        val libraryPathEntries = sys.props.get(libraryPathConf).toSeq.flatMap { cp =>
          cp.split(java.io.File.pathSeparator)
        }

        val javaOptionsConf = "spark.driver.extraJavaOptions"
        val javaOpts = sys.props.get(javaOptionsConf)
        val command = new Command(mainClass, Seq("{{WORKER_URL}}", driverArgs.mainClass) ++
          driverArgs.driverOptions, env, classPathEntries, libraryPathEntries, javaOpts)

        val driverDescription = new DriverDescription(
          driverArgs.jarUrl,
          driverArgs.memory,
          driverArgs.cores,
          driverArgs.supervise,
          command)

        masterActor ! RequestSubmitDriver(driverDescription)

接收侧

从Deploy.client发送出来的消息被谁接收呢?答案比较明显,那就是Master。 Master.scala中的receive函数有专门针对RequestSubmitDriver的处理,具体代码如下

case RequestSubmitDriver(description) => {
      if (state != RecoveryState.ALIVE) {
        val msg = s"Can only accept driver submissions in ALIVE state. Current state: $state."
        sender ! SubmitDriverResponse(false, None, msg)
      } else {
        logInfo("Driver submitted " + description.command.mainClass)
        val driver = createDriver(description)
        persistenceEngine.addDriver(driver)
        waitingDrivers += driver
        drivers.add(driver)
        schedule()

        // TODO: It might be good to instead have the submission client poll the master to determine
        //       the current status of the driver. For now it's simply "fire and forget".

        sender ! SubmitDriverResponse(true, Some(driver.id),
          s"Driver successfully submitted as ${driver.id}")
      }
    }

SparkEnv

SparkEnv对于整个Spark的任务来说非常关键,不同的role在创建SparkEnv时传入的参数是不相同的,如Driver和Executor则存在重要区别。

在Executor.scala中,创建SparkEnv的代码如下所示:

  private val env = {
    if (!isLocal) {
      val _env = SparkEnv.create(conf, executorId, slaveHostname, 0,
        isDriver = false, isLocal = false)
      SparkEnv.set(_env)
      _env.metricsSystem.registerSource(executorSource)
      _env
    } else {
      SparkEnv.get
    }
  }

Driver Application则会创建SparkContext,在SparkContext创建过程中,比较重要的一步就是生成SparkEnv,其代码如下:

 private[spark] val env = SparkEnv.create(
    conf,
    "",
    conf.get("spark.driver.host"),
    conf.get("spark.driver.port").toInt,
    isDriver = true,
    isLocal = isLocal,
    listenerBus = listenerBus)
  SparkEnv.set(env)

Standalone模式下HA的实现

Spark在standalone模式下利用zookeeper来实现了HA机制,这里所说的HA是专门针对Master节点的,因为上面所有的分析可以看出Master是整个cluster中唯一可能出现单点失效的节点。

采用zookeeper之后,整个cluster的组成如下图所示。

为了使用zookeeper,Master在启动的时候需要指定如下的参数,修改conf/spark-env.sh, SPARK_DAEMON_JAVA_OPTS中添加如下选项。

System property Meaning
spark.deploy.recoveryMode Set to ZOOKEEPER to enable standby Master recovery mode (default: NONE).
spark.deploy.zookeeper.url The ZooKeeper cluster url (e.g., 192.168.1.100:2181,192.168.1.101:2181).
spark.deploy.zookeeper.dir The directory in ZooKeeper to store recovery state (default: /spark).

实现HA的原理

 zookeeper提供了一个Leader Election机制,利用这个机制,可以实现HA功能,具体请参考zookeeper recipes

在Spark中没有直接使用zookeeper的api,而是使用了curator,curator对zookeeper做了相应的封装,在使用上更为友好。

小结

步步演进讲到在standalone模式下,如何利用zookeeper来实现ha。从中可以看出standalone master一个最主要的任务就是resource management和job scheduling,看到这两个主要功能的时候,您也许会想到这不就是YARN要解决的问题。对了,从本质上来说standalone是yarn的一个简化版本。

本系列下篇内容就要仔细讲讲spark部署到YARN上的实现细节。

 参考资料

  1. Spark Standalone Mode http://spark.apache.org/docs/latest/spark-standalone.html
  2. Cluster Mode Overview  http://spark.apache.org/docs/latest/cluster-overview.html

<二>sql的解析与执行

概要

在即将发布的spark 1.0中有一个新增的功能,即对sql的支持,也就是说可以用sql来对数据进行查询,这对于DBA来说无疑是一大福音,因为以前的知识继续生效,而无须去学什么scala或其它script.

一般来说任意一个sql子系统都需要有parser,optimizer,execution三大功能模块,在spark中这些又都是如何实现的呢,这些实现又有哪些亮点和问题?带着这些疑问,本文准备做一些比较深入的分析。

SQL模块分析有几大难点,分别为:

  1. sql分析和执行的通用过程,这个与是否用spark无关,应该是非常general的问题
  2. spark sql中具体实现时的整体架构
  3. 源码阅读时碰到的scala特殊语法,也就是常说的语法糖问题

为什么需要SQL

SQL是一种标准,一种用来进行数据分析的标准,已经存在多年。

在大数据的背景下,随着数据规模的日渐增大,原有的分析技巧是否就过时了呢?答案显然是否定的,原来的分析技巧在既有的分析维度上依然保持有效,当然对于新的数据我们想挖掘出更多有意思有价值的内容,这个目标可以交给数据挖掘或者机器学习去完成。

那么原有的数据分析人员如何快速的转换到Big Data的平台上来呢,去重新学一种脚本吗,直接用scala或python去编写RDD。显然这样的代价太高,学习成本大。数据分析人员希望底层存储机制和分析引擎的变换不要对上层分析的应用有直接的影响,需求用一句话来表达就是,“直接使用sql语句来对数据进行分析”。

这也是为什么Hive兴起的原因了。Hive的流行直接证明这种设计迎合了市场的需求。由于Hive是采用了Hadoop的MapReduce作为分析执行引擎,其处理速度上不是尽如人意。Spark以快著称,很快有好事者写出了Shark,Shark取得了非常不俗的成绩,迎得了极好的口碑。

毕竟Shark是游离于Spark之外的一个项目,不受Spark节制,那么Spark开发团队的目标是将对SQL支持作用Spark的核心功能里面。以上分析就是Spark中的sql功能的由来。

应用举例

val sqlContext = new org.apache.spark.sql.SQLContext(sc);
import sqlContext._
case class Person(name: String, age: Int)
val person = sc.textFile("examples/src/main/resources/people.txt").map(_.split(" ")).map(p => Person(p(0), p(1).trim.toInt))
person.registerAsTable("person")
val teenagers = sql("SELECT name, age FROM person WHERE age >= 13 and age <= 19")
teenagers.map(t => "name:" + t(0)).collect().foreach(println)

上述代码的逻辑非常清晰,就是将存在于person.txt中年龄界于13到19岁的年轻人名字打印出来。

SQL通用执行过程

SQL的组成部分

SQL语句大家都很熟悉,那么有没有仔细想过其有几大部分组成呢?可能你会说,”这还用问,不就是“select * from tablex where f1=?”,有什么好想吗?“

还是先来看看再说吧,说不定有些新的思维在里面呢?

 

上图是对最简单的sql语句的重新标注,SELECT表示是一种具体的操作,即查询数据,”f1,f2,f3"表示返回的结果,tableX是数据源,condition部分是查询条件。有没有发觉SQL表达式中的顺序与常见的RDD处理逻辑其在表达的顺序上有差异。还是继续用图来表示不同吧。

 

SQL语句在分析执行过程中会经历下图所示的几个步骤

  1. 语法解析
  2. 操作绑定
  3. 优化执行策略
  4. 交付执行

语法解析

语法解析之后,会形成一棵语法树,如下图所示。树中的每个节点是执行的rule,整棵树称之为执行策略。

策略优化

形成上述的执行策略树还只是第一步,因为这个执行策略可以进行优化,所谓的优化就是对树中节点进行合并或是进行顺序上的调整。

以大家熟悉的join操作为例,下图给出一个join优化的示例。A JOIN B等同于B JOIN A,但是顺序的调整可能给执行的性能带来极大的影响,下图就是调整前后的对比图。

再举一例,一般来说尽可能的先实施聚合操作(Aggregate)然后再join

小结

上述一大通分析,希望达到的目的就两个。

  1. 语法解析之后生成一个执行策略树
  2. 执行策略树可以优化,优化的过程就是对树中节点进行合并或者顺序调整

有关SQL查询分析优化的具体过程,强烈推荐参考query optimizer deep dive系列文章

SQL在spark中的实现

有了上述内容的铺垫,想必你已经意识到Spark如果要很好的支持sql,势必也要完成,解析,优化,执行的三大过程。

整个SQL部分的代码,其大致分类如下图所示:

  1. SqlParser生成LogicPlan Tree
  2. Analyzer和Optimizer将各种rule作用于LogicalPlan Tree
  3. 最终优化生成的LogicalPlan生成Spark RDD
  4. 最后将生成的RDD交由Spark执行

阶段1:生成LogicalPlan

在sql中引入了一种新的RDD,即SchemaRDD,且看SchemaRDD的构造函数:

class SchemaRDD(
    @transient val sqlContext: SQLContext,
    @transient protected[spark] val logicalPlan: LogicalPlan)

 构造函数中总共两入参一为SparkContext,另一个LogicalPlan。LogicalPlan又是如何生成的呢?

要回答这个问题,不得不回到整个问题的入口点sql函数sql函数的定义如下

  def sql(sqlText: String): SchemaRDD = {
    val result = new SchemaRDD(this, parseSql(sqlText))

    result.queryExecution.toRdd
    result
  }

parseSql(sqlText)负责生成LogicalPlan,parseSql就是SqlParser的一个实例。

SqlParser这一部分的代码要理解起来关键是要搞清楚StandardTokenParsers的调用规则,里面有一大堆的符号,如果不理解是什么意思,估计很难理清头绪。

由于apply函数可以不被显示调用,所以parseSql(sqlText)一句其实会隐式的调用SqlParser中的apply函数。

  def apply(input: String): LogicalPlan = {
    phrase(query)(new lexical.Scanner(input)) match {
      case Success(r, x) => r
      case x => sys.error(x.toString)
    }
  }

最最最让人蛋疼的一行代码就是phrase(query)(new lexical.Scanner(input))这里了,翻译过来就是如果输入的input字符串符合Lexical中定义的规则,则继续使用query处理。

看一下query的定义是什么:

  protected lazy val query: Parser[LogicalPlan] =
    select * (
      UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) } |
      UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) }
    ) | insert

到了这里终于看到有LogicalPlan了,也就是说将普通的string转换成LogicalPlan在这里发生了

query这段代码同时说明,在目前的spark sql中仅支持selectinsert两种操作,至于delete, update暂不支持。

注:即便是到现在,估计你和当初一样对于SqlParser的使用还是一头雾水,不要紧,请参考ref[3]和[4]中的内容,至于那些稀奇古怪的符号到底是什么意思,请参考ref[5].

阶段2:QueryExecution

第一阶段,将string转换成为logicalplan tree,第二阶段将各种规则作用于LogicalPlan。

在第一阶段中展示的代码,哪一句会触发优化规则呢?是sql函数中的"result.queryExecution.toRdd",此处的queryExecution就是QueryExecution。这里又涉及到scala的一个语法糖问题。QueryExecution是一个抽象类,但却看到了下述的代码

 protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
    new this.QueryExecution { val logical = plan }

怎么可以创建抽象类的实例?我的世界坍塌了,呵呵。不要紧张,这在scala的世界是允许的,只不过scala是隐含的创建了一个QueryExecution的子类并初始化而已,java里的原则还是对的,人家背后有猫腻。

Ok,轮到阶段2中最重要的角色QueryExecution闪亮登场了

protected abstract class QueryExecution {
    def logical: LogicalPlan

    lazy val analyzed = analyzer(logical)
    lazy val optimizedPlan = optimizer(analyzed)
    lazy val sparkPlan = planner(optimizedPlan).next()
    lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)

    /** Internal version of the RDD. Avoids copies and has no schema */
    lazy val toRdd: RDD[Row] = executedPlan.execute()

    protected def stringOrError[A](f: => A): String =
      try f.toString catch { case e: Throwable => e.toString }

    def simpleString: String = stringOrError(executedPlan)

    override def toString: String =
      s"""== Logical Plan ==
         |${stringOrError(analyzed)}
         |== Optimized Logical Plan ==
         |${stringOrError(optimizedPlan)}
         |== Physical Plan ==
         |${stringOrError(executedPlan)}
      """.stripMargin.trim

    def debugExec() = DebugQuery(executedPlan).execute().collect()
  }

三大步:

  1. lazy val analyzed = analyzer(logical)
  2. lazy val optimizedPlan = optimizer(analyzed)
  3. lazy val sparkPlan = planner(optimizedPlan).next()

无论analyzer还是optimizer,它们都是RuleExecutor的子类,

RuleExecutor的默认处理函数是apply,对所有的子类都是一样的,RuleExecutorapply函数定义如下,

def apply(plan: TreeType): TreeType = {
    var curPlan = plan

    batches.foreach { batch =>
      val batchStartPlan = curPlan
      var iteration = 1
      var lastPlan = curPlan
      var continue = true

      // Run until fix point (or the max number of iterations as specified in the strategy.
      while (continue) {
        curPlan = batch.rules.foldLeft(curPlan) {
          case (plan, rule) =>
            val result = rule(plan)
            if (!result.fastEquals(plan)) {
              logger.trace(
                s"""
                  |=== Applying Rule ${rule.ruleName} ===
                  |${sideBySide(plan.treeString, result.treeString).mkString("\n")}
                """.stripMargin)
            }

            result
        }
        iteration += 1
        if (iteration > batch.strategy.maxIterations) {
          logger.info(s"Max iterations ($iteration) reached for batch ${batch.name}")
          continue = false
        }

        if (curPlan.fastEquals(lastPlan)) {
          logger.trace(s"Fixed point reached for batch ${batch.name} after $iteration iterations.")
          continue = false
        }
        lastPlan = curPlan
      }

      if (!batchStartPlan.fastEquals(curPlan)) {
        logger.debug(
          s"""
          |=== Result of Batch ${batch.name} ===
          |${sideBySide(plan.treeString, curPlan.treeString).mkString("\n")}
        """.stripMargin)
      } else {
        logger.trace(s"Batch ${batch.name} has no effect.")
      }
    }

    curPlan
  }

对于RuleExecutor的子类来说,最主要的是定义自己的batches,来看analyzer中的batches是如何定义的

val batches: Seq[Batch] = Seq(
    Batch("MultiInstanceRelations", Once,
      NewRelationInstances),
    Batch("CaseInsensitiveAttributeReferences", Once,
      (if (caseSensitive) Nil else LowercaseAttributeReferences :: Nil) : _*),
    Batch("Resolution", fixedPoint,
      ResolveReferences ::
      ResolveRelations ::
      NewRelationInstances ::
      ImplicitGenerate ::
      StarExpansion ::
      ResolveFunctions ::
      GlobalAggregates ::
      typeCoercionRules :_*),
    Batch("AnalysisOperators", fixedPoint,
      EliminateAnalysisOperators)
  )

batch中定义了一系列的规则,这里再次出现语法糖问题。“如何理解::这个操作符”? ::表示cons的意思,即连接生成一个list.

Batch构造函数中需要指定一系列的Rule,像ResolveReferences就是Rule,有关Rule的代码就不一一分析了。

 阶段3:LogicalPlan转换成Physical Plan

在阶段3最主要的代码就两行:

  1. lazy val executePlan: SparkPlan = prepareForExecution(sparkPlan)
  2. lazy val toRdd: RDD[Row] = executedPlan.execute()

与LogicalPlan不同,SparkPlan最重要的区别就是有execute函数

针对Sparkplan的具体实现,又要分成UnaryNode, LeafNode和BinaryNode,简要来说即单目运算符操作,叶子结点,双目运算符操作。每个子类的具体实现可以自行参考源码。

阶段4: 触发RDD执行

RDD被触发真正执行的过程在看了前面几篇文章之后想来难不住你来,所有的所有都在这一行代码。

teenagers.map(p => "name:"+p(0)).foreach(println)

如果真的不明白,建议回头再读一下Spark Job的执行过程分析。

总结

行为至此,可以收笔了。应该说SQL部分的代码涉及到的知识点还是比较多的,最重要的是理清两点,即SQL语句的通用处理过程。另一个是Spark SQL子系统中具体实现机制。

Spark Sql子模块的具体实现紧紧围绕LogicalPlan Tree展开,一是用sqlparser来生成logicalplan,二是用RuleExecutor将各种Rule作用于LogicalPlan。最后生成普通的RDD将会给Spark core处理。

参考资料

  1. Spark Catalyst 源码分析
  2. Query Optimizer Deep Diver
  3. playing with Scala Parser Combinator
  4. Parsing Text With Scala
  5. Parser Api

网友评论

登录后评论
0/500
评论
许鹏
+ 关注