Flink多并行度下WaterMark的设计区别

 问题的提出

  对于WaterMark设计的位置是否会影响窗口的正常开闭?

  下面我模拟了两种情景(source并行度为1,map并行度为2),分别是
1.在source后设置watermark,经过map后开窗
2.在map后设置watermark,然后开窗

ps:       下面的两种代码我都设置了自然增长的watermark,窗口时间都是5秒,只是设置watermark的位置不同

                watermark是testWM对象的ts字段*1000

 代码一:在Source后添加WaterMark

public class WMTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
        // TODO: 2021/12/1 在source后设置watermark 
        SingleOutputStreamOperator<String> sourceWithWM = source.assignTimestampsAndWatermarks(WatermarkStrategy
                .<String>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<String>() {
                    @Override
                    public long extractTimestamp(String element, long recordTimestamp) {
                        String[] split = element.split(",");
                        return Long.parseLong(split[2]) * 1000;
                    }
                }));
        // TODO: 2021/12/1 设置map并行度为2 
        SingleOutputStreamOperator<testWM> mapDS = sourceWithWM.map(r -> {
            String[] split = r.split(",");
            return new testWM(Integer.parseInt(split[0]), Integer.parseInt(split[1]),Long.parseLong(split[2]));
        }).setParallelism(2);
        SingleOutputStreamOperator<String> resultDS = mapDS.keyBy(r -> r.getId())
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .process(new ProcessWindowFunction<testWM, String, Integer, TimeWindow>() {
                    @Override
                    public void process(Integer integer, Context context, Iterable<testWM> elements, Collector<String> out) throws Exception {
                        out.collect("我关窗啦");
                    }
                });
        resultDS.print();
        env.execute();

    }
}
@Data
@AllArgsConstructor
class testWM{
    private int id;
    private int num;
    private long ts;
}

 代码二:在Map后设置WaterMark

public class WMTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
        // TODO: 2021/12/1 设置map并行度为2
        SingleOutputStreamOperator<testWM> mapDS = source.map(r -> {
            String[] split = r.split(",");
            return new testWM(Integer.parseInt(split[0]), Integer.parseInt(split[1]),Long.parseLong(split[2]));
        }).setParallelism(2);
        // TODO: 2021/12/1  在map后添加watermark
        SingleOutputStreamOperator<testWM> mapWithWM = mapDS.assignTimestampsAndWatermarks(WatermarkStrategy
                .<testWM>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<testWM>() {
                    @Override
                    public long extractTimestamp(testWM element, long recordTimestamp) {
                        return element.getTs() * 1000;
                    }
                }));
        SingleOutputStreamOperator<String> resultDS = mapWithWM.keyBy(r -> r.getId())
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .process(new ProcessWindowFunction<testWM, String, Integer, TimeWindow>() {
                    @Override
                    public void process(Integer integer, Context context, Iterable<testWM> elements, Collector<String> out) throws Exception {
                        out.collect("我关窗啦");
                    }
                });
        resultDS.print();
        env.execute();

    }
}
@Data
@AllArgsConstructor
class testWM{
    private int id;
    private int num;
    private long ts;
}

运行结果:

对于第一种,在source后添加watermark的结果如下:

当1,1,1这条数据进入时,开启了[0,5)这个窗口,当1,1,9这条数据进入时,watermark升至9000(忽略watermark的减1).窗口关闭,没有问题

对于第二种,在map后添加watermark的结果如下:

 可以很明显的发现,当第一条1,1,9进入时,[0,5)这个窗口并没有关闭.直到第二条1,1,9进入时,窗口才被关闭,这是为什么?

我针对以上两种情况画了下图来理解.

 图一.图二描绘了source之后设置watermark的场景,一般来说,这是我们生产中需要的

 我在之前的文章中提到过,WaterMark以广播的形式向下游发送,并且如果同时接收上游多条并行度的WaterMark时,以小的为准

这就导致图三(Map后设置WaterMark)中,我需要发送两条足够[0,5)这个窗口关闭的数据,才能真正关闭窗口,因为数据要经过轮询才能到达每个并行度

拓展:

在KafkaSource中,已经做了很好得优化,在生产中我们一般设置并行度与topic分区数相同

如果设置得并行度比topic分区数多,那必然有并行度消费不到数据,就会导致WaterMark一直保持在Long.min_value.

当这种WaterMark向下游广播之后,会导致所有正常并行度的窗口全部无法关闭,因为WaterMark取了各个并行度的最小值

但是当这种状态保持一段时间之后,程序会在计算WaterMark的时候,自动过滤掉迟迟没有数据的并行度传进来的WaterMark,这就是KafkaSource的优化.

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码
< <上一篇
下一篇>>