《Hadoop实战手册》一1.7 从HDFS导出数据到MongoDB

本文涉及的产品
云数据库 MongoDB,独享型 2核8GB
推荐场景:
构建全方位客户视图
简介:

本节书摘来异步社区《Hadoop实战手册》一书中的第1章,第1.7节,作者: 【美】Jonathan R. Owens , Jon Lentz , Brian Femiano 译者: 傅杰 , 赵磊 , 卢学裕 责编: 杨海玲,更多章节内容可以访问云栖社区“异步社区”公众号查看。

1.7 从HDFS导出数据到MongoDB

本节将使用MongoOutputFormat类加载HDFS中的数据并收集到MongoDB中。

准备工作
使用Mongo Hadoop适配器最简单的方法是从GitHub上克隆 Mongo-Hadoop工程,并且将该工程编译到一个特定的Hadoop版本。克隆该工程需要安装一个Git客户端。

本节假定你使用的Hadoop版本是CDH3。

Git客户端官方的下载地址是:http://git-scm.com/downloads

在Windows操作系统上可以通过http://windows.github.com/访问GitHub。

在Mac操作系统上可以通过http://mac.github.com/访问GitHub。

可以通过https://github.com/mongodb/mongo-hadoop获取到Mongo Hadoop适配器。该工程需要编译在特定的Hadoop版本上。编译完的JAR文件需要复制到Hadoop集群每个节点的$HADOOP_HOME/lib目录下。

Mongo Java的驱动包也需要安装到Hadoop集群每个节点的$HADOOP_HOME/lib目录下。该驱动包可从https://github.com/mongodb/mongo-java-driver/downloads下载。

操作步骤
完成下面步骤实现将HDFS数据复制到MongoDB。

1.通过下面的命令实现克隆mongo-hadoop工程:

git clone https://github.com/mongodb/mongo-hadoop.git
AI 代码解读

2.切换到稳定发布的1.0分支版本:

git checkout release-1.0
AI 代码解读

3.必须保持mongo-hadoop与Hadoop的版本一致。使用文本编辑器打开mongo-hadoop克隆目录下的build.sbt文件,将下面这行:

hadoopRelease in ThisBuild := "default"
AI 代码解读

修改为:

hadoopRelease in ThisBuild := "cdh3"
AI 代码解读

4.编译mongo-hadoop:

./sbt package.
AI 代码解读

这将会在core/target文件夹下生成一个名为mongo-hadoop-core_cdh3u3-1.0.0.jar的JAR文件。

5.从https://github.com/mongodb/mongo-java-driver/downloads下载MongoDB 2.8.0版本的Java驱动包。

6.复制mongo-hadoop和MongoDB Java驱动包到Hadoop集群每个节点的$HADOOP_HOME/lib:

cp mongo-hadoop-core_cdh3u3-1.0.0.jar mongo-2.8.0.jar $HADOOP_HOME/lib
AI 代码解读

7.编写MapReduce读取HDFS上weblog_entries.txt文件并通过MongoOutputFormat类将数据写入MongoDB中:

import java.io.*; 
import org.apache.commons.logging.*; 
import org.apache.hadoop.conf.*; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.*; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 
import org.apache.hadoop.mapreduce.*; 
import org.bson.*; 
import org.bson.types.ObjectId; 

import com.mongodb.hadoop.*; 
import com.mongodb.hadoop.util.*;

public class ExportToMongoDBFromHDFS { 

   private static final Log log = 
LogFactory.getLog(ExportToMongoDBFromHDFS.class); 
   public static class ReadWeblogs extends Mapper<LongWritable, Text,   
ObjectId, BSONObject>{ 

      public void map(Text key, Text value, Context context) throws   
IOException, InterruptedException{ 

         System.out.println("Key: " + key); 
         System.out.println("Value: " + value);

         String[] fields = value.toString().split("\t");

         String md5 = fields[0];
         String url = fields[1];
         String date = fields[2];
         String time = fields[3];
         String ip = fields[4];

         BSONObject b = new BasicBSONObject();
         b.put("md5", md5);
         b.put("url", url);
         b.put("date", date);
         b.put("time", time);
         b.put("ip", ip);
         context.write( new ObjectId(), b);
}
  }
  public static void main(String[] args) throws Exception{ 
      final Configuration conf = new Configuration(); 
      MongoConfigUtil.setOutputURI(conf,
      "mongodb://<HOST>:<PORT>/test. weblogs"); 
      System.out.println("Configuration: " + conf); 
      final Job job = new Job(conf, "Export to Mongo"); 
      Path in = new Path("/data/weblogs/weblog_entries.txt"); 
      FileInputFormat.setInputPaths(job, in); 
      job.setJarByClass(ExportToMongoDBFromHDFS.class); 
      job.setMapperClass(ReadWeblogs.class); 
      job.setOutputKeyClass(ObjectId.class); 
      job.setOutputValueClass(BSONObject.class); 
      job.setInputFormatClass(TextInputFormat.class); 
      job.setOutputFormatClass(MongoOutputFormat.class); 
      job.setNumReduceTasks(0); 
      System.exit(job.waitForCompletion(true) ? 0 : 1 ); 
   } 
}
AI 代码解读

