zoukankan      html  css  js  c++  java
  • aiomysql异步操作mysql

    # coding:utf-8
    import traceback
    import logging
    import aiomysql
    import asyncio
    '''
    mysql 异步版本
    '''
    
    logobj = logging.getLogger('mysql')
    
    
    class Pmysql:
        def __init__(self):
            self.coon = None
            self.pool = None
    
        async def initpool(self):
            try:
                logobj.debug("will connect mysql~")
                __pool = await aiomysql.create_pool(
                    minsize=5,
                    maxsize=10,
                    host='127.0.0.1',
                    port=3306,
                    user='root',
                    password='root',
                    db='youku',
                    autocommit=True)
                return __pool
            except:
                logobj.error('connect error.', exc_info=True)
    
        async def getCurosr(self):
            conn = await self.pool.acquire()
            cur = await conn.cursor()
            return conn, cur
    
        async def query(self, query, param=None):
            conn, cur = await self.getCurosr()
            try:
                await cur.execute(query, param)
                return await cur.fetchall()
            except:
                logobj.error(traceback.format_exc())
            finally:
                if cur:
                    await cur.close()
                # 释放掉conn,将连接放回到连接池中
                await self.pool.release(conn)
    
        async def execute(self, query, param=None):
            conn, cur = await self.getCurosr()
            try:
                await cur.execute(query, param)
                if cur.rowcount == 0:
                    return False
                else:
                    return True
            except:
                logobj.error(traceback.format_exc())
            finally:
                if cur:
                    await cur.close()
                # 释放掉conn,将连接放回到连接池中
                await self.pool.release(conn)
    
    
    async def test():
        mysqlobj = await getAmysqlobj()
        # UPDATE `youku`.`person` SET `psName` = '张三丰' WHERE (`id` = '3');
        exeRtn = await mysqlobj.execute("update person set psName='马云' where id = 3")
        if exeRtn:
            print('操作成功')
        else:
            print('操作失败')
    
    
    async def getAmysqlobj():
        mysqlobj = Pmysql()
        pool = await mysqlobj.initpool()
        mysqlobj.pool = pool
        return mysqlobj
    
    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
        loop.run_until_complete(test())
    

      

  • 相关阅读:
    MVC3 string equlas int 方法
    AjAx ComponentArt. NavBar 的用法
    GridView重写排序、分页 (原作)
    如何用 Calendar 控件来做日程管理
    无刷新仿google波形扭曲彩色Asp.net验证码
    Asp.net 2.0图形报表制作chart(原作)
    WinForm.Net 界面皮肤使用资源(C#原作)
    java Date类用法(转)
    画类图
    LCA tarjan hdu 2586代码详细步骤(转)
  • 原文地址:https://www.cnblogs.com/ygy1997/p/11753335.html
Copyright © 2011-2022 走看看