Flink在大规模状态数据集下的checkpoint调优

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 5万人关注的大数据成神之路,不来了解一下吗?5万人关注的大数据成神之路,真的不来了解一下吗?5万人关注的大数据成神之路,确定真的不来了解一下吗?欢迎您关注《大数据成神之路》今天接到一个同学的反馈问题,大概是:Flink程序运行一段时间就会报这个错误,定位好多天都没有定位到。

5万人关注的大数据成神之路,不来了解一下吗?
5万人关注的大数据成神之路,真的不来了解一下吗?
5万人关注的大数据成神之路,确定真的不来了解一下吗?

欢迎您关注《大数据成神之路》

今天接到一个同学的反馈问题,大概是:

Flink程序运行一段时间就会报这个错误,定位好多天都没有定位到。checkpoint时间是5秒,20秒都不行。

Caused by: java.io.IOException: Could not flush and close the file system output stream to hdfs://HDFSaaaa/flink/PointWideTable_OffTest_Test2/1eb66edcfccce6124c3b2d6ae402ec39/chk-355/1005127c-cee3-4099-8b61-aef819d72404 in order to obtain the stream state handle
        at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:326)
        at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:179)
        at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
        at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:391)
        ... 7 more
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /flink/PointWideTable_OffTest_Test2/1eb66edcfccce6124c3b2d6ae402ec39/chk-355/1005127c-cee3-4099-8b61-aef819d72404 (inode 937800469): File does not exist. [Lease.  Holder: DFSClient_NONMAPREDUCE_1949016825_84, pendingcreates: 10]
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3432)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFileInternal(FSNamesystem.java:3520)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFile(FSNamesystem.java:3487)
        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.complete(NameNodeRpcServer.java:787)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.complete(ClientNamenodeProtocolServerSideTranslatorPB.java:537)
        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2045)
        at org.apache.hadoop.ipc.Client.call(Client.java:1476)
        at org.apache.hadoop.ipc.Client.call(Client.java:1413)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
        at com.sun.proxy.$Proxy17.complete(Unknown Source)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.complete(ClientNamenodeProtocolTranslatorPB.java:462)
        at sun.reflect.GeneratedMethodAccessor17.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
        at com.sun.proxy.$Proxy18.complete(Unknown Source)
        at org.apache.hadoop.hdfs.DFSOutputStream.completeFile(DFSOutputStream.java:2506)
        at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:2482)
        at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2447)
        at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
        at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
        at org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
        at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
        at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:312)

同样在网上搜索,也有同样的问题,比如:

任务一开始正常,跑一两天后就会checkpoint超时,收不到Latest Acknowledgement,然后用同样的包重启又可以正常跑几天如此反复,一直找不到原因。
设置项如下:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    //设置失败后一直重启
    env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.milliseconds(1000), Time.minutes(5)));
    env.disableOperatorChaining(); 
    env.enableCheckpointing(1000 * 60 * 15, CheckpointingMode.AT_LEAST_ONCE);
    env.getCheckpointConfig().setFailOnCheckpointingErrors(true);
    //业务比较复杂设置超时时间1个小时。
    env.getCheckpointConfig().setCheckpointTimeout(1000 * 60 * 60);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000  * 10);
    env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);

前言

众所周知,Flink内部为了实现它的高可用性,实现了一套强大的checkpoint机制,还能保证作用的Exactly Once的快速恢复。对此,围绕checkpoint过程本身做了很多的工作。在官方文档中,也为用户解释了checkpoint的部分原理以及checkpoint在实际生产中(尤其是大规模状态集下)的checkpoint调优参数。笔者结合官方文档,给大家做个总结,也算是对Flink checkpoint机理的一个学习。

Checkpoint快慢的性能指标

如果说我们想要对flink的checkpoint操作做调优,那么我们首先得有个衡量指标来展现当前checkpoint是否快慢。在这里,官方提供了以下2个metric指标:

Checkpoint每次开始的时间。观察每次checkpoint开始的时间是为了检测在每次前后checkpoint中间是否存在空闲时间间隔。如果存在间隔时间,说明当前checkpoint都在合理时间内完成。

