zoukankan      html  css  js  c++  java
  • Python3 连接 Oracle

    配置信息

    # dbconfig.py -- connection settings read by the OracleHelper modules.
    # Oracle server address
    ORACLE_HOST = ip #test server address; NOTE(review): `ip` is not defined in the visible code -- presumably a placeholder for the real host string, confirm
    ORACLE_SID = "orcl"  #test instance (SID)
    ORACLE_USER = ""  #test user name
    ORACLE_PASSWORD = "" #test password
    # Oracle listener port
    ORACLE_PORT = 1521
    NLS_LANG = 'SIMPLIFIED CHINESE_CHINA.UTF8' #encoding (NLS_LANG value)
    # NLS_LANG = 'SIMPLIFIED CHINESE_CHINA.ZHS16GBK'  # alternative encoding; the original author notes that either one seems to work
    ORACLE_PATH = 'C:/workFiles/instantclient_18_3' #path of the client configuration on Windows
    ORACLE_ECODING = 'utf8'  # passed as `nencoding` by the pooled OracleHelper

    下面这个版本(OracleHelper.py)是非线程安全的

    #OracleHelper.py
    import cx_Oracle as oracle
    from tutorial.poHelper import dbconfig
    #import os
    # 增加环境变量,配合读取配置信息访问oracle
    #os.environ['NLS_LANG'] = dbconfig.NLS_LANG
    #os.environ['path'] = dbconfig.ORACLE_PATH  #这个路径看况填写
    
    class OracleHelper():
        """Builds the Oracle DSN once and opens a new connection per request."""

        def __init__(self):
            # Read the address parts from dbconfig and turn them into a DSN.
            host = dbconfig.ORACLE_HOST
            port = dbconfig.ORACLE_PORT
            sid = dbconfig.ORACLE_SID
            self.dsn_tns = oracle.makedsn(host, port, sid)

        def getConnect(self):
            """Open a connection, keep it on ``self.connect`` and return it."""
            credentials = {
                'user': dbconfig.ORACLE_USER,
                'password': dbconfig.ORACLE_PASSWORD,
            }
            self.connect = oracle.connect(
                dsn=self.dsn_tns, threaded=True, events=True, **credentials
            )
            return self.connect
    
    # Module-level OracleHelper instance; getConn() below delegates to it.
    _oracleManager = OracleHelper()
    
    #获取连接
    def getConn():
        """Return a connection obtained from the module-level OracleHelper."""
        connection = _oracleManager.getConnect()
        return connection
    
    #查询所有
    def fetchall(sql, param=None):
        """Run a query and return all of its rows.

        Args:
            sql: Query text with ``%s`` placeholders for the values.
            param: Values for the placeholders, in order; ``None`` (the
                default) means no values.

        Returns:
            Whatever ``_execute_query_by_sql_param`` returns for the query.
        """
        # ``None`` replaces the former mutable ``[]`` default argument.
        values = [] if param is None else param
        return _execute_query_by_sql_param(sql, param=values)
    
    #查询一条记录
    def fetchone(sql, param=None):
        """Run a query restricted to its first row.

        The values are substituted into ``sql`` and the result is wrapped in
        ``select * from ( ... ) where rownum = 1`` before execution.

        Args:
            sql: Query text with ``%s`` placeholders for the values.
            param: Values for the placeholders, in order; ``None`` (the
                default) means no values.

        Returns:
            The result of ``_execute_query_oracle_sql`` for the wrapped query.
        """
        values = [] if param is None else param
        complete_sql = get_complete_sql(sql, values)
        limited_sql = "select * from ( %s ) where rownum = 1" % complete_sql
        # The statement is already fully interpolated, so run it directly.
        # Sending it through _execute_query_by_sql_param would apply ``%``
        # formatting a second time and fail on any literal ``%`` left in the
        # SQL (for example one that came from a substituted value).
        return _execute_query_oracle_sql(sql=limited_sql)
    
    #增加
    def insert(sql, param=None):
        """Substitute the values into ``sql``, execute it and commit.

        Args:
            sql: Statement text with ``%s`` placeholders for the values.
            param: Values for the placeholders, in order; ``None`` (the
                default) means no values.

        Returns:
            The value returned by ``_execute_commit`` for the statement.
        """
        # ``None`` replaces the former mutable ``[]`` default argument.
        values = [] if param is None else param
        return _execute_commit(get_complete_sql(sql, values))
    
    #修改
    def update(sql, param=None):
        """Substitute the values into ``sql``, execute it and commit.

        Args:
            sql: Statement text with ``%s`` placeholders for the values.
            param: Values for the placeholders, in order; ``None`` (the
                default) means no values.

        Returns:
            The value returned by ``_execute_commit`` for the statement.
        """
        # ``None`` replaces the former mutable ``[]`` default argument.
        values = [] if param is None else param
        return _execute_commit(get_complete_sql(sql, values))
    
    #插入两条记录
    def insertTwo(sql1, sql2, param1, param2):
        """Run two inserts one after the other and return the sum of their results."""
        return insert(sql1, param1) + insert(sql2, param2)
    
    #删除
    def delete(sql, param):
        """Substitute ``param`` into ``sql``, execute it and commit."""
        return _execute_commit(get_complete_sql(sql, param))
    
    #执行多个sql修改
    def executemany(sql, params):
        """Run ``sql`` once per entry of ``params`` and return the summed results."""
        return sum(update(sql=sql, param=item) for item in params)
    
    
    #执行带参查询
    def _execute_query_by_sql_param(sql, param=None):
        """Substitute the values into ``sql`` and run it as a query.

        Args:
            sql: Query text with ``%s`` placeholders for the values.
            param: Values for the placeholders, in order; ``None`` (the
                default) means no values.

        Returns:
            The result of ``_execute_query_oracle_sql`` for the completed SQL.
        """
        # ``None`` replaces the former mutable ``[]`` default argument.
        values = [] if param is None else param
        return _execute_query_oracle_sql(sql=get_complete_sql(sql, values))
    
    def _execute_commit(sql):
        """Execute a data-modifying statement and commit it.

        Args:
            sql: Complete SQL text to execute.

        Returns:
            ``cursor.rowcount`` after a successful execute and commit, or ``0``
            when any step failed (the error is printed, not raised).
        """
        result = 0
        # Bind both names before the try block so the cleanup in ``finally``
        # never hits an unbound variable when connecting or opening the
        # cursor fails.
        connect = None
        cursor = None
        try:
            connect = getConn()
            cursor = connect.cursor()
            cursor.execute(sql)
            result = cursor.rowcount
            connect.commit()
        except Exception as e:
            print(e)
            # Do not report affected rows for work that was not committed.
            result = 0
        finally:
            close_oracle(cursor, connect)
        return result
    
    #执行不带参查询
    def _execute_query_oracle_sql(sql):
        """Execute a complete query and return all rows.

        Args:
            sql: Complete SQL text to execute.

        Returns:
            The rows from ``cursor.fetchall()``, or an empty list when any
            step failed (the error is printed, not raised).
        """
        result = []
        # Bind both names before the try block so the cleanup in ``finally``
        # never hits an unbound variable when connecting or opening the
        # cursor fails.
        connect = None
        cursor = None
        try:
            connect = getConn()
            cursor = connect.cursor()
            cursor.execute(sql)
            # Other ways to read rows: fetchmany(n) for a positive n, or
            # fetchone().
            result = cursor.fetchall()
        except Exception as e:
            print(e)
        finally:
            close_oracle(cursor, connect)
        return result
    
    #关闭链接
    def close_oracle(cursor, connect):
        """Close a cursor and a connection, skipping whichever is ``None``.

        Each close is attempted on its own, so a failure while closing the
        cursor no longer prevents the connection from being closed. Errors
        are printed, not raised.

        Args:
            cursor: Object with a ``close()`` method, or ``None``.
            connect: Object with a ``close()`` method, or ``None``.
        """
        # Cursor first, then the connection.
        for resource in (cursor, connect):
            if resource is None:
                continue
            try:
                resource.close()
            except Exception as e:
                print(e)
    
    #改变参数方法
    def chang_list_param_to_tuple(param=None):
        """Convert values into a tuple of single-quoted SQL text literals.

        Args:
            param: Iterable of values; ``None`` (the default) is treated as
                empty.

        Returns:
            A tuple holding one ``'value'`` string per input item, in order.
        """
        if param is None:
            return ()
        # Double every embedded single quote so a quote inside a value cannot
        # end the literal early and corrupt (or inject into) the statement.
        return tuple("'%s'" % str(item).replace("'", "''") for item in param)
    
    #获得拼接好的sql
    def get_complete_sql(sql, param=None):
        """Fill the ``%s`` placeholders of ``sql`` with quoted values.

        NOTE: the values are spliced into the statement text with ``%``
        formatting instead of being sent as bind variables, so this should
        not be fed untrusted input.

        Args:
            sql: Statement text with one ``%s`` per value.
            param: Values for the placeholders, in order; ``None`` (the
                default) means no values.

        Returns:
            The completed SQL string, which is also printed.
        """
        # ``None`` replaces the former mutable ``[]`` default argument.
        quoted = chang_list_param_to_tuple(param=[] if param is None else param)
        complete_sql = sql % quoted
        print(complete_sql)
        return complete_sql
    
    
    if __name__ == '__main__':
        # Manual smoke test. Only the two assignments below execute; every
        # call is commented out.
        # sql = "select * from vc_user where instr(real_name,%s)>0"
        # param = [""]
        # NOTE(review): the helpers in this module fill in ``%s`` placeholders
        # (see get_complete_sql), so the ``:1`` marker below would not be
        # substituted by them -- confirm before enabling the calls.
        sql = "update po_monitor_lexicon set BINARY_SYSTEM = :1 ,IS_CRAWLED='N' where monitor_key = '金蝶办公'"
        param = [['0'],['2'],['3']]
        # NOTE(review): no ``_execute_many`` is defined in the visible code;
        # presumably executemany() was meant.
        # result = _execute_many(sql,param)
        # result = fetchall(sql,param)
        # print(result)
        # result = fetchone(sql,param)
        # print(result)

     

    下面是线程安全的版本:使用 DBUtils 的 PooledDB 连接池,可以配合框架使用,大家放心使用

    from DBUtils.PooledDB import PooledDB
    
    from tutorial.poHelper import dbconfig
    import cx_Oracle as oracle
    #import os
    # 增加环境变量,配合读取配置信息访问oracle
    #os.environ['NLS_LANG'] = dbconfig.NLS_LANG
    #os.environ['path'] = dbconfig.ORACLE_PATH  #这个路径看况填写
    
    class OracleHelper:
        """Owns a ``PooledDB`` pool of cx_Oracle connections built from dbconfig."""

        def __init__(self):
            user = dbconfig.ORACLE_USER
            password = dbconfig.ORACLE_PASSWORD
            # DSN layout: host:port/sid
            dsn = dbconfig.ORACLE_HOST + ":" + str(dbconfig.ORACLE_PORT) + "/" + dbconfig.ORACLE_SID
            connKwargs = dict(
                user=user,
                password=password,
                dsn=dsn,
                nencoding=dbconfig.ORACLE_ECODING,
                threaded=True,
            )
            self._pool = PooledDB(
                oracle,
                mincached=3,
                maxcached=20,
                maxshared=20,
                maxusage=10000,
                blocking=True,
                **connKwargs
            )

        def getConn(self):
            """Return a connection taken from the pool."""
            return self._pool.connection()
    
    
    # Module-level OracleHelper instance; its pool is created at import time
    # and getConn() below delegates to it.
    _oracleManager = OracleHelper()
    
    
    def getConn():
        """Return a connection from the module-level OracleHelper's pool."""
        connection = _oracleManager.getConn()
        return connection
    
    
    def insert(sql, params):
        """Run ``sql`` with ``params`` through ``__execute`` and return its result."""
        outcome = __execute(sql, params)
        return outcome
    
    
    def insertTwo(sql1, sql2, params1, params2):
        """Run both statements through ``__save`` and return its result."""
        outcome = __save(sql1, sql2, params1, params2)
        return outcome
    
    
    def update(sql, params):
        """Run ``sql`` with ``params`` through ``__execute`` and return its result."""
        outcome = __execute(sql, params)
        return outcome
    
    
    def delete(sql, params):
        """Run ``sql`` with ``params`` through ``__execute`` and return its result."""
        outcome = __execute(sql, params)
        return outcome
    
    
    def __execute(sql, param=None):
        """Execute one statement with bind parameters and commit it.

        Args:
            sql: Statement text; values are passed separately in ``param``.
            param: Bind values handed to ``cursor.execute``; ``None`` (the
                default) means none.

        Returns:
            ``cursor.rowcount`` after the statement ran, or ``None`` when an
            error occurred (the error is printed, not raised).
        """
        # Bound up front so the cleanup below is safe even if getConn() fails.
        conn = None
        cursor = None
        try:
            conn = getConn()
            cursor = conn.cursor()
            cursor.execute(sql, [] if param is None else param)
            # execute() does not return the affected-row count for DML; the
            # count is exposed on the cursor instead.
            rowcount = cursor.rowcount
            conn.commit()
            return rowcount
        except Exception as e:
            print(e)
            if conn is not None:
                # Discard uncommitted work before the connection is released.
                try:
                    conn.rollback()
                except Exception as rollback_error:
                    print(rollback_error)
        finally:
            # Single cleanup point; the connection is closed even when
            # closing the cursor raises.
            try:
                if cursor is not None:
                    cursor.close()
            finally:
                if conn is not None:
                    conn.close()
    
    
    def __save(sql1, sql2, param1=None, params2=None):
        """Execute two statements on one connection and commit them together.

        Args:
            sql1: First statement text.
            sql2: Second statement text.
            param1: Bind values for ``sql1``; ``None`` (the default) means none.
            params2: Bind values for ``sql2``; ``None`` (the default) means none.

        Returns:
            ``cursor.rowcount`` of the first statement, or ``None`` when an
            error occurred (the error is printed, not raised).
        """
        # Bound up front so the cleanup below is safe even if getConn() fails.
        conn = None
        cursor = None
        try:
            conn = getConn()
            cursor = conn.cursor()
            cursor.execute(sql1, [] if param1 is None else param1)
            # execute() does not return the affected-row count for DML, so
            # capture it from the cursor before the second statement runs.
            rowcount = cursor.rowcount
            cursor.execute(sql2, [] if params2 is None else params2)
            conn.commit()
            return rowcount
        except Exception as e:
            print(e)
            if conn is not None:
                # Undo the first statement if the second one (or the commit)
                # failed, so the pair is applied together or not at all.
                try:
                    conn.rollback()
                except Exception as rollback_error:
                    print(rollback_error)
        finally:
            # Single cleanup point; the connection is closed even when
            # closing the cursor raises.
            try:
                if cursor is not None:
                    cursor.close()
            finally:
                if conn is not None:
                    conn.close()
    
    
    def fetchone(sql, params=None):
        """Run a query with bind parameters and return its first row.

        Args:
            sql: Query text; values are passed separately in ``params``.
            params: Bind values handed to ``cursor.execute``; ``None`` (the
                default) means none.

        Returns:
            The value of ``cursor.fetchone()``, or ``None`` when an error
            occurred (the error is printed, not raised).
        """
        # Bound up front so the cleanup below is safe even if getConn() fails.
        conn = None
        cursor = None
        try:
            conn = getConn()
            cursor = conn.cursor()
            cursor.execute(sql, [] if params is None else params)
            return cursor.fetchone()
        except Exception as e:
            print(e)
        finally:
            # Runs on success and failure alike; the connection is closed
            # even when closing the cursor raises.
            try:
                if cursor is not None:
                    cursor.close()
            finally:
                if conn is not None:
                    conn.close()
    
    
    
    def fetchall(sql, params):
        """Run a query with bind parameters and return all rows.

        Args:
            sql: Query text; values are passed separately in ``params``.
            params: Bind values handed to ``cursor.execute``.

        Returns:
            The value of ``cursor.fetchall()``, or ``None`` when an error
            occurred (the error is printed, not raised).
        """
        # Bound up front so the cleanup below is safe even if getConn() fails.
        conn = None
        cursor = None
        try:
            conn = getConn()
            cursor = conn.cursor()
            cursor.execute(sql, params)
            return cursor.fetchall()
        except Exception as e:
            print(e)
        finally:
            # Runs on success and failure alike; the connection is closed
            # even when closing the cursor raises.
            try:
                if cursor is not None:
                    cursor.close()
            finally:
                if conn is not None:
                    conn.close()
    
    
    def executemany(sql, params):
        """Execute one statement for a sequence of bind-value sets and commit.

        Args:
            sql: Statement text; values are passed separately in ``params``.
            params: Sequence of bind-value sets handed to
                ``cursor.executemany``.

        Returns:
            ``cursor.rowcount`` after the batch ran, or ``None`` when an error
            occurred (the error is printed, not raised).
        """
        # Bound up front so the cleanup below is safe even if getConn() fails.
        conn = None
        cursor = None
        try:
            conn = getConn()
            cursor = conn.cursor()
            cursor.executemany(sql, params)
            # executemany() does not return the affected-row count; the count
            # is exposed on the cursor instead.
            rowcount = cursor.rowcount
            conn.commit()
            return rowcount
        except Exception as e:
            print(e)
            if conn is not None:
                # Discard rows applied before the failure.
                try:
                    conn.rollback()
                except Exception as rollback_error:
                    print(rollback_error)
        finally:
            # Single cleanup point; the connection is closed even when
            # closing the cursor raises.
            try:
                if cursor is not None:
                    cursor.close()
            finally:
                if conn is not None:
                    conn.close()
    
    
    if __name__ == '__main__':
        # Manual smoke test: fetch one matching row and print its columns.
        sql = "select * from vc_user where  instr(real_name, :1 ) > 0"
        param = ['']
        # Batch-update example (one bind-value set per execution):
        # sql = "update po_monitor_lexicon set BINARY_SYSTEM = :1 ,IS_CRAWLED='N' where monitor_key = '金蝶办公'"
        # param = [['0'],['2'],['3']]
        # result = executemany(sql,param)
        result = fetchone(sql,param)
        # fetchone() gives None when no row matched or the query failed, so
        # guard before iterating and calling len().
        if result is None:
            print("no row returned")
        else:
            for item in result:
                print(item)
            print(len(result))
  • 相关阅读:
    树的递归
    《计算机存储与外设》 3二级存储器
    《计算机存储与外设》 2主存储器
    《码农翻身:用故事给技术加点料》阅读有感
    分析"傍富婆发财"
    《第22条军规》Catch-22
    《编译原理》4
    《编译原理》3
    《血酬定律》
    linux下netcore程序部署
  • 原文地址:https://www.cnblogs.com/procedureMonkey/p/9894640.html
Copyright © 2011-2022 走看看