基于大数据开发套件定时调度带资源文件的MapReduce作业

简介: 1、介绍如何使用DataIDE周期性调度MaxCompute MapReduce作业 2、如何编写带资源文件的MapReduce代码

MaxCompute里的MR作业,很少是只要跑一次就好了的。如果需要周期性调度,目前MaxCompute(原名ODPS)只提供了计算引擎,任务调度可以使用大数据开发套件来实现。这篇帖子从基础开始,介绍了3种周期性调度的方法。同时还介绍了如何使用资源文件。

代码开发

代码以文档里的WordCount 作为例子。
在这个基础上,增加资源文件的读取方法,修改Reduce类。主要的逻辑是读取资源文件,资源文件里的数据格式是字符串1,字符串2。代码逻辑是如果word count里的word如果有在字符串1里出现的话,就替换成字符串2。

    public static class SumReducer extends ReducerBase {
        private Record result = null;
        private Map<String,String> maps = null;

        @Override
        public void setup(TaskContext context) throws IOException {
            result = context.createOutputRecord();
            maps = new HashMap<String,String>();
            StringBuilder importdata = new StringBuilder();
            BufferedInputStream bufferedInput = null;
            try {
                byte[] buffer = new byte[1024];
                int bytesRead = 0;
                //读取资源文件的内容
                bufferedInput = context.readResourceFileAsStream("resource.txt");

                while ((bytesRead = bufferedInput.read(buffer)) != -1) {
                    String chunk = new String(buffer, 0, bytesRead);
                    importdata.append(chunk);
                }
                //解析资源文件的内容,把替换前,替换后的数据放到map里
                String lines[] = importdata.toString().split("\n");
                for (int i = 0; i < lines.length; i++) {
                    String[] ss = lines[i].split(",");
                    maps.put(ss[0].trim(), ss[1].trim());
                    System.out.println(ss[0]+"->"+ss[1]);
                }
            } catch (FileNotFoundException ex) {
                throw new IOException(ex);
            } catch (IOException ex) {
                throw new IOException(ex);
            } finally {
            }
        }

        @Override
        public void reduce(Record key, Iterator<Record> values,
                TaskContext context) throws IOException {
            long count = 0;
            while (values.hasNext()) {
                Record val = values.next();
                count += (Long) val.get(0);
            }
            String value = key.get(0).toString();
            if(maps.containsKey(value)){
                System.out.println(value+"->"+maps.get(value));
                value = maps.get(value);
                
            }
            result.set(0, value);
            result.set(1, count);
            context.write(result);
        }
    }

具体资源文件的用法可以参考文档 ,这里就不再多解释了。

客户端调用

对于测试数据,源文件的内容为

odps,MaxCompute
hello,Hello

我们先用手工调度来跑这个MR,这里跑通了后后面的所有的配置就很容易明白了。
首先需要把代码打出的jar包,和这个resource.txt文件上传到服务器上

>add jar D:\cx_word_count.jar -f;
OK: Resource 'cx_word_count.jar' have been updated.
>add file D:\resource.txt -f;
OK: Resource 'resource.txt' have been updated.

然后通过命令行来调用

jar -resources cx_word_count.jar,resource.txt -classpath D:\cx_word_count.jar com.aliyun.odps.mr.WordCount;

这里的-resources引用的是跑在服务器上的,-classpath是用来找到main方法的。理解这个对后面配置同步任务很有帮助。可以参阅文档

Crontab调用

odpscmd客户端有一个参数,是-e,可以在shell里直接调用jar命令来跑MR,当然也可以使用odpscmd -f来再调用一个脚本文件,但是这样有点麻烦了。这里就直接用-e来做。

你可以先用

/odps/cmd/bin/odpscmd  -e "jar -resources cx_word_count.jar,resource.txt -classpath /odps/cx_word_count.jar com.aliyun.odps.mr.WordCount;"

在Linux服务器上运行任务。注意安装odpscmd配置前需要先配置好java环境。然后后面的Crontab的配置就不展开了。

MR作业

配置DataIDE的MR作业的界面,很容易就让人想到MR任务的main方法。其实就是DataIDE会根据配置自己生成main方法,然后去调用MaxCompute上的任务。具体的配置可以参考这个截图:
screenshot
可以在右边看到可以配置任务的调度周期和上下游依赖,从而实现每天的定时调度,而且还能是保证上游的数据导入、预处理完成后才开始做MR操作,非常好用。

