zoukankan      html  css  js  c++  java
  • 如何在django视图中使用asyncio(协程)和ThreadPoolExecutor(多线程)


    Django视图函数执行,不在主线程中,直接
    loop = asyncio.new_event_loop()  # 更不能loop = asyncio.get_event_loop()
    
    
    会触发

    RuntimeError: There is no current event loop in thread 

    因为asyncio程序中的每个线程都有自己的事件循环,但它只会在主线程中为你自动创建一个事件循环。所以如果你asyncio.get_event_loop在主线程中调用一次,它将自动创建一个循环对象并将其设置为默认值,但是如果你在一个子线程中再次调用它,你会得到这个错误。相反,您需要在线程启动时显式创建/设置事件循环:

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)


    在Django单个视图中使用asyncio实例代码如下(有多个IO任务时)

      

    from django.views import View
    import asyncio
    import time
    from django.http import JsonResponse
    
    class TestAsyncioView(View):
        def get(self, request, *args, **kwargs):
            """
            利用asyncio和async await关键字(python3.5之前使用yield)实现协程
            """
            start_time = time.time()
            loop = asyncio.new_event_loop()  # 或 loop = asyncio.SelectorEventLoop()
            asyncio.set_event_loop(loop)
            self.loop = loop
            try:
                results = loop.run_until_complete(self.gather_tasks())
            finally:
                loop.close()
            end_time = time.time()
            return JsonResponse({'results': results, 'cost_time': (end_time - start_time)})
    
        async def gather_tasks(self):
            """
             也可以用回调函数处理results
            task1 = self.loop.run_in_executor(None, self.io_task1, 2)
            future1 = asyncio.ensure_future(task1)
            future1.add_done_callback(callback)
    
            def callback(self, future):
                print("callback:",future.result())
            """
            tasks = (
                self.make_future(self.io_task1, 2),
                self.make_future(self.io_task2, 2)
            )
            results = await asyncio.gather(*tasks)
            return results
    
        async def make_future(self, func, *args):
            future = self.loop.run_in_executor(None, func, *args)
            response = await future
            return response
    
        """
        # python3.5之前无async await写法
        import types
        @types.coroutine
        # @asyncio.coroutine  # 这个也行
        def make_future(self, func, *args):
            future = self.loop.run_in_executor(None, func, *args)
            response = yield from future
            return response
        """
    
        def io_task1(self, sleep_time):
            time.sleep(sleep_time)
            return 66
    
        def io_task2(self, sleep_time):
            time.sleep(sleep_time)
            return 77
    

      

    在Django单个视图中使用ThreadPoolExecutor实例代码如下(有多个IO任务时)
     
    from django.views import View
    import time
    from concurrent.futures import ThreadPoolExecutor, as_completed
    
    
    class TestThreadView(View):
        def get(self, request, *args, **kargs):
            start_time = time.time()
            future_set = set()
            tasks = (self.io_task1, self.io_task2)
            with ThreadPoolExecutor(len(tasks)) as executor:
                for task in tasks:
                    future = executor.submit(task, 2)
                    future_set.add(future)
            for future in as_completed(future_set):
                error = future.exception()
                if error is not None:
                    raise error
            results = self.get_results(future_set)
            end_time = time.time()
            return JsonResponse({'results': results, 'cost_time': (end_time - start_time)})
    
        def get_results(self, future_set):
            """
            处理io任务执行结果,也可以用future.add_done_callback(self.get_result)
            def get(self, request, *args, **kargs):
                start_time = time.time()
                future_set = set()
                tasks = (self.io_task1, self.io_task2)
                with ThreadPoolExecutor(len(tasks)) as executor:
                    for task in tasks:
                        future = executor.submit(task, 2).add_done_callback(self.get_result)
                        future_set.add(future)
                for future in as_completed(future_set):
                    error = future.exception()
                    print(dir(future))
                    if error is not None:
                        raise error
                self.results = results = []
                end_time = time.time()
                return JsonResponse({'results': results, 'cost_time': (end_time - start_time)})
    
            def get_result(self, future):
                self.results.append(future.result())
            """
            results = []
            for future in future_set:
                results.append(future.result())
            return results
    
        def io_task1(self, sleep_time):
            time.sleep(sleep_time)
            return 10
    
        def io_task2(self, sleep_time):
            time.sleep(sleep_time)
            return 66
     
    附tornado中不依赖异步库实现异步非阻塞
    from tornado.web import RequestHandler
    from concurrent.futures import ThreadPoolExecutor
    class NonBlockingHandler(RequestHandler):
        """
        不依赖tornado的异步库实现异步非阻塞
        使用 gen.coroutine 装饰器编写异步函数,如果库本身不支持异步,那么响应任然是阻塞的。
        在 Tornado 中有个装饰器能使用 ThreadPoolExecutor 来让阻塞过程编程非阻塞,
        其原理是在 Tornado 本身这个线程之外另外启动一个线程来执行阻塞的程序,从而让 Tornado 变得非阻塞
        """
        executor = ThreadPoolExecutor(max_workers=2)
    
        # executor默认需为这个名字,否则@run_on_executor(executor='_thread_pool')自定义名字,经测试max_workers也可以等于1
    
        @coroutine  # 使用@coroutine这个装饰器加yield关键字,或者使用async加await关键字
        def get(self, *args, **kwargs):
            second = yield self.blocking_task(20)
            self.write('noBlocking Request: {}'.format(second))
    
        """
        async def get(self, *args, **kwargs):
            second = await self.blocking_task(5)
            self.write('noBlocking Request: {}'.format(second))
            """
    
        @run_on_executor
        def blocking_task(self, second):
            """
            阻塞任务
            """
            time.sleep(second)
            return second
    
    
    

    参考 https://blog.csdn.net/qq_34367804/article/details/75046718

      https://www.cnblogs.com/zhaof/p/8490045.html

      https://stackoverflow.com/questions/41594266/asyncio-with-django

     
  • 相关阅读:
    hrbust 1788
    poj2299 ( bit )
    LA3027(并查集)
    hdu1166 (bit)
    hdu1598(并查集)
    cdoj1215 (并查集)
    hdu2643 ( 第二类斯特灵数 )
    hdu3625 ( 第一类斯特灵数 )
    Uva10066
    怎么处理sqlserver2017部署在winowsDocker上时区无法修改成功的方式,并且可以多创建新的容器调用简单的方式直接使用!
  • 原文地址:https://www.cnblogs.com/ALXPS/p/10193296.html
Copyright © 2011-2022 走看看