《Spark 官方文档》Spark SQL, DataFrames 以及 Datasets 编程指南(一)

  1. 云栖社区>
  2. 博客>
  3. 正文

《Spark 官方文档》Spark SQL, DataFrames 以及 Datasets 编程指南(一)

青衫无名 2017-05-19 15:24:00 浏览1987
展开阅读全文

Spark SQL, DataFrames 以及 Datasets 编程指南

概要

Spark SQL是Spark中处理结构化数据的模块。与基础的Spark RDD API不同,Spark SQL的接口提供了更多关于数据的结构信息和计算任务的运行时信息。在Spark内部,Spark SQL会能够用于做优化的信息比RDD API更多一些。Spark SQL如今有了三种不同的API:SQL语句、DataFrame API和最新的Dataset API。不过真正运行计算的时候,无论你使用哪种API或语言,Spark SQL使用的执行引擎都是同一个。这种底层的统一,使开发者可以在不同的API之间来回切换,你可以选择一种最自然的方式,来表达你的需求。

 

本文中所有的示例都使用Spark发布版本中自带的示例数据,并且可以在spark-shell、pyspark shell以及sparkR shell中运行。

SQL

Spark SQL的一种用法是直接执行SQL查询语句,你可使用最基本的SQL语法,也可以选择HiveQL语法。Spark SQL可以从已有的Hive中读取数据。更详细的请参考Hive Tables 这一节。如果用其他编程语言运行SQL,Spark SQL将以DataFrame返回结果。你还可以通过命令行command-line 或者 JDBC/ODBC 使用Spark SQL。

DataFrames

DataFrame是一种分布式数据集合,每一条数据都由几个命名字段组成。概念上来说,她和关系型数据库的表 或者 R和Python中的data frame等价,只不过在底层,DataFrame采用了更多优化。DataFrame可以从很多数据源(sources)加载数据并构造得到,如:结构化数据文件,Hive中的表,外部数据库,或者已有的RDD。

DataFrame API支持ScalaJavaPython, and R

Datasets

Dataset是Spark-1.6新增的一种API,目前还是实验性的。Dataset想要把RDD的优势(强类型,可以使用lambda表达式函数)和Spark SQL的优化执行引擎的优势结合到一起。Dataset可以由JVM对象构建(constructed )得到,而后Dataset上可以使用各种transformation算子(map,flatMap,filter 等)。

Dataset API 对 Scala 和 Java的支持接口是一致的,但目前还不支持Python,不过Python自身就有语言动态特性优势(例如,你可以使用字段名来访问数据,row.columnName)。对Python的完整支持在未来的版本会增加进来。

入门

入口:SQLContext

Spark SQL所有的功能入口都是SQLContext 类,及其子类。不过要创建一个SQLContext对象,首先需要有一个SparkContext对象。

val sc: SparkContext // 假设已经有一个 SparkContext 对象
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// 用于包含RDD到DataFrame隐式转换操作
import sqlContext.implicits._

除了SQLContext之外,你也可以创建HiveContext,HiveContext是SQLContext 的超集。

除了SQLContext的功能之外,HiveContext还提供了完整的HiveQL语法,UDF使用,以及对Hive表中数据的访问。要使用HiveContext,你并不需要安装Hive,而且SQLContext能用的数据源,HiveContext也一样能用。HiveContext是单独打包的,从而避免了在默认的Spark发布版本中包含所有的Hive依赖。如果这些依赖对你来说不是问题(不会造成依赖冲突等),建议你在Spark-1.3之前使用HiveContext。而后续的Spark版本,将会逐渐把SQLContext升级到和HiveContext功能差不多的状态。

spark.sql.dialect选项可以指定不同的SQL变种(或者叫SQL方言)。这个参数可以在SparkContext.setConf里指定,也可以通过 SQL语句的SET key=value命令指定。对于SQLContext,该配置目前唯一的可选值就是”sql”,这个变种使用一个Spark SQL自带的简易SQL解析器。而对于HiveContext,spark.sql.dialect 默认值为”hiveql”,当然你也可以将其值设回”sql”。仅就目前而言,HiveSQL解析器支持更加完整的SQL语法,所以大部分情况下,推荐使用HiveContext。

创建DataFrame

Spark应用可以用SparkContext创建DataFrame,所需的数据来源可以是已有的RDD(existing RDD),或者Hive表,或者其他数据源(data sources.)

以下是一个从JSON文件创建DataFrame的小栗子:

