在MaxCompute中利用bitmap进行数据处理

简介: 很多数据开发者使用bitmap技术对用户数据进行编码和压缩,然后利用bitmap的与/或/非的极速处理速度,实现类似用户画像标签的人群筛选、运营分析的7日活跃等分析。本文给出了一个使用MaxCompute MapReduce开发一个对不同日期活跃用户ID进行bitmap编码和计算的样例。

很多数据开发者使用bitmap技术对用户数据进行编码和压缩,然后利用bitmap的与/或/非的极速处理速度,实现类似用户画像标签的人群筛选、运营分析的7日活跃等分析。
本文给出了一个使用MaxCompute MapReduce开发一个对不同日期活跃用户ID进行bitmap编码和计算的样例。供感兴趣的用户进一步了解、分析,并应用在自己的场景下。


import com.aliyun.odps.OdpsException;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.ReducerBase;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;
import org.roaringbitmap.RoaringBitmap;
import org.roaringbitmap.buffer.ImmutableRoaringBitmap;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.Iterator;

public class bitmapDemo2
{

    public static class BitMapper extends MapperBase {

        Record key;
        Record value;
        @Override
        public void setup(TaskContext context) throws IOException {
            key = context.createMapOutputKeyRecord();
            value = context.createMapOutputValueRecord();
        }

        @Override
        public void map(long recordNum, Record record, TaskContext context)
                throws IOException
        {
            RoaringBitmap mrb=new RoaringBitmap();
            long AID=0;
            {
                {
                    {
                        {
                            AID=record.getBigint("id");
                            mrb.add((int) AID);
                            //获取key
                            key.set(new Object[] {record.getString("active_date")});

                        }
                    }
                }
            }
            ByteBuffer outbb = ByteBuffer.allocate(mrb.serializedSizeInBytes());
            mrb.serialize(new DataOutputStream(new OutputStream(){
                ByteBuffer mBB;
                OutputStream init(ByteBuffer mbb) {mBB=mbb; return this;}
                public void close() {}
                public void flush() {}
                public void write(int b) {
                    mBB.put((byte) b);}
                public void write(byte[] b) {mBB.put(b);}
                public void write(byte[] b, int off, int l) {mBB.put(b,off,l);}
            }.init(outbb)));
            String serializedstring = Base64.getEncoder().encodeToString(outbb.array());
            value.set(new Object[] {serializedstring});
            context.write(key, value);
        }
    }

    public static class BitReducer extends ReducerBase {
        private Record result = null;

        public void setup(TaskContext context) throws IOException {
            result = context.createOutputRecord();
        }

        public void reduce(Record key, Iterator<Record> values, TaskContext context) throws IOException {
            long fcount = 0;
            RoaringBitmap rbm=new RoaringBitmap();
            while (values.hasNext())
            {
                Record val = values.next();
                ByteBuffer newbb = ByteBuffer.wrap(Base64.getDecoder().decode((String)val.get(0)));
                ImmutableRoaringBitmap irb = new ImmutableRoaringBitmap(newbb);
                RoaringBitmap p= new RoaringBitmap(irb);
                rbm.or(p);
            }
            ByteBuffer outbb = ByteBuffer.allocate(rbm.serializedSizeInBytes());
            rbm.serialize(new DataOutputStream(new OutputStream(){
                ByteBuffer mBB;
                OutputStream init(ByteBuffer mbb) {mBB=mbb; return this;}
                public void close() {}
                public void flush() {}
                public void write(int b) {
                    mBB.put((byte) b);}
                public void write(byte[] b) {mBB.put(b);}
                public void write(byte[] b, int off, int l) {mBB.put(b,off,l);}
            }.init(outbb)));
            String serializedstring = Base64.getEncoder().encodeToString(outbb.array());
            result.set(0, key.get(0));
            result.set(1, serializedstring);
            context.write(result);
        }
    }
    public static void main( String[] args ) throws OdpsException
    {

        System.out.println("begin.........");
        JobConf job = new JobConf();
        
        job.setMapperClass(BitMapper.class);
        job.setReducerClass(BitReducer.class);

        job.setMapOutputKeySchema(SchemaUtils.fromString("active_date:string"));
        job.setMapOutputValueSchema(SchemaUtils.fromString("id:string"));

        InputUtils.addTable(TableInfo.builder().tableName("bitmap_source").cols(new String[] {"id","active_date"}).build(), job);
//        +------------+-------------+
//        | id         | active_date |
//        +------------+-------------+
//        | 1          | 20190729    |
//        | 2          | 20190729    |
//        | 3          | 20190730    |
//        | 4          | 20190801    |
//        | 5          | 20190801    |
//        +------------+-------------+
        OutputUtils.addTable(TableInfo.builder().tableName("bitmap_target").build(), job);
//        +-------------+------------+
//        | active_date | bit_map    |
//        +-------------+------------+
//        20190729,OjAAAAEAAAAAAAEAEAAAAAEAAgA=3D
//        20190730,OjAAAAEAAAAAAAAAEAAAAAMA
//        20190801,OjAAAAEAAAAAAAEAEAAAAAQABQA=3D

        JobClient.runJob(job);
    }
}

