《Spark 官方文档》Spark SQL, DataFrames 以及 Datasets 编程指南(四)

  1. 云栖社区>
  2. 博客>
  3. 正文

《Spark 官方文档》Spark SQL, DataFrames 以及 Datasets 编程指南(四)

青衫无名 2017-05-19 15:28:00 浏览3577
展开阅读全文

使用Spark SQL命令行工具

Spark SQL CLI是一个很方便的工具,它可以用local mode运行hive metastore service,并且在命令行中执行输入的查询。注意Spark SQL CLI目前还不支持和Thrift JDBC server通信。

用如下命令,在spark目录下启动一个Spark SQL CLI

./bin/spark-sql

Hive配置在conf目录下hive-site.xml,core-site.xml,hdfs-site.xml中设置。你可以用这个命令查看完整的选项列表:./bin/spark-sql –help

升级指南

1.5升级到1.6

  • 从Spark-1.6.0起,默认Thrift server 将运行于多会话并存模式下(multi-session)。这意味着,每个JDBC/ODBC连接有其独立的SQL配置和临时函数注册表。table的缓存仍然是公用的。如果你更喜欢老的单会话模式,只需设置spark.sql.hive.thriftServer.singleSession为true即可。当然,你也可在spark-defaults.conf中设置,或者将其值传给start-thriftserver.sh –conf(如下):
./sbin/start-thriftserver.sh \
     --conf spark.sql.hive.thriftServer.singleSession=true \
     ...

1.4升级到1.5

  • Tungsten引擎现在默认是启用的,Tungsten是通过手动管理内存优化执行计划,同时也优化了表达式求值的代码生成。这两个特性都可以通过把spark.sql.tungsten.enabled设为false来禁用。
  • Parquet schema merging默认不启用。需要启用的话,设置spark.sql.parquet.mergeSchema为true即可
  • Python接口支持用点(.)来访问字段内嵌值,例如df[‘table.column.nestedField’]。但这也意味着,如果你的字段名包含点号(.)的话,你就必须用重音符来转义,如:table.`column.with.dots`.nested。
  • 列式存储内存分区剪枝默认是启用的。要禁用,设置spark.sql.inMemoryColumarStorage.partitionPruning为false即可
  • 不再支持无精度限制的decimal。Spark SQL现在强制最大精度为38位。对于BigDecimal对象,类型推导将会使用(38,18)精度的decimal类型。如果DDL中没有指明精度,默认使用的精度是(10,0)
  • 时间戳精确到1us(微秒),而不是1ns(纳秒)
  • 在“sql”这个SQL变种设置中,浮点数将被解析为decimal。HiveQL解析保持不变。
  • 标准SQL/DataFrame函数均为小写,例如:sum vs SUM。
  • 当推测任务被启用是,使用DirectOutputCommitter是不安全的,因此,DirectOutputCommitter在推测任务启用时,将被自动禁用,且忽略相关配置。
  • JSON数据源不再自动加载其他程序产生的新文件(例如,不是Spark SQL插入到dataset中的文件)。对于一个JSON的持久化表(如:Hive metastore中保存的表),用户可以使用REFRESH TABLE这个SQL命令或者HiveContext.refreshTable来把新文件包括进来。

1.3升级到1.4

DataFrame数据读写接口

根据用户的反馈,我们提供了一个新的,更加流畅的API,用于数据读(SQLContext.read)写(DataFrame.write),同时老的API(如:SQLCOntext.parquetFile, SQLContext.jsonFile)将被废弃。

有关SQLContext.read和DataFrame.write的更详细信息,请参考API文档。

DataFrame.groupBy保留分组字段

根据用户的反馈,我们改变了DataFrame.groupBy().agg()的默认行为,在返回的DataFrame结果中保留了分组字段。如果你想保持1.3中的行为,设置spark.sql.retainGroupColumns为false即可。

// 在1.3.x中,如果要保留分组字段"department", 你必须显式的在agg聚合时包含这个字段
df.groupBy("department").agg($"department", max("age"), sum("expense"))

