Hadoop专业解决方案-第3章:MapReduce处理数据

简介:

前言:非常感谢团队的努力,最新的章节终于有了成果,因为自己的懒惰,好久没有最新的进展了,感谢群里兄弟的努力。

群名称是 Hadoop专业解决方案群

 313702010

本章主要内容:

理解MapReduce基本原理

了解MapReduce应用的执行

理解MapReduce应用的设计

截止到目前,我们已经知道Hadoop如何存储数据,但Hadoop不仅仅是一个高可用

的,规模巨大的数据存储引擎,它的另一个主要特点是可以将数据存储与处理相结合。

Hadoop的核心处理模块是MapReduce,也是当前最流行的大数据处理架构之一。

它能够将Hadoop数据存储无缝的融入到数据处理当中,使得它在操作上足够简单,功能上足够强大。MapReduce已经解决很多实际问题(从日志分析,到数据排序,到文本操作,到基于模式的搜索,到图像处理,到机器学习等等)。似乎每周都会出现有关MapReduce新应用的出现。在本章,你将要学习MapReduce的今本原理,包括它的主要部件,MapReduce应用的执行过程与[sht1] 如何设计MapReduce应用。

3.1 了解MapReduce

MapReduce是一个框架,能够利用许多普通计算机对大规模的数据集进行高并发的、分布式的算法处理。MapReduce模型来源于函数[sht2] 式编程语言,如Lisp,中map与reduce组合子的概念。在Lisp中,map的作用是输入一个函数与一系列值。然后利用该函数连续处理每一个值。Reduce使用二进制操作将序列中所有的元素相结合。

注意:组合子是一个利用程序片段构建程序片段的函数。组合子有助于在更高的抽象层次上进行编程,并且能够使你从实现当中区分出策略。在函数式编程中,由于组合子可以直接支持一级公民,使用它们可以自动的构建大多数程序。

MapReduce框架的灵感来源于这些概念。在2004年Google使用它进行分布式计算,处理分布在多个计算机集群上大数据集。从此,该框架被广泛用于许多软件平台,现在是Hadoop生态系统中不可缺少的一部分。

 MapReduce被用于解决大规模计算问题,它经过特殊设计可以运行在普通的硬件设备上。它根据分而治之的原则——输入的数据集被切分成独立的块,同时被mapper模块处理。另外,map执行与数据是典型的本地协作(在第四章在讨论数据本地性时将会了解更多)。MapReduce会对所有map的输出结果排序,它们将作为reduce的一种输入。

用户的任务[sht3] 是实现mapper与reducer,这两个类会继承Hadoop提供的基础类来解决特殊的问题。就像图3-1中所示的,Mapper将key/value(k1,v1)键值对形式的数据作为输入,然后将他们转变为另一种key/value键值对形式。MapReduce框架对所有mapper输出的key/value进行排序,并将key值相同的所有value值合并(k2,{v2,v2,…})。这些kye/value结合后的会被传递到reducer模块,reducer会将他们转化为另一种key/value对(k3,v3)。

 

图3-1  mapper与reducer的功能

一个mapper与reducer一起组成一个Hadoop作业[sht4] 。Mapper是作业[sht5] 的强制性部分,

可以产生0个或者更多个key/value(k2,v2)键值对。Reducer是作业[sht6] 的可选择性部分,可以没有产出或者产出更多的key/value对(k3,v3)[sht7] 。用户的另一个[sht8] 任务是实现驱动器(控制一些执行方面的主要应用)。

MapReduce框架的主要任务(根据用户提供的代码)是统筹所有任务的协调执行。

包括选择合适的机器(节点)运行mapper、[sht9] 启动与监控mapper的执行、为reducer的执行选择合适的节点、对mapper的输出进行排序与拉去并且传送给reducer节点、[sht10] 启动与监控reducer的执行。

   现在我们对MapReduce有了一些了解,下面让我们进一步看看MapReduce作业是如何执行的。

3.1.1 MapReduce执行管道

任何存储在Hadoop中的数据(HDFS与HBase),甚至是存储在Hadoop外的数

据(例如在数据库中),都能够当做MapReduce作业[sht11] 的输入。同样,job的输入能够存储在Hadoop当中(HDFS或者HBase)中或者是[sht12] 外部。Mapreduce框架负责调度和监控任务,再次执行失败的任务。

   图3-2从较高的层次展示MapREduce执行框架。

 

图3-2 高层次Hadoop执行框架

下面介绍MapReduce执行管道线的主要组件。

  Driver:这是主程序,用来初始化MapReduce job。它定义了job的个性化配置,并且标注了所有的组件(包括输入输出格式,mapper与reducer,使用结合器,使用定制的分片器等等)。Driver也可以获得job执行的状态。

  Context:driver,mapper与reducer在不同的阶段被执行,一般情况下是在多台节点上执行。context对象(如图3-2所示)在MapReduce执行的任何阶段都可以被使用。它为交换需要的系统与job内部信息提供一种方便的机制。要注意context协调只发生在MapReduce job 开始后合适的阶段(driver,map或者reduce)。这意味着在一个mapper中设置的值不可以在另一个mapper中使用(即使另一个mapper在第一个mapper完成后开始),但是在任何reducer中都是有效的。

  Input Data:为MapReduce任务准备的最初存储数据。这些数据可以在HDFS,HBase,或者其他的仓库中。一般情况下,input data 是非常大的,几十个G或者更多。

InputFormat:如何对输入数据进行读取和切分

  。InputFormat类确定input data中数据输入哪个任务的InputSplit,并且提供一个生成RecordReader的工厂方法,这个对象主要是读取inputSplit指定的文件。Hadoop提供了一些InputFormat类,在第四章提供了如何自定义InputFormat的实例。InputFormat直接被job的driver调用来决定map任务执行的数目与地点(根据InputSplit)。

  InputSplit:InputSplit确定一个在MapReduce中map任务的作业单元。处理一个数据集的MapReduce程序由几个(也可能是几百个)map任务组成。InputFormat(直接被job driver调用)确定在map阶段中map任务的数目。每个map任务操作一个单独的InputSplit。完成InputSplits的计算后,MapReduce框架会再合适的节点启动期望数目的map任务。

  RecorReader:InputSplit确定map任务的工作机,但没有描述如何获得该数据。RecordReader类是真正从数据源读取数据的类(在map 任务中),并将数据转化为设和map执行的key/value对,并将他们传递给map方法。RecordReader由InputFormat定义。在第四章提供了如何实现自定义RecordReader的实例。

  Mapper:mapper负责在MapReduce程序中第一个阶段用户自定义作业的执行。从实现的角度看,mapper实现负责将输入数据转化成一些列的key/value对(k1,v1),这些键值对将被用于单个map的执行。一般情况下mapp会将输入的键值对转化为另一种输出键值对(k2,v2),这些输出键值对将会作为reduce阶段shuffle与sort阶段的输入。一个新的mapper实例在每个map任务的单独的JVM实体中被实例化, 这些map任务构成所有作业输出的一部分。独立的mapper是不会提供任何与其他mapper通信的机制。这一点保证每个map任务的可靠性仅仅由本地节点的可靠性决定。

  Partition:由所有独立的mapper产生的中间数据(k2,v2)的子集会被分配到一个reducer上执行。这些子集(或者partitions)会作为reduce任务的输入。具备相同键的数值会被一个reduce处理,而不会考虑他们有哪个mapper产生。这样的结果是,所有的map节点必须判断产生的中间数据将有哪个reducer执行。Partitioner类决定特定的key/value对将由哪个reducer执行。默认的Partitioner会为每个key计算一个哈希值,并根据这个值作为分配的依据。第四章提供了如何实现自定义的Partitioner的实例。

  Shuffle:在Hadoop集群中,每个节点可能会执行某个job的几个map任务。一旦至少有一个map函数执行完成,产生的中间输出就会根据key值进行分片,并将由map产生的的分片分发至需要它们的reducer。将map的输出传递到reducer的过程叫做shuffling。

  Sort:每个reduce任务负责处理与部分key值相对应的value。中间key/value数据集,在被传递给reducer前会由Hadoop框架自动排序,组装成(k2,{v2,v2,…})的形式。

  Reducer:reducer负责执行由用户提供的用于完成某个作业第二阶段任务的代码。对于分配到某个reducer中的每个key,reducer的reduce()方法都会被调用一次。这个方法接收一个key值,由迭代器遍历与它绑定在一起的所有value值,并无序的返回与这个key值相关的value值。一般情况下,reducer将输入的key/value转化成输出键值对(k3,v3)。

  OutputFormat:job的输出(job的输出可以由reducer产生,若没有reducer也可由map产生)记录的方式有OutputFormat控制。OutputFormat负责确定输出数据的地址,由RecordWriter负责将数据结果写入。第四章介绍了如何实现自定义的OutputFormat。

  RecordWriter:RecordWriter定义每条output记录如何写入。

