基于阿里云官方Flink滚动窗口测试示例完善篇

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 一 官方文档背景 首先列出官方文档对于Flink滚动窗口的介绍以及示例演示:https://help.aliyun.com/document_detail/62511.html?spm=a2c4g.11174283.6.650.73161e494aJMpz对于具体滚动窗口的含义和参数及SQL的使用不再详尽介绍,官方文档介绍的已经相当完善(niubi); Flink SQL支持的窗口聚合主要是两种:Window聚合和Over聚合。

一 官方文档背景

首先列出官方文档对于Flink滚动窗口的介绍以及示例演示:
https://help.aliyun.com/document_detail/62511.html?spm=a2c4g.11174283.6.650.73161e494aJMpz
对于具体滚动窗口的含义和参数及SQL的使用不再详尽介绍,官方文档介绍的已经相当完善(niubi);
Flink SQL支持的窗口聚合主要是两种:Window聚合和Over聚合。本文档主要介绍Window聚合。Window聚合支持两种时间属性定义窗口:Event Time和Processing Time。
本文主要验证和完善的是Flink SQL对于滚动窗口函数的event time的示例的完善;
如下是关于event time的解释

Event Time
Event Time也称为Row Time。EventTime时间属性必须在源表DDL中声明,可以将源表中的某一字段声明成Event Time。目前只支持将TIMESTAMP类型(将来会支持LONG类型)声明成Row Time字段。如果源表中需要声明为Event Time的列不是TIMESTAMP类型,需要借助计算列,基于现有列构造出一个TIMESTAMP类型的列。
由于数据本身的乱序、网络的抖动(网络堵塞导致的数据传输延迟的变化)或者其它原因,导致了数据到达的顺序和被处理的顺序,可能是不一致的(乱序)。因此定义一个Row Time字段,需要明文定义一个Watermark计算方法。

至于为什么完善,就用文章的详细描述过程来解释吧。

二 原始文档测试过程及问题

2.1 测试流程及相关准备

这里测试的整个流程如下;

image

通过datahub的logstash插件将文档中示例的数据放到csv文件当中,通过logstash同步到datahub当中;
详细logstash同步可以参考:
https://help.aliyun.com/document_detail/47451.html?spm=a2c4g.11174283.6.553.771463efHE1Mot
这里将时间换算成Unix时间戳格式类型,并且要符合datahub微妙级别的格式即16位,如下换算以后末尾增加6个0;

image

并且在datahub中使用bigint方式来存储;
还有一种方式是将标准时间格式的数据以string字符串类型方式存储到datahub中然后再连接到flink当中进行计算;
两种方式都需要在flink中进行进一步的处理;
如下是使用bigint方式存储的字段进行flink处理的方式;

image

如下是使用string方式存储的字段进行flink处理的方式;

image

2.2 原始测试流程

在client源端将数据写入csv文件

image

然后通过logstash插件将数据写入datahub中;

image

如下是datahub捕获到的数据情况

image

通过Flink接datahub存储进行作业开发,开发代码如下,和文档中示例代码一致,做了时间类型的转换,否则会报错;

image

启动任务监控,写入数据以后监控发现有6条记录写入,但是输出是1条;

image

查询输出print结果只输出了1条完全不符合文档中描述的输出3条记录的结果;

image

2.3 数据丢失统计

通过如上的测试,我们通过string存储标准时间格式的方式再次进行了验证,结果依然一样,会发生数据丢失的情况;
如下是详细的解释;
需要注意这里的withOffset设置的是2000即2s;

WATERMARK wk FOR ts as withOffset(ts, 2000) --为Rowtime定义Watermark。

image

最终查到目标端输出的结果只有如下几条数据,其他数据均丢失;

image

三 原始文档测试流程疑问处理

基于官方文档滚动窗口函数连接内容源端Flink引用的数据来源是从datahub同步过来的;但是存在一个问题是标准时间格式的数据同步到datahub以后就变成了16位(微妙级别的Unix时间戳)的timestamp字段来存储时间;

image

