Apache Spark源码走读(四)Hive on Spark运行环境搭建 &hiveql on spark实现详解

  1. 云栖社区>
  2. 博客>
  3. 正文

Apache Spark源码走读(四)Hive on Spark运行环境搭建 &hiveql on spark实现详解

许鹏 2016-09-14 18:53:33 浏览2725
展开阅读全文

<一>Hive on Spark运行环境搭建

楔子

Hive是基于Hadoop的开源数据仓库工具,提供了类似于SQL的HiveQL语言,使得上层的数据分析人员不用知道太多MapReduce的知识就能对存储于Hdfs中的海量数据进行分析。由于这一特性而收到广泛的欢迎。

Hive的整体框架中有一个重要的模块是执行模块,这一部分是用Hadoop中MapReduce计算框架来实现,因而在处理速度上不是非常令人满意。由于Spark出色的处理速度,有人已经成功将HiveQL的执行利用Spark来运行,这就是已经非常闻名的Shark开源项目。

在Spark 1.0中,Spark自身提供了对Hive的支持。本文不准备分析Spark是如何来提供对Hive的支持的,而只着重于如何搭建Hive On Spark的测试环境。

安装概览

整体的安装过程分为以下几步:

  1. 搭建Hadoop集群 (整个cluster由3台机器组成,一台作为Master,另两台作为Slave)
  2. 编译Spark 1.0,使其支持Hadoop 2.4.0和Hive
  3. 运行Hive on Spark的测试用例 (Spark和Hadoop Namenode运行在同一台机器)

Hadoop集群搭建

创建虚拟机

创建基于kvm的虚拟机,利用libvirt提供的图形管理界面,创建3台虚拟机,非常方便。内存和ip地址分配如下

  1. master 2G    192.168.122.102
  2. slave1  4G    192.168.122.103
  3. slave2  4G    192.168.122.104

在虚拟机上安装os的过程就略过了,我使用的是arch linux,os安装完成之后,确保以下软件也已经安装

  1. jdk
  2. openssh

创建用户组和用户

在每台机器上创建名为hadoop的用户组,添加名为hduser的用户,具体bash命令如下所示

groupadd hadoop
useradd -b /home -m -g hadoop hduser
passwd hduser

无密码登录

在启动slave机器上的datanode或nodemanager的时候需要输入用户名密码,为了避免每次都要输入密码,可以利用如下指令创建无密码登录。注意是从master到slave机器的单向无密码。

cd $HOME/.ssh
ssh-keygen -t dsa

将id_dsa.pub复制为authorized_keys,然后上传到slave1和slave2中的$HOME/.ssh目录

cp id_dsa.pub authorized_keys
#确保在slave1和slave2机器中,hduser的$HOME目录下已经创建好了.ssh目录
scp authorized_keys slave1:$HOME/.ssh
scp authorized_keys slave2:$HOME/.ssh

更改每台机器上的/etc/hosts

在组成集群的master, slave1和slave2中,向/etc/hosts文件添加如下内容

192.168.122.102 master
192.168.122.103 slave1
192.168.122.104 slave2

如果更改完成之后,可以在master上执行ssh slave1来进行测试,如果没有输入密码的过程就直接登录入slave1就说明上述的配置成功。

下载hadoop 2.4.0

以hduser身份登录master,执行如下指令

cd /home/hduser
wget http://mirror.esocc.com/apache/hadoop/common/hadoop-2.4.0/hadoop-2.4.0.tar.gz
mkdir yarn
tar zvxf hadoop-2.4.0.tar.gz -C yarn

修改hadoop配置文件

添加如下内容到.bashrc

export HADOOP_HOME=/home/hduser/yarn/hadoop-2.4.0
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export HADOOP_COMMON_HOME=$HADOOP_HOME
export HADOOP_HDFS_HOME=$HADOOP_HOME
export YARN_HOME=$HADOOP_HOME
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export YARN_CONF_DIR=$HADOOP_HOME/etc/hadoop

