用 Hadoop 进行分布式数据处理,从 入门、进阶到应用开发

  1. 云栖社区>
  2. 博客列表>
  3. 正文

用 Hadoop 进行分布式数据处理,从 入门、进阶到应用开发

atlas2015 2013-09-27 23:05:00 浏览1351 评论0

摘要: 入门 简介: 本文是讨论 Hadoop 的系列中的第一篇。本文介绍 Hadoop 框架,包括 Hadoop 文件系统 (HDFS) 等基本元素和常用的节点类型。学习如何安装和配置单节点 Hadoop 集群,然后研究 MapReduce 应用程序。

入门

简介: 本文是讨论 Hadoop 的系列中的第一篇。本文介绍 Hadoop 框架,包括 Hadoop 文件系统 (HDFS) 等基本元素和常用的节点类型。学习如何安装和配置单节点 Hadoop 集群,然后研究 MapReduce 应用程序。最后,学习使用核心 Web 界面监视和管理 Hadoop 的方法。

尽管 Hadoop 是一些大型搜索引擎数据缩减功能的核心部分,但是它实际上是一个分布式数据处理框架。搜索引擎需要收集数据,而且是数量极大的数据。作为分布式框架,Hadoop 让许多应用程序能够受益于并行数据处理。

本文并不打算介绍 Hadoop 及其架构,而是演示一个简单的 Hadoop 设置。在 参考资料 中,可以找到关于 Hadoop 架构、组件和操作理论的更多信息。现在,我们来讨论 Hadoop 的安装和配置。

初始设置

对于本文中的示例,我们使用 Cloudera Hadoop 发行版。Cloudera 提供对各种 Linux® 发行版的支持,所以很适合初学者。

本文假设您的系统上已经安装了 Java™(至少是 1.6 版)和 cURL。如果还没有,需要先安装它们(更多信息见 参考资料)。

因为我运行 Ubuntu(Intrepid 版),所以使用 apt 实用程序获取 Hadoop 发行版。这个过程非常简单,我可以获取二进制包,而不需要下载并构建源代码。首先,告诉 apt Cloudera 站点的信息。然后,在 /etc/apt/sources.list.d/cloudera.list 中创建一个新文件并添加以下文本:

deb http://archive.cloudera.com/debian intrepid-cdh3 contrib
deb-src http://archive.cloudera.com/debian intrepid-cdh3 contrib

如果您运行 Jaunty 或其他版本,只需把 intrepid 替换为您的版本名(当前支持 Hardy、Intrepid、Jaunty、Karmic 和 Lenny)。

接下来,从 Cloudera 获取 apt-key 以检查下载的包:

$ curl -s http://archive.cloudera.com/debian/archive.key | \
sudo apt-key add - sudo apt-get update

然后,安装采用伪分布式配置的 Hadoop(所有 Hadoop 守护进程在同一个主机上运行):

$ sudo apt-get install hadoop-0.20-conf-pseudo
$

注意,这个配置大约 23MB(不包括 apt 可能下载的其他包)。这个配置非常适合体验 Hadoop 以及了解它的元素和界面。

最后,我设置了不需要密码的 SSH。如果打算使用 ssh localhost 并请求密码,就需要执行以下步骤。我假设这是专用的 Hadoop 机器,因为这个步骤对安全性有影响(见清单 1)。

清单1:设置不需要密码的SSH

$ sudo su -
# ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa
# cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys

最后,需要确保主机上有供 datanode使用的足够存储空间(缓存)。存储空间不足会导致系统表现异常(比如出现无法把数据复制到节点的错误)。

启动Hadoop

现在可以启动 Hadoop 了,这实际上要启动每个 Hadoop 守护进程。但是,首先使用 hadoop 命令对 Hadoop File System (HDFS) 进行格式化。hadoop 命令有许多用途,稍后讨论其中一部分。

首先,请求 namenode 对 DFS 文件系统进行格式化。在安装过程中完成了这个步骤,但是了解是否需要生成干净的文件系统是有用的。

# hadoop-0.20 namenode -format

在确认请求之后,文件系统进行格式化并返回一些信息。接下来,启动 Hadoop 守护进程。Hadoop 在这个伪分布式配置中启动 5 个守护进程:namenode、secondarynamenode、datanode、jobtracker 和 tasktracker。在启动每个守护进程时,会看到一些相关信息(指出存储日志的位置)。每个守护进程都在后台运行。图 1 说明完成启动之后伪分布式配置的架构。

图1伪分布式hadoop配置

伪分布式 Hadoop 配置的框图 

Hadoop 提供一些简化启动的辅助工具。这些工具分为启动(比如 start-dfs)和停止(比如 stop-dfs)两类。下面的简单脚本说明如何启动 Hadoop 节点:

# /usr/lib/hadoop-0.20/bin/start-dfs.sh
# /usr/lib/hadoop-0.20/bin/start-mapred.sh
#

要想检查守护进程是否正在运行,可以使用 jps 命令(这是用于 JVM 进程的 ps 实用程序)。这个命令列出 5 个守护进程及其进程标识符。

既然 Hadoop 守护进程已经在运行了,现在看看每个守护进程在 Hadoop 框架中的作用。namenode 是 Hadoop 中的主服务器,它管理文件系统名称空间和对集群中存储的文件的访问。还有一个 secondary namenode,它不是 namenode 的冗余守护进程,而是提供周期检查点和清理任务。在每个 Hadoop 集群中可以找到一个 namenode 和一个 secondary namenode。

datanode 管理连接到节点的存储(一个集群中可以有多个节点)。每个存储数据的节点运行一个 datanode 守护进程。

最后,每个集群有一个 jobtracker,它负责调度 datanode 上的工作。每个 datanode 有一个 tasktracker,它们执行实际工作。jobtracker 和 tasktracker 采用主-从形式,jobtracker 跨 datanode 分发工作,而 tasktracker 执行任务。jobtracker 还检查请求的工作,如果一个 datanode 由于某种原因失败,jobtracker 会重新调度以前的任务。

在这个简单的配置中,所有节点都驻留在同一个主机上(见 图 1)。但是,通过前面的讨论很容易看出 Hadoop 如何提供并行处理。尽管架构很简单,但是 Hadoop 能够方便地实现数据分发、负载平衡以及以容错的方式并行处理大量数据。

检查hdfs

可以通过几个检查确认 Hadoop(至少是 namenode)已经启动并正常运行。确认所有进程都在运行之后,可以使用 hadoop 命令检查本地名称空间(见清单 2)。

清单2检查对hdfs的访问
				
# hadoop-0.20 fs -ls /
Found 2 items
drwxr-xr-x   - root supergroup          0 2010-04-29 16:38 /user
drwxr-xr-x   - root supergroup          0 2010-04-29 16:28 /var
#

可以看出 namenode 已经启动,能够为本地名称空间提供服务。注意,使用 hadoop-0.20 命令检查文件系统。这个实用程序用于与 Hadoop 集群交互,包括检查文件系统、在集群中运行作业等等。请注意命令的结构:指定 hadoop-0.20 实用程序之后,定义一个命令(在这里是通用文件系统 shell)以及一个或多个选项(在这里使用 ls 请求文件列表)。因为 hadoop-0.20 是 Hadoop 集群的主要接口之一,您会看到本文中多次使用这个实用程序。清单 3 提供另外几个文件系统操作示例(创建子目录 test,列出它的内容,然后删除它),可以通过它们进一步了解这个接口。

