Flink CDC 读取MySQL的数据

1、前提背景准备

Flink在1.11之后就已经支持从MySQL增量读取Binlog日志的方式。

pom文件如下:

<properties>
    <scala.binary.version>2.11</scala.binary.version>
    <scala.version>2.11.12</scala.version>
    <flink.version>1.12.0</flink.version>
    <fastjson.verson>1.2.72</fastjson.verson>
    <lombok.version>1.18.6</lombok.version>
    <kafka.version>2.3.0</kafka.version>

</properties>

<dependencies>

    <dependency>
        <groupId>com.github.shyiko</groupId>
        <artifactId>mysql-binlog-connector-java</artifactId>
        <version>0.21.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>${kafka.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>${flink.version}</version>
        <exclusions>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>*</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>${lombok.version}</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>${fastjson.verson}</version>
    </dependency>

    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>1.4.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <!--<scope>provided</scope>-->
        <exclusions>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>*</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <!--<scope>provided</scope>-->
        <exclusions>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>*</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
            <exclusion>
                <groupId>com.google.code.findbugs</groupId>
                <artifactId>jsr305</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.flink</groupId>
                <artifactId>force-shading</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <!--<scope>provided</scope>-->
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <exclusions>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>*</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

</dependencies>

2、全量读取某个数据库中的所有库中的所有表的Binlog方式代码如下:

public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();

        // 配置 Debezium在初始化快照的时候(扫描历史数据的时候) =》 不要锁表
        properties.setProperty("debezium.snapshot.locking.mode", "none");

        env.enableCheckpointing(TimeUnit.SECONDS.toMillis(5));

        CheckpointConfig checkpointConfig = env.getCheckpointConfig();

        //最大同时存在的ck数 和设置的间隔时间有一个就行
        checkpointConfig.setMaxConcurrentCheckpoints(1);
        //超时时间
        checkpointConfig.setCheckpointTimeout(TimeUnit.SECONDS.toMillis(5));
        //2.3 指定从CK自动重启策略
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 5000L));
        //2.4 设置任务关闭的时候保留最后一次CK数据
        checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        DebeziumSourceFunction<String> MysqlSource = MySQLSource.<String>builder()
                .hostname("100.21.112.11")
                .port(3306)
                .deserializer(new MyDeserializationSchema()) //去参数里面找找实现类
                .username("root")
                .password("xxxx")
                .startupOptions(StartupOptions.latest())// 读取binlog策略 这个启动选项有五种
                .debeziumProperties(properties) //配置不要锁表 但是数据一致性不是精准一次 会变成最少一次
                .build();

        /*
         *  .startupOptions(StartupOptions.latest()) 参数配置
         *  1.initial() 全量扫描并且继续读取最新的binlog 最佳实践是第一次使用这个
         *  2.earliest() 从binlog的开头开始读取 就是啥时候开的binlog就从啥时候读
         *  3.latest() 从最新的binlog开始读取
         *  4.specificOffset(String specificOffsetFile, int specificOffsetPos) 指定offset读取
         *  5.timestamp(long startupTimestampMillis) 指定时间戳读取
         * */

        env.addSource(MysqlSource).print();

        env.execute("flink-cdc");

    }


    public static class MyDeserializationSchema implements DebeziumDeserializationSchema<String> {

        @Override
        public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
            Struct value = (Struct) sourceRecord.value();
            Struct after = value.getStruct("after");
            Struct source = value.getStruct("source");

            String db = source.getString("db");//库名
            String table = source.getString("table");//表名

            //获取操作类型 直接将参数穿进去 会自己解析出来 里面是个enum对应每个操作
            /* READ("r"),
               CREATE("c"),
                UPDATE("u"),
                DELETE("d");*/
            Envelope.Operation operation = Envelope.operationFor(sourceRecord);
            String opstr = operation.toString().toLowerCase();
            //类型修正 会把insert识别成create
            if (opstr.equals("create")) {
                opstr = "insert";
            }

            //获取after结构体里面的表数据,封装成json输出
            JSONObject json1 = new JSONObject();
            JSONObject json2 = new JSONObject();
            //加个判空
            if (after != null) {
                List<Field> data = after.schema().fields(); //获取结构体
                for (Field field : data) {
                    String name = field.name(); //结构体的名字
                    Object value2 = after.get(field);//结构体的字段值
                    //放进json2里面去 json2放到json1里面去
                    json2.put(name, value2);
                }
            }


            //整理成大json串输出
            json1.put("db", db);
            json1.put("table", table);
            json1.put("data", json2);
            json1.put("type", opstr);

            collector.collect(json1.toJSONString());

        }

        @Override
        public TypeInformation<String> getProducedType() {
            return TypeInformation.of(String.class);
        }
    }