// 而在1.4+,分组字段"department"默认就会包含在返回的DataFrame中
df.groupBy("department").agg(max("age"), sum("expense"))

// 要回滚到1.3的行为(不包含分组字段),按如下设置即可:
sqlContext.setConf("spark.sql.retainGroupColumns", "false")

1.2升级到1.3

在Spark 1.3中,我们去掉了Spark SQL的”Alpha“标签,并清理了可用的API。从Spark 1.3起,Spark SQL将对1.x系列二进制兼容。这个兼容性保证不包括显式的标注为”unstable(如:DeveloperAPI或Experimental)“的API。

SchemaRDD重命名为DataFrame

对于用户来说,Spark SQL 1.3最大的改动就是SchemaRDD改名为DataFrame。主要原因是,DataFrame不再直接由RDD派生,而是通过自己的实现提供RDD的功能。DataFrame只需要调用其rdd方法就能转成RDD。

在Scala中仍然有SchemaRDD,只不过这是DataFrame的一个别名,以便兼容一些现有代码。但仍然建议用户改用DataFrame。Java和Python用户就没这个福利了,他们必须改代码。

统一Java和Scala API

在Spark 1.3之前,有单独的java兼容类(JavaSQLContext和JavaSchemaRDD)及其在Scala API中的镜像。Spark 1.3中将Java API和Scala API统一。两种语言的用户都应该使用SQLContext和DataFrame。一般这些类中都会使用两种语言中都有的类型(如:Array取代各语言独有的集合)。有些情况下,没有通用的类型(例如:闭包或者maps),将会使用函数重载来解决这个问题。

另外,java特有的类型API被删除了。Scala和java用户都应该用org.apache.spark.sql.types来编程描述一个schema。

隐式转换隔离,DSL包移除 – 仅针对scala

Spark 1.3之前的很多示例代码,都在开头用 import sqlContext._,这行将会导致所有的sqlContext的函数都被引入进来。因此,在Spark 1.3我们把RDDs到DataFrames的隐式转换隔离出来,单独放到SQLContext.implicits对象中。用户现在应该这样写:import sqlContext.implicits._

另外,隐式转换也支持由Product(如:case classes或tuples)组成的RDD,但需要调用一个toDF方法,而不是自动转换。

如果需要使用DSL(被DataFrame取代的API)中的方法,用户之前需要导入DSL(import org.apache.spark.sql.catalyst.dsl), 而现在应该要导入 DataFrame API(import org.apache.spark.sql.functions._)

移除org.apache.spark.sql中DataType别名 – 仅针对scala

Spark 1.3删除了sql包中的DataType类型别名。现在,用户应该使用 org.apache.spark.sql.types中的类。

UDF注册挪到sqlContext.udf中 – 针对java和scala

注册UDF的函数,不管是DataFrame,DSL或者SQL中用到的,都被挪到SQLContext.udf中。

sqlContext.udf.register("strLen", (s: String) => s.length())

Python UDF注册保持不变。

Python DataTypes不再是单例

在python中使用DataTypes,你需要先构造一个对象(如:StringType()),而不是引用一个单例。

Shark用户迁移指南

调度

用户可以通过如下命令,为JDBC客户端session设定一个Fair Scheduler pool。

SET spark.sql.thriftserver.scheduler.pool=accounting;

Reducer个数

在Shark中,默认的reducer个数是1,并且由mapred.reduce.tasks设定。Spark SQL废弃了这个属性,改为 spark.sql.shuffle.partitions, 并且默认200,用户可通过如下SET命令来自定义:

SET spark.sql.shuffle.partitions=10;
SELECT page, count(*) c
FROM logs_last_month_cached
GROUP BY page ORDER BY c DESC LIMIT 10;

你也可以把这个属性放到hive-site.xml中来覆盖默认值。

目前,mapred.reduce.tasks属性仍然能被识别,并且自动转成spark.sql.shuffle.partitions