清单3hdfs中文件系统操作
# hadoop-0.20 fs -mkdir test
# hadoop-0.20 fs -ls test
# hadoop-0.20 fs -rmr test
Deleted hdfs://localhost/user/root/test
# 

既然已经安装了 Hadoop 并测试了文件系统的基本接口,现在就该在真实的应用程序中测试 Hadoop 了。在这个示例中,我们使用 MapReduce 处理一个小数据集。map(映射) 和 reduce(缩减) 源自函数式编程中的函数名,但是这个应用程序的核心功能是数据缩减。映射 是指把大量输入处理成更小的子问题集(然后把这些子问题分发给并行的工作系统)。缩减 是指把子问题的答案组合成单一输出集。注意,这里没有定义处理 的含义,因为框架允许您自己定义什么是处理。典型的 MapReduce 示例是计算单词在文档集中出现的频率。

根据前面的讨论,我们需要一个输入集并产生一个输出集。第一步是在文件系统中创建一个 input 子目录,工作将放在这个目录中。使用以下命令:

# hadoop-0.20 fs -mkdir input

接下来,在 input 目录中放一些工作。在这里,使用 put 命令把文件从本地文件系统转移到 HDFS 中(见清单 4)。注意,下面的命令格式把源文件转移到 HDFS 子目录 (input) 中。完成之后,在 HDFS 中就有两个文本文件等待处理。

清单4把文件转移到hdfs中
				
# hadoop-0.20 fs -put /usr/src/linux-source-2.6.27/Doc*/memory-barriers.txt  input
# hadoop-0.20 fs -put /usr/src/linux-source-2.6.27/Doc*/rt-mutex-design.txt  input
#

接下来,使用 ls 命令检查文件是否存在(见清单 5)。

清单5 检查hdfs中的文件
# hadoop-0.20 fs -ls input
Found 2 items
-rw-r--r--  1 root supergroup 78031 2010-04-29 17:35 /user/root/input/memory-barriers.txt
-rw-r--r--  1 root supergroup 33567 2010-04-29 17:36 /user/root/input/rt-mutex-design.txt 
#

确认工作已经放在 HDFS 中之后,就可以执行 MapReduce 函数了。这个函数只需要一个命令,但是需要很长的请求,见清单 6。这个命令请求执行一个 JAR。它实际上实现许多功能,但是这个示例只使用 wordcount。jobtracker 守护进程请求 datanode 执行 MapReduce 作业,这会产生相当多的输出(这里的输出比较少是因为只处理两个文件)。它显示 map 和 reduce 函数的进度,然后提供与文件系统的 I/O 和记录处理相关的统计数据。

清单6执行计算单词频率的MapReduce程序
# hadoop-0.20 jar /usr/lib/hadoop-0.20/hadoop-0.20.2+228-examples.jar \
wordcount input output
10/04/29 17:36:49 INFO input.FileInputFormat: Total input paths to process : 2
10/04/29 17:36:49 INFO mapred.JobClient: Running job: job_201004291628_0009
10/04/29 17:36:50 INFO mapred.JobClient:  map 0% reduce 0%
10/04/29 17:37:00 INFO mapred.JobClient:  map 100% reduce 0%
10/04/29 17:37:06 INFO mapred.JobClient:  map 100% reduce 100%
10/04/29 17:37:08 INFO mapred.JobClient: Job complete: job_201004291628_0009
10/04/29 17:37:08 INFO mapred.JobClient: Counters: 17
10/04/29 17:37:08 INFO mapred.JobClient:   Job Counters 
10/04/29 17:37:08 INFO mapred.JobClient:     Launched reduce tasks=1
10/04/29 17:37:08 INFO mapred.JobClient:     Launched map tasks=2
10/04/29 17:37:08 INFO mapred.JobClient:     Data-local map tasks=2
10/04/29 17:37:08 INFO mapred.JobClient:   FileSystemCounters
10/04/29 17:37:08 INFO mapred.JobClient:     FILE_BYTES_READ=47556
10/04/29 17:37:08 INFO mapred.JobClient:     HDFS_BYTES_READ=111598
10/04/29 17:37:08 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=95182
10/04/29 17:37:08 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=30949
10/04/29 17:37:08 INFO mapred.JobClient:   Map-Reduce Framework
10/04/29 17:37:08 INFO mapred.JobClient:     Reduce input groups=2974
10/04/29 17:37:08 INFO mapred.JobClient:     Combine output records=3381
10/04/29 17:37:08 INFO mapred.JobClient:     Map input records=2937
10/04/29 17:37:08 INFO mapred.JobClient:     Reduce shuffle bytes=47562
10/04/29 17:37:08 INFO mapred.JobClient:     Reduce output records=2974
10/04/29 17:37:08 INFO mapred.JobClient:     Spilled Records=6762
10/04/29 17:37:08 INFO mapred.JobClient:     Map output bytes=168718
10/04/29 17:37:08 INFO mapred.JobClient:     Combine input records=17457
10/04/29 17:37:08 INFO mapred.JobClient:     Map output records=17457
10/04/29 17:37:08 INFO mapred.JobClient:     Reduce input records=3381

处理结束之后,检查结果。这个作业的作用是计算单词在输入文件中出现的次数。输出是一个包含元组的文件,元组表示单词和它在输入中出现的次数。找到输出文件之后,可以通过 hadoop-0.20 实用程序使用 cat 命令查看数据(见清单 7)。

清单7 检查MapReduce计算wordcount的输出结果
# hadoop-0.20 fs -ls /user/root/output
Found 2 items
drwxr-xr-x   - root supergroup          0 2010-04-29 17:36 /user/root/output/_logs
-rw-r--r--   1 root supergroup      30949 2010-04-29 17:37 /user/root/output/part-r-00000
#  
# hadoop-0.20 fs -cat output/part-r-00000 | head -13
!= 1
"Atomic 2
"Cache 2
"Control 1
"Examples 1
"Has 7
"Inter-CPU 1
"LOAD 1
"LOCK" 1
"Locking 1
"Locks 1
"MMIO 1
"Pending 5
#

还可以使用 hadoop-0.20 实用程序从 HDFS 中提取文件(见清单 8)。只需使用 get 实用程序(它与前面在 HDFS 中写文件所用的put 相似)。对于 get 操作,指定 HDFS 中要提取的文件(来自 output 子目录)和在本地文件系统中要写的文件 (output.txt)。

清单8 从hdfs中提取输出
# hadoop-0.20 fs -get output/part-r-00000 output.txt
# cat output.txt | head -5
!= 1
"Atomic 2
"Cache 2
"Control 1
"Examples 1
# 

我们再来看一个示例,它使用相同的 JAR,但是目的不同(在这里要试验并行的 grep)。对于这个测试,仍然使用现有的输入文件,但是要删除 output 子目录以便在测试时重新创建它:

# hadoop-0.20 fs -rmr output
Deleted hdfs://localhost/user/root/output

