zoukankan      html  css  js  c++  java
  • es增量自定义更新的脚本

    安装需要的软件

    sudo apt-get install python-pip
    sudo pip install elasticsearch;
    sudo apt-get install python-dev
    sudo pip install MySQL-python

    导入脚本import.sh

    #!/bin/bash
    # Import rows newer than a given timestamp from one MySQL table into
    # Elasticsearch by piping a JSON definition to the elasticsearch-jdbc importer.
    #
    # Usage: import.sh <table> "<YYYY-MM-DD HH:MM:SS>"
    #   $1  table name; also used as the Elasticsearch type inside index db_1
    #   $2  exclusive lower bound for the dtTime column
    #
    # `set -e` makes the script exit with a non-zero status as soon as the
    # importer fails, so no explicit check of $? is needed afterwards.
    set -e

    if [ "$#" -ne 2 ]; then
      echo "usage: $0 <table> <dtTime lower bound>" >&2
      exit 2
    fi

    bin=/usr/local/elasticsearch-jdbc-1.5.2.0/bin
    lib=/usr/local/elasticsearch-jdbc-1.5.2.0/lib

    # The single-quoted JSON text is closed around each parameter so that the
    # parameter can be expanded inside double quotes (no word splitting or
    # globbing).  The timestamp is wrapped in \" so the JSON string stays valid
    # and the SQL sees a quoted literal.
    echo '{
    "type" : "jdbc",
    "jdbc" : {
    "url" : "jdbc:mysql://192.168.10.29:3306/db_1",
    "user" : "root",
    "password" : "root",
    "sql" : "select * from '"${1}"' where dtTime>\"'"${2}"'\" ",
    "index": "db_1",
    "type": "'"${1}"'"
    }
    }' | java \
    -cp "${lib}/*" \
    -Dlog4j.configurationFile="${bin}/log4j2.xml" \
    org.xbib.tools.Runner \
    org.xbib.tools.JDBCImporter

    python调用import.sh实现增量添加:

    #!/usr/bin/env python
    
    from datetime import datetime
    from elasticsearch import Elasticsearch
    import MySQLdb
    import time
    import os
    import subprocess
    
    # Module-level Elasticsearch client, configured for host 192.168.10.29.
    es=Elasticsearch("192.168.10.29")
    
    def now():
        """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
        timestamp_format = "%Y-%m-%d %H:%M:%S"
        # strftime() without a time tuple formats the current local time.
        return time.strftime(timestamp_format)
    
    def getLastTime(tableName):
        """Return the newest indexed ``dtTime`` for ``tableName`` as a string.

        Runs a ``max`` aggregation on the ``dtTime`` field over documents of
        type ``tableName`` in index ``db_1``.  The aggregation value is divided
        by 1000 before being handed to ``time.localtime`` (i.e. it is treated
        as epoch milliseconds) and formatted as ``YYYY-MM-DD HH:MM:SS``.

        When the aggregation yields no value, the fixed starting point
        ``'2015-01-01 00:00:00'`` is returned instead.
        """
        max_dttime_query = {"aggs": {"max": {"max": {"field": "dtTime"}}}}
        response = es.search(index="db_1", doc_type=tableName, body=max_dttime_query)
        newest_millis = response['aggregations']['max']['value']

        if newest_millis is not None:
            newest_local = time.localtime(newest_millis/1000)
            return time.strftime("%Y-%m-%d %H:%M:%S", newest_local)
        # No aggregation value: fall back to the fixed starting timestamp.
        return '2015-01-01 00:00:00'
    
    def insert(tableName,dtLastTime):
        """Import rows of ``tableName`` newer than ``dtLastTime`` via import.sh.

        Runs ``/usr/local/elasticsearch-jdbc-1.5.2.0/bin/import.sh`` with the
        table name and the start timestamp as its two arguments.  On a zero
        exit status the ``db_1`` index is refreshed; on a non-zero exit status
        "Import failed" is printed and the function returns without refreshing.

        Args:
            tableName: table name passed to import.sh as its first argument.
            dtLastTime: start timestamp; converted with ``str()`` and passed
                as the second argument.

        Returns:
            None.
        """
        script = '/usr/local/elasticsearch-jdbc-1.5.2.0/bin/import.sh'
        startTime = str(dtLastTime)
        # Single-argument print(...) is valid on both Python 2 and Python 3.
        print(tableName+" startTime:"+startTime)
        print('%s %s "%s"'%(script,tableName,startTime))

        # Pass an argument list instead of a shell string: the table name comes
        # from the database and must not be interpreted by a shell, and the
        # timestamp (which contains a space) stays a single argument.
        retCode = subprocess.call([script,tableName,startTime])

        if retCode!=0:
            print("Import failed")
            return
        print("%s Import finished"%(now()))
        es.indices.refresh(index="db_1")
    
    def increment():
        """Run one incremental import pass over every table in ``importinfo``.

        Reads the rows of the MySQL table ``importinfo`` and, for each
        ``vTableName``, looks up the start timestamp with ``getLastTime`` and
        calls ``insert`` with it.  The ``dtLastTime`` column is selected but
        not used: the start timestamp always comes from ``getLastTime``.

        The cursor and the connection are closed even if an import raises.
        """
        conn=MySQLdb.connect(host='192.168.10.29',port=3306,user='root',passwd='root',db ='db_1',)
        try:
            cur=conn.cursor()
            try:
                cur.execute('select vTableName,dtLastTime from importinfo')
                for line in cur.fetchall():
                    tableName=line[0]
                    dtLastTime=getLastTime(tableName)
                    insert(tableName,dtLastTime)
            finally:
                cur.close()
        finally:
            conn.close()
    
    # Run a single incremental import pass when executed as a script.
    if __name__=="__main__":
        increment()
        # Debugging aid: uncomment to query the newest dtTime of one table.
        #getLastTime("achi")
  • 相关阅读:
    ORM框架
    js获取浏览器和元素对象的尺寸
    js中的兼容问题
    JS页面上的流氓广告功能
    JS计算1到10的每一个数字的阶乘之和
    JS中 有一个棋盘,有64个方格,在第一个方格里面放1粒芝麻重量是0.00001kg,第二个里面放2粒,第三个里面放4,棋盘上放的所有芝麻的重量
    JS中99乘法表
    JS 中计算 1
    JS中判断一个数是否为质数
    JS水仙花数
  • 原文地址:https://www.cnblogs.com/ggzone/p/5094474.html
Copyright © 2011-2022 走看看