Flink批处理优化器之范围分区重写采用算法

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 采样算法 上一篇我们分析了RangePartitionRewriter的数据处理分支,接下来我们开始分析采样分支,采样分支的核心在于采样算法。因为范围分区输入端每个分区的数据量无从得知,也就是说我们无法得出采样比例。

采样算法

上一篇我们分析了RangePartitionRewriter的数据处理分支,接下来我们开始分析采样分支,采样分支的核心在于采样算法。因为范围分区输入端每个分区的数据量无从得知,也就是说我们无法得出采样比例。此时,如果先对每分区内的所有数据进行遍历,再记录出数据总量会显得很低效,因此Flink选择借助于水塘抽样算法(https://en.wikipedia.org/wiki/Reservoir_sampling)来解决这个问题。

水塘抽样法是一种在线抽样法,可以在不知道样本总量或因样本数量太大而无法载入内存的情况下实现等概率抽样。

在实现时,Flink参考了IBM研究中心对该算法进行改进的一篇论文(Tirthapura, S., & Woodruff, D. P. (2011). Optimal Random Sampling from Distributed Streams Revisited. Lecture Notes in Computer Science),该论文对水塘抽样算法进行了改进以支持对大规模数据流进行随机采样,当输入元素是分布式且跨多个站点,这些站点之间的通信基于一个中央协调器。该算法被封装在ReservoirSamplerWithoutReplacement和ReservoirSamplerWithReplacement这两个类中。它们的继承关系图如下:

这两个采样类的实现基本都遵循如下两步:第一步,在每个分区中为其中的每个元素生成权重,选择权重最大的top K个元素作为每个分区的输出;第二步,从第一步的每个分区收集的K个元素中(此时总共是K * 分区个数的元素数目)选择权重最大的top K个元素。对于这两个采样类而言,第二步两者都是一致的,这部分的逻辑被封装在它们的父类DistributedRandomSampler中。区别在于第一步,ReservoirSamplerWithoutReplacement为每个输入元素生成一个随机数作为其权重,所以其不会重复选择元素,而ReservoirSamplerWithReplacement在第一步会为每个元素生成k次权重,这会导致一个元素可能会在计算top K时被多次选中。

就实现而言,第一步在DistributedRandomSampler中被定义为抽象方法sampleInPartition供子类实现,并要求在单个分区上执行,第二步则由DistributedRandomSampler自行实现,方法名为sampleInCoordinator,在一个全局归约函数中执行。

Flink基于MapPartition实现了一个UDF名为SampleInPartition,将两个采样算法的第一步应用其中,其对应的计划节点的并行度跟改写前的source节点的并行度一致。而对于第二步,Flink基于GroupReduceFunction实现了一个名为SampleInCoordinator的UDF用于归并所有来自各个SampleInPartition的样本输出,它会在全局归约函数中执行协调端的总体采样逻辑。用户必须确保该计划节点的并行度为1,才能使其成为唯一的中央协调器。当采样的样本数据确定之后就可以确定边界了,承担该职责的是范围分区构建器(RangeBoundaryBuilder),它是函数RichMapPartitionFunction的UDF实现,其计划节点的并行度跟SampleInCoordinator所对应的并行度保持一致。采样分支的并行化Dataflow示意图如下:

下面我们来分析一下代码实现,先确定的是样本总量(也就是top K的K的值),计算方式为每个分区的样本数乘以通道连接的下游目标范围分区的并行度(因为其关系到最终范围的划分边界):

final int sampleSize = SAMPLES_PER_PARTITION * targetParallelism;

这里,SAMPLES_PER_PARTITION常量表示每个分区的采样数,默认值为1000条。

在样本总量确定之后,就可以进行采样了,采样的具体实现并没什么特别的,就是按照上面的分析来实现,不再细述。我们主要来看一下如何根据最终的样本数据确定范围分区的每个分区的边界。

第一步对样本进行排序:

Collections.sort(sampledData, new Comparator<T>() {

    @Override
    public int compare(T first, T second) {
        return comparator.compare(first, second);
    }

});

第二步采用平均划分法来计算每个分区的边界,边界被存储于一个二维数组中,因为根据样本提取的临界值将会作为比较器的键存储在Object[]中。

int boundarySize = parallelism - 1;
Object[][] boundaries = new Object[boundarySize][];
if (sampledData.size() > 0) {
    //计算拆分的段
    double avgRange = sampledData.size() / (double) parallelism;
    int numKey = comparator.getFlatComparators().length;
    //每个并行度(分区)一个边界值
    for (int i = 1; i < parallelism; i++) {
        //计算得到靠近段尾的采样记录作为边界界定标准
        T record = sampledData.get((int) (i * avgRange));
        Object[] keys = new Object[numKey];
        comparator.extractKeys(record, keys, 0);
        boundaries[i-1] = keys;
    }
}

计算得到的boundaries会被输出到广播通道:

final NamedChannel broadcastChannel = new NamedChannel("RangeBoundaries", rbPlanNode);
broadcastChannel.setShipStrategy(ShipStrategyType.BROADCAST, DataExchangeMode.PIPELINED);
broadcastChannel.setTarget(ariPlanNode);
List<NamedChannel> broadcastChannels = new ArrayList<>(1);
broadcastChannels.add(broadcastChannel);
ariPlanNode.setBroadcastInputs(broadcastChannels);

广播通道连接着采样分支的尾部和数据处理分支的头部。


原文发布时间为:2017-04-07

本文作者:vinoYang

本文来自云栖社区合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
3月前
|
消息中间件 Kafka 流计算
Flink的分区表订阅功能是通过Kafka的topic分区来实现的
Flink的分区表订阅功能是通过Kafka的topic分区来实现的【1月更文挑战第6天】【1月更文挑战第26篇】
100 1
|
6月前
|
消息中间件 存储 Kafka
Flink---11、状态管理(按键分区状态(值状态、列表状态、Map状态、归约状态、聚合状态)算子状态(列表状态、广播状态))
Flink---11、状态管理(按键分区状态(值状态、列表状态、Map状态、归约状态、聚合状态)算子状态(列表状态、广播状态))
|
6月前
|
流计算
在Flink中,Regular Join(包括Left Join)的结果顺序是由Flink的分区策略和数据的分布方式共同决定的
在Flink中,Regular Join(包括Left Join)的结果顺序是由Flink的分区策略和数据的分布方式共同决定的
24 1
|
1月前
|
存储 SQL 算法
flink cdc 算法问题之low hign点位有重叠如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
3月前
|
关系型数据库 数据处理 流计算
【Flink】Flink 流处理和批处理
【1月更文挑战第26天】【Flink】Flink 流处理和批处理
|
3月前
|
流计算
Flink CDC里关于doris的动态分区问题,对以及建好的动态分区表,可以再次修改历史分区的保留时间嘛?
【1月更文挑战第24天】【1月更文挑战第117篇】Flink CDC里关于doris的动态分区问题,对以及建好的动态分区表,可以再次修改历史分区的保留时间嘛?
31 6
|
3月前
|
机器学习/深度学习 算法 搜索推荐
Flink中的流式机器学习是什么?请解释其作用和常用算法。
Flink中的流式机器学习是什么?请解释其作用和常用算法。
36 0
|
6月前
|
消息中间件 存储 算法
Flink---13、容错机制(检查点(保存、恢复、算法、配置)、状态一致性、端到端精确一次)
Flink---13、容错机制(检查点(保存、恢复、算法、配置)、状态一致性、端到端精确一次)
|
6月前
|
传感器 存储 缓存
Flink---10、处理函数(基本处理函数、按键分区处理函数、窗口处理函数、应用案例TopN、侧输出流)
Flink---10、处理函数(基本处理函数、按键分区处理函数、窗口处理函数、应用案例TopN、侧输出流)
|
6月前
|
分布式计算 负载均衡 算法
Flink---5、聚合算子、用户自定义函数、物理分区算子、分流、合流
Flink---5、聚合算子、用户自定义函数、物理分区算子、分流、合流