Flink流处理之迭代案例

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 当前Flink将迭代的重心集中在批处理上,之前我们谈及了批量迭代和增量迭代主要是针对批处理(DataSet)API而言的,并且Flink为批处理中的迭代提供了针对性的优化。但是对于流处理(DataStream),Flink同样提供了对迭代的支持,这一节我们主要来分析流处理中的迭代,我们将会看到流处理中的迭代相较于批处理有相似之处,但差异也是十分之明显。

当前Flink将迭代的重心集中在批处理上,之前我们谈及了批量迭代和增量迭代主要是针对批处理(DataSet)API而言的,并且Flink为批处理中的迭代提供了针对性的优化。但是对于流处理(DataStream),Flink同样提供了对迭代的支持,这一节我们主要来分析流处理中的迭代,我们将会看到流处理中的迭代相较于批处理有相似之处,但差异也是十分之明显。

可迭代的流处理程序允许定义“步函数”(step function)并将其内嵌到一个可迭代的流(IterativeStream)中。因为一个流处理程序可能永不终止,因此不同于批处理中的迭代机制,流处理中无法设置迭代的最大次数。取而代之的是,你可以指定等待反馈输入的最大时间间隔(如果超过该时间间隔没有反馈元素到来,那么该迭代将会终止)。通过应用split或filter转换,你可以指定流的哪一部分用于反馈给迭代头,哪一部分分发给下游。这里我们以filter作为示例来展示可迭代的流处理程序的API使用模式。

首先,基于输入流构建IterativeStream,这是一个迭代的起始,通常称之为迭代头:

IterativeStream<Integer> iteration = inputStream.iterate();

接着,我们指定一系列的转换操作用于表述在迭代过程中执行的逻辑(这里简单以map转换作为示例),map API所接受的UDF就是我们上文所说的步函数:

DataStream<Integer> iteratedStream = iteration.map(/* this is executed many times */);

然后,作为迭代我们肯定需要有数据反馈给迭代头进行重复计算,所以我们从迭代过的流中过滤出符合条件的元素组成的部分流,我们称之为反馈流:

DataStream<Integer> feedbackStream = iteratedStream.filter(/* one part of the stream */);

将反馈流反馈给迭代头就意味着一个迭代的完整逻辑的完成,那么它就可以“关闭”这个闭合的“环”了。通过调用IterativeStream的closeWith这一实例方法可以关闭一个迭代(也可表述为定义了迭代尾)。传递给closeWith的数据流将会反馈给迭代头:

iteration.closeWith(feedbackStream);

另外,一个惯用的模式是过滤出需要继续向前分发的部分流,这个过滤转换其实定义的是“终止迭代”的逻辑条件,符合条件的元素将被分发给下游而不用于进行下一次迭代:

DataStream<Integer> output = iteratedStream.filter(/* some other part of the stream */);

跟分析批处理中的迭代一样,我们仍然以解决实际问题的案例作为切入点来看看流处理中的迭代跟批处理中的迭代有何不同。

首先描述一下需要解决的问题:产生一个由一系列二元组(两个字段都是在一个区间内产生的正整数来作为斐波那契数列的两个初始值)构成的数据流,然后对该数据流中的二元组不断地迭代使其产生斐波那契数列,直到某次产生的值大于给定的阈值,则停止迭代并输出迭代次数。

该案例参考自Flink随源码发布的迭代示例,此案例问题规模较小并且能够说明问题。但它示例代码中的一系列变量稍显混乱,为了增强程序的表述性,笔者会对其稍作调整。

这个案例如果拆分到对单个元素(二元组)的角度来看,其执行过程如下图所示:

Streaming-iteration-demo

n表示迭代次数,在最初的map转换中初始化为0;m是判定迭代停止的阈值;

另外,T后面跟的是字段索引,比如T2表示取元组中位置为3的字段。且注意随着迭代T在不断变化。

上面我们已经对问题的核心过程进行了分析,接下来我们会分步解决这个问题的构建迭代的流处理程序。

首先,我们先通过source函数创建初始的流对象inputStream:

DataStream<Tuple2<Integer, Integer>> inputStream = env.addSource(new RandomFibonacciSource());

该source函数会生成二元组序列,二元组的两个字段值是随机生成的作为斐波那契数列的初始值:

private static class RandomFibonacciSource        
    implements SourceFunction<Tuple2<Integer, Integer>> {    

    private Random random = new Random();    
    private volatile boolean isRunning = true;    
    private int counter = 0;   

    public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {        
        while (isRunning && counter < MAX_RANDOM_VALUE) {            
            int first = random.nextInt(MAX_RANDOM_VALUE / 2 - 1) + 1;            
            int second = random.nextInt(MAX_RANDOM_VALUE / 2 -1) + 1;  

            if (first > second) continue;            

            ctx.collect(new Tuple2<Integer, Integer>(first, second));            
            counter++;            
            Thread.sleep(50);        
        }    
    }    

    public void cancel() {        
        isRunning = false;    
    }
}

为了对新计算的斐波那契数列中的值以及累加的迭代次数进行存储,我们需要将二元组数据流转换为五元组数据流,并据此创建迭代对象:

IterativeStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> iterativeStream =        
    inputStream.map(new TupleTransformMapFunction()).iterate(5000);

注意上面代码段中iterate API的参数5000,不是指迭代5000次,而是等待反馈输入的最大时间间隔为5秒。流被认为是无界的,所以无法像批处理迭代那样指定最大迭代次数。但它允许指定一个最大等待间隔,如果在给定的时间间隔里没有元素到来,那么将会终止迭代。

元组转换的map函数实现:

private static class TupleTransformMapFunction extends RichMapFunction<Tuple2<Integer,        
    Integer>, Tuple5<Integer, Integer, Integer, Integer, Integer>> {    
    public Tuple5<Integer, Integer, Integer, Integer, Integer> map(            
        Tuple2<Integer, Integer> inputTuples) throws Exception {        
        return new Tuple5<Integer, Integer, Integer, Integer, Integer>(                
            inputTuples.f0,                
            inputTuples.f1,                
            inputTuples.f0,                
            inputTuples.f1,                
            0);    
    }
}

上面五元组中,其中索引为0,1这两个位置的元素,始终都是最初生成的两个元素不会变化,而后三个字段都会随着迭代而变化。

在迭代流iterativeStream创建完成之后,我们将基于它执行斐波那契数列的步函数并产生斐波那契数列流fibonacciStream:

DataStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> fibonacciStream =        
    iterativeStream.map(new FibonacciCalcStepFunction());

这里的fibonacciStream只是一个代称,其中的数据并不是真正的斐波那契数列,其实就是上面那个五元组。

其中用于计算斐波那契数列的步函数实现如下:

private static class FibonacciCalcStepFunction extends        
    RichMapFunction<Tuple5<Integer, Integer, Integer, Integer, Integer>,        
    Tuple5<Integer, Integer, Integer, Integer, Integer>> {    
    public Tuple5<Integer, Integer, Integer, Integer, Integer> map(            
        Tuple5<Integer, Integer, Integer, Integer, Integer> inputTuple) throws Exception {        
        return new Tuple5<Integer, Integer, Integer, Integer, Integer>(                
            inputTuple.f0,                
            inputTuple.f1,                
            inputTuple.f3,                
            inputTuple.f2 + inputTuple.f3,                
            ++inputTuple.f4);    
    }
}

正如上文所述,后三个字段会产生变化,在计算之前,数列最后一个元素会被保留,也就是f3对应的元素,然后通过f2元素加上f3元素会产生最新值并更新f3元素,而f4则会累加。

随着迭代次数增加,不是整个数列都会被保留,只有最初的两个元素和最新的两个元素会被保留,这里也没必要保留整个数列,因为我们不需要完整的数列,我们只需要对最新的两个元素进行判断即可。

上文我们对每个元素计算斐波那契数列的新值并产生了fibonacciStream,但是我们需要对最新的两个值进行判断,看它们是否超过了指定的阈值。超过了阈值的元组将会被输出,而没有超过的则会再次参与迭代。因此这将产生两个不同的分支,我们也为此构建了分支流:

SplitStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> branchedStream =        
    fibonacciStream.split(new FibonacciOverflowSelector());

而对是否超过阈值的元组进行判断并分离的实现如下:

private static class FibonacciOverflowSelector implements OutputSelector<
    Tuple5<Integer, Integer, Integer, Integer, Integer>> {    
    public Iterable<String> select(            
        Tuple5<Integer, Integer, Integer, Integer, Integer> inputTuple) {        
        if (inputTuple.f2 < OVERFLOW_THRESHOLD && inputTuple.f3 < OVERFLOW_THRESHOLD) {            
            return Collections.singleton(ITERATE_FLAG);        
        }        

        return Collections.singleton(OUTPUT_FLAG);    
    }
}