下面将介绍MapReduce执行时两个可选的组件(图3-2中并没有展现)

  Combiner:这是一个可以优化MapReduce job执行的可选执行步骤。如何选择后,combiner运行在mapper执行后,reduce执行前。Combiner的实例会运行在每个map任务中与部分reduce任务中。Combiner接收由mapper实例输出的所有数据作为输入,并且尝试将具有相同key值的value整合,以此来减少key值的存储空间和减少必须存储的(实际上不必须)key值的数目。Combiner的输出会被排序并发送给reducer。

  Distribute cache:另一个常用与MapReduce job中的工具是distribute cache。这个组件可以使得集群中所有节点共享数据。Distribute cache可以是能够被所有任务都能可获得的共享库,包含key/value对的全局查找文件,jar文件(或者是archives)包含可执行代码等等。该工具会将这些文件拷贝至实际执行任务的节点,并使它们可以在本地使用。第四章将提供实例展示如何在MapReduce执行时,使distributed cache与本地代码相结合。

MapReduce最重要的一个特点是它完全隐藏了管理一个大规模分布式集群,与在不

同节点上协同执行任务的复杂性。开发者的编程模型非常简单——只需要实现mapper、reducer的功能,driver,使它们像一个单独的job运行在一起和配置一些必要的参数。所有的用户代码都会摆打包进一个jar文件(事实上,MapReduce框架可以在多个jar文件上进行操作),这个文件会被提交到MapReduce集群用于执行。

3.1.2 MapReduce运行时写作与任务管理

一旦job jar文件提交到集群中,MapReduce框架就已经准备好所有事情。它可以在

只有一台节点的集群,也可以是拥有几百台节点的集群中,掌控分布式代码执行的各个方面。

   MapReduce框架为应用开发提供了一下支持:

  Scheduling:框架可以保证来自多个job的多个任务在一个集群上执行。不同的调度策略提供了不同的调度策略,有“先来先服务”的调度策略,也有保证让来自不同用户的job公平共享集群执行资源的调度策略。调度的另一个方面是推测执行,它是由MapReduce的一个优化。如果JobTracker发现某个任务花费太长时间执行,它就会启动另一个实例执行相同的任务(选择不同的TaskTracker)。推测执行的原理是某台节点非预期的缓慢不会降低任务的执行速度。推测执行默认是开启的,同样也可以通过将参数mapred.map.tasks.speculative.execution 与mapred.reduce.tasks.speculative.execution设置为false来分别禁用map与reduce任务。

  Synchronization:MapReduce的执行需要保证map与reduce阶段的同步。(reduce只有在所有的map的key/value对提交后才能启动)。在这点上,中间[sht13] 的key/value对会根据key值进行分组,这个分组会在所有执行map任务和即将执行reduce任务的节点上进行一次大排序。

  Error and fault hadling:为保证在一个错误和故障是常态的环境中完成job执行,JobTracker会尝试重新执行失败的任务。(在第五章,你讲学习更多关于如何编写可靠的MapReduce应用)

在图3-3中,Hadoop的MapReduce框架使用一种非常简单的协调机制。Job的driver

使用InputFormat完成隔离map的执行(基于数据切分),初始化一个用来与JobTracker通信的job客户端和提交将要执行的job。一旦job被提交,job客户端会从JobTracker端获取job运行状态,并等待它运行完成。JobTracker会为每一个split创建一个map任务,和几个reducer任务。(reduce任务的个数决定与job的配置)。

任务的实际执行由运行在每个节点上的TaskTracker负责,TaskTracker启动map作业,

并周期性向JobTracker发送“心跳”。心跳信息具备双重职能——首先会告诉JobTracker本节点的TaskTracker是活着的,其次是作为一种通信通道。心跳信息中会包含TaskTracker什么时候可以执行一个新的任务。

在这点上,JobTracker使用调度器来给某个节点分配执行任务,并将任务内容通过信息返回值传送给TaskTracker。Hadoop有很多调度器(公平调度是目前使用最多的)。任务一旦分配给TaskTracker,就会[sht14] 消耗它的task slots(每个节点能够运行几个map与reduce任务,TaskTracker会为他们分配相应的map与reduce slot),下一步就是运行任务。

 

图3-3 MapReduce 执行

首先,它会通过将jar文件拷贝到本地文件系统完成job的本地化。同时还会拷贝一

些应用[sht15] 程序必须用到的文件到本地磁盘,并且创建一个task runner实例运行任务。Task runner从distribute cache启动一个新的java虚拟机(JVM)来执行任务。子进程(执行任务的进程)会与他的父进程(TaskTracker)通过来接接口通信。这样,它能够每隔几秒就向父进程发送任务的执行进度,知道执行结束。

当JobTracker收到提示某个job的最后一个任务执行完毕,就会将job的状态标记为“completed”。Job客户端也会通过周期性拉去job状态,发现job的已经执行完成。

注意:默认情况下,Hadoop在[sht16] 单独的JVM中运行每个任务。启动一个新的JVM大

概需要1秒的消耗,在大[sht17] 多数情况下,这是无关紧要的(与某些需要执行几分钟map任务相比)。但在非常小且运行快速的map任务中(他们需要在几秒钟内执行完毕),Hadoop允许通过设置job配置参数mapreduce.job.jvm.numtasks,可以对JVM进行重用。如果该值为1(默认情况下),JVM将不会被重用。如果是-1,一个JVM能够运行的任务数(来自同一个job)是没有限制的。同样也可以通过

Job.getConfiguration().setInt(Job.JVM_NUM_TASKS_TO_RUN, int)来将该值设置为大于1的数。

现在我们已经了解了MapReduce是什么以及它的主要部件,[sht17] 接下来我们将介绍在一个特定应用中,这些组件是如何应用和相互影响的。下面将以wordcount为例,它的功能十分简单,但是能够很好的解释MapReduce的思想。这里面我们将撇开任何其他的解释,只专注于应用[sht18] 与MapReduce管道之间的交互。

3.2第一个Mapreduce应用

列表3-1展示了一个单词计数的mapreduce作业的非常简单的应用

列表3-1:hadoop 单词计数应用

importjava.io.IOException;

importjava.util.Iterator;

importjava.util.StringTokenizer;

importorg.apache.hadoop.conf.Configuration;

importorg.apache.hadoop.conf.Configured;

importorg.apache.hadoop.fs.Path;

importorg.apache.hadoop.io.IntWritable;

importorg.apache.hadoop.io.LongWritable;

importorg.apache.hadoop.io.Text;

importorg.apache.hadoop.mapreduce.Job;

importorg.apache.hadoop.mapreduce.Mapper;

importorg.apache.hadoop.mapreduce.Reducer;

importorg.apache.hadoop.mapreduce.lib.input.TextInputFormat;

importorg.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

importorg.apache.hadoop.util.Tool;

