使用Blink SQL+UDAF实现差值聚合计算

简介: 本案例根据某电网公司的真实业务需求,通过Blink SQL+UDAF实现实时流上的差值聚合计算,通过本案例,让读者熟悉UDAF编写,并理解UDAF中的方法调用关系和顺序。 感谢@军长在实现过程中的指导。笔者水平有限,若有纰漏,请批评指出。 一、客户需求 电网公司每天采集各个用户的电表数据(格式如下表),其中data_date为电表数据上报时间,cons_id为电表id,r1位电表度数,其他字

本案例根据某电网公司的真实业务需求,通过Blink SQL+UDAF实现实时流上的差值聚合计算,通过本案例,让读者熟悉UDAF编写,并理解UDAF中的方法调用关系和顺序。

感谢@军长在实现过程中的指导。笔者水平有限,若有纰漏,请批评指出。

一、客户需求

电网公司每天采集各个用户的电表数据(格式如下表),其中data_date为电表数据上报时间,cons_id为电表id,r1位电表度数,其他字段与计算逻辑无关,可忽略。为了后续演示方便,仅输入cons_id=100000002的数据。

no(string)

data_date(string)

cons_id(string)

org_no(string)

r1(double)

101

20190716

100000002

35401

13.76

101

20190717

100000002

35401

14.12

101

20190718

100000002

35401

16.59

101

20190719

100000002

35401

18.89

表1:输入数据

电网公司希望通过实时计算(Blink)对电表数据处理后,每天得到每个电表最近两天(当天和前一天)的差值数据,结果类似如下表:

cons_id(string)

data_date(string)

subDegreeR1(double)

100000002

20190717

0.36

100000002

20190718

2.47

100000002

20190719

2.3

表2:期望的输出数据

二、需求分析

根据客户的需求,比较容易得到两种解决方案:1、通过over窗口(2 rows over window)开窗进行差值聚合;2、通过hop窗口(sliding=1天,size=2天)进行差值聚合。

over窗口和hop窗口均是Blink支持的标准窗口,使用起来非常简单。本需求的最大难点在于差值聚合,Blink支持SUM、MAX、MIN、AVG等内置的聚合函数,但没有满足业务需求的差值聚合函数,因此需要通过自定义聚合函数(UDAF)来实现。

三、UDAF开发

实时计算自定义函数开发搭建环境请参考UDX概述,在此不再赘述。本案例使用Blink2.2.7版本,下面简要描述关键代码的编写。

完整代码(为了方便上传,使用了txt格式):?SubtractionUdaf.txt

1、在com.alibaba.blink.sql.udx.SubtractionUdaf包中创建一个继承AggregateFunction类的SubtractionUdaf类。

 

public class SubtractionUdaf extends AggregateFunction<Double, SubtractionUdaf.Accum> 

其中Double是UDAF输出的类型,在本案例中为相邻两天的电表差值度数。SubtractionUdaf.Accum是内部自定义的accumulator数据结构。

2、定义accumulator数据结构,用户保存UDAF的状态。

    public static class Accum {
        private long currentTime;//最新度数的上报时间
        private double oldDegree;//前一次度数
        private double newDegree;//当前最新度数
        private long num;   //accumulator中已经计算的record数量,主要用于merge
        private List<Tuple2<Double, Long>> listInput;//缓存所有的输入,主要用于retract
    }

3、实现createAccumulator方法,初始化UDAF的accumulator

    //初始化udaf的accumulator
    public SubtractionUdaf.Accum createAccumulator() {
        SubtractionUdaf.Accum acc = new SubtractionUdaf.Accum();
        acc.currentTime = 0;
        acc.oldDegree = 0.0;
        acc.newDegree = 0.0;
        acc.num = 0;
        acc.listInput = new ArrayList<Tuple2<Double, Long>>();
        return acc;
    }

4、实现getValue方法,用于通过存放状态的accumulator计算UDAF的结果,本案例需求是计算新旧数据两者的差值。

 

    public Double getValue(SubtractionUdaf.Accum accumulator) {
        return accumulator.newDegree - accumulator.oldDegree;
    }

