Spark入门介绍

  1. 云栖社区>
  2. 博客>
  3. 正文

Spark入门介绍

云hbase+spark 2019-06-22 23:06:22 浏览701
展开阅读全文

前言

Spark自从2014年1.2版本发布以来,已成为大数据计算的通用组件。网上介绍Spark的资源也非常多,但是不利于用户快速入门,所以本文主要通从用户的角度来介绍Spark,让用户能快速的认识Spark,知道Spark是什么、能做什么、怎么去做。

Spark是什么

摘用官网的定义:
Spark是一个快速的、通用的分布式计算系统。
提供了高级API,如:Java、Scala、Python和R。
同时也支持高级工具,如:Spark SQL处理结构化数据、MLib处理机器学习、GraphX用于图计算、Spark Streming用于流数据处理。
也就是说Spark提供了灵活的、丰富接口的大数据处理能力。下图是Spark的模块图:


f1020f04e2793030de5cdd9b21ed5d284a4fa4c6


用户使用的SQL、Streaming、MLib、GraphX接口最终都会转换成Spark Core分布式运行。
目前用户用的比较多的是SQL和Streaming,这里先主要介绍下这两个。

Spark SQL

Spark SQL是Spark提供的SQL接口,用户使用Spark SQL可以像使用传统数据库一样使用SQL。例如:创建表、删除表、查询表、join表等。连接到Spark SQL后可以做如下操作(可参考:如何连接Spark SQL)。

# 在Spark中创建一个表:test_parquet,表的存储文件格式为:parquet
create table test_parquet(
    id int,
    name string,
    value double
) using parquet;

此命令运行完毕后,Spark系统会在hdfs上创建一个名称为test_parquet的目录,例如/user/hive/warehouse/test_parquet/。
然后往Spark表中插入数据。

# 往Spark表:test_parquet插入数据
insert into test_parquet values(1001, 'name1001', 95.49);
insert into test_parquet values(1002, 'name1002', 73.25);
insert into test_parquet values(1003, 'name1003', 25.65);
insert into test_parquet values(1004, 'name1004', 23.39);
insert into test_parquet values(1005, 'name1005', 8.64);
insert into test_parquet values(1006, 'name1006', 52.60);
insert into test_parquet values(1007, 'name1007', 42.16);
insert into test_parquet values(1008, 'name1008', 85.39);
insert into test_parquet values(1009, 'name1009', 7.22);
insert into test_parquet values(1010, 'name1010', 10.43);

插入数据的步骤运行后,Spark会在hdfs目录:/user/hive/warehouse/test_parquet/中创建一些列后缀为.parquet的文件,如下:

/user/hive/warehouse/test_parquet/part-00000-49fefdf2-2ef0-4b7d-9414-6ac52e0390cb-c000.snappy.parquet
/user/hive/warehouse/test_parquet/part-00000-52106855-8dd8-4f3a-8746-3025cf4898ea-c000.snappy.parquet
/user/hive/warehouse/test_parquet/part-00000-89d738e9-754b-44b0-abd3-f2dd91cd0389-c000.snappy.parquet
/user/hive/warehouse/test_parquet/part-00000-bce8efef-13ef-42e2-a6c7-8611e21e931a-c000.snappy.parquet

插入完成后开始查询数据。

select * from test_parquet

查询数据的过程是Spark并行从hdfs系统上拉取每个Parquet,然后在Spark中并行计算。
这里只是简单列举了Spark 创建Parquet表的过程,Spark也可以支持读取其他格式的表,例如对接数据库等。需要了解可参考:Spark Connectors
Spark SQL除了对用户提供了SQL命令的接口,也提供了API接口。Datasets(DataFrames),例如使用API创建Parquet如下:

//spark 读取json格式文件,返回一个DataFrames
val peopleDF = spark.read.json("examples/src/main/resources/people.json")

// DataFrames保存为Parquet格式文件。
peopleDF.write.parquet("people.parquet")

//读取Parquet文件,返回DataFrames
val parquetFileDF = spark.read.parquet("people.parquet")

// DataFrames注册成一个Parquet表
parquetFileDF.createOrReplaceTempView("parquetFile")
//使用SQL查询Parquet表
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
//打印数据
namesDF.map(attributes => "Name: " + attributes(0)).show()

Spark Streaming