importorg.apache.hadoop.util.ToolRunner;

public class WordCount extends Configured implements Tool{

public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {

private final staticIntWritable one = newIntWritable(1);

privateText word = newText();

@Override

public voidmap(LongWritable key, Text value, Context context)

throwsIOException, InterruptedException {

String line = value.toString();

StringTokenizer tokenizer = newStringTokenizer(line);

while(tokenizer.hasMoreTokens()) {

word.set(tokenizer.nextToken());

context.write(word, one);

}

}

}

public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable>{

@Override

public void reduce(Text key, Iterable<IntWritable> val, Context context)

throwsIOException, InterruptedException {

int sum = 0;

Iterator<IntWritable> values = val.iterator();

while(values.hasNext()) {

sum += values.next().get();

}

context.write(key, newIntWritable(sum));

}

}

public int run(String[] args) throws Exception {

Configuration conf = newConfiguration();

Job job = newJob(conf, "Word Count");

job.setJarByClass(WordCount.class);

// Set up the input

job.setInputFormatClass(TextInputFormat.class);

TextInputFormat.addInputPath(job,newPath(args[0]));

// Mapper

job.setMapperClass(Map.class);

// Reducer

job.setReducerClass(Reduce.class);

// Output

job.setOutputFormatClass(TextOutputFormat.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

TextOutputFormat.setOutputPath(job,newPath(args[1]));

//Execute

booleanres = job.waitForCompletion(true);

if(res)

return0;

else

return-1;

}

public static voidmain(String[] args) throwsException {

intres = ToolRunner.run(newWordCount(), args);

System.exit(res);

}

}

注意:hadoop提供了两个版本的MapReduce API-新的(包含在org.apche.hadoop.mapreduce包里)和旧的(包含在orp.apache.hadoop.mapred包里)。这本书里。我们只用新的API。

这个应用有两个内部类-map和reduce。分别继承了hadoop的Mapper类和Reducer类。

Mapper类有三个主要方法(可以重写):setup(),cleanup()和map()(仅在这里应用的一个),setup()和cleanup()方法在mapper的整个循环中只被调用一次-分别是在mapper执行的开头和结尾。Setup()方法被用来实现mapper的初始化(例如,阅读共享资源,连接HBase表等),然而cleanup()被用来清除mapper资源,可选择的是,如果mapper实现关联矩阵和计数器,就写出信息。

在map函数中实现了Mapper的业务功能(就是,特定程序的逻辑能力)。通常,给定一个键/值对,该方法处理并产生(使用context对象)一个或多个键/值对。一个context对象传递给这个方法允许map方法获取执行环境的附加信息,报告他的执行状况。需要注意的是一个map函数不读取数据。它被唤醒(基于“Hollywood原则”)。每次阅读器读取(和可选地解析)一个新的记录,它就调用(基于“好莱坞原则”)传递给它的数据(通过context对象)的阅读器。如果你很疑惑,看一看下面的列表3-2的mapper基类中额外的方法(没有广泛应用)。

注意:好莱坞原则-不用联系我们,我们联系你。是一个有用的软件开发技术,其中一个对象(或元素)的初始条件和持续的生命周期是被环境所决定的。而不是对象本身。这个原则通常用于必须符合现有框架的约束实现一个类/组件。

列表3-2:mapper基类中run方法

/**

* Expert users can override this method for more complete control over the

* execution of the Mapper.

* @paramcontext

* @throwsIOException

*/

public voidrun(Context context) throwsIOException, InterruptedException {

setup(context);

while(context.nextKeyValue()) {

map(context.getCurrentKey(),context.getCurrentValue(), context);

}

cleanup(context);

}

 

构建和执行Mapreduce项目

使用Eclipse开发Hadoop代码是相当简单的,假设你的Eclipse实例是用Maven配置的,首先为你的应用构建一个Maven项目。因为没有Hadoop Maven的原型,从“简单的”Maven项目开始,手动的添加pom.xml,类似于列表3-3所示。

列表3-3:hadoop2.0中的pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0

http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>

<groupId>com.nokia.lib</groupId>

<artifactId>nokia-cgnr-sparse</artifactId>

<version>0.0.1-SNAPSHOT</version>

<name>cgnr-sparse</name>

<properties>

<hadoop.version>2.0.0-mr1-cdh4.1.0</hadoop.version>

<hadoop.common.version>2.0.0-cdh4.1.0</hadoop.common.version>

<hbase.version>0.92.1-cdh4.1.0</hbase.version>

</properties>

<repositories>

<repository>

<id>CDH Releases and RCs Repositories</id>

<url>https://repository.cloudera.com/content/groups/cdhreleases-rcs</url>

</repository>

</repositories>

<build>

<plugins>

<plugin>

<groupId>org.apache.maven.plugins</groupId>

<artifactId>maven-compiler-plugin</artifactId>

<version>2.3.2</version>

<configuration>

<source>1.6</source>

<target>1.6</target>

</configuration>

</plugin>

</plugins>

</build>

<dependencies>

<dependency>

<groupId>org.apache.hadoop</groupId>

<artifactId>hadoop-core</artifactId>

<version>${hadoop.version}</version>

<scope>provided</scope>

</dependency>

<dependency>

<groupId>org.apache.hbase</groupId>

<artifactId>hbase</artifactId>

<version>${hbase.version}</version>

<scope>provided</scope>

</dependency>

<dependency>

<groupId>org.apache.hadoop</groupId>

<artifactId>hadoop-common</artifactId>

<version>${hadoop.common.version}</version>

<scope>provided</scope>

</dependency>

<dependency>

<groupId>junit</groupId>

<artifactId>junit</artifactId>

<version>4.10</version>

</dependency>

</dependencies>

</project>

注意:存在很多版本的hadoop,包括不同版本的Cloudera版本(CDH3和CDH4),Hortonworks版本,MapReduce版本,Amazon的EMR等等。他们中一些是兼容的,一些不是兼容的。你需要使用不同的Maven pom文件在特定的运行时间构建一个定向的可执行目标。此外,当前hadoop只支持Java版本6。所以这里需要使用Maven编译器插件,以确保使用正确的版本。

列表3-3中的pom文件是Cloudera CDH4.1版本中的(注意包含在cloudera库中的pom文件)。它包含建立hadoop mapreduce作业所必须的一组最小依赖关系,hadoop-core和hadoop-common。除此之外,如果在一个应用中使用HBase,必须包含一个hbase依赖项。也包含支持基本单元测试的junit。同时注意提供的所有hadoop相关的依赖项是指定的。也就意味着他们不会包含在Maven产生的最终的jar文件中。

一旦创建了Eclipse Maven项目,所有实现MapReduce的代码都会加入到这个项目中。Eclipse负责加载所需的库和编译Java代码等等。

现在你已经知道如何编写一个mapreduce作业,接下来我们将学习如何执行。你可以利用Maven install命令产生一个包含所有需要的代码的jar文件。一旦jar文件被创建,你可以利用FTP上传到集群的边节点。然后利用下面的列表3-4中的命令执行它。

列表3-4:hadoop执行命令

Hadoop jar your.jar mainClass inputpath outputpath

Hadoop提供了一些java服务器页面(JSPs),使你能够可视化执行Mapreduce。Mapreduce的管理JSP可以使你能够查看集群的整体状态和特殊作业执行的细节。图3-4中的Mapreduce的管理页面展示了集群了所有状态,以及当前运行、完成以及失败作业的列表。每一个工作列表(运行、完成和失败)都是可以点击的,可以使你获取关于作业执行的额外信息。

 

图3-4:Mapreduce管理主页

图3-5中的作业详细页面提供了关于执行的(动态)信息。当Jobtracker接受作业时,这个页面就开始存在,一直跟踪在执行过程中的所有变化。你可以在事后对它进行分析工作的执行过程。这个页面包含以下四个主要部分(第四个没有显示在图3-5中):

第一部分(页面的顶部)显示了关于作业的所有合并后的信息。包括作业名称,用户,提交到的主机,开始和结束时间等等。

第二部分包含了关于给定作业的mapper和reducer的汇总信息。它告诉这个作业有多少个mapper和reducer。根据他们的状态进行分割-挂起,运行,完成和死亡。

第三部分显示作业的计数器(对计数器的深入讨论,请看第四章),根据名称空间进行了分割。因为在这个示例实现没有使用自定义,对它只使用标准的计数器。

第四部分提供了很好的直方图详述了mapper和reducer的执行。

作业详细页面提供了更多的信息(通过“超链接”),可以更深入的帮你分析作业的执行。这些页面在第五章讨论构建可靠的mapreduce应用程序时进行了详细的讨论。

接下来,我们来看一看mapreduce应用的设计。

 

 

 

图3-5:wordcount作业页面

 

3.3设计MapReduce的实现

如前所述,mapreduce的功能主要来自于它的简单性。除了准备输入数据之外,程序员只需要操作mapper和reducer。现实中的很多问题都可以利用这种方法解决。

在大多数情况下, MapReduce可以作为一个通用的并行执行框架,充分利用数据的本地性。但是这种简单性是有代价的,设计者必须从以特定的方式组合在一起的一小部分组件的角度决定如何表达他的业务问题。

注意:尽管很多著作描述了mapreduce的API的使用,很少描述用实际的方法来设计一个MapReduce应用。

重新制定MapReduce的初始问题,通常有必要回答以下问题:

1、    怎么把一个大问题分解成多个小任务?更具体地说,你怎么分解问题以至于这些小任务可以并行执行?

2、    你会选择哪对key/value作为每个任务的输出/输出?

3、    你如何汇总计算所需要的所有数据?更具体地说, 你怎么安排处理的方式,使所有必要的计算中的数据都同时在内存中?

我们要认识到,很多算法不能很容易地表示为一个单一的MapReduce作业。它往往需要把复杂的算法分解成一系列的作业,把其中一个作业的数据输出成为下一个作业的输入。

本节将会探讨在几个设计不同的实际MapReduce应用问题的例子(从简单到复杂)。所有的例子都会被描述为以下形式:

