zoukankan      html  css  js  c++  java
  • python访问mysql

    # -*- coding: UTF-8 -*-
    import time
    
    import phoenixdb
    import psycopg2
    import pymysql
    from urllib.parse import urlparse
    
    """
    将数据库的连接信息组装成一个connection对象
    """
    
    
    class Connection(object):
        def __init__(self, conn_type, host, port, schema, username, password):
            self.host = host
            self.port = port
            self.conn_type = conn_type
            self.schema = schema
            self.username = username
            self.password = password
    
        # 从jdbc_url构造connection
        @classmethod
        def create(cls, url: str, username=None, password=None):
            result = urlparse(url[5:]) if url.startswith('jdbc:') else urlparse(url)
            conn_dict = {'host': result.hostname,
                         'port': result.port,
                         'conn_type': result.scheme,
                         'schema': result.path[1:],
                         'username': username,
                         'password': password}
            return cls(**conn_dict)
    
        def __str__(self):
            return self.__dict__
    
        def __repr__(self):
            return str(self.__str__())
    
        @property
        def jdbc_url(self):
            return "jdbc:{conn_type}://{host}:{port}/{schema}?useCompression=true&socketTimeout=1200000".format(
                **self.__dict__)
    
    
    # 连接数据库和数仓
    class DbConnCursor(object):
    
        def __init__(self, conn_type, host, port, schema, username, password):
            if conn_type.upper() == 'MYSQL':
                self.conn = pymysql.connect(host=host,port=port,user=username,password=password,database=schema,connect_timeout=31536000)
                        
            elif conn_type.upper() == 'POSTGRESQL':
                self.conn = psycopg2.connect(host=host, port=port, user=username, password=password, database=schema)
            elif conn_type.upper() == 'HBASE':
                self.conn = phoenixdb.connect("http://{host}:{port}".format(host=host, port=port), autocommit=True)
            
            self.cur = self.conn.cursor()
    
        # 通过传入一个conn对象构造db_cursor
        @classmethod
        def create(cls, conn: Connection):
            return cls(**conn.__dict__)
    
        def __enter__(self):
            return self.cur
    
        def __exit__(self, exc_type, exc_value, exc_trace):
            self.conn.commit()
            self.cur.close()
            self.conn.close()
  • 相关阅读:
    nginx主配置文件详解
    微信网页第三方登录原理
    QQ第三方登陆流程详解
    php垃圾回收机制
    mysql索引
    MySQL性能优化的最佳20+条经验
    MYSQL explain详解
    mysql分区功能详细介绍,以及实例
    MySQL分表、分区
    Redis主从读写分离配置
  • 原文地址:https://www.cnblogs.com/wangbin2188/p/12926106.html
Copyright © 2011-2022 走看看