观察数据buffered的量。这个buffered动作是为了等待其它较慢数据流的stream barriers而设计的。这个偏向于checkpoint原理化的相关内容了。

但大体上,用户根据第一条就能够监测出应用的checkout快慢了。

相邻Checkpoint的间隔时间设置

我们假设一个使用场景,在极大规模状态数据集下,应用每次的checkpoint时长都超过系统设定的最大时间(也就是checkpoint间隔时长),那么会发生什么样的事情。

答案是应用会一直在做checkpoint,因为当应用发现它刚刚做完一次checkpoint后,又已经到了下次checkpoint的时间了,然后又开始新的checkpoint。最后就会造成一个很坏的结果:用户应用本身都没法跑了。

当然了,我们可能会说了,我们设置一下并行checkpoint数,或者说做增量checkpoint,不用每次做全量checkpoint。每次只checkpoint出对前一次checkpoint内的状态数据的增量改动。然后恢复的时候做状态改动的重放。

但是这里,我们可以采用一种更加直接有效的方法,设置连续checkpoint的时间间隔。形象地解释,就是强行在checkpoint间塞入空闲时间,如下图。
image

涉及的相关配置设置如下:

StreamExecutionEnvironment.getCheckpointConfig().setMinPauseBetweenCheckpoints(milliseconds)

Checkpoint的资源设置

当我们对越多的状态数据集做checkpoint时,需要消耗越多的资源。因为Flink在checkpoint时是首先在每个task上做数据checkpoint,然后在外部存储中做checkpoint持久化。在这里的一个优化思路是:在总状态数据固定的情况下,当每个task平均所checkpoint的数据越少,那么相应地checkpoint的总时间也会变短。所以我们可以为每个task设置更多的并行度(即分配更多的资源)来加速checkpoint的执行过程。

Checkpoint的task本地性恢复

为了大家未来对checkpoint的优化,我们有必要在runtime级别的checkpoint过程。首先我们要明白一点,flink的checkpoint不是一个完全在master节点的过程,而是分散在每个task上执行,然后在做汇总持久化。这些task做的checkpoint数据在后面应用恢复时包括并行度扩增或减少时还能够重新打散分布。

为了快速的状态恢复,每个task会同时写checkpoint数据到本地磁盘和远程分布式存储,也就是说,这是一份双拷贝。只要task本地的checkpoint数据没有被破坏,系统在应用恢复时会首先加载本地的checkpoint数据,这样就大大减少了远程拉取状态数据的过程。此过程如下图所示:
image

外部State的存储选择

上小节的方法其实还并没有从本质上解决大规模状态集下checkpoint慢的问题,只是说它降低了这个慢的风险和造成的影响。在这里我们反复强调的是一个大规模状态,我们理理思路,因为规模之大,所以我们才会慢。那如果我们能找到一种更快的存储状态的介质(或者策略),那么这个过程也是能够变快的。

所以在这里,我们可以选择更加高效的外部存储介质来做State的存储(比如RocksDB),而不是仅限于存储于有限的内存空间里,或完全落地到磁盘上。这是我们在State Backend上做的一个选择。

可以使用RocksDB来作为增量checkpoint的存储,并在其中不是持续增大,可以进行定期合并清楚历史状态。

image

