通过DataX同步数据至Elasticsearch
使用总结
- Long值导入时精度丢失,字段类型使用text
- splitPk使用ID流水号时,导入无进度0%
- 因Id取最小值递加至最大值,范围间隔大空查询较多
- 将数据源的查询时间 拆细
- ES日期字段创建需指定格式 yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis
- 日期数据导入时,text写入为日期格式,long写入为时间戳(数据有精度错误)
- 注意时区问题 写入时指定时区 或 对UTC时间戳进行转换
- 指定:“2019-03-12T12:12:12.123+0800”
- 转换:东八区时间戳 = 3600000*8 + UTC时间戳
执行命令
python /datax/bin/datax.py --jvm=-Xmx4g job.json
配置样例
job.json
{
"job": {
"setting": {
"speed": {
"channel": 1
}
},
"content": [
{
"reader": {
...
},
"writer": {
"name": "elasticsearchwriter",
"parameter": {
"endpoint": "http://xxx:9999",
"accessId": "xxxx",
"accessKey": "xxxx",
"index": "test-1",
"type": "default",
"cleanup": true,
"settings": {"index" :{"number_of_shards": 1, "number_of_replicas": 0}},
"discovery": false,
"batchSize": 1000,
"splitter": ",",
"column": [
{"name": "pk", "type": "id"},
{ "name": "col_ip","type": "ip" },
{ "name": "col_double","type": "double" },
{ "name": "col_long","type": "long" },
{ "name": "col_integer","type": "integer" },
{ "name": "col_keyword", "type": "keyword" },
{ "name": "col_text", "type": "text", "analyzer": "ik_max_word"},
{ "name": "col_geo_point", "type": "geo_point" },
{ "name": "col_date", "type": "date", "format": "yyyy-MM-dd HH:mm:ss"},
{ "name": "col_nested1", "type": "nested" },
{ "name": "col_nested2", "type": "nested" },
{ "name": "col_object1", "type": "object" },
{ "name": "col_object2", "type": "object" },
{ "name": "col_integer_array", "type":"integer", "array":true},
{ "name": "col_geo_shape", "type":"geo_shape", "tree": "quadtree", "precision": "10m"}
]
}
}
}
]
}
}
参数说明
-
endpoint
-
描述:ElasticSearch的连接地址
-
必选:是
-
默认值:无
-
accessId
-
描述:http auth中的user
-
必选:否
-
默认值:空
-
accessKey
-
描述:http auth中的password
-
必选:否
-
默认值:空
-
index
-
描述:elasticsearch中的index名
-
必选:是
-
默认值:无
-
type
-
描述:elasticsearch中index的type名
-
必选:否
-
默认值:index名
-
cleanup
-
描述:是否删除原表
-
必选:否
-
默认值:false
-
batchSize
-
描述:每次批量数据的条数
-
必选:否
-
默认值:1000
-
trySize
-
描述:失败后重试的次数
-
必选:否
-
默认值:30
-
timeout
-
描述:客户端超时时间
-
必选:否
-
默认值:600000
-
discovery
-
描述:启用节点发现将(轮询)并定期更新客户机中的服务器列表。
-
必选:否
-
默认值:false
-
compression
-
描述:http请求,开启压缩
-
必选:否
-
默认值:true
-
multiThread
-
描述:http请求,是否有多线程
-
必选:否
-
默认值:true
-
ignoreWriteError
-
描述:忽略写入错误,不重试,继续写入
-
必选:否
-
默认值:false
-
ignoreParseError
-
描述:忽略解析数据格式错误,继续写入
-
必选:否
-
默认值:true
-
alias
-
描述:数据导入完成后写入别名
-
必选:否
-
默认值:无
-
aliasMode
-
描述:数据导入完成后增加别名的模式,append(增加模式), exclusive(只留这一个)
-
必选:否
-
默认值:append
-
settings
-
描述:创建index时候的settings, 与elasticsearch官方相同
-
必选:否
-
默认值:无
-
splitter
-
描述:如果插入数据是array,就使用指定分隔符
-
必选:否
-
默认值:-,-
-
column
-
描述:elasticsearch所支持的字段类型,样例中包含了全部
-
必选:是
-
dynamic
-
描述: 不使用datax的mappings,使用es自己的自动mappings
-
必选: 否
-
默认值: false
参考
https://github.com/alibaba/DataX
https://github.com/alibaba/DataX/blob/master/introduction.md
https://github.com/alibaba/DataX/blob/master/elasticsearchwriter/doc/elasticsearchwriter.md
DataX
DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各种异构数据源之间高效的数据同步功能。
Features
DataX本身作为数据同步框架,将不同数据源的同步抽象为从源头数据源读取数据的Reader插件,以及向目标端写入数据的Writer插件,理论上DataX框架可以支持任意数据源类型的数据同步工作。同时DataX插件体系作为一套生态系统, 每接入一套新数据源该新加入的数据源即可实现和现有的数据源互通。
DataX详细介绍
请参考:DataX-Introduction
Quick Start
Download DataX下载地址
请点击:Quick Start
Support Data Channels
DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、NOSQL、大数据计算系统都已经接入,目前支持数据如下图,详情请点击:DataX数据源参考指南
类型 | 数据源 | Reader(读) | Writer(写) | 文档 |
---|---|---|---|---|
RDBMS 关系型数据库 | MySQL | √ | √ | 读 、写 |
Oracle | √ | √ | 读 、写 | |
SQLServer | √ | √ | 读 、写 | |
PostgreSQL | √ | √ | 读 、写 | |
DRDS | √ | √ | 读 、写 | |
通用RDBMS(支持所有关系型数据库) | √ | √ | 读 、写 | |
阿里云数仓数据存储 | ODPS | √ | √ | 读 、写 |
ADS | √ | 写 | ||
OSS | √ | √ | 读 、写 | |
OCS | √ | √ | 读 、写 | |
NoSQL数据存储 | OTS | √ | √ | 读 、写 |
Hbase0.94 | √ | √ | 读 、写 | |
Hbase1.1 | √ | √ | 读 、写 | |
Phoenix4.x | √ | √ | 读 、写 | |
Phoenix5.x | √ | √ | 读 、写 | |
MongoDB | √ | √ | 读 、写 | |
Hive | √ | √ | 读 、写 | |
Cassandra | √ | √ | 读 、写 | |
无结构化数据存储 | TxtFile | √ | √ | 读 、写 |
FTP | √ | √ | 读 、写 | |
HDFS | √ | √ | 读 、写 | |
Elasticsearch | √ | 写 | ||
时间序列数据库 | OpenTSDB | √ | 读 | |
TSDB | √ | √ | 读 、写 |