val sc: SparkContext // 已有的 SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val df = sqlContext.read.json("examples/src/main/resources/people.json")

// 将DataFrame内容打印到stdout
df.show()

DataFrame操作

DataFrame提供了结构化数据的领域专用语言支持,包括ScalaJavaPython and R.

这里我们给出一个结构化数据处理的基本示例:

val sc: SparkContext // 已有的 SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// 创建一个 DataFrame
val df = sqlContext.read.json("examples/src/main/resources/people.json")

// 展示 DataFrame 的内容
df.show()
// age  name
// null Michael
// 30   Andy
// 19   Justin

// 打印数据树形结构
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// select "name" 字段
df.select("name").show()
// name
// Michael
// Andy
// Justin

// 展示所有人,但所有人的 age 都加1
df.select(df("name"), df("age") + 1).show()
// name    (age + 1)
// Michael null
// Andy    31
// Justin  20

// 筛选出年龄大于21的人
df.filter(df("age") > 21).show()
// age name
// 30  Andy

// 计算各个年龄的人数
df.groupBy("age").count().show()
// age  count
// null 1
// 19   1
// 30   1

DataFrame的完整API列表请参考这里:API Documentation

除了简单的字段引用和表达式支持之外,DataFrame还提供了丰富的工具函数库,包括字符串组装,日期处理,常见的数学函数等。完整列表见这里:DataFrame Function Reference.

编程方式执行SQL查询

SQLContext.sql可以执行一个SQL查询,并返回DataFrame结果。

val sqlContext = ... // 已有一个 SQLContext 对象
val df = sqlContext.sql("SELECT * FROM table")

创建Dataset

Dataset API和RDD类似,不过Dataset不使用Java序列化或者Kryo,而是使用专用的编码器(Encoder )来序列化对象和跨网络传输通信。如果这个编码器和标准序列化都能把对象转字节,那么编码器就可以根据代码动态生成,并使用一种特殊数据格式,这种格式下的对象不需要反序列化回来,就能允许Spark进行操作,如过滤、排序、哈希等。

// 对普通类型数据的Encoder是由 importing sqlContext.implicits._ 自动提供的
val ds = Seq(1, 2, 3).toDS()
ds.map(_ + 1).collect() // 返回: Array(2, 3, 4)

// 以下这行不仅定义了case class,同时也自动为其创建了Encoder
case class Person(name: String, age: Long)
val ds = Seq(Person("Andy", 32)).toDS()

// DataFrame 只需提供一个和数据schema对应的class即可转换为 Dataset。Spark会根据字段名进行映射。
val path = "examples/src/main/resources/people.json"
val people = sqlContext.read.json(path).as[Person]

和RDD互操作

Spark SQL有两种方法将RDD转为DataFrame。

1. 使用反射机制,推导包含指定类型对象RDD的schema。这种基于反射机制的方法使代码更简洁,而且如果你事先知道数据schema,推荐使用这种方式;

2. 编程方式构建一个schema,然后应用到指定RDD上。这种方式更啰嗦,但如果你事先不知道数据有哪些字段,或者数据schema是运行时读取进来的,那么你很可能需要用这种方式。

利用反射推导schema

Spark SQL的Scala接口支持自动将包含case class对象的RDD转为DataFrame。对应的case class定义了表的schema。case class的参数名通过反射,映射为表的字段名。case class还可以嵌套一些复杂类型,如Seq和Array。RDD隐式转换成DataFrame后,可以进一步注册成表。随后,你就可以对表中数据使用SQL语句查询了。

// sc 是已有的 SparkContext 对象
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// 为了支持RDD到DataFrame的隐式转换
import sqlContext.implicits._

// 定义一个case class.
// 注意:Scala 2.10的case class最多支持22个字段,要绕过这一限制,
// 你可以使用自定义class,并实现Product接口。当然,你也可以改用编程方式定义schema
case class Person(name: String, age: Int)

// 创建一个包含Person对象的RDD,并将其注册成table
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")

// sqlContext.sql方法可以直接执行SQL语句
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")

// SQL查询的返回结果是一个DataFrame,且能够支持所有常见的RDD算子
// 查询结果中每行的字段可以按字段索引访问:
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

// 或者按字段名访问:
teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)

// row.getValuesMap[T] 会一次性返回多列,并以Map[String, T]为返回结果类型
teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println)
// 返回结果: Map("name" -> "Justin", "age" -> 19)


转载自 并发编程网 - ifeve.com

网友评论

登录后评论
0/500
评论