该例子中,子任务的操作是一个keyed-state,一个checkpoint文件保存周期是可配置的,本例中是2,配置方式state.checkpoints.num-retained,上面展示了每次checkpoint时RocksDB示例中存储的状态以及文件引用关系等。

  • 对于checkpoint CP1,本地RocksDB目录包含两个磁盘文件(sstable),它基于checkpoint的name来创建目录。当完成checkpoint,将在共享注册表(shared state registry)中创建两个实体并将其count置为1.在共享注册表中存储的Key是由操作、子任务以及原始存储名称组成,同时注册表维护了一个Key到实际文件存储路径的Map。
  • 对于checkpoint CP2,RocksDB已经创建了两个新的sstable文件,老的两个文件也存在。在CP2阶段,新的两个生成新文件,老的两个引用原来的存储。当checkpoint结束,所有引用文件的count加1。
  • 对于checkpoint CP3,RocksDB的compaction将sstable-(1),sstable-(2)以及sstable-(3)合并为sstable-(1,2,3),同时删除了原始文件。合并后的文件包含原始文件的所有信息,并删除了重复的实体。除了该合并文件,sstable-(4)还存在,同时有一个sstable-(5)创建出来。Flink将新的sstable-(1,2,3)和sstable-(5)存储到底层,sstable-(4)引用CP2中的,并对相应引用次数count加1.老的CP1的checkpoint现在可以被删除,由于其retained已达到2,作为删除的一部分,Flink将所有CP1中的引用文件count减1.
  • 对于checkpoint CP4,RocksDB合并sstable-(4)、sstable-(5)以及新的sstable-(6)成sstable-(4,5,6)。Flink将该新的sstable存储,并引用sstable-(1,2,3),并将sstable-(1,2,3)的count加1,删除CP2中retained到2的。由于sstable-(1), sstable-(2), 和sstable-(3)降到了0,Flink将其从底层删除。
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
1月前
|
JSON Java API
Flink CDC 2.0 支持全量故障恢复,可以从 checkpoint 点恢复。
【2月更文挑战第17天】Flink CDC 2.0 支持全量故障恢复,可以从 checkpoint 点恢复。
39 3
|
2月前
|
消息中间件 Java Kafka
Flink背压问题之checkpoint超时如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
2月前
|
存储 SQL canal
Flink CDC数据同步问题之同步数据到checkpoint失败如何解决
Flink CDC数据同步是指利用Flink CDC实现不同数据源之间的实时数据同步任务;本合集旨在提供Flink CDC数据同步的操作指南、性能优化建议和常见问题处理,助力用户高效实施数据同步。
|
2月前
|
Oracle 关系型数据库 MySQL
Flink CDC之数据源调优如何解决
Flink CDC数据源指的是使用Apache Flink的CDC特性来连接并捕获外部数据库变更数据的数据源;本合集将介绍如何配置和管理Flink CDC数据源,以及解决数据源连接和同步过程中遇到的问题。
27 1
|
2月前
|
算法 大数据 数据处理
【天衍系列 01】深入理解Flink的 FileSource 组件:实现大规模数据文件处理
【天衍系列 01】深入理解Flink的 FileSource 组件:实现大规模数据文件处理
|
3月前
|
资源调度 监控 API
在Flink中,通过YARN模式进行峰谷动态并行度扩容缩容可以使用 Flink 自带的动态调优功能
在Flink中,通过YARN模式进行峰谷动态并行度扩容缩容可以使用 Flink 自带的动态调优功能【1月更文挑战第6天】【1月更文挑战第26篇】
145 1
|
4月前
|
运维 算法 Apache
FFA 2023 「生产实践」专场:Flink 大规模技术优化与生产实践
完整议程已公开,期待 12 月 8-9 日与你 Flink Forward Asia 2023 相会!
395 1
 FFA 2023 「生产实践」专场:Flink 大规模技术优化与生产实践
|
8月前
|
SQL Serverless 程序员
准备数据集用于flink学习
准备一百多万的交易数据,作为flink学习过程中的数据集
准备数据集用于flink学习
|
9月前
|
Web App开发 消息中间件 固态存储
Flink Unaligned Checkpoint 在 Shopee 的优化和实践
Tech Lead of Shopee Flink Runtime Team 范瑞,在 Flink Forward Asia 2022 核心技术的分享。
528 0
Flink Unaligned Checkpoint 在 Shopee 的优化和实践
|
10月前
|
SQL 人工智能 分布式计算
如何使用 Flink SQL 探索 GitHub 数据集|Flink-Learning 实战营
想要了解如何使用 Flink 在 GitHub 中发现最热门的项目吗?本实验使用阿里云实时计算 Flink 版内置的 GitHub 公开事件数据集,通过 Flink SQL 实时探索分析 Github 公开数据集中隐藏的彩蛋!
381 1
如何使用 Flink SQL 探索 GitHub 数据集|Flink-Learning 实战营

热门文章

最新文章