from multiprocessing import Queue,JoinableQueue # 进程IPC队列
from queue import Queue # 线程队列 先进先出的
from queue import LifoQueue # 后进先出的
put get put_nowait get_nowait full empty qsize
队列Queue
先进先出
自带锁 数据安全
栈 LifoQueue
后进先出
自带锁 数据安全
后进先出队列
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
from queue import LifoQueue lq = LifoQueue(5) #超过5个会阻塞 lq.put(123) lq.put(456) lq.put('abc') lq.put('abc') lq.put('abc') lq.put('abc') lq.put('abc') print(lq) print(lq.get()) print(lq.get()) print(lq.get()) print(lq.get())
优先级队列
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
from queue import PriorityQueue # 优先级队列 pq = PriorityQueue(5) pq.put((10,'aaa')) pq.put((5,'zzz')) pq.put((5,'bbb')) pq.put((20,'ccc')) print(pq.get()) print(pq.get()) print(pq.get()) print(pq.get()) #前面的值相同的情况下 根据ask排列
======================线程池===============
Threading 没有线程池的
Multiprocessing Pool
concurrent.futures帮助你管理线程池和进程池
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
import time from threading import currentThread,get_ident from concurrent.futures import ThreadPoolExecutor #帮你开启线程池中的类 from concurrent.futures import ProcessPoolExecutor #帮你开启进程池中的类 def func(i): time.sleep(1) print("in %s %s" %(i,currentThread())) return i**2 def back(fn): print(fn.result(),currentThread()) t=ThreadPoolExecutor(5) t.map(func,range(20)) #启动多线程任务 """相当于 for i in range(20): t.submit(func,i) """ t = ThreadPoolExecutor(5) for i in range(20): t.submit(fn=func,) t.shutdown() print('main : ',currentThread())
获取任务的结果
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
import time from threading import currentThread,get_ident from concurrent.futures import ThreadPoolExecutor #帮你开启线程池中的类 from concurrent.futures import ProcessPoolExecutor #帮你开启进程池中的类 def func(i): time.sleep(1) print("in %s %s" %(i,currentThread())) return i**2 def back(fn): print(fn.result(),currentThread()) # t = ThreadPoolExecutor(20) # ret_l = [] # for i in range(20): # ret = t.submit(func,i) # ret_l.append(ret) # t.shutdown() # for ret in ret_l: # print(ret.result()) # print('main : ',currentThread())
回调函数
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
import time from threading import currentThread,get_ident from concurrent.futures import ThreadPoolExecutor #帮你开启线程池中的类 from concurrent.futures import ProcessPoolExecutor #帮你开启进程池中的类 def func(i): time.sleep(1) print("in %s %s" %(i,currentThread())) return i**2 def back(fn): print(fn.result(),currentThread()) t=ThreadPoolExecutor(20) for i in range(100): t.submit(func,i).add_done_callback(back)
回调函数 (进程版)
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
import os import time from concurrent.futures import ProcessPoolExecutor #开启进程的类 def func(i): time.sleep(1) print("in %s %s" %(i,os.getpid())) return i**2 def back(fn): print(fn.result(),os.getpid()) if __name__ == '__main__': print("main:",os.getpid()) t = ProcessPoolExecutor(20) for i in range(100): t.submit(func, i).add_done_callback(back)
# threading模块是没有线程池的
# concurrent.futures 进程池 和 线程池
# 高度封装
# 进程池/线程池的统一的使用方式
# 创建线程池/进程池 ProcessPoolExecutor ThreadPoolExecutor
# ret = t.submit(func,arg1,arg2....) 异步提交任务
# ret.result() 获取结果,如果要想实现异步效果,应该是使用列表
# map(func,iterable)
# shutdown close+join 同步控制的
# add_done_callback 回调函数,在回调函数内接收的参数是一个对象,需要通过result来获取返回值
# 回调函数仍然在主进程中执行
# web框架
# 爬虫/自动化开发
# 爬虫 : 访问大量的网页,对网页代码进行处理
# 正则表达式
# 字符串处理
# 前端
# 并发
# 运维 : 一堆机器 一堆程序 你去维护
# 自动化开发/运维开发 : 开发一些程序 让机器的维护/程序的维护自动化起来
# 运维基础
# python的基础开发
# 并发
# 前端
# 4-5个进程
# 每个进程中 开 20个线程
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
def func(): print(1) x = yield 'aaa' print(x) yield 'bbb' g = func() print(next(g)) print(g.send('****'))
在多个函数之间互相切换的功能---协程 yeild 只有程序之间的切换,没有重利用任何IO操作的时间
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
def consumer(): while True: x=yield print(x) def producer(): g=consumer() next(g) for i in range(10): g.send(i) producer()
# 程序执行的上下文切换的工具
# greenlet 程序上下文切换的
# 模块的安装
# 别人写好的代码
# sys.path
# 把别人写好的代码 放到sys.path指示的路径下面
# python2/python3
# pip python2
# pip3 python3
# pip3 install 要安装的模块的名字 在cmd直接打印即可
# pip3 install greenlet
# pip3 install django
# pip3 install greenlet
# pycharm装
# 重新配环境变量 cmd要重启
# 重新装python
# 重新配置环境变量
协程模块 # 单纯的程序切换耗费时间
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
from greenlet import greenlet import time def eat(): print('吃') time.sleep(1) g2.switch() # 切换 print('吃完了') time.sleep(1) g2.switch() def play(): print('玩儿') time.sleep(1) g1.switch() print('玩儿美了') time.sleep(1) g1 = greenlet(eat) g2 = greenlet(play) g1.switch() # 切换
gevent pip3 install gevent
greenlet是gevent的底层
gevent是基于greenlet实现的
python代码在控制程序的切换
virtualenv
pipenv
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
import gevent import time def eat(): print('吃') time.sleep(2) print('吃完了') def play(): print('玩儿') time.sleep(1) print('玩儿美了') g1 = gevent.spawn(eat) g2 = gevent.spawn(play) gevent.joinall([g1,g2]) """ g1.join() g2.join() """
g1.join()
# g2.join()
# 没执行
# 为什么没执行???是需要开启么?
# 没有开启但是切换了
# gevent帮你做了切换,做切换是有条件的,遇到IO才切换
# gevent不认识除了gevent这个模块内以外的IO操作
# 使用join可以一直阻塞直到协程任务完成
# 帮助gevent来认识其他模块中的阻塞
# from gevent import monkey;monkey.patch_all()写在其他模块导入之前
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
from gevent import monkey;monkey.patch_all() import socket import gevent def talk(conn): while True: conn.send(b'hello') print(conn.recv(1024)) sk = socket.socket() sk.bind(('127.0.0.1',9090)) sk.listen() while True: conn,addr = sk.accept() gevent.spawn(talk,conn)
client端
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
import socket from threading import Thread def client(): sk = socket.socket() sk.connect(('127.0.0.1',9090)) while True: print(sk.recv(1024)) sk.send(b'bye') for i in range(500): Thread(target=client).start()
# 协程 能够在单核的情况下 极大地提高CPU的利用率
# 不存在数据不安全
# 也不存在线程切换创造的时间开销
# 切换是用户级别的,程序不会因为协程中某一个任务进入阻塞状态而使整条线程阻塞
# 线程的切换
# 时间片到了 降低CPU的效率
# IO会切 提高CPU效率