Shell任务

上述的MR任务简单方便,但是DataIDE出于安全考虑,不让用户自己写main方法。如果需要用到诸如传参数之类的功能,可以自己写Shell任务,但是调度让DataIDE来做。这样就集上面两个方法之长了。

Shell任务需要先参考文档 先配置调度的ECS信息,这里不再展开。完成后写一个Shell脚本,内容为

##@resource_reference{"cx_word_count.jar,resource.txt"}
/opt/taobao/tbdpapp/odpswrapper/odpsconsole/bin/odpscmd   -u  testid  -p  testkey  --project=testproject --endpoint=http://service.odps.aliyun.com/api  -e "jar -resources cx_word_count.jar,resource.txt  -classpath /odps/cx_word_count.jar  com.aliyun.odps.mr.WordCount"

要把里面的Access id/key,Project 替换成你自己的,然后开始测试代码。需要特别注意的是, Shell任务是在机器上的admin账号下运行的 ,如果发现各种奇怪的错误,比如明明存在的文件找不到一类的错误,可以先su - admin,调试下Shell命令,或者访问下对应的文件,看看是否是环境变量,文件目录权限的问题。另外也可以把错误日志重定向到某个文件里,比如/tmp文件夹下的某个临时日志文件里,方便事后调试。大家可以在admin账号下把shell调试通过后再放到数加上去调用。

另外Shell任务可以调整调度的机器,可以参考
screenshot

相关实践学习
简单用户画像分析
本场景主要介绍基于海量日志数据进行简单用户画像分析为背景,如何通过使用DataWorks完成数据采集 、加工数据、配置数据质量监控和数据可视化展现等任务。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
2月前
|
SQL 分布式计算 监控
MaxCompute提供了一些工具以帮助您监控作业和资源使用情况。
【2月更文挑战第4天】MaxCompute提供了一些工具以帮助您监控作业和资源使用情况。
23 8
|
6月前
|
分布式计算 资源调度 大数据
黑马程序员-大数据入门到实战-MapReduce & YARN入门
黑马程序员-大数据入门到实战-MapReduce & YARN入门
73 0
|
4月前
|
存储 分布式计算 负载均衡
【大数据技术Hadoop+Spark】MapReduce概要、思想、编程模型组件、工作原理详解(超详细)
【大数据技术Hadoop+Spark】MapReduce概要、思想、编程模型组件、工作原理详解(超详细)
59 0
|
4月前
|
分布式计算 资源调度 物联网
助力工业物联网,工业大数据之服务域:定时调度使用【三十四】
助力工业物联网,工业大数据之服务域:定时调度使用【三十四】
34 1
|
4月前
|
SQL 分布式计算 监控
MaxCompute提供了一些工具以帮助您监控作业和资源使用情况
MaxCompute提供了一些工具以帮助您监控作业和资源使用情况
40 4
|
4月前
|
存储 分布式计算 搜索推荐
【大数据技术Hadoop+Spark】MapReduce之单词计数和倒排索引实战(附源码和数据集 超详细)
【大数据技术Hadoop+Spark】MapReduce之单词计数和倒排索引实战(附源码和数据集 超详细)
46 0
|
4月前
|
分布式计算 Hadoop 大数据
【云计算与大数据计算】Hadoop MapReduce实战之统计每个单词出现次数、单词平均长度、Grep(附源码 )
【云计算与大数据计算】Hadoop MapReduce实战之统计每个单词出现次数、单词平均长度、Grep(附源码 )
145 0
|
4月前
|
存储 分布式计算 Hadoop
【云计算与大数据技术】Hadoop MapReduce的讲解(图文解释,超详细必看)
【云计算与大数据技术】Hadoop MapReduce的讲解(图文解释,超详细必看)
80 0
|
4月前
|
存储 分布式计算 大数据
【云计算与大数据技术】大数据系统总体架构概述(Hadoop+MapReduce )
【云计算与大数据技术】大数据系统总体架构概述(Hadoop+MapReduce )
95 0
|
5月前
|
机器学习/深度学习 分布式计算 大数据
大数据 - MapReduce:从原理到实战的全面指南
大数据 - MapReduce:从原理到实战的全面指南
262 0

相关产品

  • 云原生大数据计算服务 MaxCompute