Flink 的侧输出 和一个流拆分成多个流

1. Flink 侧输出流

官网 :https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/side_output/

1.1 理解

 侧输出流就是将问题数据或者 不符合条件的数据进行输出到数据库中或者打印出来, 就形成一个正确的流和一个不符合条件的流。

1.2 实例

   案例:  判断 输入的字符串是否等于 "big"  如果等于big  就输入到  主输出流  否则将抛出异常
package com.wudl.flink.stream;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import static org.apache.hadoop.yarn.webapp.hamlet.HamletSpec.Media.print;

/**
 * @author :wudl
 * @date :Created in 2021-11-28 17:56
 * @description: Flink 的测输出流
 * @modified By:
 * @version: 1.0
 */

public class Flink_SideOutput {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> dsStream = env.socketTextStream("192.168.1.130", 9999);
        OutputTag<String> outputTag = new OutputTag<String>("number") {
        };
        SingleOutputStreamOperator<String> process = dsStream.process(new ProcessFunction<String, String>() {
            @Override
            public void processElement(String s, Context ctx, Collector<String> collector) throws Exception {

                // 判断 输入的字符串是否等于 "big"  如果等于big  就输入到  主输出流  否则将抛出异常
                   try {
                       if (s.equals("big")) {
                           collector.collect(s);
                       }else
                       {
                           throw new Exception();
                       }
                   }catch (Exception e)
                   {
                     //  e.printStackTrace();
                       ctx.output(outputTag,s);
                   }
            }
        });

        // 打印主流
        process.print("打印主流 ----->");
        //  打印错误的的流或者是 不符合条件的流
        process.getSideOutput(outputTag).print("侧输出流----->");
        env.execute();

    }

}

1.3结果

在这里插入图片描述

3. Flink 使用侧输出流把一个流拆成多个流

Flink 12 以后官方建议我们使用多侧输入流来输出

3.1实例:

 根据输入的数据 进行条件判断然后输入到不同的侧输入流中。
package com.wudl.flink.stream;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
 * @author :wudl
 * @date :Created in 2021-11-28 18:41
 * @description: 使用侧输出流把一个流拆成多个流   flink 12 以后建议 多个流的输出用侧输入流
 * @modified By:
 * @version: 1.0
 */

public class Flink_SideOutputTwo {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> socketTextStream = env.socketTextStream("192.168.1.130", 9999);
        OutputTag<Integer> outputTag6 = new OutputTag<Integer>("number6-10") {
        };

        OutputTag<Integer> outputTag10 = new OutputTag<Integer>("number10") {
        };

        SingleOutputStreamOperator<Integer> process = socketTextStream.process(new ProcessFunction<String, Integer>() {
            @Override
            public void processElement(String s, Context cxt, Collector<Integer> out) throws Exception {
                int number = Integer.parseInt(s);
                if (number < 5) {
                    out.collect(number);
                } else if (number > 5 && number < 10) {
                    cxt.output(outputTag6, number);
                } else {
                    cxt.output(outputTag10, number);
                }
            }
        });
        process.print(" 主流-------->");
        process.getSideOutput(outputTag6).print("侧输入流--  大于5 小于10 的数据---");

        process.getSideOutput(outputTag10).print("侧输入流--  大于10 的数据---");

        env.execute();


    }

}

3.2 结果

在这里插入图片描述
********************************************************** 应用的实例***********************************************************************

4. flink 侧输入流的实例应用

案例: 通过开窗将迟到的数据输出到侧输入流中 , 
package com.wudl.flink.stream;

import com.wudl.flink.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.time.Duration;

/**
 * @author :wudl
 * @date :Created in 2021-11-28 19:01
 * @description:Flink 侧输入流应用实例
 * @modified By:
 * @version: 1.0
 */

public class Flink_SideOutput_Examples {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> stringDataStreamSource = env.socketTextStream("192.168.1.130", 9999);
        SingleOutputStreamOperator<WaterSensor> map = stringDataStreamSource.map(new MapFunction<String, WaterSensor>() {
            @Override
            public WaterSensor map(String s) throws Exception {
                String[] split = s.split(",");
                return new WaterSensor(split[0], Long.valueOf(split[1]), Integer.parseInt(split[2]));
            }
        });

        // 创建产生水印策略
        WatermarkStrategy<WaterSensor> wms = WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
                    @Override
                    public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                        return element.getTs() * 1000;
                    }
                });


        SingleOutputStreamOperator<String> process = map.assignTimestampsAndWatermarks(wms).keyBy(WaterSensor::getId)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .allowedLateness(Time.seconds(3))
                .sideOutputLateData(new OutputTag<WaterSensor>("side_1") {
                })
                .process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                        String msg = "当前key: " + key + " 窗口: [" + context.window().getStart() / 1000 + "," + context.window().getEnd() / 1000 + ") 一共有 "
                                + elements.spliterator().estimateSize() + "条数据" +
                                "watermark: " + context.currentWatermark();

                        out.collect(context.window().toString());
                        out.collect(msg);
                    }
                });


        process.print("主输出流---->");
        process.getSideOutput(new OutputTag<WaterSensor>("side_1"){}).print("侧输出流-------->");

        env.execute();

    }
}

在这里插入图片描述

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码
< <上一篇
下一篇>>