zoukankan      html  css  js  c++  java
  • Oracle和Elasticsearch数据同步

    Python编写Oracle和Elasticsearch数据同步脚本

    标签: elasticsearchoraclecx_Oraclepython数据同步

    一、版本

    Python版本 x64 2.7.12 

    Oracle(x64 12.1.0.2.0)和Elasticsearch(2.2.0)

    python编辑器 PyCharm
     
    下载安装请选择适合自己机器的版本
     
    二、下载模块
    通过官网下载和安装cx_Oracle和pyes模块,分别用于操作Oracle数据库和ES。安装fcntl模块用于解决python脚本单例执行问题。

    如果是远程连接数据库和ES,请一定注意安装的模块或包版本。务必选择相应的版本,不然会遇到问题。

    三、安装过程中会遇到的问题

    cx_Oracle在本地安装过程中出现的一些问题:
    1、安装c++for python的环境
    2、安装Oracle数据库(或者安装API接口中需要的文件而不必下载配置整个oracle环境)
    3、打开数据库工具 oracle SQL developor 按要求创建连接,并新建用户(创建数据库用户名时以c##开头,不然会提示)
    4、oracle连接不上远程的服务器,检查版本是否匹配
     
    fcntl在windows上安装时出现的问题:
    1、用pip install fcntl 报错:indentationerror: unexpected indent(模块版本有问题)
     

    四、源码

    [python] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. # -*- coding: utf-8 -*-  
    2. """ 
    3. 作者:陈龙 
    4. 日期:2016-7-22 
    5. 功能:oracle数据库到ES的数据同步 
    6. """  
    7. import os  
    8. import sys  
    9. import datetime, time  
    10. # import fcntl  
    11. import threading  
    12. import pyes  # 引入pyes模块,ES接口  
    13. import cx_Oracle  # 引入cx_Oracle模块,Oracle接口  
    14.   
    15. os.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8'  # 中文编码  
    16. reload(sys)  # 默认编码设置为utf-8  
    17. sys.setdefaultencoding('utf-8')  
    18.   
    19. # 创建ES连接 并返回连接参数  
    20. def connect_ES(addr):  
    21.     try:  
    22.         global conn  
    23.         conn = pyes.ES(addr)  # 链接ES '127.0.0.1:9200'  
    24.         print 'ES连接成功'  
    25.         return conn  
    26.     except:  
    27.         print 'ES连接错误'  
    28.         pass  
    29.   
    30. # 创建ES映射mapping 注意各各个字段的类型  
    31. def create_ESmapping():  
    32.     global spiderInfo_mapping, involveVideo_mapping, involveCeefax_mapping,keyWord_mapping,sensitiveWord_mapping  
    33.     spiderInfo_mapping = {'tableName': {'index': 'not_analyzed', 'type': 'string'},  
    34.                           'tableId': {'index': 'not_analyzed', 'type': 'integer'},  
    35.                           'title': {'index': 'analyzed', 'type': 'string'},  
    36.                           'author': {'index': 'not_analyzed', 'type': 'string'},  
    37.                           'content': {'index': 'analyzed', 'type': 'string'},  
    38.                           'publishTime': {'index': 'not_analyzed', 'type': 'string'},  
    39.                           'browseNum': {'index': 'not_analyzed', 'type': 'integer'},  
    40.                           'commentNum': {'index': 'not_analyzed', 'type': 'integer'},  
    41.                           'dataType': {'index': 'not_analyzed', 'type': 'integer'}}  # 除去涉我部分内容的ES映射结构  
    42.     involveVideo_mapping = {'tableName': {'index': 'not_analyzed', 'type': 'string'},  
    43.                             'tableId': {'index': 'not_analyzed', 'type': 'integer'},  
    44.                             'title': {'index': 'analyzed', 'type': 'string'},  
    45.                             'author': {'index': 'not_analyzed', 'type': 'string'},  
    46.                             'summary': {'index': 'analyzed', 'type': 'string'},  
    47.                             'publishTime': {'index': 'not_analyzed', 'type': 'string'},  
    48.                             'url': {'index': 'not_analyzed', 'type': 'string'},  
    49.                             'imgUrl': {'index': 'not_analyzed', 'type': 'string'},  
    50.                             'ranking': {'index': 'not_analyzed', 'type': 'integer'},  
    51.                             'playNum': {'index': 'not_analyzed', 'type': 'integer'},  
    52.                             'dataType': {'index': 'not_analyzed', 'type': 'integer'}}  # 涉我视音频内容的ES映射结构  
    53.     involveCeefax_mapping = {'tableName': {'index': 'not_analyzed', 'type': 'string'},  
    54.                             'tableId': {'index': 'not_analyzed', 'type': 'integer'},  
    55.                             'title': {'index': 'analyzed', 'type': 'string'},  
    56.                             'author': {'index': 'not_analyzed', 'type': 'string'},  
    57.                             'content': {'index': 'analyzed', 'type': 'string'},  
    58.                             'publishTime': {'index': 'not_analyzed', 'type': 'string'},  
    59.                             'keyWords': {'index': 'not_analyzed', 'type': 'string'},  
    60.                             'popularity': {'index': 'not_analyzed', 'type': 'integer'},  
    61.                             'url': {'index': 'not_analyzed', 'type': 'string'},  
    62.                             'dataType': {'index': 'not_analyzed', 'type': 'integer'}}  # 涉我图文资讯内容的ES映射结构  
    63.     keyWord_mapping = {'id':{'index': 'not_analyzed', 'type': 'integer'},  
    64.                       'keywords':{'index': 'not_analyzed', 'type': 'string'}}  
    65.     sensitiveWord_mapping = {'id':{'index': 'not_analyzed', 'type': 'integer'},  
    66.                             'sensitiveType':{'index': 'not_analyzed', 'type': 'string'},  
    67.                             'sensitiveTopic': {'index': 'not_analyzed', 'type': 'string'},  
    68.                             'sensitiveWords': {'index': 'not_analyzed', 'type': 'string'}}  
    69.   
    70. # 创建ES相关索引和索引下的type  
    71. def create_ESindex(ES_index, index_type1,index_type2,index_type3,index_type4,index_type5):  
    72.   
    73.     if conn.indices.exists_index(ES_index):  
    74.         pass  
    75.     else:  
    76.         conn.indices.create_index(ES_index)  # 如果所有Str不存在,则创建Str索引  
    77.         create_ESmapping()  
    78.         conn.indices.put_mapping(index_type1, {'properties': spiderInfo_mapping},[ES_index])  # 在索引pom下创建spiderInfo的_type  "spiderInfo"  
    79.         conn.indices.put_mapping(index_type2, {'properties': involveVideo_mapping},[ES_index])  # 在索引pom下创建involveVideo的_type  "involveVideo"  
    80.         conn.indices.put_mapping(index_type3, {'properties': involveCeefax_mapping},[ES_index])  # 在索引pom下创建involveCeefax的_type  "involveCeefax"  
    81.         conn.indices.put_mapping(index_type4, {'properties': keyWord_mapping}, [ES_index])  
    82.         conn.indices.put_mapping(index_type5, {'properties': sensitiveWord_mapping}, [ES_index])  
    83.     # conn.ensure_index  
    84.   
    85. # 创建数据库连接 并返回连接参数  
    86. def connect_Oracle(name, password, address):  
    87.     try:  
    88.         global conn1  
    89.         # conn1 = cx_Oracle.connect('c##chenlong','1234567890','localhost:1521/ORCL') #链接本地数据库  
    90.         conn1 = cx_Oracle.connect(name, password, address)  # 链接远程数据库 "pom","Bohui@123","172.17.7.118:1521/ORCL"  
    91.         print 'Oracle连接成功'  
    92.         return conn1  
    93.     except:  
    94.         print 'ES数据同步脚本连接不上数据库,请检查connect参数是否正确,或者模块版本是否匹配'  
    95.         pass  
    96.   
    97. def fetch_account(accountcode):  # 取两个‘_’之间的账号名称  
    98.     end = accountcode.find('_')  
    99.     return accountcode[0:end].strip()  
    100. # 根据表的个数创建不同的对象  
    101. # 从记录文档中读取各个表的记录ID,判断各个表的ID是否有变化  
    102. # 分别读取各个表中的相关数据  
    103.   
    104. # 读取各个表的ID与记录的ID(记录在文本或者数据库中)并判断  
    105. """def read_compare_ID(): 
    106.     global tuple_tableName_IdNum 
    107.     global cur 
    108.     tuple_tableName_IdNum = {} 
    109.     tablename = [] 
    110.     cur = conn1.cursor() 
    111.     result1 = cur.execute("select * from tabs")  ##执行数据库操作 读取各个表名 
    112.     row = result1.fetchall() 
    113.     for x in row: 
    114.         tablename.append(x[0])  # 将表名取出并赋值给tablename数组 
    115.         result2 = cur.execute('select {}_ID  from {}'.format(x[0], x[0])) 
    116.         ID_num = result2.fetchall() 
    117.         tuple_tableName_IdNum[x[0]] = ID_num"""  
    118.   
    119. def readOracle_writeES(tableName, ES_index, index_type):  
    120.     global cc  
    121.     cur = conn1.cursor()  
    122.     #result_AlltableNames = cur.execute("select * from tabs")  
    123.     result_latestId = cur.execute("select max({}_Id) from {} ".format(tableName,tableName))  
    124.     num1 = result_latestId.fetchone() #当前表中的最大ID  
    125.     print '当前表中的最大ID{}'.format(num1[0])  
    126.     result_rememberId = cur.execute("select tableId from T_REMEMBERID where tableName='{}'".format(tableName.upper())) #通过数据库表拿到更新的ID tablename 都转化成大写  
    127.     num2 = result_rememberId.fetchone() #上次记录的更新ID  
    128.     print '上次记录的更新ID{}'.format(num2[0])  
    129.     if tableName.upper() == 'T_SOCIAL':  
    130.         while num2[0] < num1[0]:  
    131.             result_readOracle = cur.execute("select {}_ID,title,author,content,publishTime,browseNum,likeNum,forwardNum,commentNum,accountCode from {} where {}_ID > {} and rownum<=40 ".format(tableName, tableName, tableName, num2[0]))  
    132.             result_tuple1 = result_readOracle.fetchall()  #之前是因为数据量太大,超过了变量的内存空间,所以用fetchmany取40条  后来大神建议数据库中限制查询数 然后fetchall,这样查询更有效率  
    133.             for i in result_tuple1:  #一条一条写入ES,这个速度太慢,改进 通过bulk接口导入  
    134.                 aa= (i[5]+i[6])  
    135.                 bb=  (i[7]+i[8])  
    136.                 if conn.index(  
    137.                     {'tableName': tableName, 'tableId': i[0], 'title': unicode(i[1]), 'author': unicode(i[2]),  
    138.                     'content': unicode(i[3]), 'publishTime': str(i[4]), 'browseNum': aa,  
    139.                     'commentNum':bb, 'dataType':fetch_account(i[9])}, ES_index, index_type,bulk=True):  # 将数据写入索引pom的spiderInfo  
    140.                     cc += 1  
    141.                     print 'bulk导入后的ID:{}'.format(i[0])  
    142.             rememberId = i[0] #如果写入成功才赋值  
    143.             cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId,tableName))  
    144.             conn1.commit()  
    145.             result_rememberId = cur.execute("select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # 通过数据库表拿到更新的ID  
    146.             num2 = result_rememberId.fetchone()  
    147.         print "{}读{}写成功".format(tableName,index_type)  
    148.     if tableName.upper() == 'T_HOTSEARCH':  
    149.         while num2[0] < num1[0]:  
    150.             result_readOracle = cur.execute("select {}_ID,accountCode,title,publishTime from {} where {}_ID > {} and rownum<=40 ".format(tableName, tableName, tableName, num2[0]))  
    151.             result_tuple1 = result_readOracle.fetchall()  # 之前是因为数据量太大,超过了变量的内存空间,所以用fetchmany取40条  后来大神建议数据库中限制查询数 然后fetchall,这样查询更有效率  
    152.             for i in result_tuple1:  #一条一条写入ES,这个速度太慢,改进 通过bulk接口导入  
    153.                 if conn.index(  
    154.                     {'tableName': tableName, 'tableId': i[0], 'title': unicode(i[2]),'author': '','content': '', 'publishTime': str(i[3]), 'browseNum': 0,  
    155.                     'commentNum': 0, 'dataType': fetch_account(i[1])}, ES_index, index_type,bulk=True):  # 将数据写入索引pom的spiderInfo  
    156.                     cc += 1  
    157.                     print 'bulk导入后的ID:{}'.format(i[0])  
    158.             rememberId = i[0]  
    159.             cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId, tableName))  
    160.             conn1.commit()  
    161.             result_rememberId = cur.execute("select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # 通过数据库表拿到更新的ID  
    162.             num2 = result_rememberId.fetchone()  
    163.         print "{}读{}写成功".format(tableName, index_type)  
    164.     if tableName.upper() == 'T_VIDEO_HOT':  
    165.         while num2[0] < num1[0]:  
    166.             result_readOracle = cur.execute("select {}_ID,accountCode,title,Author,publishTime from {} where {}_ID > {} and rownum<=40 ".format(tableName,tableName,tableName,num2[0]))  
    167.             result_tuple1 = result_readOracle.fetchall()  # 之前是因为数据量太大,超过了变量的内存空间,所以用fetchmany取40条  后来大神建议数据库中限制查询数 然后fetchall,这样查询更有效率  
    168.             for i in result_tuple1:  # 一条一条写入ES,这个速度太慢,强烈需要改进 通过bulk接口导入?  
    169.                 if conn.index(  
    170.                     {'tableName': tableName, 'tableId': i[0], 'title': unicode(i[2]),'author': unicode(i[3]),  
    171.                     'content': '', 'publishTime': str(i[4]), 'browseNum': 0,  
    172.                     'commentNum': 0, 'dataType': fetch_account(i[1])}, ES_index, index_type, bulk=True):  # 将数据写入索引pom的spiderInfo  
    173.                     cc += 1  
    174.                     print 'bulk导入后的ID:{}'.format(i[0])  
    175.             rememberId = i[0]  
    176.             cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId, tableName))  
    177.             conn1.commit()  
    178.             result_rememberId = cur.execute("select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # 通过数据库表拿到更新的ID  
    179.             num2 = result_rememberId.fetchone()  
    180.         print "{}读写成功".format(tableName)  
    181.     if tableName.upper() == 'T_PRESS':  
    182.         while num2[0] < num1[0]:  
    183.             result_readOracle = cur.execute(  
    184.                 "select {}_ID,accountCode,title,Author,PublishDate,Content from {} where {}_ID > {} and rownum<=40 ".format(  
    185.                     tableName, tableName, tableName, num2[0]))  
    186.             result_tuple1 = result_readOracle.fetchall()  # 之前是因为数据量太大,超过了变量的内存空间,所以用fetchmany取40条  后来大神建议数据库中限制查询数 然后fetchall,这样查询更有效率  
    187.             for i in result_tuple1:  # 一条一条写入ES,这个速度太慢,强烈需要改进 通过bulk接口导入?  
    188.                 if conn.index(  
    189.                     {'tableName': tableName, 'tableId': i[0], 'title': unicode(i[2]),'author': unicode(i[3]),  
    190.                     'content': unicode(i[5]), 'publishTime': str(i[4]), 'browseNum': 0,  
    191.                     'commentNum': 0, 'dataType': fetch_account(i[1])}, ES_index, index_type,bulk=True):  # 将数据写入索引pom的spiderInfo  
    192.                     cc += 1  
    193.                     print 'bulk导入后的ID:{}'.format(i[0])  
    194.             rememberId = i[0]  
    195.             cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId, tableName))  
    196.             conn1.commit()  
    197.             result_rememberId = cur.execute(  
    198.                 "select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # 通过数据库表拿到更新的ID  
    199.             num2 = result_rememberId.fetchone()  
    200.         print "{}读写成功".format(tableName)  
    201.     if tableName.upper() == 'T_INDUSTRY':  
    202.         while num2[0] < num1[0]:  
    203.             result_readOracle = cur.execute(  
    204.                 "select {}_ID,accountCode,title,Author,PublishTime,Content,BrowseNum from {} where {}_ID > {} and rownum<=40 ".format(  
    205.                     tableName, tableName, tableName, num2[0]))  
    206.             result_tuple1 = result_readOracle.fetchall()  # 之前是因为数据量太大,超过了变量的内存空间,所以用fetchmany取40条  后来大神建议数据库中限制查询数 然后fetchall,这样查询更有效率  
    207.   
    208.             for i in result_tuple1:  # 一条一条写入ES,这个速度太慢,强烈需要改进 通过bulk接口导入?  
    209.                 if conn.index(  
    210.                     {'tableName': tableName, 'tableId': i[0], 'title': unicode(i[2]),'author': unicode(i[3]),  
    211.                     'content': unicode(i[5]), 'publishTime': str(i[4]), 'browseNum': i[6],  
    212.                     'commentNum': 0, 'dataType': fetch_account(i[1])}, ES_index, index_type,bulk=True) : # 将数据写入索引pom的spiderInfo  
    213.                     cc += 1  
    214.                     print 'bulk导入后的ID:{}'.format(i[0])  
    215.             rememberId = i[0]  
    216.             cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId, tableName))  
    217.             conn1.commit()  
    218.             result_rememberId = cur.execute(  
    219.                 "select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # 通过数据库表拿到更新的ID  
    220.             num2 = result_rememberId.fetchone()  
    221.         print "{}读写成功".format(tableName)  
    222.     if tableName.upper() == 'T_SOCIAL_SITESEARCH':  
    223.         while num2[0] < num1[0]:  
    224.             result_readOracle = cur.execute('select {}_ID,title,author,content,publishTime,keyWords,browseNum,likeNum,forwardNum,commentNum,url,accountCode from {} where ({}_ID > {})'.format(tableName, tableName, tableName, num2[0]))  
    225.             result_tuple1 = result_readOracle.fetchmany(50)  #因为数据量太大,超过了变量的内存空间,所以一次性取40条  
    226.             for i in result_tuple1:  # 一条一条写入ES,这个速度太慢,强烈需要改进 通过bulk接口导入?  
    227.                 popularity = (i[6] + i[7] + i[8] * 2 + i[9] * 2)  
    228.                 if conn.index(  
    229.                     {'tableName': tableName,'tableId':i[0],'title': unicode(i[1]),'author':unicode(i[2]),  
    230.                     'content':unicode(i[3]),'publishTime':str(i[4]),'keyWords':unicode(i[5]),  
    231.                     'popularity':popularity,'url': i[10],  
    232.                     'dataType':fetch_account(i[11])}, ES_index, index_type, bulk=True):  # 将数据写入索引pom的spiderInfo  
    233.                     cc += 1  
    234.                     print 'bulk导入后的ID:{}'.format(i[0])  
    235.             rememberId = i[0]  
    236.             cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId,tableName))  
    237.             conn1.commit()  
    238.             result_rememberId = cur.execute("select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # 通过数据库表拿到更新的ID  
    239.             num2 = result_rememberId.fetchone()  
    240.         print "{}读写成功".format(tableName)  
    241.     if tableName.upper() == 'T_REALTIME_NEWS':  
    242.         while num2[0] < num1[0]:  
    243.             result_readOracle = cur.execute("select {}_ID,title,author,content,publishTime,browseNum,commentNum,accountCode,url from {} where {}_ID > {} and rownum<=40 ".format(tableName, tableName, tableName, num2[0]))  
    244.             result_tuple1 = result_readOracle.fetchall()  # 之前是因为数据量太大,超过了变量的内存空间,所以用fetchmany取40条  后来大神建议数据库中限制查询数 然后fetchall,这样查询更有效率  
    245.             for i in result_tuple1:  # 一条一条写入ES,这个速度太慢,强烈需要改进 通过bulk接口导入?  
    246.                 popularity = (i[5] + i[6] * 2)  
    247.                 if conn.index(  
    248.                     {'tableName': tableName,'tableId':i[0],'title': unicode(i[1]),'author':unicode(i[2]),  
    249.                     'content':unicode(i[3]),'publishTime':str(i[4]),'keyWords':unicode(''),  
    250.                     'popularity':popularity,'url': i[8],'dataType':fetch_account(i[7])}, ES_index, index_type, bulk=True):  # 将数据写入索引pom的spiderInfo  
    251.                     cc += 1  
    252.                     print 'bulk导入后的ID:{}'.format(i[0])  
    253.             rememberId = i[0]  
    254.             cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId, tableName))  
    255.             conn1.commit()  
    256.             result_rememberId = cur.execute(  
    257.                 "select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # 通过数据库表拿到更新的ID  
    258.             num2 = result_rememberId.fetchone()  
    259.         print "{}读{}写成功".format(tableName, index_type)  
    260.     if tableName.upper() == 'T_KEY_NEWS':  
    261.         while num2[0] < num1[0]:  
    262.             result_readOracle = cur.execute("select {}_ID,title,author,content,publishTime,browseNum,commentNum,accountCode,url from {} where {}_ID > {} and rownum<=40 ".format(tableName, tableName, tableName, num2[0]))  
    263.             result_tuple1 = result_readOracle.fetchall()  # 之前是因为数据量太大,超过了变量的内存空间,所以用fetchmany取40条  后来大神建议数据库中限制查询数 然后fetchall,这样查询更有效率  
    264.             for i in result_tuple1:  # 一条一条写入ES,这个速度太慢,强烈需要改进 通过bulk接口导入?  
    265.                 popularity = (i[5] + i[6] * 2)  
    266.                 if conn.index(  
    267.                     {'tableName': tableName,'tableId':i[0],'title': unicode(i[1]),'author':unicode(i[2]),  
    268.                     'content':unicode(i[3]),'publishTime':str(i[4]),'keyWords':unicode(''),  
    269.                     'popularity':popularity,'url': i[8],'dataType':fetch_account(i[7])}, ES_index, index_type, bulk=True):  # 将数据写入索引pom的spiderInfo  
    270.                     cc += 1  
    271.                     print 'bulk导入后的ID:{}'.format(i[0])  
    272.             rememberId = i[0]  
    273.             cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId, tableName))  
    274.             conn1.commit()  
    275.             result_rememberId = cur.execute(  
    276.                 "select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # 通过数据库表拿到更新的ID  
    277.             num2 = result_rememberId.fetchone()  
    278.         print "{}读{}写成功".format(tableName, index_type)  
    279.     if tableName.upper() == 'T_LOCAL_NEWS':  
    280.         while num2[0] < num1[0]:  
    281.             result_readOracle = cur.execute("select {}_ID,title,author,content,publishTime,browseNum,commentNum,accountCode,url from {} where {}_ID > {} and rownum<=40 ".format(tableName, tableName, tableName, num2[0]))  
    282.             result_tuple1 = result_readOracle.fetchall()  # 之前是因为数据量太大,超过了变量的内存空间,所以用fetchmany取40条  后来大神建议数据库中限制查询数 然后fetchall,这样查询更有效率  
    283.             for i in result_tuple1:  # 一条一条写入ES,这个速度太慢,强烈需要改进 通过bulk接口导入?  
    284.                 popularity = (i[5] + i[6] * 2)  
    285.                 if conn.index(  
    286.                     {'tableName': tableName, 'tableId': i[0], 'title': unicode(i[1]), 'author': unicode(i[2]),  
    287.                     'content': unicode(i[3]), 'publishTime': str(i[4]), 'keyWords': unicode(''),  
    288.                     'popularity': popularity, 'url': i[8], 'dataType': fetch_account(i[7])}, ES_index, index_type,bulk=True):  # 将数据写入索引pom的spiderInfo  
    289.                     cc += 1  
    290.                     print 'bulk导入后的ID:{}'.format(i[0])  
    291.             rememberId = i[0]  
    292.             cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId, tableName))  
    293.             conn1.commit()  
    294.             result_rememberId = cur.execute(  
    295.                 "select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # 通过数据库表拿到更新的ID  
    296.             num2 = result_rememberId.fetchone()  
    297.         print "{}读{}写成功".format(tableName, index_type)  
    298.     if tableName.upper() == 'T_VIDEO_SITESEARCH':  
    299.         while num2[0] < num1[0]:  
    300.             result_readOracle = cur.execute("select {}_ID,accountCode,title,Author,publishTime,url,imgUrl,playNum,keyWords from {} where {}_ID > {} and rownum<=40 ".format(tableName, tableName, tableName, num2[0]))  
    301.             result_tuple1 = result_readOracle.fetchall()  # 之前是因为数据量太大,超过了变量的内存空间,所以用fetchmany取40条  后来大神建议数据库中限制查询数 然后fetchall,这样查询更有效率  
    302.             for i in result_tuple1:  # 一条一条写入ES,这个速度太慢,强烈需要改进 通过bulk接口导入?  
    303.                 if conn.index(  
    304.                     {  
    305.                     'tableName': tableName, 'tableId': i[0], 'title': unicode(i[2]), 'author': unicode(i[3]),  
    306.                     'summary': unicode('0'), 'publishTime': str(i[4]), 'browseNum': i[7],'url':i[5],'imgUrl':i[6],'ranking':0,  
    307.                     'playNum': 0, 'dataType': fetch_account(i[1])}, ES_index, index_type,bulk=True):  # 将数据写入索引pom的spiderInfo  
    308.                     cc += 1  
    309.                     print 'bulk导入后的ID:{}'.format(i[0])  
    310.             rememberId = i[0]  
    311.             cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId, tableName))  
    312.             conn1.commit()  
    313.             result_rememberId = cur.execute(  
    314.                 "select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # 通过数据库表拿到更新的ID  
    315.             num2 = result_rememberId.fetchone()  
    316.         print "{}读{}写成功".format(tableName,index_type)  
    317.     if tableName.upper() == 'T_BASE_KEYWORDS':  
    318.         while num2[0] < num1[0]:  
    319.             result_readOracle = cur.execute('select {}_ID,keywords from {} where {}_ID > {} and rownum<=50'.format(tableName, tableName, tableName, num2[0]))  
    320.             result_tuple1 = result_readOracle.fetchall()  #因为数据量太大,超过了变量的内存空间,所以一次性取40条  
    321.             for i in result_tuple1:  # 一条一条写入ES,这个速度太慢,强烈需要改进 通过bulk接口导入?  
    322.                 if conn.index({'id': i[0], 'keywords': i[1]}, ES_index, index_type,bulk=True):  # 将数据写入索引pom的spiderInfo  
    323.                     cc += 1  
    324.                     print 'bulk导入后的ID:{}'.format(i[0])  
    325.             rememberId = i[0]  
    326.             cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId,tableName))  
    327.             conn1.commit()  
    328.             result_rememberId = cur.execute("select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # 通过数据库表拿到更新的ID  
    329.             num2 = result_rememberId.fetchone()  
    330.         print "{}读写成功".format(tableName)  
    331.     if tableName.upper() == 'T_BASE_SENSITIVEWORDS':  
    332.         while num2[0] < num1[0]:  
    333.             result_readOracle = cur.execute('select {}_ID,SensitiveType,SensitiveTopic,SensitiveWords from {} where {}_ID > {} and rownum<=50'.format(tableName, tableName, tableName,num2[0]))  
    334.             result_tuple1 = result_readOracle.fetchall()  # 因为数据量太大,超过了变量的内存空间,所以一次性取40条  
    335.             for i in result_tuple1:  # 一条一条写入ES,这个速度太慢,强烈需要改进 通过bulk接口导入?  
    336.                 if conn.index({'id':i[0],  
    337.                             'sensitiveType':unicode(i[1]),  
    338.                             'sensitiveTopic': unicode(i[2]),  
    339.                             'sensitiveWords':unicode(i[3])}, ES_index, index_type, bulk=True):  # 将数据写入索引pom的spiderInfo  
    340.                     cc +=1  
    341.             print 'bulk导入后的ID:{}'.format(i[0])  
    342.             rememberId = i[0]  
    343.             cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId, tableName))  
    344.             conn1.commit()  
    345.             result_rememberId = cur.execute("select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # 通过数据库表拿到更新的ID  
    346.             num2 = result_rememberId.fetchone()  
    347.         print "{}读写成功".format(tableName)  
    348.     else:  
    349.         pass  
    350.   
    351. def ww(a):  
    352.     while True:  
    353.         print a  
    354.         time.sleep(0.5)  #用于多线程的一个实验函数  
    355.   
    356. if __name__ == "__main__":  
    357.     cc = 0  
    358.     connect_ES('172.17.5.66:9200')  
    359.     # conn.indices.delete_index('_all')  # 清除所有索引  
    360.     create_ESindex("pom", "spiderInfo", "involveVideo", "involveCeefax","keyWord","sensitiveWord")  
    361.     connect_Oracle("pom", "Bohui@123", "172.17.7.118:1521/ORCL")  
    362.     # thread.start_new_thread(readOracle_writeES,("T_SOCIAL","pom","spiderInfo"),)#创建一个多线程  
    363.     # thread.start_new_thread(readOracle_writeES,("T_SOCIAL_SITESEARCH", "pom", "spiderInfo"),)#创建一个多线程  
    364.     mm = time.clock()  
    365.     readOracle_writeES("T_SOCIAL", "pom", "spiderInfo") #表名虽然在程序中设置了转化为大写,但是还是全大写比较好  
    366.     readOracle_writeES("T_HOTSEARCH", "pom", "spiderInfo")  
    367.     readOracle_writeES("T_VIDEO_HOT", "pom", "spiderInfo")  
    368.     readOracle_writeES("T_PRESS", "pom", "spiderInfo")  
    369.     readOracle_writeES("T_INDUSTRY", "pom", "spiderInfo")  
    370.     readOracle_writeES("T_VIDEO_SITESEARCH", "pom", "involveVideo")  
    371.     readOracle_writeES("T_REALTIME_NEWS", "pom", "involveCeefax")  
    372.     readOracle_writeES("T_KEY_NEWS", "pom", "involveCeefax")  
    373.     readOracle_writeES("T_LOCAL_NEWS", "pom", "involveCeefax")  
    374.     readOracle_writeES("T_SOCIAL_SITESEARCH", "pom", "involveCeefax")  
    375.     readOracle_writeES("T_BASE_KEYWORDS", "pom", "keyWord")  
    376.     readOracle_writeES("T_BASE_SENSITIVEWORDS", "pom", "sensitiveWord")  
    377.     nn = time.clock()  
    378.     # conn.indices.close_index('pom')  
    379.     conn1.close()  
    380.     print '数据写入耗时:{}  成功写入数据{}条'.format(nn-mm,cc)  
    381.   
    382. #实验多线程  
    383.     """ 
    384.     while a < 100: 
    385.         conn.index( 
    386.             {'tableName': 'T_base_account', 'type': '1', 'tableId': '123', 'title': unicode('陈龙'), 'author': 'ABC', 
    387.             'content': 'ABC', 'publishTime': '12:00:00', 'browseNum': '12', 'commentNum': '12', 'dataType': '1'}, 
    388.             "pom", "spiderInfo", )  # 将数据写入索引pom的spiderInfo 
    389.         a += 1 
    390.     print time.ctime() 
    391.     """  
    392. """ 
    393.     threads = [] 
    394.     t1 = threading.Thread(target=readOracle_writeES,args=("T_SOCIAL","pom","spiderInfo")) 
    395.     threads.append(t1) 
    396.     #t3 = threading.Thread(target=ww,args=(10,)) 
    397.     #threads.append(t3) 
    398.     #t2 = threading.Thread(target=readOracle_writeES,args=("T_SOCIAL_SITESEARCH", "pom", "spiderInfo")) 
    399.     #threads.append(t2) 
    400.     print time.ctime() 
    401.     for t in threads: 
    402.         t.setDaemon(True) 
    403.         t.start() 
    404.     t.join() 
    405. """  

    五、编译过程的问题
     
    1、直接print游标cur.execute ( ) 将不能得到我们想要的结果
     
    result2 = cur.execute('select T_SOCIAL_ID from T_SOCIAL')
    print result2
    返回:<__builtin__.OracleCursor on <cx_Oracle.Connection to pom@172.17.7.118:1521/ORCL>>
     
     
    result2 = cur.execute('select T_SOCIAL_ID  from T_SOCIAL')
    print result2
    num = result2.fetchall()
    print num
    for i in num:
        print i[0]

     
    返回:[(55,), (56,), (57,), (58,), (59,), (60,), (61,), (62,), (63,), (64,), (65,), (66,), (67,), (68,), (69,), (70,)]
         55
    注意:用fetchall()得到的数据为:[(55,), (56,), (57,), (58,), (59,)] 元组而不是数字。
    用 变量[num] 的方式取出具体的数值
     
    2、cx_Oracle中文编码乱码问题
     
    显示中文乱码:������DZ��� �����������
    或者显示未知的编码:('xcexd2xd5xe6xb5xc4xcaxc7xb1xeaxccxe2',)
    需要注意一下几个地方,将数据库中的中文编码转化成utf-8编码,并将中文写入elasticsearch
     
    os.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8' #中文编码
     
     
    reload(sys) #默认编码设置为utf-8 一定需要reload(sys)
    sys.setdefaultencoding('utf-8')
     
    'title':unicode('中文')
     
    python传递给js的列表中文乱码怎么解决?  
    json.dumps(dictionary,ensure_ascii=False)
     
     
    3、远程连接不上Oracle数据库的问题
     
    第一:确保connect()中各个参数的值都正确。例如
     
    conn1 = cx_Oracle.connect("username","password","172.17.7.118:1521/ORCL")  #连接远程数据库
    conn1 = cx_Oracle.connect('username','password','localhost:1521/ORCL') #连接本地数据库
    conn2 = pyes.ES('127.0.0.1:9200')  #连接ES
     
    第二:确保安装的版本都符合要求,包括模块的版本。
     
    4、提示TypeError: 'NoneType' object is not callable
     
    确保mapping中的各个字段类型都设置正确
    检查索引和映射是否都书写正确
     
    5、脚本同时读取多个数据库表
    涉及到Python中多线程的问题,给每一个表起一个线程,同时给每一个线程加锁
    编译时碰到问题:AssertionError: group argument must be None for now(检查函数是否书写正确,读写冲突)
    AttributeError: 'builtin_function_or_method' object has no attribute 'setDaemon'
    cx_Oracle.ProgrammingError: LOB variable no longer valid after subsequent fetch(fetchall数据量过大,溢出 设置一次取数据库中 rownum数)
    TypeError: 'NoneType' object has no attribute '__getitem__'  (注意数据库查询对应的大小写)
    No handlers could be found for logger "pyes"  可能是连接超时
    AttributeError: 'tuple' object has no attribute 'append'   tuple不能直接用append
    TypeError: 'tuple' object does not support item assignment  tuple不能赋值
    数据库批量读取
    就多线程问题咨询了大神,大神建议用多进程来实现会比较简单
     
    6、脚本定时触发问题
    Linux crontab定时执行任务,crontab防止脚本周期内未执行完重复执行
     
     
    7、单实例的问题。防止脚本没有执行完再次触发
    刚开始设想在脚本中完成,后来知道这个可以在系统中设定
     
    8、数据同步插件
    网上有大量的关于同步关系型数据库的有关插件 logstash-input-jdbc  不太好安装,不知道如何使用。
    MySQL和ES同步插件的介绍,例如elasticsearch-river-jdbc
    在这儿启用的是bulk接口,批量导入。数据同步的速度大大提高
     
    9、判断数据是否同步成功
    这个是之前一直没有注意的问题,但其实在数据传输的时候是非常重要的。
    目前的判断方法是看ES中的数据量到底有多少,然后对照统计量进行判断分析,,这也是在后期发现有部分数据没有同步过去的方法。
     
    10、统计写入了多少数据
    UnboundLocalError: local variable 'cc' referenced before assignment 
    定义了全局变量cc,但是在局部进行了修改,所以报错 修改同名的全局变量,则认为是一个局部变量
     
    五、源码改进
    因为数据写入的速度太慢(40条数据 800Kb大小 写入花费2S左右),所有在原来的基础上,修改了读取数据库中未写入内容的策略和ES写入的策略。
     
    插入完成的源码
     
    调试问题:
    1、pip install elasticsearch  引入helpers函数模块,使用bulk函数批量导入。
    2、AttributeError: 'ES' object has no attribute 'transport'  因为原来使用的是pyes模块 现在换成了elasticsearch,所以改成对应模块
    conn2 = Elasticsearch("127.0.0.1:9200")
    其他常见错误
        SerializationError:JSON数据序列化出错,通常是因为不支持某个节点值的数据类型
        RequestError:提交数据格式不正确
        ConflictError:索引ID冲突
        TransportError:连接无法建立
     
    最后通过了解其实是找到了数据同步的插件 logstash-input-jdbc 能够实现数据的同步增删改查,按照网上的教程能够很轻松的实现,遇到的问题就是插件同步过去的字段都必须是小写。
     
    ------------
    Python中cx_Oracle的一些函数:
    commit() 提交
    rollback() 回滚
    cursor用来执行命令的方法:
    callproc(self, procname, args):用来执行存储过程,接收的参数为存储过程名和参数列表,返回值为受影响的行数
    execute(self, query, args):执行单条sql语句,接收的参数为sql语句本身和使用的参数列表,返回值为受影响的行数
    executemany(self, query, args):执行单挑sql语句,但是重复执行参数列表里的参数,返回值为受影响的行数
    nextset(self):移动到下一个结果集

    cursor用来接收返回值的方法:
    fetchall(self):接收全部的返回结果行.
    fetchmany(self, size=None):接收size条返回结果行.如果size的值大于返回的结果行的数量,则会返回cursor.arraysize条数据.
    fetchone(self):返回一条结果行.
    scroll(self, value, mode='relative'):移动指针到某一行.如果mode='relative',则表示从当前所在行移动value条,如果 mode='absolute',则表示从结果集的第一行移动value条.
    MySQL中关于中文编码的问题
    conn = MySQLdb.Connect(host='localhost', user='root', passwd='root', db='python') 中加一个属性:
    conn = MySQLdb.Connect(host='localhost', user='root', passwd='root', db='python',charset='utf8') 
    charset是要跟你数据库的编码一样,如果是数据库是gb2312 ,则写charset='gb2312'。
  • 相关阅读:
    loadrunner 录制login(关联 参数化 检查点)
    nmon安装(转)
    理解Load Average做好压力测试
    内存泄漏
    上下文切换
    Jmeter 多台机器产生负载
    Java jvm 原理
    【转】HTML-based script和URL-based script两种脚本录制方式
    cookie模拟
    startUML破解方式
  • 原文地址:https://www.cnblogs.com/Leo_wl/p/6075457.html
Copyright © 2011-2022 走看看