通量电容器?流式SQL中的时态表和连接

本文涉及的产品
容器镜像服务 ACR,镜像仓库100个 不限时长
简介: 简单介绍Flink流式处理数据的特性,以例子讲解Flink建模时态数据。

在1985年的电影《回到未来》中,由发明家Doc Brown设计的神器磁通电容器(flux capacitor)让Marty Mcfly拥有了穿越时空的能力。而数据库一般是具有ACID四大特性,需要考虑时间对数据库的影响。一直以来,弄清楚如何管理和建模时间数据以进行有效的时间点分析是一项长期的研究过程,最早可以追溯到80年代早期,2011年才引入了SQL标准中的时态表(temporal tables)。到目前为止,用户注定要将其作为应用程序逻辑的一部分来实现,这通常会延长开发生命周期以及损失代码的可维护性。此外,虽然没有统一的、普遍接受的时态数据( temporal data)定义,但它所代表的挑战是相同的:如何根据动态变化的历史数据集验证或丰富数据?

出租车费和转换率

例如:如果出租车票价事件与乘坐地点的当地货币挂钩,我们尽可能的希望将票价转换为通用货币以便进一步处理。这是由于汇率在一段时间内的波动会很大,为了产生可靠的结果,每个出租车费用事件都需要与事件发生时的有效汇率相匹配。

使用Flink建模时态数据

在1.7版本中,Flink已将时态表的概念引入其流式SQL和表API中:仅附加表的参数化视图、或者仅允许插入记录、从不更新或删除记录的任何表,这些表被解释为变更日志,并将数据与时间上下文紧密相关,以便只能在特定时间段内将其解释为有效。将流转换为时态表需要以下两步:

  • 定义主键版本控制字段,可用于跟踪随时间发生的更改;
  • 将流公开为时间表函数,将每个时间点映射到静态关系。

回到上面的示例用例中,时态表正是我们对汇率数据进行建模所需要的,例如对时间点查询有用。临时表函数是作为Flink通用表函数类的扩展实现的,并且可以用与表API或SQL解析器一起使用来定义。

import org.apache.flink.table.functions.TemporalTableFunction;
 
(...)
 
// Get the stream and table environments.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.getTableEnvironment(env);
 
// Provide a sample static data set of the rates history table.
List <Tuple2<String, Long>>ratesHistoryData =new ArrayList<>();
 
ratesHistoryData.add(Tuple2.of("USD", 102L)); 
ratesHistoryData.add(Tuple2.of("EUR", 114L)); 
ratesHistoryData.add(Tuple2.of("YEN", 1L)); 
ratesHistoryData.add(Tuple2.of("EUR", 116L)); 
ratesHistoryData.add(Tuple2.of("USD", 105L));
 
// Create and register an example table using the sample data set.
DataStream<Tuple2<String, Long>> ratesHistoryStream = env.fromCollection(ratesHistoryData);
 
Table ratesHistory = tEnv.fromDataStream(ratesHistoryStream, "r_currency, r_rate, r_proctime.proctime");
 
tEnv.registerTable("RatesHistory", ratesHistory);
 
// Create and register the temporal table function "rates".
// Define "r_proctime" as the versioning field and "r_currency" as the primary key.
TemporalTableFunction rates = ratesHistory.createTemporalTableFunction("r_proctime", "r_currency");
 
tEnv.registerFunction("Rates", rates);
 
(...)

实际上,这个Rate函数的作用是什么呢?假设我们想检查给定时间的汇率,比如11:00这一时刻。我们可以简单地通过下面的sql语句得到答案:

SELECT * FROM Rates('11:00');

时间点查询

尽管Flink还不支持使用常量时间属性参数查询时态表函数,但这些函数可用于覆盖更有趣的场景:时态表连接。

使用临时表流式连接

时态表在与流数据结合使用时发挥了其全部潜力。例如,为应用程序提供动力,这些应用程序必须针对一个参考数据集连续地列出白名单,该数据集随着审计或法规遵从时间而发生变化。由于计算成本和资源的消耗,高效连接长期以来一直是查询处理器面临的持久挑战,但在流式数据上的连接带来了一些额外的挑战:

  • 流的无边界特性意味着输入被连续评估,并且中间连接结果可以无限地消耗内存资源。Flink优雅地管理其内存消耗(即使对于连接需要溢出到磁盘的较重情况),并支持时间窗口连接以限制需要保持为状态的数据量;
  • 流数据可能是乱序的或者有延迟,因此不可能预先强制执行排序,并且时间处理需要一些思考以避免不必要的输出和撤销。

在时间数据的特定情况下,时间窗连接是不够的(至少在不进行一些代价调整的情况下是不够的)。这会导致迟早会发生每个参考记录将落在窗口之外并从状态擦除的情况,不再正在考虑将来的联接结果。为了解决这个限制,Flink引入了对时态表连接的支持,以涵盖时变关系。

出租车费与转换率之间的时间表联系

