【阿里云流计算】- 电商订单和销量统计案例

本文涉及的产品
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
简介:

背景

随着新零售的概念慢慢崛起,互联网电商行业竞争越来越激烈!实时数据信息对于电商行业尤为重要,那如何从实时不断的数据流中获取我们想要的信息呢?以下案例是流计算的合作伙伴袋鼠云用阿里云流计算来解决电商订单管理案例。

场景案例

统计商铺的订单总数和总的销量

业务架构图

undefined | center

业务流程:

  1. 用阿里云的DTS(DTS信息同步)把用户的数据同步到大数据总线(DATAHUB)。
  2. 阿里云流计算订阅大数据总线(DATAHUB)的数据进行实时计算。
  3. 将实时数据插入到RDS的云数据库
  4. 再通过阿里云的DATAV或者是其他的大屏做数据展示。

准备工作

RDS->DataHub 数据实时同步,是将 RDS for MySQL 产生的增量数据数据实时同步到 DataHub 中的 topic。
由RDS经过DTS数据同步到大数据总线(DATAHUB)后 DataHub表Schema信息。

可以参考RDS 到 DataHub 数据实时同步

1.订单源表

字段名 数据类型 详情
dts_ordercodeofsys varchar 订单编号
dts_paytime varchar 订单付款时间
dts_deliveredtime varchar 订单发货时间
dts_storecode varchar 店铺编号
dts_warehousecode varchar 仓库code
dts_cancelled bigint 是否取消
dts_delivered bigint 是否发货
dts_receivercity varchar 收货人城市
dts_receiverprovince varchar 收货人省份
dts_record_id varchar 记录ID
dts_operation_flag varchar 操作Flag
dts_instance_id varchar 数据库instanceId
dts_db_name varchar 数据库名
dts_table_name varchar 数据表
dts_utc_timestamp varchar 更新时间
dts_before_flag varchar 变更前标识
dts_after_flag varchar 变更后标识

2.订单详情源表

字段名 数据类型 详情
dts_ordercodeofsys varchar 订单编号
dts_skuname varchar 商品名字
dts_skucode varchar 商品编号
dts_quantity bigint 数量
dts_dividedamount double 发货金额
dts_salechanneldividedamount double 渠道销售金额
dts_initialcost double 成本
dts_record_id varchar 记录ID
dts_operation_flag varchar 操作Flag
dts_instance_id varchar 数据库instanceId
dts_db_name varchar 数据库名字
dts_table_name varchar 表名
dts_utc_timestamp varchar 更新时间
dts_before_flag varchar 变更前标识
dts_after_flag varchar 变更后标识

业务逻辑

--数据的订单源表
create table orders_real(
     dts_ordercodeofsys   varchar, 
     dts_paytime          varchar, 
     dts_deliveredtime    varchar, 
     dts_storecode        varchar, 
     dts_warehousecode    varchar, 
     dts_cancelled        bigint, 
     dts_delivered        bigint, 
     dts_receivercity     varchar, 
     dts_receiverprovince varchar, 
     dts_record_id        varchar, 
     dts_operation_flag   varchar, 
     dts_instance_id      varchar, 
     dts_db_name          varchar, 
     dts_table_name       varchar, 
     dts_utc_timestamp    varchar, 
     dts_before_flag      varchar, 
     dts_after_flag       varchar 
) with (
  type='datahub',
  endPoint='http://dh-cn-XXXXX.com',
  project='项目名',
  topic='表名',
  accessId='自己的ID',
  accessKey='自己的KEY'
); 

create table orderdetail_real(
     dts_ordercodeofsys            varchar, 
     dts_skuname                   varchar,
     dts_skucode                   varchar,
     dts_quantity                  bigint ,
     dts_dividedamount             double,
     dts_salechanneldividedamount  double,
     dts_initialcost               double,
     dts_record_id                 varchar,
     dts_operation_flag            varchar,
     dts_instance_id               varchar,
     dts_db_name                   varchar,
     dts_table_name                varchar,
     dts_utc_timestamp             varchar,
     dts_before_flag               varchar,
     dts_after_flag                varchar
) with (
  type='datahub',
  endPoint='http://dh-cn-XXXX.com',
  project='项目名',
  topic='表名',
  accessId='自己的ID',
  accessKey='自己的KEY'
); 


