zoukankan      html  css  js  c++  java
  • python 连接oracle -- sqlalchemy及cx_Oracle的使用详解

      python连接oracle -- sqlalchemy 

    import cx_Oracle as orcl
    import pandas as pd
    from sqlalchemy import create_engine
    
    # 数据库连接
    db = create_engine('oracle://qmcb:qmcb@localhost:1521/tqmcbdb')
    
    #查询
    sql_select = ''' ...'''
    df = pd.read_sql_query(sql_select, db)
    
    #执行
    db.execute('truncate table {}'.format(ttb))
    
    #保存
    df.to_sql()  #太慢
    
    #插入
    conn = db.raw_connection()
    cursor = conn.cursor()
    col = ', '.join(df.columns.tolist())
    s = ', '.join([':'+str(i) for i in range(1, df.shape[1]+1)])
    sql = 'insert into {}({}) values({})'.format(ttb, col, s)
    cursor.executemany(sql, df.values.tolist())
    conn.commit()
    cursor.close()
    

      python连接oracle -- cx_Oracle 

    cx_oracle 使用

    oracle 数据库表字段的缺失值统计 -- 基于python

    cx_Oracle模块的安装和使用  看下连接oracle的各种方式

    import cx_Oracle as orcl
    
    #定义oracle连接
    orcl.connect('load/123456@localhost/ora11g')
    orcl.connect('username','password','host:port/orcl')
    tnsname = orcl.makedsn('host', 'port', 'sid')
    db = orcl.connect('username', 'password', tnsname)
    
    #数据库参数输出
    db.autocommit = False  # 关闭自动提交
    orcl.clientversion()
    db.version
    db.ping()
    db.username
    db.password
    db.dsn   #dsn=data source name
    db.tnsentry
    cursor.rowcount
    
    #定义游标
    #orcl.Cursor(db)
    cursor = db.cursor()
    
    #查询数据 + 执行代码(如建表,清空表)
    sql = "SELECT ENAME, trunc(SAL,0) FROM EMP WHERE deptno = :deptno and sal > :value"
    cursor.execute(sql, deptno = 30, value = 1000)
    dict1 = {'deptno':30, 'sal':3000}
    cursor.execute(sql, dict1)
    cursor.execute(sql)
    
    #
    cursor.fetchone()
    cursor.fetchall()
    cursor.fetchmany(10) #查10条数据
    
    #插入数据
    #mysql自带load data infile 'D:/../...txt' into table user(var1, ., .)
    cursor.execute('insert into demo(v) values(:1)',['nice'])
    data = [['a',1], ['b',2], ...]
    cursor.executemany(sql, data)
    
    #查询列名
    columns = [i[0] for i in cursor.description]
    
    #出错回滚  -- 一般如插入数据时出错回滚, try:...except:db.rollback()
    db.rollback()
    
    #关闭数据库
    cursor.close()
    db.close()
    

      定义ConnectOracle对象

    import cx_Oracle as orcl
    class ConnectOracle:
        #scott xx 192.168.32.200 1521 oracle
        #sqlplus, system as sysdba, select instance_name from  V$instance;
        #oracle://qmcbrt:qmcbrt@localhost:1521/tqmcbdb
        #cx_Oracle.connection('hr', 'hrpwd', 'localhost:1521/XE')
        ##cx_Oracle.connection('qmcb:qmcb@localhost:1521/tqmcbdb')
        def __init__(self, username, passwd, host, port='1521', sid='oracle'):
            # dsn = data source name
            self.login = {}
            self.db = None
            self.cursor = None
            self.login['username'] = username
            self.login['passwd'] = passwd
            self.login['dsn'] = orcl.makedsn(host, port, sid)
            print(orcl.clientversion())
    
        def connect_oracle(self):
            try:
                #orcl.connect('load/123456@localhost/ora11g')
                #orcl.connect('username','password','host/orcl')
                self.db = orcl.connect(self.login['username'], self.login['passwd'], self.login['dsn'])  # 登录内搜数据库
                self.db.autocommit = False  # 关闭自动提交
                #orcl.Cursor(self.db)
                self.cursor = self.db.cursor()  # 设置cursor光标
                #print(self.db.dsn)
                #print(self.db.password)
                #print(self.db.ping())
                #print(self.db.tnsentry)
                #print(self.db.username);
                #print(self.db.version)
                #print(self.cursor.rowcount)
                return True
            except:
                print('can not connect oracle')
                return False
    
        def close_oracle(self):
            self.cursor.close()
            self.db.close()
    
        def select_oracle(self, sql, size=0, params=None):
            if self.connect_oracle():
                if params:
                    # 执行多条sql命令
                    # results = cur.executemany("select field from table where field = %s", ('value1', 'value2', 'value3'))
                    #sql = "SELECT ENAME, trunc(SAL,0) FROM EMP 
                    #           WHERE deptno = :deptno and sal > :value"
                    #cursor.execute(sql, deptno = 30, value = 1000);
                    #params = {'deptno':30, 'sal':3000}
                    #cur.execute(sql, params)
                    self.cursor.executemany(sql, params)
                else:
                    self.cursor.execute(sql)
                if size:
                    content = self.cursor.fetchmany(size)
                else:
                    content = self.cursor.fetchall()
                
                # 字段名
                #ss = sql.lower()
                #find默认查找从左向右第一个位置
                #string = sql[ss.find('select'), ss.find('from')]
                columns = [i[0] for i in self.cursor.description]
                self.close_oracle()
                #if len(content)==1:
                #    return pd.Series(content[0], index=columns)
                return pd.DataFrame(content, columns=columns)
            return False
    
        def insert_oracle(self, sql, data=None):
            try:
                self.connect_oracle()
                if data:
                    # 批量插入, [(),..]、((),..)
                    # mysql自带load data infile 'D:/../...txt' into table user(var1, ., .)
                    #self.cursor.execute('insert into demo(v) values(:1)',['nice'])
                    self.cursor.executemany(sql, data)
                else:
                    self.cursor.execute(sql, data)
            except:
                print("insert异常")
                self.db.rollback()  # 回滚
            finally:
                self.db.commit()  # 事务提交
                self.close_oracle()
    

      定义读取oracle函数

    def read_db(orcl_engien, ftb, snum, enum):
        db = create_engine(orcl_engien)  #不需要close()
        query_sql = '''
            select * 
            from {} 
            where 
            row_num > {} and
            row_num <= {} and
            is_normal='1' and 
            is_below_16='0' and 
            is_xs='0' and 
            is_cj='0' and 
            is_bzsw='0' and 
            is_dc='0' and 
            is_px='0' 
            '''.format(ftb, snum, enum)
        return pd.read_sql_query(query_sql, db)
    

      定义connectOracle

    class ConnectOracle:
        def __init__(self, username, passwd, host, port='1521', sid='oracle'):
            # dsn = data source name
            self.login = {}
            self.db = None
            self.cursor = None
            self.login['username'] = username
            self.login['passwd'] = passwd
            self.login['dsn'] = orcl.makedsn(host, port, sid)
            print(orcl.clientversion())
     
        def connect(self):
            try:
                #orcl.connect('load/123456@localhost/ora11g')
                self.db = orcl.connect(self.login['username'], self.login['passwd'], self.login['dsn'])  # 登录内搜数据库
                self.db.autocommit = False  # 关闭自动提交
                self.cursor = self.db.cursor()  # 设置cursor光标
                return True
            except Exception as e:
                print(e)
                return False
     
        def close(self):
            self.cursor.close()
            self.db.close()
        
        def execute(self, sql, params=None):
            try:
                if params:
                    self.cursor.execute(sql, params)
                else:
                    self.cursor.execute(sql)
                return True
            except Exception as e:
                print(e)
                self.db.rollback()
                return False
            
        def select(self, sql, size=0, params=None):
            if self.connect():
                if self.execute(sql):
                    if size:
                        data_list = self.cursor.fetchmany(size)
                    else:
                        data_list = self.cursor.fetchall()
                    columns = [i[0].lower() for i in self.cursor.description]
                    #if len(content)==1:
                    #    return pd.Series(content[0], index=columns)
                    df = pd.DataFrame(data_list, columns=columns)
                    return df
                else:
                    print('代码执行错误')
                self.close()
            else:
                print('数据库连接错误')
                return None
     
        def insert(self, sql, data=None):
            if self.connect():
                try:
                    if data:
                        if isinstance(data[0], (list,tuple,)):
                            self.cursor.executemany(sql, data)
                        else:
                            self.cursor.execute(sql, data)
                    else:
                        self.cursor.execute(sql)
                except Exception as e:
                    print(e)
                    print("insert异常")
                    self.db.rollback()  #回滚
                finally:
                    self.db.commit()  #事务提交
                    self.close()
            else:
                print('数据库连接错误')
    

      定义插入oracle函数

    import retry
    
    #backoff=1, jitter=0, , logger=retry.logging
    @retry.retry(exceptions=Exception, tries=5, delay=5, backoff=1, max_delay=10)
    def insert_db(data, orcl_engien, ttb):
        # dtyp = {col:types.VARCHAR(df[col].str.len().max())
        #    for col in df.columns[df.dtypes == 'object'].tolist()}
        # 太慢!!
        #dtyp = {'aac147':types.VARCHAR(18),'score':types.NUMBER(6,2)}
        #return data.to_sql(ttb, con=db, dtype=dtyp, if_exists='append', index=False)
        #output = io.StringIO()
        #data.to_csv(output, sep='	', index=False, header=False)
        #output.getvalue()
        #output.seek(0)
        db = create_engine(orcl_engien)  #不需要close()
        conn = db.raw_connection()
        #db = cx_Oracle.connect(orcl_engien[9:])
        cursor = conn.cursor()
        #cursor.copy_from(output, ttb, null='')
        col = ', '.join(data.columns.tolist())
        s = ', '.join([':'+str(i) for i in range(1, data.shape[1]+1)])
        sql = 'insert into {}({}) values({})'.format(ttb, col, s)
        #TypeError: expecting float, 
        cursor.executemany(sql, data.values.tolist())
        conn.commit()
        cursor.close()
        #try:
        #except Exception as e:
        #    print(e)
        #finally:
    

      定义重试装饰器

    #定义一个重试修饰器,默认重试一次
    def retry(num_retries=1, delay=2, random_sec_max=3):
        #用来接收函数
    	import time
    	import numpy as np
        def wrapper(func):
            #用来接收函数的参数
            def wrapper(*args,**kwargs):
                #为了方便看抛出什么错误定义一个错误变量
                #last_exception =None
                #循环执行包装的函数
                for _ in range(num_retries):
                    try:
                        #如果没有错误就返回包装的函数,这样跳出循环
                        return func(*args, **kwargs)
                    except Exception as err:
                        #捕捉到错误不要return,不然就不会循环了
                        #last_exception = e 
    					#np.random.randint(0,random_sec_max+1,size=10)
    					time.sleep(delay + np.random.random()*random_sec_max)
    					print(err)
    			else:
    				#如果要看抛出错误就可以抛出
    				# raise last_exception
    				raise 'ERROR: 超过retry指定执行次数!!'
    				print('未执行参数为:', *args, **kwargs)
            return wrapper
        return wrapper
    

      

  • 相关阅读:
    android websocket推送
    proguardgui.bat来混淆已有的jar包
    android raw与assets区别
    Eclipse开发Android报错Jar mismatch! Fix your dependencies
    gc overhead limit exceeded
    如何签名apk,并让baidu地图正常显示
    Eclipse--Team--SVN--URL修改
    监听EditText
    android 注销
    从Android手机中取出已安装的app包,导出apk
  • 原文地址:https://www.cnblogs.com/iupoint/p/10932069.html
Copyright © 2011-2022 走看看