zoukankan      html  css  js  c++  java
  • 还有这种操作?异步迭代器的切片操作!!!

    问题引出

    对于同步中的迭代器我们可以使用itertools的islice模块来实现

    # -*- coding: utf-8 -*-
    # @Time : 2019/1/2 11:52 AM
    # @Author : cxa
    # @File : 切片.py
    # @Software: PyCharm
    from itertools import islice
    
    la = (x for x in range(20))
    print(type(la))
    for item in islice(la, 5, 9):  # 取下标5-9的元素
        print(item)
    

    输出

    <class 'generator'>
    5
    6
    7
    8
    

    那如何对异步生成器进行类似切片的操作呢?

    问题产生

    我在使用mongo的异步模块motor的使用,查询得到300万条数据,然后去进行操作,

    async def get_data():
        data=await get_detail_datas()
        return data
    

    发现返回结果data为AsyncIOMotorCursor类型,查阅资料得知该类型属于async_generator也就是下面要说的异步生成器。

    异步生成器的形式

    针对上面的结果可以通过async for进行遍历获取结果

    async def get_data():
        data=await get_detail_datas()
        async for item in data:
            print(item)
    

    但是问题来了,如何对异步迭代器进行切片操作呢。

    aiostream

    aiostream提供了一组流操作符,可以将它们组合在一起以创建异步操作管道.
    它可以看作是itertools的异步版本,所以可以实现异步迭代器的切片功能

    要求

    python >= 3.6

    安装

    pip3 install aiostream
    

    使用

    import asyncio
    from aiostream import stream
    
    
    async def generate_numbers(n):
        for x in range(n):
            yield x
    
    
    async def consume_some_numbers(n, m):
        zs = stream.take(generate_numbers(n), m)
        t=await stream.list(zs) #返回切片的结果列表
        print(t)
        # async with zs.stream() as streamer: #迭代
        #     # Asynchronous iteration
        #     async for z in streamer:
        #         # Print 1, 9, 25, 49 and 81
        #         print('->', z)
    
    async def get_data():
        await consume_some_numbers(10, 5)
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(get_data())
    

    这里使用了aiostream,和python自带的asyncio库,
    首先定义异步函数generate_numbers,作用是生成迭代器,内部含有1-n。
    consume_some_numbers是具体的实现方法,stream.take第一个参数可以传入一个异步迭代器函数,第二个参数是异步迭代器方法的参数。
    然后函数get_data() 去调用了consume_some_numbers。
    后面通过loop创建一个事件循环,然后等结果运行完毕。

    分流

    async def branch(coros, limit=10):
        index = 0
        while True:
            xs = stream.iterate(coros)
            ys = xs[index:index + limit]
            t = await stream.list(ys)
            if not t:
                break
            await asyncio.ensure_future(asyncio.wait(t))
            index += limit 
    

    总结

    这里主要说的是异步迭代器的用法,需要有一定的异步基础才能看懂。

    参考资料
    https://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.take

  • 相关阅读:
    Excel Rendering Limitations
    Output Caching and VaryByParam, VaryByCustom
    ajaxToolkit:AutoCompleteExtender 使用键值对
    Sql Server 2005 存储过程分页
    WEB前端优化
    processModel Element in Machine.config
    如何监测虚拟内存
    MemoryLimit Tuning (ASP.NET v1.1)
    缓存之SqlDependency
    SQL产生随机字符串
  • 原文地址:https://www.cnblogs.com/c-x-a/p/10208179.html
Copyright © 2011-2022 走看看