大数据学习教程SD版第八篇【DataX】

DataX 采用Java编写的工具,使用Python调用功能,修改JSON配置即可使用

数据同步工具 支持:异构数据源、HDFS等 插件式

常用的数据源:MySQL、SQLServer、HDFS、Hive、Hbase、FTP 等

1. DataX 数据源

几乎所有数据源都支持,不支持的也可以自定义Reader和Writer,来支持!常用数据源有:

常用数据源
MySQL、SQLServer、Oracle
Hbase、Phoenix、MongoDB、Hive
FTP、HDFS
ES
Clickhouse

读写插件均在 plugin/reader/ 和 plugin/writer/ 中

reader writer
cassandrareader
clickhousereader
drdsreader
ftpreader
hbase094xreader
hbase11xreader
hbase11xsqlreader
hbase20xsqlreader
hdfsreader
kingbaseesreader
mongodbreader
mysqlreader
odpsreader
opentsdbreader
oraclereader
ossreader
otsreader
otsstreamreader
postgresqlreader
rdbmsreader
sqlserverreader
streamreader
txtfilereader
adbpgwriter
adswriter
cassandrawriter
clickhousewriter
drdswriter
elasticsearchwriter
ftpwriter
hbase094xwriter
hbase11xsqlwriter
hbase11xwriter
hbase20xsqlwriter
hdfswriter
hologresjdbcwriter
kafkawriter
kingbaseeswriter
mongodbwriter
mysqlwriter
ocswriter
odpswriter
oraclewriter
osswriter
otswriter
postgresqlwriter
rdbmswriter
sqlserverwriter
streamwriter
tsdbwriter
txtfilewriter

2. DataX 架构

  1. Reader 读
  2. Writer 写
  3. Framework 通道
  • Job 单个作业管理节点

  • Task 有Job Split 而来,最小单元 包括:reader、channel、writer

  • Schedule 将Task组成TaskGroup ,单个TaskGroup 默认并发为5

  • TaskGroup 启动内部的Task

3. DataX VS Sqoop

DataX:Java 单进程多线程,单机,数据源种类众多

Sqoop:MR,分布式,数据源种类较少

功能 DataX Sqoop
MySQL读写 单机压力大,粒度容易控制 MR 模式重,写出错处理困难
Hive读写 单机压力大 很好
文件格式 支持ORC 默认不支持ORC,可以添加
流控、信息、校验 有流控、有一些、core有校验功能 需要定制,没有
监控 需要定制 需要定制
后续 有改动,持续优化 基本完善,已经停止

4. DataX 安装

  1. 从Github下载tar.gz,并解压安装包方式
  • 使用测试
python datax.py ../job/job.json
  • 报错问题
经DataX智能分析,该任务最可能的错误原因是:
com.alibaba.datax.common.exception.DataXException: Code:[Common-00], Describe:[您提供的配置文件存在错误信息,请检查您的作业配置 .] - 配置信息错误,您提供的配置文件[/opt/module/datax/plugin/reader/._odpsreader/plugin.json]不存在. 请检查您的配置文件.
  1. DataX 源码编译安装方式
  • Github下载源码
  • maven编译打包
mvn -U clean package assembly:assembly -Dmaven.test.skip=true
  • 还是报错,并且修复一个又报错一个
  1. 公司内部 二次封装的 DataX 源码安装方式,勉强能用

5. DataX Job配置

  1. job reader 配置
  2. job writer 配置
  3. job setting 配置

配置的Json 模板可以从Github上看,也可以 通过 python datax.py -r xxxreader -w xxxwriter 查看

  • 如何保证数据一致性

先写入一个临时文件

  1. 如果某些Task失败,则认为Job失败,删除临时路径文件

  2. 如果全部成功,修改路径和文件名

6. DataX 实例

查看Github上文档配置即可!

6.1 MySQL -> HDFS

查看配置模板:python bin/datax.py -r mysqlreader -w hdfswriter

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": [],
                        "connection": [
                            {
                                "jdbcUrl": [],
                                "table": []
                            }
                        ],
                        "password": "",
                        "username": ""
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [],
                        "compress": "",
                        "defaultFS": "",
                        "fieldDelimiter": "",
                        "fileName": "",
                        "fileType": "",
                        "path": "",
                        "writeMode": ""
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}

6.2 HDFS -> MySQL

也可以去Github文档上找到对应的Reader和Writer的Json配置

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "hdfsreader",
                    "parameter": {
                        "column": [],
                        "defaultFS": "",
                        "encoding": "UTF-8",
                        "fieldDelimiter": ",",
                        "fileType": "orc",
                        "path": ""
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "column": [],
                        "connection": [
                            {
                                "jdbcUrl": "",
                                "table": []
                            }
                        ],
                        "password": "",
                        "username": "",
                        "writeMode": ""
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}

7. DataX 分组算法

a 库 : 0 1 2

b 库 : 3 4 5

c 库 :6 7 8

如果4个TaskGroup,则轮询:

Group1: 048

Group2: 15

Group3: 26

Group4: 37

8. DataX 优化

在 core.json 里设置局部参数,在当前 job.json 设置全局参数

  1. 提升单个Channel的速度
  2. 提升Job内Channel并发数:三种
  3. 提高JVM堆内存
python bin/datax.py --kvm="-Xms8G -Xmx8G" xxx.json

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码
< <上一篇
下一篇>>