3、全量读取某个数据库指定DB中的所有表

可以在build之前 ,添加一个

databaseList,用来指定特定的DB
public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();

        // 配置 Debezium在初始化快照的时候(扫描历史数据的时候) =》 不要锁表
        properties.setProperty("debezium.snapshot.locking.mode", "none");

        env.enableCheckpointing(TimeUnit.SECONDS.toMillis(5));

        CheckpointConfig checkpointConfig = env.getCheckpointConfig();

        //最大同时存在的ck数 和设置的间隔时间有一个就行
        checkpointConfig.setMaxConcurrentCheckpoints(1);
        //超时时间
        checkpointConfig.setCheckpointTimeout(TimeUnit.SECONDS.toMillis(5));
        //2.3 指定从CK自动重启策略
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 5000L));
        //2.4 设置任务关闭的时候保留最后一次CK数据
        checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        DebeziumSourceFunction<String> MysqlSource = MySQLSource.<String>builder()
                .hostname("100.21.112.11")
                .port(3306)
                .deserializer(new MyDeserializationSchema()) //去参数里面找找实现类
                .username("root")
                .password("xxxx")
                .databaseList("horse") // 指定某个特定的库
                .startupOptions(StartupOptions.latest())// 读取binlog策略 这个启动选项有五种
                .debeziumProperties(properties) //配置不要锁表 但是数据一致性不是精准一次 会变成最少一次
                .build();

        /*
         *  .startupOptions(StartupOptions.latest()) 参数配置
         *  1.initial() 全量扫描并且继续读取最新的binlog 最佳实践是第一次使用这个
         *  2.earliest() 从binlog的开头开始读取 就是啥时候开的binlog就从啥时候读
         *  3.latest() 从最新的binlog开始读取
         *  4.specificOffset(String specificOffsetFile, int specificOffsetPos) 指定offset读取
         *  5.timestamp(long startupTimestampMillis) 指定时间戳读取
         * */

        env.addSource(MysqlSource).print();

        env.execute("flink-cdc");

    }


    public static class MyDeserializationSchema implements DebeziumDeserializationSchema<String> {

        @Override
        public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
            Struct value = (Struct) sourceRecord.value();
            Struct after = value.getStruct("after");
            Struct source = value.getStruct("source");

            String db = source.getString("db");//库名
            String table = source.getString("table");//表名

            //获取操作类型 直接将参数穿进去 会自己解析出来 里面是个enum对应每个操作
            /* READ("r"),
               CREATE("c"),
                UPDATE("u"),
                DELETE("d");*/
            Envelope.Operation operation = Envelope.operationFor(sourceRecord);
            String opstr = operation.toString().toLowerCase();
            //类型修正 会把insert识别成create
            if (opstr.equals("create")) {
                opstr = "insert";
            }

            //获取after结构体里面的表数据,封装成json输出
            JSONObject json1 = new JSONObject();
            JSONObject json2 = new JSONObject();
            //加个判空
            if (after != null) {
                List<Field> data = after.schema().fields(); //获取结构体
                for (Field field : data) {
                    String name = field.name(); //结构体的名字
                    Object value2 = after.get(field);//结构体的字段值
                    //放进json2里面去 json2放到json1里面去
                    json2.put(name, value2);
                }
            }


            //整理成大json串输出
            json1.put("db", db);
            json1.put("table", table);
            json1.put("data", json2);
            json1.put("type", opstr);

            collector.collect(json1.toJSONString());

        }

        @Override
        public TypeInformation<String> getProducedType() {
            return TypeInformation.of(String.class);
        }
    }

