Stream 分布式数据流的轻量级异步快照

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 原文来源:Stream 分布式数据流的轻量级异步快照1. 概述分布式有状态流处理支持在云中部署和执行大规模连续计算,主要针对低延迟和高吞吐量。

原文来源Stream 分布式数据流的轻量级异步快照

1. 概述

分布式有状态流处理支持在云中部署和执行大规模连续计算,主要针对低延迟和高吞吐量。这种模式的一个最根本的挑战就是在可能的失败情况下提供处理保证。现有方法依赖于可用于故障恢复的周期性全局状态快照。这些方法有两个主要缺点。首先,他们经常拖延影响数据摄取的整体计算过程。其次,持久化存储所有传输中的记录以及算子状态,这会导致比所需的快照要更大。

因此,提出了一种新的分布式快照的算法,即在 Apache Flink 中的异步屏障快照(Asynchronous Barrier Snapshotting (ABS))。这是一种适用于现代数据流执行引擎的轻量级算法,可最大限度地减少空间需求,让快照发生时对系统的影响降到最低。这种算法不会停止流处理,它只会引入很少的运行时间开销,而且对于整个无环图的拓扑结构,只对有状态的算子进行快照,因此快照的大小只会占用很小的空间。该算法不会对执行产生重大影响,保证线性可伸缩性,并且可以在频繁的快照下正常运行。

这里所说的新型的快照算法,既适用于有向无环图,也适用于有向有环图。本文重点关注在有向无环图中的应用。

2. Apache Flink System

Apache Flink 围绕通用运行时引擎进行架构,可以统一处理批处理和流式作业。Flink 中的作业被编译成任务的有向图。数据元素从外部数据源获取,并以流水线方式通过任务图。基于接收到的输入,任务不断操作其内部状态,并产生新的输出。

2.1 流处理编程模型

可以从外部来源(例如消息队列,套接字流,自定义生成器)或通过调用其他 DataStream 上的操作来创建 DataStreamsDataStreams 支持多种算子,如 mapfilter 和 reduce 等形式的高阶函数,这些函数在每个记录上逐步应用并生成新的 DataStream。每个算子可以通过将并行实例放置在相应流的不同分区上运行来并行化,从而允许分布式执行流转换。

下面的代码示例中显示了如何在 Apache Flink 中实现简单的 Word Count 程序。在此程序中,从文本文件中读取单词,并将每个单词的当前计数打印到标准输出上。这是一个有状态的流处理程序,所以数据源需要知道它们在文件中的当前偏移量,并且需要计数器来将每个单词的当前计数保持在内部状态中。

2.2 分布式数据流执行

当用户执行一个应用程序时,所有的 DataStream 算子都将编译成一个执行图,原理上为一个有向图 G =(T,E),其中顶点 T 表示任务,边 E 表示两个任务之间的 data channels。上图就描绘了一个 Word Count 例子的执行图。如图所示,算子的每个实例都封装在相应的任务上。任务可以进一步细分为没有 input channels 的 Source 以及没有 output channels 的 Sink。此外,M 表示任务在并行执行期间传输的所有记录的集合。每个任务 t ∈ T 封装了一个算子实例的独立运行,其由以下内容组成:

  • 一组 input channels 和 output channelsIt , Ot ⊆ E
  • 算子状态 St
  • 用户自定义函数 ft 。
    在执行过程中,每个任务消耗输入记录,更新算子状态并根据其用户自定义函数生成新的记录。对于流入算子的每一条数据 r ∈ M,通过 UDF,产生一个新的状态值 st’,同时产生一个新的输出的集合 D ⊆ M

3. Asynchronous Barrier Snapshotting

为了提供一致性结果,分布式处理系统需要对失败任务进行恢复。提供这种弹性的一种方法是定期捕获执行图的快照,然后可以用它来从故障中恢复。快照是执行图的全局状态,捕获所有必要信息以从该特定执行状态重新开始计算。

3.1 问题定义

我们定义了一个执行图 G =(T,E) 的全局快照 G * =(T *,E *),其中 T * 和 E * 分别表示所有任务和边的状态集合。更详细地说,T * 由所有算子状态 St * ∈ T * 组成, ∀t ∈ T, E * 是所有 channels 状态 e * ∈ E * 的集合,其中 e * 由在 e 上传输的记录组成。

我们确保每个快照 G * 都保留某些属性,例如最终性 Termination 和可行性 Feasibility,以便在故障恢复后保证结果的正确性。最终性保证,如果所有进程都处于活跃状态,那么快照算法最终会在启动后的有限时间内完成。可行性表达了快照的意义,即在快照过程中关于计算的信息不会丢失。

3.2 非循环数据流的ABS

当一个执行过程被分成多个阶段 (stage),在不保留 channels 状态的情况下执行快照是可行的。stage 将注入的数据流和所有相关的计算划分为一系列可能的执行过程,其中所有先前的输入和生成的输出已经完全处理。在一个 Stage 结束时的算子状态集合反映了整个执行历史,因此它可以用于快照。我们算法背后的核心思想是在保持连续数据摄入的同时使用分段快照创建一致性快照(create identical snapshots with staged snapshotting)。

