Flink SQL 功能解密系列 —— 流计算“撤回(Retraction)”案例分析

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 通俗讲retract就是传统数据里面的更新操作,也就是说retract是流式计算场景下对数据更新的处理方式。

什么是retraction(撤回)

通俗讲retract就是传统数据里面的更新操作,也就是说retract是流式计算场景下对数据更新的处理
方式。
首先来看下流场景下的一个词频统计列子。

image.png

没有retract会导致最终结果不正确↑:

image.png
retract发挥的作用

下面再分享两个双十一期间retract保证数据正确性的业务case:

case1: 菜鸟物流订单统计

同一个订单的商品在运输过程中,因为各种原因,物流公司是有可能从A变成B的。为了统计物流公司承担的订单数目,菜鸟团队使用blink计算的retraction机制进行变key汇总操作。

-- TT source_table 数据如下:
order_id      tms_company
0001           中通
0002           中通
0003           圆通

-- SQL代码
create view dwd_table as 
select
    order_id,
    StringLast(tms_company)
from source_table
group by order_id;

create view dws_table as 
select 
    tms_company,
    count(distinct order_id) as order_cnt
from dwd_table 
group by tms_company


此时结果为:
tms_company  order_cnt
中通          2
圆通          1

-----------------------
之后又来了一条新数据 0001的订单 配送公司改成 圆通了。这时,第一层group by的会先向下游发送一条 (0001,中通)的撤回消息,第二层group by节点收到撤回消息后,会将这个节点 中通对应的 value减少1,并更新到结果表中;然后第一层的分桶统计逻辑向下游正常发送(0001,圆通)的正向消息,更新了圆通物流对应的订单数目,达到了最初的汇总目的。

order_id      tms_company
0001           中通
0002           中通
0003           圆通
0001           圆通

写入ADS结果会是(满足需求)
tms_company  order_cnt
中通          1
圆通          2
AI 代码解读

case2: 天猫双十一购物车加购统计:

双11爆款清单与知名综艺IP“火星情报局”跨界合作,汪涵、撒贝宁、陶晶莹等大咖主持加盟,杭州、长沙两地联播,成功打造为“双11子IP”与“双11购物风向标”,树立电商内容综艺化、娱乐化创新典范,为长线模式探索打下基础。

首次深度联动线下场景,在银泰门店落地爆款清单超级大屏,商场人流截停率28%,用户互动时间占营业时间的40%。

选品模式创新,打造最全维度爆款清单:TOP2000性价比爆款+TOP100小黑盒推荐(新品清单)+TOP200买手天团推荐(人群/场景/地域 清单)

核心业务指标

  • 加购金额
  • 加购件数
  • 加购UV

业务计算逻辑

  • 来自TT的数据要进行去重;
  • 以投放场景和购物车维度进行分组,获取每个分组的最后一条(最新)数据;
  • 以投放场景和小时为维度进行分组,统计 加购金额,加购件数和加购UV 业务指标;

业务BlinkSQL代码

--Blink SQL
--********************************************************************--
--Comment: 天猫双11官方爆款清单统计计算
--********************************************************************--
CREATE TABLE dwd_mkt_membercart_ri(
    cart_id      BIGINT, -- '购物车id',
    sku_id       BIGINT, -- '存放商品的skuId,无sku时,为0',
    item_id      BIGINT, -- '外部id:商品id或者skuid',
    quantity     BIGINT, -- '购买数量',
    user_id      BIGINT, -- '用户id',
    status       BIGINT, -- '状态1:正常-1:删除',
    gmt_create   VARCHAR, -- '属性创建时间',
    gmt_modified VARCHAR, -- '属性修改时间',
    biz_id VARCHAR, -- 投放场景,
    start_time VARCHAR, -- 投放开始时间
    end_time VARCHAR, -- 投放结束时间
    activity_price_time VARCHAR, -- 活动开始时间
    price VARCHAR, -- 商品价格
    dbsync_operation BIGINT -- 时间自动用于排序
) 
WITH 
(
    type='tt'
    -- 其他信息省略
);