5、实现accumulate方法,用于根据输入数据更新UDAF存放状态的accumulator。考虑到数据可能乱序以及可能的retract,数据数据包括了对应的度数iValue,还包括上报度数的时间(构造的事件时间ts)。

 

    public void accumulate(SubtractionUdaf.Accum accumulator, double iValue, long ts) {
        System.out.println("method : accumulate" );
        accumulator.listInput.add(Tuple2.of(Double.valueOf(iValue),Long.valueOf(ts)));
        Collections.sort(accumulator.listInput,this.comparator);//按照时间排序
        accumulator.num ++;
        if(accumulator.listInput.size() == 1){
            accumulator.newDegree = iValue;
            accumulator.oldDegree = 0.0;
            accumulator.currentTime = ts;
        }else {//处理可能存在的数据乱序问题
            accumulator.newDegree = accumulator.listInput.get(0).f0;
            accumulator.currentTime = accumulator.listInput.get(0).f1;
            accumulator.oldDegree = accumulator.listInput.get(1).f0;
        }
    }

其中accumulator为UDAF的状态,iValue和ts为实际的输入数据。

注意需要处理可能存在的输入数据乱序问题。

6、实现retract方法,用于在某些优化场景下(如使用over窗口)对retract的数据进行处理。

 

    public void retract(SubtractionUdaf.Accum accumulator, double iValue, long ts) throws Exception{
        if(accumulator.listInput.contains(Tuple2.of(iValue, ts))){
            if(accumulator.listInput.indexOf(Tuple2.of(iValue, ts)) == 0){//retract的是最新值
                accumulator.listInput.remove(0);
                accumulator.num--;
                if(accumulator.listInput.isEmpty()){
                    accumulator.currentTime = 0;
                    accumulator.oldDegree = 0.0;
                    accumulator.newDegree = 0.0;
                }else if(accumulator.listInput.size() == 1) {
                    accumulator.currentTime = accumulator.listInput.get(0).f1;
                    accumulator.newDegree = accumulator.listInput.get(0).f0;
                    accumulator.oldDegree = 0.0;
                }else{
                    accumulator.currentTime = accumulator.listInput.get(0).f1;
                    accumulator.newDegree = accumulator.listInput.get(0).f0;
                    accumulator.oldDegree = accumulator.listInput.get(1).f0;
                }
            } else if(accumulator.listInput.indexOf(Tuple2.of(iValue, ts)) == 1){//retract的是次新值
                accumulator.listInput.remove(1);
                accumulator.num--;
                if(accumulator.listInput.size() == 1){
                    accumulator.oldDegree = 0.0;
                }else {
                    accumulator.oldDegree = accumulator.listInput.get(1).f0;
                }
            }else {//retract的是其他值
                accumulator.listInput.remove(Tuple2.of(iValue, ts));
                accumulator.num--;
            }
        }else {
            throw new Exception("Cannot retract a unexist record : iValue = "+ iValue + "timestamp = "+ ts);
        }
    }

需要考虑retract的是最新的数据还是次新的数据,需要不同的逻辑处理。

