Spark On HBASE

简介: 一、前言 MapReduce早已经对接了HBase,以HBase作为数据源,完成批量数据的读写。如今继MapReduce之后的Spark在大数据领域有着举足轻重的地位,无论跑批,流处理,甚至图计算等都有它的用武之地。

一、前言

MapReduce早已经对接了HBase,以HBase作为数据源,完成批量数据的读写。如今继MapReduce之后的Spark在大数据领域有着举足轻重的地位,无论跑批,流处理,甚至图计算等都有它的用武之地。Spark对接HBase成为不少用户的需求。

二、Spark On HBASE

1.可以解决的问题

Spark和HBASE无缝对接意味着我们不再需要关心安全和RDD与HBase交互的细节。更方便应用Spark带来的批处理,流处理等能力。比如以下常见的应用场景:

  1. 以HBase作为存储,通过Spark对流式数据处理。
  2. 以HBase作为存储,完成大规模的图或者DAG的计算。
  3. 通过Spark对HBase做BulkLoad操作
  4. 同Spark SQL对HBase数据做交互式分析

2.社区相关的工作

目前已经有多种Spark对接HBase的实现,这里我们选取三个有代表的工作进行分析:

2.1 华为: Spark-SQL-on-HBase

特点
扩展了Spark SQL的parse功能来对接HBase。通过coprocessor和自定义filter来提升读写性能。

优点

  • 扩展了对应的cli功能,支持scala shell和python shell
  • 多种性能优化方式,甚至支持sub plan到coprocessor实现partial aggregation.
  • 支持java和Python API
  • 支持row key组合
  • 支持常用DDL和DML(包括bulkload,但不支持update)

缺点

  • 不支持支持基于时间戳和版本的查询
  • 不支持安全
  • Row key支持原始类型或者String,不支持复杂数据类型

使用示例

  1. 在HBase中创建表,并写入数据
$HBase_Home/bin/hbase shell
create 'hbase_numbers', 'f'
for i in '1'..'100' do for j in '1'..'2' do put 'hbase_numbers', "row#{i}", "f:c#{j}", "#{i}#{j}" end end
  1. 使用sparksql创建表并与HBase表建立映射
$SPARK_HBASE_Home/bin/hbase-sql
CREATE TABLE numbers
rowkey STRING, a STRING, b STRING, PRIMARY KEY (rowkey)
MAPPED BY hbase_numbers COLS=[a=f.c1, b=f.c2];
  1. 查询
select a, b from numbers where b > "980"

2.2 Hortonworks: Apache HBase Connector

特点
以简单的方式实现了标准的Spark Datasource API,使用Spark Catalyst引擎做查询优化。同时通过scratch来构建RDD,也实现了许多常见的查询优化。

优点

  • native avro支持
  • 谓词下推和分区裁剪
  • 支持row key组合
  • 支持安全

缺点

  • SQL语法不够丰富,只支持spark sql原有的语法
  • 只支持java原始类型
  • 不支持多语言API

使用示例

  1. 定义 HBase Catalog
def catalog = s"""{
        |"table":{"namespace":"default", "name":"table1"},
        |"rowkey":"key",
        |"columns":{
          |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
          |"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
          |"col2":{"cf":"cf2", "col":"col2", "type":"double"},
          |"col3":{"cf":"cf3", "col":"col3", "type":"float"},
          |"col4":{"cf":"cf4", "col":"col4", "type":"int"},
          |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
          |"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
          |"col7":{"cf":"cf7", "col":"col7", "type":"string"},
          |"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}
        |}
      |}""".stripMargin
  1. 使用SQL查询
// Load the dataframe
val df = withCatalog(catalog)
//SQL example
df.createOrReplaceTempView("table")
sqlContext.sql("select count(col1) from table").show

2.3 Cloudrea: SparkOnHBase

特点
通过简单的接口实现链接Spark与HBASE, 支持常用的bulk读写。架构图如下:

Cloudera_Spark_2015_01

优点

  • 支持安全
  • 通过get或者scan直接生成rdd, 并可以使用API完成更高级的功能
  • 支持组合rowkey
  • 支持多种bulk操作
  • 为spark和 spark streaming提供相似的API
  • 支持谓词下推优化

缺点

  • 不支持复杂数据类型
  • SQL只支持spark sql原有的语法

使用示例

  1. 直接使用scan创建一个RDD
SparkConf sparkConf = new SparkConf().setAppName(  
                "Scan_RDD").set("spark.executor.memory", "2000m").setMaster(  
                "spark://xx.xx.xx.xx:7077")
                 .setJars(new String[]{"/path/to/hbase.jar"});  

val sc = new SparkContext(sparkConf)

val conf = HBaseConfiguration.create()

val hbaseContext = new HBaseContext(sc, conf)

var scan = new Scan()
scan.setCaching(100)