接下来,请求用于执行 grep 的 MapReduce 作业。在这种情况下,并行执行 grep(映射),然后组合 grep 的结果(缩减)。清单 9 给出这个使用模型的输出(为了简短,这里删除了一些输出)。注意,这里的命令请求是一个 grep,它从 input 子目录获取输入,把结果放在 output 子目录中。最后一个参数是要搜索的字符串(在这里是 'kernel')。

清单9执行单词计数的搜索mapreduce作业(grep)
# hadoop-0.20 jar /usr/lib/hadoop/hadoop-0.20.2+228-examples.jar \
grep input output 'kernel'
10/04/30 09:22:29 INFO mapred.FileInputFormat: Total input paths to process : 2
10/04/30 09:22:30 INFO mapred.JobClient: Running job: job_201004291628_0010
10/04/30 09:22:31 INFO mapred.JobClient:  map 0% reduce 0%
10/04/30 09:22:42 INFO mapred.JobClient:  map 66% reduce 0%
10/04/30 09:22:45 INFO mapred.JobClient:  map 100% reduce 0%
10/04/30 09:22:54 INFO mapred.JobClient:  map 100% reduce 100%
10/04/30 09:22:56 INFO mapred.JobClient: Job complete: job_201004291628_0010
10/04/30 09:22:56 INFO mapred.JobClient: Counters: 18
10/04/30 09:22:56 INFO mapred.JobClient:   Job Counters 
10/04/30 09:22:56 INFO mapred.JobClient:     Launched reduce tasks=1
10/04/30 09:22:56 INFO mapred.JobClient:     Launched map tasks=3
10/04/30 09:22:56 INFO mapred.JobClient:     Data-local map tasks=3
10/04/30 09:22:56 INFO mapred.JobClient:   FileSystemCounters
10/04/30 09:22:56 INFO mapred.JobClient:     FILE_BYTES_READ=57
10/04/30 09:22:56 INFO mapred.JobClient:     HDFS_BYTES_READ=113144
10/04/30 09:22:56 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=222
10/04/30 09:22:56 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=109
...
10/04/30 09:23:14 INFO mapred.JobClient:     Map output bytes=15
10/04/30 09:23:14 INFO mapred.JobClient:     Map input bytes=23
10/04/30 09:23:14 INFO mapred.JobClient:     Combine input records=0
10/04/30 09:23:14 INFO mapred.JobClient:     Map output records=1
10/04/30 09:23:14 INFO mapred.JobClient:     Reduce input records=1
#

作业完成之后,检查 output 目录,找到结果文件,然后通过执行文件系统 cat 操作查看其内容(见清单 10)。

清单10检查MapReduce的输出结果

# hadoop-0.20 fs -ls output
Found 2 items
drwxr-xr-x  - root supergroup    0 2010-04-30 09:22 /user/root/output/_logs
-rw-r--r--  1 root supergroup   10 2010-04-30 09:23 /user/root/output/part-00000
# hadoop-0.20 fs -cat output/part-00000
17 kernel
# 

基于web的界面

您已经知道如何检查 HDFS 了,但是如果要寻找 Hadoop 的操作的相关信息,会发现 Web 界面很有用。位于 Hadoop 集群最上层的是 namenode,它管理 HDFS。可以通过 http://localhost:50070 查看文件系统的高层信息(比如可用空间、已用空间和可用的 datanode)以及正在运行的作业。可以通过 http://localhost:50030 深入检查 jobtracker(作业状态)。注意,在这两种情况下都引用 localhost,因为所有守护进程都在同一个主机上运行。


进阶

分布式Hadoop的架构

根据 用 Hadoop 进行分布式数据处理,第 1 部分:入门,所有 Hadoop 守护进程都在同一个主机上运行。尽管不运用 Hadoop 的并行性,这个伪分布式配置提供一种简单的方式来以最少的设置测试 Hadoop 的功能。现在,让我们使用机器集群探讨一下 Hadoop 的并行性。

根据第 1 部分,Hadoop 配置定义了让所有 Hadoop 守护进程在一个节点上运行。因此,让我们首先看一下如何自然分布 Hadoop 来执行并行操作。在一个分布式 Hadoop 设置中,您有一个主节点和一些从节点(见图 1)。


图1 hadoop主从节点的分解
Hadoop 主从节点分解 

如图 1 所示,主节点包括名称节点、从属名称节点和 jobtracker 守护进程(即所谓的主守护进程)。此外,这是您为本演示管理集群所用的节点(使用 Hadoop 实用程序和浏览器)。从节点包括 tasktracker 和数据节点(从属守护进程)。两种设置的不同之处在于,主节点包括提供 Hadoop 集群管理和协调的守护进程,而从节点包括实现 Hadoop 文件系统(HDFS)存储功能和 MapReduce 功能(数据处理功能)的守护进程。

对于该演示,在一个 LAN 上创建一个主节点和两个从节点。设置如图 2 所示。现在,我们来探讨用于多节点分布的 Hadoop 的安装和配置。

图2 hadoop集群设计
Hadoop 集群配置 

为简化部署,要运用虚拟化技术,该技术有几个好处。尽管在该设置中使用虚拟化技术看不出性能优势,但是它可以创建一个 Hadoop 安装,然后为其他节点克隆该安装。为此,您的 Hadoop 集群应显示如下:在一个主机上的虚拟机监控程序上下文中将主从节点作为虚拟机(VM)运行(见图 3)。

图3虚拟环境中的hadoop配置
虚拟环境中的 Hadoop 集群配置

升级hadoop 

在 用 Hadoop 进行分布式数据处理,第 1 部分:入门 中,我们安装了在一个节点上运行的 Hadoop 的一个特殊分布(伪配置)。在本文中,我们要更新分布式配置。如果您没有看过本系列的第 1 部分,那么请阅读第 1 部分,了解如何首先安装 Hadoop 伪配置。

在伪配置中,您没有进行任何配置,因为单个节点已经过预先配置。现在,您需要更新配置。首先,使用 update-alternatives 命令检查当前配置,如清单 1 所示。该命令告诉您,配置在使用 conf.pseudo(最高优先级)。

清单1检查当前hadoop配置

$ update-alternatives --display hadoop-0.20-conf
hadoop-0.20-conf - status is auto.
 link currently points to /etc/hadoop-0.20/conf.pseudo
/etc/hadoop-0.20/conf.empty - priority 10
/etc/hadoop-0.20/conf.pseudo - priority 30
Current `best' version is /etc/hadoop-0.20/conf.pseudo.
$ 

下一步,通过复制现有配置(本例中为 conf.empty,如清单 1 所示)创建一个新配置:

$ sudo cp -r /etc/hadoop-0.20/conf.empty /etc/hadoop-0.20/conf.dist
$ 

最后,激活并检查新配置:

清单2 激活并检查hadoop配置
$ sudo update-alternatives --install /etc/hadoop-0.20/conf hadoop-0.20-conf \
  /etc/hadoop-0.20/conf.dist 40
$ update-alternatives --display hadoop-0.20-conf
hadoop-0.20-conf - status is auto.
 link currently points to /etc/hadoop-0.20/conf.dist
/etc/hadoop-0.20/conf.empty - priority 10
/etc/hadoop-0.20/conf.pseudo - priority 30
/etc/hadoop-0.20/conf.dist - priority 40
Current `best' version is /etc/hadoop-0.20/conf.dist.
$

