表格存储(TableStore)新功能Stream初探

本文涉及的产品
对象存储 OSS,20GB 3个月
对象存储 OSS,恶意文件检测 1000次 1年
对象存储 OSS,内容安全 1000次 1年
简介: 阿里云自研PB级nosql数据库TableStore近期发布了新功能Stream,也就是增量通道,可以让用户实时的获取数据库中的增删改操作。很多使用TableStore的用户会定期把数据导入各类计算平台做数据的离线分析,以前的做法是使用DATAX或者使用TableStore的SDK定期拉取数据。

阿里云自研PB级nosql数据库TableStore近期发布了新功能Stream,也就是增量通道,可以让用户实时的获取数据库中的增删改操作。很多使用TableStore的用户会定期把数据导入各类计算平台做数据的离线分析,以前的做法是使用DATAX或者使用TableStore的SDK定期拉取数据。之前我们只能采用全量拉取的办法,定期的全量拉取势必会带来很多不必要的开销,并且也失去了新增数据实时处理的可能。那有了Stream增量通道后,之前的这些痛点都会被迎刃而解。
这个功能究竟怎么使用,又可以用在哪里呢?下面我就带大家初探TableStore的Stream功能。大家也可以先阅读下Stream的原理

产品功能

Stream功能和其他表格存储的很多功能一样,是用户表的一个属性。用户在创建表的时候可以指定是否开启Stream功能。用户也可以通过UpdateTable操作在后续需要使用Stream的时候开启。当用户开启Stream后,用户的修改记录在生命周期内(周期长短由用户开启Stream的时候指定,目前默认最大是一天,如有更长周期需要可以在官网提工单)会被一直保留。
除了表操作以外,Stream的API具体有以下:

  • ListStreams 获取当前表的 Stream 信息,例如 StreamID。具体请参见ListStreams
  • DescribeStream 获取当前表增量数据的分区信息,熟悉表格存储特性的同学会知道表格存储会自动根据用户指定的分片键做分区来实现负载均衡。而我们的增量数据也是通过分区来进行组织,所以消费增量数据之前需要了解当前的分区信息,也就是Stream中的Shard信息。具体参见DescribeStream
  • GetShardIterator 获取具体某个分区的读取iterator。这个iterator可以简单理解为一个偏移量标记我们可以从哪里开始消费增量数据。具体参见GetShardIterator
  • GetStreamRecord 拉取增量数据,每次拉取结束后,会更新iterator用来下次拉取。如果返回数据为空表示当前尚未读取到新数据。如果返回null说明这个分区已经不存在没有后续的增量数据了。GetStreamRecord

如何理解TableStore的增量数据

介绍了Stream API,可能还不是很直观理解TableStore的Stream数据是如何组织的,下面就以用户轨迹为例来介绍如何使用。如何基于表格存储实现用户轨迹数据的存储,可以参考如何高效存储海量GPS数据
在这里,我们假设你使用如下的表结构存储你的海量轨迹数据,

主键顺序 名称 类型 备注
1 partition_key string md5(user_id)前四位 为了负载均衡
2 user_id string/int 用户id 可以是字符串也可以是长整型数字
3 task_id string/int 此次轨迹图的id 可以使字符串也可以是长整型数字
4 timestamp int 时间戳 使用长整型,64位,足够保存毫秒级别的时间戳

假如你有原始数据如下:

2017/5/20 10:10:10的时候小王在杭州虎跑路,开着私家宝马车,速度25km/h,当时风速2m/s,温度20度,已经开了8公里。

在表格存储中存储的是(11列);

part_key user_id task_id timestamp longitude latitude brand speed wind_speed temperature distance
04fc 000001 001 1495246210 120.1516525097 30.2583277934 BMW 25 2 20 8000

当用户的位置不断发生变化,我们会产生一系列类似上面的轨迹数据,例如我们的粒度可以是10秒一个轨迹点。这样在一段时间内,我们可以积累海量的轨迹数据。那对于业务方,往往要做一些运营分析。

分析话题1:统计过去10分钟内是否有一个区域有驾驶热点,会带来交通拥堵。发现潜在拥堵点后,提前做一些车流疏散。

分析话题2:又或者我们希望在晚饭时间点,统计一下来某个商圈吃饭的客户都是从哪些地方开车过来的,日后可以在辐射区域内做一些精准推广。

