zoukankan      html  css  js  c++  java
  • 【转】用python比对数据库表数据的脚本

    最近在做一个数据库异构复制的项目,客户表示需要一个数据比对的工具,我就自己写了一个异构数据库的比对python脚本.这个比对脚本只能比对数量,不能比对具体的记录.使用的sql语句也是最基础的select count(*) 这种,没有开并发所以对大表可能比对时间稍长.

    基本原理是将需要比对的数据写到一张表里,先读取那个表里的数据,取出需要比对的表.然后创建多进程,同时在原端和目标端count.然后将count的结果写到一个excel文件中.

    其中最关键的就是那张表.只要将那张表里的数据搞对了,基本就不会有什么问题.

    目前支持的数据库有oracle,mysql,postgresql,sqlserver.程序分为三个部分

    1.数据库配置文件

    首先需要在python代码的相同目录下写一个名为check.ini的配置文件.下面一个配置文件例子:

    [DATA]
    #配置原端数据库,下面的ORACLE需要与后面的项匹配
    source=ORACLE
    #配置目标端数据库,下面的POSTGRESQL需要与后面的项匹配
    target=POSTGRESQL
    #配置比对表的数据库,需要与下面的配置项匹配
    check_node=ORACLE
    #配置比对表数据库的用户,如果是oracle是用户,如果是mysql,pg,mssql则是数据库名
    check_owner=suq
    #配置比对表的表名,区分大小写
    check_table=check_table
    
    #配置mysql的连接串.注意MYSQL必须大写而且必须是以MYSQL开头,例如想比对多个mysql可以写MYSQL1,MYSQL2等
    #下面的几个配置同样需要以相应例子开头,因为程序就是以项的开头来确认是哪种数据库的
    [MYSQL]
    db_host=192.168.56.25
    db_port=3306
    db_user=root
    db_pwd=root
    db_dbname=major
    
    
    [ORACLE]
    db_host=192.168.56.30
    db_port=1521
    db_user=dsg
    db_pwd=dsg
    db_sid=bre1
    
    
    
    [POSTGRESQL]
    db_host=192.168.56.50
    db_port=5432
    db_user=postgres
    db_pwd=postgres
    db_dbname=msgdb
    
    
    
    [MSSQL]
    db_host=192.168.56.101
    db_port=1433
    db_user=sa
    db_pwd=sa
    db_dbname=master

    2.创建一个比对表.

    例如我上面的例子放在suq用户下的check_table中

    具体的表结构如下:

    [html] view plain copy
     
    1. SQL> desc check_table  
    2.  Name                      Null?    Type  
    3.  ----------------------------------------- -------- ----------------------------  
    4.  SOWNER                         VARCHAR2(30)  
    5.  SNAME                          VARCHAR2(30)  
    6.  TOWNER                         VARCHAR2(30)  
    7.  TNAME                          VARCHAR2(30)  

    分别表示原端的用户名,表名,目标端用户名表名,如果不是用户的那么就是数据库名.

    看一下表内我的测试数据:

    [html] view plain copy
     
    1. SQL> select * from check_table;  
    2.   
    3. SOWNER                 SNAME                  TOWNER                 TNAME  
    4. ------------------------------ ------------------------------ ------------------------------ ------------------------------  
    5. suq                "t1"               suq                t1  
    6. suq                "t2"               suq                t2  
    7. suq                "t3"               suq                t3  
    8. suq                "t4"               suq                t4  

    这里的数据要特别注意,必须写对否则可能运行会报错.需要注意的一般原因是不同的数据库对大小写敏感不同.因此建议在写好这些数据后,手动到数据库查一下,例如

    select count(*) from suq."t1"

    看这样的sql对不对.

    3.就是主程序

    需要注意的是我连接各种数据库分别使用的如下python模块,写excel使用XlsxWriter模块:

    [html] view plain copy
     
    1. C:Users hink>pip list  
    2. cx-Oracle (5.2.1)  
    3. MySQL-python (1.2.4)  
    4. psycopg2 (2.6.2)  
    5. pymssql (2.1.3)  
    6. XlsxWriter (0.8.5)  

    下面是具体的python代码:

    #coding:utf-8
    import cx_Oracle as ora
    import MySQLdb as my
    import psycopg2 as post
    import pymssql as ms
    import ConfigParser as conf
    import multiprocessing as  mul
    import xlsxwriter 
    import time
    
    
    
    def connect(cfg,db):
        if db[0:5] == 'MYSQL':
            db_host=cfg.get(db,'db_host')
            db_port=cfg.get(db,'db_port')
            db_user=cfg.get(db,'db_user')
            db_pwd=cfg.get(db,'db_pwd')
            db_dbname=cfg.get(db,'db_dbname')
            conn = my.connect(host=db_host,port=int(db_port),user=db_user,passwd=db_pwd,db=db_dbname)
            return conn
        elif db[0:6] == 'ORACLE':
            db_host=cfg.get(db,'db_host')
            db_port=cfg.get(db,'db_port')
            db_user=cfg.get(db,'db_user')
            db_pwd=cfg.get(db,'db_pwd')
            db_sid=cfg.get(db,'db_sid')
            conn = ora.connect(db_user,db_pwd,db_host+':'+db_port+'/'+db_sid)
            return conn
        elif db[0:10] == 'POSTGRESQL':
            db_host=cfg.get(db,'db_host')
            db_port=cfg.get(db,'db_port')
            db_user=cfg.get(db,'db_user')
            db_pwd=cfg.get(db,'db_pwd')
            db_dbname=cfg.get(db,'db_dbname')
            conn = post.connect(host=db_host,port=db_port,user=db_user,password=db_pwd,database=db_dbname)
            return conn
        elif db[0:5] == 'MSSQL':
            db_host=cfg.get(db,'db_host')
            db_port=cfg.get(db,'db_port')
            db_user=cfg.get(db,'db_user')
            db_pwd=cfg.get(db,'db_pwd')
            db_dbname=cfg.get(db,'db_dbname')
            conn = ms.connect(host=db_host,port=db_port,user=db_user,password=db_pwd,database=db_dbname)
            return conn
            
    
    
    def check(cfg,db,check_owner,check_table):
        conn=connect(cfg,db)
        cursor=conn.cursor()
        sql='select * from '+check_owner+'.'+check_table
        cursor.execute(sql)
        table_list=[]
        alldata=cursor.fetchall()
        for i in alldata:
            table_list.append(i)
        #print table_list
        return table_list
    
    
    
    def getcount(cfg,db,sql,q):
        conn = connect(cfg,db)
        cursor=conn.cursor()
        try:
            cursor.execute(sql)
            countval = cursor.fetchall()[0][0]
            q.put(countval)
        except Exception,e:
            countval="Error : "+str(e)
            q.put(countval)
    
    
    def isdigit(num):  
        try:  
            int(num)  
            return True  
        except:  
            return False  
    
    
    
    def comp(cfg,source,target,tablelist):
        ###excel start
        xlsxname='check_'+str(time.strftime("%Y%m%d%H%M", time.localtime()))+'.xlsx'
        workbook=xlsxwriter.Workbook(xlsxname)
        top=workbook.add_format({'border':6,'align':'center','bg_color':'cccccc','font_size':13,'bold':True})
        format_data_normal=workbook.add_format({'align':'center','font_size':13})
        format_data_warn=workbook.add_format({'align':'center','font_size':13,'bg_color':'ff0000'})
        format_data_err=workbook.add_format({'align':'center','font_size':13,'bg_color':'ffff00'})
        worksheet = workbook.add_worksheet('sheet1')
        worksheet.set_column('A:A',12)
        worksheet.set_column('B:B',40)
        worksheet.set_column('C:C',12)
        worksheet.set_column('D:D',12)
        worksheet.set_column('E:E',40)
        worksheet.set_column('F:F',12)
        worksheet.set_column('G:G',12)
        title=[u'源端用户',u'源端表名',u'源端数据量',u'目标端用户',u'目标端表名',u'目标端数据量',u'差异条数']
        worksheet.write_row('A1',title,top)
        ###excel stop
        length=len(tablelist)
        for i in range(length):
            check_result=[]
            sowner=tablelist[i][0]
            sname=tablelist[i][1]
            towner=tablelist[i][2]
            tname=tablelist[i][3]
            sql_s='select count(*) from '+sowner+'.'+sname
            sql_t='select count(*) from '+towner+'.'+tname
            #sql_t='select count(*) from '+towner+'.'+'"'+tname+'"'
            q1=mul.Queue()
            q2=mul.Queue()
            p1=mul.Process(target = getcount,args = (cfg,source,sql_s,q1))
            p2=mul.Process(target = getcount,args = (cfg,target,sql_t,q2))
            p1.start()
            p2.start()
            count_s=q1.get()
            count_t=q2.get()
            p1.join
            p2.join
            check_result.append(sowner)
            check_result.append(sname)
            check_result.append(count_s)
            check_result.append(towner)
            check_result.append(tname)
            check_result.append(count_t)
            print '%s %s %s %s %s %s'  %(sowner,sname,count_s,towner,tname,count_t)
            #print check_result
            if isdigit(count_s) and isdigit(count_t):
                check_result.append(count_s-count_t)
                if count_s == count_t:
                    worksheet.write_row('A'+str(2+i),check_result,format_data_normal)
                else:
                    worksheet.write_row('A'+str(2+i),check_result,format_data_warn)
            else:
                check_result.append("Error")
                worksheet.write_row('A'+str(2+i),check_result,format_data_err)
        workbook.close()
    
    
    
    
        
    
    if __name__ == "__main__":
        print "AT time {0}".format(time.ctime())
        print "Begin compare ..."
        cfg=conf.ConfigParser()
        cfg.read('check.ini')
        source=cfg.get('DATA','source')
        target=cfg.get('DATA','target')
        check_node=cfg.get('DATA','check_node')
        check_owner=cfg.get('DATA','check_owner')
        check_table=cfg.get('DATA','check_table')
    
        tablelist=check(cfg,check_node,check_owner,check_table)
        comp(cfg,source,target,tablelist)
        print "AT time {0}".format(time.ctime())
        print "compare complete!"
        raw_input("Press <ENTER>")

    执行这段代码后就会读取check.ini文件,获取需要比对的原端和目标端数据库的信息,以及比对表的信息,首先将比对的表获取写到一个数组中.然后使用for循环对表进行count,再写到excel中.excel名为check_XXXX.xlsx.xxx为时间.如果在执行sql的时候报错,那么excel中以黄色标出,如果比对原端和目标端数据不一致以红色标出.

    下面是我比对oracle和pg中的一个结果:



    --------------

     1 # -*- coding:utf-8 -*-
     2 import os
     3 
     4 yesterdaynamelist=[]
     5 todaynamelist=[]
     6 differentnamelist=[]
     7 areceivername=[]
     8 test=[]
     9 #读取 昨天生成的namelist 文件 并生成todaynamelist
    10 namelist = open('D:\python\Project\AtuoEmail\Date\riqi.txt','r')
    11 linea = namelist.readlines()
    12 # lineb = namelist.readline()
    13 # print (namelist)
    14 # print (linea)
    15 # print (lineb)
    16 # for i in linea:
    17 #     print (i)
    18 #
    19 for i in linea:
    20     line=i.split()
    21     # print(line)
    22     yesterdaynamelist.extend(line)
    23     # print(yestdaynamelist)
    24 
    25 
    26 # 将todaynamelist 列表输出成单列的文本。
    27 yesterdaytxt = open("D:\python\Project\AtuoEmail\Date\yesterdaytxt.txt","w",encoding="utf-8")  #w参数 创建+复写
    28 yesterdaytxt.close()
    29 for i in yesterdaynamelist:
    30      # print (i)
    31      # print(type(i))
    32      yesterdaytxt = open("D:\python\Project\AtuoEmail\Date\yesterdaytxt.txt","a",encoding="utf-8")
    33      yesterdaytxt.write(i)
    34      yesterdaytxt.write("
    ")
    35 yesterdaytxt.close()
    36 # todaytxt.write(todaynamelist)
    37 # todaytxt.close()  #关闭文件
    38 
    39 # Yesterdaytxt
    40 # temp
    41 
    42 # print (line)
    43 # for i in line:
    44 #     print i.strip().split()[0]
    45 #     print i.strip().split()[1]
    46 #     print i.strip().split()[2]
    47 
    48 
    49 #调用 cmd生成当天最新的域控用户名单。
    50 os.system('D:\python\Project\AtuoEmail\TodayADUser.bat')
    51 
    52 namelist = open('D:\python\Project\AtuoEmail\Date\riqi.txt','r')
    53 linea = namelist.readlines()
    54 todaynamelist=[]
    55 for i in linea:
    56     line=i.split()
    57     # print(line)
    58     todaynamelist.extend(line)
    59     # print(todaynamelist)
    60 
    61 # print(todaynamelist)
    62 # print(yesterdaynamelist)
    63 # print(list(set(todaynamelist).difference(set(yesterdaynamelist))))
    64 
    65 #生成差异名单并导出文件
    66 differentnamelist=list(set(todaynamelist).difference(set(yesterdaynamelist))) #“t”“y”对比,输出“T”中新增的元素
    67 # print(differentnamelist)
    68 for i in differentnamelist:
    69 #      print (i)
    70 #      print(type(i))
    71       differentnamelist = open("D:\python\Project\AtuoEmail\Date\differentnamelist.txt","a",encoding="utf-8")
    72       differentnamelist.write(i)
    73       differentnamelist.write("@dafy.com,")
    74 differentnamelist.close()
    View Code
    # -*- coding:utf-8 -*-
    import os
    
    yesterdaynamelist=[]
    todaynamelist=[]
    differentnamelist=[]
    areceivername=[]
    test=[]
    #读取 昨天生成的namelist 文件 并生成todaynamelist
    namelist = open('D:\python\Project\AtuoEmail\Date\riqi.txt','r')
    linea = namelist.readlines()
    # lineb = namelist.readline()
    # print (namelist)
    # print (linea)
    # print (lineb)
    # for i in linea:
    #     print (i)
    #
    for i in linea:
        line=i.split()
        # print(line)
        yesterdaynamelist.extend(line)
        # print(yestdaynamelist)
    
    
    # 将todaynamelist 列表输出成单列的文本。
    yesterdaytxt = open("D:\python\Project\AtuoEmail\Date\yesterdaytxt.txt","w",encoding="utf-8")  #w参数 创建+复写
    yesterdaytxt.close()
    for i in yesterdaynamelist:
         # print (i)
         # print(type(i))
         yesterdaytxt = open("D:\python\Project\AtuoEmail\Date\yesterdaytxt.txt","a",encoding="utf-8")
         yesterdaytxt.write(i)
         yesterdaytxt.write("
    ")
    yesterdaytxt.close()
    # todaytxt.write(todaynamelist)
    # todaytxt.close()  #关闭文件
    
    # Yesterdaytxt
    # temp
    
    # print (line)
    # for i in line:
    #     print i.strip().split()[0]
    #     print i.strip().split()[1]
    #     print i.strip().split()[2]
    
    
    #调用 cmd生成当天最新的域控用户名单。
    os.system('D:\python\Project\AtuoEmail\TodayADUser.bat')
    
    namelist = open('D:\python\Project\AtuoEmail\Date\riqi.txt','r')
    linea = namelist.readlines()
    todaynamelist=[]
    for i in linea:
        line=i.split()
        # print(line)
        todaynamelist.extend(line)
        # print(todaynamelist)
    
    # print(todaynamelist)
    # print(yesterdaynamelist)
    # print(list(set(todaynamelist).difference(set(yesterdaynamelist))))
    
    #生成差异名单并导出文件
    differentnamelist=list(set(todaynamelist).difference(set(yesterdaynamelist))) #“t”“y”对比,输出“T”中新增的元素
    # print(differentnamelist)
    for i in differentnamelist:
    #      print (i)
    #      print(type(i))
          differentnamelist = open("D:\python\Project\AtuoEmail\Date\differentnamelist.txt","a",encoding="utf-8")
          differentnamelist.write(i)
          differentnamelist.write("@dafy.com,")
    differentnamelist.close()
  • 相关阅读:
    ACM HDU 3910 Liang Guo Sha(数学题,读懂题目)
    防止 7Zip 生成的 ZIP 文件在 Mac OS X 下出现乱码
    NYOJ 506
    Scanner
    String 与StringBuilder
    基于JAVA的聊天室开发
    PS加粗字体
    MySQL相关命令
    Matlab中数据处理和多项式插值与曲线拟合
    dos下进入某一文件
  • 原文地址:https://www.cnblogs.com/liangqihui/p/9182410.html
Copyright © 2011-2022 走看看