zoukankan      html  css  js  c++  java
  • 数据库MySQL/Postgres/Redis异步操作

    数据库异步操作

    基于 aiomysql 异步操作mysql数据库

        异步操作 MySQL 的话,需要使用一个 aiomysql,直接 pip install aiomysql

    • 入门案例
    # -*- coding: utf-8 -*-
    
    # 导入异步操作的工具类库
    import asyncio
    import aiomysql.sa as aio_sa
    """
        # 标记点
        1. 必须使用 关键字 async 定义 需要异步处理的函数
        2. await 处理的函数中,遇到io操作标识
        3. 开启引擎
        4. 执行异步sql操作
        5. 关闭引擎
        6. 开启 事件循环对象
        7. 添加执行的异步函数
        8. 关闭 事件循环对象
        
    """
    async def main():
        # 通过 aio_sa 创建 引擎
        engine = await aio_sa.create_engine(
            host='127.0.0.1',
            port=3306,
            user='root',
            password='123456',
            db='flaskdb',
            connect_timeout=10  # 连接超时 10秒
        )
    
        # 通过 engine.acquire 获取一个mysql 数据库的连接
        async with engine.acquire() as conn:
            # 异步执行,  返回一个 异步对象
            result = await conn.execute('select * from t1;')
            print(type(result))  # <class 'aiomysql.sa.result.ResultProxy'>
    
            # 获取一条记录
            data = await result.fetchone()
            print(type(data))  # <class 'aiomysql.sa.result.RowProxy'>
    
            print(data.keys())
            print(type(data.keys()))# collections.abc.KeysView
    
            # 转换成列表
            print(list(data.keys()))
    
            # 关闭引擎
            engine.close()
            await engine.wait_closed()
    
    
    # 创建循环loop事件队列
    loop = asyncio.get_event_loop()
    # 添加 要异步执行的函数
    loop.run_until_complete(main())
    # 当完成 loop队列中的所有事件,关闭事件队列
    loop.close()
    
    
    • 入门案例2
    ### 基于 with 开启引擎和开启连接
    # -*- coding: utf-8 -*-
    from pprint import pprint
    import asyncio
    import aiomysql.sa as aio_sa
    
    """
        # 标记点
        1. with 自动关闭 连接和引擎
    """
    
    
    async def main():
        async  with aio_sa.create_engine(host='127.0.0.1',
                                         port=3306,
                                         user='root',
                                         password='123456',
                                         db='flaskdb',
                                         connect_timeout=10  # 连接超时 10秒
                                         ) as engine:
            async with engine.acquire() as conn:
                result = await conn.execute('select * from t1;')
                data = await result.fetchall()
                pprint(list(map(dict, data)))
    
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()
    
    
    # fetchmany 指定查询的条数
    data = await result.fetchmany(2)
    # fetchall 查询全部
    data = await result.fetchall()
    # fetchone 查询一条
    data = await result.fetchone()
    

    借助SQLAlchemy,通过 aiomysql 查询数据库SQL

    # -*- coding: utf-8 -*-
    import asyncio
    from pprint import pprint
    import aiomysql.sa as aio_sa
    from sqlalchemy.sql.selectable import Select
    from sqlalchemy import text
    
    async  def main():
        async  with aio_sa.create_engine(
                host='127.0.0.1',
                port=3306,
                user='root',
                password='123456',
                db='flaskdb',
                connect_timeout=10  # 连接超时 10秒
        ) as engine:
            async with engine.acquire() as conn:
                ###
                    # 1. whereclause 条件
                    # 2. from_obj  表
                    # 3. text 编辑 sql 条件
                
                # 利用 text / Select 生成 SQL语句
                sql = Select([text("id , name , day")],whereclause=text("id != 1"),from_obj=text("t1"))
    
                # 执行SQL
                result = await  conn.execute(sql)
    
                data = await result.fetchall()
                pprint(list(map(dict,data)))
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()
    

    借助SQLAlchemy,通过 aiomysql , 执行新增操作

    # -*- coding: utf-8 -*-
    import asyncio
    from pprint import pprint
    import aiomysql.sa as aio_sa
    from sqlalchemy import Table,MetaData,create_engine
    
    async  def main():
        async  with aio_sa.create_engine(
                host='127.0.0.1',
                port=3306,
                user='root',
                password='123456',
                db='flaskdb',
                connect_timeout=10  # 连接超时 10秒
        ) as engine:
            async with engine.acquire() as conn:
    
                # 创建 SQLAlchemy 中的引擎, 用来将表反射出来
                s_enging = create_engine('mysql+pymysql://root:123456@127.0.0.1:3306/flaskdb')
    
                # 反射表, MetaData是绑定引擎,获取元数据
                t1_table = Table('t1',MetaData(bind=s_enging),autoload=Table)
    
                inset_sql = t1_table.insert().values(
                    [
                        {
                            "id":1236,
                            "name":'alex',
                            "year":'2021',
                            "month":"06",
                            "day":"16",
                            "datetime":"2021-04-28 14:32:46"
                        },   {
                            "id":1237,
                            "name":'wusir',
                            "year":'2021',
                            "month":"06",
                            "day":"16",
                            "datetime":"2021-04-28 14:32:46"
                        },
                    ]
                )
    
                ### 必须开启事务,否则数据是不会插入到数据库中
                async with conn.begin():
    
                    # 执行SQL
                    result = await  conn.execute(inset_sql)
    
                    # 查看最后一条记录
                    print(result.lastrowid) # 0
                    # 查看影响的行数
                    print(result.rowcount) # 2
    
            # 再次查询, 查看是否插入到数据库
            async  with engine.acquire() as conn:
                data = await  (await conn.execute('select * from t1')).fetchall()
                data = list(map(dict,data))
                pprint(data)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()
    
    

    借助SQLAlchemy,通过 aiomysql , 执行更新操作

    # -*- coding: utf-8 -*-
    import asyncio
    from pprint import pprint
    import aiomysql.sa as aio_sa
    from sqlalchemy import Table, MetaData, create_engine, text
    
    
    async  def main():
        async  with aio_sa.create_engine(
                host='127.0.0.1',
                port=3306,
                user='root',
                password='123456',
                db='flaskdb',
                connect_timeout=10  # 连接超时 10秒
        ) as engine:
            async with engine.acquire() as conn:
    
                # 创建 SQLAlchemy 中的引擎, 用来将表反射出来
                s_enging = create_engine('mysql+pymysql://root:123456@127.0.0.1:3306/flaskdb')
    
                # 反射表, MetaData是绑定引擎,获取元数据
                t1_table = Table('t1',MetaData(bind=s_enging),autoload=Table)
                update_sql = t1_table.update().where(text('name="alex"')).values({"year":'2222'})
                ### 必须开启事务,否则数据是不会更新到数据库中
                async with conn.begin():
    
                    # 执行SQL
                    result = await  conn.execute(update_sql)
    
                    # 查看最后一条记录
                    print(result.lastrowid) # 0
                    # 查看影响的行数
                    print(result.rowcount) # 1
    
            # 再次查询, 查看是否更新到数据库
            async  with engine.acquire() as conn:
                data = await  (await conn.execute('select * from t1 where name="alex" ')).fetchall()
                data = list(map(dict,data))
                pprint(data)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()
    

    借助SQLAlchemy,通过 aiomysql , 执行删除操作

    # -*- coding: utf-8 -*-
    import asyncio
    from pprint import pprint
    import aiomysql.sa as aio_sa
    from sqlalchemy import Table, MetaData, create_engine, text
    
    
    async  def main():
        async  with aio_sa.create_engine(
                host='127.0.0.1',
                port=3306,
                user='root',
                password='123456',
                db='flaskdb',
                connect_timeout=10  # 连接超时 10秒
        ) as engine:
            async with engine.acquire() as conn:
    
                # 创建 SQLAlchemy 中的引擎, 用来将表反射出来
                s_enging = create_engine('mysql+pymysql://root:123456@127.0.0.1:3306/flaskdb')
    
                # 反射表, MetaData是绑定引擎,获取元数据
                t1_table = Table('t1',MetaData(bind=s_enging),autoload=Table)
                
                delete_sql = t1_table.delete() # 全部删除
    
                ### 必须开启事务,否则数据是不会插入到数据库中
                async with conn.begin():
    
                    # 执行SQL
                    result = await  conn.execute(delete_sql)
    
                    # 查看最后一条记录
                    print(result.lastrowid) # 0
                    # 查看影响的行数
                    print(result.rowcount) # 1237
    
            # 再次查询, 查看是否删除了所有数据
            async  with engine.acquire() as conn:
                data = await  (await conn.execute('select * from t1 where name="alex" ')).fetchall()
                data = list(map(dict,data))
                pprint(data) #  []
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()
    

    基于 asyncpg 对 PostgreSQL 执行异步操作

        异步操作 PostgreSQL,需要使用一个 asyncpg ,直接 pip install asyncpg . asyncpg相比较于psycopg2 效果更好

    异步查询

    # -*- coding: utf-8 -*-
    import asyncio
    from pprint import pprint
    import asyncpg
    
    async  def main():
    
        # 创建 数据库连接驱动
        conn = await asyncpg.connect(
            host='192.168.236.128',
            port=5432,
            user='postgres',
            password='123456',
            database='mydb',
            timeout=10
        )
    
        # - fetchrow  获取满足调价的单条记录
        row1 = await  conn.fetchrow('select * from company;')
        pprint(row1)
        print(type(row1)) # <class 'asyncpg.Record'>
        ##### <class 'asyncpg.Record'>  Record 对象的用法
        # 1. 通过字典的方式获取数据值
        print(row1['id'],row1['name'])  # 8 Paul
    
        # 2. 字典的get方式获取数据
        print(row1.get('id'))
    
        # 3. keys , values ,items
        print(list(row1.keys()))  # tuple_iterator 字段名字
        print(list(row1.values())) # RecordIterator 值
        print(dict(row1.items())) # RecordItemsIterator 字典形式  {字段名:值}
        print(dict(row1))
    
    
        # - fetch 执行 获取满足条件的全部记录
        row2 = await  conn.fetch('select * from company;')
        pprint(row2)
    
        # 关闭连接
        await conn.close()
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()
    
    

    基于sqlalchemy异步PSQL查询

    # -*- coding: utf-8 -*-
    import asyncio
    from pprint import pprint
    import asyncpg
    from sqlalchemy import text
    from sqlalchemy.sql import Select
    
    
    async  def main():
    
        # 创建 数据库连接驱动
        conn = await asyncpg.connect(
            host='192.168.236.128',
            port=5432,
            user='postgres',
            password='123456',
            database='mydb',
            timeout=10
        )
        # 利用 sqlalchemy 工具类 ,组装sql语句
        sql = Select([text('id,name,age')],whereclause=text('id != 1'),from_obj=text('company'))
        rows = await conn.fetch(str(sql))  #  asyncpg  只支持字符串查询
        pprint(list(map(dict,rows)))
        
        # conn.fetch 也支持 占位符  . $ 符号 作为占位符
        rows2 = await conn.fetch("select * from company where id != $1", 1)
        pprint(list(map(dict,rows2)))
    
        # 关闭连接
        await conn.close()
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()
    

    异步PSQL添加 INSERT

    # -*- coding: utf-8 -*-
    import asyncio
    from pprint import pprint
    import asyncpg
    """
        # 1. executemany 执行多条sql语句 , 不具备事务功能
        # 2. execute 执行一条SQL语句 , 具有事务功能
    """
    
    async def main():
        # 创建 数据库连接驱动
        conn = await asyncpg.connect(
            host='192.168.236.128',
            port=5432,
            user='postgres',
            password='123456',
            database='mydb',
            timeout=10
        )
    
        ### 执行 insert 语句
        # execute 执行单条 SQL语句
        row = await conn.execute("insert into company(id,name,age,address,salary) values ($1,$2,$3,$4,$5)", 1,'zhangsan', 18, 'bj',
                                 10000)
        pprint(row)  # 'INSERT 0 1'
        pprint(type(row))  # <class 'str'>
    
    
        # executemany 执行多条语句 .第一条参数是一个模板,第二条命令是包含多个元组的列表
        rows = await conn.executemany("insert into company(id,name,age,address,salary) values ($1,$2,$3,$4,$5)",
                                      [(2,'lisi', 18, 'sh',
                                 2222), (3,'wangwu', 18, 'hn',
                                 3333)])
    
        # 关闭连接
        await conn.close()
    
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()
    

    异步PSQL批量添加之事务

    import asyncio
    from pprint import pprint
    import asyncpg
    """
     1. 开启事务后,执行多个插入语句 . 遇到异常则自动回滚事务
    
    """
    async def main():
        # 创建 数据库连接驱动
        conn = await asyncpg.connect(
            host='192.168.236.128',
            port=5432,
            user='postgres',
            password='123456',
            database='mydb',
            timeout=10
        )
    
        # 开启 一步事务 , 会开启一个事务,使用异步上下文管理
        async with conn.transaction():
            await conn.executemany("insert into company(id,name,age,address,salary) values ($1,$2,$3,$4,$5)",
                                   [(2, 'lisi', 18, 'sh',2222),
                                    (3, 'wangwu', 18, 'hn', 3333)])
        
        # 关闭连接
        await conn.close()
    
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()
    

    异步PSQL修改

    # -*- coding: utf-8 -*-
    import asyncio
    from pprint import pprint
    import asyncpg
    
    async def main():
        # 创建 数据库连接驱动
        conn = await asyncpg.connect(
            host='192.168.236.128',
            port=5432,
            user='postgres',
            password='123456',
            database='mydb',
            timeout=10
        )
    
        # 修改一条记录
        row = await conn.execute('update company set name = $1 where name = $2 ','zhangsan','geigei')
        print(row)  # UPDATE 0
    
        # 修改多条记录
        rows = await conn.executemany('update company set name = $1 where name = $2',[('zhangsangeigei','zhangsan'),('lisi','geigei2')])
        print(rows) # None
    
    
        #  returning  返回 数据 id
        row = await conn.fetch("update company set name = 'zhangsan' where name ='zhangsan' returning id")
        print(row)
    
        # 关闭连接
        await conn.close()
    
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()
    
    

    异步PSQL删除

    # -*- coding: utf-8 -*-
    import asyncio
    from pprint import pprint
    import asyncpg
    
    async def main():
        # 创建 数据库连接驱动
        conn = await asyncpg.connect(
            host='192.168.236.128',
            port=5432,
            user='postgres',
            password='123456',
            database='mydb',
            timeout=10
        )
    
        async with conn.transaction():
    
            # 删除一条
            row = await conn.execute("delete from company where name in ($1,$2)","张三","李四")
            print(row)
    
            # 删除多条
            rows = await conn.executemany("delete from company where id = $1 ",[(2,),(3,)])
            print(rows)
    
            # 返回 id
            rows = await  conn.fetch("delete from company where name in ($1,$2) returning id ",'zhangsan','lisi' )
            print(rows)
        # 关闭连接
        await conn.close()
    
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()
    
    

    连接池

    
    # -*- coding: utf-8 -*-
    
    import asyncio
    import asyncpg
    
    # asyncpg 还提供了连接池,需要的话往池子里面去取即可。
    
    async def main():
        pool = await asyncpg.create_pool(
            host='192.168.236.128',
            port=5432,
            user='postgres',
            password='123456',
            database='mydb',
            timeout=10,
            # 连接池初始化时默认的最小连接数, 默认为 10
            min_size=10,
            # 连接池最大数量 默认是 10,
            max_size=10,
            # 每个链接最大查询数量, 超过了就换新的连接, 默认 5000
            max_queries=5000,
            # 最大不活跃时间, 默认 300.0, 超过这个时间的连接就会被关闭, 传入 0 的话则永不关闭
            max_inactive_connection_lifetime=300
        )
    
        async with pool.acquire() as conn:
            async with conn.transaction():
                row = await conn.fetchrow("select  '100'::int + '200'  ") # <Record ?column?=300>
                print(row)
    
                # 起别名  as
                row = await conn.fetchrow("select '100'::int + '200' as result ")
                print(row)
    
    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main())
    

    异步版PSQL

    
    # -*- coding: utf-8 -*-
    import time
    import asyncio
    import asyncpg
    
    
    async def run_sql(conn, query_list):
        result = []
        for query in query_list:
            result.append(await conn.fetch(query))
        await conn.close()
        return len(result)
    
    
    async def main():
        async with asyncpg.create_pool(
                host='192.168.236.128',
                port=5432,
                user='postgres',
                password='123456',
                database='mydb',
                timeout=10,
                # 连接池初始化时默认的最小连接数, 默认为 10
                min_size=10,
                # 连接池最大数量 默认是 10,
                max_size=10,
                # 每个链接最大查询数量, 超过了就换新的连接, 默认 5000
                max_queries=5000,
                # 最大不活跃时间, 默认 300.0, 超过这个时间的连接就会被关闭, 传入 0 的话则永不关闭
                max_inactive_connection_lifetime=300,
        ) as pool:
            query_list = [('select * from company;') for _ in range(20)]
    
            # 1. 创建 4 个 异步连接
            count = len(query_list) // 5
    
            # 2. 切片, 将任务分成 5 份
            query_list = [query_list[c * 4:(c + 1) * 4] for c in range(count + 1)]
    
            tasks = []
            for q in query_list:
                conn = await pool.acquire()
                tasks.append(run_sql(conn, q))
            # gather通常被用来阶段性的一个操作,做完第一步才能做第二步
            results = await  asyncio.gather(*tasks)
            return results
    
    
    if __name__ == '__main__':
        start = time.perf_counter()
        loop = asyncio.get_event_loop()
        results = loop.run_until_complete(main())
        end = time.perf_counter()
    
        for res in results:
            print(res)
        print(f'耗时: {end - start}')
    
    

    异步操作 Redis

    
    import asyncio
    import aioredis
    
    async def main():
        conn = await aioredis.create_connection('redis://:password@localhost:6379')
        # 指令: ex 超时
        data = await conn.execute("set","name","alex","ex",10)
        
        await conn.close()
        print(conn.closed) # True
        
    asyncio.run(main())
    
  • 相关阅读:
    Linux下判断字符串长度
    Linux下使用xargs得到字符串作为参数进行输出、awk得到字符串作为参数进行输出
    Linux下使用xargs将多行文本转换成一行并用tr实现逗号隔开
    Linux下使用split按行数进行切割
    Maven设置snapshot无法在远程仓库下载的问题解决
    Intellij IDEA自动生成serialVersionUID
    Spring中@Value用法收集
    Http报头Accept与Content-Type的区别(转)
    Linux下Shell的for循环语句
    Shell脚本中的分号使用
  • 原文地址:https://www.cnblogs.com/dengz/p/14889798.html
Copyright © 2011-2022 走看看