Tablestore + Blink实战:交易数据的实时统计

本文涉及的产品
对象存储 OSS,20GB 3个月
对象存储 OSS,恶意文件检测 1000次 1年
对象存储 OSS,内容安全 1000次 1年
简介: 交易数据的实时统计是电商网站一个核心功能,可以帮助用户实时统计网站的整体销售情况,快速验证“新销售策略”的效果。我们今天介绍一个基于表格存储(Tablestore)实现交易数据的实时计算,给大家提供一个新使用方式。

背景

交易数据的实时统计是电商网站一个核心功能,可以帮助用户实时统计网站的整体销售情况,快速验证“新销售策略”的效果。我们今天介绍一个基于表格存储(Tablestore)实现交易数据的实时计算,给大家提供一个新使用方式。

Tablestore作为在线的结构化数据库,提供了毫秒级的访问延时和丰富的查询方式,能高效的支撑交易数据的存储和查询,同时Tablestore已经原生支持阿里云的流计算框架Flink/Blink,可以实现数据的实时计算。

架构

image

示例设计

基本场景

注意:示例是模拟一个电商网站的交易数据的存储和实时计算,目的是为了展示Tablestore + Blink的使用流程。

用户通过SDK将网站交易数据实时的存储(PutRow/BatchWrite/TableStoreWriter)到Tablestore的source_order表中,Tablestore通过Tunnel服务,将实时增量的数据流入到Flink/Blink中,每5秒进行一次聚合计算,并将计算的结果重新写回Tablestore的sink_order表中。最后提供给“大屏”实时读取(GetRange)展示。

Source表(源表)- source_order

source表是原始数据表,存储了所有交易记录。

字段 类型 注释
metering(PrimaryKey) string 计量类型,样例中默认是web
orderid(PrimaryKey) string 订单号ID
ts integer 交易时间(Unix时间戳,毫秒精度)
price double 交易金额
buyerid integer 买家ID
sellerid integer 卖家ID
productid integer 商品ID

Sink表(结果表)- sink_order

字段 类型 注释
metering(PrimaryKey) string 计量类型,样例中默认是web
ts(PrimaryKey) integer 交易时间(Unix时间戳,毫秒精度)
price double 交易金额
ordercount integer 交易次数

Flink SQL

DDL参考

注意:当前Blink在支持Tablestore source上还有些限制,只能运行ProcessingTime的方式,未来会支持EventTime模式,按照用户数据参数的时间进行计算。

-- Source 源表创建
CREATE TABLE ots_input (
    metering VARCHAR,
    orderid VARCHAR,
    price DOUBLE,
    byerid BIGINT,
    sellerid BIGINT,
    productid BIGINT,
    primary key(metering,orderid),
    ts AS PROCTIME()
) WITH (
    type = 'ots',
    instanceName = 'ordertest',
    tableName = 'source_order',
    accessId = '******************',
    accessKey = '******************',
    endpoint = 'http://ordertest.cn-zhangjiakou.vpc.tablestore.aliyuncs.com',
    tunnelName = 'blink_agg'
);
-- Sink 结果表创建
CREATE TABLE ots_output (
    metering VARCHAR,
    ts BIGINT,
    price DOUBLE,
    ordercount BIGINT,
    primary key(metering,ts)
) WITH (
    type = 'ots',
    instanceName = 'ordertest',
    tableName = 'sink_order',
    accessId = '******************',
    accessKey = '******************',
    endpoint = 'http://ordertest.cn-zhangjiakou.vpc.tablestore.aliyuncs.com',
    valueColumns = 'price,ordercount'
);

-- 计算
INSERT INTO ots_output
SELECT 
    DISTINCT metering as metering,
    CAST(TUMBLE_START(ots_input.ts, INTERVAL '5' SECOND) AS BIGINT) AS ts,
    SUM(price) as price,
    COUNT(orderid) as ordercount
FROM ots_input
GROUP BY TUMBLE(ts, INTERVAL '5' SECOND),metering;

实战

第一步:准备账户与开服

准备表格存储实例,可以参考《表格存储实例创建》

准备Flink/Blink项目,可以参考《Blink如何购买》

第二步:创建资源

表格存储资源

表格存储控制台入口,创建表格存储实例ordertest (用户自定义即可,下面对于参数位置更换为自定义的实例名),并记录实例的VPC地址

image

同时在控制台创建Source表和Sink表, 并为Source表(source_order)开启一个Tunnel服务blink_agg

image

图三 Source表(source_order)

image

图四 Sink表(sink_order)

image

图五 源表和目标表

image

图六 创建通道

Blink资源

Blink控制台入口,创建一个Blink项目(独享模式要创建集群之后才能创建项目),分别创建一个作业,agg_order,并将上面的Flink SQL粘贴到窗口中,上线服务

image

image

image
在运维窗口中启动该任务
image

第三步:压入数据 并 实时获取结算结果

1 准备配置文件

程序默认会从'~/tablestoreConf.json'获取配置

