主要用到了一个 JDBC importer for Elasticsearch的库。
想要增量同步,有一些先决条件。首先数据库中要维护一个update_time的时间戳,这个字段表示了该记录的最后更新时间。然后用上面的那个库,定时执行一个任务,这个任务中执行的sql就是根据时间戳判断该记录是否应该被更新。
这里先写一个最简单的例子来展示一下。
从上方插件官网中下载适合的dist包,然后解压。进入bin目录,可以看到一堆sh脚本。在bin目录下创建一个test.sh:
bin=/home/csonezp/Dev/elasticsearch-jdbc-2.3.1.0/bin lib=/home/csonezp/Dev/elasticsearch-jdbc-2.3.1.0/lib echo '{ "type" : "jdbc", "statefile" : "statefile.json", "jdbc": { "url" : "jdbc:mysql://myaddr", "user" : "myuser", "password" : "mypwd", "type" : "mytype", "index": "myindex", "schedule" : "0 * * * * ?", "metrics" : { "enabled" : true }, "sql" : [ { "statement" : "select * from gd_actor_info where update_time > ?", "parameter" : [ "$metrics.lastexecutionstart" ] } ] } }' | java -cp "${lib}/*" -Dlog4j.configurationFile=${bin}/log4j2.xml org.xbib.tools.Runner org.xbib.tools.JDBCImporter
schedule现在设置成每分钟都执行一次,是为了方便观察行为。statefile这一句是一定要加的。$metrics.lastexecutionstart就是这个脚本的关键所在了,这个指的是上一次脚本执行的时间,可以通过比较这个时间和数据库里的字段来判断是否要更新。