大数据学习教程SD版第八篇【DataX】
DataX 采用Java编写的工具,使用Python调用功能,修改JSON配置即可使用
数据同步工具 支持:异构数据源、HDFS等 插件式
常用的数据源:MySQL、SQLServer、HDFS、Hive、Hbase、FTP 等
1. DataX 数据源
几乎所有数据源都支持,不支持的也可以自定义Reader和Writer,来支持!常用数据源有:
常用数据源 | 读 | 写 |
---|---|---|
MySQL、SQLServer、Oracle | √ | √ |
Hbase、Phoenix、MongoDB、Hive | √ | √ |
FTP、HDFS | √ | √ |
ES | √ | |
Clickhouse |
读写插件均在 plugin/reader/ 和 plugin/writer/ 中
reader | writer |
---|---|
cassandrareader clickhousereader drdsreader ftpreader hbase094xreader hbase11xreader hbase11xsqlreader hbase20xsqlreader hdfsreader kingbaseesreader mongodbreader mysqlreader odpsreader opentsdbreader oraclereader ossreader otsreader otsstreamreader postgresqlreader rdbmsreader sqlserverreader streamreader txtfilereader |
adbpgwriter adswriter cassandrawriter clickhousewriter drdswriter elasticsearchwriter ftpwriter hbase094xwriter hbase11xsqlwriter hbase11xwriter hbase20xsqlwriter hdfswriter hologresjdbcwriter kafkawriter kingbaseeswriter mongodbwriter mysqlwriter ocswriter odpswriter oraclewriter osswriter otswriter postgresqlwriter rdbmswriter sqlserverwriter streamwriter tsdbwriter txtfilewriter |
2. DataX 架构
- Reader 读
- Writer 写
- Framework 通道
-
Job 单个作业管理节点
-
Task 有Job Split 而来,最小单元 包括:reader、channel、writer
-
Schedule 将Task组成TaskGroup ,单个TaskGroup 默认并发为5
-
TaskGroup 启动内部的Task
3. DataX VS Sqoop
DataX:Java 单进程多线程,单机,数据源种类众多
Sqoop:MR,分布式,数据源种类较少
功能 | DataX | Sqoop |
---|---|---|
MySQL读写 | 单机压力大,粒度容易控制 | MR 模式重,写出错处理困难 |
Hive读写 | 单机压力大 | 很好 |
文件格式 | 支持ORC | 默认不支持ORC,可以添加 |
流控、信息、校验 | 有流控、有一些、core有校验功能 | 需要定制,没有 |
监控 | 需要定制 | 需要定制 |
后续 | 有改动,持续优化 | 基本完善,已经停止 |
4. DataX 安装
- 从Github下载tar.gz,并解压安装包方式
- 使用测试
python datax.py ../job/job.json
- 报错问题
经DataX智能分析,该任务最可能的错误原因是:
com.alibaba.datax.common.exception.DataXException: Code:[Common-00], Describe:[您提供的配置文件存在错误信息,请检查您的作业配置 .] - 配置信息错误,您提供的配置文件[/opt/module/datax/plugin/reader/._odpsreader/plugin.json]不存在. 请检查您的配置文件.
- DataX 源码编译安装方式
- Github下载源码
- maven编译打包
mvn -U clean package assembly:assembly -Dmaven.test.skip=true
- 还是报错,并且修复一个又报错一个
- 公司内部 二次封装的 DataX 源码安装方式,勉强能用
5. DataX Job配置
- job reader 配置
- job writer 配置
- job setting 配置
配置的Json 模板可以从Github上看,也可以 通过 python datax.py -r xxxreader -w xxxwriter 查看
- 如何保证数据一致性
先写入一个临时文件
如果某些Task失败,则认为Job失败,删除临时路径文件
如果全部成功,修改路径和文件名
6. DataX 实例
查看Github上文档配置即可!
6.1 MySQL -> HDFS
查看配置模板:python bin/datax.py -r mysqlreader -w hdfswriter
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [],
"connection": [
{
"jdbcUrl": [],
"table": []
}
],
"password": "",
"username": ""
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [],
"compress": "",
"defaultFS": "",
"fieldDelimiter": "",
"fileName": "",
"fileType": "",
"path": "",
"writeMode": ""
}
}
}
],
"setting": {
"speed": {
"channel": ""
}
}
}
}
6.2 HDFS -> MySQL
也可以去Github文档上找到对应的Reader和Writer的Json配置
{
"job": {
"content": [
{
"reader": {
"name": "hdfsreader",
"parameter": {
"column": [],
"defaultFS": "",
"encoding": "UTF-8",
"fieldDelimiter": ",",
"fileType": "orc",
"path": ""
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"column": [],
"connection": [
{
"jdbcUrl": "",
"table": []
}
],
"password": "",
"username": "",
"writeMode": ""
}
}
}
],
"setting": {
"speed": {
"channel": ""
}
}
}
}
7. DataX 分组算法
a 库 : 0 1 2
b 库 : 3 4 5
c 库 :6 7 8
如果4个TaskGroup,则轮询:
Group1: 048
Group2: 15
Group3: 26
Group4: 37
8. DataX 优化
在 core.json 里设置局部参数,在当前 job.json 设置全局参数
- 提升单个Channel的速度
- 提升Job内Channel并发数:三种
- 提高JVM堆内存
python bin/datax.py --kvm="-Xms8G -Xmx8G" xxx.json