zoukankan      html  css  js  c++  java
  • Tornado 高并发源码分析之六---异步编程的几种实现方式

    方式一:通过线程池或者进程池
    导入库futures是python3自带的库,如果是python2,需要pip安装future这个库
    备注:进程池和线程池写法相同
     1 from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
     2 from tornado.concurrent import run_on_executor
     3 
     4 def doing(s):
     5     print('xiumian--{}'.format(s))
     6     time.sleep(s)
     7     return s
     8 
     9 class MyMainHandler(RequestHandler):
    10     executor = ProcessPoolExecutor(2)   #新建一个进程池,静态变量,属于类,所以全程只有这个几个进程,不需要关闭,如果放在__init__中,则属于对象,每次请求都会新建pool,当请求增多的时候,会导致今天变得非常多,这个方法不可取
    11 
    12     @gen.coroutine
    13     def get(self, *args, **kwargs):
    14         print('开始{}'.format(self.pool_temp))
    15         a = yield self.executor.submit(doing, 20)   
    16         print('进程 %s'  % self.executor._processes)
    17         self.write(str(a))
    18         print('执行完毕{}'.format(a))
    19 
    20    @run_on_executor       #tornado 另外一种写法,需要在静态变量中有executor的进程池变量
    21     def post(self, *args, **kwargs):
    22 a = yield doing(20)
    方式二:Tornado + Celery + RabbitMQ 实现
    使用Celery任务队列,Celery 只是一个任务队列,需要一个broker媒介,将耗时的任务传递给Celery任务队列执行,执行完毕将结果通过broker媒介返回。官方推荐使用RabbitMQ作为消息传递,redis也可以
     
    一、Celery 介绍:
    1.1、注意:
    1、当使用RabbitMQ时,需要按照pika第三方库,pika0.10.0存在bug,无法获得回调信息,需要按照0.9.14版本即可
    2、tornado-celery 库比较旧,无法适应Celery的最新版,会导致报无法导入task Producter包错误,只需要将celery版本按照在3.0.25就可以了
     
    1.2、关于配置:
    单个参数配置:
     1 app.conf.CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' 
    多个参数配置:
    1 app.conf.update(
    2     CELERY_BROKER_URL = 'amqp://guest@localhost//',
    3     CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
    4 )
    从配置文件中获取:(将配置参数写在文件app.py中)
    1 BROKER_URL='amqp://guest@localhost//'
    2 CELERY_RESULT_BACKEND='redis://localhost:6379/0'
    3 app.config_from_object('celeryconfig')
     
     
    二、案例
    2.1、启动一个Celery 任务队列,也就是消费者:
    1 from celery import Celery
    2 celery = Celery('tasks', broker='amqp://guest:guest@119.29.151.45:5672', backend='amqp')  使用RabbitMQ作为载体, 回调也是使用rabbit作为载体
    3 
    4 @celery.task(name='doing')   #异步任务,需要命一个独一无二的名字
    5 def doing(s, b):
    6     print('开始任务')
    7     logging.warning('开始任务--{}'.format(s))
    8     time.sleep(s)
    9     return s+b
    命令行启动任务队列守护进程,当队列中有任务时,自动执行 (命令行可以放在supervisor中管理)
    --loglevel=info --concurrency=5
    记录等级,默认是concurrency:指定工作进程数量,默认是CPU核心数
     
    2.2、启动任务生产者
     1 import tcelery
     2 tcelery.setup_nonblocking_producer()  #设置为非阻塞生产者,否则无法获取回调信息
     3 
     4 class MyMainHandler(RequestHandler):
     5 
     6     @web.asynchronous
     7     @gen.coroutine
     8     def get(self, *args, **kwargs):
     9         print('begin')
    10         result = yield gen.Task(sleep.apply_async, args=[10])   #使用yield 获取异步返回值,会一直等待但是不阻塞其他请求
    11         print('ok--{}'.format(result.result))     #返回值结果
    12        
    13        # sleep.apply_async((10, ), callback=self.on_success)   
    14        # print('ok -- {}'.format(result.get(timeout=100)))#使用回调的方式获取返回值,发送任务之后,请求结束,所以不能放在处理tornado的请求任务当中,因为请求已经结束了,与客户端已经断开连接,无法再在获取返回值的回调中继续向客户端返回数据
    15       
    16         # result = sleep.delay(10)    #delay方法只是对apply_async方法的封装而已
    17         # data = result.get(timeout=100)  #使用get方法获取返回值,会导致阻塞,相当于同步执行
    18         
    19 
    20     def on_success(self, response):    #回调函数
    21         print ('Ok-- {}'.format(response))
     
     
     
     
     
     
     
     
  • 相关阅读:
    2.分布式锁
    1. junit用法,before,beforeClass,test,after, afterClass的执行顺序
    GC算法
    记一次"截图"功能的前期调研过程!
    程序员转行手册!
    Yarn详细的工作流程
    Yarn的三种调度器(Scheduler)
    Hadoop序列化与Java序列化的区别
    MapReduce执行过程
    从普通登录到单点登录图例
  • 原文地址:https://www.cnblogs.com/hepingqingfeng/p/6655790.html
Copyright © 2011-2022 走看看