Flink SQL 功能解密系列 —— 流计算“撤回(Retraction)”案例分析

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 通俗讲retract就是传统数据里面的更新操作,也就是说retract是流式计算场景下对数据更新的处理方式。

什么是retraction(撤回)

通俗讲retract就是传统数据里面的更新操作,也就是说retract是流式计算场景下对数据更新的处理
方式。
首先来看下流场景下的一个词频统计列子。

image.png

没有retract会导致最终结果不正确↑:

image.png
retract发挥的作用

下面再分享两个双十一期间retract保证数据正确性的业务case:

case1: 菜鸟物流订单统计

同一个订单的商品在运输过程中,因为各种原因,物流公司是有可能从A变成B的。为了统计物流公司承担的订单数目,菜鸟团队使用blink计算的retraction机制进行变key汇总操作。

-- TT source_table 数据如下:
order_id      tms_company
0001           中通
0002           中通
0003           圆通

-- SQL代码
create view dwd_table as 
select
    order_id,
    StringLast(tms_company)
from source_table
group by order_id;

create view dws_table as 
select 
    tms_company,
    count(distinct order_id) as order_cnt
from dwd_table 
group by tms_company


此时结果为:
tms_company  order_cnt
中通          2
圆通          1

-----------------------
之后又来了一条新数据 0001的订单 配送公司改成 圆通了。这时,第一层group by的会先向下游发送一条 (0001,中通)的撤回消息,第二层group by节点收到撤回消息后,会将这个节点 中通对应的 value减少1,并更新到结果表中;然后第一层的分桶统计逻辑向下游正常发送(0001,圆通)的正向消息,更新了圆通物流对应的订单数目,达到了最初的汇总目的。

order_id      tms_company
0001           中通
0002           中通
0003           圆通
0001           圆通

写入ADS结果会是(满足需求)
tms_company  order_cnt
中通          1
圆通          2

case2: 天猫双十一购物车加购统计:

双11爆款清单与知名综艺IP“火星情报局”跨界合作,汪涵、撒贝宁、陶晶莹等大咖主持加盟,杭州、长沙两地联播,成功打造为“双11子IP”与“双11购物风向标”,树立电商内容综艺化、娱乐化创新典范,为长线模式探索打下基础。

首次深度联动线下场景,在银泰门店落地爆款清单超级大屏,商场人流截停率28%,用户互动时间占营业时间的40%。

选品模式创新,打造最全维度爆款清单:TOP2000性价比爆款+TOP100小黑盒推荐(新品清单)+TOP200买手天团推荐(人群/场景/地域 清单)

核心业务指标

  • 加购金额
  • 加购件数
  • 加购UV

业务计算逻辑

  • 来自TT的数据要进行去重;
  • 以投放场景和购物车维度进行分组,获取每个分组的最后一条(最新)数据;
  • 以投放场景和小时为维度进行分组,统计 加购金额,加购件数和加购UV 业务指标;

业务BlinkSQL代码

--Blink SQL
--********************************************************************--
--Comment: 天猫双11官方爆款清单统计计算
--********************************************************************--
CREATE TABLE dwd_mkt_membercart_ri(
    cart_id      BIGINT, -- '购物车id',
    sku_id       BIGINT, -- '存放商品的skuId,无sku时,为0',
    item_id      BIGINT, -- '外部id:商品id或者skuid',
    quantity     BIGINT, -- '购买数量',
    user_id      BIGINT, -- '用户id',
    status       BIGINT, -- '状态1:正常-1:删除',
    gmt_create   VARCHAR, -- '属性创建时间',
    gmt_modified VARCHAR, -- '属性修改时间',
    biz_id VARCHAR, -- 投放场景,
    start_time VARCHAR, -- 投放开始时间
    end_time VARCHAR, -- 投放结束时间
    activity_price_time VARCHAR, -- 活动开始时间
    price VARCHAR, -- 商品价格
    dbsync_operation BIGINT -- 时间自动用于排序
) 
WITH 
(
    type='tt'
    -- 其他信息省略
);

--groub by 方式重复,防止TT重发
CREATE VIEW distinct_dwd_mkt_membercart_ri AS 
SELECT
    cart_id,
    sku_id,
    item_id,
    quantity,
    user_id,
    status,
    gmt_create,
    gmt_modified,
    biz_id, 
    start_time, 
    end_time, 
    activity_price_time,
    price, 
    dbsync_operation
FROM
    dwd_mkt_membercart_ri
GROUP BY    
    cart_id,
    sku_id,
    item_id,
    quantity,
    user_id,
    status,
    gmt_create,
    gmt_modified,
    biz_id, 
    start_time, 
    end_time, 
    activity_price_time,
    price, 
    dbsync_operation;

