Writing MapReduce Output to HBase

Introduction: run a MapReduce word count job and save its results into an HBase table.

Create a new Maven project
Add the dependencies:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.100</groupId>
  <artifactId>MRHbasetest</artifactId>
  <version>0.0.1-SNAPSHOT</version>

  <dependencies>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.7.3</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>1.2.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-server -->
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-server</artifactId>
      <version>1.2.0</version>
    </dependency>
  </dependencies>
</project>

Add the configuration files to the project resources
(core-site.xml, hbase-site.xml, log4j.properties)

core-site.xml

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License. See accompanying LICENSE file.
-->

<!-- Put site-specific property overrides in this file. -->

<configuration>
        <property>
                <name>fs.defaultFS</name>
                <value>hdfs://master:9000</value>
        </property>
        <property>
                <name>io.file.buffer.size</name>
                <value>131072</value>
        </property>
        <property>
                <name>hadoop.tmp.dir</name>
                <value>file:/usr/temp</value>
        </property>
        <property>
                <name>hadoop.proxyuser.root.hosts</name>
                <value>*</value>
        </property>
        <property>
                <name>hadoop.proxyuser.root.groups</name>
                <value>*</value>
        </property>
</configuration>

hbase-site.xml

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
        <property>
                <name>hbase.zookeeper.quorum</name>
                <value>master,slave1,slave2</value>
                <description>Comma-separated list of the hosts in the ZooKeeper quorum.</description>
        </property>
        <property>
                <name>hbase.zookeeper.property.clientPort</name>
                <value>2181</value>
        </property>
</configuration>
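
If hbase-site.xml cannot be placed on the classpath, the same two settings can also be applied directly on the job configuration in code. A minimal sketch, assuming the same hostnames as in the file above:

Configuration conf = HBaseConfiguration.create();
// Equivalent to the hbase-site.xml entries above; the hostnames come from this example cluster
conf.set("hbase.zookeeper.quorum", "master,slave1,slave2");
conf.set("hbase.zookeeper.property.clientPort", "2181");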

log4j.properties

# Global logging configuration
log4j.rootLogger=INFO, stdout
# MyBatis logging configuration...
log4j.logger.org.mybatis.example.BlogMapper=TRACE
# Console output...
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p [%t] - %m%n

The code

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

// Word count job that saves its results to HBase
// Target table bd17:wc, column family "c", column qualifier "count"; the row key is the word itself
public class MRToHbase {

    public static class MrToBaseMap extends Mapper<LongWritable, Text, Text, IntWritable> {

        private static final IntWritable ONE = new IntWritable(1);
        private String[] info;
        private Text outputKey = new Text();

        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            info = value.toString().split("\\s");
            for (String word : info) {
                if (word.length() != 0) {
                    outputKey.set(word);
                    context.write(outputKey, ONE);
                }
            }
        }

    }

    // The reducer must extend the TableReducer class provided by the HBase API
    public static class MrToHBaseReduce extends TableReducer<Text, IntWritable, NullWritable> {
        private int sum;
        private NullWritable outputKey = NullWritable.get();
        private Put outputValue;

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Reducer<Text, IntWritable, NullWritable, Mutation>.Context context)
                throws IOException, InterruptedException {

            sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            // Build the Put object, i.e., the full content of the row to insert into HBase
            outputValue = new Put(Bytes.toBytes(key.toString()));
            outputValue.addColumn(Bytes.toBytes("c"), Bytes.toBytes("count"), Bytes.toBytes(String.valueOf(sum)));
            context.write(outputKey, outputValue);
        }

    }

    // main method: configures the HBase connection and output format, then launches the job

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Use HBaseConfiguration to create the job configuration; it loads hbase-site.xml from the classpath
        Configuration configuration = HBaseConfiguration.create();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(MRToHbase.class);
        job.setJobName("wordcount to hbase");
        job.setMapperClass(MrToBaseMap.class);
        job.setReducerClass(MrToHBaseReduce.class);
        
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Mutation.class);
        
        // Use the TableMapReduceUtil helper class to initialize the HBase side of the job (output format and target table)
        TableMapReduceUtil.initTableReducerJob("bd17:wc", MrToHBaseReduce.class, job);
        FileInputFormat.addInputPath(job, new Path("/reversetext/reverse1.txt"));
        System.exit(job.waitForCompletion(true)?0:1);
    }
    
}
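
Before the job runs, the target table must already exist: the reducer only writes Puts into it, and TableMapReduceUtil does not create the table. Below is a minimal sketch of creating the bd17:wc table with column family c through the HBase 1.2 Admin API (the class name CreateWcTable is only for illustration):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.NamespaceNotFoundException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

// Illustrative helper: creates the bd17 namespace and the bd17:wc table used by the job
public class CreateWcTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            // Create the bd17 namespace only if it does not exist yet
            try {
                admin.getNamespaceDescriptor("bd17");
            } catch (NamespaceNotFoundException e) {
                admin.createNamespace(NamespaceDescriptor.create("bd17").build());
            }
            TableName tableName = TableName.valueOf("bd17:wc");
            if (!admin.tableExists(tableName)) {
                HTableDescriptor descriptor = new HTableDescriptor(tableName);
                descriptor.addFamily(new HColumnDescriptor("c")); // the column family the reducer writes to
                admin.createTable(descriptor);
            }
        }
    }
}

The same can be done from the HBase shell with create_namespace 'bd17' followed by create 'bd17:wc', 'c'. With the table in place, package the project into a jar and submit it with hadoop jar; the input file /reversetext/reverse1.txt must already exist on HDFS, and after the job finishes, scan 'bd17:wc' in the shell should show one row per word with a c:count cell.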