  • 对问题简单的描述
  • Mapreduce作业的描述,包括:

²  Mapper的描述

²  Reducer的描述

使用Mapreduce作为并行处理的框架

在简单的例子中,源数据被组织为一组独立的记录。结果可以以任何顺序指定。这些类型的问题(”尴尬的并行“),需要以相对独立的方式对每个数据元素进行相同的处理-换句话说,就是不需要合并或者聚合各自的结果。一个简单的例子就是处理几千个pdf文件,提取一些关键的文本,放入到CSV文件中,然后导入到数据库中。

这种情况下的mapreduce实施是非常简单的――唯一需要的就是mapper,单独的处理每个记录然后输出结果。在这个例子中,Mapreduce控制mappers的分布,提供调度和错误处理的所有支持。下面的例子展示了如何设计这种类型的应用程序。

人脸识别的例子

虽然不是经常作为Hadoop-related问题讨论,但是图像处理应用在mapreduce范例中是非常合适的。假设有一个人脸识别算法的应用,需要一个图像,识别一系列想要的特性,并产生一组识别结果。再假设需要在百万图片上做人脸识别。如果所有的图片以序列文件的形式存放在hadoop中,那么你可以用一个简单的map作业就可以实现并行处理。在这个例子中,输入的key/value是ImageID/Image,输出的key/value是ImageID/可特征识别列表。此外,一组可特征识别必须分布到所有的mapper(例如,利用分布式缓存)。

表3-1展示这个例子中mapreduce作业的实施

表3-1:人脸识别作业

Mapper

在这个作业中,mapper首先以可识别特征集进行初始化,对于每一个图像,一个map函数通过它的图像本身,以及可识别的列表来调用的人脸识别算法。识别的结果连同原来imageID一起从map中输出。

Result

这个作业执行的结果是所有包含在原始图片中识别出来的图片。

 

注意:要实现完全独立的mappers/reducers。在mapreduce应用中的每一个mapper/reducer需要创建独自的输出文件。这意味着,人脸识别的作业的执行结果将是一组文件(相同目录下的),每一个包含了各自mapper的输出。如果需要把他们放入到一个单个的文件中。必须在人脸识别作业中添加一个单独的reducer。这个reducer是非常简单的。因为在这个例子中,每一个作为reduce的输入的key只有一个单独的value(这里假设图像的ID是唯一的),reducer只是把输入的key/value直接写入到输出文件。我们要知道在这个例子中尽管一个reducer极其简单,但是这种额外的作业明显的增加了作业的整体运行时间。这是因为额外的reducer分为shuffle和sort(不单单在map作业中出现),当图像的数量非常大时,将花费大量的时间。

接下来看一个比较复杂的例子,map执行的结果必须在一起分组(也就是,以某种方式排序)。很多实际的应用(包括过滤,解析、数据转换,总结等)可以用这种mapreduce作业解决。

第一个Mapreduce应用

列表3-1展示了一个单词计数的mapreduce作业的非常简单的应用

列表3-1:hadoop 单词计数应用

importjava.io.IOException;

importjava.util.Iterator;

importjava.util.StringTokenizer;

importorg.apache.hadoop.conf.Configuration;

importorg.apache.hadoop.conf.Configured;

importorg.apache.hadoop.fs.Path;

importorg.apache.hadoop.io.IntWritable;

importorg.apache.hadoop.io.LongWritable;

importorg.apache.hadoop.io.Text;

importorg.apache.hadoop.mapreduce.Job;

importorg.apache.hadoop.mapreduce.Mapper;

importorg.apache.hadoop.mapreduce.Reducer;

importorg.apache.hadoop.mapreduce.lib.input.TextInputFormat;

importorg.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

importorg.apache.hadoop.util.Tool;

importorg.apache.hadoop.util.ToolRunner;

public class WordCount extends Configured implements Tool{

public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {

private final staticIntWritable one = newIntWritable(1);

privateText word = newText();

@Override

public voidmap(LongWritable key, Text value, Context context)

throwsIOException, InterruptedException {

String line = value.toString();

StringTokenizer tokenizer = newStringTokenizer(line);

while(tokenizer.hasMoreTokens()) {

word.set(tokenizer.nextToken());

context.write(word, one);

}

}

}

public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable>{

@Override

public void reduce(Text key, Iterable<IntWritable> val, Context context)

throwsIOException, InterruptedException {

int sum = 0;

Iterator<IntWritable> values = val.iterator();

while(values.hasNext()) {

sum += values.next().get();

}

context.write(key, newIntWritable(sum));

}

}

public int run(String[] args) throws Exception {

Configuration conf = newConfiguration();

Job job = newJob(conf, "Word Count");

job.setJarByClass(WordCount.class);

// Set up the input

job.setInputFormatClass(TextInputFormat.class);

TextInputFormat.addInputPath(job,newPath(args[0]));

// Mapper

job.setMapperClass(Map.class);

// Reducer

job.setReducerClass(Reduce.class);

// Output

job.setOutputFormatClass(TextOutputFormat.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

TextOutputFormat.setOutputPath(job,newPath(args[1]));

//Execute

booleanres = job.waitForCompletion(true);

if(res)

return0;

else

return-1;

}

public static voidmain(String[] args) throwsException {

intres = ToolRunner.run(newWordCount(), args);

System.exit(res);

}

}

注意:hadoop提供了两个版本的MapReduce API-新的(包含在org.apche.hadoop.mapreduce包里)和旧的(包含在orp.apache.hadoop.mapred包里)。这本书里。我们只用新的API。

这个应用有两个内部类-map和reduce。分别继承了hadoop的Mapper类和Reducer类。

 

构建和执行Mapreduce项目

使用Eclipse开发Hadoop代码是相当简单的,假设你的Eclipse实例是用Maven配置的,首先为你的应用构建一个Maven项目。因为没有Hadoop Maven的原型,从“简单的”Maven项目开始,手动的添加pom.xml,类似于列表3-3所示。

列表3-3:hadoop2.0中的pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0

http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>

<groupId>com.nokia.lib</groupId>

<artifactId>nokia-cgnr-sparse</artifactId>

<version>0.0.1-SNAPSHOT</version>

<name>cgnr-sparse</name>

<properties>

<hadoop.version>2.0.0-mr1-cdh4.1.0</hadoop.version>

<hadoop.common.version>2.0.0-cdh4.1.0</hadoop.common.version>

<hbase.version>0.92.1-cdh4.1.0</hbase.version>

</properties>

<repositories>

<repository>

<id>CDH Releases and RCs Repositories</id>

<url>https://repository.cloudera.com/content/groups/cdhreleases-rcs</url>

</repository>

</repositories>

<build>

<plugins>

<plugin>

<groupId>org.apache.maven.plugins</groupId>

<artifactId>maven-compiler-plugin</artifactId>

<version>2.3.2</version>

<configuration>

<source>1.6</source>

<target>1.6</target>

</configuration>

</plugin>

</plugins>

</build>

<dependencies>

<dependency>

<groupId>org.apache.hadoop</groupId>

<artifactId>hadoop-core</artifactId>

<version>${hadoop.version}</version>

<scope>provided</scope>

</dependency>

<dependency>

<groupId>org.apache.hbase</groupId>

<artifactId>hbase</artifactId>

<version>${hbase.version}</version>

<scope>provided</scope>

</dependency>

<dependency>

<groupId>org.apache.hadoop</groupId>

<artifactId>hadoop-common</artifactId>

<version>${hadoop.common.version}</version>

<scope>provided</scope>

</dependency>

<dependency>

<groupId>junit</groupId>

<artifactId>junit</artifactId>

<version>4.10</version>

</dependency>

</dependencies>

</project>

注意:存在很多版本的hadoop,包括不同版本的Cloudera版本(CDH3和CDH4),Hortonworks版本,MapReduce版本,Amazon的EMR等等。他们中一些是兼容的,一些不是兼容的。你需要使用不同的Maven pom文件在特定的运行时间构建一个定向的可执行目标。此外,当前hadoop只支持Java版本6。所以这里需要使用Maven编译器插件,以确保使用正确的版本。

列表3-3中的pom文件是Cloudera CDH4.1版本中的(注意包含在cloudera库中的pom文件)。它包含建立hadoop mapreduce作业所必须的一组最小依赖关系,hadoop-core和hadoop-common。除此之外,如果在一个应用中使用HBase,必须包含一个hbase依赖项。也包含支持基本单元测试的junit。同时注意提供的所有hadoop相关的依赖项是指定的。也就意味着他们不会包含在Maven产生的最终的jar文件中。

一旦创建了Eclipse Maven项目,所有实现MapReduce的代码都会加入到这个项目中。Eclipse负责加载所需的库和编译Java代码等等。

现在你已经知道如何编写一个mapreduce作业,接下来我们将学习如何执行。你可以利用Maven install命令产生一个包含所有需要的代码的jar文件。一旦jar文件被创建,你可以利用FTP上传到集群的边节点。然后利用下面的列表3-4中的命令执行它。

列表3-4:hadoop执行命令

Hadoop jar your.jar mainClass inputpath outputpath

Hadoop提供了一些java服务器页面(JSPs),使你能够可视化执行Mapreduce。Mapreduce的管理JSP可以使你能够查看集群的整体状态和特殊作业执行的细节。图3-4中的Mapreduce的管理页面展示了集群了所有状态,以及当前运行、完成以及失败作业的列表。每一个工作列表(运行、完成和失败)都是可以点击的,可以使你获取关于作业执行的额外信息。

 

图3-4:Mapreduce管理主页

图3-5中的作业详细页面提供了关于执行的(动态)信息。当Jobtracker接受作业时,这个页面就开始存在,一直跟踪在执行过程中的所有变化。你可以在事后对它进行分析工作的执行过程。这个页面包含以下四个主要部分(第四个没有显示在图3-5中):

第一部分(页面的顶部)显示了关于作业的所有合并后的信息。包括作业名称,用户,提交到的主机,开始和结束时间等等。

第二部分包含了关于给定作业的mapper和reducer的汇总信息。它告诉这个作业有多少个mapper和reducer。根据他们的状态进行分割-挂起,运行,完成和死亡。

第三部分显示作业的计数器(对计数器的深入讨论,请看第四章),根据名称空间进行了分割。因为在这个示例实现没有使用自定义,对它只使用标准的计数器。

第四部分提供了很好的直方图详述了mapper和reducer的执行。

作业详细页面提供了更多的信息(通过“超链接”),可以更深入的帮你分析作业的执行。这些页面在第五章讨论构建可靠的mapreduce应用程序时进行了详细的讨论。

接下来,我们来看一看mapreduce应用的设计。

 

 

 

图3-5:wordcount作业页面

 

设计MapReduce的实现

如前所述,mapreduce的功能主要来自于它的简单性。除了准备输入数据之外,程序员只需要操作mapper和reducer。现实中的很多问题都可以利用这种方法解决。

在大多数情况下, MapReduce可以作为一个通用的并行执行框架,充分利用数据的本地性。但是这种简单性是有代价的,设计者必须从以特定的方式组合在一起的一小部分组件的角度决定如何表达他的业务问题。

注意:尽管很多著作描述了mapreduce的API的使用,很少描述用实际的方法来设计一个MapReduce应用。

重新制定MapReduce的初始问题,通常有必要回答以下问题:

4、    怎么把一个大问题分解成多个小任务?更具体地说,你怎么分解问题以至于这些小任务可以并行执行?

5、    你会选择哪对key/value作为每个任务的输出/输出?

6、    你如何汇总计算所需要的所有数据?更具体地说, 你怎么安排处理的方式,使所有必要的计算中的数据都同时在内存中?

我们要认识到,很多算法不能很容易地表示为一个单一的MapReduce作业。它往往需要把复杂的算法分解成一系列的作业,把其中一个作业的数据输出成为下一个作业的输入。

本节将会探讨在几个设计不同的实际MapReduce应用问题的例子(从简单到复杂)。所有的例子都会被描述为以下形式:

