zoukankan      html  css  js  c++  java
  • mysql-redis连接

    # log 数据库连接
    class LogMysql(object):
        """Small wrapper around a pymysql connection and cursor.

        The connection and a cursor are opened when the object is constructed
        and kept on ``self.conn`` / ``self.cursor`` for the helper methods.
        """

        # Replaced per instance in __init__; None until then.
        conn = None
        cursor = None

        def __init__(self, host='', user='', password='', database='log', charset='utf8'):
            """Connect to MySQL and open a cursor.

            Every argument is forwarded to ``pymysql.connect``. The defaults
            are the values that used to be hard-coded here, so ``LogMysql()``
            behaves as before.
            """
            self.conn = pymysql.connect(host=host, user=user,
                                        password=password,
                                        database=database, charset=charset)
            self.cursor = self.conn.cursor()

        def dict_fetchall(self):
            """Return the rows of the last executed query as a list of dicts.

            Each dict maps a column name (first item of every entry in
            ``cursor.description``) to that row's value. Call this after
            ``self.cursor.execute(...)`` for a statement that yields rows.
            """
            # Column names of the current result set, in select order.
            columns = [col[0] for col in self.cursor.description]
            return [dict(zip(columns, row)) for row in self.cursor.fetchall()]

        def get_table_list(self):
            """Run ``SHOW TABLES`` and return the first column of each row as a list."""
            self.cursor.execute("SHOW TABLES")
            return [row[0] for row in self.cursor.fetchall()]
    
    
    # redis主库
    class Redis(object):
        """Holder for a redis client bound to the primary Redis instance.

        After construction the client is available as ``self.conn``.
        """

        # Replaced by a redis.Redis client in __init__.
        conn = None

        def __init__(self):
            """Create a connection pool and attach a client that draws from it."""
            # For local testing, point the pool at
            # host='192.168.10.10', port=7000, db=14 instead.
            pool = redis.ConnectionPool(
                host='192.168.5.219',
                port=6379,
                db=14,
                password='root1234@A',
            )
            self.conn = redis.Redis(connection_pool=pool)


    class LogMysql(object):
        """pymysql-backed helper bound to one database and one table.

        The constructor opens a connection and cursor. ``insert_db`` and
        ``updata_db`` are one-shot: they always close the cursor and the
        connection when they finish, so create a new instance for each call.
        ``update_db_sql`` keeps the connection open on success and closes it
        only after a failure.
        """

        # Replaced per instance in __init__; None until then.
        conn = None
        cursor = None
        table = None
        database = None

        def __init__(self, database, table):
            """Connect to ``database`` and remember ``table`` for later writes.

            :param database: name of the schema to connect to.
            :param table: table name used by ``insert_db`` / ``updata_db``.
            """
            self.table = table
            self.database = database
            self.conn = pymysql.connect(host='rm-2zezqp8sll2swzwby.mysql.rds.aliyuncs.com', user='datacenter',
                                        password='kbs11zx@',
                                        database=self.database, charset='utf8')
            # For local testing, connect to host='192.168.10.5' as root/root
            # with database='unionlog' instead.
            self.cursor = self.conn.cursor()

        def _close(self):
            """Close the cursor, then the connection, even if the cursor close fails."""
            try:
                self.cursor.close()
            finally:
                self.conn.close()

        def dict_fetchall(self):
            """Return the rows of the last executed query as a list of dicts.

            Each dict maps a column name (first item of every entry in
            ``cursor.description``) to that row's value.
            """
            columns = [col[0] for col in self.cursor.description]
            return [dict(zip(columns, row)) for row in self.cursor.fetchall()]

        def insert_db(self, data):
            """Insert one row built from ``data`` into ``self.table``.

            :param data: mapping of column name -> value. Values are passed
                as query parameters; the table and column names are
                interpolated into the SQL text and must come from trusted code.

            Exceptions from execute/commit are caught, reported on stdout and
            rolled back. The cursor and connection are closed in every case.
            """
            keys = ', '.join(data.keys())
            placeholders = ', '.join(['%s'] * len(data))
            sql = 'INSERT INTO {table}({keys}) VALUES ({values})'.format(
                table=self.table, keys=keys, values=placeholders)
            try:
                self.cursor.execute(sql, tuple(data.values()))
                # Commit first so success is only reported once it is durable.
                self.conn.commit()
                print('insert Successful')
            except Exception as e:
                print('insert Failed', e)
                self.conn.rollback()
            finally:
                self._close()

        def updata_db(self, data):
            """Insert ``data`` into ``self.table``, or update the row if its key exists.

            Issues ``INSERT ... ON DUPLICATE KEY UPDATE`` with every column in
            ``data`` assigned in the update clause.

            :param data: mapping of column name -> value. Values are passed
                as query parameters; the table and column names are
                interpolated into the SQL text and must come from trusted code.

            Exceptions from execute/commit are caught, reported on stdout and
            rolled back. The cursor and connection are closed in every case.
            """
            keys = ', '.join(data.keys())
            placeholders = ', '.join(['%s'] * len(data))
            sql = 'INSERT INTO {table}({keys}) VALUES ({values}) ON DUPLICATE KEY UPDATE'.format(
                table=self.table, keys=keys, values=placeholders)
            sql += ','.join(" {key} = %s".format(key=key) for key in data)
            try:
                # The values appear twice: once for VALUES, once for UPDATE.
                self.cursor.execute(sql, tuple(data.values()) * 2)
                self.conn.commit()
                print('update Successful')
            except Exception as e:
                print('update Failed', e)
                self.conn.rollback()
            finally:
                self._close()

        def update_db_sql(self, sql):
            """Execute a caller-supplied SQL statement and commit it.

            :param sql: complete SQL text, executed as-is with no parameters.

            On success the connection stays open for further calls. On
            failure the error is printed, the transaction is rolled back and
            the cursor and connection are closed.
            """
            try:
                self.cursor.execute(sql)
                self.conn.commit()
                print('更新成功')
            except Exception as e:
                print("error to update:", e)
                try:
                    self.conn.rollback()
                finally:
                    self._close()
  • 相关阅读:
    redis--列表
    redis ——字符串
    redis 第一节 redis安装、PHP扩展 、主从
    Python--day7
    Python--day6
    Python爬虫
    JSON基础
    Python--day5
    Python—day3
    Windows10 安装QT问题
  • 原文地址:https://www.cnblogs.com/xiao-xue-di/p/11865040.html
Copyright © 2011-2022 走看看