《Hadoop与大数据挖掘》一2.4.4 MapReduce组件分析与编程实践

简介:

本节书摘来华章计算机《Hadoop与大数据挖掘》一书中的第2章 ,第2.4.4节,张良均 樊 哲 位文超 刘名军 许国杰 周 龙 焦正升 著 更多章节内容可以访问云栖社区“华章计算机”公众号查看。

2.4.4 MapReduce组件分析与编程实践

MapReduce整个流程包括以下步骤:输入格式(InputFormat)、Mapper、Combiner、Partitioner、Reducer、输出格式(OutputFormat)。这里会针对流程中的Combiner、Part-itioner、输入/输出格式进行分析,同时,也会介绍相关的编程技巧,如自定义键值对。
1. Combiner分析
Combiner是什么呢?从字面意思理解,Combine即合并。其实,Combiner就是对Mapper的输出进行一定的合并,减少网络输出的组件。所以,其去掉与否不影响最终结果,影响的只是性能。
Combiner是Mapper端的汇总,然后才通过网络发向Reducer。如图2-40所示,经过Combiner后,键值对,被合并为,这样发往Reducer的记录就可以减少一条(当然,实际中肯定不是只减少一条记录),从而减少了网络IO。
对于多个输入数据块,每个数据块产生一个InputSplit,每个InputSplit对应一个map任务,每个map任务会对应0个到多个Combiner,最后再汇总到Reducer。在单词计数的例子中,使用Combiner的情形如图2-41所示。


image


需要注意的是,自定义Combiner也是需要集成Reducer的,同样也需要在reduce函数中写入处理逻辑。但是要注意,Combiner的输入键值对格式与输出键值对格式必须保持一致,也正是因为这个要求,很多情况下,采用自定义Combiner的方式在业务或算法处理上行不通。还有,在单词计数程序中,Combiner和Reducer使用的是同一个类代码,这是可能的,但是大多数情况下不能这样做,因为Reducer和Combiner的逻辑在很多情况下是不一样的。
2. Partitioner分析
Partitioner是来做什么的呢?是用来提高性能的吗?非也!Partitioner主要的目的是把键值对分给不同的Reducer。分给不同的Reducer?难道Reducer可以有多个吗?这是当然的,只需要在初始化Job实例的时候进行设置即可,例如设置代码为job.setNum-ReduceTasks(3),这样就可以设置3个Reducer了。
经过前面的分析可以知道,在Reducer的输入端,其键值对组是按照一个键对应一个值列表的。如果同一个键的不同值被发送到了不同的Reducer中,那么(注意,每个Reducer在一个子节点运行,不同Reducer之间不会干扰),经过不同的Reducer处理后,其实我们已经做不到针对一个键,输出一个值了,而是输出了两条记录。我们可以看下Hadoop系统默认的Partitioner实现,默认的Partitioner是HashPartitioner,其源码如代码清单2-30所示。

代码清单2-30 HashPartitioner源码
public class HashPartitioner<K, V> extends Partitioner<K, V> {
  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}

在源码中,可以看到HashPartitiner中只有一个方法,就是getPartition(K key,V value, int numReducTasks)。3个参数分别为键、值、Reducer的个数,输出其实就是Reducer的ID。从代码的实现中可以看出,最终输出的Reducer ID只与键(key)的值有关,这样也就保证了同样的键会被发送到同一个Reducer中处理。
同一个键的记录会被发送到同一个Reducer中处理,一个Reducer可以处理不同的键的记录。

3.输入输出格式/键值类型
一般来说,HDFS一个文件对应多个文件块,每个文件块对应一个InputSplit,而一个InputSplit就对应一个Mapper任务,每个Mapper任务在一个节点上运行,其仅处理当前文件块的数据,但是我们编写Mapper的时候只是关心输入键值对,而不是关心输入文件块。那么,文件块怎么被处理成了键值对呢?这就是Hadoop的输入格式要做的工作了。
在InputFormat中定义了如何分割以及如何进行数据读取从而得到键值对的实现方式,它有一个子类FileInputFormat,如果要自定义输入格式,一般都会集成它的子类File-InputFormat,它里面帮我们实现了很多基本的操作,比如记录跨文件块的处理等。
图2-42所示是InputFormat的类继承结构。
然而,比较常用的则是如表2-7所示的几个实现方式。
同理,可以想象,输出格式(OutputFormat)也与输入格式相同,不过是输入格式的逆过程:把键值对写入HDFS中的文件块中。如图2-43所示是OutputFormat的类继承结构。


image



image


同样,比较常用的方式如表2-8所示。

image


image