  • 对问题简单的描述
  • Mapreduce作业的描述,包括:

²  Mapper的描述

²  Reducer的描述

使用Mapreduce作为并行处理的框架

在简单的例子中,源数据被组织为一组独立的记录。结果可以以任何顺序指定。这些类型的问题(”尴尬的并行“),需要以相对独立的方式对每个数据元素进行相同的处理-换句话说,就是不需要合并或者聚合各自的结果。一个简单的例子就是处理几千个pdf文件,提取一些关键的文本,放入到CSV文件中,然后导入到数据库中。

这种情况下的mapreduce实施是非常简单的――唯一需要的就是mapper,单独的处理每个记录然后输出结果。在这个例子中,Mapreduce控制mappers的分布,提供调度和错误处理的所有支持。下面的例子展示了如何设计这种类型的应用程序。

人脸识别的例子

虽然不是经常作为Hadoop-related问题讨论,但是图像处理应用在mapreduce范例中是非常合适的。假设有一个人脸识别算法的应用,需要一个图像,识别一系列想要的特性,并产生一组识别结果。再假设需要在百万图片上做人脸识别。如果所有的图片以序列文件的形式存放在hadoop中,那么你可以用一个简单的map作业就可以实现并行处理。在这个例子中,输入的key/value是ImageID/Image,输出的key/value是ImageID/可特征识别列表。此外,一组可特征识别必须分布到所有的mapper(例如,利用分布式缓存)。

表3-1展示这个例子中mapreduce作业的实施

表3-1:人脸识别作业

Mapper

在这个作业中,mapper首先以可识别特征集进行初始化,对于每一个图像,一个map函数通过它的图像本身,以及可识别的列表来调用的人脸识别算法。识别的结果连同原来imageID一起从map中输出。

Result

这个作业执行的结果是所有包含在原始图片中识别出来的图片。

 

注意:要实现完全独立的mappers/reducers。在mapreduce应用中的每一个mapper/reducer需要创建独自的输出文件。这意味着,人脸识别的作业的执行结果将是一组文件(相同目录下的),每一个包含了各自mapper的输出。如果需要把他们放入到一个单个的文件中。必须在人脸识别作业中添加一个单独的reducer。这个reducer是非常简单的。因为在这个例子中,每一个作为reduce的输入的key只有一个单独的value(这里假设图像的ID是唯一的),reducer只是把输入的key/value直接写入到输出文件。我们要知道在这个例子中尽管一个reducer极其简单,但是这种额外的作业明显的增加了作业的整体运行时间。这是因为额外的reducer分为shuffle和sort(不单单在map作业中出现),当图像的数量非常大时,将花费大量的时间。

接下来看一个比较复杂的例子,map执行的结果必须在一起分组(也就是,以某种方式排序)。很多实际的应用(包括过滤,解析、数据转换,总结等)可以用这种mapreduce作业解决。

利用mapreduce做简单的数据处理

这种情况的一个例子就是构建倒排索引。这种类型的问题需要所有的mapreduce步骤进行执行,需要shuffle和sort把所有的结果集合在一起。下面的例子展示了如何设计这种类型的应用。

倒排索引的例子

在计算机科学中,倒排索引是一个数据框架,用来存放了从内容(例如单词或者数字)到它在一个文档或一组文档里的位置的映射,如表3-6所示。倒排索引的目的是实现快速的全文搜索,在文档增加的时候增加处理成本为代价,倒排索引式的数据结构是典型搜索引擎的关键部分,优化了查找某些单词出现的文档的速度。

文档

ID

Title

Content

1

Popular

Football is Popular in US

2

Common Sport

Soccer is commonly played in Europe

3

National Sport

Cricket is played all over India

...

...

...

 

 

倒排索引

Term

value

Document

Document

Document

Title

popular

1

 

 

Title

sport

1

2

3

Title

common

2

 

 

Title

national

3

 

 

Content

football

1

 

 

Content

is

1

2

3

Content

popular

1

 

 

...

表3-6:倒排索引

要创建倒排索引,可以把每个文档(或者文档里行)给mapper。mapper可以解析出文档里的多个单词,然后输出[单词,词频]键值对。reducer可以只是一个识别,输出列表或者可以执行每个单词的一些统计汇总的功能。

注释在第九章你将学会更多关于如何利用Hbase来存储倒排的索引。

表3-2里展示了这个例子中mapreduce作业的实现。

表3-2 倒排索引的计算

处理阶段

描述

Mapper

作业中,mapper的任务是构建一个包含一个单词索引的独特的记录和描述在文档里单词出现的信息。它读取每个输入的文档,解析,然后为文档里的每一个独特的单词创建一个索引描述符。该描述符包含文档的ID,文档里索引出现的次数,和任何附件的信息(比如从文档的开头索引位置的偏移量) ,每一个所以描述符被写出。

Shuffle和sort

Mapreduce的shuffle和sort过程会把所有的记录都按照索引值排序,确保reducer接受到所有相同key值的索引。

Reducer

这项工作中,reducer的作用是构建一个倒排索引结构。根据系统的要求,可能有一个或多个reducer。Reducer得到所有给定索引的描述符,并生成一个索引记录,并写入到指定的索引存储。

Result

该作业执行的结果是一组原始文档的倒排索引。

更多复杂的mapreduce应用需要将来自多个获取的数据(就是说连接数据)进行处理。

构建迭代式mapreduce应用

在迭代式的mapreduce应用中,在循环中通常一个或者多个mapreduce作业会被执行。也就是说,这样的应用程序要么使用一个内部实现迭代逻辑的驱动程序来实现,并在迭代循环中调用需要的mapreduce作业,要么使用是在一次循环中运行mapreduce作业的一个外部脚本和检查转换的标准。(另一个选择方式是使用工作流引擎,第六章至第八章细述了hadoop下的工作流引擎Apache Oozie)。使用一个驱动程序执行迭代逻辑提供了一个更灵活的解决方案,使你能够利用内部变量和Java的全部功能去实现迭代和转换的检查。