缓存

shark.cache表属性已经不存在了,并且以”_cached”结尾命名的表也不再会自动缓存。取而代之的是,CACHE TABLE和UNCACHE TABLE语句,用以显式的控制表的缓存:

CACHE TABLE logs_last_month;
UNCACHE TABLE logs_last_month;

注意:CACHE TABLE tbl 现在默认是饥饿模式,而非懒惰模式。再也不需要手动调用其他action来触发cache了!

从Spark-1.2.0开始,Spark SQL新提供了一个语句,让用户自己控制表缓存是否是懒惰模式

CACHE [LAZY] TABLE [AS SELECT] ...

以下几个缓存相关的特性不再支持:

  • 用户定义分区级别的缓存逐出策略
  • RDD 重加载
  • 内存缓存直接写入策略

兼容Apache Hive

Spark SQL设计时考虑了和Hive metastore,SerDes以及UDF的兼容性。目前这些兼容性斗是基于Hive-1.2.1版本,并且Spark SQL可以连到不同版本的Hive metastore(从0.12.0到1.2.1,参考:http://spark.apache.org/docs/latest/sql-programming-guide.html#interacting-with-different-versions-of-hive-metastore

部署在已有的Hive仓库之上

Spark SQL Thrift JDBC server采用了”out of the box”(开箱即用)的设计,使用很方便,并兼容已有的Hive安装版本。你不需要修改已有的Hive metastore或者改变数据的位置,或者表分区。

支持的Hive功能

Spark SQL 支持绝大部分Hive功能,如:

  • Hive查询语句:
    • SELECT
    • GROUP BY
    • ORDER BY
    • CLUSTER BY
    • SORT BY
  • 所有的Hive操作符:
    • Relational operators (===<><>>=<=, etc)
    • Arithmetic operators (+-*/%, etc)
    • Logical operators (AND&&OR||, etc)
    • Complex type constructors
    • Mathematical functions (signlncos, etc)
    • String functions (instrlengthprintf, etc)
  • 用户定义函数(UDF)
  • 用户定义聚合函数(UDAF)
  • 用户定义序列化、反序列化(SerDes)
  • 窗口函数(Window functions)
  • Joins
    • JOIN
    • {LEFT|RIGHT|FULL} OUTER JOIN
    • LEFT SEMI JOIN
    • CROSS JOIN
  • Unions
  • 查询子句
    • SELECT col FROM ( SELECT a + b AS col from t1) t2
  • 采样
  • 执行计划详细(Explain)
  • 分区表,包括动态分区插入
  • 视图
  • 所有Hive DDL(data definition language):
    • CREATE TABLE
    • CREATE TABLE AS SELECT
    • ALTER TABLE
  • 绝大部分Hive数据类型:
    • TINYINT
    • SMALLINT
    • INT
    • BIGINT
    • BOOLEAN
    • FLOAT
    • DOUBLE
    • STRING
    • BINARY
    • TIMESTAMP
    • DATE
    • ARRAY<>
    • MAP<>
    • STRUCT<>

不支持的Hive功能

以下是目前不支持的Hive特性的列表。多数是不常用的。

不支持的Hive常见功能

  • bucket表:butcket是Hive表的一个哈希分区

不支持的Hive高级功能

  • UNION类操作
  • 去重join
  • 字段统计信息收集:Spark SQL不支持同步的字段统计收集

Hive输入、输出格式

  • CLI文件格式:对于需要回显到CLI中的结果,Spark SQL仅支持TextOutputFormat。
  • Hadoop archive — Hadoop归档

Hive优化

一些比较棘手的Hive优化目前还没有在Spark中提供。有一些(如索引)对应Spark SQL这种内存计算模型来说并不重要。另外一些,在Spark SQL未来的版本中会支持。

  • 块级别位图索引和虚拟字段(用来建索引)
  • 自动计算reducer个数(join和groupBy算子):目前在Spark SQL中你需要这样控制混洗后(post-shuffle)并发程度:”SET spark.sql.shuffle.partitions=[num_tasks];”
  • 元数据查询:只查询元数据的请求,Spark SQL仍需要启动任务来计算结果
  • 数据倾斜标志:Spark SQL不会理会Hive中的数据倾斜标志
  • STREAMTABLE join提示:Spark SQL里没有这玩艺儿
  • 返回结果时合并小文件:如果返回的结果有很多小文件,Hive有个选项设置,来合并小文件,以避免超过HDFS的文件数额度限制。Spark SQL不支持这个。

参考

数据类型

Spark SQL和DataFrames支持如下数据类型:

  • Numeric types(数值类型)
    • ByteType: 1字节长的有符号整型,范围:-128 到 127.
    • ShortType: 2字节长有符号整型,范围:-32768 到 32767.
    • IntegerType: 4字节有符号整型,范围:-2147483648 到 2147483647.
    • LongType: 8字节有符号整型,范围: -9223372036854775808 to 9223372036854775807.
    • FloatType: 4字节单精度浮点数。
    • DoubleType: 8字节双精度浮点数
    • DecimalType: 任意精度有符号带小数的数值。内部使用java.math.BigDecimal, BigDecimal包含任意精度的不缩放整型,和一个32位的缩放整型
  • String type(字符串类型)
    • StringType: 字符串
  • Binary type(二进制类型)
    • BinaryType: 字节序列
  • Boolean type(布尔类型)
    • BooleanType: 布尔类型
  • Datetime type(日期类型)
    • TimestampType: 表示包含年月日、时分秒等字段的日期
    • DateType: 表示包含年月日字段的日期
  • Complex types(复杂类型)
    • ArrayType(elementType, containsNull):数组类型,表达一系列的elementType类型的元素组成的序列,containsNull表示数组能否包含null值
    • MapType(keyType, valueType, valueContainsNull):映射集合类型,表示一个键值对的集合。键的类型是keyType,值的类型则由valueType指定。对应MapType来说,键是不能为null的,而值能否为null则取决于valueContainsNull。
    • StructType(fields):表示包含StructField序列的结构体。
      • StructField(name, datatype, nullable): 表示StructType中的一个字段,name是字段名,datatype是数据类型,nullable表示该字段是否可以为空

所有Spark SQL支持的数据类型都在这个包里:org.apache.spark.sql.types,你可以这样导入之:

import  org.apache.spark.sql.types._
Data type Value type in Scala API to access or create a data type
ByteType Byte ByteType
ShortType Short ShortType
IntegerType Int IntegerType
LongType Long LongType
FloatType Float FloatType
DoubleType Double DoubleType
DecimalType java.math.BigDecimal DecimalType
StringType String StringType
BinaryType Array[Byte] BinaryType
BooleanType Boolean BooleanType
TimestampType java.sql.Timestamp TimestampType
DateType java.sql.Date DateType
ArrayType scala.collection.Seq ArrayType(elementType, [containsNull])注意:默认containsNull为true
MapType scala.collection.Map MapType(keyTypevalueType, [valueContainsNull])注意:默认valueContainsNull为true
StructType org.apache.spark.sql.Row StructType(fields)注意:fields是一个StructFields的序列,并且同名的字段是不允许的。
StructField 定义字段的数据对应的Scala类型(例如,如果StructField的dataType为IntegerType,则其数据对应的scala类型为Int) StructField(namedataTypenullable)

NaN语义

这是Not-a-Number的缩写,某些float或double类型不符合标准浮点数语义,需要对其特殊处理:

  • NaN == NaN,即:NaN和NaN总是相等
  • 在聚合函数中,所有NaN分到同一组
  • NaN在join操作中可以当做一个普通的join key
  • NaN在升序排序中排到最后,比任何其他数值都大
  • 转载自 并发编程网 - ifeve.com

网友评论

登录后评论
0/500
评论
青衫无名
+ 关注