现在,您有一个名为 conf.dist 的新配置,您要将其用于您的新分布式配置。此时该节点运行于一个虚拟环境中,将该节点克隆到另外两个要充当数据节点的节点中。

配置hadoop以实现分布式操作

下一步是要使所有节点互联互通。这可以 /etc/hadoop-0.20/conf.dist/ 中的两个名为 masters 和 slaves 的文件中实现。本例中的三个节点的 IP 地址是静态分配的,如清单 3 所示(来自 /etc/hosts):

清单3 改设置的hadoop节点
master 192.168.108.133
slave1 192.168.108.134
slave2 192.168.108.135

因此,在主节点上,更新 /etc/hadoop-0.20/conf.dist/masters 来确定主节点,如下所示:

master

然后在 /etc/hadoop-0.20/conf.dist/slaves 中确定从节点, 其中包括以下两行:

slave1
slave2

接下来,从每个节点上,将 Secure Shell (ssh) 连接到所有其他节点,确保 pass-phraseless ssh 在运行。所有这些文件(masters,slaves)都由本系列第 1 部分中使用过的 Hadoop 启动和停止工具使用。

下一步,在 /etc/hadoop-0.20/conf.dist 子目录中继续进行 Hadoop 配置。以下变更需要应用于所有节点(主节点和从节点),如同 Hadoop 文档中所定义的。首先,在 core-site.xml 文件(清单 4)中确定 HDFS 主节点,它定义名称节点的主机和端口(注意主节点的 IP 地址的使用)。core-site.xml 文件定义 Hadoop 的核心属性。

清单 4 在core-site.xml中定义主节点
<configuration>

  <property>
    <name>fs.default.name<name>
    <value>hdfs://master:54310<value>
    <description>The name and URI of the default FS.</description>
  <property>

<configuration>

下一步,确认 MapReduce jobtracker。jobtracker 位于其自己的节点上,但对于本配置,将其放在主节点上,如清单 5 所示。mapred-site.xml 文件包含 MapReduce 属性。

清单5 在mapred-site.xml中确认mapreduce 
<configuration>

  <property>
    <name>mapred.job.tracker<name>
    <value>master:54311<value>
    <description>Map Reduce jobtracker<description>
  <property>

<configuration>

最后,定义默认复制因子(清单 6)。该值定义将创建的副本数,一般小于 3。在本例中,将其定义为 2(数据节点的数量)。该值在包含 HDFS 属性的 hdfs-site.xml 中定义。

清单6 在hdfs-site.xml中定义默认数据副本
<configuration>

  <property>
    <name>dfs.replication<name>
    <value>2<value>
    <description>Default block replication<description>
  <property>

<configuration>

配置项如 清单 4 所示,分布式设置所需的元素见 清单 5 和 清单 6。Hadoop 在这里提供大量配置选项,支持您按需定制整个环境。参考资料 部分含有关于这些选项的更多信息。

完成配置之后,下一步是要格式化名称节点(HDFS 主节点)。对于该操作,使用 hadoop-0.20 实用程序指定名称节点和操作(-format):

清单7 格式化名称节点
user@master:~# sudo su -
root@master:~# hadoop-0.20 namenode -format
10/05/11 18:39:58 INFO namenode.NameNode: STARTUP_MSG: 
/************************************************************
STARTUP_MSG: Starting NameNode
STARTUP_MSG:   host = master/127.0.1.1
STARTUP_MSG:   args = [-format]
STARTUP_MSG:   version = 0.20.2+228
STARTUP_MSG:   build =  -r cfc3233ece0769b11af9add328261295aaf4d1ad; 
************************************************************/
10/05/11 18:39:59 INFO namenode.FSNamesystem: fsOwner=root,root
10/05/11 18:39:59 INFO namenode.FSNamesystem: supergroup=supergroup
10/05/11 18:39:59 INFO namenode.FSNamesystem: isPermissionEnabled=true
10/05/11 18:39:59 INFO common.Storage: Image file of size 94 saved in 0 seconds.
10/05/11 18:39:59 INFO common.Storage: 
  Storage directory /tmp/hadoop-root/dfs/name has been successfully formatted.
10/05/11 18:39:59 INFO namenode.NameNode: SHUTDOWN_MSG: 
/************************************************************
SHUTDOWN_MSG: Shutting down NameNode at master/127.0.1.1
************************************************************/
root@master:~# 

格式化名称节点之后,就可以启动 Hadoop 守护进程了。可以对第 1 部分中的伪分布式配置执行同样的操作,但进程为分布式配置完成同样的工作。注意,这里的代码启动名称节点和从属名称节点(正如 jps 命令所指示):

清单8 启动名称节点
root@master:~# /usr/lib/hadoop-0.20/bin/start-dfs.sh
starting namenode, logging to 
  /usr/lib/hadoop-0.20/bin/../logs/hadoop-root-namenode-mtj-desktop.out
192.168.108.135: starting datanode, logging to 
  /usr/lib/hadoop-0.20/bin/../logs/hadoop-root-datanode-mtj-desktop.out
192.168.108.134: starting datanode, logging to 
  /usr/lib/hadoop-0.20/bin/../logs/hadoop-root-datanode-mtj-desktop.out
192.168.108.133: starting secondarynamenode, 
  logging to /usr/lib/hadoop-0.20/logs/hadoop-root-secondarynamenode-mtj-desktop.out
root@master:~# jps
7367 NameNode
7618 Jps
7522 SecondaryNameNode
root@master:~# 

现在,如果使用 jps 节点检测其中一个从节点(数据节点),您会看到每个节点上都有一个数据节点守护进程:

清单9 访问其中一个从节点的数据节点
root@slave1:~# jps
10562 Jps
10451 DataNode
root@slave1:~# 

下一步是要启动 MapReduce 守护进程(jobtracker 和 tasktracker)。如 清单 10 所示执行该操作。注意,脚本启动主节点上的 jobtracker(正如配置所定义的;参见 清单 5)和每个从节点上的 tasktrackers。主节点上的一个 jps 命令显示 jobtracker 正在运行。

清单10 启动MapReduce守护进程
root@master:~# /usr/lib/hadoop-0.20/bin/start-mapred.sh
starting jobtracker, logging to 
  /usr/lib/hadoop-0.20/logs/hadoop-root-jobtracker-mtj-desktop.out
192.168.108.134: starting tasktracker, logging to 
  /usr/lib/hadoop-0.20/bin/../logs/hadoop-root-tasktracker-mtj-desktop.out
192.168.108.135: starting tasktracker, logging to 
  /usr/lib/hadoop-0.20/bin/../logs/hadoop-root-tasktracker-mtj-desktop.out
root@master:~# jps
7367 NameNode
7842 JobTracker
7938 Jps
7522 SecondaryNameNode
root@master:~# 

最后,使用 jps 检查一个从节点。这里您可以看到,一个 tasktracker 守护进程将数据节点守护进程联接到每个从数据节点上:

清单11 检测其中一个从节点上的数据节点
root@slave1:~# jps
7785 DataNode
8114 Jps
7991 TaskTracker
root@slave1:~# 

启动脚本、节点和启动的守护进程之间的关系如图 4 所示。如您所见,start-dfs 脚本启动名称节点和数据节点,而 start-mapred 脚本启动 jobtracker 和 tasktrackers。