         迭代算法的典型例子是解决一个线性方程组。接下来你将看到如何利用mapreduce设计这样一个算法。

解决线性方程组示例

许多实际的问题都可以表示为解决线性方程类似的问题。至少可以归纳到这种方法里。

1、  可以利用线性方程解决优化的问题

2、  近似问题(例如,多项式曲线)

注解:这个例子是在Gene Kalmens(一个在Nokia的同事)帮助下完成的。

要有效的解决线性方程问题,问题的大小是非常重要的---大致有成百上千或者更多变量是个挑战。这种情况下,另一种选择是使用T字节内容的超级计算机,或者使用允许零散计算的算法,而不需要把完整的矩阵放到内存中。遵循这些要求的这类算法是可以提供相近解决方案的迭代方法。它的性能和迭代的次数有关,在要求的精度内找到一个答案是可行的。

这些类型的算法中,当系数矩阵正确的时候,共轭梯度(CG)方法提供了最好的性能。下面是线性方程组的基本方程:

Ax=b

使用共轭梯度,可以实现最速下降法,适用于如下定义的Rn中的二次曲面。

f(x) = 1512'> xTAx −xTb, x 15鈭?/m:t>'> Rn

在端点方向上每一步都改善了解向量。对于前面步骤中的所有向量,每一步得到的向量都共轭与A。

CG算法有以下步骤:

1、  选择初始向量x0,为了简单起见,可以设置为0.

2、  计算初始的残差向量r0(例如,r0=b=Ax0).

3、  选择初始的查找方向p0-r0.

4、  循环如下:

a)         计算系数:ak=(rkTrk)/ (pkTApk).

b)         找到下一个x的近似值:xk+1=xk+akpk

c)         计算新的残差向量:rk+1= rk+1+akApk

d)         如果abs(rk+1) 的在公差内,则结束循环。

e)         计算出标量(scalar),推断下一个搜索方向:bk=(rk+1Trk+1)/(rkTrk)。

f)          推断下一个搜索方向:pk+1=r k+1+b kp k

5、循环结束

结果就是x k+1

在这个算法实现中唯一“昂贵”的操作是残差向量的计算(步骤2和4c)。因为此步骤需要矩阵向量乘。用Mapreduce可以很容的实现这个操作。

确保你有两个Hbase表-----一个存放矩阵A,另一个存放所有的向量。如果矩阵A是稀疏矩阵,一个合理的Hbase数据模型如下:

1、  每个表的行代表一个矩阵的行。

2、  给定矩阵的行中的所有元素都存储在一个单独的列族,列名称对应于给定矩阵元素的列。

虽然对于实现向量乘法,矩阵的列不需要明确的列元素。但是如有有必要设置或更新单个元素的时候,这个表的布局可能会很方便。

一个表示向量的合理的HBase表如下所示:

1、  每个表的行代表单个的向量。

2、  给定向量的所有元素都存储在单独的列族,列名称对应用一个向量索引,列值对应于用于索引的矢量值。

虽然从技术角度上讲,存储不同行值的作为向量索引的向量可以使用不同的表。提出的这个模型可以使读写向量非常的快(单行读写),同时减少了打开Hbase连接的数量。

Hbase表合理的设计,使Mapreduce矩阵向量的实现变得相当简单。一个mapper就能完成任务。矩阵向量乘的mapreduce作业如表3-7所示。

表3-7:矩阵向量乘法作业