var getRdd = hbaseContext.hbaseRDD(tableName, scan)
  1. 创建一个RDD并把RDD的内容写入HBase
val sc = new SparkContext(sparkConf)

//This is making a RDD of
//(RowKey, columnFamily, columnQualifier, value)
val rdd = sc.parallelize(Array(
      (Bytes.toBytes("1"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("1")))),
      (Bytes.toBytes("2"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("2")))),
      (Bytes.toBytes("3"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("3")))),
      (Bytes.toBytes("4"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("4")))),
      (Bytes.toBytes("5"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("5"))))
     )
    )

//Create the HBase config like you normally would  then
//Pass the HBase configs and SparkContext to the HBaseContext
val conf = HBaseConfiguration.create();
val hbaseContext = new HBaseContext(sc, conf);

//Now give the rdd, table name, and a function that will convert a RDD record to a put, and finally
// A flag if you want the puts to be batched
hbaseContext.bulkPut[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])](rdd,
    tableName,
    //This function is really important because it allows our source RDD to have data of any type
    // Also because puts are not serializable
    (putRecord) > {
      val put = new Put(putRecord._1)
      putRecord._2.foreach((putValue) > put.add(putValue._1, putValue._2, putValue._3))
       put
    },
    true);

2.4 综合对比

产品 SQL支持优化  支持安全 接口丰富易用度 易集成到HBase 社区活跃度
华为 近两年无更新
Hortonworks 较多 近一个月内有更新
Cloudrea 较高 已集成到HBASE trunk且持续更新

3. 最后

社区中有不少Spark on HBase的工作,出发点都是为了提供更易用,更高效的接口。其中Cloudrea的SparkOnHbase更加灵活简单,在2015年8月被提交到HBase的主干(trunk)上,模块名为HBase-Spark Module,目前准备在HBASE 2.0 正式Release, 相信这个特性一定是HBase新版本的一个亮点。 于此同时云HBase也会与社区同步发展,使用包括但不限于Spark On HBase的新特性,届时欢迎大家尝鲜。

如若文章中有不准确的描述,请多多指正,谢谢!

4. 参考

https://hortonworks.com/blog/spark-hbase-dataframe-based-hbase-connector/
http://blog.cloudera.com/blog/2014/12/new-in-cloudera-labs-sparkonhbase/
https://issues.apache.org/jira/browse/HBASE-13992
http://blog.madhukaraphatak.com/introduction-to-spark-two-part-6/
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-sql-catalyst.htmlh

相关实践学习
云数据库HBase版使用教程
  相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情: https://cn.aliyun.com/product/hbase   ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库 ECS 实例和一台目标数据库 RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
4月前
|
Java Shell 分布式数据库
【大数据技术Hadoop+Spark】HBase数据模型、Shell操作、Java API示例程序讲解(附源码 超详细)
【大数据技术Hadoop+Spark】HBase数据模型、Shell操作、Java API示例程序讲解(附源码 超详细)
84 0
|
3月前
|
分布式计算 Kubernetes Java
spark on k8s native
spark on k8s native
|
3月前
|
分布式计算 分布式数据库 API
Spark与HBase的集成与数据访问
Spark与HBase的集成与数据访问
|
5月前
|
分布式计算 Kubernetes Serverless
Hago 的 Spark on ACK 实践
Hago 的 Spark on ACK 实践
|
6月前
|
分布式计算 资源调度 Hadoop
Spark on Yarn集群模式搭建及测试
Spark on Yarn集群模式搭建及测试
154 0
|
7月前
|
资源调度 分布式计算 大数据
大数据Spark on YARN
大数据Spark on YARN
79 0
|
9月前
|
资源调度 分布式计算 Hadoop
大数据平台搭建(容器环境)——Spark3.X on Yarn安装配置
大数据平台搭建(容器环境)——Spark3.X on Yarn安装配置
大数据平台搭建(容器环境)——Spark3.X on Yarn安装配置
|
12月前
|
SQL 分布式计算 资源调度
Spark on Yarn Job的执行流程简介
2017-12-19-Hadoop2.0架构及HA集群配置(1) 2017-12-24-Hadoop2.0架构及HA集群配置(2) 2017-12-25-Spark集群搭建 2017-12-29-Hadoop和Spark的异同 2017-12-28-Spark-HelloWorld(Spark开发环境搭建)
|
分布式计算 分布式数据库 Scala
Spark查询Hbase小案例
写作目的 1)正好有些Spark连接HBase的需求,当个笔记本,到时候自己在写的时候,可以看 2)根据rowkey查询其实我还是查询了好久才找到,所以整理了一下 3)好久没发博客了,水一篇
178 0
Spark查询Hbase小案例
|
分布式计算 数据处理 分布式数据库
《基于HBase和Spark构建企业级数据处理平台》电子版地址
基于HBase和Spark构建企业级数据处理平台
88 0
《基于HBase和Spark构建企业级数据处理平台》电子版地址