  • Script for custom incremental updates to Elasticsearch (es)

    Install the required software

    sudo apt-get install python-pip
    sudo pip install elasticsearch
    sudo apt-get install python-dev
    sudo pip install MySQL-python

    Import script import.sh

    #!/bin/bash
    set -e
    
    bin=/usr/local/elasticsearch-jdbc-1.5.2.0/bin
    lib=/usr/local/elasticsearch-jdbc-1.5.2.0/lib
    
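    # Pipe the importer definition (JSON) to the JDBC importer.
    # $1 = table name (also used as the Elasticsearch type), $2 = lower bound for dtTime.
    echo '{
    "type" : "jdbc",
    "jdbc" : {
    "url" : "jdbc:mysql://192.168.10.29:3306/db_1",
    "user" : "root",
    "password" : "root",
    "sql" : "select * from '"${1}"' where dtTime > \"'"${2}"'\"",
    "index" : "db_1",
    "type" : "'"${1}"'"
    }
    }' | java \
    -cp "${lib}/*" \
    -Dlog4j.configurationFile=${bin}/log4j2.xml \
    org.xbib.tools.Runner \
    org.xbib.tools.JDBCImporter
    
    # No explicit $? check is needed: with "set -e" above, the script
    # exits with a non-zero status as soon as the java command fails.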
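
    The quoting in import.sh is easy to get wrong, because the JSON definition, the SQL string and the shell arguments are nested inside each other. As a minimal sketch, and not part of the original script, the same definition could be built in Python with json.dumps, which handles the escaping. The function name build_definition and its keyword defaults are hypothetical; the values are copied from import.sh above.

```python
import json


def build_definition(table, since,
                     url="jdbc:mysql://192.168.10.29:3306/db_1",
                     user="root", password="root", index="db_1"):
    """Return the JDBC importer definition as a JSON string (hypothetical helper)."""
    # A table name cannot be bound as a SQL parameter, so it is
    # interpolated here; only pass trusted table names.
    sql = "select * from {0} where dtTime > '{1}'".format(table, since)
    definition = {
        "type": "jdbc",
        "jdbc": {
            "url": url,
            "user": user,
            "password": password,
            "sql": sql,
            "index": index,
            "type": table,  # one Elasticsearch type per MySQL table
        },
    }
    return json.dumps(definition, indent=2)


if __name__ == "__main__":
    # "achi" is the table name that appears in the author's commented-out test call.
    print(build_definition("achi", "2015-01-01 00:00:00"))
```

    The resulting string could then be written to the standard input of the java command instead of being echoed from the shell.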

    A Python script calls import.sh to perform the incremental import:

    #!/usr/bin/env python
    # Written for Python 2 (MySQL-python); print() with one argument also works on Python 3.
    
    import subprocess
    import time
    
    from elasticsearch import Elasticsearch
    import MySQLdb
    
    IMPORT_SCRIPT = '/usr/local/elasticsearch-jdbc-1.5.2.0/bin/import.sh'
    TIME_FORMAT = "%Y-%m-%d %H:%M:%S"
    
    es = Elasticsearch("192.168.10.29")
    
    def now():
        return time.strftime(TIME_FORMAT, time.localtime())
    
    def getLastTime(tableName):
        # Ask Elasticsearch for the newest dtTime already indexed for this table.
        q = {
            "aggs": {
                "max": {
                    "max": {"field": "dtTime"}
                }
            }
        }
        dt = es.search(index="db_1", doc_type=tableName, body=q)['aggregations']['max']['value']
    
        # Nothing indexed yet: fall back to a fixed start time.
        if dt is None:
            return '2015-01-01 00:00:00'
        # The max aggregation on a date field returns epoch milliseconds.
        return time.strftime(TIME_FORMAT, time.localtime(dt / 1000))
    
    def insert(tableName, dtLastTime):
        cmd = [IMPORT_SCRIPT, tableName, str(dtLastTime)]
        print("%s startTime: %s" % (tableName, dtLastTime))
        print(" ".join(cmd))
    
        # Pass the arguments as a list (no shell), so the space in the
        # timestamp needs no extra quoting.
        retCode = subprocess.call(cmd)
    
        if retCode != 0:
            print("Import failed")
            return
        print("%s Import finished" % now())
        # Make the newly imported documents searchable right away.
        es.indices.refresh(index="db_1")
    
    def increment():
        conn = MySQLdb.connect(host='192.168.10.29', port=3306, user='root', passwd='root', db='db_1')
        cur = conn.cursor()
        try:
            # importinfo lists the tables to import. The start time is taken
            # from Elasticsearch, so the dtLastTime column is not used here.
            cur.execute('select vTableName,dtLastTime from importinfo')
            for line in cur.fetchall():
                tableName = line[0]
                insert(tableName, getLastTime(tableName))
        finally:
            cur.close()
            conn.close()
    
    if __name__ == "__main__":
        increment()
        #getLastTime("achi")
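
    The core of getLastTime is the conversion of the aggregation result into the timestamp string that import.sh receives as its second argument. The sketch below isolates that step so it can be tried without a running Elasticsearch or MySQL server. The function name watermark_from_max and the constants are hypothetical. It assumes, as the script does, that the max aggregation value is in epoch milliseconds and that MySQL stores dtTime in the machine's local time zone.

```python
import time

TIME_FORMAT = "%Y-%m-%d %H:%M:%S"
DEFAULT_START = "2015-01-01 00:00:00"  # same fallback as in getLastTime


def watermark_from_max(max_ms, default=DEFAULT_START):
    """Turn the max(dtTime) aggregation value into a start time for the next import."""
    # Elasticsearch reports None when the index holds no documents of this type.
    if max_ms is None:
        return default
    # Convert milliseconds to seconds, then format in local time.
    return time.strftime(TIME_FORMAT, time.localtime(max_ms / 1000.0))


if __name__ == "__main__":
    print(watermark_from_max(None))             # empty index: fixed start time
    print(watermark_from_max(1436918400000.0))  # local-time rendering of the epoch value
```

    Because the import query uses a strict dtTime > comparison, rows that share the exact maximum timestamp but were written after the previous run would be skipped. Whether that matters depends on how dtTime is populated.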
  • Original article: https://www.cnblogs.com/ggzone/p/10121197.html