--groub by 方式重复,防止TT重发
CREATE VIEW distinct_dwd_mkt_membercart_ri AS 
SELECT
    cart_id,
    sku_id,
    item_id,
    quantity,
    user_id,
    status,
    gmt_create,
    gmt_modified,
    biz_id, 
    start_time, 
    end_time, 
    activity_price_time,
    price, 
    dbsync_operation
FROM
    dwd_mkt_membercart_ri
GROUP BY    
    cart_id,
    sku_id,
    item_id,
    quantity,
    user_id,
    status,
    gmt_create,
    gmt_modified,
    biz_id, 
    start_time, 
    end_time, 
    activity_price_time,
    price, 
    dbsync_operation;

-- 每个投放和购物车数据的最后一条
CREATE VIEW tmp_dwd_mkt_membercart_ri AS 
SELECT 
    biz_id as biz_id,
    LAST_VALUE(user_id,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as user_id,
    LAST_VALUE(item_id,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as item_id, 
    LAST_VALUE(sku_id,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as sku_id,
    LAST_VALUE(start_time,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as start_time,
    LAST_VALUE(end_time,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as end_time,
    LAST_VALUE(activity_price_time,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as activity_price_time,
    LAST_VALUE(price,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as price,
    LAST_VALUE(quantity,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as quantity,
    LAST_VALUE(status,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as status,
    LAST_VALUE(gmt_modified,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as gmt_modified,
    LAST_VALUE(gmt_create,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as gmt_create,
    LAST_VALUE(dbsync_operation,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as dbsync_operation
FROM distinct_dwd_mkt_membercart_ri
WHERE DATE_FORMAT(gmt_create , 'yyyy-MM-dd HH:mm:ss' , 'yyyyMMdd')=DATE_FORMAT(gmt_modified , 'yyyy-MM-dd HH:mm:ss' , 'yyyyMMdd')
GROUP BY 
cart_id,biz_id;

--存储小时维度的计算结果
CREATE TABLE result_mkt_membercart_ri_eh(
    id VARCHAR, 
    data_time VARCHAR,  
    all_preheating_cart_cnt BIGINT, -- 预热期间的 加购件数
    all_preheating_cart_alipay BIGINT,-- 预热期间的 加购金额
    eh_all_preheating_cart_uv BIGINT,-- 预热期间的 加购UV
    all_cart_cnt BIGINT, -- 投放期间的 加购件数
    all_cart_alipay BIGINT, -- 投放期间的 加购金额
    eh_all_cart_uv BIGINT, -- 投放期间的 加购UV
    primary key(id,data_time)
) WITH (
    type = 'custom',
     -- 其他信息省略
    timeDiv='hour'
) ;
--统计小时维度的 xx xx xx 业务指标
INSERT INTO result_mkt_membercart_ri_eh 
SELECT 
    biz_id,
    DATE_FORMAT(gmt_create , 'yyyy-MM-dd HH:mm:ss' , 'yyyyMMddHH') data_time, 
    `sum`(case when gmt_modified<=COALESCE(activity_price_time,end_time) then quantity else 0 end) as all_preheating_cart_cnt,
    `sum`(case when gmt_modified<=COALESCE(activity_price_time,end_time) then quantity*CAST(price AS BIGINT) else 0 end) as all_preheating_cart_alipay,
    `sum`((case when gmt_modified<=COALESCE(activity_price_time,end_time) then user_id end)) eh_all_preheating_cart_uv,
    `sum`(quantity) as all_cart_cnt,
    `sum`(quantity*CAST(price AS BIGINT)) as all_cart_alipay,
    `count`(distinct user_id) eh_all_cart_uv
FROM tmp_dwd_mkt_membercart_ri 
WHERE 
   status>0 
GROUP BY  biz_id ,DATE_FORMAT(gmt_create , 'yyyy-MM-dd HH:mm:ss' , 'yyyyMMddHH') ;

AI 代码解读

上面case2天猫业务场景里面的加购金额统计来说,当每个投放场景的购物车的数据发生变化时候,就意味着上面【CREATE VIEW tmp_dwd_mkt_membercart_ri 】中的LAST_VALUE发生变化,最外层的sum统计【INSERT INTO result_mkt_membercart_ri_eh 】就要将前一条的LAST_VALUE【VALUE-1】撤回,用update的新LAST_VALUE【VALUE-2】进行求和统计,这样blink就需要有一种机制将VALUE-1进行撤回,利用【VALUE-2】进行计算,这种机制我们称为retract。

retract 实现原理参考

https://docs.google.com/document/d/18XlGPcfsGbnPSApRipJDLPg5IFNGTQjnz7emkVpZlkw/edit#heading=h.cjkoun4w44l4

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
打赏
0
1
10
1
36030
分享
相关文章
Flink SQL 详解:流批一体处理的强大工具
Flink SQL 是为应对传统数据处理框架中流批分离的问题而诞生的,它融合了SQL的简洁性和Flink的强大流批处理能力,降低了大数据处理门槛。其核心工作原理包括生成逻辑执行计划、查询优化和构建算子树,确保高效执行。Flink SQL 支持过滤、投影、聚合、连接和窗口等常用算子,实现了流批一体处理,极大提高了开发效率和代码复用性。通过统一的API和语法,Flink SQL 能够灵活应对实时和离线数据分析场景,为企业提供强大的数据处理能力。
330 26
|
4月前
|
Flink SQL Deduplication 去重以及如何获取最新状态操作
Flink SQL Deduplication 是一种高效的数据去重功能,支持多种数据类型和灵活的配置选项。它通过哈希表、时间窗口和状态管理等技术实现去重,适用于流处理和批处理场景。本文介绍了其特性、原理、实际案例及源码分析,帮助读者更好地理解和应用这一功能。
260 14
基于OpenLake的Flink+Paimon+EMR StarRocks流式湖仓分析
阿里云OpenLake解决方案建立在开放可控的OpenLake湖仓之上,提供大数据搜索与AI一体化服务。通过元数据管理平台DLF管理结构化、半结构化和非结构化数据,提供湖仓数据表和文件的安全访问及IO加速,并支持大数据、搜索和AI多引擎对接。本文为您介绍以Flink作为Openlake方案的核心计算引擎,通过流式数据湖仓Paimon(使用DLF 2.0存储)和EMR StarRocks搭建流式湖仓。
709 5
基于OpenLake的Flink+Paimon+EMR StarRocks流式湖仓分析
【开发者评测】实时计算Flink场景实践和核心功能体验测评获奖名单公布!
【开发者评测】实时计算Flink场景实践和核心功能体验测评获奖名单公布!
135 1
SQL中,可以使用 `ORDER BY` 子句来实现排序功能
【10月更文挑战第26天】SQL中,可以使用 `ORDER BY` 子句来实现排序功能
280 6
实时计算Flink场景实践和核心功能体验
本文详细评测了阿里云实时计算Flink版,从产品引导、文档帮助、功能满足度等方面进行了全面分析。产品界面设计友好,文档丰富实用,数据开发和运维体验优秀,具备出色的实时性和动态扩展性。同时,提出了针对业务场景的改进建议,包括功能定制化增强、高级分析功能拓展及可视化功能提升。文章还探讨了产品与阿里云内部产品及第三方工具的联动潜力,展示了其在多云架构和跨平台应用中的广阔前景。
179 9
功能发布-自定义SQL查询
本期主要为大家介绍ClkLog九月上线的新功能-自定义SQL查询。
MySql5.6版本开启慢SQL功能-本次采用永久生效方式
MySql5.6版本开启慢SQL功能-本次采用永久生效方式
80 0

相关产品

  • 实时计算 Flink版