4、全量读取某个数据库指定DB中的指定表

可以在build之前 ,添加一个

tableList,用来指定特定的DB中的特定表
public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();

        // 配置 Debezium在初始化快照的时候(扫描历史数据的时候) =》 不要锁表
        properties.setProperty("debezium.snapshot.locking.mode", "none");

        env.enableCheckpointing(TimeUnit.SECONDS.toMillis(5));

        CheckpointConfig checkpointConfig = env.getCheckpointConfig();

        //最大同时存在的ck数 和设置的间隔时间有一个就行
        checkpointConfig.setMaxConcurrentCheckpoints(1);
        //超时时间
        checkpointConfig.setCheckpointTimeout(TimeUnit.SECONDS.toMillis(5));
        //2.3 指定从CK自动重启策略
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 5000L));
        //2.4 设置任务关闭的时候保留最后一次CK数据
        checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        DebeziumSourceFunction<String> MysqlSource = MySQLSource.<String>builder()
                .hostname("100.21.112.11")
                .port(3306)
                .deserializer(new MyDeserializationSchema()) //去参数里面找找实现类
                .username("root")
                .password("xxxx")
                .databaseList("horse") // 指定某个特定的库
                .tableList("horse.t_dri_info") //指定特定的表
                .startupOptions(StartupOptions.latest())// 读取binlog策略 这个启动选项有五种
                .debeziumProperties(properties) //配置不要锁表 但是数据一致性不是精准一次 会变成最少一次
                .build();

        /*
         *  .startupOptions(StartupOptions.latest()) 参数配置
         *  1.initial() 全量扫描并且继续读取最新的binlog 最佳实践是第一次使用这个
         *  2.earliest() 从binlog的开头开始读取 就是啥时候开的binlog就从啥时候读
         *  3.latest() 从最新的binlog开始读取
         *  4.specificOffset(String specificOffsetFile, int specificOffsetPos) 指定offset读取
         *  5.timestamp(long startupTimestampMillis) 指定时间戳读取
         * */

        env.addSource(MysqlSource).print();

        env.execute("flink-cdc");

    }


    public static class MyDeserializationSchema implements DebeziumDeserializationSchema<String> {

        @Override
        public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
            Struct value = (Struct) sourceRecord.value();
            Struct after = value.getStruct("after");
            Struct source = value.getStruct("source");

            String db = source.getString("db");//库名
            String table = source.getString("table");//表名

            //获取操作类型 直接将参数穿进去 会自己解析出来 里面是个enum对应每个操作
            /* READ("r"),
               CREATE("c"),
                UPDATE("u"),
                DELETE("d");*/
            Envelope.Operation operation = Envelope.operationFor(sourceRecord);
            String opstr = operation.toString().toLowerCase();
            //类型修正 会把insert识别成create
            if (opstr.equals("create")) {
                opstr = "insert";
            }

            //获取after结构体里面的表数据,封装成json输出
            JSONObject json1 = new JSONObject();
            JSONObject json2 = new JSONObject();
            //加个判空
            if (after != null) {
                List<Field> data = after.schema().fields(); //获取结构体
                for (Field field : data) {
                    String name = field.name(); //结构体的名字
                    Object value2 = after.get(field);//结构体的字段值
                    //放进json2里面去 json2放到json1里面去
                    json2.put(name, value2);
                }
            }


            //整理成大json串输出
            json1.put("db", db);
            json1.put("table", table);
            json1.put("data", json2);
            json1.put("type", opstr);

            collector.collect(json1.toJSONString());

        }

        @Override
        public TypeInformation<String> getProducedType() {
            return TypeInformation.of(String.class);
        }
    }

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码
< <上一篇
下一篇>>