- 消息队列 Message Queue
MQ是在消息传输过程中保存消息的容器。
MQ最经典的用法就是在producer和customer之间通过消息管道来传递消息,producer往管道中写入消息,customer从管道中读取消息。
操作系统提供了很多机制来实现进程间的通信,multiprocessing模块就提供了Queue()和Pipe()两种方法来实现。
- multiprocessing 模块中的方法:Queue()
1 from multiprocessing import Queue 2 3 q = Queue(3) # 消息队列最大个数为3 4 5 q.put(1) # put()向队列中放数据 6 q.put("A") 7 q.put(["abc", "345"]) 8 # q.put("test") # 队列满了就无法再放入数据了 9 # q.put_nowait() # 不会等待队列有空闲位置再放入数据,如果数据放入不成功就直接崩溃(不建议使用put_nowait) 10 11 print(q.full()) # 判断队列此时是否已满 12 print(q.get()) 13 print(q.get()) 14 15 print(q.full()) 16 print(q.get()) 17 print(q.empty()) # 判断此时队列是否为空 18 # print(q.get()) # 队列空了就无法再读取数据了 19 # q.get_nowait() # 队列为空,取值的时候不等待,但是取不到值那么直接崩溃了
True 1 A False ['abc', '345'] True
- multiprocessing模块中的方法:Pipe()
multiprocessing.Pipe([duplex])
1)返回2个连接对象(conn1, conn2),代表管道的两端,默认是双向通信.如果duplex=False,conn1只能用来接收消息,conn2只能用来发送消息.不同于os.open之处在于os.pipe()返回2个文件描述符(r, w),表示可读的和可写的。
2)send()和recv()方法分别是发送和接收消息。close()表示关闭关闭管道
1 from multiprocessing import Process, Pipe 2 3 4 def fun(pipe, x): 5 pipe.send("Hello {0}".format(x)) 6 7 reciver, sender = Pipe() 8 p = Process(target=fun, args=(sender, 'Karl',)) # 传递参数为sender,其实传递的是reciver也是可以的 9 p.start() 10 print(reciver.recv()) 11 p.join() 12 print(reciver.recv()) #在等待接收
Hello Karl
- Queue模块
python提供了Queue模块来专门实现消息队列。
Queue对象实现一个FIFO队列,Queue()只有一个构造参数maxsize,用来指定队列长度,maxsize小于1就表示队列长度无限。
Queue对象主要有以下成员函数:
qsize(): 放回MQ的当前空间
empty(): 判断MQ是否为空
full(): 判断MQ是否满
put(item, block=True, timeout=None): 往MQ中存放消息。block可以控制是否阻塞,timeout指定阻塞等待时间。
put_nowait(item): 相当于put(item, False)
get(block=True, timeout=None)
以下两个函数用来判断消息对应的任务是否完成。
task_done(): 接收消息的线程通过调用此函数来说明消息对应的任务已完成。
join(): 实际上意味着等到队列为空,再执行别的操作。
- Celery异步分布式
Celery 是一个由 Python 编写的简单、灵活、可靠的用来处理大量信息的分布式系统,它同时提供操作和维护分布式系统所需的工具。
Celery 专注于实时任务处理,支持任务调度。
说白了,它是一个分布式队列的管理工具,我们可以用 Celery 提供的接口快速实现并管理一个分布式的任务队列。
Celery 本身不是任务队列,它是管理分布式任务队列的工具,或者换一种说法,它封装好了操作常见任务队列的各种操作,我们用它可以快速进行任务队列的使用与管理。
- 使用Celery时的几个常用概念
1) Brokers
brokers 中文意思为中间人,在这里就是指任务队列本身,Celery 扮演生产者和消费者的角色,brokers 就是生产者和消费者存放/拿取产品的地方(队列)
常见的 brokers 有 rabbitmq、redis、Zookeeper 等。
2) Result Stores / Backend
顾名思义就是结果储存的地方,队列中的任务运行完后的结果或者状态需要被任务发送者知道,那么就需要一个地方储存这些结果,就是 Result Stores 了
常见的 backend 有 redis、Memcached 甚至常用的数据都可以。
3) Workers
就是 Celery 中的工作者,类似与生产/消费模型中的消费者,其从队列中取出任务并执行。
4) Tasks
就是我们想在队列中进行的任务咯,一般由用户、触发器或其他操作将任务入队,然后交由 workers 进行处理。
5) Celery 其内建任务状态有如下几种:
参数 | 说明 |
---|---|
PENDING | 任务等待中 |
STARTED | 任务已开始 |
SUCCESS | 任务执行成功 |
FAILURE | 任务执行失败 |
RETRY | 任务将被重试 |
REVOKED | 任务取消 |
- 习题
这里我们用 redis 当做 celery 的 broker 和 backend。
首先,写一个task:
1 #tasks.py 2 from celery import Celery 3 4 app = Celery('tasks', backend='redis://localhost:6379/0', broker='redis://localhost:6379/0') #配置好celery的backend和broker 5 6 @app.task #普通函数装饰为 celery task 7 def add(x, y): 8 return x + y
OK,到这里,broker 我们有了,backend 我们有了,task 我们也有了,现在就该运行 worker 进行工作了,在 tasks.py 所在目录下运行:
1 celery -A tasks worker --loglevel=info
意思就是运行 tasks 这个任务集合的 worker 进行工作(当然此时broker中还没有任务,worker此时相当于待命状态)
最后一步,就是触发任务啦,最简单方式就是再写一个脚本然后调用那个被装饰成 task 的函数:
1 #trigger.py 2 from tasks import add 3 result = add.delay(4, 4) #不要直接 add(4, 4),这里需要用 celery 提供的接口 delay 进行调用 4 while not result.ready(): 5 time.sleep(1) 6 print("task done: {0}".format(result.get()))
运行此脚本
delay 返回的是一个 AsyncResult 对象,里面存的就是一个异步的结果,当任务完成时result.ready()
为 true,然后用 result.get()
取结果即可。
到此,一个简单的 celery 应用就完成啦。