  • A custom script for incremental Elasticsearch (ES) updates

    Install the required software:

    sudo apt-get install python-pip
    sudo pip install elasticsearch
    sudo apt-get install python-dev
    sudo pip install MySQL-python

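    Import script import.sh. It takes a table name and the last imported dtTime as arguments and pipes a JDBC importer definition into elasticsearch-jdbc: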

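    #!/bin/bash
    # Usage: import.sh <table_name> "<last dtTime>"
    # $1 is both the MySQL table and the Elasticsearch type; only rows whose
    # dtTime is greater than $2 are imported.
    set -e  # exit with a non-zero status if the importer fails
    
    bin=/usr/local/elasticsearch-jdbc-1.5.2.0/bin
    lib=/usr/local/elasticsearch-jdbc-1.5.2.0/lib
    
    # The importer reads its JSON definition from stdin. An unquoted here-document
    # expands ${1} and ${2} while the quotes around the SQL timestamp literal stay
    # valid JSON (the original echo version produced unescaped double quotes).
    java \
      -cp "${lib}/*" \
      -Dlog4j.configurationFile="${bin}/log4j2.xml" \
      org.xbib.tools.Runner \
      org.xbib.tools.JDBCImporter <<EOF
    {
      "type" : "jdbc",
      "jdbc" : {
        "url" : "jdbc:mysql://192.168.10.29:3306/db_1",
        "user" : "root",
        "password" : "root",
        "sql" : "select * from ${1} where dtTime > '${2}'",
        "index" : "db_1",
        "type" : "${1}"
      }
    }
    EOF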
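Assembling the importer's JSON through shell string concatenation is error-prone once quotes are involved: the SQL needs a quoted timestamp literal, and that literal must not break the surrounding JSON string. As a minimal sketch, assuming you would rather generate the definition in Python, the same structure as in import.sh can be produced with `json.dumps`, which takes care of escaping. The helper name `build_importer_definition` and its default arguments are hypothetical; the field layout simply copies import.sh.

```python
import json


def build_importer_definition(table_name, last_time,
                              url="jdbc:mysql://192.168.10.29:3306/db_1",
                              user="root", password="root", index="db_1"):
    """Hypothetical helper: return the importer definition from import.sh as JSON.

    table_name is interpolated into the SQL unescaped, so it must be a trusted
    identifier (in the script it comes from the importinfo table).
    """
    definition = {
        "type": "jdbc",
        "jdbc": {
            "url": url,
            "user": user,
            "password": password,
            # The timestamp is a SQL string literal, hence the single quotes.
            "sql": "select * from %s where dtTime > '%s'" % (table_name, last_time),
            "index": index,
            "type": table_name,  # the table name doubles as the ES type
        },
    }
    # json.dumps escapes anything that would otherwise break the JSON string.
    return json.dumps(definition, indent=2)


if __name__ == "__main__":
    print(build_importer_definition("achi", "2015-01-01 00:00:00"))
```

For the table `achi`, the resulting `sql` field is `select * from achi where dtTime > '2015-01-01 00:00:00'`.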

    A Python script that calls import.sh to perform the incremental import:

    #!/usr/bin/env python
    # Incremental import: for every table listed in importinfo, look up the newest
    # dtTime already indexed in Elasticsearch and import only newer rows via import.sh.
    
    import subprocess
    import time
    
    import MySQLdb
    from elasticsearch import Elasticsearch
    
    IMPORT_SH = '/usr/local/elasticsearch-jdbc-1.5.2.0/bin/import.sh'
    INDEX = 'db_1'
    DEFAULT_START = '2015-01-01 00:00:00'  # used when a type has no documents yet
    
    es = Elasticsearch("192.168.10.29")
    
    
    def now():
        return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    
    
    def getLastTime(tableName):
        # Max aggregation over dtTime; "size": 0 skips returning the hits themselves.
        q = {
            "size": 0,
            "aggs": {
                "max": {
                    "max": {"field": "dtTime"}
                }
            }
        }
        dt = es.search(index=INDEX, doc_type=tableName, body=q)['aggregations']['max']['value']
    
        if dt is None:
            return DEFAULT_START
        # Elasticsearch returns the maximum date as epoch milliseconds.
        return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(dt / 1000))
    
    
    def insert(tableName, dtLastTime):
        print(tableName + " startTime:" + str(dtLastTime))
        cmd = [IMPORT_SH, tableName, str(dtLastTime)]
        print(' '.join(cmd))
    
        # An argument list (no shell) keeps the timestamp, which contains a space,
        # as a single argument and avoids shell injection through table names.
        retCode = subprocess.call(cmd)
    
        if retCode != 0:
            print("Import failed")
            return
        print("%s Import finished" % now())
        es.indices.refresh(index=INDEX)
    
    
    def increment():
        conn = MySQLdb.connect(host='192.168.10.29', port=3306, user='root', passwd='root', db='db_1')
        try:
            cur = conn.cursor()
            # importinfo lists the tables to sync. Its dtLastTime column is not used:
            # the starting point is read from Elasticsearch instead.
            cur.execute('select vTableName, dtLastTime from importinfo')
            for line in cur.fetchall():
                tableName = line[0]
                insert(tableName, getLastTime(tableName))
            cur.close()
        finally:
            conn.close()
    
    
    if __name__ == "__main__":
        increment()
        # getLastTime("achi")
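The incremental logic in getLastTime and insert amounts to a high-water-mark sync: read the newest dtTime already indexed, then import only rows strictly newer than it. The sketch below (Python 3) replays that loop against in-memory lists instead of MySQL and Elasticsearch. `sync_once`, `to_epoch_ms` and the row/document dictionaries are hypothetical stand-ins; only `format_last_time` mirrors a step of the script (turning the aggregation's epoch milliseconds into a timestamp string, with the same 2015-01-01 fallback).

```python
from datetime import datetime, timezone

FMT = "%Y-%m-%d %H:%M:%S"
DEFAULT_START = "2015-01-01 00:00:00"  # same fallback as getLastTime


def format_last_time(max_value_ms, tz=None):
    """Convert the 'max' aggregation value (epoch ms, or None for an empty type)
    into the string handed to import.sh. tz=None means local time, like
    time.localtime in getLastTime; pass timezone.utc for reproducible output."""
    if max_value_ms is None:
        return DEFAULT_START
    return datetime.fromtimestamp(max_value_ms / 1000.0, tz).strftime(FMT)


def to_epoch_ms(text, tz=None):
    """Hypothetical inverse: 'YYYY-mm-dd HH:MM:SS' string -> epoch milliseconds."""
    return int(datetime.strptime(text, FMT).replace(tzinfo=tz).timestamp() * 1000)


def sync_once(table_rows, index_docs, tz=None):
    """One incremental pass over stand-ins: table_rows plays the MySQL table,
    index_docs the Elasticsearch type. Returns the number of rows copied."""
    # Step 1: the high-water mark, like the max aggregation on dtTime.
    max_ms = max((doc["dtTime"] for doc in index_docs), default=None)
    last_time = format_last_time(max_ms, tz)
    # Step 2: rows strictly newer, like "where dtTime > '<last_time>'".
    # Fixed-width timestamps compare correctly as strings.
    new_rows = [row for row in table_rows if row["dtTime"] > last_time]
    for row in new_rows:
        index_docs.append({"id": row["id"], "dtTime": to_epoch_ms(row["dtTime"], tz)})
    return len(new_rows)


if __name__ == "__main__":
    rows = [{"id": 1, "dtTime": "2015-03-01 10:00:00"},
            {"id": 2, "dtTime": "2015-03-02 10:00:00"}]
    docs = []
    print(sync_once(rows, docs, timezone.utc))  # first run copies both rows
    print(sync_once(rows, docs, timezone.utc))  # nothing newer, copies none
```

Because the filter is a strict `>`, a row that shares the current maximum dtTime but is written after a run is never picked up; the upside is that the boundary rows are not imported a second time on every run.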
  • Original article: https://www.cnblogs.com/ggzone/p/5094474.html