而读取到flink以后滚动窗口识别的timestamp时间类型字段的Unix时间戳默认支持识别毫秒级别,即13位的时间戳,直接影响到的数据的计算;
但是在文档中并没有对这部分数据进行实际的处理影响用户测试参考,其实官方文档额外提供了另外的一篇文章--“计算列”:
https://help.aliyun.com/document_detail/110847.html?spm=a2c4g.11186623.2.11.46be1216uEKQSM#concept-nnx-bwy-bhb
详细解释了如何进行针对datahub类似的时间戳格式字段的转换以及flink SQL对应的支持处理方式;这样就解决了时间戳无法识别的问题;

四 基于“计算列”模拟测试

4.1 “计算列”模拟测试

从第三章的讲述基于“计算列”的时间处理通过logstash插件模拟再次进行测试滚动窗口的实现;
开发作业代码如下:

--SQL
--********************************************************************--
--Author: asp_dmp
--CreateTime: 2019-11-07 10:34:36
--Comment: tumble_window
--********************************************************************--

CREATE TABLE tumble_window (
username VARCHAR,
click_url VARCHAR,
`time` bigint,
ts as to_timestamp(`time`/1000),
WATERMARK FOR ts as withOffset(ts, 2000)
) WITH (
type = 'datahub',
endPoint = 'http://dh-cn-beijing-int-vpc.aliyuncs.com',
roleArn='acs:ram::xxxxxxxx:role/aliyunstreamdefaultrole',
project = 'huiyan_zrh',
topic = 'tumble_window_int'
);

CREATE TABLE tumble_output(
window_start TIMESTAMP,
window_end TIMESTAMP,
username VARCHAR,
clicks BIGINT
) with (
type='print'
);

INSERT INTO tumble_output
SELECT
TUMBLE_START(ts, INTERVAL '1' MINUTE),
TUMBLE_END(ts, INTERVAL '1' MINUTE),
username,
COUNT(click_url)
FROM tumble_window
GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE), username;

经过测试源端通过logstash写入12条数据记录;

Jark,http://taobao.com/xxx,1507600800000000       2017-10-10 10:00:00.0  
Jark,http://taobao.com/xxx,1507600810000000       2017-10-10 10:00:10.0
Jark,http://taobao.com/xxx,1507600849000000       2017-10-10 10:00:49.0
Jark,http://taobao.com/xxx,1507600865000000       2017-10-10 10:01:05.0
Jark,http://taobao.com/xxx,1507600918000000       2017-10-10 10:01:58.0
Timo,http://taobao.com/xxx,1507600930000000       2017-10-10 10:02:10.0
Timor,http://taobao.com/xxx,1507600982000000      2017-10-10 10:03:02.0
Timor,http://taobao.com/xxx,1507601015000000      2017-10-10 10:03:35.0
Timor,http://taobao.com/xxx,1507601045000000      2017-10-10 10:04:05.0
Timor,http://taobao.com/xxx,1507601064000000      2017-10-10 10:04:24.0
Tim,http://taobao.com/xxxx,1507601068000000       2017-10-10 10:04:28.0
Tim,http://taobao.com/xxxx,1507601069000000       2017-10-10 10:04:29.0

注:对于如上表格中后边的时间格式仅为了解释使用的时间情况,测试不需要
查看logstash插入日志显示插入12条,从插入情况看并非是顺序一条条进去的;这里留个疑问,后边内容来解释

image

在flink侧查看发现接收到了12条数据记录,但是输出只有一条,

image

从如上的开发作业的实现效果来看,我们想实现的是按照每分钟用户点击网页的次数来分组统计,也就是说希望得到的是6条数据,然而只输出一条;
还是有部分数据丢失了;
然后怀疑主要原因是消息处理延迟吗?Watermark如果设置大一点,降低敏感度,是不是数据不会被丢弃?或者还有其他更好的方案吗?

4.2 Watermark参数的功能

基于4.1章节,在这里就要引申出来开发的flink SQL中引用的Watermark参数了;
使用方式如下:

WATERMARK [watermarkName] FOR AS withOffset(, offset)

对于Watermark用如下示例解释:

WATERMARK FOR rowtime AS withOffset(rowtime, 4000)
Watermark时间为 1501750584000 - 4000 = 1501750580000(2017-08-03 08:56:20.000)。这条数据的Watermark时间含义:时间戳小于1501750580000(2017-08-03 08:56:20.000)的数据已经全部到达。

对于Watermark的使用总结如下:

