Flink - watermark

Summary:

Watermarks only matter once windows are involved, so it is enough to call assignTimestampsAndWatermarks somewhere before the window operator.

Watermarks do not necessarily have to be emitted by the source.
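To make the wiring concrete, here is a minimal sketch against the legacy (pre-1.12) DataStream API that this article discusses; the tuple schema, the one-second out-of-orderness bound, and the window size are arbitrary illustration choices:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WatermarkPipelineSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().setAutoWatermarkInterval(200); // periodic watermark interval, in ms

        env.fromElements(Tuple2.of("a", 1_000L), Tuple2.of("a", 2_000L), Tuple2.of("b", 9_000L))
            // assign timestamps/watermarks BEFORE the window operator
            .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.seconds(1)) {
                    @Override
                    public long extractTimestamp(Tuple2<String, Long> element) {
                        return element.f1; // event time is carried in the element itself
                    }
                })
            .keyBy(0)
            .timeWindow(Time.seconds(5))
            .sum(1)
            .print();

        env.execute("watermark sketch");
    }
}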

 

1. First, the source can emit watermarks

Let's look at the Kafka source implementation:

    protected AbstractFetcher(
            SourceContext<T> sourceContext,
            List<KafkaTopicPartition> assignedPartitions,
            SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,  // set via assignTimestampsAndWatermarks when creating the KafkaConsumer
            SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
            ProcessingTimeService processingTimeProvider,
            long autoWatermarkInterval,  // from env.getConfig().setAutoWatermarkInterval()
            ClassLoader userCodeClassLoader,
            boolean useMetrics) throws Exception
    {    
        // determine the watermark mode
        if (watermarksPeriodic == null) {
            if (watermarksPunctuated == null) {
                // simple case, no watermarks involved
                timestampWatermarkMode = NO_TIMESTAMPS_WATERMARKS;
            } else {
                timestampWatermarkMode = PUNCTUATED_WATERMARKS;
            }
        } else {
            if (watermarksPunctuated == null) {
                timestampWatermarkMode = PERIODIC_WATERMARKS;
            } else {
                throw new IllegalArgumentException("Cannot have both periodic and punctuated watermarks");
            }
        }
        
        // create our partition state according to the timestamp/watermark mode 
        this.allPartitions = initializePartitions(
                assignedPartitions,
                timestampWatermarkMode,
                watermarksPeriodic, watermarksPunctuated,
                userCodeClassLoader);
        
        // if we have periodic watermarks, kick off the interval scheduler
        if (timestampWatermarkMode == PERIODIC_WATERMARKS) { // periodic watermark emission
            KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] parts = 
                    (KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[]) allPartitions;
            
            PeriodicWatermarkEmitter periodicEmitter =
                    new PeriodicWatermarkEmitter(parts, sourceContext, processingTimeProvider, autoWatermarkInterval);
            periodicEmitter.start();
        }
    }

 

FlinkKafkaConsumerBase

    public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> assigner) {
        checkNotNull(assigner);
        
        if (this.punctuatedWatermarkAssigner != null) {
            throw new IllegalStateException("A punctuated watermark emitter has already been set.");
        }
        try {
            ClosureCleaner.clean(assigner, true);
            this.periodicWatermarkAssigner = new SerializedValue<>(assigner);
            return this;
        } catch (Exception e) {
            throw new IllegalArgumentException("The given assigner is not serializable", e);
        }
    }

The core methods of these interfaces define how to extract a timestamp and how to generate a watermark:

public interface AssignerWithPeriodicWatermarks<T> extends TimestampAssigner<T> {
    Watermark getCurrentWatermark();
}
public interface TimestampAssigner<T> extends Function {
    long extractTimestamp(T element, long previousElementTimestamp);
}

If assignTimestampsAndWatermarks is never called when initializing the KafkaConsumer, no watermarks are produced.
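For reference, attaching a periodic assigner to the consumer looks roughly like the sketch below; the topic name, group id, "timestamp,payload" element encoding, and one-second out-of-orderness bound are all hypothetical, and the exact FlinkKafkaConsumer0xx class depends on your Kafka version:

import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaWatermarkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "demo");

        FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<>("events", new SimpleStringSchema(), props);

        // the assigner is serialized here and later deserialized
        // once per partition in initializePartitions
        consumer.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<String>() {
            private long maxTimestamp = Long.MIN_VALUE;

            @Override
            public long extractTimestamp(String element, long previousElementTimestamp) {
                long ts = Long.parseLong(element.split(",")[0]); // hypothetical "timestamp,payload" encoding
                maxTimestamp = Math.max(maxTimestamp, ts);
                return ts;
            }

            @Override
            public Watermark getCurrentWatermark() {
                // trail the largest timestamp seen so far by 1s of allowed out-of-orderness
                return new Watermark(maxTimestamp - 1000);
            }
        });

        env.addSource(consumer).print();
        env.execute("kafka watermark sketch");
    }
}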

 

As you can see, there are two kinds of watermarks:

PERIODIC_WATERMARKS, emitted at a fixed interval.

PUNCTUATED_WATERMARKS, triggered by the elements themselves, e.g. by an attribute on the element or by a special element type, which gives the developer fine-grained control over when watermarks are emitted.
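A punctuated assigner implements the sibling interface AssignerWithPunctuatedWatermarks, whose checkAndGetNextWatermark is called for every element right after extractTimestamp; returning null means "no watermark this time". A sketch, where MyEvent and its accessors are hypothetical:

import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class MarkerPunctuatedAssigner implements AssignerWithPunctuatedWatermarks<MyEvent> {

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        return element.getEventTime(); // hypothetical accessor
    }

    // only marker elements trigger a watermark; all others return null
    @Override
    public Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
        return lastElement.isWatermarkMarker() ? new Watermark(extractedTimestamp) : null;
    }
}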

 

initializePartitions

case PERIODIC_WATERMARKS: {
    @SuppressWarnings("unchecked")
    KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[] partitions =
            (KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[])
                    new KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[assignedPartitions.size()];

    int pos = 0;
    for (KafkaTopicPartition partition : assignedPartitions) {
        KPH kafkaHandle = createKafkaPartitionHandle(partition);

        AssignerWithPeriodicWatermarks<T> assignerInstance =
                watermarksPeriodic.deserializeValue(userCodeClassLoader);
        
        partitions[pos++] = new KafkaTopicPartitionStateWithPeriodicWatermarks<>(
                partition, kafkaHandle, assignerInstance);
    }

    return partitions;
}

KafkaTopicPartitionStateWithPeriodicWatermarks

The most important methods in this class:

    public long getTimestampForRecord(T record, long kafkaEventTimestamp) {
        return timestampsAndWatermarks.extractTimestamp(record, kafkaEventTimestamp);
    }
    
    public long getCurrentWatermarkTimestamp() {
        Watermark wm = timestampsAndWatermarks.getCurrentWatermark();
        if (wm != null) {
            partitionWatermark = Math.max(partitionWatermark, wm.getTimestamp());
        }
        return partitionWatermark;
    }

As you can see, they delegate to the AssignerWithPeriodicWatermarks that you defined.

 

PeriodicWatermarkEmitter

    private static class PeriodicWatermarkEmitter implements ProcessingTimeCallback {

        public void start() {
            timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this); // start the timer, which fires periodically
        }
        
        @Override
        public void onProcessingTime(long timestamp) throws Exception { // the trigger logic

            long minAcrossAll = Long.MAX_VALUE;
            for (KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?> state : allPartitions) { // for each partition
                
                // we access the current watermark for the periodic assigners under the state
                // lock, to prevent concurrent modification to any internal variables
                final long curr;
                //noinspection SynchronizationOnLocalVariableOrMethodParameter
                synchronized (state) {
                    curr = state.getCurrentWatermarkTimestamp(); // read the current partition's watermark
                }
                
                minAcrossAll = Math.min(minAcrossAll, curr); // take the min: the slowest partition's watermark becomes the overall watermark
            }
            
            // emit next watermark, if there is one
            if (minAcrossAll > lastWatermarkTimestamp) {
                lastWatermarkTimestamp = minAcrossAll;
                emitter.emitWatermark(new Watermark(minAcrossAll)); //emit
            }
            
            // schedule the next watermark
            timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this); // re-register the timer
        }
    }
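A toy, standalone simulation (not Flink code, and the numbers are made up) of the two key ideas in onProcessingTime: taking the min across partitions, and only emitting when that min has actually advanced:

public class PeriodicEmitSketch {
    static long lastWatermarkTimestamp = Long.MIN_VALUE;

    static void onProcessingTime(long[] partitionWatermarks) {
        long minAcrossAll = Long.MAX_VALUE;
        for (long wm : partitionWatermarks) {
            minAcrossAll = Math.min(minAcrossAll, wm); // slowest partition gates the watermark
        }
        if (minAcrossAll > lastWatermarkTimestamp) {   // watermarks must be monotonically increasing
            lastWatermarkTimestamp = minAcrossAll;
            System.out.println("emit Watermark(" + minAcrossAll + ")");
        }
    }

    public static void main(String[] args) {
        onProcessingTime(new long[]{1000, 1500, 800});  // emits Watermark(800)
        onProcessingTime(new long[]{1200, 1500, 800});  // slow partition unchanged: nothing emitted
        onProcessingTime(new long[]{1200, 1500, 900});  // emits Watermark(900)
    }
}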

 

2. A DataStream can also emit watermarks periodically

Under the hood, this simply chains a TimestampsAndPeriodicWatermarksOperator onto the stream.

DataStream

    /**
     * Assigns timestamps to the elements in the data stream and periodically creates
     * watermarks to signal event time progress.
     * 
     * <p>This method creates watermarks periodically (for example every second), based
     * on the watermarks indicated by the given watermark generator. Even when no new elements
     * in the stream arrive, the given watermark generator will be periodically checked for
     * new watermarks. The interval in which watermarks are generated is defined in
     * {@link ExecutionConfig#setAutoWatermarkInterval(long)}.
     * 
     * <p>Use this method for the common cases, where some characteristic over all elements
     * should generate the watermarks, or where watermarks are simply trailing behind the
     * wall clock time by a certain amount.
     *
     * <p>For the second case and when the watermarks are required to lag behind the maximum
     * timestamp seen so far in the elements of the stream by a fixed amount of time, and this
     * amount is known in advance, use the
     * {@link BoundedOutOfOrdernessTimestampExtractor}.
     * 
     * <p>For cases where watermarks should be created in an irregular fashion, for example
     * based on certain markers that some element carry, use the
     * {@link AssignerWithPunctuatedWatermarks}.
     * 
     * @param timestampAndWatermarkAssigner The implementation of the timestamp assigner and
     *                                      watermark generator.   
     * @return The stream after the transformation, with assigned timestamps and watermarks.
     * 
     * @see AssignerWithPeriodicWatermarks
     * @see AssignerWithPunctuatedWatermarks
     * @see #assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks) 
     */
    public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
            AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner) {
        
        // match parallelism to input, otherwise dop=1 sources could lead to some strange
        // behaviour: the watermark will creep along very slowly because the elements
        // from the source go to each extraction operator round robin.
        final int inputParallelism = getTransformation().getParallelism();
        final AssignerWithPeriodicWatermarks<T> cleanedAssigner = clean(timestampAndWatermarkAssigner);
        
        TimestampsAndPeriodicWatermarksOperator<T> operator = 
                new TimestampsAndPeriodicWatermarksOperator<>(cleanedAssigner);
        
        return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator)
                .setParallelism(inputParallelism);
    }

 

TimestampsAndPeriodicWatermarksOperator

public class TimestampsAndPeriodicWatermarksOperator<T>
        extends AbstractUdfStreamOperator<T, AssignerWithPeriodicWatermarks<T>>
        implements OneInputStreamOperator<T, T>, Triggerable {
    
    private transient long watermarkInterval;
    private transient long currentWatermark;

    public TimestampsAndPeriodicWatermarksOperator(AssignerWithPeriodicWatermarks<T> assigner) {
        super(assigner); //AbstractUdfStreamOperator(F userFunction)
        this.chainingStrategy = ChainingStrategy.ALWAYS; // always chained to the upstream operator
    }

    @Override
    public void open() throws Exception {
        super.open();

        currentWatermark = Long.MIN_VALUE;
        watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();
        
        if (watermarkInterval > 0) {
            registerTimer(System.currentTimeMillis() + watermarkInterval, this); // register with the timer service
        }
    }

    @Override
    public void processElement(StreamRecord<T> element) throws Exception {
        final long newTimestamp = userFunction.extractTimestamp(element.getValue(), // extract the timestamp from the element via the AssignerWithPeriodicWatermarks
                element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);
        
        output.collect(element.replace(element.getValue(), newTimestamp)); // update the element's timestamp and re-emit it
    }

    @Override
    public void trigger(long timestamp) throws Exception { // invoked when the timer fires
        // register next timer
        Watermark newWatermark = userFunction.getCurrentWatermark(); // fetch the current watermark
        if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {
            currentWatermark = newWatermark.getTimestamp();
            // emit watermark
            output.emitWatermark(newWatermark); // emit the watermark
        }

        registerTimer(System.currentTimeMillis() + watermarkInterval, this); // re-register with the timer service
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        // if we receive a Long.MAX_VALUE watermark we forward it since it is used
        // to signal the end of input and to not block watermark progress downstream
        if (mark.getTimestamp() == Long.MAX_VALUE && currentWatermark != Long.MAX_VALUE) {
            currentWatermark = Long.MAX_VALUE;
            output.emitWatermark(mark); //forward watermark
        }
    }
}

 

As you can see, processElement calls AssignerWithPeriodicWatermarks.extractTimestamp to extract the event time,

and then updates the StreamRecord's timestamp.

 

Then, in the window operator:

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        final Collection<W> elementWindows = windowAssigner.assignWindows(
            element.getValue(), element.getTimestamp(), windowAssignerContext);

windowAssigner.assignWindows uses the element's timestamp as the assignment time.
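For example, a tumbling event-time assigner derives the window purely from that timestamp. A simplified sketch of the start-of-window arithmetic follows; in Flink the real logic lives in TumblingEventTimeWindows and TimeWindow.getWindowStartWithOffset, and offset handling is omitted here:

public class TumblingAssignSketch {
    public static void main(String[] args) {
        long windowSize = 60_000L;          // 1-minute tumbling windows
        long elementTimestamp = 1_234_567L; // the StreamRecord timestamp set upstream

        // align the timestamp down to the nearest window boundary (offset = 0)
        long windowStart = elementTimestamp - (elementTimestamp % windowSize);

        // the element is assigned to [windowStart, windowStart + windowSize)
        System.out.println("[" + windowStart + ", " + (windowStart + windowSize) + ")"); // [1200000, 1260000)
    }
}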