这类问题的共同点是需要在这张轨迹表中获取一个时间段内新写入的数据,针对我们的表结构设计,如果没有增量通道的时候,我们能做的就是拿到所有的用户id和taskid进行时间段内的getrange读,这样如果同时的轨迹用户较多,会带来大量的getrange并发访问,而且我们还需要一张额外的表记录用户和轨迹id的关系。如果我们修改表结构,把时间作为第一主键,又会带来严重的数据写入尾部热点,数据分布不均匀等问题。

那么我们的架构就由以前的
gps_004_jpeg 加上大量的range读变为了下图的基于增量获取:
gps_007_jpeg

Stream 的数据返回格式

当你使用我们的Stream APi读取增量数据的时候上面的数据会以下面的形式返回,我们以Go Sdk为例返回如下格式的Stream record。

record 0: {"Type":"PutRow", "PrimaryKey":{[{"Name": "pk1", "Value": "04fc"} {"Name": "pk2", "Value": "000001"} {"Name": "pk3", "Value": "001"} {"Name": "pk4", "Value": "%!s(int64=1495246210)"}]}, "Info":{"Epoch":0, "Timestamp": 1503555067832234, "RowIndex": 1}, "Columns":[{"Name":"longitude", "Type":"Put", "Timestamp":1503555067833, "Value":1e+02} {"Name":"latitude", "Type":"Put", "Timestamp":1503555067833, "Value":30.2583277934} {"Name":"brand", "Type":"Put", "Timestamp":1503555067833, "Value":BMW} {"Name":"speed", "Type":"Put", "Timestamp":1503555067833, "Value":25} {"Name":"wind_speed", "Type":"Put", "Timestamp":1503555067833, "Value":2} {"Name":"temperature", "Type":"Put", "Timestamp":1503555067833, "Value":20} {"Name":"distance", "Type":"Put", "Timestamp":1503555067833, "Value":8000}]}
record 1: {"Type":"PutRow", "PrimaryKey":{[{"Name": "pk1", "Value": "04fc"} {"Name": "pk2", "Value": "000001"} {"Name": "pk3", "Value": "001"} {"Name": "pk4", "Value": "%!s(int64=1495246310)"}]}, "Info":{"Epoch":0, "Timestamp": 1503555068082609, "RowIndex": 1}, "Columns":[{"Name":"longitude", "Type":"Put", "Timestamp":1503555068083, "Value":1e+02} {"Name":"latitude", "Type":"Put", "Timestamp":1503555068083, "Value":30.2583277934} {"Name":"brand", "Type":"Put", "Timestamp":1503555068083, "Value":BMW} {"Name":"speed", "Type":"Put", "Timestamp":1503555068083, "Value":25} {"Name":"wind_speed", "Type":"Put", "Timestamp":1503555068083, "Value":2} {"Name":"temperature", "Type":"Put", "Timestamp":1503555068083, "Value":20} {"Name":"distance", "Type":"Put", "Timestamp":1503555068083, "Value":8001}]}

我们可以发现,表格的一次操作对应Stream的一条记录,记录中会涵盖这次操作的类型,操作的主键以及修改列的内容。有了这些数据我们可以方便做以下事情:

  1. 将数据做清洗写入另一张TableStore表
  2. 将数据写入流计算平台,做实时计算分析
  3. 将数据写入MaxCompute做进行分析

下面罗列下我们目前有如下几种方式可以读区表格存储的增量数据:

  1. SDK直接访问,目前我们的Java SDK和Go SDK已经支持Stream的Api,具体的使用可以参考Java Stream 示例Go Stream 示例
  2. DATAX 离线读取stream数据到odps,具体使用参考[DATAX 访问TableStore增量]()
  3. 基于Stream Client,用户自己开发实时数据通道将数据导出至不同的数据源。使用可以参考Stream Client使用
  4. Stream对接FC,通过FC触发数据处理逻辑。 [即将发布,敬请期待]()

下面我们就用外卖订单系统为例再说明下Stream如何可以方便我们简化,高效的实现我们的应用。

外卖订单系统

场景描述

现在外卖行业非常火热,几家大厂都在角逐这个领域。而外卖也确实给我们的日常生活带来的很多便利,那如何基于表格存储打造一款高效的外卖应用呢,下面我们来详细介绍下。

