产生背景:某天领导说需要将Oracle中的一些表导入到es中,于是就开始了.....
方案说明
- 人生苦短我用py,直接jdbc连接,批量读取,存储es轻轻松松?
- 干大数据的怎么能少的了spark或flink?一个离线批处理不就ok了?
- 既然目标端是es那么还是用它自家产品logstash,轻松配置,无缝全量增量?
方案选择
需求明确的领导,不是一个好领导,而不考虑需求实现的程序员不是一个好的数据搬运工。
尝试方案一
- 百度查看了一下py和Oracle交互的模块用到最多的有两个,一个是
cx_Oracle
,一个是jaydebeapi
二者区别: cx_Oracle对py版本,系统环境版本依赖比较好,必须是门当户对,如果你的es是在Windows下,那就更呵呵了,缺了VC++ 2014不行。而网上的答案千篇一律,装环境都这么困难,直接next,
jaydebeapi
,这个模块的连接方式类似于cx_Oracle
,但是至少好安装,直接下载tar.gz的源码包,python setup.py install
。
- 环境ok了,开始打开pycharm,远程连接服务器上的python环境,一切都很顺利,但是写着写着,发现,没有考虑到数据量很大咋整?得实现批量,数据中有大字段(clob)又咋整?又得判断字段类型(前面已经提到是一些表,而非一张),判断到是CLOB类型后还得读取大字段转为字符串类型...一来二去,我只是想迁移一些表的需求,结果活生生写成了一个迁移工具的影子。于是果断放弃。
尝试方案二
- 经历了方案一,那么同理只要是通过程序迁移避免不了数据量,更避免不了大字段的处理,想想方案二和方案一要经历的过程一样,只是选择的语言和方式不同而已,果断放弃。
尝试方案三
- 基于之前已经安装好input-jdbc插件的logstash,只需要解压,写一下配置文件即可,于是一个 增量 配置文件诞生了。
input {
jdbc {
#驱动包路径
jdbc_driver_library => "/home/risen/ojdbc.jar"
#驱动类
jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
#jdbc url
jdbc_connection_string => "jdbc:oracle:thin:@ip:port:orcl"
jdbc_user => "账号"
jdbc_password => "密码"
schedule => "* * * * *"
statement => "select * from 表名"
#增量标识字段名
tracking_column => "主键"
#是否使用字段值作为增量标识
use_column_value => true
#源表字段名导入ES后是否忽略大小写
lowercase_column_names=> false
#分页
jdbc_paging_enabled => "true"
#每页数据量
jdbc_page_size => "5000"
#默认时区
jdbc_default_timezone => "UTC"
}
}
output{
elasticsearch {
hosts =>["ip:port"]
index => "小写表名"
document_type => "小写表名"
document_id => "%{主键}"
}
}
- 经过步骤一的完美配置后,开始启动
bin/logstash -f ../oracle2es.conf -w 3
正常运行不报错,赶紧上es看看,它有没有处理大字段,事实证明,还是"孪生兄弟"强,数据没问题,还是增量同步,而且大字段也都被转换为了字符串存储。针对大数据量还可以通过-w指定线程数,同时还可以指定batch数量。
- 那么问题来了,这配置文件虽然好用,但是那可是200多张表呢?总不能导入一张修改一下配置文件吧,当然不能,接下来就需要 shell 登场了
!> logstash + shell 脚本有没有搞头?当然有
讲过查询,logstash的配置文件可以读取外部设置的环境变量,那这个问题就妥了。
#!/bin/bash +x
#author:AmCoder
#设置logstash_path路径
logstash_path=/home/risen/logstash-5.4.3
#设置logstash_conf_path路径
logstash_conf_path=/home/risen/logstash-5.4.3/oracle2es.conf
if [ ! -n "$1" ] && [ ! -n "$2" ];then
echo "Usage: sh oracle2es.sh 表名 主键"
exit 1
fi
#由于logstash支持引用系统环境变量所以
##设置系统环境变量表名
export table_name=`echo $1 | tr 'a-z' 'A-Z'`
##设置系统环境变量表中的唯一主键
export unique_key=`echo $2 | tr 'a-z' 'A-Z'`
##设置索引名称(表名转小写--es中的索引不能是大写)
export es_index=`echo $table_name | tr 'A-Z' 'a-z'`
#启动logstash并指定配置文件
$logstash_path/bin/logstash -f $logstash_conf_path -w 5
只需要修改logstash的安装目录和配置文件的目录,然后将用到的公共变量(表名和主键)作为外部参数传入即可了。当然之前的logstash的配置文件Oracle2es.conf也是需要修改为以下的(局部地方采用变量的方式):
input {
jdbc {
jdbc_driver_library => "/home/risen/ojdbc.jar"
jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
jdbc_connection_string => "jdbc:oracle:thin:@ip:port:orcl"
jdbc_user => "用户名"
jdbc_password => "密码"
schedule => "* * * * *"
statement => "select * from ${table_name}"
#增量标识字段名
tracking_column => "${unique_key}"
#是否使用字段值作为增量标识
use_column_value => true
#源表字段名导入ES后是否忽略大小写
lowercase_column_names=> false
#分页
jdbc_paging_enabled => "true"
#每页数据量
jdbc_page_size => "5000"
#默认时区
jdbc_default_timezone => "UTC"
}
}
output{
elasticsearch {
hosts =>["ip:port"]
index => "${es_index}"
document_type => "${es_index}"
document_id => "%{${unique_key}}"
}
}
至此,问题基本已经解决了,剩下的就是你需要拿到200多张表的表名称和主键了。大不了在写个脚本套一层,当然也可以在上面的脚本中添加即可。总结一下,需要知道logstash可全量可增量,还可以引用外部变量,既可以多线程又可以指定分页批次量。(但是,logstash目前还不支持国产服务器)