Spark Streaming是流式处理系统,可以处理流式数据。下面用个例子说明Streaming的过程。
Spark Streaming可以对接Kafka。假如kafka产生的数据格式为:

id        values      time 
id001     98.2     1560414467
id002     99.2     1560414468
id001     87.2     1560414469

现在业务需要每分钟从Kafka读取一批数据,对数进行信息补齐,因为kafka拿到的数据只有id信息,用户想补齐name信息。
假如具有id、name信息的表存储在Phoenix中。
这样就可以通过Spark Streaming来完成这些业务诉求。在Spark的业务处理逻辑中拿到kafka的数据后,使用id关联Phoenix表拿到name信息,然再写入到其他数据库。
例如此业务的Spark Streaming的业务逻辑代码如下:

//scala代码样例
val words = messages.transform { rdd =>
      rdd.map {line =>
        println(s"==0== words = $line")
        //逗号分隔
        val words = line.value().split(",")
        words
      }
    }.foreachRDD { lineArray =>
      lineArray.foreachPartition { dataPartition =>
        val connectionPool = Phoenix5xConnectionPool(queryServerAddress)
        val phoenixConn = connectionPool.borrowObject()
        val statment = phoenixConn.createStatement()
        var i = 0
        while (dataPartition.hasNext) {
          val kv = dataPartition.next()
          //关联id、获取name信息。
          val rs = statment.executeQuery(s"select name from $phoenixTableName where id = ${kv(0)}")
          val name = rs.getString(1)
          //把结果写入到数据库
          statment.execute(s"upsert into $resultTable values('${kv(0)}','${kv(1)}', ${kv(2)}, '$name')")
          i = i + 1
          if (i % batchSize == 0) {
            phoenixConn.commit()
          }
        }
        phoenixConn.commit()
        connectionPool.returnObject(phoenixConn)
      }
    }

Spark Streming对接Kafka可参考:Spark对接kafkak快速入门。Spark Streming对接Phoenix代码可参考:SparkStremingSparkSQL

Spark适合做什么

先看下Spark在当前常用的BigData业务架构中的位置。
下图是常用的BigData 大数据组件Spark+HBase+Cassandra+ES(Solor),这些组件组合可覆盖BigData 95%以上的业务场景。


231951e99b2870a56ff6b99d1cfb76f94629ba58

图中数据BigData分4个层次,由上到下分别为:
业务系统层:一般是直接面向用户的业务系统。
计算层:Spark的分布式计算。
数据库层:HBase+Cassandra数据库提供实时查询的能力。
存储层:HDFS或者OSS。

这里主要介绍下计算层Spark。
Spark计算层会把数据从数据库、列式存储(数仓)中拉去到Spark中进行分布式计算。我们把Spark打开看下是如何分布式计算的。先看下Spark运行时候的部署结构。

4baf3453317f5dadec2eb79397911451c56efcef

由上图可以看到Spark部署时分布式的,有一个Driver,有N个Executor。业务系统对接Driver,Driver把计算逻辑发送到每个Executor运行,Executor运行结果再返回。
所以当Spark拉取数据库、数仓数据时会并行拉取到每个Executor做并行运算。

7e3564a2c2090d77eac8065c0205ab67ce15765c

例如Spark SQL中查询表的例子,以及Spark Streming的中处理批数据的例子,Spark运算时是每个Executor并处理数据的,Executor处理数据的逻辑是由用户编码控制的,例如用户写的SQL语句,调用API写的业务代码等。

那么Spark适合什么样的计算呢?
下图列出了Spark 和HBase数据库各自适用的场景(摘自HBase和Spark对比):
对比项目             Phoenix(HBase) Spark
SQL复杂度 简单查询, 必须命中索引 且 命中后 返回的数据较少,如果是join,则join任意一则返回的数据量在10w以下,且另一侧必须命中索引。 为了保障集群稳定性,一些复杂的sql及耗时的sql会被平台拒绝运行。 全部支持执行完成,支持Spark 映射到Phoenix,做到Spark在简单SQL查询能到Phoenix同样的性能,不过Spark定位为 分析的场景,与Phoenix 纯TP有本质的区别
集群 HBase共享一个集群,本质是HBase提供的SQL Spark需要单独购买的集群,Spark集群运算不影响其它数据库
并发 单机 1w-5w左右 Spark最高不超过100
延迟 延迟在ms级别,一些命中较多的数据的sql会到 秒 一般延迟在300ms以上,大部分sql需要秒,分钟,甚至小时
数据Update Phoenix支持 Spark不支持
支持业务 在线业务 离线业务 或者 准在线业务