修改$HADOOP_HOME/libexec/hadoop-config.sh

在hadoop-config.sh文件开头处添加如下内容

export JAVA_HOME=/opt/java

$HADOOP_CONF_DIR/yarn-env.sh

在yarn-env.sh开头添加如下内容

export JAVA_HOME=/opt/java
export HADOOP_HOME=/home/hduser/yarn/hadoop-2.4.0
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export HADOOP_COMMON_HOME=$HADOOP_HOME
export HADOOP_HDFS_HOME=$HADOOP_HOME
export YARN_HOME=$HADOOP_HOME
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export YARN_CONF_DIR=$HADOOP_HOME/etc/hadoop

 xml配置文件修改

文件1: $HADOOP_CONF_DIR/core-site.xml


<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://master:9000</value>
  </property>
  <property>
    <name>hadoop.tmp.dir</name>
    <value>/home/hduser/yarn/hadoop-2.4.0/tmp</value>
  </property>
</configuration>

文件2: $HADOOP_CONF_DIR/hdfs-site.xml

<?xml version="1.0" encoding="UTF-8"?>
 <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
 <configuration>
   <property>
     <name>dfs.replication</name>
     <value>2</value>
   </property>
   <property>
     <name>dfs.permissions</name>
     <value>false</value>
   </property>
 </configuration>

 文件3: $HADOOP_CONF_DIR/mapred-site.xml

<?xml version="1.0"?>
<configuration>
 <property>
   <name>mapreduce.framework.name</name>
   <value>yarn</value>
 </property>
</configuration>

文件4: $HADOOP_CONF_DIR/yarn-site.xml

<?xml version="1.0"?>
 <configuration>
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
  <property>
    <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
    <value>org.apache.hadoop.mapred.ShuffleHandler</value>
  </property>
  <property>
    <name>yarn.resourcemanager.resource-tracker.address</name>
    <value>master:8025</value>
  </property>
  <property>
    <name>yarn.resourcemanager.scheduler.address</name>
    <value>master:8030</value>
  </property>
  <property>
    <name>yarn.resourcemanager.address</name>
    <value>master:8040</value>
  </property>
 </configuration>

 文件5: $HADOOP_CONF_DIR/slaves

在文件中添加如下内容

slave1
slave2

创建tmp目录

在$HADOOP_HOME下创建tmp目录

mkdir $HADOOP_HOME/tmp

复制yarn目录到slave1和slave2

刚才所作的配置文件更改发生在master机器上,将整个更改过的内容全部复制到slave1和slave2。

for target in slave1 slave2
do 
    scp -r yarn $target:~/
    scp $HOME/.bashrc $target:~/
done

批量处理是不是很爽。

格式化namenode

在master机器上对namenode进行格式化

bin/hadoop namenode -format

启动cluster集群

sbin/hadoop-daemon.sh start namenode
sbin/hadoop-daemons.sh start datanode
sbin/yarn-daemon.sh start resourcemanager
sbin/yarn-daemons.sh start nodemanager
sbin/mr-jobhistory-daemon.sh start historyserver

 注意: daemon.sh表示只在本机运行,daemons.sh表示在所有的cluster节点上运行。

验证hadoop集群安装正确与否

跑一个wordcount示例,具体步骤不再列出,可参考本系列中的第11篇

编译Spark 1.0

Spark的编译还是很简单的,所有失败的原因大部分可以归结于所依赖的jar包无法正常下载。

为了让Spark 1.0支持hadoop 2.4.0和hive,请使用如下指令编译

SPARK_HADOOP_VERSION=2.4.0 SPARK_YARN=true   SPARK_HIVE=true sbt/sbt assembly

如果一切顺利将会在assembly目录下生成 spark-assembly-1.0.0-SNAPSHOT-hadoop2.4.0.jar

创建运行包