create table ads_all_count_amount(
    bill_date     varchar,--下单时间
    bill_count    bigint,--总的订单总数
    qty           bigint,--总的销售量
    primary key (bill_date) 
) with (
  type='rds',
  url='jdbc:mysql://rm-XXXX.mysql.rds.aXXXXcs.com:3306/XXXX',
  tableName='数据库表名',
  userName='数据库的账号',
  password='数据库的密码'
);

--订单源表,最新交易时间的商品编号
CREATE VIEW new_paytime AS
SELECT 
dts_ordercodeofsys, 
MAX(dts_paytime) AS dts_paytime
    FROM orders_real
    GROUP BY dts_ordercodeofsys
    
--订单详情表,有效的订单的订单编码、商品名称、商品编号、数量的信息
CREATE VIEW new_orderdetail AS
SELECT 
dts_ordercodeofsys, 
dts_skuname, 
dts_skucode,
CASE WHEN dts_operation_flag = 'U'
        AND dts_before_flag = 'Y'
        AND dts_after_flag = 'N' THEN -1 * dts_quantity 
    WHEN dts_operation_flag = 'U'
        AND dts_before_flag = 'N'
        AND dts_after_flag = 'Y' THEN dts_quantity 
    WHEN dts_operation_flag = 'D' THEN -1 * dts_quantity 
    ELSE dts_quantity 
    END AS dts_quantity
        FROM 
        orderdetail_real
        
--订单总单数,总销售量
INSERT INTO ads_all_count_amount
SELECT 
    from_unixtime(CAST(a.dts_paytime AS bigint) / 1000000, 'yyyyMMdd') AS bill_date, 
    COUNT(DISTINCT a.dts_ordercodeofsys) AS bill_count, 
    SUM(b.dts_quantity) AS qty
from 
    (new_paytime) a 
join 
    (new_orderdetail) b
ON a.dts_ordercodeofsys = b.dts_ordercodeofsys
GROUP BY 
        from_unixtime(CAST(a.dts_paytime AS bigint) / 1000000, 'yyyyMMdd')

难点解析

为了方便大家理解结构化代码和代码维护,我们推荐使用View(VIEW的语义)把业务逻辑差分成三个模块。

模块一

首先根据订单编号做分组,因为同一个编号订单会有多次业务操作(例如下单、付款、发货),会在Binlog日志中形成多条同一订单编号的订单流水记录。使用MAX(dts_paytime)获取同一编号的最后一次操作数据库最终付款交易时间。

CREATE VIEW new_paytime AS
SELECT 
    dts_ordercodeofsys, 
    MAX(dts_paytime) AS dts_paytime
FROM orders_real
GROUP BY dts_ordercodeofsys

模块二

数据库日志会获取所有的数据记录的变更,而每个订单是有状态的。如列表所示:

字段名 数据类型 详情
dts_record_id varchar 记录ID
dts_operation_flag varchar 操作Flag
dts_instance_id varchar 数据库instanceId
dts_db_name varchar 数据库名字
dts_table_name varchar 表名
dts_utc_timestamp varchar 更新时间
dts_before_flag varchar 变更前标识
dts_after_flag varchar 变更后标识

dts_record_id: 这条增量日志的唯一标识,唯一递增。如果变更类型为 update,那么增量更新会被拆分成 2 条,一条 Insert,一条 Delete。这两条记录具有相同的 record_id。

dts_instance_id: 这条增量日志所对应的数据库的 server id。

dts_db_name: 这条增量更新日志更新的表所在的数据库库名。

dts_table_name:这条增量更新日志更新的表。