a.Watermark含义是所有时间戳t'< t 的事件已经全部发生。若t(Watermark)已经生效,则后续Event Time小于t的记录将全部丢弃(后续支持用户配置,使Event Time小于t的数据也能继续更新)。
b.针对乱序的的流,Watermark至关重要。即使部分事件延迟到达,也不会过大影响窗口计算的正确性。
c.并行数据流中,当算子(Operator)有多个输入流时,算子的Event Time以最小流Event Time为准。

详细参考:
https://help.aliyun.com/document_detail/110837.html?spm=a2c4g.11186623.6.624.7ab63a98bPsoRU

五 数据“没有被抛弃”

基于Watermark的解释,其实数据不应该丢弃,而offset参数设置也是为了能够进一步保证数据不会被丢弃;
既然上述的所有测试都出现的数据丢弃的情况发生;那么我们就要回到最初的源头来考虑滚动窗口到底是要实现什么样的功能呢?
其实从业务角度来考虑的话并不难以理解,就拿官方提供的示例来讲,用户点击页面,即使并发再高,速度再快,时间总是有先后顺序的,入库也是肯定是有先后的,因此就可以模拟这样一个顺序写入的一个场景然后观察数据是否会被丢弃;
这时候有人可能会疑问还有时区的问题存在,那么就可以考虑参考官方提供时区内容部分;
https://help.aliyun.com/document_detail/96910.html?spm=a2c4g.11186623.6.622.2ffc56cfVYMyD7
当然至于像淘宝,天猫这样遍布全球的大电商,对于这样场景的统计,就不得而知了,我想可能内部会基于时区做转换吧,或者先考虑本地计算汇总然后再做总的汇总?
瞎猜的,佩服阿里云的这些大佬们,这里省略10000个钦佩感慨的字;
那么回过头来,我们就以北京时间为准,进行模拟顺序写入的方式来实现滚动窗口,看是否会出现数据丢失的情况发生;
同样,我们通过logstash一条条的顺序写入到datahub当中;

Jark,http://taobao.com/xxx,1507600800000000   2017-10-10 10:00:00.0  
Jark,http://taobao.com/xxx,1507600810000000   2017-10-10 10:00:10.0
Jark,http://taobao.com/xxx,1507600849000000   2017-10-10 10:00:49.0
Jark,http://taobao.com/xxx,1507600865000000   2017-10-10 10:01:05.0
Jark,http://taobao.com/xxx,1507600918000000   2017-10-10 10:01:58.0
Timo,http://taobao.com/xxx,1507600930000000   2017-10-10 10:02:10.0
Timor,http://taobao.com/xxx,1507600982000000  2017-10-10 10:03:02.0
Timor,http://taobao.com/xxx,1507601015000000  2017-10-10 10:03:35.0
Timor,http://taobao.com/xxx,1507601045000000  2017-10-10 10:04:05.0
Timor,http://taobao.com/xxx,1507601064000000  2017-10-10 10:04:24.0
Tim,http://taobao.com/xxxx,1507601068000000   2017-10-10 10:04:28.0
Tim,http://taobao.com/xxxx,1507601069000000   2017-10-10 10:04:29.0
Tim,http://taobao.com/xxxx,1507601070000000   2017-10-10 10:04:30.0
Tim,http://taobao.com/xxxx,1507601071000000   2017-10-10 10:04:31.0
Tim,http://taobao.com/xxxx,1507601072000000   2017-10-10 10:04:32.0
Tim,http://taobao.com/xxxx,1507601078000000   2017-10-10 10:04:38.0
Tim,http://taobao.com/xxxx,1507601082000000   2017-10-10 10:04:42.0
Tim,http://taobao.com/xxxx,1507601083000000   2017-10-10 10:04:43.0
Tim,http://taobao.com/xxxx,1507601101000000   2017-10-10 10:05:01.0
Tim,http://taobao.com/xxxx,1507601110000000   2017-10-10 10:05:10.0

注:对于如上表格中后边的时间格式仅为了解释使用的时间情况,测试不需要
输入的csv文件内容如下:

image

数据通过logstash插入datahub日志打印如下:
这里显示的就是一条条的进行了插入

image

然后我们再次从flink当中查看数据统计的输出情况,数据没有丢失,并且最新的数据的输出,需要下游数据再进来以后进行计算统计;

