如何在MaxCompute上运行HadoopMR作业

简介: MaxCompute(原ODPS)有一套自己的MapReduce编程模型和接口,简单说来,这套接口的输入输出都是MaxCompute中的Table,处理的数据是以Record为组织形式的,它可以很好地描述Table中的数据处理过程,然而与社区的Hadoop相比,编程接口差异较大。Hadoop用户如果

本文用到的

阿里云数加-大数据计算服务MaxCompute产品地址:https://www.aliyun.com/product/odps


MaxCompute(原ODPS)有一套自己的MapReduce编程模型和接口,简单说来,这套接口的输入输出都是MaxCompute中的Table,处理的数据是以Record为组织形式的,它可以很好地描述Table中的数据处理过程,然而与社区的Hadoop相比,编程接口差异较大。Hadoop用户如果要将原来的Hadoop MR作业迁移到MaxCompute的MR执行,需要重写MR的代码,使用MaxCompute的接口进行编译和调试,运行正常后再打成一个Jar包才能放到MaxCompute的平台来运行。这个过程十分繁琐,需要耗费很多的开发和测试人力。如果能够完全不改或者少量地修改原来的Hadoop MR代码就能在MaxCompute平台上跑起来,将是一个比较理想的方式。

现在MaxCompute平台提供了一个HadoopMR到MaxCompute MR的适配工具,已经在一定程度上实现了Hadoop MR作业的二进制级别的兼容,即用户可以在不改代码的情况下通过指定一些配置,就能将原来在Hadoop上运行的MR jar包拿过来直接跑在MaxCompute上。该插件的下载地址在:http://repo.aliyun.com/download/hadoop2openmr-1.0.jar,目前该插件处于测试阶段,暂时还不能支持用户自定义comparator和自定义key类型,下面将以WordCount程序为例,介绍一下这个插件的基本使用方式。

使用该插件在MaxCompute平台跑一个HadoopMR作业的基本步骤如下:

1. 下载HadoopMR的插件

通过http://repo.aliyun.com/download/hadoop2openmr-1.0.jar下载插件,包名为hadoop2openmr-1.0.jar,注意,这个jar里面已经包含hadoop-2.7.2版本的相关依赖,在作业的jar包中请不要携带hadoop的依赖,避免版本冲突。

2. 准备好HadoopMR的程序jar包

编译导出WordCount的jar包:wordcount_test.jar ,wordcount程序的源码如下:


package com.aliyun.odps.mapred.example.hadoop;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
import java.io.IOException;
import java.util.StringTokenizer;
 
public class WordCount {
 
    public static class TokenizerMapper
        extends Mapper<Object, Text, Text, IntWritable>{
 
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
 
        public void map(Object key, Text value, Context context
        ) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }
 
    public static class IntSumReducer
        extends Reducer<Text,IntWritable,Text,IntWritable> {
        private IntWritable result = new IntWritable();
 
        public void reduce(Text key, Iterable<IntWritable> values,
            Context context
        ) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }
 
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

3. 测试数据准备

  • 创建输入表和输出表
create table if not exists wc_in(line string);
create table if not exists wc_out(key string, cnt bigint);
  • 通过tunnel将数据导入输入表中

待导入文本文件data.txt的数据内容如下:

hello maxcompute
hello mapreduce

例如可以通过如下命令将data.txt的数据导入wc_in中,

tunnel upload  data.txt wc_in;

4. 准备好表与hdfs文件路径的映射关系配置

配置文件命名为:wordcount-table-res.conf

{
  "file:/foo": {
    "resolver": {
      "resolver": "com.aliyun.odps.mapred.hadoop2openmr.resolver.TextFileResolver",
      "properties": {
          "text.resolver.columns.combine.enable": "true",
          "text.resolver.seperator": "\t"
      }
    },
    "tableInfos": [
      {
        "tblName": "wc_in",
        "partSpec": {},
        "label": "__default__"
      }
    ],
    "matchMode": "exact"
  },
  "file:/bar": {
    "resolver": {
      "resolver": "com.aliyun.odps.mapred.hadoop2openmr.resolver.BinaryFileResolver",
      "properties": {
          "binary.resolver.input.key.class" : "org.apache.hadoop.io.Text",
          "binary.resolver.input.value.class" : "org.apache.hadoop.io.LongWritable"
      }
    },
    "tableInfos": [
      {
        "tblName": "wc_out",
        "partSpec": {},
        "label": "__default__"
      }
    ],
    "matchMode": "fuzzy"
  }
}

配置项说明:

整个配置是一个json文件,描述hdfs上文件与maxcompute上表之间的映射关系,一般要配置输入和输出两部分,一个HDFS路径对应一个resolver配置,tableInfos配置以及matchMode配置。

  • resolver: 用于配置如何对待文件中的数据,目前有com.aliyun.odps.mapred.hadoop2openmr.resolver.TextFileResolver和com.aliyun.odps.mapred.hadoop2openmr.resolver.BinaryFileResolver两个内置的resolver可以选用。除了指定好resolver的名字,还需要为相应的resolver配置一些properties指导它正确的进行数据解析。