探测端(Taxi Fare)上仅附加表中的每个记录与构建端(Conversion Rate)上的时间表中的记录版本相连接,该版本与探测端记录时间属性(time)找最接近匹配的主键(currency)值。还记得之前注册的时态表函数(Rates)吗?现在可用于将此连接表达为一个简单的SQL语句,否则需要使用子查询复杂语句。

定期加入与时间表加入

时态表连接既支持处理语义, 也支持事件时间语义,并有效地限制保持在状态中的数据量,同时还允许构建端上任意旧的记录,这与时间窗口连接相反。探测端记录只需要在很短的时间内保持状态,以确保存在无序记录时正确语义。本节开头提到的问题可以通过以下方式克服:

  • 缩小连接的范围:对于给定的taxiFare.time只有时间匹配版本的ratesHistory可见;
  • 从状态中修剪不需要的记录:在当前时间和水印(watermark))延迟之间的记录对于探测端和构建端都是持久的。一旦水印到达并且结果被发出,这些将被丢弃,允许连接操作在时间上向前移动,构建表在状态下“刷新”其版本。

结论

根据上面的内容可以总结到,可以使用Flink在关系和时变参数中能够表达丰富的连续流,而不必涉及语法拼写或者对性能有所影响。换句话说:流式时间旅行无需磁通电容器。将此语法扩展到批处理,以使用适当的(事件)时间语义来丰富历史数据,这也是Flink路线图的一部分!

如果想在使用Flink SQL(通常是Flink SQL)连接流方面获得一些实际操作实践,这里有一个免费培训项目,培训环境基于Docker,只需几分钟即可建立。

作者信息

本文由阿里云开发者社区组织翻译。

文章原标题《Flux capacitor, huh? Temporal Tables and Joins in Streaming SQL》

作者:morsapaes

译者:海棠

文章为简译,更为详细的内容,请查看原文

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
3月前
|
SQL
幂等修改表增加一列的sql语句
这个 SQL 查询用于修改名为 `t1` 的表,通过添加一个名为 `b` 的新列,并指定数据类型。`IF NOT EXISTS` 子句确保只有在表中不存在该列的情况下才会添加。 在你的查询中,`<数据类型>` 应该被替换为你想要的具体数据类型(例如,INT,VARCHAR等)。以下是使用 INT 数据类型的示例: ```sql ALTER TABLE t1 ADD COLUMN IF NOT EXISTS b INT; ``` 请根据你的需求选择合适的数据类型。这个查询的作用是在表 `t1` 中添加一个名为 `b` 的新列,如果该列尚不存在的话。
|
1月前
|
SQL 关系型数据库 MySQL
【MySQL】— —熟练掌握用SQL语句实现数据库和基本表的创建。熟练掌握MySQL的安装、客户端登录方法;熟练掌握MySQL的编码、数据类型等基础知识;掌握实体完整性的定义和维护方法、掌握参照完整性
【MySQL】— —熟练掌握用SQL语句实现数据库和基本表的创建。熟练掌握MySQL的安装、客户端登录方法;熟练掌握MySQL的编码、数据类型等基础知识;掌握实体完整性的定义和维护方法、掌握参照完整性
99 1
|
1月前
|
SQL
现有用户成就统计需求,每个用户有多个成就,某一个成就会被多人拥有,写出数据表设计方案,用一条sql查出每个成就(B.ach_name)下的男生(sex=0)和女生(sex=1)分别有多少?
现有用户成就统计需求,每个用户有多个成就,某一个成就会被多人拥有,写出数据表设计方案,用一条sql查出每个成就(B.ach_name)下的男生(sex=0)和女生(sex=1)分别有多少?
41 0
|
7天前
|
SQL 安全 网络安全
IDEA DataGrip连接sqlserver 提示驱动程序无法通过使用安全套接字层(SSL)加密与 SQL Server 建立安全连接的解决方法
IDEA DataGrip连接sqlserver 提示驱动程序无法通过使用安全套接字层(SSL)加密与 SQL Server 建立安全连接的解决方法
17 0
|
1月前
|
SQL 关系型数据库 MySQL
【MySQL】——用SQL语句实现数据库和基本表的创建
【MySQL】——用SQL语句实现数据库和基本表的创建
63 3
【MySQL】——用SQL语句实现数据库和基本表的创建
|
1月前
|
SQL
SQL多表查询的几种连接方式
SQL多表查询时,主要使用以下几种连接方式
|
2月前
|
SQL 消息中间件 分布式数据库
flink sql问题之连接HBase报错如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
185 0
|
2月前
|
SQL 网络协议 Java
【Java+SQL Server】前后端连接小白教程
【Java+SQL Server】前后端连接小白教程
24 0
|
3月前
|
SQL Oracle 关系型数据库
SQL 的 with as 临时中间表
SQL 的 with as 临时中间表
40 1
|
3月前
|
SQL 数据库
SQL-修改表操作
SQL-修改表操作