-- 每个投放和购物车数据的最后一条
CREATE VIEW tmp_dwd_mkt_membercart_ri AS 
SELECT 
    biz_id as biz_id,
    LAST_VALUE(user_id,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as user_id,
    LAST_VALUE(item_id,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as item_id, 
    LAST_VALUE(sku_id,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as sku_id,
    LAST_VALUE(start_time,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as start_time,
    LAST_VALUE(end_time,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as end_time,
    LAST_VALUE(activity_price_time,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as activity_price_time,
    LAST_VALUE(price,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as price,
    LAST_VALUE(quantity,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as quantity,
    LAST_VALUE(status,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as status,
    LAST_VALUE(gmt_modified,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as gmt_modified,
    LAST_VALUE(gmt_create,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as gmt_create,
    LAST_VALUE(dbsync_operation,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as dbsync_operation
FROM distinct_dwd_mkt_membercart_ri
WHERE DATE_FORMAT(gmt_create , 'yyyy-MM-dd HH:mm:ss' , 'yyyyMMdd')=DATE_FORMAT(gmt_modified , 'yyyy-MM-dd HH:mm:ss' , 'yyyyMMdd')
GROUP BY 
cart_id,biz_id;

--存储小时维度的计算结果
CREATE TABLE result_mkt_membercart_ri_eh(
    id VARCHAR, 
    data_time VARCHAR,  
    all_preheating_cart_cnt BIGINT, -- 预热期间的 加购件数
    all_preheating_cart_alipay BIGINT,-- 预热期间的 加购金额
    eh_all_preheating_cart_uv BIGINT,-- 预热期间的 加购UV
    all_cart_cnt BIGINT, -- 投放期间的 加购件数
    all_cart_alipay BIGINT, -- 投放期间的 加购金额
    eh_all_cart_uv BIGINT, -- 投放期间的 加购UV
    primary key(id,data_time)
) WITH (
    type = 'custom',
     -- 其他信息省略
    timeDiv='hour'
) ;
--统计小时维度的 xx xx xx 业务指标
INSERT INTO result_mkt_membercart_ri_eh 
SELECT 
    biz_id,
    DATE_FORMAT(gmt_create , 'yyyy-MM-dd HH:mm:ss' , 'yyyyMMddHH') data_time, 
    `sum`(case when gmt_modified<=COALESCE(activity_price_time,end_time) then quantity else 0 end) as all_preheating_cart_cnt,
    `sum`(case when gmt_modified<=COALESCE(activity_price_time,end_time) then quantity*CAST(price AS BIGINT) else 0 end) as all_preheating_cart_alipay,
    `sum`((case when gmt_modified<=COALESCE(activity_price_time,end_time) then user_id end)) eh_all_preheating_cart_uv,
    `sum`(quantity) as all_cart_cnt,
    `sum`(quantity*CAST(price AS BIGINT)) as all_cart_alipay,
    `count`(distinct user_id) eh_all_cart_uv
FROM tmp_dwd_mkt_membercart_ri 
WHERE 
   status>0 
GROUP BY  biz_id ,DATE_FORMAT(gmt_create , 'yyyy-MM-dd HH:mm:ss' , 'yyyyMMddHH') ;

上面case2天猫业务场景里面的加购金额统计来说,当每个投放场景的购物车的数据发生变化时候,就意味着上面【CREATE VIEW tmp_dwd_mkt_membercart_ri 】中的LAST_VALUE发生变化,最外层的sum统计【INSERT INTO result_mkt_membercart_ri_eh 】就要将前一条的LAST_VALUE【VALUE-1】撤回,用update的新LAST_VALUE【VALUE-2】进行求和统计,这样blink就需要有一种机制将VALUE-1进行撤回,利用【VALUE-2】进行计算,这种机制我们称为retract。

retract 实现原理参考

https://docs.google.com/document/d/18XlGPcfsGbnPSApRipJDLPg5IFNGTQjnz7emkVpZlkw/edit#heading=h.cjkoun4w44l4

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
3月前
|
消息中间件 Kafka 流计算
Flink的分区表订阅功能是通过Kafka的topic分区来实现的
Flink的分区表订阅功能是通过Kafka的topic分区来实现的【1月更文挑战第6天】【1月更文挑战第26篇】
100 1
|
10天前
|
SQL 数据库 索引
SQL索引失效原因分析与解决方案
SQL索引失效原因分析与解决方案
19 0
|
18天前
|
SQL 关系型数据库 MySQL
【MySQL】慢SQL分析流程
【4月更文挑战第1天】【MySQL】慢SQL分析流程
|
1月前
|
SQL 缓存 分布式计算
flink1.18 SqlGateway 的使用和原理分析
# 了解flink1.18 sqlGateway 的安装和使用步骤 # 启动sqlgateway 流程,了解核心的结构 # sql提交流程,了解sql 的流转逻辑 # select 查询的ResultSet的对接流程,了解数据的返回和获取逻辑
|
1月前
|
存储 SQL Java
阿里Flink云服务提供了CDC(Change Data Capture)功能
【2月更文挑战第10天】阿里Flink云服务提供了CDC(Change Data Capture)功能
34 1
|
2月前
|
传感器 SQL Java
Flink撤回机制问题之撤回机制不起作用如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
2月前
|
SQL 关系型数据库 MySQL
10个SQL中常用的分析技巧
10个SQL中常用的分析技巧
|
2月前
|
消息中间件 Kafka API
【极数系列】ClassNotFoundException: org.apache.flink.connector.base.source.reader.RecordEmitter & 详细分析解决
【极数系列】ClassNotFoundException: org.apache.flink.connector.base.source.reader.RecordEmitter & 详细分析解决
|
2月前
|
SQL 存储 关系型数据库
MySQL(终结篇二)- SQL 语句分析与优化
MySQL(终结篇二)- SQL 语句分析与优化
87 0
|
3月前
|
存储 NoSQL MongoDB
阿里云 Flink 原理分析与应用:深入探索 MongoDB Schema Inference
本文整理自阿里云 Flink 团队归源老师关于阿里云 Flink 原理分析与应用:深入探索 MongoDB Schema Inference 的研究。
46939 2
阿里云 Flink 原理分析与应用:深入探索 MongoDB Schema Inference

相关产品

  • 实时计算 Flink版