Flink Table API 与 Flink SQL 实现Kafka To Kafka 版本1.12

Table API版本

0.前提

1.创建流和表执行环境

2. 连接Source并创建Table

 3.筛选Table对象中的数据

4. 连接Sink并创建临时表

5. 将Table对象写入临时表

测试

杠精打住

SQL 版本

最近有铁汁问我:一闪,你为嘛不用Flink SQL,要用Table API

就是就像我对DStream API和CEP编程一样,虽然CEP编程能给我减少近一半的代码量(实话),但是还是感觉DStream更靠谱一些(主要是CEP出bug就废了)

至于Flink SQL,这是在源码中被提到比有Table API更强大的语法.但是这种代码格式我不喜欢(个人意见)-->因为我不喜欢写字符串

不吹牛了 开干

Table API版本

0.前提

这次的案例就是要将"table-api-source"这个topic上,满足id="bigdata"的数据写入"table-api-sink"这个topic里,在Kafka中的文件格式还是用JSON(喜闻乐见)

对了对了,依赖别忘记了

<properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>

        <flink.version>1.12.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.11</scala.binary.version>
        <slf4j.version>1.7.30</slf4j.version>
</properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.1.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.27</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.11</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>




        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.20</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>

    </dependencies>

 我靠,竟然有这么多.其实有一些是用不到的哈哈哈哈哈哈,但是没影响!!!

1.创建流和表执行环境

        这是吃饭的家伙,肯定不能忘啦

//流执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//Table执行环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

2. 连接Source并创建Table

        2.1 表的源数据信息

        这次就设置三个字段吧,id是string类型,name是string类型,age是int类型,别问为什么要用DataTypes(人家要求这个格式,我也不乐意)

        ok这就表明我们接下来传入的数据是会解析成这三个格式的字段.

Schema schema = new Schema();
schema.field("id",DataTypes.STRING());
schema.field("name",DataTypes.STRING());
schema.field("age",DataTypes.INT());

         2.2 连接外部文件系统并创建临时表

        此时我们是从"table-api-source"这个topic里面获取数据

        创建的临时表表名为"source_tmp_table"

        也就是说,现在我们消费到的数据就在这张名叫"source_tmp_table"的临时表内