vim ~/tablestoreConf.json

# 内容
{
    "endpoint":"http://ordertest.cn-zhangjiakou.ots.aliyuncs.com",
    "accessId":"************",
    "accessKey":"************",
    "instanceName":"ordertest"
}

2 构建源码

mvn install
cd target
tar xzvf stream-compute-1.0-SNAPSHOT-release.tar.gz

3 启动压力器和模拟大屏

可以直接下载工具包:stream-compute-1.0-SNAPSHOT-release.tar.gz

# 窗口1
./bin/mock_order_generator
# 窗口2
./bin/data_show_screen

4 效果

image

源码

源码:https://github.com/aliyun/tablestore-examples/tree/master/demos/StreamCompute

欢迎加入
如果您对表格存储使用有疑问、想探讨,欢迎加入【表格存储公开交流群】,群号:11789671。

image

相关实践学习
阿里云表格存储使用教程
表格存储(Table Store)是构建在阿里云飞天分布式系统之上的分布式NoSQL数据存储服务,根据99.99%的高可用以及11个9的数据可靠性的标准设计。表格存储通过数据分片和负载均衡技术,实现数据规模与访问并发上的无缝扩展,提供海量结构化数据的存储和实时访问。 产品详情:https://www.aliyun.com/product/ots
目录
相关文章
|
6月前
|
存储 索引
表格存储根据多元索引查询条件直接更新数据
表格存储是否可以根据多元索引查询条件直接更新数据?
60 3
|
SQL 存储 弹性计算
玩转Tablestore:使用Grafana快速展示时序数据
Grafana 是一款采用 go 语言编写的开源应用,主要用于大规模指标数据的可视化展现,是网络架构和应用分析中最流行的时序数据展示工具,可以通过将采集的数据查询然后可视化的展示,实现报警通知;Grafana拥有丰富的数据源,官方支持以下数据源:Graphite,Elasticsearch,InfluxDB,Prometheus,Cloudwatch,MySQ
1643 0
|
1月前
|
分布式计算 DataWorks API
DataWorks常见问题之按指定条件物理删除OTS中的数据失败如何解决
DataWorks是阿里云提供的一站式大数据开发与管理平台,支持数据集成、数据开发、数据治理等功能;在本汇总中,我们梳理了DataWorks产品在使用过程中经常遇到的问题及解答,以助用户在数据处理和分析工作中提高效率,降低难度。
|
3月前
|
DataWorks NoSQL 关系型数据库
可以使用dataworks从tablestore同步数据到mysql吗?
可以使用dataworks从tablestore同步数据到mysql吗?
29 1
|
10月前
|
NoSQL 开发工具
TableStore表格存储(阿里云OTS)多行数据操作查询,支持倒序,过滤条件和分页
1. 批量读取操作 批量读取操作可以通过多种方式进行,包括: GetRow:根据主键读取一行数据。 BatchGetRow:批量读取多行数据。 GetRange:根据范围读取多行数据。
560 0
|
存储 消息中间件 NoSQL
物联网数据通过规则引擎流转到OTS|学习笔记
快速学习物联网数据通过规则引擎流转到OTS
272 0
物联网数据通过规则引擎流转到OTS|学习笔记
|
存储 负载均衡 开发者
表格存储数据多版本介绍| 学习笔记
快速学习表格存储数据多版本介绍。
224 0
表格存储数据多版本介绍| 学习笔记
|
NoSQL 开发者
《玩转 Tablestore 入门与实战》电子版地址
阿里云基础产品事业部Tablestore团队联合阿里云开发者社区《玩转 Tablestore 入门与实战》重磅来袭!466页技术干货,分享阿里内部基于 Tablestore 的优秀架构设计经验和典型应用场景,带你揭开Tablestore 的神秘面纱!
110 0
《玩转 Tablestore 入门与实战》电子版地址
|
存储 NoSQL 关系型数据库
基于TableStore的海量气象格点数据解决方案实战 王怀远
基于TableStore的海量气象格点数据解决方案实战 王怀远
300 0
基于TableStore的海量气象格点数据解决方案实战 王怀远
|
存储 SQL 运维
基于Tablestore 实现大规模订单系统海量订单/日志数据分类存储的实践
前言:从最早的互联网高速发展、到移动互联网的爆发式增长,再到今天的产业互联网、物联网的快速崛起,各种各样新应用、新系统产生了众多订单类型的需求,比如电商购物订单、银行流水、运营商话费账单、外卖订单、设备信息等,产生的数据种类和数据量越来越多;其中订单系统就是一个非常广泛、通用的系统。而随着数据规模的快速增长、大数据技术的发展、运营水平的不断提高,包括数据消费的能力要求越来越高,这对支撑订单系统的数据库设计、存储系统也提出了更多的要求。在新的需求下,传统的经典架构面临着诸多挑战,需要进一步思考架构优化,以更好支撑业务发展;
671 0
基于Tablestore 实现大规模订单系统海量订单/日志数据分类存储的实践