image

至此,我们这里也就应该解释清楚了吧;

数据之所以丢,是因为“若t(Watermark)已经生效,则后续Event Time小于t的记录将全部丢弃。”这句话,原因从测试的整个现象来推测,是并行写入导致的(解释上述4.1章节疑问,并行写入加上我们的数据是一下子输入到datahub中的),可能我们认为应该是顺序写入的,所以出现了数据的丢失;而如果按照滚动窗口来说,就拿文档中用户点击页面来举例,点击的情况是随着时间一下一下点击出来的,所以基本是顺序写入,然后根据相应的时间间隔粒度来统计,数据就不丢失了。
对于offset则是要控制在并行写入(无序)的情况下,对于数据延迟问题的解决,结论就是在offset设置过小时,出现数据丢失的概率较大;offset设置过大,则又会出现数据处理不及时的情况,有兴趣的同学可以通过基于连续时间以及offset设置来验证,篇幅有限,各位有兴趣可搞一波事情;

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
2月前
|
存储 弹性计算 运维
阿里云服务器ECS经济型e实例详细介绍_性能测试和租用价格
阿里云服务器ECS经济型e实例详细介绍_性能测试和租用价格,阿里云服务器ECS推出经济型e系列,经济型e实例是阿里云面向个人开发者、学生、小微企业,在中小型网站建设、开发测试、轻量级应用等场景推出的全新入门级云服务器,CPU采用Intel Xeon Platinum架构处理器,支持1:1、1:2、1:4多种处理器内存配比,e系列性价比优选
|
21天前
|
SQL 存储 API
阿里云实时计算Flink的产品化思考与实践【下】
本文整理自阿里云高级产品专家黄鹏程和阿里云技术专家陈婧敏在 FFA 2023 平台建设专场中的分享。
110327 10
阿里云实时计算Flink的产品化思考与实践【下】
|
1月前
|
SQL Apache 流计算
Apache Flink官方网站提供了关于如何使用Docker进行Flink CDC测试的文档
【2月更文挑战第25天】Apache Flink官方网站提供了关于如何使用Docker进行Flink CDC测试的文档
139 3
|
1月前
|
监控 关系型数据库 MySQL
Flink CDC产品常见问题之使用3.0测试mysql到starrocks启动报错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
1月前
|
存储 API 流计算
要测试和区分Flink的每个key状态和每个并行度的状态
【2月更文挑战第23天】要测试和区分Flink的每个key状态和每个并行度的状态
14 1
|
1月前
|
分布式计算 关系型数据库 OLAP
阿里云AnalyticDB基于Flink CDC+Hudi实现多表全增量入湖实践
阿里云AnalyticDB基于Flink CDC+Hudi实现多表全增量入湖实践
69 0
|
1月前
|
弹性计算 缓存 测试技术
阿里云2核4g服务器(费用价格/性能测试/支持人数)
阿里云2核4g服务器能支持多少人访问?2核4G服务器并发数性能测试,阿小云账号下的2核4G服务器支持20人同时在线访问,然而应用不同、类型不同、程序效率不同实际并发数也不同,2核4G服务器的在线访问人数取决于多个变量因素
|
1月前
|
弹性计算 缓存 测试技术
2核4g服务器能支持多少人访问?阿里云2核4G服务器并发数测试
2核4g服务器能支持多少人访问?阿里云2核4G服务器并发数测试,2核4G服务器并发数性能测试,阿小云账号下的2核4G服务器支持20人同时在线访问,然而应用不同、类型不同、程序效率不同实际并发数也不同,2核4G服务器的在线访问人数取决于多个变量因素
|
2月前
|
SQL Oracle 算法
Flink CDC 数据源问题之不支持窗口聚合如何解决
Flink CDC数据源指的是使用Apache Flink的CDC特性来连接并捕获外部数据库变更数据的数据源;本合集将介绍如何配置和管理Flink CDC数据源,以及解决数据源连接和同步过程中遇到的问题。
34 0
|
2月前
|
并行计算 计算机视觉
YOLOv8太卷啦 | YOLOv8官方仓库正式支持RT-DETR训练、测试以及推理
YOLOv8太卷啦 | YOLOv8官方仓库正式支持RT-DETR训练、测试以及推理
92 0