在筛选方法select中,我们对不同的分支以不同的常量标识符进行标识:ITERATE_FLAG(还要继续迭代)和OUTPUT_FLAG(直接输出)。

产生了分支流之后,我们就可以从中检出不同的流分支做迭代或者输出处理。对需要再次迭代的,就通过迭代流的closeWith方法反馈给迭代头:

iterativeStream.closeWith(branchedStream.select(ITERATE_FLAG));

而对于不需要的迭代就直接让其流向下游处理,这里我们只是简单得将流“重构”了一下然后直接输出:

DataStream<Tuple3<Integer, Integer, Integer>> outputStream = branchedStream        
    .select(OUTPUT_FLAG).map(new BuildOutputTupleMapFunction());
outputStream.print();

所谓的重构就是将之前的五元组重新缩减为三元组,实现如下:

private static class BuildOutputTupleMapFunction extends RichMapFunction<        
    Tuple5<Integer, Integer, Integer, Integer, Integer>,        
    Tuple3<Integer, Integer, Integer>> {    
    public Tuple3<Integer, Integer, Integer> map(Tuple5<Integer, Integer, Integer, Integer,            
        Integer> inputTuple) throws Exception {        
        return new Tuple3<Integer, Integer, Integer>(                
            inputTuple.f0,                
            inputTuple.f1,                
            inputTuple.f4);    
    }
}

最终我们将会得到类似如下的输出:

(7,14,5)

(18,37,3)

(3,46,3)

(23,32,3)

(31,43,2)

(13,45,2)

(37,42,2)

……

前两个整数是斐波那契数列的两个初始值,第三个整数表示其需要经历多少次迭代其斐波那契数列最新的两个值才会超过阈值。

最终完整的主干程序代码如下:

public static void main(String[] args) throws Exception {    
    StreamExecutionEnvironment env = StreamExecutionEnvironment            
        .getExecutionEnvironment().setBufferTimeout(1);

    DataStream<Tuple2<Integer, Integer>> inputStream = env.addSource(new RandomFibonacciSource());    

    IterativeStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> iterativeStream =            
        inputStream.map(new TupleTransformMapFunction()).iterate(5000);    

    DataStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> fibonacciStream =            
        iterativeStream.map(new FibonacciCalcStepFunction());    

    SplitStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> branchedStream =            
        fibonacciStream.split(new FibonacciOverflowSelector());    

    iterativeStream.closeWith(branchedStream.select(ITERATE_FLAG));    

    DataStream<Tuple3<Integer, Integer, Integer>> outputStream = branchedStream            
        .select(OUTPUT_FLAG).map(new BuildOutputTupleMapFunction());    

    outputStream.print();    

env.execute("Streaming Iteration Example");



原文发布时间为:2016-11-29 本文作者:vinoYang 本文来自云栖社区合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
2月前
|
消息中间件 Kafka Apache
Apache Flink 是一个开源的分布式流处理框架
Apache Flink 是一个开源的分布式流处理框架
476 5
|
5月前
|
Java Linux API
flink入门-流处理
flink入门-流处理
108 0
|
6月前
|
分布式计算 资源调度 监控
没有监控的流处理作业与茫茫大海中的裸泳无异 - 附 flink 与 spark 作业监控脚本实现
没有监控的流处理作业与茫茫大海中的裸泳无异 - 附 flink 与 spark 作业监控脚本实现
|
5月前
|
分布式计算 Java API
Flink教程(04)- Flink入门案例
Flink教程(04)- Flink入门案例
57 0
|
2月前
|
运维 监控 数据处理
【天衍系列 03】深入理解Flink的Watermark:实时流处理的时间概念与乱序处理
【天衍系列 03】深入理解Flink的Watermark:实时流处理的时间概念与乱序处理
|
3月前
|
存储 数据挖掘 Apache
【Flink】Flink 有状态的流处理
【1月更文挑战第26天】【Flink】Flink 有状态的流处理
|
3月前
|
关系型数据库 数据处理 流计算
【Flink】Flink 流处理和批处理
【1月更文挑战第26天】【Flink】Flink 流处理和批处理
|
3月前
|
机器学习/深度学习 算法 物联网
实时计算Flink版:引领流处理的新时代
实时计算Flink版:引领流处理的新时代
|
4月前
|
数据可视化 JavaScript 关系型数据库
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(五)FineBI可视化
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(五)FineBI可视化
43 0
|
4月前
|
SQL 消息中间件 关系型数据库
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(四)实时计算需求及技术方案
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(四)实时计算需求及技术方案
71 0