系统特点

很多外卖会在不同时间有明显的波峰波谷,例如食品外卖,三餐点和夜宵时间点会有明显的波峰。那么表格存储这类海量高性能弹性计费的数据库产品就非常适合。除此之外,外卖系统还要基于一个区域内所有用户的下单情况做一个做优化的配送,实现效率最优,那么这样的系统我们如何设计表结构呢。

表结构设计

表1 订单表

主键顺序 名称 类型 备注
1 partition_key string md5(user_id)前四位 为了负载均衡
2 user_id string/int 用户id 可以是字符串也可以是长整型数字
3 timestamp int 时间戳 使用长整型,64位,足够保存毫秒级别的时间戳
4 order_id string/int 订单Id 可以使字符串也可以是长整型数字

表2 配送表

主键顺序 名称 类型 备注
1 partition_key string md5(user_id)前四位 为了负载均衡
2 user_id string/int 配送员id 可以是字符串也可以是长整型数字
3 delivery_id int 配送序列 使用长整型,基于表格存储主键自增列

数据存储示例

设计好表结构后,我们看下具体如何存储:
订单表原始数据是:

2017/5/20 10:12:20小王下了订单,订单号10005,购买了两串烤肉和一杯咖啡,总共支付来51元,收获地址是西湖区XXX路XX号。

配送表原始数据是:

2017/5/20 10:12:20配送员小李收到配送订单信息,订单号10005,购买了两串烤肉和一杯咖啡,收获地址是西湖区XXX路XX号。
part_key user_id timestamp order_id merchant_id commodity price address payment_type status
01f3 000001 1495246210 10005 黑暗料理 2烤肉,1咖啡 51 西湖区XXX路XX号 alipay 等待配送
part_key user_id delivery_id order_id merchant_id commodity price address payment_type status
01f3 000001 1495249230 10005 黑暗料理 2烤肉,1咖啡 51 西湖区XXX路XX号 alipay 配送中

主键

订单表
  1. part_key:第一个主键,分区建,主要是为了负载均衡,保证数据可以均匀分布在所有机器上,提高并发度和性能。如果业务主键user_id可以保证均匀分布,那么可以不需要这个主键。
  2. user_id:第二个主键,用户ID,可以是字符串也可以是数字,唯一标识一个用户。
  3. timestamp:第三个主键,时间戳,表示某一个时刻,单位可以是秒或者毫秒,用来表示用户订单的时间戳。在这里放置时间,是因为系统往往需要查询某个用户一段时间内的所有订单信息。
  4. order_id:第四个主键,订单号。
  5. 至此,上述四个主键可以唯一确定某一个用户在某一个时间点下的一个订单。
配送表
  1. part_key:第一个主键,分区建,主要是为了负载均衡,保证数据可以均匀分布在所有机器上,提高并发度和性能。如果业务主键user_id可以保证均匀分布,那么可以不需要这个主键。
  2. user_id:第二个主键,配送员ID,可以是字符串也可以是数字,唯一标识一个用户。
  3. delivery_id:第三个主键,配送号,注意不是用户的订单号,这一列使用自增列,配送员的客户端可以根据这个id拉去更新的配送信息。
  4. 至此,上述三个主键可以获取一个配送订单的详细信息。

属性列

订单表
  1. merchant_id :商家id
  2. commodity:商品内容。
  3. price:订单价格。
  4. address: 配送地址
  5. payment_type:用户支付类型。
  6. status:订单的状态。
配送表
  1. order_id :订单id
  2. commodity:商品内容。
  3. price:订单价格。
  4. address: 配送地址
  5. payment_type:用户支付类型。
  6. status:订单的状态。

由于表格存储的分区键可以在数据访问增加时进行分裂,当我们有百万用户同时在高峰期下单时我们可以分裂出较多的分区轻松应对每秒数十万甚至数百万的新增订单。有了这样的一个订单存储系统后,如何衔接我们的派单系统呢,这时候我们就可以使用增量功能,把近期的订单信息导入排单系统进行线路优化计算。前面我们也提到了外卖订单的伸缩特性,所以我们推荐使用函数计算进行订单的派送计算,我们表格存储Stream对接函数计算的功能也即将上线,届时我们是需要一些配置就可以打通表格存储和函数计算这两款全托管完全弹性计费的存储,计算产品。让我们的外卖订单飞的再快一点吧。
Stream_FC

