Flink流处理之迭代任务

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 前面我们分析过Flink对迭代在流图中的特殊处理,使得迭代中的反馈环得以转化为普通的DAG模型。这一篇我们将剖析运行时的流处理迭代任务的执行机制。这里涉及到两个任务类: StreamIterationHead:迭代头任务,它借助于反馈阻塞队列从迭代尾部接收参与下一次迭代的反馈数据。

前面我们分析过Flink对迭代在流图中的特殊处理,使得迭代中的反馈环得以转化为普通的DAG模型。这一篇我们将剖析运行时的流处理迭代任务的执行机制。这里涉及到两个任务类:

  • StreamIterationHead:迭代头任务,它借助于反馈阻塞队列从迭代尾部接收参与下一次迭代的反馈数据。
  • StreamIterationTail:迭代尾任务,它借助于阻塞队列作为反馈信道将下一次需要迭代的数据反馈给迭代头。

对于迭代流处理而言,随着任务(task)最终被并行化执行,它们的子任务(sub task,这些任务在计算节点中的实例)的并行度要求一致,并且迭代头的子任务与迭代尾的子任务必须成对且处于同一个CoLocationGroup中执行,这些都反映了流处理的迭代任务的特殊性,可以认为是一种并发与并行兼具的模式。

StreamIterationHead和StreamIterationTail之间的交互依赖于阻塞队列代理(BlockingQueueBroker)。Flink设计了一种称之为Broker的数据结构,专门用于对迭代进行并发控制,而BlockingQueueBroker是Broker针对BlockingQueue类型的特定实现。因为Broker内部的核心数据结构就是ConcurrentMap<String, BlockingQueue<V>>类型,所以BlockingQueueBroker对应的类型即为:ConcurrentMap<String, BlockingQueue<BlockingQueue>>其图示如下:

BlockingQueueBroker-structure

迭代头任务跟迭代尾任务之间真正交互的数据结构是容量为1的阻塞队列:

final BlockingQueue<StreamRecord<OUT>> dataChannel = new ArrayBlockingQueue<StreamRecord<OUT>>(1);

它会被加入到BlockingQueueBroker内部的ConcurrentMap中去,对应的键是brokerID。brokerID生成规则如下:

public static String createBrokerIdString(JobID jid, String iterationID, int subtaskIndex) {   
    return jid + "-" + iterationID + "-" + subtaskIndex;
}

BlockingQueueBroker是单例的,因为对应的迭代头子任务和迭代尾子任务会生成相同的brokerID,所以两者在同一个JVM中会基于相同的dataChannel进行通信。dataChannel由迭代头创建并递交给BlockingQueueBroker:

BlockingQueueBroker.INSTANCE.handIn(brokerID, dataChannel);

由迭代尾获取:

BlockingQueue<StreamRecord<IN>> dataChannel =      
    (BlockingQueue<StreamRecord<IN>>) BlockingQueueBroker.INSTANCE.get(brokerID);

数据交换是基于dataChannel的,由迭代尾负责生产:

public void collect(StreamRecord<IN> record) {   
    try {      
        if (shouldWait) {         
            dataChannel.offer(record, iterationWaitTime, TimeUnit.MILLISECONDS);      
        } else {         
            dataChannel.put(record);      
        }   
    } catch (InterruptedException e) {      
        throw new RuntimeException(e);   
    }
}

由迭代头负责消费:

while (running) {   
    StreamRecord<OUT> nextRecord = shouldWait ?      
        dataChannel.poll(iterationWaitTime, TimeUnit.MILLISECONDS) :      
        dataChannel.take();

    if (nextRecord != null) {      
        for (RecordWriterOutput<OUT> output : outputs) {         
            output.collect(nextRecord);      
        }   
    } else {      
        // done      
        break;   
    }
}

因此,迭代头跟迭代尾的子任务之间通过反馈信道进行迭代数据处理的大致机制如下图:

Streaming-iteration-runtime

相信分析到这里,我们应该明白了为什么迭代头和迭代尾的并行度必须一致,且处于相同的CoLocationGroup中。Flink在执行拓扑中巧妙地化解了反馈环,从而使其适应于DAG计算模型,但却在同一个TaskManager的迭代头和迭代尾对应的子任务中借助于阻塞队列的生产者和消费者模型重塑了反馈“环”。



原文发布时间为:2016-12-12


本文作者:vinoYang


本文来自云栖社区合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
1月前
|
关系型数据库 MySQL API
Flink CDC产品常见问题之mysql整库同步到starrock时任务挂掉如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
1月前
|
监控 关系型数据库 MySQL
Flink CDC产品常见问题之flink-cdc任务抓取全量的mysql数据不生效如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
2月前
|
消息中间件 SQL JSON
Flink问题之source并行度不同导致任务没有数据落地如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
331 0
|
2月前
|
消息中间件 Kafka Apache
Apache Flink 是一个开源的分布式流处理框架
Apache Flink 是一个开源的分布式流处理框架
482 5
|
4天前
|
存储 监控 调度
【Flink】怎么提交的实时任务,有多少Job Manager?
【4月更文挑战第18天】【Flink】怎么提交的实时任务,有多少Job Manager?
|
11天前
|
机器学习/深度学习 分布式计算 BI
Flink实时流处理框架原理与应用:面试经验与必备知识点解析
【4月更文挑战第9天】本文详尽探讨了Flink实时流处理框架的原理,包括运行时架构、数据流模型、状态管理和容错机制、资源调度与优化以及与外部系统的集成。此外,还介绍了Flink在实时数据管道、分析、数仓与BI、机器学习等领域的应用实践。同时,文章提供了面试经验与常见问题解析,如Flink与其他系统的对比、实际项目挑战及解决方案,并展望了Flink的未来发展趋势。附带Java DataStream API代码样例,为学习和面试准备提供了实用素材。
33 0
|
30天前
|
Java 关系型数据库 MySQL
Flink1.18.1和CDC2.4.1 本地没问题 提交任务到服务器 报错java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.mysql.MySqlConnectorConfig
【2月更文挑战第33天】Flink1.18.1和CDC2.4.1 本地没问题 提交任务到服务器 报错java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.mysql.MySqlConnectorConfig
49 2
|
1月前
|
关系型数据库 MySQL API
Flink CDC产品常见问题之mysql整库同步到starrock时任务挂掉如何解决
Flink CDC产品常见问题之mysql整库同步到starrock时任务挂掉如何解决
|
1月前
|
SQL 资源调度 Oracle
Flink CDC产品常见问题之sql运行中查看日志任务失败如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
1月前
|
canal SQL 关系型数据库
flink cdc 提交问题之提交任务异常如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。