HBase实操:Spark-Read-HBase-Snapshot-Demo 分享

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介:

前言:之前给大家分享了Spark通过接口直接读取HBase的一个小demo:HBase-Spark-Read-Demo,但如果在数据量非常大的情况下,Spark直接扫描HBase表必然会对HBase集群造成不小的压力。基于此,今天再给大家分享一下Spark通过Snapshot直接读取HBase HFile文件的方式。

首先我们先创建一个HBase表:test,并插入几条数据,如下:

hbase(main):003:0> scan 'test'
ROW                                              COLUMN+CELL                                                                                                                                                                                                                       
 r1                                              column=f:name, timestamp=1583318512414, value=zpb                                                                                               
 r2                                              column=f:name, timestamp=1583318517079, value=lisi                                                                                               
 r3                                              column=f:name, timestamp=1583318520839, value=wang                                                                                               

接着,我们创建该HBase表的快照,其在HDFS上路径如下:

hbase(main):005:0> snapshot 'test', 'test-snapshot'
0 row(s) in 0.3690 seconds

$ hdfs dfs -ls /apps/hbase/data/.hbase-snapshot
Found 1 items
drwxr-xr-x   - hbase hdfs          0 2020-03-21 21:24 /apps/hbase/data/.hbase-snapshot/test-snapshot

代码如下:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase._
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableSnapshotInputFormat}
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.spark.{SparkConf, SparkContext}


object SparkReadHBaseSnapshotDemo {

  //   主函数
  def main(args: Array[String]) {

    // 设置spark访问入口
    val conf = new SparkConf().setAppName("SparkReadHBaseSnapshotDemo")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .setMaster("local")//调试

    val sc = new SparkContext(conf)
    // 获取HbaseRDD
    val job = Job.getInstance(getHbaseConf())
    TableSnapshotInputFormat.setInput(job, "test-snapshot", new Path("/user/tmp"))

    val hbaseRDD = sc.newAPIHadoopRDD(job.getConfiguration, classOf[TableSnapshotInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])
      hbaseRDD.map(_._2).map(getRes(_)).count()
  }


  def getRes(result: org.apache.hadoop.hbase.client.Result): String = {
    val rowkey = Bytes.toString(result.getRow())
    val name = Bytes.toString(result.getValue("f".getBytes, "name".getBytes))
    println(rowkey+"---"+name)
    name
  }
  // 构造 Hbase 配置信息
  def getHbaseConf(): Configuration = {
    val conf: Configuration = HBaseConfiguration.create()
    conf.set(TableInputFormat.SCAN, getScanStr())
    conf
  }

  // 获取扫描器
  def getScanStr(): String = {
    val scan = new Scan()
    // scan.set....  各种过滤
    val proto = ProtobufUtil.toScan(scan)
    Base64.encodeBytes(proto.toByteArray())
  }
}

注:上述代码需将core-site.xml&hdfs-site.xml&hbase-site.xml文件放在资源目录resources下。否则,应在代码中进行配置,代码如下:

package com.xcar.etl

import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase._
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableSnapshotInputFormat}
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.spark.{SparkConf, SparkContext}


object SparkReadHBaseSnapshotDemo2 {

  val HBASE_ZOOKEEPER_QUORUM = "xxxx.com.cn"

  //   主函数
  def main(args: Array[String]) {

    // 设置spark访问入口
    val conf = new SparkConf().setAppName("SparkReadHBaseSnapshotDemo2")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .setMaster("local")//调试

    val sc = new SparkContext(conf)
    // 获取HbaseRDD
    val job = Job.getInstance(getHbaseConf())
    TableSnapshotInputFormat.setInput(job, "test-snapshot", new Path("/user/tmp"))

    val hbaseRDD = sc.newAPIHadoopRDD(job.getConfiguration, classOf[TableSnapshotInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])
    hbaseRDD.map(_._2).map(getRes(_)).count()
  }

  def getRes(result: org.apache.hadoop.hbase.client.Result): String = {
    val rowkey = Bytes.toString(result.getRow())
    val name = Bytes.toString(result.getValue("f".getBytes, "name".getBytes))
    println(rowkey+"---"+name)
    name
  }