dts_operation_flag: 标示这条增量日志的操作类型。取值包括:

I : insert 操作
D : delete 操作
U : update 操作

dts_utc_timestamp: 这条增量日志的操作时间戳,为这个更新操作记录 binlog 的时间戳。这个时间戳为 UTC 时间。

dts_before_flag: 表示这条增量日志后面带的各个 column 值是否更新前的值。取值包括:Y 和 N。当后面的 column 为更新前的值时,dts_before_flag=Y, 当后面的 column 值为更新后的值时,dts_before_flag=N.

dts_after_flag:表示这条增量日志后面带的各个 column 值是否更新后的值。取值包括:Y 和 N。 当后面的 column 为更新前的值时,dts_after_flag=N,当后面的 column 值为更新后的值时,dts_after_flag=Y.

对于不同的操作类型,增量日志中的 dts_before_flag 和 dts_after_flag 定义如下:

  1. 操作类型为:insert

当操作类型为 insert 时,后面的所有 column 值为新插入的记录值,即为更新后的值。所以 before_flag=N, after_flag=Y
undefined | center

  1. 操作类型为:update

当操作类型为 update 时,会将 update 操作拆为 2 条增量日志。这两条增量日志的 dts_record_id ,dts_operation_flag 及 dts_utc_timestamp 相同。
第一条日志记录了更新前的值,所以 dts_before_flag=Y, dts_after_flag=N
第二条日志记录了更新后的值,所以 dts_before_flag=N, dts_after_flag=Y
undefined | center

  1. 操作类型为:delete

当操作类型为 delete 时,后面的所有 column 值为被删除的记录值,即为更新前的值。所以 dts_before_flag=Y, dts_after_flag=N
undefined | center

CREATE VIEW new_orderdetail AS
SELECT 
dts_ordercodeofsys, 
dts_skuname, 
dts_skucode,
CASE WHEN dts_operation_flag = 'U'
        AND dts_before_flag = 'Y'
        AND dts_after_flag = 'N' THEN -1 * dts_quantity 
    WHEN dts_operation_flag = 'U'
        AND dts_before_flag = 'N'
        AND dts_after_flag = 'Y' THEN dts_quantity 
    WHEN dts_operation_flag = 'D' THEN -1 * dts_quantity 
    ELSE dts_quantity 
    END AS dts_quantity
        FROM 
        orderdetail_real

怎么判断是有效交易订单呢?

首先是要满足dts_operation_flag=U 或者 dts_operation_flag=I,
然后dts_before_flag代表的是变更前订单状态,dts_after_flag是变更后订单状态;
所以有效交易订单为:

        dts_operation_flag = 'U'
        AND dts_before_flag = 'N'
        AND dts_after_flag = 'Y' THEN dts_quantity 
  • 为什么THEN -1 * dts_quantity呢?

订单的取消或者是交易没有成功在总的销量里也会记录;为了保证总销量的正确性,所以把没有成交的订单数量设为负数在计算总的销量会减去这个数量。

模块三

为什么订单源表和订单详情要做JOIN操作?

new_paytime查出的是最新交易的时间的所有的订单编号;new_orderdetail查询的是所有的有效的订单的订单编码、商品名称、商品编号、数量的信息;两张表JOIN是为整合成一张大表,方便用户来统计订单总数和总的销量。


SELECT 
    from_unixtime(CAST(a.dts_paytime AS bigint) / 1000000, 'yyyyMMdd') AS bill_date, 
    COUNT(DISTINCT a.dts_ordercodeofsys) AS bill_count, 
    SUM(b.dts_quantity) AS qty
from 
    (new_paytime) a 
join 
    (new_orderdetail) b
ON 
    a.dts_ordercodeofsys = b.dts_ordercodeofsys
GROUP BY 
        from_unixtime(CAST(a.dts_paytime AS bigint) / 1000000, 'yyyyMMdd');