图4 每个节点的启动进程和守护进程的关系
每个节点的启动脚本和守护进程的关系 

测试HDFS

既然 Hadoop 已经开始在整个集群中运行了,您可以运行一些测试来确保其正常运作(见清单 12)。首先,通过 hadoop-0.20 实用程序发出一个文件系统命令(fs),并请求一个 df(disk free)操作。与在 Linux® 中一样,该命令仅确定特定设备的已用空间和可用空间。因此,对于新格式化的文件系统,就没有已用空间。下一步,在 HDFS 的根上执行一个 ls 操作,创建一个子目录,列出其内容,并删除它。最后,在 hadoop-0.20 实用程序内,您可以使用 fsck 命令在 HDFS 上执行一个 fsck(文件系统检查)。这一切 — 以及各种其他信息(比如检测到两个数据节点)— 都告诉您文件系统是正常的。

清单12 检查HDFS
root@master:~# hadoop-0.20 fs -df
File system		Size	Used	Avail		Use%
/		16078839808	73728	3490967552	0%
root@master:~# hadoop-0.20 fs -ls /
Found 1 items
drwxr-xr-x   - root supergroup          0 2010-05-12 12:16 /tmp
root@master:~# hadoop-0.20 fs -mkdir test
root@master:~# hadoop-0.20 fs -ls test
root@master:~# hadoop-0.20 fs -rmr test
Deleted hdfs://192.168.108.133:54310/user/root/test
root@master:~# hadoop-0.20 fsck /
.Status: HEALTHY
 Total size:	4 B
 Total dirs:	6
 Total files:	1
 Total blocks (validated):	1 (avg. block size 4 B)
 Minimally replicated blocks:	1 (100.0 %)
 Over-replicated blocks:	0 (0.0 %)
 Under-replicated blocks:	0 (0.0 %)
 Mis-replicated blocks:		0 (0.0 %)
 Default replication factor:	2
 Average block replication:	2.0
 Corrupt blocks:		0
 Missing replicas:		0 (0.0 %)
 Number of data-nodes:		2
 Number of racks:		1

The filesystem under path '/' is HEALTHY
root@master:~#

执行一个MapReduce作业

下一步是执行一个 MapReduce 作业,以验证整个设置运作正常(见清单 13)。该进程的第一步是要引入一些数据。因此,首先创建一个目录来容纳您的输入数据(称为 input),创建方式是使用 hadoop-0.20 实用程序的 mkdir 命令。然后,使用 hadoop-0.20 的put 命令将两个文件放到 HDFS 中。您可以使用 Hadoop 实用程序的 ls 命令检查输入目录的内容。

清单13 生成输入数据
				
root@master:~# hadoop-0.20 fs -mkdir input
root@master:~# hadoop-0.20 fs -put \
  /usr/src/linux-source-2.6.27/Doc*/memory-barriers.txt input
root@master:~# hadoop-0.20 fs -put \
  /usr/src/linux-source-2.6.27/Doc*/rt-mutex-design.txt input
root@master:~# hadoop-0.20 fs -ls input
Found 2 items
-rw-r--r--  2 root supergroup  78031 2010-05-12 14:16 /user/root/input/memory-barriers.txt
-rw-r--r--  2 root supergroup  33567 2010-05-12 14:16 /user/root/input/rt-mutex-design.txt
root@master:~# 

下一步,启动 wordcount MapReduce 作业。与在伪分布式模型中一样,指定输入子目录(包含输入文件)和输出目录(不存在,但会由名称节点创建并用结果数据填充):

清单14 在集群上运行MapReduce wordcount计数
root@master:~# hadoop-0.20 jar \
  /usr/lib/hadoop-0.20/hadoop-0.20.2+228-examples.jar wordcount input output
10/05/12 19:04:37 INFO input.FileInputFormat: Total input paths to process : 2
10/05/12 19:04:38 INFO mapred.JobClient: Running job: job_201005121900_0001
10/05/12 19:04:39 INFO mapred.JobClient:  map 0% reduce 0%
10/05/12 19:04:59 INFO mapred.JobClient:  map 50% reduce 0%
10/05/12 19:05:08 INFO mapred.JobClient:  map 100% reduce 16%
10/05/12 19:05:17 INFO mapred.JobClient:  map 100% reduce 100%
10/05/12 19:05:19 INFO mapred.JobClient: Job complete: job_201005121900_0001
10/05/12 19:05:19 INFO mapred.JobClient: Counters: 17
10/05/12 19:05:19 INFO mapred.JobClient:   Job Counters 
10/05/12 19:05:19 INFO mapred.JobClient:     Launched reduce tasks=1
10/05/12 19:05:19 INFO mapred.JobClient:     Launched map tasks=2
10/05/12 19:05:19 INFO mapred.JobClient:     Data-local map tasks=2
10/05/12 19:05:19 INFO mapred.JobClient:   FileSystemCounters
10/05/12 19:05:19 INFO mapred.JobClient:     FILE_BYTES_READ=47556
10/05/12 19:05:19 INFO mapred.JobClient:     HDFS_BYTES_READ=111598
10/05/12 19:05:19 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=95182
10/05/12 19:05:19 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=30949
10/05/12 19:05:19 INFO mapred.JobClient:   Map-Reduce Framework
10/05/12 19:05:19 INFO mapred.JobClient:     Reduce input groups=2974
10/05/12 19:05:19 INFO mapred.JobClient:     Combine output records=3381
10/05/12 19:05:19 INFO mapred.JobClient:     Map input records=2937
10/05/12 19:05:19 INFO mapred.JobClient:     Reduce shuffle bytes=47562
10/05/12 19:05:19 INFO mapred.JobClient:     Reduce output records=2974
10/05/12 19:05:19 INFO mapred.JobClient:     Spilled Records=6762
10/05/12 19:05:19 INFO mapred.JobClient:     Map output bytes=168718
10/05/12 19:05:19 INFO mapred.JobClient:     Combine input records=17457
10/05/12 19:05:19 INFO mapred.JobClient:     Map output records=17457
10/05/12 19:05:19 INFO mapred.JobClient:     Reduce input records=3381
root@master:~#

最后一步是探索输出数据。由于您运行了 wordcount MapReduce 作业,结果是一个文件(从已处理映射文件缩减而来)。该文件包含一个元组列表,表示输入文件中找到的单词和它们在所有输入文件中出现的次数:

清单15 检测mapreduce的输出
root@master:~# hadoop-0.20 fs -ls output
Found 2 items
drwxr-xr-x   - root supergroup          0 2010-05-12 19:04 /user/root/output/_logs
-rw-r--r--   2 root supergroup      30949 2010-05-12 19:05 /user/root/output/part-r-00000
root@master:~# hadoop-0.20 fs -cat output/part-r-00000 | head -13
!=	1
"Atomic	2
"Cache	2
"Control	1
"Examples	1
"Has	7
"Inter-CPU	1
"LOAD	1
"LOCK"	1
"Locking	1
"Locks	1
"MMIO	1
"Pending	5
root@master:~# 

尽管 hadoop-0.20 实用程序的功能极其丰富,但有时使用一个 GUI 会更方便。在执行文件系统检测时,您可以通过 http://master:50070 链接到名称节点,通过 http://master:50030 连接到 jobtracker 。您可以通过名称节点检测 HDFS,如图 5 所示,在这里您检测输入目录(包含输入数据 — 见上面 清单 13)。

