zoukankan      html  css  js  c++  java
  • python--操作数据库

    一、连接MySQL

    import pymysql
    import os
    import configparser
    from loggingutils.mylogger import logger as log


    class ConnectMysql:
        """Single-use MySQL helper configured from an INI file.

        The constructor opens a connection and a cursor. Every public method
        runs one statement and then closes both, so an instance can serve
        exactly one call; create a new instance for each operation.
        """

        def __init__(self, dbinfo_config_name):
            """Read ``<parent of cwd>/databaseconfig/<name>.ini`` and connect.

            :param dbinfo_config_name: config file name without the ``.ini``
                suffix. The file must contain a ``DBinfo`` section with the
                keys ``connect_host``, ``connect_port``, ``connect_username``,
                ``connect_password``, ``connect_db_name`` and
                ``connect_charset``.
            :raises configparser.Error: if the section or a key is missing
                (``ConfigParser.read`` skips unreadable files silently, so a
                wrong file name also surfaces here).
            :raises ValueError: if ``connect_port`` is not an integer.
            """
            # os.path.join replaces the hand-written '\databaseconfig\'
            # literal, whose trailing backslash escaped the closing quote.
            dbinfo_file = os.path.join(os.path.dirname(os.getcwd()),
                                       'databaseconfig',
                                       dbinfo_config_name + '.ini')
            self.__config = configparser.ConfigParser()
            self.__config.read(dbinfo_file)
            self.__section = 'DBinfo'
            connect_host = self.__config.get(self.__section, 'connect_host')
            # The driver needs a numeric port; get() would hand it a string.
            connect_port = self.__config.getint(self.__section, 'connect_port')
            connect_username = self.__config.get(self.__section, 'connect_username')
            connect_password = self.__config.get(self.__section, 'connect_password')
            connect_db_name = self.__config.get(self.__section, 'connect_db_name')
            connect_charset = self.__config.get(self.__section, 'connect_charset')
            self._conn = pymysql.connect(host=connect_host, port=connect_port,
                                         user=connect_username,
                                         passwd=connect_password,
                                         db=connect_db_name,
                                         charset=connect_charset)
            self._cursor = self._conn.cursor()

        def create_table(self, sql_str):
            """Execute a CREATE TABLE statement, then close the connection.

            :param sql_str: the complete DDL statement to run.
            Errors from the driver propagate to the caller; the connection is
            closed either way.
            """
            try:
                self._cursor.execute(sql_str)
            finally:
                self._close_connect()

        def drop_table(self, table_name):
            """Drop ``table_name`` if it exists, then close the connection.

            :param table_name: table identifier. It is concatenated into the
                statement unescaped, so it must come from trusted code and
                never from user input.
            """
            sql_str = 'DROP TABLE IF EXISTS ' + table_name
            try:
                self._cursor.execute(sql_str)
            finally:
                self._close_connect()

        def select_data(self, sql_str):
            """Run a query and return its rows as dictionaries.

            :param sql_str: the complete SELECT statement to run.
            :return: a list with one ``{column_name: value}`` dict per row
                covering every selected column, or ``None`` if the query
                failed (the failure is logged).
            """
            try:
                self._cursor.execute(sql_str)
                # description is None for statements without a result set.
                description = self._cursor.description or ()
                columns = [column[0] for column in description]
                # zip pairs every column with its value; the previous
                # range(len(index) - 1) loop lost the last column.
                return [dict(zip(columns, row)) for row in self._cursor.fetchall()]
            except Exception as exc:
                log.error('查库失败: %r' % (exc,))
                return None
            finally:
                self._close_connect()

        def operate_data(self, sql_str):
            """Run an INSERT/UPDATE/DELETE statement and commit it.

            :param sql_str: the complete DML statement to run.
            A failure is logged rather than raised; nothing is committed in
            that case and the connection is closed regardless.
            """
            try:
                self._cursor.execute(sql_str)
                self._conn.commit()
            except Exception as exc:
                log.error('操作库失败: %r' % (exc,))
            finally:
                self._close_connect()

        def _close_connect(self):
            """Close the cursor and then the connection."""
            try:
                self._cursor.close()
            finally:
                # Still release the connection if closing the cursor fails.
                self._conn.close()

    二、跳板机连接MySQL

    import pymysql
    from sshtunnel import SSHTunnelForwarder


    def UserMysql(db):
        """Query MySQL through an SSH jump host and print the result rows.

        Starts an SSH tunnel, connects to MySQL through the tunnel's local
        port, runs one fixed SELECT and prints the fetched rows. The cursor,
        the connection and the tunnel are released even when a step fails.

        :param db: name of the database to connect to.
        """
        server = SSHTunnelForwarder(
            ssh_address_or_host=('', 22),  # jump host address
            ssh_username='',
            ssh_password='',
            remote_bind_address=('', 3306)
        )
        server.start()
        try:
            myConfig = pymysql.connect(
                user='',
                passwd='',
                host='127.0.0.1',
                db=db,
                # Connect to the local end of the tunnel, not the remote port.
                port=server.local_bind_port
            )
            try:
                cursor = myConfig.cursor()
                try:
                    select_sql = 'select * from table where user_name = "1366112584@qq.com"'
                    cursor.execute(select_sql)
                    results = cursor.fetchall()
                    print(str(results))
                finally:
                    cursor.close()
            finally:
                # The connection was previously never closed.
                myConfig.close()
        finally:
            server.stop()

    三、连接Oracle

    import cx_Oracle
    from loggingutils.mylogger import logger as log
    import configparser, os


    class ConnectOracle:
        """Single-use Oracle helper configured from an INI file.

        The constructor opens a connection and a cursor. Every public method
        runs one statement and then closes both, so an instance can serve
        exactly one call; create a new instance for each operation.
        """

        def __init__(self, dbinfo_config_name):
            """Read ``<parent of cwd>/databaseconfig/<name>.ini`` and connect.

            :param dbinfo_config_name: config file name without the ``.ini``
                suffix. The file must contain a ``DBinfo`` section with the
                keys ``connect_name``, ``connect_password``, ``connect_host``,
                ``connect_port`` and ``connect_db_name``.
            :raises configparser.Error: if the section or a key is missing
                (``ConfigParser.read`` skips unreadable files silently, so a
                wrong file name also surfaces here).
            """
            # os.path.join replaces the hand-written '\databaseconfig\'
            # literal, whose trailing backslash escaped the closing quote.
            dbinfo_file = os.path.join(os.path.dirname(os.getcwd()),
                                       'databaseconfig',
                                       dbinfo_config_name + '.ini')
            self.__config = configparser.ConfigParser()
            self.__config.read(dbinfo_file)
            self.__section = 'DBinfo'
            connect_name = self.__config.get(self.__section, 'connect_name')
            connect_password = self.__config.get(self.__section, 'connect_password')
            connect_host = self.__config.get(self.__section, 'connect_host')
            connect_port = self.__config.get(self.__section, 'connect_port')
            connect_db_name = self.__config.get(self.__section, 'connect_db_name')
            # Connect string has the form host:port/db_name.
            connect_url = connect_host + ':' + connect_port + '/' + connect_db_name
            self._db = cx_Oracle.connect(connect_name, connect_password, connect_url)
            self._cursor = self._db.cursor()

        def create_table(self, sql_str):
            """Execute a CREATE TABLE statement, then close the connection.

            :param sql_str: the complete DDL statement to run.
            Errors from the driver propagate to the caller; the connection is
            closed either way.
            """
            try:
                self._cursor.execute(sql_str)
            finally:
                self._close_connect()

        def drop_table(self, table_name):
            """Drop ``table_name`` if it exists, then close the connection.

            :param table_name: table identifier. It is concatenated into the
                statement unescaped, so it must come from trusted code and
                never from user input.
            """
            # NOTE(review): DROP TABLE IF EXISTS is MySQL-style syntax; older
            # Oracle releases are believed to reject it - confirm against the
            # target server version.
            sql_str = 'DROP TABLE IF EXISTS ' + table_name
            try:
                self._cursor.execute(sql_str)
            finally:
                self._close_connect()

        def select_data(self, sql_str):
            """Run a query and return its rows as dictionaries.

            :param sql_str: the complete SELECT statement to run.
            :return: a list with one ``{column_name: value}`` dict per row
                covering every selected column, or ``None`` if the query
                failed (the failure is logged).
            """
            try:
                self._cursor.execute(sql_str)
                # description is None for statements without a result set.
                description = self._cursor.description or ()
                columns = [column[0] for column in description]
                # zip pairs every column with its value; the previous
                # range(len(index) - 1) loop lost the last column.
                return [dict(zip(columns, row)) for row in self._cursor.fetchall()]
            except Exception as exc:
                log.error('查库失败: %r' % (exc,))
                return None
            finally:
                self._close_connect()

        def operate_data(self, sql_str):
            """Run an INSERT/UPDATE/DELETE statement and commit it.

            :param sql_str: the complete DML statement to run.
            A failure is logged rather than raised; nothing is committed in
            that case and the connection is closed regardless.
            """
            try:
                self._cursor.execute(sql_str)
                self._db.commit()
            except Exception as exc:
                log.error('操作库失败: %r' % (exc,))
            finally:
                self._close_connect()

        def _close_connect(self):
            """Close the cursor and then the connection."""
            try:
                self._cursor.close()
            finally:
                # Still release the connection if closing the cursor fails.
                self._db.close()
  • 相关阅读:
    Elasticsearch入门系列(一)
    清楚Chrome缓存
    解决IIS启动站点报错
    Input type="file"上传文件change事件只触发一次解决方案
    本地计算机上的XXX服务启动后停止,某些服务在未由其它服务或程序使用时将自动停止
    SQL Server Datetime类型为NULL不能用ISNULL(datetime,' ')来判断,会导致1900-01-01
    浏览指南
    谁发明的c++
    c++的用处
    不一样的二叉树遍历(小学生都会)
  • 原文地址:https://www.cnblogs.com/fqfanqi/p/8413466.html
Copyright © 2011-2022 走看看