    • TextFileResolver: 对于纯文本的数据,输入输出都会当成纯文本对待。当作为输入resolver配置时,需要配置的properties有text.resolver.columns.combine.enable和text.resolver.seperator,当text.resolver.columns.combine.enable配置为true时,会把输入表的所有列按找text.resolver.seperator指定的分隔符组合成一个字符串作为输入。否则,会把输入表的前两列分别作为key,value。
    • BinaryFileResolver: 可以处理二进制的数据,自动将数据转换为maxcompute可以支持的数据类型,如bigint, bool, double等。当作为输出resolver配置时,需要配置的properties有binary.resolver.input.key.class和binary.resolver.input.value.class,分别代表中间结果的key和value类型。
  • tableInfos:用户配置HDFS对应的maxcompute的表,目前只支持配置表的名字tblName,而partSpec和label请保持和示例一致。
  • matchMode:路径的匹配模式,可选项为exact和fuzzy,分别代表精确匹配和模糊匹配,如果设置为fuzzy,则可以通过正则来匹配HDFS的输入路径

5. 使用MaxCompute命令行工具odpscmd提交作业

maxcompute命令行工具的安装和配置方法参考:http://repo.aliyun.com/odpscmd/

在maxcompute的命令行下运行如下命令:

jar -DODPS_HADOOPMR_TABLE_RES_CONF=./wordcount-table-res.conf -classpath hadoop2openmr-1.0.jar,wordcount_test.jar com.aliyun.odps.mapred.example.hadoop.WordCount /foo /bar;    

这里假设我们已经将hadoop2openmr-1.0.jar和wordcount_test.jar以及wordcount-table-res.conf已经放置到odpscmd的当前目录,否则在指定配置和-classpath的路径时需要做相应的修改。

运行过程如下图所示:

1

当作业运行完成后,可以查看结果表wc_out里面的内容,验证作业成功结束,结果符合预期。

2

欢迎加入MaxCompute钉钉群讨论

42559c7dde62e4d333c90e02efdf416257a4be27

相关实践学习
简单用户画像分析
本场景主要介绍基于海量日志数据进行简单用户画像分析为背景,如何通过使用DataWorks完成数据采集 、加工数据、配置数据质量监控和数据可视化展现等任务。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
2月前
|
SQL 分布式计算 监控
MaxCompute提供了一些工具以帮助您监控作业和资源使用情况。
【2月更文挑战第4天】MaxCompute提供了一些工具以帮助您监控作业和资源使用情况。
27 8
|
2月前
|
分布式计算 DataWorks BI
MaxCompute数据问题之运行报错如何解决
MaxCompute数据包含存储在MaxCompute服务中的表、分区以及其他数据结构;本合集将提供MaxCompute数据的管理和优化指南,以及数据操作中的常见问题和解决策略。
38 1
|
2月前
|
SQL 分布式计算 DataWorks
maxcompute函数问题之注册函数后运行结果错误如何解决
MaxCompute函数包括内置函数和自定义函数(UDF),它们用于在MaxCompute平台上执行数据处理和分析任务;本合集将介绍MaxCompute函数的使用方法、函数编写和优化技巧,以及常见的函数错误和解决途径。
|
2月前
|
SQL 分布式计算 MaxCompute
MaxCompute异常问题之运行语句异常如何解决
MaxCompute异常涉及到在使用阿里云MaxCompute大数据计算服务时遇到的各种错误和问题;本合集将提供针对MaxCompute异常的分析和解决方案,帮助用户处理数据处理、分析任务中的异常情况。
|
2月前
|
SQL 并行计算 大数据
【大数据技术攻关专题】「Apache-Flink零基础入门」手把手+零基础带你玩转大数据流式处理引擎Flink(基础加强+运行原理)
关于Flink服务的搭建与部署,由于其涉及诸多实战操作而理论部分相对较少,小编打算采用一个独立的版本和环境来进行详尽的实战讲解。考虑到文字描述可能无法充分展现操作的细节和流程,我们决定以视频的形式进行分析和介绍。因此,在本文中,我们将暂时不涉及具体的搭建和部署步骤。
496 3
【大数据技术攻关专题】「Apache-Flink零基础入门」手把手+零基础带你玩转大数据流式处理引擎Flink(基础加强+运行原理)
|
4月前
|
SQL 分布式计算 监控
MaxCompute提供了一些工具以帮助您监控作业和资源使用情况
MaxCompute提供了一些工具以帮助您监控作业和资源使用情况
40 4
|
4月前
|
存储 分布式计算 大数据
【大数据技术Hadoop+Spark】Spark RDD设计、运行原理、运行流程、容错机制讲解(图文解释)
【大数据技术Hadoop+Spark】Spark RDD设计、运行原理、运行流程、容错机制讲解(图文解释)
67 0
|
5月前
|
SQL 分布式计算 调度
在MaxCompute中,你可以通过SQL语句来查询和导出实例的运行状态和时间等信息
在MaxCompute中,你可以通过SQL语句来查询和导出实例的运行状态和时间等信息
54 5
|
5月前
|
机器学习/深度学习 大数据
机器学习lgb全国大数据创新应用大赛用户贷款风险预测 完整代码数据 可直接运行
机器学习lgb全国大数据创新应用大赛用户贷款风险预测 完整代码数据 可直接运行
90 0
|
6月前
|
SQL 分布式计算 DataWorks
MaxCompute元数据使用实践--作业统计
通过MaxCompute租户级别Information Schema的“TASKS_HISTORY”视图可以统计查看MaxCompute计算作业的元数据信息,方便您进行作业审计以及各类统计,指导作业性能、成本优化。
1205 0

相关产品

  • 云原生大数据计算服务 MaxCompute