tableEnv.connect(new Kafka()
                        //指定版本(官网上面说就用universal,咱也不敢说,咱也不敢问)
                        .version("universal")
                        //指定topic
                        .topic("table-api-source")
                        //配置文件
                            //设置消费者组
                        .property(ConsumerConfig.GROUP_ID_CONFIG,"test")
                            //设置Kafka连接
                        .property(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092")
                        //从最新开始消费(设置offset)
                        .startFromLatest()
                        )
                //将刚刚写好的schema放进去
                .withSchema(schema)
                //指定解析的格式为json
                .withFormat(new Json())
                //创建临时表的表名
                .createTemporaryTable("source_tmp_table");

        2.3 从临时表获取到Table对象(tableEnv是最开始创建的表执行环境)

        这个from()方法,可以说很形象,毕竟是从这个表里获得数据,from相当于直译

Table sourceTable = tableEnv.from("source_tmp_table");

 3.筛选Table对象中的数据

        最开始我们说过要过滤出"id"为"bbigdata"的数据.所以开干!

        你会发现书写的逻辑和写SQL差不多,只不过是按照执行的先后顺序来写的(先where后select)
 突击检查,如果再加个groupby()在里面,是写在哪个位置?(当然是写在where后面,先where再groupby)

        ps:'$'表示字段哦

Table bigdataTable = sourceTable.where($("id").isEqual("bigdata"))
                .select($("id"), $("name"), $("age"));

        ok,我们暂时把这个筛选好的结果表放在一边,等等再用

4. 连接Sink并创建临时表

        此时我们要写的是相当于一个生产者,因为我把要把获取到的数据写入到"table-api-sink"里面

        ok,我们创建了一个名为"sink_tmp_table"的临时表,现在里面是空的

tableEnv.connect(new Kafka()
                        .topic("table-api-sink")
                        .version("universal")
                        .property(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092")
                        )
                //因为数据格式并没有变化,所以还是可以使用之前定义好的源数据schema
                .withSchema(schema)
                //指定为Json解析
                .withFormat(new Json())
                //创建临时表
                .createTemporaryTable("sink_tmp_table");

5. 将Table对象写入临时表

        需要将之前拿到的bigdataTable这个Table对象的数据写到临时表里面去,也就是把过滤后的数据写进Kafka

        调用这个executeInsert()方法,就可以完成啦!

bigdataTable.executeInsert("sink_tmp_table");

照道理说现在应该就可以完美运行了,那我们去试试

试之前先把kafka的topic创建一下,然后再把对应的生产者和消费者起一下

1.创建Topic

kafka-topics.sh --bootstrap-server hadoop102:9092 --create --topic table-api-source
kafka-topics.sh --bootstrap-server hadoop102:9092 --create --topic table-api-sink

 2.启动对应的生产者和消费者

kafka-console-producer.sh --broker-list hadoop102:9092 --topic table-api-source
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic table-api-sink

搞定!!!现在启动IDEA程序,以下是一个完整版的

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;

import static org.apache.flink.table.api.Expressions.$;

public class Flink99_TableApi_Kafka2Kafka {
    public static void main(String[] args) throws Exception {
        //流执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //Table执行环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        //
        Schema schema = new Schema();
        schema.field("id",DataTypes.STRING());
        schema.field("name",DataTypes.STRING());
        schema.field("age",DataTypes.INT());
        //
        tableEnv.connect(new Kafka()
                        //指定版本
                        .version("universal")
                        //指定topic
                        .topic("table-api-source")
                        //配置文件
                        .property(ConsumerConfig.GROUP_ID_CONFIG,"test")
                        .property(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092")
                        //从最新开始消费(设置offset)
                        .startFromLatest()
                        )
                //将刚刚写好的schema放进去
                .withSchema(schema)
                //指定解析的格式为json
                .withFormat(new Json())
                //创建临时表的表名
                .createTemporaryTable("source_tmp_table");
        //
        Table sourceTable = tableEnv.from("source_tmp_table");
        //
        Table bigdataTable = sourceTable.where($("id").isEqual("bigdata"))
                .select($("id"), $("name"), $("age"));
        //
        tableEnv.connect(new Kafka()
                        .topic("table-api-sink")
                        .version("universal")
                        .property(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092")
                        )
                //因为数据格式并没有变化,所以还是可以使用之前定义好的源数据schema
                .withSchema(schema)
                //指定为Json解析
                .withFormat(new Json())
                //创建临时表
                .createTemporaryTable("sink_tmp_table");
        //
        bigdataTable.executeInsert("sink_tmp_table");
    }
}

测试

 现在"table-api-source"这个topic的生产者传入一条数据如上

然后看看"table-api-sink"这个topic的消费者的界面,发现如下

好像是没有问题的!!!!! 

我们再去传入个name不是bigdata的数据进去试试

 发现消费者那里并没有打印.至此.大功告成!!!!!

杠精打住

有小朋友又会问了,你这个消费者没打印数据,不一定是你过滤掉了,说不定是你压根没写进那个"table-api-source"这个topic里

行行行,那我放个大招给你看看

 kafka-eagle!启动!

可以看到,里面的的确确有两条消息,但是我们只消费到了一条,所以是完全么有问题的.老实人大获全胜

SQL 版本

        emmmmm???怎么一眼望去全是字符串???

        这就是鄙人不喜欢写Flink SQL的原因,这完全不像是咱码农的行为嘛!

        小声bb:这点代码量怎么够我摸一天的??

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Flink100_SQL_Kafka2Kafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 1. 注册SourceTable: source_sensor
        tableEnv.executeSql("create table source_sensor (id string, ts bigint, vc int) with("
                                + "'connector' = 'kafka',"
                                + "'topic' = 'topic_source_sensor',"
                                + "'properties.bootstrap.servers' = 'hadoop102:9092,hadoop103:9092,hadoop104:9092',"
                                + "'properties.group.id' = 'atguigu',"
                                + "'scan.startup.mode' = 'latest-offset',"
                                + "'format' = 'csv'"
                                + ")");

        // 2. 注册SinkTable: sink_sensor
        tableEnv.executeSql("create table sink_sensor(id string, ts bigint, vc int) with("
                                + "'connector' = 'kafka',"
                                + "'topic' = 'topic_sink_sensor',"
                                + "'properties.bootstrap.servers' = 'hadoop102:9092,hadoop103:9092,hadoop104:9092',"
                                + "'format' = 'csv'"
                                + ")");

        // 3. 从SourceTable 查询数据, 并写入到 SinkTable
        tableEnv.executeSql("insert into sink_sensor select * from source_sensor where id='sensor_1'");
    }
}

  其实我觉得这个Kafka To Kafka应该是实时数仓的主力,所以还是很有必要好好lou一lou

休息了休息了!!写了一个上午,老实人要睡觉了!!

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码
< <上一篇
下一篇>>