编译之后整个$SPARK_HOME目录下所有的文件体积还是很大的,大概有两个多G。有哪些是运行的时候真正需要的呢,下面将会列出这些目录和文件。

  1. $SPARK_HOME/bin
  2. $SPARK_HOME/sbin
  3. $SPARK_HOME/lib_managed
  4. $SPARK_HOME/conf
  5. $SPARK_HOME/assembly/target/scala-2.10

将上述目录的内容复制到/tmp/spark-dist,然后创建压缩包

mkdir /tmp/spark-dist
for i in $SPARK_HOME/{bin,sbin,lib_managed,conf,assembly/target/scala-2.10}
do 
  cp -r $i /tmp/spark-dist
done
cd /tmp/
tar czvf spark-1.0-dist.tar.gz spark-dist

上传运行包到master机器

将生成的运行包上传到master(192.168.122.102)

scp spark-1.0-dist.tar.gz hduser@192.168.122.102:~/

运行hive on spark测试用例

经过上述重重折磨,终于到了最为紧张的时刻了。

以hduser身份登录master机,解压spark-1.0-dist.tar.gz

#after login into the master as hduser
tar zxvf spark-1.0-dist.tar.gz
cd spark-dist

更改conf/spark-env.sh

export SPARK_LOCAL_IP=127.0.0.1
export SPARK_MASTER_IP=127.0.0.1

运行最简单的example

用bin/spark-shell指令启动shell之后,运行如下scala代码

val sc: SparkContext // An existing SparkContext.
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

// Importing the SQL context gives access to all the public SQL functions and implicit conversions.
import hiveContext._

hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL
hql("FROM src SELECT key, value").collect().foreach(println)

如果一切顺利,最后一句hql会返回key及value

参考资料

  1. Steps to install Hadoop 2.x release (Yarn or Next-Gen) on multi-node cluster

<二>hiveql on spark实现详解

概要

在新近发布的spark 1.0中新加了sql的模块,更为引人注意的是对hive中的hiveql也提供了良好的支持,作为一个源码分析控,了解一下spark是如何完成对hql的支持是一件非常有趣的事情。

Hive简介

Hive的由来

以下部分摘自Hadoop definite guide中的Hive一章

Hive由Facebook出品,其设计之初目的是让精通SQL技能的分析师能够对Facebook存放在HDFS上的大规模数据集进行分析和查询。

Hive大大简化了对大规模数据集的分析门槛(不再要求分析人员具有很强的编程能力),迅速流行起来,成为Hadoop生成圈上的Killer Application. 目前已经有很多组织把Hive作为一个通用的,可伸缩数据处理平台。

数据模型(Data Model)

Hive所有的数据都存在HDFS中,在Hive中有以下几种数据模型

  • Tables(表) table和关系型数据库中的表是相对应的,每个表都有一个对应的hdfs目录,表中的数据经序列化后存储在该目录,Hive同时支持表中的数据存储在其它类型的文件系统中,如NFS或本地文件系统
  • 分区(Partitions) Hive中的分区起到的作用有点类似于RDBMS中的索引功能,每个Partition都有一个对应的目录,这样在查询的时候,可以减少数据规模
  • 桶(buckets) 即使将数据按分区之后,每个分区的规模有可能还是很大,这个时候,按照关键字的hash结果将数据分成多个buckets,每个bucket对应于一个文件

Query Language

 HiveQL是Hive支持的类似于SQL的查询语言。HiveQL大体可以分成下面两种类型

  1. DDL(data definition language)  比如创建数据库(create database),创建表(create table),数据库和表的删除
  2. DML(data manipulation language) 数据的添加,查询
  3. UDF(user defined function) Hive还支持用户自定义查询函数

Hive architecture

hive的整体框架图如下图所示

 

由上图可以看出,Hive的整体架构可以分成以下几大部分:

  1. 用户接口  支持CLI, JDBC和Web UI
  2. Driver Driver负责将用户指令翻译转换成为相应的MapReduce Job
  3. MetaStore 元数据存储仓库,像数据库和表的定义这些内容就属于元数据这个范畴,默认使用的是Derby存储引擎