  // 构造 Hbase 配置信息
  def getHbaseConf(): Configuration = {
    val conf: Configuration = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set("zookeeper.znode.parent", "/hbase")
    conf.set("hbase.zookeeper.quorum", HBASE_ZOOKEEPER_QUORUM)
    conf.set("hbase.rootdir", "/apps/hbase")
    // 设置查询的表名
    conf.set(TableInputFormat.INPUT_TABLE, "test")
    conf.set("fs.defaultFS","hdfs://xxxxxx:8020") 
    conf.set(TableInputFormat.SCAN, getScanStr())
    conf
  }

  // 获取扫描器
  def getScanStr(): String = {
    val scan = new Scan()
    // scan.set....  各种过滤
    val proto = ProtobufUtil.toScan(scan)
    Base64.encodeBytes(proto.toByteArray())
  }
}

TableSnapshotInputFormat.setInput 方法参数解析:

public static void setInput(org.apache.hadoop.mapreduce.Job job,
                            String snapshotName,
                            org.apache.hadoop.fs.Path restoreDir)
                            throws IOException
参数解析:
job - the job to configure
snapshotName - the name of the snapshot to read from
restoreDir - a temporary directory to restore the snapshot into. 
Current user should have write permissions to this directory, and this should not be a subdirectory of rootdir. 
After the job is finished, restoreDir can be deleted.

项目用到的 pom.xml 文件:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.zpb.test</groupId>
    <artifactId>spark-read-hbase-snapshot-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>spark-read-hbase-snapshot-demo</name>
    <url>http://maven.apache.org</url>

    <repositories>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>

    <properties>
        <cdh.hbase.version>1.2.0-cdh5.7.0</cdh.hbase.version>
        <cdh.spark.version>1.6.0-cdh5.7.0</cdh.spark.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.62</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>${cdh.spark.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${cdh.hbase.version}</version>
        </dependency>
    </dependencies>
</project>

欢迎关注本人公众号【HBase工作笔记】,解锁更多姿势~
900px

相关实践学习
云数据库HBase版使用教程
&nbsp; 相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情:&nbsp;https://cn.aliyun.com/product/hbase &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
2月前
|
分布式计算 API Spark
Spark学习--day05、SparkCore电商网站实操、SparkCore-工程代码
Spark学习--day05、SparkCore电商网站实操、SparkCore-工程代码
67 11
|
5月前
|
Java Shell 分布式数据库
【大数据技术Hadoop+Spark】HBase数据模型、Shell操作、Java API示例程序讲解(附源码 超详细)
【大数据技术Hadoop+Spark】HBase数据模型、Shell操作、Java API示例程序讲解(附源码 超详细)
87 0
|
消息中间件 分布式计算 网络协议
Spark Streaming 快速入门(实操)
Spark Streaming 快速入门(实操)
340 0
Spark Streaming 快速入门(实操)
|
4月前
|
分布式计算 分布式数据库 API
Spark与HBase的集成与数据访问
Spark与HBase的集成与数据访问
|
分布式计算 分布式数据库 Scala
Spark查询Hbase小案例
写作目的 1)正好有些Spark连接HBase的需求,当个笔记本,到时候自己在写的时候,可以看 2)根据rowkey查询其实我还是查询了好久才找到,所以整理了一下 3)好久没发博客了,水一篇
179 0
Spark查询Hbase小案例
|
存储 缓存 分布式计算
Hadoop原理与技术——Hbase实操
Hadoop原理与技术——Hbase实操
110 0
Hadoop原理与技术——Hbase实操
|
分布式计算 数据处理 分布式数据库
《基于HBase和Spark构建企业级数据处理平台》电子版地址
基于HBase和Spark构建企业级数据处理平台
89 0
《基于HBase和Spark构建企业级数据处理平台》电子版地址
|
分布式计算 Hadoop Linux
云计算集群搭建记录[Hadoop|Zookeeper|Hbase|Spark | Docker]更新索引 |动态更新
为了能够更好的查看所更新的文章,讲该博文设为索引 小约定 为了解决在编辑文件等操作的过程中的权限问题,博主一律默认采用root账户登录 对于初次安装的用户可以采用如下命令行:
116 0
云计算集群搭建记录[Hadoop|Zookeeper|Hbase|Spark | Docker]更新索引 |动态更新
|
SQL 分布式计算 关系型数据库
|
9月前
|
SQL 分布式计算 Hadoop
Hadoop集群hbase的安装
Hadoop集群hbase的安装
147 0