7、实现merge方法,用于某些优化场景(如使用hop窗口)。

    public void merge(SubtractionUdaf.Accum accumulator, Iterable<SubtractionUdaf.Accum> its) {
        int i = 0;
        System.out.println("method : merge" );
        System.out.println("accumulator : "+ accumulator.newDegree);
        System.out.println("accumulator : "+ accumulator.currentTime);

        for (SubtractionUdaf.Accum entry : its) {
            if(accumulator.currentTime < entry.currentTime){
                if(entry.num > 1){
                    accumulator.currentTime = entry.currentTime;
                    accumulator.oldDegree = entry.oldDegree;
                    accumulator.newDegree = entry.newDegree;
                    accumulator.num += entry.num;
                    accumulator.listInput.addAll(entry.listInput);
                }else if(entry.num == 1){
                    accumulator.currentTime = entry.currentTime;
                    accumulator.oldDegree = accumulator.newDegree;
                    accumulator.newDegree = entry.newDegree;
                    accumulator.num ++;
                    accumulator.listInput.addAll(entry.listInput);
                }
            }else{
                if(accumulator.num > 1){
                    accumulator.num += entry.num;
                    accumulator.listInput.addAll(entry.listInput);
                }else if(accumulator.num == 1){
                    accumulator.oldDegree = entry.newDegree;
                    accumulator.num += entry.num;
                    accumulator.listInput.addAll(entry.listInput);
                }else if(accumulator.num == 0){
                    accumulator.currentTime = entry.currentTime;
                    accumulator.oldDegree = entry.oldDegree;
                    accumulator.newDegree = entry.newDegree;
                    accumulator.num = entry.num;
                    accumulator.listInput.addAll(entry.listInput);
                }
            }
            Collections.sort(accumulator.listInput,this.comparator);
            System.out.println("merge : "+i);
            System.out.println("newDegree : "+entry.newDegree);
            System.out.println("oldDegree = "+entry.oldDegree);
            System.out.println("currentTime : "+entry.currentTime);
        }
    }

需要考虑merge的是否是比当前新的数据,需要不同的处理逻辑。

8、其他方面,考虑到需要对输入度数按照事件时间排序,在open方法中实例化了自定义的Comparator类,对accumulator数据结构中的inputList按事件时间的降序排序。

    public void open(FunctionContext context) throws Exception {
        //定义record的先后顺序,用于listInput的排序,时间越新的record在list中越前面
        this.comparator = new Comparator<Tuple2<Double, Long>>() {
            public int compare( Tuple2<Double, Long> o1, Tuple2<Double, Long> o2) {
                if (Long.valueOf(o1.f1) < Long.valueOf(o2.f1)) {
                    return 1;
                } else if (Long.valueOf(o1.f1) > Long.valueOf(o2.f1)) {
                    return -1;
                }else {
                    return 0;
                }
            }
        };
    }

请参考使用IntelliJ IDEA开发自定义函数完成UDAF编译、打包,并参考UDX概述完成资源的上传和引用。

 

四、SQL开发及测试结果

(一)over窗口

SQL代码如下,语法检查、上线、启动作业(选择当前启动位点)。并将表1数据上传至datahub。

CREATE FUNCTION OverWindowSubtractionUdaf as 'com.alibaba.blink.sql.udx.SubtractionUdaf';

CREATE TABLE input_dh_e_mp_read_curve (
  `no`                  VARCHAR,
  data_date             VARCHAR,
  cons_id               VARCHAR,
  org_no                VARCHAR,
  r1                    DOUBLE,
  ts as TO_TIMESTAMP(concat(data_date,'000000'),'yyyyMMddHHmmss')
  ,WATERMARK wk FOR ts as withOffset(ts, 2000)
) WITH (
  type = 'datahub',
  endPoint = 'http://dh-cn-shanghai.aliyun-inc.com',
  roleArn='acs:ram::1016954307248737:role/aliyunstreamdefaultrole',
  project = 'jszc_datahub',
  topic = 'input_dh_e_mp_read_curve'
);
CREATE TABLE data_out(
    cons_id varchar
  ,data_date varchar
    ,subDegreeR1 DOUBLE
)with(
    type = 'print'
);

INSERT into data_out  
SELECT
    cons_id
  ,last_value(data_date) OVER (
        PARTITION BY cons_id 
        ORDER BY ts 
        ROWS BETWEEN 1 preceding AND CURRENT ROW) as data_date
    ,OverWindowSubtractionUdaf(r1,unix_timestamp(ts)) OVER (
        PARTITION BY cons_id 
        ORDER BY ts 
        ROWS BETWEEN 1 preceding AND CURRENT ROW) as data_date
FROM input_dh_e_mp_read_curve