在我们的方法中,在持续的数据流执行中模拟 stage 是通过向数据流中周期性注入特殊屏障 barrier 标记完成的,这些标记在整个执行图中一直传输到 sink。全局快照是随着每个任务接收表示执行 stage 的 barrier 而逐步构建的。 我们进一步为我们的算法做出以下假设:

  • 网络通道是准确可靠的,遵循 FIFO 先进先出的顺序,并且可以被阻塞以及解除阻塞。当一个 channels 被阻塞时,所有的消息都会被缓存,但是不会被传递,直到它被解除阻塞。
  • 任务可以触发其通道组件上的操作,例如阻塞,解除阻塞和发送消息。所有输出通道都支持广播消息。
  • 在 source 任务中注入的消息(即 stage barrier )被解析为 Nil

算法执行过程如下:

(1) 中央协调器周期性的给所有 source 注入 stage barrier(黑色实线)。当 source 接收到 barrier 时,会为当前的状态生成一个快照,然后将 barrier 广播到它的所有输出中(图(a))。

(2) 当一个非 source 任务接收到其中一个输入的 barrier 时,它会阻塞该输入,直到它接收到来自所有输入的 barrier(第9行 图2(b))。

(3) 当收到来自所有输入的 barrier 时,该任务会生成当前状态的一个快照并将其 barrier 广播到其输出(第12-13行 图2(c))。

(4) 然后,该任务解除输入通道的阻塞来继续后续的计算(第15行,图2(d))。完整的全局快照 G * =(T *,E *) 仅包含所有算子状态 T *,其中 E * = 0

伪代码如下:

如前所述,快照算法应该保证最终性和可行性。最终性由通道和非循环执行图属性保证。channels 的可靠性确保只要任务存活,发送的每个 barrier 最终都会被接收的。此外,由于始终存在来自 source 的一条路径,因此 DAG 拓扑中的每个任务最终都将从其所有 input channels 接收 barrier 并生成快照。

对于可行性,足以证明在全局快照中的算子状态仅反映了直到最后阶段处理的记录的历史过程。这可以通过 channels 的 FIFO 顺序特性以及 barrier 全部接收之前阻塞 input channels 保证 stage 的 post-shot 记录不会在快照生成之前处理。

3.3 循环数据流的ABS

在存在有向循环的执行图中的情况下,上面的 ABS 算法不会终止而会导致死锁,因为一个循环中的任务将无限期地等待接收来自其所有输入的 barrier。此外,在循环中传输的记录不会包含在快照中,因此违反了可行性。因此,为了可行性需要在快照中包含在循环中生成的所有记录,并在恢复时将这些记录重新传输。我们处理循环图的方法继承了基本算法,而不会像上面算法中看到的那样引起任何额外的 channels 阻塞。首先,我们通过静态分析来识别执行图中循环中的 back-edge L。根据控制流图理论,有向图中的 back-edge 是指在深度优先搜索中已经访问过的顶点的边。执行图 G(T,E \ L) 是一个包含拓扑中所有任务的 DAG。从该 DAG 的角度来看,该算法与以前一样运行,但是,我们需要在快照期间对下游 back-edge 接收的记录进行备份。barrier 将循环中的所有记录都推送到下游日志中,以便将它们包含在一致的快照中。

4. 故障恢复

在这提供关于故障恢复操作的简要说明。有几种故障恢复方案可用于一致性快照。最简单的是,整个执行图可以从上一个全局快照重新启动,每个任务 t ,如下所示:

  • 从持久性存储中检索与快照 St 相关联的状态并将其设置为其初始状态
  • 恢复备份的日志以及处理所包含的记录
  • 从其 input channels 开始摄取记录

仅通过重新调度上游任务依赖(其包含到失败任务的 output channels)以及它们各自直到 source 的上游任务,重新恢复调度部分图也是可能的。下图显示了一个恢复示例。为了提供 exactly-once 的语义,应该在所有下游节点中忽略重复记录以避免重新计算。为了达到这个目的,我们用来自source 的序列号标记记录,因此,每个下游节点都可以丢弃序号小于他们已经处理的记录。

5. 实现

我们向 Apache Flink 提供了 ABS 算法的实现,以便为流式运行提供 exactly-once 处理语义。在我们当前的实现中,阻塞通道将所有传入的记录存储在磁盘上,而不是将它们保存在内存中以增加可扩展性。虽然此技术可确保鲁棒性,但会增加 ABS 算法的运行时影响。

为了区分算子状态和数据,我们引入了一个显式的 OperatorState 接口,该接口包含更新状态以及对状态进行检查点的方法。我们为 Apache Flink 支持的有状态运行时算子(例如基于偏移量的源或聚合)提供了 OperatorState 实现。

6. 评估

评估的目标是将 ABS 的运行时间开销与 Naiad 中采用的全局同步快照算法进行比较,并测试算法在大数量节点上的可伸缩性。用于评估的执行拓扑结构(如下图)由6个不同的算子组成,其并行度等于集群节点的个数,转换为 6 * 集群大小 个任务顶点。执行包含3个完整的网络 shuffle,以突显 ABS 中通道阻塞的可能影响。source 产生总共10亿条记录,这些记录在 source 实例中均匀分布。拓扑中算子的状态是按键的聚合以及 source 的偏移量。在 Amazon EC2 集群上使用多达40个 m3.medium 实例在运行实验。

