Flink之watermark 处理延迟数据 详解

watermark介绍

在Flink中,Watermark 是 Apache Flink 为了处理 EventTime 窗口计算提出的一种机制, 本质上是一种时间戳。 用来处理实时数据中的乱序问题的,通常是水位线和窗口结合使用来实现。
从设备生成实时流事件,到Flink的source,再到多个oparator处理数据,过程中会受到网络延迟、背压等多种因素影响造成数据乱序。在进行窗口处理时,不可能无限期的等待延迟数据到达,当到达特定watermark时,认为在watermark之前的数据已经全部达到(即使后面还有延迟的数据), 可以触发窗口计算,这个机制就是 Watermark(水位线)。

在这里插入图片描述
在这里插入图片描述
如上图:
● w(11): 表示11之前的数据到已经到达,11之前的数据可以进行计算了。
● w(20): 表示20之前的数据到已经到达,20之前的数据可以进行计算了。

watermark的使用

生成时机

watermark可以在接收到DataSource的数据后,立刻生成Watermark。也可以在DataSource后,使用map或者filter操作后再生成watermark。
水位线生产的最佳位置是在尽可能靠近数据源的地方,因为水位线生成时会做出一些有关元素顺序相对时间戳的假设。由于数据源读取过程是并行的,一切引起Flink跨行数据流分区进行重新分发的操作(比如:改变并行度,keyby等)都会导致元素时间戳乱序。但是如果是某些初始化的filter、map等不会引起元素重新分发的操作,所以是可以考虑在生成水位线之前使用。

watermark的计算

watermark = 进入 Flink 窗口的最大的事件时间(maxEventTime) — 指定的延迟时间(t)

生成方式

第一种:With Periodic Watermarks

这个是周期性触发Waterrmark的生成和发送。
周期性分配水位线在程序中会比较常用,是我们会指示系统以固定的时间间隔发出的水位线。
在设置时间为事件时间时,会默认设置这个时间间隔为200ms, 如果需要调整可以自行设置。

设置任务时间类型和

 val env = StreamExecutionEnvironment.getExecutionEnvironment
 //设置时间使用事件时间
 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 //设置并行度为1
 env.setParallelism(1)
 //设置自动周期性的产生watermark,默认值为200毫秒
 env.getConfig.setAutoWatermarkInterval(1000)

设置水位线watermark的值

     
    //通过本地socket端口获取数据
    val dataStream = env.socketTextStream("127.0.0.1",10010)
     //对数据的数据进行转换为tuple2的格式
     val tupStream = dataStream.map(line => {
        val arr = line.split(" ")
        (arr(0),arr(1).toLong)
      })
          
   //设置水位线
    val waterDataStream = tupStream.assignTimestampsAndWatermarks(
      //设置时间最低延迟
      WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(2))
        //设置时间戳
        .withTimestampAssigner(new SerializableTimestampAssigner[Tuple2[String,Long]] {
          //当前最大的值
          var currentMaxNum = 0l
          val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
          override def extractTimestamp(t: Tuple2[String,Long], recordTimesstamp: Long): Long = {
            val eTime = t._2
            currentMaxNum = Math.max(eTime,currentMaxNum)
            //当前最大的值减去 允许乱序的数据,即为现在的水位线值。
            //注意:这些代码只是为了本地观察方便,正常开发中是不需这样写的。
            val waterMark = currentMaxNum - 2000;
            println("数据:"+t.toString()+"  ,"+sdf.format(eTime)+" ,  当前watermark: "+sdf.format(waterMark))
            eTime
          }
        })
    )

    //对数据进行计算和输出
    waterDataStream.keyBy(_._1).timeWindow(Time.seconds(3)).reduce((e1, e2)=>{
            (e1._1,e1._2+e2._2)
          }).print()

输入和输出:

--------------------输入
s3 1639100010955
s2 1639100009955
s1 1639100008955
s0 1639100007955
s4 1639100011955
s5 1639100012955
s6 1639100013955
s7 1639100016955
          
          
--------------------输出
          
数据:(s3,1639100010955)  ,2021-12-10 09:33:30 ,  当前watermark: 2021-12-10 09:33:28
数据:(s2,1639100009955)  ,2021-12-10 09:33:29 ,  当前watermark: 2021-12-10 09:33:28
数据:(s1,1639100008955)  ,2021-12-10 09:33:28 ,  当前watermark: 2021-12-10 09:33:28
数据:(s0,1639100007955)  ,2021-12-10 09:33:27 ,  当前watermark: 2021-12-10 09:33:28
数据:(s4,1639100011955)  ,2021-12-10 09:33:31 ,  当前watermark: 2021-12-10 09:33:29
数据:(s5,1639100012955)  ,2021-12-10 09:33:32 ,  当前watermark: 2021-12-10 09:33:30
(s2,1639100009955)
(s0,1639100007955)
(s1,1639100008955)
数据:(s6,1639100013955)  ,2021-12-10 09:33:33 ,  当前watermark: 2021-12-10 09:33:31
数据:(s7,1639100016955)  ,2021-12-10 09:33:36 ,  当前watermark: 2021-12-10 09:33:34
(s3,1639100010955)
(s5,1639100012955)
(s4,1639100011955)

说明:

  • 在使用timeWindow的时候,会根据设置的窗口大小 3,将一分钟内的窗口划分为:
    0-2,3-5,6-8,9-11,12-14,15-17,18-20,21-23,24-26,27-29,30-32,33-35…
  • watermark的值是当前输入数据中最大时间戳-去乱序时间。 在watermark前的数据才会被认定是正常的,可供window进行计算的数据。
  • 上面程序中输入s3-s4时,watermark为的秒数是28和29,是在 timewindow划分的时间窗口 27-29 中,所以没有触发计算。直到输入s5,此时watermark秒数是30,在另一个窗口 30-32的窗口中,才会触发 27-29窗口的计算,所以才输出 s2,s0,s1的值。
  • 同理到s7的时候,又是另一个窗口33-35,所以触发上一个窗口的计算。

第二种: With Punctuated Watermarks

定点水位线(标记水位线)不是太常用,主要为输入流中包含一些用于指示系统进度的特殊元组和标记,方便根据输入元素生成水位线的场景使用的。
由于数据流中每一个递增的EventTime都会产生一个Watermark。
在实际的生产中Punctuated方式在TPS很高的场景下会产生大量的Watermark在一定程度上对下游算子造成压力,所以只有在实时性要求非常高的场景才会选择Punctuated的方式进行Watermark的生成。

public class PunctuatedAssigner implements WatermarkGenerator<MyEvent> {

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        if (event.hasWatermarkMarker()) {
            output.emitWatermark(new Watermark(event.getWatermarkTimestamp()));
        }
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // onEvent 中已经实现
    }
}

延迟数据的处理方式

针对延迟太久的数据有3中处理方案:

  1. 丢弃(默认)
  2. allowedLateness: 指定允许数据延迟的时间
  3. sideOutputLateData: 收集迟到的数据
  • 对于迟到太久的数据默认是丢弃的。 不会触发window。因为输入的数据所在的窗口已经执行过了。Flink对这些迟到数据执行的方案就是丢弃。

  • 如果迟到不久,输入的数据所在的窗口还未执行,是不会丢弃的。 这个要看窗口大小最大允许的数据乱序时间

附上 Flink官方文档地址:

https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/datastream/event-time/generating_watermarks/

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码
< <上一篇
下一篇>>