目录:
一、queue 二、线程 基本使用 线程锁 自定义线程池 生产者消费者模型(队列) 三、进程 基本使用 进程锁 进程数据共享 默认数据不共享 queues array Manager.dict 进程池 PS: IO密集型-多线程 计算密集型 - 多进程 四、协程 原理:利用一个线程,分解一个线程成为多个“微线程”==》程序级别 greenlet gevent pip3 install gevent
一、queue
1.1 queue用法
# 先进先出队列
# put放数据,是否阻塞,阻塞时的超时事件
# get取数据(默认阻塞),是否阻塞,阻塞时的超时事件
# 队列的最大长度:queue.Queue(2) 里面的数字
# qsize()真实个数
# maxsize 最大支持的个数
# join,task_done,阻塞进程,当队列中任务执行完毕之后,不再阻塞
import queue q = queue.Queue(2) # q = queue.Queue()如果没有参数的话,就是可以放无限多的数据。 print(q.empty()) # 返回队列是否为空,空则为True,此处为True q.put(11) q.put(22) print(q.empty()) # 此处为False print(q.qsize()) # 返回队列中现在有多少元素 # q.put(22) # q.put(33,block=False) # 如果队列最大能放2个元素,这时候放了第三个,默认是阻塞的,block=False,如果就会报错:queue.Full # q.put(33,block=True,timeout=2) # 设置为阻塞,如果timeout设置的时间之内,还没有人来取,则就会报错:queue.Full print(q.get()) print(q.get()) print(q.get(timeout=2)) # 队列里的数据已经取完了,如果再取就会阻塞,这里timeout时间2秒,就是等待2秒,队列里还没有数据就报错:queue.Empty
1.2 queue.join
# join:实际上意味着等到队列为空,再执行别的操作,否则就一直阻塞,不是说get取完了,就不阻塞了,而是每次get之后,
# 要执行:task_done 告诉一声已经取过了,等队列为空,join才不阻塞。
下面的程序是阻塞的
q = queue.Queue(5) q.put(123) q.put(456) q.get() # q.task_done() q.get() # q.task_done() # 在完成一项工作之后,Queue.task_done()函数向任务已经完成的队列发送一个信号 q.join()
下面的程序是不阻塞的:
q = queue.Queue(5) q.put(123) q.put(456) q.get() q.task_done() q.get() q.task_done() # 在完成一项工作之后,Queue.task_done()函数向任务已经完成的队列发送一个信号 q.join()
1.3 其他队列
import queue
# queue.Queue,先进先出队列
# queue.LifoQueue,后进先出队列
# queue.PriorityQueue,优先级队列
# queue.deque,双向对队
# queue.Queue(2) 先进先出队列
# put放数据,是否阻塞,阻塞时的超时事件
# get取数据(默认阻塞),是否阻塞,阻塞时的超时事件
# qsize()真实个数
# maxsize 最大支持的个数
# join,task_done,阻塞进程,当队列中任务执行完毕之后,不再阻塞
import queue # q = queue.Queue(2) 先进先出队列 # q = queue.LifoQueue() 后进先出队列 # q = queue.PriorityQueue() 优先级队列 # q = queue.deque() 双向队列 q = queue.LifoQueue() q.put(123) q.put(456) # 打印;456 print(q.get()) # 优先级最小的拿出来 # 如果优先级一样,则是谁先放,就先取出谁 q = queue.PriorityQueue() q.put((1,'alex1')) q.put((1,'alex2')) q.put((1,'alex3')) q.put((3,'alex3')) # (1, 'alex1') print(q.get()) q = queue.deque() q.append(123) q.append(333) q.appendleft(456) # deque([456, 123, 333]) print(q) # 打印:456 print(q[0]) q.pop() # 从右边删除 # deque([456, 123]) print(q) q.popleft() # 从左边删除
python的队列是在内存里创建的,python的进程退出了,则队列也清空了。
二、生产者消费者模型(队列)
1)生产者消费者模型的作用:
1、解决阻塞
2、就是解耦,修改生产者,不会影响消费者,反之亦然。
2)在生产环境,用生产者消费者模型,就可以解决:
1、处理瞬时并发的请求问题。瞬时的连接数就不会占满。所以服务器就不会挂了。
2、客户端提交一个请求,不用等待处理完毕,可以在页面上做别的事情。
2.1)如果不用队列存数据,服务端通过多线程来处理数据:
用户往队列存数据,服务器从队列里取数据。
没有队列的话,就跟最大连接数有关系,每个服务器就有最大连接数。
客户端要获取服务器放回,服务器要查、修改数据库或修改文件,要2分钟,那客户端就要挂起链接2分钟,2万个连接一半都要挂起,服务器就崩溃了。
如果没有队列,第一个用户发来请求,连上服务器,占用连接,等待2分钟。
第二个人来也要占用2分钟。
web服务器
如果要处理并发,有10万并发,如果:一台机器接收一个连接,需要10万个机器,等待2分钟就处理完了。
2.2)把请求放在队列的好处
用户发来请求,把请求放到队列里,可以让连接马上断开,不会阻塞,就不占用服务器的连接数了。如果看到订单处理了没,就要打开另外一个页面,查看请求是否处理。
服务器查询处理任务的时候,每个才花2分钟,服务器耗时是没有减少的。
但是这样做,客户端就不会持续的占用连接了。那瞬时的连接数就不会占满。所以服务器就不会挂了。
但是后台要处理10万个请求,也需要50台服务器。并不会减少服务器数量。
这样就能处理瞬时并发的请求问题。
服务器只是处理请求,是修改数据库的值,不是告诉客户端。而是客户端再发来请求,查询数据库已经修改的内容。
提交订单之后,把这个订单扔给队列,程序返回“正在处理”,就不等待了,然后断开这个连接,你可以在页面里做别的事情,不用一直等待订单处理完。这样就不影响服务器的最大连接数。在页面帮你发起一个alax请求,url,不停的请求(可能是定时器),我的订单成功没有,我的订单成功没有,如果订单成功了,就自动返回页面:订单成功
如果不用队列的话,一个请求就占用一个服务器,等待的人特别多,等待连接的个数太多了。服务器就挂掉了。
队列就没有最大个数限制,把请求发给队列了,然后http链接就断开了,就不用等待了。
12306买票的时候,下次再来请求的时候,就会告诉你,前面排了几个人。
3)python queue的特点:
python的queue是内存级别的。rabbitmq可以把队列发到别的服务器上处理。
所以python里的queue不能持久化,但是rabbitmq可以持久化。
queue.Queue()这样写,队列就没有最大个数限制。queue.Queue(5)就是说队列里最多能放5个值
4)生产者消费者代码示例:
import time,random import queue,threading q = queue.Queue() def Producer(name): count =0 while True: time.sleep(random.randrange(3)) if q.qsize()<3: # 只要盘子里小于3个包子,厨师就开始做包子 q.put(count) print("Producer %s has produced %s baozi.." %(name,count)) count += 1 def Consumer(name): count =0 while True: time.sleep(random.randrange(4)) if not q.empty(): # 只要盘子里有包子,顾客就要吃。 data = q.get() print(data) print('