由于使用了print connector,从对应的sink的taskmanager.out日志中可以查看到输出如下(已忽略其他debug日志):

task-1> (+)100000002,20190716,13.76
task-1> (+)100000002,20190717,0.35999999999999943
task-1> (+)100000002,20190718,2.4700000000000006

对比期望输出(表2),20190717和20190718两个窗口的数据均正确,表明业务逻辑正确,但此输出与期望输出有少许差异:

(1)20190716输出为13.76,这是因为第一个over窗口只有一条数据导致的,这种数据可以在业务层过滤掉;

(2)20190719的数据没有输出,这是因为我们设置了watermark,测试环境下20190719之后没有数据进来触发20190719对应的窗口的结束。

(二)hop窗口

SQL代码如下:语法检查、上线、启动作业(选择当前启动位点)。并将表1数据上传至datahub。

 

CREATE FUNCTION HopWindowSubtractionUdaf as 'com.alibaba.blink.sql.udx.SubtractionUdaf';

CREATE TABLE input_dh_e_mp_read_curve (
  `no`                  VARCHAR,
  data_date             VARCHAR,
  cons_id               VARCHAR,
  org_no                VARCHAR,
  r1                    DOUBLE,
  ts as TO_TIMESTAMP(concat(data_date,'000000'),'yyyyMMddHHmmss')
  ,WATERMARK wk FOR ts as withOffset(ts, 2000)
) WITH (
  type = 'datahub',
  endPoint 
相关实践学习
实时数据及离线数据上云方案
本实验通过使用CANAL、DataHub、DataWorks、MaxCompute服务,实现数据上云,解决了数据孤岛问题,同时把数据迁移到云计算平台,对后续数据的计算和应用提供了第一步开山之路。
目录
相关文章
|
3月前
|
分布式计算 资源调度 Hadoop
Flink报错问题之Sql往kafka表写聚合数据报错如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
4月前
|
SQL 数据采集 分布式计算
Spark SQL中的聚合与窗口函数
Spark SQL中的聚合与窗口函数
|
10月前
|
SQL 数据库管理 索引
SQL基础——聚合与排序(下)
SQL基础——聚合与排序(下)
127 0
|
5月前
|
SQL 关系型数据库 数据库连接
Hasor【环境搭建 03】Dataway接口配置服务使用DataQL聚合查询引擎(SQL执行器实现分页查询举例说明+报错 Query dialect missing 原因分析及解决)
Hasor【环境搭建 03】Dataway接口配置服务使用DataQL聚合查询引擎(SQL执行器实现分页查询举例说明+报错 Query dialect missing 原因分析及解决)
89 0
|
10月前
|
SQL
SQL基础——聚合与排序(上)
SQL基础——聚合与排序(上)
48 0
|
SQL 测试技术 数据库
软件测试最常用的 SQL 命令 | 通过实例掌握基本查询、条件查询、聚合查询
软件测试最常用的 SQL 命令 | 通过实例掌握基本查询、条件查询、聚合查询
92 0
软件测试最常用的 SQL 命令 | 通过实例掌握基本查询、条件查询、聚合查询
|
SQL 存储 关系型数据库
SQL调优指南—SQL调优进阶—聚合优化和执行
本文介绍如何优化器和执行器如何处理聚合(Group-by),以达到减少数据传输量和提高执行效率的效果。
202 0
|
SQL 测试技术 数据库
软件测试最常用的 SQL 命令 | 掌握基本查询、条件查询、聚合查询
软件测试最常用的 SQL 命令 | 掌握基本查询、条件查询、聚合查询
|
SQL 测试技术 数据库
软件测试最常用的 SQL 命令 | 掌握基本查询、条件查询、聚合查询
软件测试最常用的 SQL 命令 | 掌握基本查询、条件查询、聚合查询
|
SQL 测试技术 数据库
软件测试最常用的 SQL 命令 | 通过实例掌握基本查询、条件查询、聚合查询
软件测试最常用的 SQL 命令 | 通过实例掌握基本查询、条件查询、聚合查询