HiveQL执行过程

HiveQL的执行过程如下所述

  1. parser 将HiveQL解析为相应的语法树
  2. Semantic Analyser 语义分析
  3. Logical Plan Generating 生成相应的LogicalPlan
  4. Query Plan Generating
  5. Optimizer

最终生成MapReduce的Job,交付给Hadoop的MapReduce计算框架具体运行。

Hive实例

最好的学习就是实战,Hive这一小节还是以一个具体的例子来结束吧。

前提条件是已经安装好hadoop,具体安装可以参考源码走读11或走读9

step 1: 创建warehouse

warehouse用来存储raw data

$ $HADOOP_HOME/bin/hadoop fs -mkdir       /tmp
$ $HADOOP_HOME/bin/hadoop fs -mkdir       /user/hive/warehouse
$ $HADOOP_HOME/bin/hadoop fs -chmod g+w   /tmp
$ $HADOOP_HOME/bin/hadoop fs -chmod g+w   /user/hive/warehouse

step 2: 启动hive cli

$ export HIVE_HOME=<hive-install-dir>
$ $HIVE_HOME/bin/hive

step 3: 创建表

创建表,首先将schema数据写入到metastore,另一件事情就是在warehouse目录下创建相应的子目录,该子目录以表的名称命名

CREATE TABLE u_data (
  userid INT,
  movieid INT,
  rating INT,
  unixtime STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE;

step 4: 导入数据

导入的数据会存储在step 3中创建的表目录下

LOAD DATA LOCAL INPATH '/u.data'
OVERWRITE INTO TABLE u_data;

step 5: 查询

SELECT COUNT(*) FROM u_data;

hiveql on Spark

Q: 上一章节花了大量的篇幅介绍了hive由来,框架及hiveql执行过程。那这些东西跟我们标题中所称的hive on spark有什么关系呢?

Ans:  Hive的整体解决方案很不错,但有一些地方还值得改进,其中之一就是“从查询提交到结果返回需要相当长的时间,查询耗时太长之所以查询时间很长,一个主要的原因就是因为Hive原生是基于MapReduce的,哪有没有办法提高呢。您一定想到了,“不是生成MapReduce Job,而是生成Spark Job”, 充分利用Spark的快速执行能力来缩短HiveQl的响应时间。

下图是Spark 1.0中所支持的lib库,SQL是其唯一新添加的lib库,可见SQL在Spark 1.0中的地位之重要。

 

HiveContext

HiveContext是Spark提供的用户接口,HiveContext继承自SqlContext。

让我们回顾一下,SqlContext中牵涉到的类及其间的关系如下图所示,具体分析过程参见本系列中的源码走读之11

既然是继承自SqlContext,那么我们将普通sql与hiveql分析执行步骤做一个对比,可以得到下图。

有了上述的比较,就能抓住源码分析时需要把握的几个关键点:

  1. Entrypoint           HiveContext.scala
  2. QueryExecution    HiveContext.scala
    1. parser       HiveQl.scala
    2. optimizer    

数据

使用到的数据有两种

  1. Schema Data  像数据库的定义和表的结构,这些都存储在MetaStore中
  2. Raw data        即要分析的文件本身

Entrypoint

hiveql是整个的入口点,而hql是hiveql的缩写形式。

  def hiveql(hqlQuery: String): SchemaRDD = {
    val result = new SchemaRDD(this, HiveQl.parseSql(hqlQuery))
    // We force query optimization to happen right away instead of letting it happen lazily like
    // when using the query DSL.  This is so DDL commands behave as expected.  This is only
    // generates the RDD lineage for DML queries, but does not perform any execution.
    result.queryExecution.toRdd
    result
  }

上述hiveql的定义与sql的定义几乎一模一样,唯一的不同是sql中使用parseSql的结果作为SchemaRDD的入参而hiveql中使用HiveQl.parseSql作为SchemaRdd的入参

HiveQL, parser

parseSql的函数定义如代码所示,解析过程中将指令分成两大类

  • nativecommand     非select语句,这类语句的特点是执行时间不会因为条件的不同而有很大的差异,基本上都能在较短的时间内完成
  • 非nativecommand  主要是select语句
def parseSql(sql: String): LogicalPlan = {
    try {
      if (sql.toLowerCase.startsWith("set")) {
        NativeCommand(sql)
      } else if (sql.toLowerCase.startsWith("add jar")) {
        AddJar(sql.drop(8))
      } else if (sql.toLowerCase.startsWith("add file")) {
        AddFile(sql.drop(9))
      } else if (sql.startsWith("dfs")) {
        DfsCommand(sql)
      } else if (sql.startsWith("source")) {
        SourceCommand(sql.split(" ").toSeq match { case Seq("source", filePath) => filePath })
      } else if (sql.startsWith("!")) {
        ShellCommand(sql.drop(1))
      } else {
        val tree = getAst(sql)

        if (nativeCommands contains tree.getText) {
          NativeCommand(sql)
        } else {
          nodeToPlan(tree) match {
            case NativePlaceholder => NativeCommand(sql)
            case other => other
          }
        }
      }
    } catch {
      case e: Exception => throw new ParseException(sql, e)
      case e: NotImplementedError => sys.error(
        s"""
          |Unsupported language features in query: $sql
          |${dumpTree(getAst(sql))}
        """.stripMargin)
    }
  }	

哪些指令是nativecommand呢,答案在HiveQl.scala中的nativeCommands变量,列表很长,代码就不一一列出。

对于非nativeCommand,最重要的解析函数就是nodeToPlan

toRdd

Spark对HiveQL所做的优化主要体现在Query相关的操作,其它的依然使用Hive的原生执行引擎。

在logicalPlan到physicalPlan的转换过程中,toRdd最关键的元素

override lazy val toRdd: RDD[Row] =
      analyzed match {
        case NativeCommand(cmd) =>
          val output = runSqlHive(cmd)

          if (output.size == 0) {
            emptyResult
          } else {
            val asRows = output.map(r => new GenericRow(r.split("\t").asInstanceOf[Array[Any]]))
            sparkContext.parallelize(asRows, 1)
          }
        case _ =>
          executedPlan.execute().map(_.copy())
      }

native command的执行流程

由于native command是一些非耗时的操作,直接使用Hive中原有的exeucte engine来执行即可。这些command的执行示意图如下

analyzer

HiveTypeCoercion

val typeCoercionRules =
    List(PropagateTypes, ConvertNaNs, WidenTypes, PromoteStrings, BooleanComparisons, BooleanCasts,
      StringToIntegralCasts, FunctionArgumentConversion)		

optimizer

PreInsertionCasts存在的目的就是确保在数据插入执行之前,相应的表已经存在。

override lazy val optimizedPlan =
      optimizer(catalog.PreInsertionCasts(catalog.CreateTables(analyzed)))

此处要注意的是catalog的用途,catalog是HiveMetastoreCatalog的实例。

HiveMetastoreCatalog是Spark中对Hive Metastore访问的wrapper。HiveMetastoreCatalog通过调用相应的Hive Api可以获得数据库中的表及表的分区,也可以创建新的表和分区。

HiveMetastoreCatalog

HiveMetastoreCatalog中会通过hive client来访问metastore中的元数据,使用了大量的Hive Api。其中包括了广为人知的deSer library。

以CreateTable函数为例说明对Hive Library的依赖。

def createTable(
      databaseName: String,
      tableName: String,
      schema: Seq[Attribute],
      allowExisting: Boolean = false): Unit = {
    val table = new Table(databaseName, tableName)
    val hiveSchema =
      schema.map(attr => new FieldSchema(attr.name, toMetastoreType(attr.dataType), ""))
    table.setFields(hiveSchema)

    val sd = new StorageDescriptor()
    table.getTTable.setSd(sd)
    sd.setCols(hiveSchema)

    // TODO: THESE ARE ALL DEFAULTS, WE NEED TO PARSE / UNDERSTAND the output specs.
    sd.setCompressed(false)
    sd.setParameters(Map[String, String]())
    sd.setInputFormat("org.apache.hadoop.mapred.TextInputFormat")
    sd.setOutputFormat("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")
    val serDeInfo = new SerDeInfo()
    serDeInfo.setName(tableName)
    serDeInfo.setSerializationLib("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
    serDeInfo.setParameters(Map[String, String]())
    sd.setSerdeInfo(serDeInfo)

    try client.createTable(table) catch {
      case e: org.apache.hadoop.hive.ql.metadata.HiveException
        if e.getCause.isInstanceOf[org.apache.hadoop.hive.metastore.api.AlreadyExistsException] &&
           allowExisting => // Do nothing.
    }
  }

实验

结合源码,我们再对一个简单的例子作下说明。

可能你会想,既然spark也支持hql,那么我原先用hive cli创建的数据库和表用spark能不能访问到呢?答案或许会让你很纳闷,“在默认的配置下是不行的”。为什么?

Hive中的meta data采用的存储引擎是Derby,该存储引擎只能有一个访问用户。同一时刻只能有一个人访问,即便以同一用户登录访问也不行。针对这个局限,解决方法就是将metastore存储在mysql或者其它可以多用户访问的数据库中。

具体实例

  1. 创建表
  2. 导入数据
  3. 查询
  4. 删除表

在启动spark-shell之前,需要先设置环境变量HIVE_HOMEHADOOP_HOME.

启动spark-shell之后,执行如下代码

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

// Importing the SQL context gives access to all the public SQL functions and implicit conversions.
import hiveContext._

hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL
hql("FROM src SELECT key, value").collect().foreach(println)
hql("drop table src")

create操作会在/user/hive/warehouse/目录下创建src目录,可以用以下指令来验证

$$HADOOP_HOME/bin/hdfs dfs -ls /user/hive/warehouse/

 drop表的时候,不仅metastore中相应的记录被删除,而且原始数据raw file本身也会被删除,即在warehouse目录下对应某个表的目录会被整体删除掉。

上述的create, load及query操作对metastore和raw data的影响可以用下图的表示:

hive-site.xml

如果想对hive默认的配置作修改,可以使用hive-site.xml。

具体步骤如下

 -  在$SPARK_HOME/conf目录下创建hive-site.xml

 -  根据需要,添写相应的配置项的值,可以这样做,将$HIVE_HOME/conf目录下的hive-default.xml复制到$SPARK_HOME/conf,然后重命名为hive-site.xml

Sql新功能预告

为了进一步提升sql的执行速度,在Spark开发团队在发布完1.0之后,会通过codegen的方法来提升执行速度。codegen有点类似于jvm中的jit技术。充分利用了scala语言的特性。

前景分析

Spark目前还缺乏一个非常有影响力的应用,也就通常所说的killer application。SQL是Spark在寻找killer application方面所做的一个积极尝试,也是目前Spark上最有热度的一个话题,但通过优化Hive执行速度来吸引潜在Spark用户,该突破方向选择正确与否还有待市场证明。

Hive除了在执行速度上为人诟病之外,还有一个最大的问题就是多用户访问的问题,相较第一个问题,第二个问题来得更为致命。无论是Facebook在Hive之后推出的Presto还是Cloudera推出的Impala都是针对第二问题提出的解决方案,目前都已经取得的了巨大优势。

小结

本文就Spark对HiveQL提供支持的这一功能进行了比较详细的分析,其中涉及到以下几个问题。

  1. 什么是hive
  2. hive有什么缺点,否则就没Spark或Shark啥事了
  3. Spark主要是针对hive的哪个不足做出改进
  4. Spark是如何对这个做改进的

参考资料

  1. programming hive
  2. Shark vs. Impala
  3. Hive Design

网友评论

登录后评论
0/500
评论
许鹏
+ 关注