Flink - watermark

Summary:

Watermarks are only used when windows are involved, so it is enough to call assignTimestampsAndWatermarks somewhere before the window operator.

Watermarks do not necessarily have to be emitted from the source.
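For orientation, here is a minimal sketch of this pattern on the legacy (pre-1.11) DataStream API that the source code in this article belongs to. The event type MyEvent, its getEventTime()/getKey() accessors and MyEventSource are placeholders for illustration only:

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WatermarkBeforeWindowSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().setAutoWatermarkInterval(200); // how often periodic watermarks are emitted (ms)

        env.addSource(new MyEventSource())             // placeholder SourceFunction<MyEvent>
            // assign timestamps and watermarks anywhere before the window operator
            .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(5)) {
                    @Override
                    public long extractTimestamp(MyEvent element) {
                        return element.getEventTime(); // event-time field of the record
                    }
                })
            .keyBy(e -> e.getKey())
            .timeWindow(Time.seconds(10))              // event-time tumbling window
            .reduce((a, b) -> b);

        env.execute("watermark-before-window sketch");
    }
}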

 

1. First, the source can emit watermarks

Let's look at how the Kafka source implements this.

    protected AbstractFetcher(
            SourceContext<T> sourceContext,
            List<KafkaTopicPartition> assignedPartitions,
            SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,  // set via assignTimestampsAndWatermarks() when creating the KafkaConsumer
            SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
            ProcessingTimeService processingTimeProvider,
            long autoWatermarkInterval,  //env.getConfig().setAutoWatermarkInterval()
            ClassLoader userCodeClassLoader,
            boolean useMetrics) throws Exception
    {    
        // determine the watermark mode (periodic / punctuated / none)
        if (watermarksPeriodic == null) {
            if (watermarksPunctuated == null) {
                // simple case, no watermarks involved
                timestampWatermarkMode = NO_TIMESTAMPS_WATERMARKS;
            } else {
                timestampWatermarkMode = PUNCTUATED_WATERMARKS;
            }
        } else {
            if (watermarksPunctuated == null) {
                timestampWatermarkMode = PERIODIC_WATERMARKS;
            } else {
                throw new IllegalArgumentException("Cannot have both periodic and punctuated watermarks");
            }
        }
        
        // create our partition state according to the timestamp/watermark mode 
        this.allPartitions = initializePartitions(
                assignedPartitions,
                timestampWatermarkMode,
                watermarksPeriodic, watermarksPunctuated,
                userCodeClassLoader);
        
        // if we have periodic watermarks, kick off the interval scheduler
        if (timestampWatermarkMode == PERIODIC_WATERMARKS) { // if watermarks are emitted periodically
            KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] parts = 
                    (KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[]) allPartitions;
            
            PeriodicWatermarkEmitter periodicEmitter= 
                    new PeriodicWatermarkEmitter(parts, sourceContext, processingTimeProvider, autoWatermarkInterval);
            periodicEmitter.start();
        }
    }

 

FlinkKafkaConsumerBase

    public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> assigner) {
        checkNotNull(assigner);
        
        if (this.punctuatedWatermarkAssigner != null) {
            throw new IllegalStateException("A punctuated watermark emitter has already been set.");
        }
        try {
            ClosureCleaner.clean(assigner, true);
            this.periodicWatermarkAssigner = new SerializedValue<>(assigner);
            return this;
        } catch (Exception e) {
            throw new IllegalArgumentException("The given assigner is not serializable", e);
        }
    }

The core methods of this interface define how to extract the timestamp and how to generate the watermark:

public interface AssignerWithPeriodicWatermarks<T> extends TimestampAssigner<T> {
    Watermark getCurrentWatermark();
}
public interface TimestampAssigner<T> extends Function {
    long extractTimestamp(T element, long previousElementTimestamp);
}

If assignTimestampsAndWatermarks is not called when the KafkaConsumer is created, the source will not generate any watermarks.
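As a hedged sketch of what such an assigner typically looks like, here is a bounded-out-of-orderness implementation and how it would be attached to the Kafka consumer so that the watermarks are produced inside the source. MyEvent, its getEventTime() accessor, MyEventDeserializationSchema, the topic name, kafkaProps and the 5-second bound are all assumptions for illustration:

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class BoundedLatenessAssigner implements AssignerWithPeriodicWatermarks<MyEvent> {

    private static final long MAX_OUT_OF_ORDERNESS = 5_000L; // assumed 5 s bound

    // largest event timestamp seen so far; initialized so the first watermark does not underflow
    private long currentMaxTimestamp = Long.MIN_VALUE + MAX_OUT_OF_ORDERNESS;

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        long ts = element.getEventTime();                     // hypothetical event-time field
        currentMaxTimestamp = Math.max(currentMaxTimestamp, ts);
        return ts;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // the watermark trails the largest timestamp seen so far by the allowed out-of-orderness
        return new Watermark(currentMaxTimestamp - MAX_OUT_OF_ORDERNESS);
    }
}

// Attaching it to the consumer makes the Kafka source itself emit the watermarks:
FlinkKafkaConsumer<MyEvent> consumer =
        new FlinkKafkaConsumer<>("my-topic", new MyEventDeserializationSchema(), kafkaProps);
consumer.assignTimestampsAndWatermarks(new BoundedLatenessAssigner());
DataStream<MyEvent> stream = env.addSource(consumer);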

 

As you can see, there are two kinds of watermarks:

PERIODIC_WATERMARKS: watermarks emitted at a regular interval.

PUNCTUATED_WATERMARKS: watermarks triggered by the elements themselves, e.g. by a marker attribute or a special type of element; this gives the developer fine-grained control over when a watermark is emitted.
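A hedged sketch of a punctuated assigner, assuming (purely for illustration) that events carry an isWatermarkMarker() flag which signals that a watermark should be emitted:

import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class MarkerPunctuatedAssigner implements AssignerWithPunctuatedWatermarks<MyEvent> {

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        return element.getEventTime();                        // hypothetical event-time field
    }

    @Override
    public Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
        // called for every element right after extractTimestamp;
        // returning null means "no watermark this time"
        return lastElement.isWatermarkMarker() ? new Watermark(extractedTimestamp) : null;
    }
}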

 

initializePartitions

case PERIODIC_WATERMARKS: {
    @SuppressWarnings("unchecked")
    KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[] partitions =
            (KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[])
                    new KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[assignedPartitions.size()];

    int pos = 0;
    for (KafkaTopicPartition partition : assignedPartitions) {
        KPH kafkaHandle = createKafkaPartitionHandle(partition);

        AssignerWithPeriodicWatermarks<T> assignerInstance =
                watermarksPeriodic.deserializeValue(userCodeClassLoader);
        
        partitions[pos++] = new KafkaTopicPartitionStateWithPeriodicWatermarks<>(
                partition, kafkaHandle, assignerInstance);
    }

    return partitions;
}

KafkaTopicPartitionStateWithPeriodicWatermarks

The most important methods in this class:

    public long getTimestampForRecord(T record, long kafkaEventTimestamp) {
        return timestampsAndWatermarks.extractTimestamp(record, kafkaEventTimestamp);
    }
    
    public long getCurrentWatermarkTimestamp() {
        Watermark wm = timestampsAndWatermarks.getCurrentWatermark();
        if (wm != null) {
            partitionWatermark = Math.max(partitionWatermark, wm.getTimestamp());
        }
        return partitionWatermark;
    }

As you can see, they simply delegate to the AssignerWithPeriodicWatermarks that you defined.

 

PeriodicWatermarkEmitter

    private static class PeriodicWatermarkEmitter implements ProcessingTimeCallback {

        public void start() {
            timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this); // start the timer, which fires periodically
        }
        
        @Override
        public void onProcessingTime(long timestamp) throws Exception { // callback invoked when the timer fires

            long minAcrossAll = Long.MAX_VALUE;
            for (KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?> state : allPartitions) { // for each partition
                
                // we access the current watermark for the periodic assigners under the state
                // lock, to prevent concurrent modification to any internal variables
                final long curr;
                //noinspection SynchronizationOnLocalVariableOrMethodParameter
                synchronized (state) {
                    curr = state.getCurrentWatermarkTimestamp(); // read this partition's current watermark
                }
                
                minAcrossAll = Math.min(minAcrossAll, curr); // take the minimum; the slowest partition determines the emitted watermark
            }
            
            // emit next watermark, if there is one
            if (minAcrossAll > lastWatermarkTimestamp) {
                lastWatermarkTimestamp = minAcrossAll;
                emitter.emitWatermark(new Watermark(minAcrossAll)); //emit
            }
            
            // schedule the next watermark
            timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this); // re-register the timer
        }
    }

 
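In other words, the watermark actually emitted by the source is the minimum of the per-partition watermarks. A tiny standalone illustration (plain Java with made-up numbers, not Flink code) of why the slowest partition dominates:

public class MinAcrossPartitionsDemo {
    public static void main(String[] args) {
        // per-partition watermarks in event-time millis; the second partition is lagging behind
        long[] partitionWatermarks = { 1_000_000L, 995_000L, 1_002_000L };

        long minAcrossAll = Long.MAX_VALUE;
        for (long w : partitionWatermarks) {
            minAcrossAll = Math.min(minAcrossAll, w);
        }

        // prints 995000: advancing the source watermark faster than the slowest partition
        // would turn that partition's still-pending events into late data
        System.out.println(minAcrossAll);
    }
}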

2. A DataStream can also be configured to emit watermarks periodically

Under the hood, this simply adds a chained TimestampsAndPeriodicWatermarksOperator.
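For comparison with section 1, the same (hypothetical) BoundedLatenessAssigner from the earlier sketch could be applied on the DataStream instead of on the consumer; this is exactly the call that creates the chained operator shown below:

env.getConfig().setAutoWatermarkInterval(200);     // timer period of the chained operator (ms)

DataStream<MyEvent> withTimestamps = env
        .addSource(consumer)                       // no assigner set on the consumer this time
        .assignTimestampsAndWatermarks(new BoundedLatenessAssigner());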

DataStream

   /**
     * Assigns timestamps to the elements in the data stream and periodically creates
     * watermarks to signal event time progress.
     * 
     * <p>This method creates watermarks periodically (for example every second), based
     * on the watermarks indicated by the given watermark generator. Even when no new elements
     * in the stream arrive, the given watermark generator will be periodically checked for
     * new watermarks. The interval in which watermarks are generated is defined in
     * {@link ExecutionConfig#setAutoWatermarkInterval(long)}.
     * 
     * <p>Use this method for the common cases, where some characteristic over all elements
     * should generate the watermarks, or where watermarks are simply trailing behind the
     * wall clock time by a certain amount.
     *
     * <p>For the second case and when the watermarks are required to lag behind the maximum
     * timestamp seen so far in the elements of the stream by a fixed amount of time, and this
     * amount is known in advance, use the
     * {@link BoundedOutOfOrdernessTimestampExtractor}.
     * 
     * <p>For cases where watermarks should be created in an irregular fashion, for example
     * based on certain markers that some element carry, use the
     * {@link AssignerWithPunctuatedWatermarks}.
     * 
     * @param timestampAndWatermarkAssigner The implementation of the timestamp assigner and
     *                                      watermark generator.   
     * @return The stream after the transformation, with assigned timestamps and watermarks.
     * 
     * @see AssignerWithPeriodicWatermarks
     * @see AssignerWithPunctuatedWatermarks
     * @see #assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks) 
     */
    public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
            AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner) {
        
        // match parallelism to input, otherwise dop=1 sources could lead to some strange
        // behaviour: the watermark will creep along very slowly because the elements
        // from the source go to each extraction operator round robin.
        final int inputParallelism = getTransformation().getParallelism();
        final AssignerWithPeriodicWatermarks<T> cleanedAssigner = clean(timestampAndWatermarkAssigner);
        
        TimestampsAndPeriodicWatermarksOperator<T> operator = 
                new TimestampsAndPeriodicWatermarksOperator<>(cleanedAssigner);
        
        return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator)
                .setParallelism(inputParallelism);
    }

 

TimestampsAndPeriodicWatermarksOperator

  public class TimestampsAndPeriodicWatermarksOperator<T>
        extends AbstractUdfStreamOperator<T, AssignerWithPeriodicWatermarks<T>>
        implements OneInputStreamOperator<T, T>, Triggerable {
    
    private transient long watermarkInterval;
    private transient long currentWatermark;

    public TimestampsAndPeriodicWatermarksOperator(AssignerWithPeriodicWatermarks<T> assigner) {
        super(assigner); //AbstractUdfStreamOperator(F userFunction)
        this.chainingStrategy = ChainingStrategy.ALWAYS; // always chain this operator with its input
    }

    @Override
    public void open() throws Exception {
        super.open();

        currentWatermark = Long.MIN_VALUE;
        watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();
        
        if (watermarkInterval > 0) {
            registerTimer(System.currentTimeMillis() + watermarkInterval, this); // register with the timer service
        }
    }

    @Override
    public void processElement(StreamRecord<T> element) throws Exception {
        final long newTimestamp = userFunction.extractTimestamp(element.getValue(), // extract the timestamp from the element via the AssignerWithPeriodicWatermarks
                element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);
        
        output.collect(element.replace(element.getValue(), newTimestamp)); // update the element's timestamp and re-emit it
    }

    @Override
    public void trigger(long timestamp) throws Exception { // invoked when the timer fires
        // register next timer
        Watermark newWatermark = userFunction.getCurrentWatermark(); // ask the assigner for the current watermark
        if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {
            currentWatermark = newWatermark.getTimestamp();
            // emit watermark
            output.emitWatermark(newWatermark); // emit the watermark
        }

        registerTimer(System.currentTimeMillis() + watermarkInterval, this); // re-register the timer
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        // if we receive a Long.MAX_VALUE watermark we forward it since it is used
        // to signal the end of input and to not block watermark progress downstream
        if (mark.getTimestamp() == Long.MAX_VALUE && currentWatermark != Long.MAX_VALUE) {
            currentWatermark = Long.MAX_VALUE;
            output.emitWatermark(mark); //forward watermark
        }
    }
}

 

You can see that processElement calls AssignerWithPeriodicWatermarks.extractTimestamp to extract the event time and then updates the StreamRecord's timestamp accordingly.

 

Then, in the WindowOperator:

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        final Collection<W> elementWindows = windowAssigner.assignWindows(
            element.getValue(), element.getTimestamp(), windowAssignerContext);

windowAssigner.assignWindows uses the element's timestamp to determine which window(s) the element belongs to.
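For a tumbling event-time window this assignment boils down to simple arithmetic on the element's timestamp. A hedged illustration with made-up numbers, mirroring what TimeWindow.getWindowStartWithOffset does for a zero offset and non-negative timestamps:

long windowSize  = 10_000L;                               // 10 s tumbling event-time windows
long timestamp   = 1_000_500L;                            // extracted event timestamp of the element

long windowStart = timestamp - (timestamp % windowSize);  // 1_000_000
long windowEnd   = windowStart + windowSize;              // 1_010_000
// the element is assigned to the window [1_000_000, 1_010_000)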