处理阶段

描述

Mapper

作业中,mapper第一次初始化为向量的值,对于矩阵的每一行,计算出源向量和矩阵的行的乘向量。结果值(不为0)存储在结果向量的索引r中。

在这个实现中,mapreduce的dirver执行了先前描述的算法,每次相乘,都需要调用矩阵向量乘的mapreduce作业。

虽然这里描述的算法实现非常简单和直接。为了能使CG(共轭梯度)能够使用,必须满足下列条件:

1、  矩阵A必须是正定矩阵。它提供了带有一个极值点的凸表面。那意味着这种方法会收敛于任何选择的初始向量x0

2、  矩阵A必须是对称的,这样可以确保在每一步的过程中都存在与A正交的向量。

如果矩阵A不是对称的,通过下面的代替初始方程可以变成对称的。

ATAx=ATb

ATA是对称和正定的。因此上面描述的算法可以使用。使用初始算法,在计算新的系数矩阵ATA时,会造成很大的性能损失。另外,这种方法的收敛也会受到影响,因为k(ATA)=k(A)2

幸运的是,在前面你可以选择不计算ATA。而是修改前面算法中步骤2,4a,4c,如下所示:

步骤2-计算ATAx0,用两个矩阵向量乘:AT(Ax0)。

步骤4a-计算分母pkTATApk,注意它和(Apk)2是相等的,此步计算的实质上是矩阵向量积和本身内积的结果。

步骤4c-和步骤2类似,计算两个矩阵向量乘:ATApk

所以,整体算法实现的第一步必须检查矩阵是否是对称的。如果是,原先的算法就可以使用,如果不是,就要使用改进的算法。

除了第一个作业之外,整个算法的实现需要更多的mapreduce作业-矩阵转置。矩阵转置的mapreduce作业如表3-8所示:

表3-8:矩阵转置作业

处理阶段

描述

Mapper

作业中,对于矩阵的每一行(r),每个元素(r,j)都将作为矩阵元素(j,r)写入到结果矩阵里面。

注意,在这个例子中,算法的转换准则也是算法本身的重要组成部分。在接下来的例子中,使用了Hadoop特有的技术进行转换标准的计算。

Stranding divalent link的例子(滞留二价连接)

一个相当普遍的mapper问题就是滞留二价链接问题

注释 这个例子是在一个Nokia的同事Dmitry Mikhelson的帮助下准备的。

如果两个连接的链路是通过一个Bivalent节点连接,则被称为二价的。二价节点是一个只有两个连接的节点,例如,在图3-9中,节点N6,N7,N8和N9是二价节点,链路L5,L6,L7,L8和L9也是二价的。二价的退化情况是链路L4。

 

图3-9:二价链路的例子

如图3-9所示,计算二价链路延伸的算法看起来非常简单-在两个非二价节点之间的任何延伸的链路是称为二价链路的延伸。

要实现该算法,假定:

1、  一个节点描述为拥有键值Ni 15-NNi'> 的对象N,比如,节点N1可以被描述为 15NN1'> ,N2为 15NN2'> 。

2、  链路被描述为一个拥有键值Li~ 15LLi'> 的对象L,比如,链路L1可以被描述LL1,L2为LL2等等。一个链接对象包含引用它开始和结束的节点( 15NNi'> , 15NNj'> )。

3、  也引进一个对象类型的链路(s)或节点(LN),可以有任何键值,可以包含节点或者一个或多个链路。

4、  在定义一个类型-一个链接的链。这种数据类型包含在串中的一个有链接的链路列表。

当有了这些,这个二价链路strand的算法看起来如下:

1、  构建一个局部的二价串集合

2、  循环以下步骤:

a)         合并局部串(strands)

b)         如果没有可以结合的局部串,跳出循环。

3、结束循环

实际的实现过程包含两个mapreduce作业。一个是准备初始的strands(串),另一个(在循环中执行)是合并局部的strands(串)。在这个例子中。串的合并是作为mapreduce作业执行的一部分完成的。因此,这些作业(不是driver)知道在执行过程中合并了多少个局部strand。幸运的是,Hadoop在驱动和mapreduce执行中间提供了一个简单的机制-计数器。

注释:Hadoop提供轻量级的对象(计数器)去收集和作业相关的权值/统计信息。这些东西在mapreduce作业中的任何地方是可以设置和访问的。

这个例子的第一个mapreduce作业的实现如表3-9所示。

 

表3-9 无二价节点作业的清除

处理阶段

描述

Mapper

作业中mapper的任务是创建不属于源记录( 15NNi鍜?/m:t>>LLi'> )的 15LNNi'> 记录。它读取每一个输如的记录,然后查看对象类型。如果是有键值Ni的节点,一个新类型的 15LNNi'> 会被输出。如果是一个链接,所有的邻接节点(Ni和Nj)会从链接中提取出来,两个记录( 15LNNi鍜?/m:t>LNNj'> )会写出来。

Shuffle和Sort

Mapreduce的shuffle和sort会把基于节点key值的所有记录进行排序,确保每个节点相邻的所有链路只被一个单独的reducer处理,对于指定的key值,每个reducer将会同时得到所有的LN记录。

Reducer

作业中,reducer的任务是删除无二价性的节点,然后创建部分的链路的链接。对于给定的节点key值,reducer读取所有的LN记录然后把他们存储到内存中。如果这个节点的链路的数量是2,那么这就是个2价节点,一个新的链(合并这两个节点)会写入到输出文件中(例如,看L5,L6和L7,L8链路对)。如果链路的数量不是2(他可能是一个终结点或是多个链接的节点),那么它就是非二价节点。对于这种类型的节点,一个包含唯一一个链路的特殊的串被创建,链接到这个非二价节点(例如,L4或L5),这种节点的链的数量同链接到这个节点的链路是相等的。

结果

这个作业的执行结果包括部分二价节点的记录,一些这样的节点可以重复(例如,L4,可以出现两次-从过程N4到N5)。

 

 

 

这个例子的第二个mapreduce作业的实现如表3-10所示

表3-10 :合并部分Strands作业

处理阶段

描述

Mapper

作业中,mapper的任务是把拥有相同链路的strand放到一块。对于他读取的每一个strand,你产生一些key/value键值对。Key的值是在strand中链路的key。Strands的value值是strands本身。

Shuffle和sort

Mapreduce的shuffle和sort会把把所有来自终链路键值的记录进行排序,确保相同链路ID的所有strand记录同时来之相同的reducer。

Reducer

作业中,reducer的任务是把拥有相同链路ID的strand进行合并。对于每一个链路ID,所有的strands被载入到内存中,然后按照strand的类型进行处理:

如果两个strands包含相同的链路,产生的strand(链)是完整的,可以直接写入到最终结果目录。

否则生成的链(strand(链中)包含所有独特的链)被创建,然后写入到输出文件中做进一步处理。这种情况下,要被处理的计数器增加。

结果

该作业的执行结果是在一个单独的目录中包含了所有完整的链。

这里给出的例子就是开始潜在的使用mapreduce来解决现实世界的问题。接下来,仔细看看那些情况下是使用mapreduce是合理的,哪些是不合适的。

用还是不用Mapreduce?

如上讨论的那样,mapreduce是在大量数据的情况下解决简单问题的技术,而且必须以并行的方式处理(最好是多台机器)。这个概念的整体思路是在现实的时间框架上对大规模数据集进行计算。

另外,mapreduce可以用来并行密集型计算,并不是和数据量相关,而是和整体的计算时间(一般是“高度并行”情况下的计算)有关。

为了能使Mapreduce可以应用,下面必须符合:

1、  要运行的计算必须可以组合,它指的是必须能对数据集下的小数据集进行计算。然后对部分结果合并。

2、  数据集的大小要足够大(或者计算时间要足够长),当基础设施  为独立的计算和合并结果不会对整体性能造成影响。