对Java应用打包后,上传到MaxCompute项目中,即可在MaxCompute中调用该MR作业,对输入表的数据按日期作为key进行用户id的编码,同时按照相同日期对bitmap后的用户id取OR操作(根据需要可以取AND,例如存留场景),并将处理后的数据写入目标结构表当中供后续处理使用。

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
18天前
|
存储 大数据 数据处理
PHP 与大数据:构建高效数据处理系统
传统的数据处理系统往往难以应对大规模数据的处理需求,而PHP作为一种常用的服务器端脚本语言,在数据处理方面也有其独特的优势。本文将探讨如何利用PHP构建高效的大数据处理系统,结合实际案例分析其应用场景及优势所在。
24 2
|
18天前
|
存储 机器学习/深度学习 算法
大数据时代下的智能洞察:大规模数据处理的创新与应用
在信息爆炸的时代,大规模数据处理成为了科技领域的核心挑战之一。本文将探讨大规模数据处理的定义、创新技术和广泛应用,并阐述数据驱动的决策和洞察对现代社会带来的巨大影响。
92 3
|
11月前
|
数据采集 SQL 分布式计算
数据处理 、大数据、数据抽取 ETL 工具 DataX 、Kettle、Sqoop
数据处理 、大数据、数据抽取 ETL 工具 DataX 、Kettle、Sqoop
1105 0
|
18天前
|
存储 大数据 数据处理
矢量数据库与大数据平台的集成:实现高效数据处理
【4月更文挑战第30天】本文探讨了矢量数据库与大数据平台的集成,以实现高效数据处理。集成通过API、中间件或容器化方式,结合两者优势,提升处理效率,简化流程,并增强数据安全。关键技术支持包括分布式计算、数据压缩编码、索引优化和流处理,以优化性能和实时性。随着技术发展,这种集成将在数据处理领域发挥更大作用。
|
18天前
|
分布式计算 并行计算 大数据
Python多进程在数据处理和大数据分析中的应用
Python多进程在数据处理和大数据分析中的应用
|
9月前
|
SQL JSON 数据处理
大数据Hive JSON数据处理
大数据Hive JSON数据处理
97 0
|
18天前
|
算法 数据可视化 大数据
大数据分析的技术和方法——探究现代数据处理的未来方向
在当今信息化时代,海量数据已经成为企业和组织的重要资源。大数据分析技术的出现为数据处理提供了更高效、更准确的解决方案。本文将深入探讨大数据分析技术和方法,分析其优势和应用场景,以及未来发展方向。
|
18天前
|
分布式计算 大数据 数据处理
大数据开发企业级案例__某通信企业数据处理需求(建议收藏)
大数据开发企业级案例__某通信企业数据处理需求(建议收藏)
39 0
|
8月前
|
存储 分布式计算 大数据
大数据与云计算架构:构建弹性高效的数据处理平台
大数据与云计算架构:构建弹性高效的数据处理平台
|
SQL 数据采集 JSON
MaxCompute中的JSON数据处理
MaxCompute中的JSON数据处理
3026 0

热门文章

最新文章

相关产品

  • 云原生大数据计算服务 MaxCompute