图5 通过名称节点检测HDFS
通过名称节点检测 HDFS 

通过 jobtracker,您可以检测运行中或已完成的作业。在图 6 中,您可以看到对最后一个作业的检测(来自 清单 14)。该图展示了作为 Java 存档(JAR)请求的输出发出的各种数据,以及任务的状态和数量。注意,这里执行了两个映射任务(每个输入文件一个映射)和一个缩减任务(用于缩减两个映射输入)。

图6 检测一个已完成作业的状态
检查一个已完成作业的状态 

最后,您可以通过名称节点检查数据节点的状态。名称节点主页确定活动节点和死节点(作为链接)的数量,且允许您进一步检测它们。图 7 所示的页面显示了活动数据节点以及每个节点的统计数据。

图7 检查活动数据节点的状态
检查活动数据节点的状态 

通过名称节点和 jobtracker Web 界面,可以查看许多其他视图,但出于简洁,只显示该样例集。在名称节点和 jobtracker Web 页面内,您会找到大量链接,从而引导您获取有关 Hadoop 配置和操作的其他信息(包括运行时日志)。

此系列的文章 专注于单节点和多节点集群的 Hadoop 安装及配置。最后这篇文章探索了 Hadoop 编程 — 特别是在 Ruby 语言中 map 和 reduce 应用程序开发。我之所以选择 Ruby,首先是因为,它是一个您应该知道的很棒的面向对象的脚本语言,其次,您将在 参考资料 部分发现很多参考,其中包括解决 Java™ 和 Python 语言的教程。通过这种 MapReduce 编程的探索,将向您介绍流式应用程序编程接口(Application Programming Interface,API)。此 API 提供方法以便在 Java 语言以外的多种语言中开发应用程序。

让我们开始简要介绍一下 map 和 reduce(从功能的角度考虑),然后再进一步钻研 Hadoop 编程模型及其体系结构和用来雕刻、分配、管理工作的元素。

第 3 部分: 应用程序开发

maphe reduce的起源

是什么功能性元素激发了 MapReduce 编程范例的创立?在 1958 年,John McCarthy 发明了名为 Lisp 的语言,其实现了数值和符号计算,但在递归形式下此语言非常不同于现在所使用的大多数语言。(在维基百科全书上记述着 Lisp 那段迷人的历史,同时包括一个有用的教程 — 值得您花费时间来阅读。)Lisp 最先是在 IBM® 704 中实现的,IBM® 704 是第一种大规模生产的计算机,也支持其他旧的语言,如 FORTRAN。

map 函数,源于功能性语言(如 Lisp)但如今在其他语言中也很常见,其中包含了一系列元素的函数的应用程序。这意味着什么? 清单 1 通过 Scheme Shell (SCSH) 提供解释会话,即一个 Lisp 衍生。第一行定义一个名为 square 的函数,该函数可接受参数并发出其平方根。下一行说明 map 函数的使用。如图所示,通过 map,为已应用的函数提供您的函数和一系列元素。结果是一个包含平方元素的新列表。