在Hadoop中,无论是Mapper或Reducer处理的都是键值对记录,那么Hadoop中有哪些键值对类型呢?Hadoop中常用的键值对类型如图2-44所示。


image


从各个类的命名上其实也可以看出其代表什么类型,比如LongWritable,代表的就是Long的实现,而Text就是String的实现。在前面的单词计数中我们使用过IntWritable以及Text。
这里有两点需要注意:
1)值类型都需实现Writale接口;
2)键需要实现WritableComparable接口。
其实从图2-44中也可以看出,Hadoop已有的键值类型都是实现WritableComparable接口的,然而WritableComparable接口又是实现Writable接口的。所以,Hadoop已有的键值类型既可以作为键类型也可以作为值类型。作为键类型的肯定可以作为值类型,但作为值类型的却不能作为键类型。为什么键类型是实现WritableComparable接口呢?其实,如果你联想到了Shuffle/Sort过程的话,应该不难理解,因为MapReduce框架需要在这里对键进行排序。
4.动手实践:指定输入输出格式
这个实验主要是加深理解Hadoop的输入/输出格式,熟悉常用的SequenceFileInput-Format和SequenceFileOutputFormat。
实验步骤:
1)打开Eclipse,打开已经完成的WordCount程序;
2)设置输出格式为SequenceFileOutputFormat,重新打包,并提交到Linux上运行;
3)查看输出的文件;
4)再次修改WordCount程序,设置输入格式为SequenceFileInputFormat、输入路径为3的输出;设置输出格式为TextFileInputFormat;
5)查看输出结果;
6)针对上面的各个步骤以及输出进行分析,解释对应的输出结构。
思考:
1)第4步中查看的文件是否是乱码?如果是乱码,为什么是乱码?针对这样的数据,如何使用HDFS Java API进行读取?如果不是乱码,看到的是什么?
2)使用SequenceFileInputFormat或SequenceFileOutputFormat有什么优势与劣势?
5.自定义键值类型
Hadoop已经定义了很多键值类型,比如Text、IntWritable、LongWritable等,那为什么需要用到自定键值类型呢?答案其实很简单,不够用。在有些情况下,我们需要一些特殊的键值类型来满足我们的业务需求,这种时候就需要自定义键值类型了。前面已经提到,自定义键需要实现WritableComparable接口,自定义值需要实现Writable接口,那么实现了接口后,还需要做哪些操作呢?
自定义值类型可参考代码清单2-31进行分析。
**代码清单2-31 自定义Hadoop 值类型
public class MyWritable implements Writable {
private int counter;
private long timestamp;
@Override
public void write(DataOutput out) throws IOException {

 out.writeInt(counter);
 out.writeLong(timestamp);

}
@Override
public void readFields(DataInput in) throws IOException {

 counter = in.readInt();
 timestamp=in.readLong();

}**
}
在代码清单2-31中,首先实现了Writable接口,接着定义了两个变量。这两个变量其实是与业务相关的(比如,这里定义了一个counter,一个timestamp)。实现了Writable接口后,需要覆写两个方法(write和readFields),这里需要注意写入和读取的顺序是很重要的,比如这里先把counter写入out输出流,再把timestamp写入out输出流。那么,在读取的时候就需要先读取counter,再读取timestamp(如果两个变量都是int型,那么就更加需要注意区分)。
自定义键类型可参考代码清单2-32进行分析。
**代码清单2-32 自定义Hadoop 键类型
public class MyWritableComparable implements WritableComparable {
private int counter;
private long timestamp;
@Override
public void write(DataOutput out) throws IOException {

 out.writeInt(counter);
 out.writeLong(timestamp);

}
@Override
public void readFields(DataInput in) throws IOException {

 counter = in.readInt();
 timestamp= in.readLong();

}
@Override
public int compareTo(MyWritableComparable other) {

 if(this.counter == other.counter){
     return (int)(this.timestamp - other.timestamp);
 }
 return this.counter-other.counter;

}
}**
从代码清单2-32中可以看出,自定义键类型其实就是比自定义值类型多了一个比较方法而已,其他都是一样的。
6.动手实践:自定义键值类型
针对source/hadoop/keyvalue.data数据求解每行数据的个数以及平均值,该数据格式如表2-9所示。

image

