开发者社区> 问答> 正文

当水印小于窗口结束时,触发基于事件时间的Flink窗口操作

我在Flink测试事件时间和水印。以下是我的代码。

object WatermarkTest {

 def main(args: Array[String]): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

     val properties = new Properties()
     properties.setProperty("bootstrap.servers", "127.0.0.1:9092")
     properties.setProperty("group.id", "enven-test")

     env.getConfig.setAutoWatermarkInterval(1L)
     val input = env.addSource(new FlinkKafkaConsumer011[String]("event-time-topic", new SimpleStringSchema(), properties))

     val inputMap = input.map(f=> {
       val arr = f.split(",")
       val code = arr(0)
       val time = arr(1).toLong
       MyEvent(code, time)
    })

    val watermark = inputMap.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator())
    val window = watermark
       .keyBy(_.code)
       .window(TumblingEventTimeWindows.of(Time.seconds(5)))
       .apply(new WindowFunctionTest)

    window.print()

   env.execute()
}

class WindowFunctionTest extends WindowFunction[MyEvent,(String, Int,String,String,String,String),String,TimeWindow]{

   override def apply(key: String, window: TimeWindow, input: Iterable[MyEvent], out: Collector[(String, Int,String,String,String,String)]): Unit = {
   val list = input.toList.sortBy(_.time)
   val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")

out.collect(key,input.size,format.format(list.head.time),format.format(list.last.time),format.format(window.getStart),format.format(window.getEnd))

}

}
}
以下是事件时间和水印生成器:

class BoundedOutOfOrdernessGenerator extends

  AssignerWithPeriodicWatermarks[MyEvent] {
  val maxOutOfOrderness = 10000L

  var currentMaxTimestamp: Long = _

  val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")

  var watermark: Watermark = null
  var timestamp: Long = _

  override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
       timestamp = element.time
       currentMaxTimestamp = if (timestamp > currentMaxTimestamp) timestamp else currentMaxTimestamp
       println("timestamp:" + element.code +","+ element.time + "|" +format.format(element.time) +", currentMaxTimestamp: "+  currentMaxTimestamp + "|"+ format.format(currentMaxTimestamp) + ", watermark: "+ format.format(watermark.getTimestamp))
       timestamp;
  }

  override def getCurrentWatermark(): Watermark = {
       watermark = new Watermark((currentMaxTimestamp - maxOutOfOrderness)/1000*1000);
      watermark
  }

}
在我看来,第一次计算应该在水印之后:2016-04-27 19:34:25.000。测试结果显示水印水印后触发计算:2016-04-27 19:34:24.000。任何人都能解释一下吗?

展开
收起
社区小助手 2018-12-11 16:33:29 3157 0
1 条回答
写回答
取消 提交回答
  • 社区小助手是spark中国社区的管理员,我会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关spark的问题及回答。

    我建议你在getCurrentWatermark和extractTimestamp中打印水印。这应该澄清正在发生的事情。

    问题是正在调用extractTimestamp来从时间戳为19:34:35的事件中提取时间戳 - 这个事件将导致当前水印前进到19:34:25,从而触发窗口 - 和你正在打印当前的水印。在执行extractTimestamp中的println时,尚未提前水印以反映此新事件。但是在extractTimestamp返回后不久,将调用getCurrentWatermark,这会将当前水印提升到19:34:25,这将依次触发窗口。

    2019-07-17 23:19:53
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载