maxCompute的UDAF demo,实现累加功能

简介: maxCompute的UDAF demo,实现累加功能

需求:将表中数据按照name聚合,并且count进行累加

name count
Jan 1
Jan 2
Feb 3
Feb 1
Mar 1
Mar 5

预期结果:

name count
Jan 3
Feb 7
Mar 13

使用idea的maxCompute studio新增UDAF

_
_
然后自动生成未实现的方法,我的字段name是string,count是bigint
所以@Resolve("string,bigint->bigint")

新增一个class用来存储字段

        private String  name;
        private Long count;
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(name);
            out.writeLong(count);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            name =  in.readUTF();
            count = in.readLong();
        }
    }

这样newBuffer就可以这样写了

    public Writable newBuffer() {
        return new MyBuffer();
    }

还需要一个Map来存储key(name)和value(count),一个long类型的参数存储累加的值

    Long old_count = 0L;//存储累加值
    private LongWritable ret = new LongWritable();//存储输出值

完整代码参考:

public class UDAFTest extends Aggregator {
    private static class MyBuffer implements Writable {
        private String  name;
        private Long count;
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(name);
            out.writeLong(count);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            name =  in.readUTF();
            count = in.readLong();
        }
    }



    @Override
    public Writable newBuffer() {
        return new MyBuffer();
    }
    Map<String,Long> map = new LinkedHashMap<>();
    Long old_count = 0L;
    @Override
    public void iterate(Writable buffer, Writable[] args) throws UDFException {
        String arg = String.valueOf(args[0]);
        Long cnt = Long.parseLong(String.valueOf(args[1]));
        MyBuffer buf = (MyBuffer) buffer;

        if (arg != null) {
            if(map.containsKey(arg)){
                Long newcnt = map.get(arg);
                old_count = cnt+newcnt;
                map.put(arg,old_count);
            }else {
                map.put(arg,old_count+cnt);
            }
        }
        buf.name = arg;
        buf.count = map.get(arg);

    }
    private LongWritable ret = new LongWritable();
    @Override
    public Writable terminate(Writable arg0) throws UDFException {
        MyBuffer buffer = (MyBuffer) arg0;
        ret.set(buffer.count);
        return ret;
    }

    @Override
    public void merge(Writable buffer, Writable partial) throws UDFException {
        MyBuffer buf = (MyBuffer) buffer;
        MyBuffer p = (MyBuffer) partial;
        buf.name = p.name;
        buf.count = p.count;
    }


}

然后通过maxCompute studio发布下
_

发布名为test20191119,这样就可以在Dataworks中调用了。

其中原表数据:
_

_

相关实践学习
简单用户画像分析
本场景主要介绍基于海量日志数据进行简单用户画像分析为背景,如何通过使用DataWorks完成数据采集 、加工数据、配置数据质量监控和数据可视化展现等任务。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
相关文章
|
SQL 分布式计算 DataWorks
MaxCompute最佳实践:SQL实现一行变多行&多行变一行
本文对Dataworks里一行变多行&多行变一行进行实践,其中多行变一行是对现有实践的一个引用,方便大家查找
|
2月前
|
分布式计算 DataWorks 大数据
maxcompute函数问题之udaf函数传到线上报错如何解决
MaxCompute函数包括内置函数和自定义函数(UDF),它们用于在MaxCompute平台上执行数据处理和分析任务;本合集将介绍MaxCompute函数的使用方法、函数编写和优化技巧,以及常见的函数错误和解决途径。
|
5月前
|
分布式计算 DataWorks 关系型数据库
MaxCompute支持通过DataWorks数据集成功能将其他数据源数据同步至MaxCompute
MaxCompute支持通过DataWorks数据集成功能将其他数据源数据同步至MaxCompute
35 1
|
5月前
|
jstorm 大数据 分布式数据库
大数据下的实时热点功能实现讨论(实时流的TopN)
我司内部有个基于jstorm的实时流编程框架,文档里有提到实时Topn,但是还没有实现。。。。这是一个挺常见挺重要的功能,但仔细想想实现起来确实有难度。实时流的TopN其实离大家很近,比如下图百度和微博的实时热搜榜,还有各种资讯类的实时热点,他们具体实现方式不清楚,甚至有可能是半小时离线跑出来的。今天不管他们怎么实现的,我们讨论下实时该怎么实现(基于storm)。
117 0
|
8月前
|
分布式计算 运维 大数据
MaxCompute资源管理——使用成本优化功能实现包年包月计算资源降本增效
MaxCompute提供成本优化(计算资源优化推荐)功能,可基于实际作业请求量和资源配置期望,对包年包月一级Quota类型的计算资源生成更优的资源配置方案,帮助进一步提升计算资源利用率,优化计算成本。本文我们一起通过典型场景案例来看看如何通过成本优化(计算资源优化推荐)功能提供降本增效的参考建议。
207 0
|
9月前
|
Cloud Native 大数据
阿里云最新产品手册——阿里云核心产品——云原生大数据计算服务——产品功能
阿里云最新产品手册——阿里云核心产品——云原生大数据计算服务——产品功能自制脑图
77 1
|
10月前
|
数据采集 Java 大数据
大数据数据采集的数据采集(收集/聚合)的Logstash之强大的插件功能
在大数据领域中,Logstash是一款非常流行的数据采集工具。它具有丰富的插件功能,可以完成各种不同数据来源的数据采集任务。本文将介绍Logstash的插件功能,并为大家介绍几款强大的插件。
139 1
|
11月前
|
分布式计算 资源调度 运维
【大数据运维】Hadoop开启Yarn的日志监控功能
【大数据运维】Hadoop开启Yarn的日志监控功能
【大数据运维】Hadoop开启Yarn的日志监控功能
|
分布式计算 MaxCompute
《零基础实现Flume收集网站日志数据到MaxCompute》电子版地址
零基础实现Flume收集网站日志数据到MaxCompute
71 0
《零基础实现Flume收集网站日志数据到MaxCompute》电子版地址
|
SQL 存储 监控
基于 Flink 的实时大数据应用 Demo|学习笔记
快速学习基于 Flink 的实时大数据应用 Demo。
510 0
基于 Flink 的实时大数据应用 Demo|学习笔记