清单1 SCSH上的map函数演示
> (define square (lambda (x) (* x x)))
> (map square '(1 3 5 7))
'(1 9 25 49)
>

Reduce 也适用于列表但是通常将列表缩减为标量值。清单 2中提供的示例说明用于将列表缩减为标量的其他 SCSH 函数 — 在这种情况下,用 (1 + (2 + (3 + (4 + (5))))) 的格式汇总值的列表。请注意这是典型的功能性编程,取决于迭代上的递归。

清单2 SCSH上的reduce函数演示
> (define (list-sum lis) (if (null? lis) 0 (+ (car lis) (list-sum (cdr lis)))))
> (list-sum '(1 2 3 4 5))
15
> 

有趣的是要注意递归与迭代在命令性语言中同样高效,因为递归在幕后被转化成迭代。

hadoop的编程模型

Google 引用 MapReduce 的概念作为处理或生成大型数据集的编程模型。在规范模型中,map 函数处理键值对,这将得出键值对的中间集。然后 reduce 函数会处理这些中间键值对,并合并相关键的值(请参考图 1)。输入数据使用这样一种方法进行分区,即在并行处理的计算机集群中分区的方法。使用相同的方法,已生成的中间数据将被并行处理,这是处理大量数据的理想方法。

图1 MapReduce的处理的简化流程
MapReduce 处理的简化视图 

对于快速刷新器来说,查看图 1 的体系结构,从 map 和 reduce 角度来进行字数统计(因为您将在本文中开发 map 和 reduce 应用程序)。在提供输入数据时(进入 Hadoop 文件系统 [HDFS]),首先分段,然后分配给 map 工作线程(通过作业跟踪器)。虽然 图 2 中的示例显示了一个被分段的简短语句,但是分段的工作数量通常在 128MB 范围内,其原因是建立工作只需要很少的时间,因为有更多的工作要做,以便最大限度地减少这种开销。map 工作线程(在规范的示例中)将工作分割成包含已标记单词和初始值(在此情况下是 1)的单个矢量。在 map 任务完成时(如通过任务跟踪器在 Hadoop 中所定义的),提供工作给 reduce 工作线程。通过代表所发现的键的数量的值,reduce 工作线程将许多键缩减为一个惟一的集合。

图2 简单的maprecduce流程
简单的 MapReduce 示例 

请注意此过程可在相同的或不同的计算机中出现或者使用不同的数据分区来按顺序或并行完成,且结果仍然是相同的。

虽然规范的视图(用于使用字数统计生成搜索索引)是一种用来查看 Hadoop 方法,但结果是此计算模型被常规地应用到可计算问题上,正如您将要看到的那样。

hadoop的灵活性

从 图 2 中所示的简单示例看,需注意 map 和 reduce 过程这两个主要元素。虽然这里存在一个这些过程如何工作的传统视图,但是它不是 map 和 reduce 体系结构所需要的。这就是 Hadoop 的真实力量 — 其灵活性用来实现在某种程度上活动的 map 和 reduce 过程,这解决了一个特定的应用程序。虽然字数统计示例对于大量的问题是有用且适用的,但是其他的模型仍然在此总体框架内适用。所需的就是使 map 和 reduce 应用程序的开发过程对于 Hadoop 可见。

在其他的应用程序中,Hadoop 已经被用于实现包括神经网络算法的计算机学习应用程序,支持矢量计算机以及 k-means 集群(要获得更多信息,请参考 参考资料 部分)。

数据流

虽然 Hadoop 是一个基于 Java 的框架,但是其有可能在 Java 语言以外的语言中编写 msp 和 reduce 应用程序。Hadoop 内的  实用工具实现了一种数据流胶的类型。通过  实用工具,您可以定义您自己的可执行 map 和 reduce(使用每一个从标准输入 [stdin] 提取的输入和每一个通过标准输出 [stdout] 提供的输出),且  实用工具可适当地读取和写入数据,根据需要调用您的应用程序(请参考清单 3)。

清单3  使用hadoop流工具
				
hadoop jar $HADOOP_HOME/hadoop-流.jar \
	-input inputData
	-output outputData
	-mapper map_exec
	-reducer reduce_exec

清单 3 说明如何在 Hadoop 内使用  实用工具,图 3 图形化地显示了如何定义流。请注意这是一个流使用的简单示例。大量的选项可用于制定如何解析数据、制定如何调用图像、为分区器和合成器指定替换图像以及调整其他配置(要获得更多信息,请参考 参考资料 部分)。

图3 图形流示例
图形流示例 

Ruby示例

通过已经获得的在  实用工具基本理解上的经验,您已经准备编写一个简单的 Ruby map 和 reduce 应用程序并查看如何在 Hadoop 框架中使用过程。虽然此处的示例伴随着规范的 MapReduce 应用程序,但是稍后您将看到其他的应用程序(取决于您将如何用 map 和 reduce 格式实现它们)。

首选是 mapper。此脚本从 stdin 提取文本输入,首先标记它,然后将一系列键值对发送到 stdout。像大多数面向对象的脚本语言一样,这个任务几乎太简单了。如清单 4 中所示的 mapper 脚本(通过一些注释和空白区域可给与其大一点的大小)。此程序使用一个迭代器来从 stdin 中读取一行,同时另一个迭代器将该行分割成单个的标记。使用为 1 的相关值(通过选项卡分隔)将每一个标记(单词)发送到 stdout。

清单4 map ruby脚本
#!/usr/bin/env ruby

# Our input comes from STDIN
STDIN.each_line do |line|

  # Iterate over the line, splitting the words from the line and emitting
  # as the word with a count of 1.
  line.split.each do |word|
    puts "#{word}\t1"
  end

end

下一步,查看 reduce 应用程序。虽然此应用程序稍微有些复杂,但是使用 Ruby hash(关联阵列)可简化 reduce 操作(请参考清单 5)。此脚本可通过来自 stdin (通过  实用工具传递)的输入数据再次工作且将该行分割成一个单词或值。而后该 hash 会检查该单词;如果发现,则将计数添加到元素。否则,您需要在该单词的 hash 中创建新的条目,然后加载计数(应该是来自 mapper 过程的 1)。在所有输入都被处理以后,通过 hash 可简单迭代且将键值对发送到 stdout。

清单5 ruby reduce 脚本
#!/usr/bin/env ruby

# Create an empty word hash
wordhash = {}

# Our input comes from STDIN, operating on each line
STDIN.each_line do |line|

  # Each line will represent a word and count
  word, count = line.strip.split

  # If we have the word in the hash, add the count to it, otherwise
  # create a new one.
  if wordhash.has_key?(word)
    wordhash[word] += count.to_i
  else
    wordhash[word] = count.to_i
  end

end

# Iterate through and emit the word counters
wordhash.each {|record, count| puts "#{record}\t#{count}"}

随着 map 和 reduce 脚本的完成,需从命令行测试它们。记得要使用 chmod +x 将这些文件更改为可执行。通过生成输入文件来启动,如清单 6 所示。

清单6 生成输入文件
# echo "Hadoop is an implementation of the map reduce framework for " \
	"distributed processing of large data sets." > input
#

通过单词输入,现在您可以测试您的 mapper 脚本,如清单 7 所示。回想此脚本简单地将输入标记到键值对,此处每个值都将是1(非惟一输入)。

清单7 测试mapper脚本
# cat input | ruby map.rb
Hadoop	1
is	1
an	1
implementation	1
of	1
the	1
map	1
reduce	1
framework	1
for	1
distributed	1
processing	1
of	1
large	1
data	1
sets.	1
#

到目前为止,一切都很顺利。现在,在原始流格式中将整个应用程序一起调出。在清单 8 中,通过 map 脚本传递您的输入、排序输出(可选步骤)、然后通过 reduce 脚本传递由此产生的中间数据。

清单8 使用linux管道的简单mapreduce

# cat input | ruby map.rb | sort | ruby reduce.rb
large	1
of	2
framework	1
distributed	1
data	1
an	1
the	1
reduce	1
map	1
sets.	1
Hadoop	1
implementation	1
for	1
processing	1
is	1
#

使用hadoop的ruby

在 shell 环境中您的 map 和 reduce 脚本按预期工作,通过 Hadoop 将它们放入测试中。我将会跳过 Hadoop 安装任务(参考本系列的 用 Hadoop 进行分布式数据处理,第 1 部分:入门 或 用 Hadoop 进行分布式数据处理,第 2 部分:进阶 以便建立 Hadoop 并使其运行)。

第一步将要在 HDFS 内为您的输入信息创建输入字典,然后提供一个将测试您脚本的简单文件。清单 9 说明了此步骤(有关这些步骤的更多信息,请参考本系列的 用 Hadoop 进行分布式数据处理,第 1 部分:入门 或 用 Hadoop 进行分布式数据处理,第 2 部分:进阶)。

清单9 为mapreduce过程创建输入文件
# hadoop fs -mkdir input
# hadoop dfs -put /usr/src/linux-source-2.6.27/Documentation/memory-barriers.txt input
# hadoop fs -ls input
Found 1 items
-rw-r--r--  1 root supergroup  78031 2010-06-04 17:36 /user/root/input/memory-barriers.txt
# 

下一步,使用  实用工具,通过自定义脚本来调用 Hadoop,简化输出的输入数据和位置(请参考清单 10)。在此示例中请注意 -file 选项会简单地告诉 Hadoop 来打包您的 Ruby 脚本作为部分作业提交。

请单10 使用ruby mapreduce脚本使用hadoop流

# hadoop jar /usr/lib/hadoop-0.20/contrib/streaming/hadoop-0.20.2+228-streaming.jar \
  -file /home/mtj/ruby/map.rb -mapper /home/mtj/ruby/map.rb \
  -file /home/mtj/ruby/reduce.rb -reducer /home/mtj/ruby/reduce.rb \
  -input input/* -output output
packageJobJar: [/home/mtj/ruby/map.rb, /home/mtj/ruby/reduce.rb, /var/lib/hadoop-0.20/...
10/06/04 17:42:38 INFO mapred.FileInputFormat: Total input paths to process : 1
10/06/04 17:42:39 INFO streaming.StreamJob: getLocalDirs(): [/var/lib/hadoop-0.20/...
10/06/04 17:42:39 INFO streaming.StreamJob: Running job: job_201006041053_0001
10/06/04 17:42:39 INFO streaming.StreamJob: To kill this job, run:
10/06/04 17:42:39 INFO streaming.StreamJob: /usr/lib/hadoop-0.20/bin/hadoop job ...
10/06/04 17:42:39 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/...
10/06/04 17:42:40 INFO streaming.StreamJob:  map 0%  reduce 0%
10/06/04 17:43:17 INFO streaming.StreamJob:  map 100%  reduce 0%
10/06/04 17:43:26 INFO streaming.StreamJob:  map 100%  reduce 100%
10/06/04 17:43:29 INFO streaming.StreamJob: Job complete: job_201006041053_0001
10/06/04 17:43:29 INFO streaming.StreamJob: Output: output
# 

最后,通过 hadoop 实用工具使用 cat 文件系统操作来探索输出(请参考清单 11)。

清单11 exploring hadoop output
# hadoop fs -ls /user/root/output
Found 2 items
drwxr-xr-x  - root supergroup      0 2010-06-04 17:42 /user/root/output/_logs
-rw-r--r--  1 root supergroup  23014 2010-06-04 17:43 /user/root/output/part-00000
# hadoop fs -cat /user/root/output/part-00000 | head -12
+--->|	4
immediate	2
Alpha)	1
enable	1
_mandatory_	1
Systems	1
DMA.	2
AMD64	1
{*C,*D},	2
certainly	2
back	2
this	23
# 

在不到 30 行的脚本中,您已经在 Hadoop 框架内实现了 map 和 reduce 元素并演示了它们的执行。虽然是一个简单的示例,但是通过自定义的和专有的算法说明了 Hadoop 背后真实的力量以及为什么 Hadoop 正在成为一种用于处理大型数据集的流行框架。

hadoop其他的应用程序

Hadoop 可用于许多应用程序上,其已超越了为大型数据集简单计算字数的工作。所有这一切的需要就是用矢量格式表达 Hadoop 基础设施可以使用的数据。虽然规范的示例使用矢量表达作为键和值,但是并没有限制您如何来定义值(例如一些值的汇总)。在更加丰富的应用程序集中此灵活性可以为 Hadoop 创造新的机会。

一个一直适合 MapReduce 字数统计模型的有趣的应用程序正在把 Web 服务器访问的频率制表(在开创性 Google 文章中讨论)。对于此应用程序来说,URL 作为键来服务(从 Web 服务器访问日志摄取)。reduce 过程的结果是基于 Web 服务器日志的给定 Web 站点的每次 URL 访问的总数。

在计算机学习用户程序中,Hadoop 已经作为处理大量 GA 个体的规模遗传算法的一种方法(潜在解决方案)。map 过程执行传统的遗传算法,从本地池中搜索最佳单个解决方案。然后 reduce 应用程序成为来自 map 阶段的单个解决方案的集成。这会允许单个节点识别最佳解决方案,然后允许这些解决方案在最适于生存的分布式显示的 reduce 阶段中相互竞争。

另外一个有趣的应用程序被创建用于识别僵尸网络的垃圾邮件。此过程的第一步将会为减少垃圾邮件为目的而对电子邮件按来自给定组织而进行分类(基于一组指纹)。根据过滤的这些数据,对以特定方式(例如参考电子邮件正文中的相同链接)连接的邮件生成一个图表。然后这些相关电子邮件会减少至主机(静态或动态 IP 地址)以识别有问题的僵尸网络。

在应用程序之外通过 map 和 reduce 基元来查看世界,Hadoop 作为在计算机集群之间分配工作的方式非常有用。 Map 和 reduce 并非必须强制某种特定类型的应用程序。相反地,Hadoop 可以被视为一种可以同时将数据和算法分配到主机以获得更快速的并行处理速度的方法。

hadoop应用程序生态环境

虽然 Hadoop 提供了一个灵活的架构,但也可以使用其他应用程序转换与其他应用程序的界面。一个有趣的示例称为 Hive,它是一个具有自己特定查询语言(称为 Hive QL)的数据仓库基础结构。Hive 使得 Hadoop 更加熟悉结构化查询语言 (SQL) 背景,同时还支持传统的 MapReduce 基础结构来进行数据处理。

HBase 是另外一种位于 HDFS 顶部的有趣的应用程序。它是一个类似于 Google BigTable 的高性能数据库系统。代替传统的文件处理,HBase 使数据库将 MapReduce 处理的输入和输出格式列表。

最后,Pig 是 Hadoop 中用于分析大型数据集的平台。Pig 提供可编译 map 和 reduce 应用程序的高级语言。

这是 Hadoop 系列 的最后一篇文章,探索了在适用于 Hadoop 框架的 Ruby 中开发 map 和 reduce 应用程序。希望从这篇文章您可以看到 Hadoop 的真正力量。虽然 Hadoop 将您限制在一个特定的编程模型中,但是这种模型是灵活的且可被应用到大量的应用程序上。

参考文献:

  • MapReduce:大型集群中的简化数据处理 是有关 MapReduce 的开创性文章,其由 Jeff Dean 和 Sanjay Ghemawat 于 2004 年编写。本文仍是一个令人愉快的阅读物。

  • 本文探索了 Hadoop 的  实用工具,其允许在 Java 语言以外的多种语言中开发 map 和 reduce 脚本。Apache 为  提供了一套非常好的资源,包括 Hadoop 流 文档和 流 wiki(为各种命令行选项提供了很好的介绍)。

  • Wikipedia 为 Lisp 和 Scheme 语言提供了很好的介绍以及为 功能性编程概念 提供了一般介绍(和 MapReduce)。

  • 要演示 map 和 reduce 的功能性编程元素,本文使用了 Scheme shell。如果您曾经想尝试 Scheme,则 SCSH 是一个很好的沙盒去尝试这种强大的语言。您也可以在 Tim 的文章 用 Guile 编写脚本(developerWorks,2009 年 1 月)中学习有关 Scheme 和通过 C 编写脚本,或阅读一个很好的 Scheme 介绍。 

  • 在文章 Map-Reduce for Machine Learning on Multicore 中,MapReduce 模型被用于为多核处理器实现各种不同的计算机学习算法。这是一个有趣的阅读用来探索 MapReduce 模型如何应用到各种不同的可计算算法。

  • 配置单元 是一个建立在 Hadoop 顶部的数据舱库基础结构。其在 Hadoop 数据上提供查询语言,此语言支持传统的 Hadoop 编程模型。HBase 是数据库在 Hadoop 的 HDFS 上的表现,在简单文件上执行 MapReduce 以操作数据库表。最后,Pig 是一个包括适用于 Hadoop 编程的高级语言的大型数据库集分析的平台。

  • Ruby 语言是最新的面向对象的脚本语言。其动态关注程序员工作效率。

  • 检查 学术文章中的 Mapreduce 和 Hadoop 算法 的列表。此站点提供有关 Hadoop 如何用于各种不同的应用程序的有趣观点(来自科学、计算机学习、web 服务,等等)。

  • Yahoo! 在 开发人员网络 中提供了很好的 Hadoop 资源集。特别是,Yahoo! Hadoop 教程 介绍了 Hadoop 并提供其使用与配置的详细讨论。

  • 在 developerWorks Linux 专区 寻找为 Linux 开发人员(包括 Linux 新手入门)准备的更多参考资料,查阅我们 最受欢迎的文章和教程。 

  • 在 developerWorks 上查阅所有 Linux 技巧 和 Linux 教程。 

  • 随时关注 developerWorks 技术活动网络广播。 

  • 观看 developerWorks 演示中心,其范围从适用于初学者的产品安装和设置演示到适用于有经验的开发人员的高级功能。



【云栖快讯】阿里云栖开发者沙龙(Java技术专场)火热来袭!快来报名参与吧!  详情请点击

网友评论

atlas2015
文章140篇 | 关注2
关注
用配置管理(Application Configuration Management,简称 ... 查看详情
Alibaba Cloud Toolkit 面向开发者 IDE 平台,在 IDE (比如 E... 查看详情
云数据库 HBase 版(ApsaraDB for HBase)是基于 Hadoop 的一个... 查看详情
为您提供简单高效、处理能力可弹性伸缩的计算服务,帮助您快速构建更稳定、安全的应用,提升运维效... 查看详情
双12

双12