谢谢使用表格存储,欢迎扫码加群讨论

相关实践学习
阿里云表格存储使用教程
表格存储(Table Store)是构建在阿里云飞天分布式系统之上的分布式NoSQL数据存储服务,根据99.99%的高可用以及11个9的数据可靠性的标准设计。表格存储通过数据分片和负载均衡技术,实现数据规模与访问并发上的无缝扩展,提供海量结构化数据的存储和实时访问。 产品详情:https://www.aliyun.com/product/ots
目录
相关文章
|
10月前
|
NoSQL 开发工具
TableStore表格存储(阿里云OTS)多行数据操作查询,支持倒序,过滤条件和分页
1. 批量读取操作 批量读取操作可以通过多种方式进行,包括: GetRow:根据主键读取一行数据。 BatchGetRow:批量读取多行数据。 GetRange:根据范围读取多行数据。
577 0
|
存储 SQL NoSQL
表格存储 Tablestore 十年发展总结
这篇文章接下来会先整体介绍下表格存储 Tablestore,之后会分享下在技术层面产品这几年的功能演进、技术架构演进以及稳定性优化相关的工作,以及在业务层面我们定义的核心应用场景和一些典型案例。
66543 7
表格存储 Tablestore 十年发展总结
《效率提升:表格存储实时数据流:Stream的技术揭秘和应用场景》电子版地址
效率提升:表格存储实时数据流:Stream的技术揭秘和应用场景
78 0
《效率提升:表格存储实时数据流:Stream的技术揭秘和应用场景》电子版地址
|
存储 NoSQL
|
存储 SQL NoSQL
表格存储 Tablestore SQL 商业版介绍
表格存储(Tablestore)是阿里云自研的多模型结构化数据存储,提供海量结构化数据存储以及快速的查询和分析服务。表格存储的分布式存储和强大的索引引擎能够支持 PB 级存储、千万 TPS 以及毫秒级延迟的服务能力。使用表格存储你可以方便的存储和查询你的海量数据。 表格存储在 21 年 9 月正式公测了 SQL 功能,使得你在享受表格存储全托管,灵活的存储能力之外,可以让你的业务迁移更加平顺。经
1135 0
表格存储 Tablestore SQL 商业版介绍
|
存储 运维 NoSQL
表格存储 Tablestore 简介
近十年来互联网技术得到了飞速的发展,越来越多的行业逐渐加入到了互联网的阵营中来,同时也产生了更丰富、更复杂的业务场景和需求,这对于数据应用系统的性能无疑是巨大的挑战。传统关系型数据库有什么瓶颈,如何通过分布式数据库表格存储 Tablestore 进行优化?
832 0
|
存储 SQL 缓存
使用 Blink 访问表格存储 Tablestore
本文介绍如何使用实时计算 Blink 服务访问表格存储服务(Tablestore),并进行开发。背景Blink 产品介绍阿里云实时计算Flink版(Alibaba Cloud Realtime Compute for Apache Flink,Powered by Ververica)是阿里云基于Apache Flink构建的企业级、高性能实时大数据处理系统,由Apache Flink创始团队官方
464 0
使用 Blink 访问表格存储 Tablestore
|
6月前
|
存储 索引
表格存储根据多元索引查询条件直接更新数据
表格存储是否可以根据多元索引查询条件直接更新数据?
62 3
|
SQL 存储 弹性计算
玩转Tablestore:使用Grafana快速展示时序数据
Grafana 是一款采用 go 语言编写的开源应用,主要用于大规模指标数据的可视化展现,是网络架构和应用分析中最流行的时序数据展示工具,可以通过将采集的数据查询然后可视化的展示,实现报警通知;Grafana拥有丰富的数据源,官方支持以下数据源:Graphite,Elasticsearch,InfluxDB,Prometheus,Cloudwatch,MySQ
1644 0
|
1月前
|
分布式计算 DataWorks API
DataWorks常见问题之按指定条件物理删除OTS中的数据失败如何解决
DataWorks是阿里云提供的一站式大数据开发与管理平台,支持数据集成、数据开发、数据治理等功能;在本汇总中,我们梳理了DataWorks产品在使用过程中经常遇到的问题及解答,以助用户在数据处理和分析工作中提高效率,降低难度。