3、  计算主要取决于于正在处理的数据集。用Hbase可以额外添加小的数据集。分布式缓存或者一些其他的技术。

然而,当数据集必须能随机的被访问去执行操作(例如,如果一个给定的数据集记录必须加上额外的记录来执行操作),在这种情境中,mapreduce是不适用的。然后在这种情况下,可以运行额外的mapreduce作业来为计算“准备”数据。

另外一些不适用mapreduce的问题是递归问题(例如,斐波那契问题)。在这种情况下,mapreduce不适用是因为当前value值的计算需要前一个的知识。这就意味着你不能把它们分解成为可以单独运行的子计算(sub computation)。

如果一个数据足够的小,小到可以放到一个机器的内存里,作为一个独立的应用程序可能会处理的更快。在这种情况下,使用mapreduce,会使执行变得不必要的复杂,通常会更慢。

注意,(keep it in mind),虽然一大类的算法不能直接应用在mapreduce的实施上。但是对于同样的基本问题,往往存在可以通过利用mapreduce解决的替代解决方案。这种情况下,使用mapreduce通常是有利的,因为mapreduce是在有丰富的hadoop生态系统中执行的(支持更容易的改进的实施),并与其它应用程序的集成。

最后 你应该记住Mapreduce本质上是一个批处理实现。决不能用于在线计算(比如在线用户请求的实时计算)。

常见的Mapreduce设计陷阱

当你设计mapreduce应用的时候,下面列举的是需要注意和避免的。

Ø  当map任务中对数据分片的时候。要确保没有创建过多(通常情况下,mapper的数量应该在数百,而不是数千)或者过少的分片。正确数量的mapper对应用程序有以下优势:

1、  拥有过多的mapper会造成调度和基础设施的开销,在极端情况下,甚至会杀死一个Jobtracker。另外,过多的mapper通常会提高整体资源的利用率(因为创建过多的JVM)和执行时间(因为执行slot的数量是有限的)。

2、  Mapper太少会导致集群不能充分利用,给一些节点(实现运行mapper的节点)造成过度负载。此外,在有大型map任务情况下,重试和推测执行的情况会变得非常昂贵的代价且会花费更长的时间。

3、  大量小型的mapper会造成大量的寻求,shuffle map输出给reducer的结果时。当把map的输出结果传递给reducer时,它也会造成过多的连接。

Ø  为应用程序配置Reducer的数量是另一个重要因素,reducer太多(通常是成千)或太少都会使效率降低。

1、  除了调度和基础设施的开销外,大量的reducer会创建太多的输出文件(记住,每个reducer创建自己的输出文件),对namenode有负面的影响。当有其他作业利用该mapreduce作业的结果时,它会变得更为复杂。

2、  太少的reducer和太少的mapper一样,造成同样的负面影响-不能充分利用集群和非常昂贵(代价)的回调。(retry)

Ø  合理利用作业计数器

1、  计数器在跟踪少量的,重要的,全局的信息是适用的(在Chapter 5了解更多关于使用计数器的详情)。他们绝对不是只是整合非常细粒度统计的应用程序。

2、  计数器的代价非常高,因为Jobtracker在应用程序的整个持续时间内,必须维持每个map/reduce任务的每一个计数器。

Ø  对应用程序的输出,选择一个合适的压缩机制来改善写性能(压缩速度vs压缩效率)。

Ø  为mapreduce作业的输出选择一个合适的文件格式。利用序列化文件通常是最好的选择,因为它们可以被压缩和分片(在Chapter 2了解更多关于压缩文件的信息)。

Ø  当单个输入/输出文件很大的时候,考虑使用更大的输出块大小(多个千兆字节大小)。

 

1、  尽量避免在map和reduce方法中添加新的类的实例。这些方法在执行过程中会循环执行多次。也就是说类的创建和处理将增加执行的时间,为垃圾收集器增加额外的工作。比较好的方法是在相应的set()方法中创建大量的中间类,然后重写map和reduce方法。

2、  不要用分布式缓存来移动大数量的工件或者非常大的工件(每个百兆字节)。分布式缓存的设计是用来分布小部分中等大小的工件,几兆到几十兆大小。

3、  处理少量的数据时,不要创建成百上千个小作业式的工作流。

4、  不直接从reducer或者mapper直接写入用户自定义的文件。Hadoop中当前实现文件写的功能是单线程的(从第二章获取更多细节),这意味着当多个mapper/reducer试图写文件时,这个执行将被序列化。

5、  不要创建这样的mapreduce功能,扫描一个Hbase表来创建一个新的Hbase表(或者写入同样的表中)  。TableInputFormat是为基于具有时间敏感性的表扫描的Hbase和Mapreduce的实现。  另一方面,Hbase写功能会因为Hbase表的分割而产生一定的写延迟。 结果是Region服务器会挂掉,然后你会失去一些数据。最好的解决方案是把作业分割成两个作业。一个扫描表并想HDFS中写入中间结果。另一个从HDFS读取数据并写入到HBase中。  

总结

本章节讨论了Mapreduce框架,了解了Mapreduce的整体框架和Mapreduce管道的执行方式。也学习了如何设计Mapreduce应用和Mapreduce遇到的问题的类型。最后学习了怎么编写和执行一个简单的mapreduce应用以及在执行过程中都发生了什么。

现在你知道了如何设计一个mapreduce应用、编写和执行一个简单的应用。第四章考察了mapreduce管道中不同组件的定制方法。使您能够更好的利用Mapreduce环境。

目录
相关文章
|
3月前
|
存储 分布式计算 Hadoop
Hadoop:驭服数据洪流的利器
在当今信息大爆炸的时代,海量数据成为企业决策的重要依据。本文将介绍大规模数据处理框架Hadoop的概念与实践,探讨其在解决大数据应用中的重要性和优势。从分布式计算、高可靠性、扩展性等方面深入剖析Hadoop的工作原理,并结合实例说明如何利用Hadoop来处理海量数据,为读者提供了解和运用Hadoop的基础知识。
|
4月前
|
分布式计算 Hadoop
Hadoop系列 mapreduce 原理分析
Hadoop系列 mapreduce 原理分析
38 1
|
3月前
|
分布式计算 关系型数据库 Hadoop
使用Sqoop将数据从Hadoop导出到关系型数据库
使用Sqoop将数据从Hadoop导出到关系型数据库
|
3月前
|
分布式计算
如何在MapReduce中处理非结构化数据?
如何在MapReduce中处理非结构化数据?
26 0
|
3月前
|
存储 分布式计算 监控
Hadoop的JobTracker和TaskTracker在MapReduce中的作用是什么?
Hadoop的JobTracker和TaskTracker在MapReduce中的作用是什么?
49 0
|
4月前
|
存储 分布式计算 分布式数据库
对给定的数据利用MapReduce编程实现数据的清洗和预处理,编程实现数据存储到HBase数据库,实现数据的增删改查操作接口
对给定的数据利用MapReduce编程实现数据的清洗和预处理,编程实现数据存储到HBase数据库,实现数据的增删改查操作接口
27 0
|
4月前
|
存储 Linux
[hadoop3.x]HDFS之银行海量转账数据分层案例(八)
[hadoop3.x]HDFS之银行海量转账数据分层案例(八)
109 1
|
4月前
|
存储 SQL 分布式计算
Hadoop(HDFS+MapReduce+Hive+数仓基础概念)学习笔记(自用)
Hadoop(HDFS+MapReduce+Hive+数仓基础概念)学习笔记(自用)
254 0
|
4月前
|
分布式计算 Hadoop 大数据
大数据成长之路-- hadoop集群的部署(4)退役旧数据节点
大数据成长之路-- hadoop集群的部署(4)退役旧数据节点
52 0
|
4月前
|
存储 分布式计算 资源调度
干翻Hadoop系列文章【03】:MapReduce概念详解
干翻Hadoop系列文章【03】:MapReduce概念详解