zoukankan      html  css  js  c++  java
  • ORM + 数据库链接池

    db_pool.py

    from DBUtils.PooledDB import PooledDB
    import pymysql
    
    POOL = PooledDB(
        creator=pymysql,  # 使用链接数据库的模块
        maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
        mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
        maxcached=5,  # 链接池中最多闲置的链接,0和None不限制
        maxshared=3,
        # 链接池中最多共享的链接数量,0和None表示全部共享。
        # PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
        blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
        maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
        setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
        ping=0,
        # ping MySQL服务端,检查是否服务可用。
        # 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
        host='127.0.0.1',
        port=3306,
        user='root',
        password='123',
        database='db2',
        charset='utf8',
        autocommit='True'
    )
    View Code

    mysql_singleton.py

    import pymysql
    from orm_pool import db_pool
    
    class Mysql(object):
        def __init__(self):
            self.conn = db_pool.POOL.connection()
            self.cursor = self.conn.cursor(pymysql.cursors.DictCursor)
    
        def close(self):
            self.cursor.close()
            self.conn.close()
    
        def select(self,sql,args=None):
            self.cursor.execute(sql,args)
            res = self.cursor.fetchall()  # 列表套字典
            return res
    
        def execute(self,sql,args):
            try:
                self.cursor.execute(sql,args)
            except BaseException as e :
                print(e)
    View Code

    orm.py

    from orm_singleton.mysql_singleton import Mysql
    
    
    # 定义字段类
    class Field(object):
        def __init__(self, name, column_type, primary_key, default):
            self.name = name
            self.column_type = column_type
            self.primary_key = primary_key
            self.default = default
    
    
    # 定义具体的字段
    class StringField(Field):
        def __init__(self, name, column_type='varchar(255)', primary_key=False, default=None):
            super().__init__(name, column_type, primary_key, default)
    
    
    class IntegerField(Field):
        def __init__(self, name, column_type='int', primary_key=False, default=None):
            super().__init__(name, column_type, primary_key, default)
    
    
    class ModelMetaClass(type):
        def __new__(cls, class_name, class_bases, class_attrs):
            # 我仅仅只想拦截模型表的类的创建过程
            if class_name == 'Models':
                return type.__new__(cls, class_name, class_bases, class_attrs)
            # 给类放表名,主键字段,所有字段
            table_name = class_attrs.get('table_name', class_name)
            # 定义一个存储主键的变量
            primary_key = None
            # 定义一个字典用来存储用户自定义的表示表的所有字段信息
            mappings = {}
            # for循环当前类的名称空间
            for k, v in class_attrs.items():
                if isinstance(v, Field):
                    mappings[k] = v
                    if v.primary_key:
                        if primary_key:
                            raise TypeError("主键只能有一个")
                        primary_key = v.name
            # 将重复的键值对删除
            for k in mappings.keys():
                class_attrs.pop(k)
            if not primary_key:
                raise TypeError('必须要有一个主键')
            # 将处理好的数据放入class_attrs中
            class_attrs['table_name'] = table_name
            class_attrs['primary_key'] = primary_key
            class_attrs['mappings'] = mappings
            return type.__new__(cls, class_name, class_bases, class_attrs)
    
    
    class Models(dict, metaclass=ModelMetaClass):
        def __init__(self, **kwargs):
            super().__init__(**kwargs)
    
        def __getattr__(self, item):
            return self.get(item, '没有该键值对')
    
        def __setattr__(self, key, value):
            self[key] = value
    
        # 查询全部
        @classmethod
        def select(cls, **kwargs):
            ms = Mysql()
            # select * from userinfo
            if not kwargs:
                sql = 'select * from %s' % cls.table_name
                res = ms.select(sql)
            else:
                # select * from userinfo where id = 1
                k = list(kwargs.keys())[0]
                v = kwargs.get(k)
                sql = 'select * from %s where %s=?' % (cls.table_name, k)
                # select * from userinfo where id = ?
                sql = sql.replace('?', '%s')  # select * from userinfo where id = %s
                res = ms.select(sql,v)
            if res:
                return [ cls(**r) for r in res]  # 将数据库的一条数据映射成类的对象
    
         # 查询单条
        @classmethod
        def select_one(cls, **kwargs):
            # 只查一条
            key = list(kwargs.keys())[0]
            value = kwargs[key]
            # select * from user where id=%s
            sql = 'select * from %s where %s=?' % (cls.table_name, key)
            #
            sql = sql.replace('?', '%s')
            ms = Mysql.singleton()
            re = ms.select(sql, value)
            if re:
                # attrs={'name':'123','password':123}
                # u=User(**attrs)
                # 相当于 User(name='123',password=123)
                u = cls(**re[0])
                return u
            else:
                return
    
        # 新增方法
        def save(self):
            ms = Mysql()
            # insert into userinfo(name,password) values('jason','123')
            # insert into %s(%s) values(?)
            fields = []  # [name,password]
            values = []
            args = []
            for k,v in self.mappings.items():
                if not v.primary_key:  # 将id字段去除   因为新增一条数据 id是自动递增的不需要你传
                    fields.append(v.name)
                    args.append('?')
                    values.append(getattr(self,v.name))
            # insert into userinfo(name,password) values(?,?)
            sql = "insert into %s(%s) values(%s)"%(self.table_name,','.join(fields),','.join(args))
            # insert into userinfo(name,password) values(?,?)
            sql = sql.replace('?','%s')
            ms.execute(sql,values)
    
        # 修改方法:基于已经存在了的数据进行一个修改操作
        def update(self):
            ms = Mysql()
            # update userinfo set name='jason',password='123' where id = 1
            fields = []  # [name,password]
            values = []
            pr = None
            for k,v in self.mappings.items():
                if v.primary_key:
                    pr = getattr(self,v.name,v.default)
                else:
                    fields.append(v.name+'=?')
                    values.append(getattr(self,v.name,v.default))
            sql = 'update %s set %s where %s = %s'%(self.table_name,','.join(fields),self.primary_key,pr)
            # update userinfo set name='?',password='?' where id = 1
            sql = sql.replace('?','%s')
            ms.execute(sql,values)
    
    
    if __name__ == '__main__':
        class Teacher(Models):
            table_name = 'teacher'
            tid = IntegerField(name='tid',primary_key=True)
            tname = StringField(name='tname')
    
            
        # obj = Teacher(tname='jason老师')
        # obj.save()
        # res = Teacher.select()
        # for r in res:
        #     print(r.tname)
        # print(res)
    
    
        res = Teacher.select(tid=1)
        teacher_obj = res[0]
        teacher_obj.tname = 'jason老师'
        teacher_obj.update()
        res1 = Teacher.select()
        print(res1)
    
    
        # class User(Models):
        #     table_name = 'User'
        #     id = IntegerField(name='id', primary_key=True)
        #     name = StringField(name='name')
        #     password = StringField(name='password')
        # print(User.primary_key)
        # print(User.mappings)
        # obj = User(name='jason')
        # print(obj.table_name)
        # print(obj.primary_key)
        # print(obj.mappings)
    View Code
  • 相关阅读:
    第一章 重构
    Android View的事件分发
    java.lang.NoSuchMethodError: android.view.View.setBackground
    handler消息机制
    魅族手机Listview下拉出现hold字样的奇葩问题解决方案
    数据结构--树,二叉树
    数据结构之栈和队列
    设计模式--六大原则
    ListView上下线添加
    Python 入门(七)函数
  • 原文地址:https://www.cnblogs.com/HZLS/p/11073535.html
Copyright © 2011-2022 走看看