zoukankan      html  css  js  c++  java
  • Python 使用MySQLdb模块通过ssh隧道连接mysql

    # -*- coding: utf-8 -*-
    import os, sys
    
    import MySQLdb
    from sshtunnel import SSHTunnelForwarder
    
    from util.read_ini import ReadIni
    
    db_name = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
    sys.path.append(db_name)
    
    
    # 创建数据工具类
    class SSHMySQL:
        def __init__(self):
            file_name = os.path.join(db_name, 'config\base.ini')
            self.read_ini = ReadIni(file_name=file_name, node='server')
            # 连接ssh
            self.server = self.get_server()
            # 通过ssh隧道连接mysql
            self.conn = self.get_conn()
            # 获取连接mysql的游标对象
            self.cur = self.conn.cursor()
    
        def __enter__(self):
            return self
    
        # 使用SSHTunnelForwarder方法连接ssh
        def get_server(self):
            ssh_host = self.read_ini.get_value('ssh_host')
            ssh_port = 22
            server = SSHTunnelForwarder(
                (ssh_host, ssh_port),
                ssh_password=self.read_ini.get_value('password'),
                ssh_username=self.read_ini.get_value('username'),
                remote_bind_address=('127.0.0.1', 3306),
            )
            return server
    
        # 使用ssh隧道连接mysql
        def get_conn(self):
            # 启动ssh
            self.server.start()
            # 连接数据库 host必须为127.0.0.1
            conn = MySQLdb.connect(host='127.0.0.1',
                                   port=self.server.local_bind_port,
                                   user='root',
                                   passwd='',
                                   db=self.read_ini.get_value('database'),
                                   charset='utf8')
            return conn
    
        # 退出方法
        def __exit__(self, exc_type, exc_val, exc_tb):
            # 关闭数据库游标对象
            self.cur.close()
            # 关闭数据库连接
            self.conn.close()
            # 关闭ssh
            self.server.stop()
        
        # 获取单个数据结果
        def get_one(self, query, param=None):
            try:
                # 通过数据库游标执行SQL query为SQL语句块
                self.cur.execute(query, param)
                # 接受结果集
                result = self.cur.fetchone()
                # 处理结果集
                if result is not None:
                    response = dict(zip([k[0] for k in self.cur.description], result))
                else:
                    response = result
                # 返回结果集
                return response
            except Exception as e:
                # 捕获异常后回滚
                self.conn.rollback()
                raise e
    
        def get_more(self, query, param=None):
            try:
                self.cur.execute(query, param)
                result = self.cur.fetchall()
                if result is not None:
                    response = [dict(zip([k[0] for k in self.cur.description], row)) for row in result]
                else:
                    response = result
                return response
            except Exception as e:
                self.conn.rollback()
                raise e
    
        def update(self, query, param=None):
            return self.__edit(query, param)
    
        def delete(self, query, param=None):
            return self.__edit(query, param)
    
        def insert_one(self, query, param=None):
            return self.__edit(query, param)
    
        def __edit(self, query, param):
            count = 0
            try:
                self.cur.execute(query, param)
                self.conn.commit()
            except Exception as e:
                self.conn.rollback()
                raise e
            return count
    
        def insert_many(self, query, param=None):
            try:
                self.cur.executemany(query, param)
                self.conn.commit()
            except Exception as e:
                self.conn.rollback()
                raise e
    
    
    if __name__ == '__main__':
        with SSHMySQL() as db:
            # sql = "SELECT * FROM backup_manager WHERE `name` LIKE %s;"
            # args = ('backup%',)
            # res = db.get_more(sql, args)
            # print(res)
    
            query = 'SELECT UUID FROM virtualmachine WHERE label LIKE %s;'
            args = ('lxq_nginx' + '%',)
            res = db.get_more(query, args)
            print(res)
    不积跬步,无以至千里;不积小流,无以成江海。
  • 相关阅读:
    正则表达式基础知识
    成功的基本法则
    Java实现简单的格式化信函生成器
    C实现哈希表
    C实现求解给定文本中以指定字符开头和结尾的子串数量的三种算法
    Java实现求解二项式系数及代码重构
    Java 异常处理学习总结
    C实现大整数幂求模问题的两种算法
    linux 学习前言
    提高编程能力的10种方法
  • 原文地址:https://www.cnblogs.com/xuezhimin-esage-2020/p/14473461.html
Copyright © 2011-2022 走看看