zoukankan      html  css  js  c++  java
  • aiomysql+sqlalchemy实现ORM框架【实现仿优酷系统添加用户】

      马上就要开始写仿优酷系统了,在这里提前学习一下ORM。

      一开始去百度搜了一下相关资料,里面的ORM框架要自己写,所以花了半天的时间学习了sqlalchemy:

    SQLAlchemy

      SQLAlchemy 比 Django 自带的 ORM 好在哪里?

       python sqlalchemy 和django orm 对比

      不管是说Django的ORM好,还是说Sqlalchemy好,,我认为框架并不是重点,其核心在于通过学习一门ORM框架,达到能熟练的从数据库取出数据保存数据(也就是数据持久化),这才是重点。

    Aiomysql:

      同样道理,掌握一门异步库也能让自己更加首席线程的操作:

        1.为什么要使用异步访问数据库,有什么优缺点?

          在多用户高并发的情况下,若使用同步查询,则等待时间过长,代价很高;

          使用了异步能提高效率,但在同时增加了处理难度;而且还会有“读脏数据的风险”:

          A和B要买票,同时A和B都发现只剩一张票了,这个时候两人同时点击购买,则会出现“读脏数据”的情况

        2.普通的线程池和Aiomysql有什么区别?

          首先准确的来说,Aiomysql主要使用的是协程,其次才是多线程,多线程建立连接池,每个连接使用协程处理多并发的情况,

          所以,他们的区别应该是在,普通的线程池的每一个线程没有使用协程,而aiomysql线程池里的线程使用了协程

    正式开始主题:

      1.Aiomysql创建连接

    class DBcontroller:
        __engine=None
        __isinstance = False
        def __new__(cls, *args, **kwargs):
            if cls.__isinstance:  # 如果被实例化了
                return cls.__isinstance  # 返回实例化对象
            print('connecting to database...')
            asyncio.get_event_loop().run_until_complete(DBcontroller.connect())
            cls.__isinstance = object.__new__(cls)  # 否则实例化
            return cls.__isinstance  # 返回实例化的对象
    
        @staticmethod
        async def connect():
            try:
                __engine = await create_engine(user='root',
                                                  db='youku',
                                                  host='127.0.0.1',
                                                  password='root',
                                                  minsize=1,
                                                  maxsize=10,
                                                  autocommit=True)
                if __engine:
                    DBcontroller.__engine = __engine
                    DBcontroller.connectStatue =True
                    print('connect to mysql success!')
                else:
                    raise ("connect to mysql error ")
            except:
                print('connect error.', exc_info=True)
    View Code

      2.创建Model

    class User(Base,Model):
        __tablename__ = 'user'
    
        id = Column(INTEGER(11), primary_key=True)
        name = Column(String(255))
        password = Column(String(255))
        is_locked = Column(INTEGER(11), nullable=False, server_default=text("'0'"))
        is_vip = Column(INTEGER(11), nullable=False, server_default=text("'0'"))
        user_type = Column(String(255))
        register_time = Column(String(255))
    

      

     3.创建sqlalchemy表

    from sqlalchemy import Column, Integer, String, MetaData,Table
    metadata = sa.MetaData()
    
    user = Table('user', MetaData(),
                Column('id', Integer, primary_key=True),
                Column('name', String(50)),
                Column('is_locked', Integer,nullable=False,default=0),
                Column('is_vip', Integer,nullable=False,default=0),
                Column('user_type', String(12)),
                Column('register_time', String(12))
                 )
    ()
    View Code

      4.实现查询/更新/插入功能

    class DBcontroller:
        __engine=None
        __isinstance = False
        def __new__(cls, *args, **kwargs):
            if cls.__isinstance:  # 如果被实例化了
                return cls.__isinstance  # 返回实例化对象
            print('connecting to database...')
            asyncio.get_event_loop().run_until_complete(DBcontroller.connect())
            cls.__isinstance = object.__new__(cls)  # 否则实例化
            return cls.__isinstance  # 返回实例化的对象
    
        @staticmethod
        async def connect():
            try:
                __engine = await create_engine(user='root',
                                                  db='youku',
                                                  host='127.0.0.1',
                                                  password='root',
                                                  minsize=1,
                                                  maxsize=10,
                                                  autocommit=True)
                if __engine:
                    DBcontroller.__engine = __engine
                    DBcontroller.connectStatue =True
                    print('connect to mysql success!')
                else:
                    raise ("connect to mysql error ")
            except:
                print('connect error.', exc_info=True)
    
        def selectTable(self,table):
            async def select():
                dbstb={
                    'user':user
                }
                conn = await DBcontroller.__engine.acquire()
    
                try:
                    result = await conn.execute(dbstb[table].select())
                    res = await result.fetchall()
                    for row in res:
                        print(row)
                except Exception as e :
                    print(e)
                finally:
                    # DBcontroller.__engine.release(conn)
                    pass
            asyncio.get_event_loop().run_until_complete(select())
    
        def executeTable(self,table,attr):
            async def execute():
                dbstb={
                    'user':user
                }
                conn = await DBcontroller.__engine.acquire()
    
                try:
                    await conn.execute(dbstb[table].insert().values(attr))
    
                except Exception as e:
                    if e.args[0]== 1062:
                        print('主键已存在')
                    elif e.args[0]==1366:
                        if attr['id']=='':
                            print('未设置主键')
                        elif attr['is_vip']=='':
                            print('未设置vip类型')
                        elif attr['is_locked']=='':
                            print('未设置上锁类型')
                        else:print('未知错误,请联系管理员')
                    else:
                        print(f'connect failed:{e}')
                finally:
                    DBcontroller.__engine.release(conn)
            asyncio.get_event_loop().run_until_complete(execute())
    
        def updateTable(self,table,id,attr):
            async def update():
                dbstb={
                    'user':user
                }
                conn = await DBcontroller.__engine.acquire()
    
                try:
                    await conn.execute(user.update().where(dbstb[table].c.id==id).values(attr))
    
                except Exception as e:
                    if e.args[0]== 1062:
                        print('主键已存在')
                    elif e.args[0]==1366:
                        if attr['id']=='':
                            print('未设置主键')
                        elif attr['is_vip']=='':
                            print('未设置vip类型')
                        elif attr['is_locked']=='':
                            print('未设置上锁类型')
                        else:print('未知错误,请联系管理员')
                        print(e)
                    else:
                        print(f'connect failed:{e}')
                finally:
                    DBcontroller.__engine.release(conn)
            asyncio.get_event_loop().run_until_complete(update())
    

      

    结果

      

  • 相关阅读:
    消除QQ表情小游戏
    图片排序
    自定义字体
    随机图片滚动
    生成500个0-1000的随机数&&数组查找—小练习
    文字搬运
    查找 替换
    BeginInvoke和EndInvoke方法
    MVC HtmlHelper用法大全
    30分钟LINQ教程 【转载】
  • 原文地址:https://www.cnblogs.com/ygy1997/p/11759490.html
Copyright © 2011-2022 走看看