举个例子说明下上面每一项对应的场景,如下图:

739a8c7ea7a8487eca82df071bc44e9c36123927


此图描述的是用户登录手机淘宝,淘宝根据用户的ID信息在淘宝首页推荐商品这样的一个流程。我们看下每个流程中哪些场景适合Phoenix、Spark。
1、 获取用户的推荐列表。
用户登录后,手机淘宝要根据用户的ID从“用户推荐商品列表 user_reco_list”这个表中获取信息。SQL语句可能是这样的:
select * from user_reco_list where user_id = 'user0001' and time='2019-06-22'

这个SQL的特点如下:

  • 简单:只有select *,没有join、group by。
  • 有关键字过滤:user_id = 'user0001'。
  • 返回的结果集少(大概返回几十行)。
  • user_reco_list表数据量很大(百亿级别)
  • 并发量很大,可能同时会有上万个用户同时登录。
  • 低时延:用户一登录要立刻显示推荐。

类似这种特点的业务查询就适合使用在线数据Phoenix。
2、 统计用户的浏览记录。
“用户推荐商品列表 user_reco_list”中数据是怎么来的呢?是从用户的浏览记录、购买记录、加入购物车记录等信息统计而来的。后台任务每天凌晨从用户的记录中进行大量的统计分析,然后把结果写入“用户推荐商品列表 user_reco_list”。SQL语句可能是这的:

select sum(click) as counts, user_id, reco_id from user_scan_list group by user_id, reco_id where times>= '2018-06-30' and times<'2018-12-30' 

这个SQL的特点如下:

  • 统计分析:有sum、group by。
  • 查询时间范围大:times的时间范围要半年,即扫描的数据量大。
  • 返回的结果集大,可能返回百万级。
  • user_scan_list表数据量很大,百亿级别、千亿级等。
  • 并发量小、每天凌晨计算一次。
  • 高时延:计算结果可能要分钟级别、甚至小时级别。

类似这种特点的业务查询就适合使用离线数仓:Spark 列存(Parquet)。
通过上面的例子大概可以认识到哪些场景适合Spark,哪些适合Phoenix。Spark和Phoenix相互配合,解决大数据的问题。

Spark如何建数仓

那Spark如何建数仓呢?本质就是把数据导入到Spark,使用Spark的列式存储文件格式(例如parquet)存储数据,使用Spark完成批处理、离线分析的业务。
例如在Spark创建一个以天为分区的明细表:

#创建parquet格式的分区表
create table test_parquet_pt(
    id int,
    name string,
    value double,
    dt string
) using parquet
partitioned by(dt);

#导入数据到Spark表:test_parquet_pt。
#例如从Phoenix表中增量导入1天的数据量。
insert into test_parquet_pt select * from phoenix_table where dt>='2019-01-02' and dt<'2019-01-03';

#分析Spark表
select avg(value) from test_parquet_pt group by dt;

上面只是个简单的实例,下面举例几个实际的业务场景。
先看下面的一个典型的业务场景。

_


上图是一个典型的复杂分析及查询系统。数据流程由图可见:
  1. 数据由APP、传感器、商业系统等客户的业务系统产生发送到Kafka系统。
  2. Spark Streming 对接kafka周期读取数据入库到在线数据库HBase/Phoenix,用户的运营系统实时查询在线数据库。
  3. HBase/Phoenix数据库周期同步到Spark数仓做离线计算。离线计算的结果写回HBase/Phoenix或者其他业务数据库。

上面是一个常用的方案。Spark创建数仓也有客户对数仓进行分层,例如下图:

_


客户把数仓分为四层:操作数据、公共明细、公共汇总、应用数据。每一层的数据由上一层汇聚、统计计算得来,每一层的数据应用于不同的业务场景。此场景的明细可参考:HBase+Spark构建游戏大数据平台
数据归档到Spark可参考X-Pack Spark提供的:批量归档

小结

本文只是对Spark做个入门的介绍,更详细的资料可参考:Spark社区资料。X-Pack Spark请参考: X-Pack Spark分析引擎

网友评论

登录后评论
0/500
评论
云hbase+spark
+ 关注