我们测量了在不同快照间隔下 ABS 和同步快照两种快照方案运行的运行时间开销。我们实现了在 Apache Flink Naiad 上使用的同步快照算法,以便在相同终端上执行进行比较。该实验在10节点集群上运行。为了评估我们算法的可伸缩性,我们处理固定数量的输入记录(10亿),同时将我们拓扑的并行度从5个增加到40个节点。

在下图中,我们描述了两种算法对基线的运行时影响(无容错)。当快照时间间隔较小时,同步快照的性能影响尤其明显。这是由于系统花费更多时间来获取全局快照而不是处理数据。ABS 对运行时的影响要低得多,因为它可以持续运行而不会阻碍整体执行,同时保持相当稳定的吞吐率。当快照时间间隔变大时,同步算法的影响逐渐变小。

在下图中,我们使用3秒快照间隔的 ABS 拓扑与基准(无容错)进行比较可扩展性。很明显,基准作业和 ABS 都实现了线性可扩展性。

7. 总结

我们的目的是解决在分布式数据流系统上执行定期全局快照的问题。我们引入了 ABS,这是一种新的快照技术,可实现良好的吞吐量。ABS 是第一种考虑非循环执行拓扑的最小可能状态的算法。此外,我们通过仅存储需要在恢复时重新处理的记录来扩展 ABS 以在循环执行图上使用。我们在 Apache Flink 上实现了 ABS,并对比同步快照算法评估了我们算法的性能。在早期阶段,ABS 显示出良好的结果,对整体执行吞吐量影响较小并具有线性可扩展性。

原文: http://pdfs.semanticscholar.org/541e/cf8c5b9db97eb5fcd1ffdf863d948b8954cc.pdf


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
监控 Docker 容器
【Docker】9、Docker-Compose安装轻量级分布式日志服务Graylog
Graylog 是一个开源的日志聚合、分析、审计、展现和预警工具。在功能上来说,和 ELK 类似,但又比 ELK 要简单很多。
1437 1
【Docker】9、Docker-Compose安装轻量级分布式日志服务Graylog
|
9月前
|
消息中间件
分布式事务解决方案之一:MQ异步确保事务
分布式事务解决方案之一:MQ异步确保事务
|
12月前
|
消息中间件 JavaScript 小程序
一个轻量级的分布式日志标记追踪神器,十分钟接入,非常好用!
一个轻量级的分布式日志标记追踪神器,十分钟接入,非常好用!
|
12月前
|
JavaScript 小程序 Java
SpringBoot接入轻量级分布式日志框架GrayLog
SpringBoot接入轻量级分布式日志框架GrayLog
|
NoSQL MongoDB
分布式服务器框架之Servers.Core库中实现 MongoEntityBase 实现阻塞 异步对MongoDB的增删改查
YFMongoDBModelBase类是个模板类,对模板参数进行了约束YFMongoEntityBase,必须要继承YFMongoEntityBase
|
缓存 运维 负载均衡
Redis连环炮:内存淘汰?事务?分布式锁?分步式限流?异步队列?延时队列?高可用?如何部署?哈希槽?数据库和缓存的数据一致性?
Redis连环炮:内存淘汰?事务?分布式锁?分步式限流?异步队列?延时队列?高可用?如何部署?哈希槽?数据库和缓存的数据一致性?
184 0
Redis连环炮:内存淘汰?事务?分布式锁?分步式限流?异步队列?延时队列?高可用?如何部署?哈希槽?数据库和缓存的数据一致性?
|
消息中间件 设计模式 SQL
多机分布式执行异步任务的实现姿势
执行异步任务时,比如需要处理10W个订单,如果是PHP,我们一般会配置一个定时任务,然后该定时任务就会在单机上执行;如果是GO或者JAVA,我们也需要使用相应的策略,保证该任务只在单机上执行,比如分布式锁。可能有同学会问,我直接在多机上执行同一个任务不行么,我只想说,你胆子真大,当多机同时处理一条数据,你会死的很惨的。
185 0
一个高性能、轻量级的分布式内存队列系统--beanstalk
 Beanstalk是一个高性能、轻量级的、分布式的、内存型的消息队列系统。最初设计的目的是想通过后台异步执行耗时的任务来降低高容量Web应用系统的页面访问延迟。其实Beanstalkd是典型的类Memcached设计,协议和使用方式都是同样的风格。其基本设计思想很简单:高性能离不开异步,异步离不开队列,而内部都是生产者-消费者模式的。
一个高性能、轻量级的分布式内存队列系统--beanstalk
|
NoSQL API 调度
【Python】轻量级分布式任务调度系统-RQ
一 前言       Redis Queue 一款轻量级的P分布式异步任务队列,基于Redis作为broker,将任务存到redis里面,然后在后台执行指定的Job。就目前而言有三套成熟的工具celery,huey ,rq 。
4809 0

热门文章

最新文章