Introduction to Reading and Writing Doris with Flink

The Flink Doris Connector allows Flink to operate on (read, insert, update, delete) data stored in Doris. A Doris table can be mapped to a DataStream or a Table.

  • Update and delete operations from Flink are supported only on the Unique Key model (see the sketch below)
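
A minimal sketch of issuing a delete from the DataStream API follows. It assumes a hypothetical Unique Key table test_db.flinktest_uniq with columns (siteid INT, username VARCHAR(32)), and it assumes that this connector version turns rows whose RowKind is DELETE into Doris deletes when setEnableDelete(true) is set; verify both assumptions against your environment before relying on them.

package com.zenitera.bigdata.doris;

import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.types.RowKind;

public class Flink_stream_delete_doris_unique {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Column layout of the hypothetical Unique Key table test_db.flinktest_uniq
        String[] fields = {"siteid", "username"};
        LogicalType[] types = {new IntType(), new VarCharType(32)};

        env
                .fromElements("jim")
                .map(username -> {
                    // Mark the row as DELETE so the sink removes this key instead of upserting it
                    GenericRowData rowData = new GenericRowData(RowKind.DELETE, 2);
                    rowData.setField(0, 1);
                    rowData.setField(1, StringData.fromString(username));
                    return rowData;
                })
                .addSink(DorisSink.sink(
                        fields,
                        types,
                        new DorisExecutionOptions.Builder()
                                .setBatchIntervalMs(2000L)
                                .setEnableDelete(true)   // required for deletes to take effect
                                .setMaxRetries(3)
                                .build(),
                        new DorisOptions.Builder()
                                .setFenodes("hdt-dmcp-ops01:8130")
                                .setUsername("root")
                                .setPassword("123456")
                                .setTableIdentifier("test_db.flinktest_uniq")
                                .build()));

        env.execute();
    }
}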

1. Preparing the development environment

  • Add the dependency to pom.xml
<dependency>
    <groupId>org.apache.doris</groupId>
    <artifactId>flink-doris-connector-1.13_2.12</artifactId>
    <version>1.0.3</version>
</dependency>
  • Create a test database and a test table
-- Switch to the test database
use test_db;

-- Create the test table flinktest
CREATE TABLE flinktest
(
    siteid INT DEFAULT '10',
    citycode SMALLINT,
    username VARCHAR(32) DEFAULT '',
    pv BIGINT SUM DEFAULT '0'
)
AGGREGATE KEY(siteid, citycode, username)
DISTRIBUTED BY HASH(siteid) BUCKETS 10
PROPERTIES("replication_num" = "1");

-- Insert sample data
insert into flinktest values
(1,1,'jim',2),
(2,1,'grace',2),
(3,2,'tom',2),
(4,3,'bush',3),
(5,3,'helen',3);

-- Check the table data
select * from flinktest;
+--------+----------+----------+------+
| siteid | citycode | username | pv   |
+--------+----------+----------+------+
|      1 |        1 | jim      |    2 |
|      5 |        3 | helen    |    3 |
|      4 |        3 | bush     |    3 |
|      3 |        2 | tom      |    2 |
|      2 |        1 | grace    |    2 |
+--------+----------+----------+------+
  • Column type mapping between Doris and Flink
Doris Type   | Flink Type
-------------|---------------------
NULL_TYPE    | NULL
BOOLEAN      | BOOLEAN
TINYINT      | TINYINT
SMALLINT     | SMALLINT
INT          | INT
BIGINT       | BIGINT
FLOAT        | FLOAT
DOUBLE       | DOUBLE
DATE         | DATE
DATETIME     | TIMESTAMP
DECIMAL      | DECIMAL
CHAR         | STRING
LARGEINT     | STRING
VARCHAR      | STRING
DECIMALV2    | DECIMAL
TIME         | DOUBLE
HLL          | Unsupported datatype

2. Reading Doris with Flink DataStream

Code example:

package com.zenitera.bigdata.doris;

import org.apache.doris.flink.cfg.DorisStreamOptions;
import org.apache.doris.flink.datastream.DorisSourceFunction;
import org.apache.doris.flink.deserialization.SimpleListDeserializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

public class Flink_stream_read_doris {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 2000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);


        Properties props = new Properties();
        props.setProperty("fenodes", "hdt-dmcp-ops01:8130");
        props.setProperty("username", "root");
        props.setProperty("password", "123456");
        props.setProperty("table.identifier", "test_db.flinktest");

        env
                .addSource(new DorisSourceFunction(new DorisStreamOptions(props), new SimpleListDeserializationSchema()))
                .print();


        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

/*
  Console output:
[4, 3, bush, 3]
[2, 1, grace, 2]
[1, 1, jim, 2]
[5, 3, helen, 3]
[3, 2, tom, 2]
 */
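
Besides the four required properties, the read path can be tuned with additional options on DorisStreamOptions. The sketch below assumes the connector options doris.read.field (column pruning) and doris.filter.query (predicate pushdown) are available in this connector version, and the class name is only for illustration; check the option names against the release you actually use.

package com.zenitera.bigdata.doris;

import org.apache.doris.flink.cfg.DorisStreamOptions;
import org.apache.doris.flink.datastream.DorisSourceFunction;
import org.apache.doris.flink.deserialization.SimpleListDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

public class Flink_stream_read_doris_pushdown {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        Properties props = new Properties();
        props.setProperty("fenodes", "hdt-dmcp-ops01:8130");
        props.setProperty("username", "root");
        props.setProperty("password", "123456");
        props.setProperty("table.identifier", "test_db.flinktest");
        // Assumed optional settings: read only two columns and push the filter down to Doris
        props.setProperty("doris.read.field", "siteid,username");
        props.setProperty("doris.filter.query", "citycode = 3");

        env
                .addSource(new DorisSourceFunction(new DorisStreamOptions(props), new SimpleListDeserializationSchema()))
                .print();

        env.execute();
    }
}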

3. Writing to Doris with Flink

There are two main ways for Flink to read and write Doris data:

  • DataStream
  • SQL

3.1 Writing JSON data to Doris with Flink DataStream

Code example:

package com.zenitera.bigdata.doris;

import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisSink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

/**
 * Write JSON data into the Doris database with Flink
 */
public class Flink_stream_write_doris_json {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 2000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);

        Properties pro = new Properties();
        pro.setProperty("format", "json");
        pro.setProperty("strip_outer_array", "true");
        env
                .fromElements("{\"siteid\":\"10\", \"citycode\": \"1001\",\"username\": \"ww\",\"pv\":\"100\"}")
                .addSink(DorisSink.sink(
                        new DorisExecutionOptions.Builder()
                                .setBatchIntervalMs(2000L)
                                .setEnableDelete(false)
                                .setMaxRetries(3)
                                .setStreamLoadProp(pro)
                                .build(),
                        new DorisOptions.Builder()
                                .setFenodes("hdt-dmcp-ops01:8130")
                                .setUsername("root")
                                .setPassword("123456")
                                .setTableIdentifier("test_db.flinktest")
                                .build())
                );

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

/*
    Before running the code: 5 rows
 select * from flinktest;
+--------+----------+----------+------+
| siteid | citycode | username | pv   |
+--------+----------+----------+------+
|      1 |        1 | jim      |    2 |
|      5 |        3 | helen    |    3 |
|      4 |        3 | bush     |    3 |
|      3 |        2 | tom      |    2 |
|      2 |        1 | grace    |    2 |
+--------+----------+----------+------+

    After running the code: 6 rows
 select * from flinktest;
+--------+----------+----------+------+
| siteid | citycode | username | pv   |
+--------+----------+----------+------+
|      2 |        1 | grace    |    2 |
|      3 |        2 | tom      |    2 |
|      5 |        3 | helen    |    3 |
|      1 |        1 | jim      |    2 |
|     10 |     1001 | ww       |  100 |
|      4 |        3 | bush     |    3 |
+--------+----------+----------+------+
 */

3.2 Writing RowData to Doris with Flink DataStream

Code example:

package com.zenitera.bigdata.doris;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisSink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.*;


public class Flink_stream_write_doris_rowdata {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 2000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);

        LogicalType[] types = {new IntType(), new SmallIntType(), new VarCharType(), new BigIntType()};
        String[] fields = {"siteid", "citycode", "username", "pv"};

        env
                .fromElements("{\"siteid\":\"100\", \"citycode\": \"1002\",\"username\": \"wang\",\"pv\":\"100\"}")

                .map(json -> {
                    JSONObject obj = JSON.parseObject(json);
                    GenericRowData rowData = new GenericRowData(4);
                    rowData.setField(0, obj.getIntValue("siteid"));
                    rowData.setField(1, obj.getShortValue("citycode"));
                    rowData.setField(2, StringData.fromString(obj.getString("username")));
                    rowData.setField(3, obj.getLongValue("pv"));
                    return rowData;

                })


                .addSink(DorisSink.sink(
                        fields,
                        types,
                        new DorisExecutionOptions.Builder()
                                .setBatchIntervalMs(2000L)
                                .setEnableDelete(false)
                                .setMaxRetries(3)
                                .build(),
                        new DorisOptions.Builder()
                                .setFenodes("hdt-dmcp-ops01:8130")
                                .setUsername("root")
                                .setPassword("123456")
                                .setTableIdentifier("test_db.flinktest")
                                .build())
                );

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

/*
    Before running the code: 6 rows
 select * from flinktest;
+--------+----------+----------+------+
| siteid | citycode | username | pv   |
+--------+----------+----------+------+
|      2 |        1 | grace    |    2 |
|      3 |        2 | tom      |    2 |
|      5 |        3 | helen    |    3 |
|      1 |        1 | jim      |    2 |
|     10 |     1001 | ww       |  100 |
|      4 |        3 | bush     |    3 |
+--------+----------+----------+------+

    After running the code: 7 rows
 select * from flinktest;
+--------+----------+----------+------+
| siteid | citycode | username | pv   |
+--------+----------+----------+------+
|      1 |        1 | jim      |    2 |
|      2 |        1 | grace    |    2 |
|      3 |        2 | tom      |    2 |
|      5 |        3 | helen    |    3 |
|     10 |     1001 | ww       |  100 |
|    100 |     1002 | wang     |  100 |
|      4 |        3 | bush     |    3 |
+--------+----------+----------+------+
 */

3.3 Writing to Doris with Flink SQL

Doris test table:

use test_db;

truncate table flinktest;

insert into flinktest values
(1,1,'aaa',1),
(2,2,'bbb',2),
(3,3,'ccc',3);

select * from flinktest;
+--------+----------+----------+------+
| siteid | citycode | username | pv   |
+--------+----------+----------+------+
|      2 |        2 | bbb      |    2 |
|      1 |        1 | aaa      |    1 |
|      3 |        3 | ccc      |    3 |
+--------+----------+----------+------+
3 rows in set (0.01 sec)

Flink SQL code example:

package com.zenitera.bigdata.doris;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Flink_SQL_doris {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 2000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        tEnv.executeSql("create table flink_0518(" +
                " siteid int, " +
                " citycode int, " +
                " username string, " +
                " pv bigint " +
                ")with(" +
                "  'connector' = 'doris', " +
                "  'fenodes' = 'hdt-dmcp-ops01:8130', " +
                "  'table.identifier' = 'test_db.flinktest', " +
                "  'username' = 'root', " +
                "  'password' = '123456' " +
                ")");

        tEnv.executeSql("insert into flink_0518(siteid, citycode, username, pv) values(4, 4, 'wangting', 4) ");

    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Flink_0518 {
        private Integer siteid;
        private Integer citycode;
        private String username;
        private Long pv;
    }
}

Run the code; after it finishes, query the corresponding Doris table to verify:

select * from flinktest;
+--------+----------+----------+------+
| siteid | citycode | username | pv   |
+--------+----------+----------+------+
|      3 |        3 | ccc      |    3 |
|      2 |        2 | bbb      |    2 |
|      1 |        1 | aaa      |    1 |
|      4 |        4 | wangting |    4 |
+--------+----------+----------+------+
4 rows in set (0.01 sec)
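
The literal VALUES insert above writes a single row and then the job finishes. In a streaming pipeline the Doris table is normally fed from another Flink table instead. The sketch below could be appended to the main() of Flink_SQL_doris, using Flink's built-in datagen connector as a stand-in source; the rate and value ranges are arbitrary illustrative choices.

        // Register a throttled datagen source with the same schema as the Doris sink table
        tEnv.executeSql("create table gen_source(" +
                " siteid int, " +
                " citycode int, " +
                " username string, " +
                " pv bigint " +
                ")with(" +
                "  'connector' = 'datagen', " +
                "  'rows-per-second' = '1', " +
                "  'fields.siteid.min' = '100', " +
                "  'fields.siteid.max' = '199', " +
                "  'fields.citycode.min' = '1', " +
                "  'fields.citycode.max' = '10', " +
                "  'fields.username.length' = '8', " +
                "  'fields.pv.min' = '1', " +
                "  'fields.pv.max' = '100' " +
                ")");

        // Continuously insert the generated rows into the Doris-backed table defined earlier
        tEnv.executeSql("insert into flink_0518 select siteid, citycode, username, pv from gen_source");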

3.4 Reading Doris with Flink SQL

package com.zenitera.bigdata.doris;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Flink_SQL_doris_read {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 2000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        tEnv.executeSql("create table flink_0520(" +
                " siteid int, " +
                " citycode SMALLINT, " +
                " username string, " +
                " pv bigint " +
                ")with(" +
                "  'connector' = 'doris', " +
                "  'fenodes' = 'hdt-dmcp-ops01:8130', " +
                "  'table.identifier' = 'test_db.flinktest', " +
                "  'username' = 'root', " +
                "  'password' = '123456' " +
                ")");

        tEnv.sqlQuery("select * from flink_0520").execute().print();

    }
}

/*
   Console output:
+----+-------------+----------+---------------+---------+
| op |      siteid | citycode |      username |      pv |
+----+-------------+----------+---------------+---------+
| +I |           1 |        1 |           aaa |       1 |
| +I |           3 |        3 |           ccc |       3 |
| +I |           2 |        2 |           bbb |       2 |
| +I |           4 |        4 |      wangting |       4 |
+----+-------------+----------+---------------+---------+
4 rows in set
*/
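
As noted in the introduction, a Doris table can be consumed either as a Table or as a DataStream. The sketch below could be appended to the main() of Flink_SQL_doris_read above, bridging the two with Flink 1.13's StreamTableEnvironment.toDataStream; it needs extra imports for org.apache.flink.table.api.Table, org.apache.flink.streaming.api.datastream.DataStream, and org.apache.flink.types.Row.

        // Convert a query on the Doris-backed table into a DataStream<Row> and use the DataStream API on it
        Table result = tEnv.sqlQuery("select siteid, username, pv from flink_0520 where citycode > 1");
        DataStream<Row> rows = tEnv.toDataStream(result);
        rows.print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }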