1)编写Driver程序,main函数接收两个参数和,设置输入格式为KeyValueInputFormat;
2)编写Mapper程序,map函数针对每个value值,使用‘t’进行分隔;接着,对分隔后的数据进行求和以及个数统计(注意将字符串转换为数值),输出平均值和个数,Mapper输出键值对类型为;
3)编写自定义value类型MyValue,定义两个字段,一个是average,一个是num,用于存储平均值和个数;重写toString方法;
4)编写Reducer程序,直接输出即可;
5)对编写的程序进行打包averagejob.jar;
6)上传source/hadoop/keyvalue.data到HDFS,上传averagejob.jar到Linux;
7)使用命令hadoop jar averagejob.jar进行调用;
8)查看输出结果。
思考:
1)Reducer类是否必需?如果不需要,则如何修改?如果去掉reducer,输出结果会有什么不一样?
2)如果想让程序可以直接在Eclipse中运行,应该如何修改程序?

相关实践学习
简单用户画像分析
本场景主要介绍基于海量日志数据进行简单用户画像分析为背景,如何通过使用DataWorks完成数据采集 、加工数据、配置数据质量监控和数据可视化展现等任务。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
相关文章
|
18天前
|
存储 消息中间件 监控
【Flume】Flume在大数据分析领域的应用
【4月更文挑战第4天】【Flume】Flume在大数据分析领域的应用
|
1月前
|
Cloud Native 数据处理 云计算
探索云原生技术在大数据分析中的应用
随着云计算技术的不断发展,云原生架构作为一种全新的软件开发和部署模式,正逐渐引起企业的广泛关注。本文将探讨云原生技术在大数据分析领域的应用,介绍其优势与挑战,并探讨如何利用云原生技术提升大数据分析的效率和可靠性。
|
1月前
|
SQL 分布式计算 关系型数据库
阿里云E-MapReduce Trino专属集群外连引擎及权限控制踩坑实践
本文以云厂商售后技术支持的角度,从客户的需求出发,对于阿里云EMR-Trino集群的选型,外连多引擎的场景、Ldap以及Kerberos鉴权等问题进行了简要的实践和记录,模拟客户已有的业务场景,满足客户需求的同时对过程中的问题点进行解决、记录和分析,包括但不限于Mysql、ODPS、Hive connector的配置,Hive、Delta及Hudi等不同表格式读取的兼容,aws s3、阿里云 oss协议访问异常的解决等。
|
1月前
|
存储 消息中间件 大数据
Go语言在大数据处理中的实际应用与案例分析
【2月更文挑战第22天】本文深入探讨了Go语言在大数据处理中的实际应用,通过案例分析展示了Go语言在处理大数据时的优势和实践效果。文章首先介绍了大数据处理的挑战与需求,然后详细分析了Go语言在大数据处理中的适用性和核心技术,最后通过具体案例展示了Go语言在大数据处理中的实际应用。
|
1月前
|
数据采集 运维 数据挖掘
API电商接口大数据分析与数据挖掘 (商品详情店铺)
API接口、数据分析以及数据挖掘在商品详情和店铺相关的应用中,各自扮演着重要的角色。以下是关于它们各自的功能以及如何在商品详情和店铺分析中协同工作的简要说明。
|
3月前
|
关系型数据库 MySQL Serverless
高顿教育:大数据抽数分析业务引入polardb mysql serverless
高顿教育通过使用polardb serverless形态进行数据汇总,然后统一进行数据同步到数仓,业务有明显高低峰期,灵活的弹性伸缩能力,大大降低了客户使用成本。
|
3月前
|
机器学习/深度学习 数据采集 算法
大数据分析技术与方法探究
在当今信息化时代,数据量的增长速度远快于人类的处理能力。因此,如何高效地利用大数据,成为了企业和机构关注的焦点。本文将从大数据分析的技术和方法两个方面进行探究,为各行业提供更好的数据应用方向。
|
3月前
|
机器学习/深度学习 人工智能 自然语言处理
大数据分析的技术和方法:从深度学习到机器学习
大数据时代的到来,让数据分析成为了企业和组织中不可或缺的一环。如何高效地处理庞大的数据集并且从中发现潜在的价值是每个数据分析师都需要掌握的技能。本文将介绍大数据分析的技术和方法,包括深度学习、机器学习、数据挖掘等方面的应用,以及如何通过这些技术和方法来解决实际问题。
52 2
|
3月前
|
机器学习/深度学习 人工智能 运维
大数据分析:探索信息世界的钥匙
在当今信息爆炸的时代,大数据分析成为挖掘宝藏般的技术和方法。本文将介绍大数据分析的基本概念、技术与方法,并探讨其在商业、科学和社会领域中的广泛应用。从数据收集和预处理到模型构建和结果解读,大数据分析为我们揭示了信息世界的钥匙,为决策者提供了有力的支持。
|
2月前
|
API
GEE案例分析——利用sentinel-3数据计算空气污染指数(Air Pollution Index,简称API)
GEE案例分析——利用sentinel-3数据计算空气污染指数(Air Pollution Index,简称API)
105 0

热门文章

最新文章