相关实践学习
实时数据及离线数据上云方案
本实验通过使用CANAL、DataHub、DataWorks、MaxCompute服务,实现数据上云,解决了数据孤岛问题,同时把数据迁移到云计算平台,对后续数据的计算和应用提供了第一步开山之路。
目录
相关文章
|
4月前
|
数据库
电子好书发您分享《《阿里云数据库案例集客户案例集》电子书》
电子好书发您分享《《阿里云数据库案例集客户案例集》电子书》
202 2
|
24天前
|
存储 DataWorks Kubernetes
阿里云cam授权案例
以下是内容摘要: 1. 阿里云OSS自定义策略允许全权访问`dc-odsopr`及其所有内容。 2. Dataworks授权文档详情见[此处](https://help.aliyun.com/document_detail/74302.html)。 3. 日志权限策略涵盖多种日志操作,如获取和管理项目、作业、日志存储等,限定于`cn-shenzhen`区域的`k8s`项目。 4. AliKafka权限包括实例、主题的管理,消息操作及部署、ACL、用户管理等。 5. OSS策略允许上传对象至`sz-creative-system-test-public`存储空间。
24 5
|
28天前
使用阿里云智能翻译接口案例—
使用阿里云智能翻译接口案例—
10 0
|
28天前
|
JSON 数据格式
使用阿里云火车票查询接口案例—
使用阿里云火车票查询接口案例—
10 0
|
28天前
使用阿里云身份证扫描识别接口案例—
使用阿里云身份证扫描识别接口案例—
31 0
|
2月前
|
数据采集 供应链 监控
5 天学会阿里云 RPA:电商行业应用
在当今数字化的商业环境中,电商行业迅速发展。为了在激烈的市场竞争中脱颖而出,电商企业不断寻求提高运营效率和提供优质客户体验的方法。阿里云 RPA(机器人流程自动化)的出现为电商行业带来了全新的解决方案。
|
3月前
|
人工智能 JavaScript Java
阿里云参编业内首个代码大模型标准,通义灵码获 2023 AI4SE “银弹” 案例
阿里云参编业内首个代码大模型标准,通义灵码获 2023 AI4SE “银弹” 案例
|
3月前
|
存储 人工智能 供应链
AI电商新思路!天猫X阿里云Create@AI创客松比赛结果出炉
1月13日-15日,阿里云和天猫联合举办了一场Create@AI创客松,以「AI电商」为赛题,面向有志于利用AI助力电商经营的创新团队及公司征集优秀方案,深化AI技术在电商场景中的应用。
|
23天前
|
Ubuntu JavaScript 关系型数据库
在阿里云Ubuntu 20.04服务器中搭建一个 Ghost 博客
在阿里云Ubuntu 20.04服务器上部署Ghost博客的步骤包括创建新用户、安装Nginx、MySQL和Node.js 18.x。首先,通过`adduser`命令创建非root用户,然后安装Nginx和MySQL。接着,设置Node.js环境,下载Nodesource GPG密钥并安装Node.js 18.x。之后,使用`npm`安装Ghost-CLI,创建Ghost安装目录并进行安装。配置过程中需提供博客URL、数据库连接信息等。最后,测试访问前台首页和后台管理页面。确保DNS设置正确,并根据提示完成Ghost博客的配置。
在阿里云Ubuntu 20.04服务器中搭建一个 Ghost 博客
|
27天前
|
存储 分布式计算 网络协议
阿里云服务器内存型r7、r8a、r8y实例区别参考
在阿里云目前的活动中,属于内存型实例规格的云服务器有内存型r7、内存型r8a、内存型r8y这几个实例规格,相比于活动内的经济型e、通用算力型u1实例来说,这些实例规格等性能更强,与计算型和通用型相比,它的内存更大,因此这些内存型实例规格主要适用于数据库、中间件和数据分析与挖掘,Hadoop、Spark集群等场景,本文为大家介绍内存型r7、r8a、r8y实例区别及最新活动价格,以供参考。
阿里云服务器内存型r7、r8a、r8y实例区别参考