8.导出为一个可运行的JAR文件,并运行该作业:

hadoop jar ExportToMongoDBFromHDFS.jar
AI 代码解读

9.在Mongo shell上验证weblogs已经导入MongoDB:

db.weblogs.find();
AI 代码解读

工作原理
Mongo Hadoop适配器提供了一种新的兼容Hadoop的文件系统实现包括MongoInputFormat和MongoOutputFormat。这些抽象实现使得访问MongoDB和访问任何兼容Hadoop的文件系统一样。

相关实践学习
MongoDB数据库入门
MongoDB数据库入门实验。
快速掌握 MongoDB 数据库
本课程主要讲解MongoDB数据库的基本知识,包括MongoDB数据库的安装、配置、服务的启动、数据的CRUD操作函数使用、MongoDB索引的使用(唯一索引、地理索引、过期索引、全文索引等)、MapReduce操作实现、用户管理、Java对MongoDB的操作支持(基于2.x驱动与3.x驱动的完全讲解)。 通过学习此课程,读者将具备MongoDB数据库的开发能力,并且能够使用MongoDB进行项目开发。 &nbsp; 相关的阿里云产品:云数据库 MongoDB版 云数据库MongoDB版支持ReplicaSet和Sharding两种部署架构,具备安全审计,时间点备份等多项企业能力。在互联网、物联网、游戏、金融等领域被广泛采用。 云数据库MongoDB版(ApsaraDB for MongoDB)完全兼容MongoDB协议,基于飞天分布式系统和高可靠存储引擎,提供多节点高可用架构、弹性扩容、容灾、备份回滚、性能优化等解决方案。 产品详情: https://www.aliyun.com/product/mongodb
目录
打赏
0
相关文章
【赵渝强老师】史上最详细:Hadoop HDFS的体系架构
HDFS(Hadoop分布式文件系统)由三个核心组件构成:NameNode、DataNode和SecondaryNameNode。NameNode负责管理文件系统的命名空间和客户端请求,维护元数据文件fsimage和edits;DataNode存储实际的数据块,默认大小为128MB;SecondaryNameNode定期合并edits日志到fsimage中,但不作为NameNode的热备份。通过这些组件的协同工作,HDFS实现了高效、可靠的大规模数据存储与管理。
92 14
从Excel到Hadoop:数据规模的进化之路
从Excel到Hadoop:数据规模的进化之路
45 10
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
283 6
基于Java的Hadoop文件处理系统:高效分布式数据解析与存储
本文介绍了如何借鉴Hadoop的设计思想,使用Java实现其核心功能MapReduce,解决海量数据处理问题。通过类比图书馆管理系统,详细解释了Hadoop的两大组件:HDFS(分布式文件系统)和MapReduce(分布式计算模型)。具体实现了单词统计任务,并扩展支持CSV和JSON格式的数据解析。为了提升性能,引入了Combiner减少中间数据传输,以及自定义Partitioner解决数据倾斜问题。最后总结了Hadoop在大数据处理中的重要性,鼓励Java开发者学习Hadoop以拓展技术边界。
75 7
Hadoop-30 ZooKeeper集群 JavaAPI 客户端 POM Java操作ZK 监听节点 监听数据变化 创建节点 删除节点
Hadoop-30 ZooKeeper集群 JavaAPI 客户端 POM Java操作ZK 监听节点 监听数据变化 创建节点 删除节点
119 1
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
185 0
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
75 0
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
127 2
Flume+Hadoop:打造你的大数据处理流水线
本文介绍了如何使用Apache Flume采集日志数据并上传至Hadoop分布式文件系统(HDFS)。Flume是一个高可用、可靠的分布式系统,适用于大规模日志数据的采集和传输。文章详细描述了Flume的安装、配置及启动过程,并通过具体示例展示了如何将本地日志数据实时传输到HDFS中。同时,还提供了验证步骤,确保数据成功上传。最后,补充说明了使用文件模式作为channel以避免数据丢失的方法。
121 4