必要的理论知识
#一 操作系统的作用:
1:隐藏丑陋复杂的硬件接口,提供良好的抽象接口
2:管理、调度进程,并且将多个进程对硬件的竞争变得有序
#二 多道技术:
1.产生背景:针对单核,实现并发
ps:
现在的主机一般是多核,那么每个核都会利用多道技术
有4个cpu,运行于cpu1的某个程序遇到io阻塞,会等到io结束再重新调度,会被调度到4个
cpu中的任意一个,具体由操作系统调度算法决定。
2.空间上的复用:如内存中同时有多道程序
3.时间上的复用:复用一个cpu的时间片
切换:(遇到IO、占用CPU时间过长、更高优先级的应用程序)
保存状态:在即将切换到另一应用程序时,需要保存当前程序的状态
cpu在多个任务之间来回切换的好处与不足
1、一个任务占用cpu时间长,会被操作系统强行剥夺走cpu的执行权限:只是为了保证并发的效果,反而会降低效率
2、一个任务遇到io操作,会被操作系统强行剥夺走cpu的执行权限:为了实现并发的效果,这种情况是并发,可以提高效率
大前提:一个CPU同一时刻只能执行一个任务
串行:一行任务完全执行完毕,才会执行下一行
并发:多个任务看起来是同时运行的,单核下就能实现并发(并发=切换+保存状态)
并行:多个任务是真正意义的同时运行,只有在多核才能实现并行
阻塞:程序在执行过程中发生了IO操作(对比CPU执行速度,非常慢)CPU直接切走
进程的三种状态(运行、就绪、阻塞)
提交任务的两种方式
#同步调用:提交完一个任务后,会在原地等待,等待任务完完整整的运行完毕拿到结果后,再执行下一行代码,会导致任务是串行执行的
#异步调用:提交完一个任务后,不在原地等待,而是直接执行下一行代码,会导致任务是并发执行的
异步调用就是你 喊 你朋友吃饭 ,你朋友说知道了 ,待会忙完去找你 ,你就去做别的了。
同步调用就是你 喊 你朋友吃饭 ,你朋友在忙 ,你就一直在那等,等你朋友忙完了 ,你们一起去。
阻塞调用:在调用结果返回之前,当前线程会被挂起(如遇到IO,CPU直接切走)
阻塞与同步的区别 #1. 同步调用:apply一个累计1亿次的任务,该调用会一直等待,直到任务返回结果为止,但并未阻塞住(即便是被抢走cpu的执行权限,那也是处于就绪态); #2. 阻塞调用:当socket工作在阻塞模式的时候,如果没有数据的情况下调用recv函数,则当前线程就会被挂起,直到有数据为止。
进程其他了解知识
#进程的层次结构 无论UNIX还是windows,进程只有一个父进程,不同的是: 1. 在UNIX中所有的进程,都是以init进程为根,组成树形结构。父子进程共同组成一个进程组,这样,当从键盘发出一个信号时,该信号被送给当前与键盘相关的进程组中的所有成员。 2. 在windows中,没有进程层次的概念,所有的进程都是地位相同的,唯一类似于进程层次的暗示,是在创建进程时,父进程得到一个特别的令牌(称为句柄),该句柄可以用来控制子进程,但是父进程有权把该句柄传给其他子进程,这样就没有层次了。 #进程的状态 tail -f access.log |grep '404' 执行程序tail,开启一个子进程,执行程序grep,开启另外一个子进程,两个进程之间基于管道'|'通讯,将tail的结果作为grep的输入。 进程grep在等待输入(即I/O)时的状态称为阻塞,此时grep命令都无法运行 其实在两种情况下会导致一个进程在逻辑上不能运行, 1. 进程挂起是自身原因,遇到I/O阻塞,便要让出CPU让其他进程去执行,这样保证CPU一直在工作 2. 与进程无关,是操作系统层面,可能会因为一个进程占用时间过多,或者优先级等原因,而调用其他的进程去使用CPU。 因而一个进程由三种状态 运行,阻塞,就绪
二、进程
python中的多线程无法利用多核优势,如果想要充分地使用多核CPU的资源(os.cpu_count()查看),在python中大部分情况需要使用多进程。Python提供了multiprocessing。
multiprocessing模块用来开启子进程,并在子进程中执行我们定制的任务(比如函数),该模块与多线程模块threading的编程接口类似。
multiprocessing模块的功能众多:支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。
需要再次强调的一点是:与线程不同,进程没有任何共享状态,进程修改的数据,改动仅限于该进程内。
1、参数介绍
#参数介绍 Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,表示一个子进程中的任务(尚未启动) group参数未使用,值始终为None target表示调用对象,即子进程要执行的任务 args表示调用对象的位置参数元组,args=(1,2,'egon',) kwargs表示调用对象的字典,kwargs={'name':'egon','age':18} name为子进程的名称
2、方法介绍
p.start():启动进程,并调用该子进程中的p.run()
p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法
p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁
p.is_alive():如果p仍然运行,返回True
p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程
3、属性介绍
p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置
p.name:进程的名称
p.pid:进程的pid
p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可)
p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功(了解即可)
4、开启进程的两种方式
from multiprocessing import Process import time def run(): print("start") time.sleep(2) print("stop") if __name__ == '__main__': s = Process(target=run) s.start() ------>-------------->#向操作系统发送开启子进程的信号 #time.sleep(3) print("主进程") s.strat()执行完毕后,开启子进程需要一定的时间,所以会先打印主进程 当把time.sleep(3)取消注释后,那么s.start()下一行需要等,会先执行子进程
from multiprocessing import Process import time class MyProcess(Process): def __init__(self,name,age): super().__init__() self.name = name self.age =age def run(self): print("name is %s" %self.name) time.sleep(2) print("age is %s" %self.age) if __name__ == '__main__': s = MyProcess("pdun",1) s.start() print("主进程")
#注意:
在Windows中,开启子进程的代码,必须放在if __name__=="__main__中
#p1 = Process(target=func,args("egon",) #注意注意再注意,args中只有一个值时,一定要加逗号
5、join:让主进程在原地等待,等待子进程运行完毕,不会影响子进程的执行
from multiprocessing import Process import time def func(name,n): print("%s start..." %name) time.sleep(n) print("%s over!!!" %name) if __name__ == '__main__': p1 = Process(target=func,args=("子进程1",2)) p2 = Process(target=func,args=("子进程2",3)) start = time.time() p1.start() p2.start() p1.join() print("主进程",(time.time()-start))
6、进程之间内存地址相互隔离
from multiprocessing import Process a = 10 def func(): global a a = 0 if __name__ == '__main__': p = Process(target=func) p.start() p.join() print(a)
6、进程对象其他相关属性和方法
1、获取id
from multiprocessing import Process,current_process import time def func(): print("%s start..." %current_process().pid) time.sleep(3) print("%s over" %current_process().pid) if __name__ == '__main__': p = Process(target=func) p.start() print("主进程",current_process().pid)
from multiprocessing import Process import time,os def func(): print("%s start..." %os.getpid()) time.sleep(3) print("%s over" %os.getpid()) if __name__ == '__main__': p = Process(target=func) p.start() print("主进程",os.getpid())
from multiprocessing import Process import time,os def func(): print("%s start... 它的父进程是%s" %(os.getpid(),os.getppid())) time.sleep(3) print("%s over... 它的父进程是%s" % (os.getpid(), os.getppid())) if __name__ == '__main__': p = Process(target=func) p.start() print("主进程id是%s,主进程的父进程是%s" %(os.getpid(),os.getppid())) 在这里主进程的父进程是pycharm
2、进程的名字,通过(进程对象.name)获得,默认为Process-1,可更改
from multiprocessing import Process def func(): pass if __name__ == '__main__': p = Process(target=func,name="xxx") print(p.name) ------------------------------------------------------- name=
3、终止子进程terminate()与判断是否终止is_alive()
from multiprocessing import Process import time def func(): pass if __name__ == '__main__': p = Process(target=func) p.start() print(p.is_alive()) p.terminate() time.sleep(0.1) print(p.is_alive())
4、p.exitcode 获取进程的退出码 只有进程完全结束后才能获取到
7、僵尸进程,孤儿进程
Linux中存在僵尸进程与孤儿进程
孤儿进程
父进程已经结束了而子进程还在运行中,子进程称之为孤儿进程(无害)
孤儿进程会统一交给init这个根进程来管理
僵尸进程
当子进程已经结束,并不会立即从操作系统中完全删除,会留下一些进程信息,共父进程查看(有害)
Python会自动清理僵尸进程
8、守护进程
主进程创建守护进程
其一:守护进程会在主进程代码执行结束后就终止
其二:守护进程内无法再开启子进程,否则抛出异常
怎么使用
进程对象.daemon=True 一次给你要在.start()前设置
from multiprocessing import Process def task(): print("子进程会执行结束吗") if __name__ == '__main__': p = Process(target=task) p.daemon = True p.start() print("over") >>over
进程的安全问题
进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的,而共享带来的是竞争,竞争可能会造成数据的错乱,如何控制,就是加锁处理,加锁把并发转为串行,虽然join也能是进程串行,但join会使被执行顺序固定死,锁仅仅把部分代码串行,其他代码仍然是并发
part1:多个进程共享同一打印终端
from multiprocessing import Process import time,random def bar1(name,age,sex): print("my name is %s" %name) # time.sleep(random.randint(1,3)) print("age is %s" %age) print("sex is %s" %sex) def bar2(name,age,sex): print("my name is %s" %name) # time.sleep(random.randint(1, 3)) print("age is %s" %age) print("sex is %s" %sex) if __name__ == '__main__': p1 = Process(target=bar1,args=("egon",20,"man")) p2 = Process(target=bar2,args=("Alex",19,"man")) p1.start() p2.start() ---------------------------------------------------------------------- 此时打印没有错乱,但是当把注释掉的代码解开时,打印会出现错乱
from multiprocessing import Process,Lock import time,random def bar1(name,age,sex,lock): lock.acquire() print("my name is %s" %name) time.sleep(random.randint(1,3)) print("age is %s" %age) print("sex is %s" %sex) lock.release() def bar2(name,age,sex,lock): lock.acquire() print("my name is %s" %name) time.sleep(random.randint(1, 3)) print("age is %s" %age) print("sex is %s" %sex) lock.release() if __name__ == '__main__': lock = Lock() p1 = Process(target=bar1,args=("egon",20,"man",lock)) p2 = Process(target=bar2,args=("Alex",19,"man",lock)) p1.start() p2.start()
part2:多个进程共享同一文件
这里把文件当数据库,模拟抢票 # (文件中{"count":1})
from multiprocessing import Process import json,time,random def func(name): search(name) by(name) def search(name): with open("data.txt", "rt", encoding="utf-8") as f: dic = json.load(f)["count"] print("%s查看了票数 剩余票数:%s" % (name,dic)) def by(name): with open("data.txt", "rt", encoding="utf-8") as f: tic_dic = json.load(f) time.sleep(random.randint(1,3)), if tic_dic["count"] > 0: tic_dic["count"] -=1 with open("data.txt","wt",encoding="utf-8")as f: json.dump(tic_dic,f) print("%s购买成功" % name) if __name__ == '__main__': p1 = Process(target=func,args=("egon",)) p2 = Process(target=func,args=("Alex",)) p3 = Process(target=func,args=("jerry",)) p1.start() p2.start() p3.start() ------------------------------------------------------------------- egon查看了票数 剩余票数:1 Alex查看了票数 剩余票数:1 jerry查看了票数 剩余票数:1 Alex购买成功 jerry购买成功 egon购买成功
from multiprocessing import Process,Lock import json,time,random def func(name,lock): search(name) lock.acquire() by(name) lock.release() def search(name): with open("data.txt", "rt", encoding="utf-8") as f: dic = json.load(f)["count"] print("%s查看了票数 剩余票数:%s" % (name,dic)) def by(name): with open("data.txt", "rt", encoding="utf-8") as f: tic_dic = json.load(f) time.sleep(random.randint(1,3)), if tic_dic["count"] > 0: tic_dic["count"] -=1 with open("data.txt","wt",encoding="utf-8")as f: json.dump(tic_dic,f) print("%s购买成功" % name) if __name__ == '__main__': lock = Lock() p1 = Process(target=func,args=("egon",lock)) p2 = Process(target=func,args=("Alex",lock)) p3 = Process(target=func,args=("jerry",lock)) p1.start() p2.start() p3.start()
#加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。 虽然可以用文件共享数据实现进程间通信,但问题是: 1.效率低(共享数据基于文件,而文件是硬盘上的数据) 2.需要自己加锁处理 #因此我们最好找寻一种解决方案能够兼顾:1、效率高(多个进程共享一块内存的数据)2、帮我们处理好锁问题。这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。 1 队列和管道都是将数据存放于内存中 2 队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来, 我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。
队列(推荐使用)
进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的
#IPC 进程间通讯:指至少两个进程或线程间传送数据或信号的一些技术或方法 进程与进程之间内存是物理隔离的 无法直接通讯 例如:qq要调用浏览器 来显示某个网页 网页地址就必须想办法告诉浏览器 #四种方式: 1.使用一个共享文件 在硬盘创建一个文件 不同进程之间共享这个文件 优点:交换的数据量几乎没有限制 缺点:速度慢 2.系统开辟一块共享内存 以供进程间交换数据 优点:速度快 缺点:数据量不能太大 3.管道 优点:封装了文件的打开 关闭等操作 缺点:速度慢 并且是单向的 编程复杂度较高 4.socket 不仅可以用于与远程计算机中的进程通讯 还可以用于与本地进程通讯 基于内存 的速度快
队列 也是一个容器
特点:先进先出,支持进程间共享,自动处理进程间安全问题(加锁)
主要方法:
1 q.put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。 2 q.get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常. 3 q.get_nowait():同q.get(False) q.put_nowait():同q.put(False) 4 q.empty():调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目。 5 q.full():调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。 6 q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样
''' multiprocessing模块支持进程间通信的两种主要形式:管道和队列 都是基于消息传递实现的,但是队列接口 ''' from multiprocessing import Process,Queue import time q=Queue(3) #put ,get ,put_nowait,get_nowait,full,empty q.put(3) q.put(3) q.put(3) print(q.full()) #满了 print(q.get()) print(q.get()) print(q.get()) print(q.empty()) #空了
生产者消费者模型
什么是生产者消费者模型
生产者:代指生产数据的任务
消费者:代指处理数据的任务
实现方式:生产者--------》(对列)《------------消费者
为何用
降低耦合度,平衡生产能力和消费能力,从而提升效率
怎么用
import time,random from multiprocessing import JoinableQueue,Process def producer(name,food,q): for i in range(10): res = "%s%s" %(food,i) time.sleep(random.randint(1,3)) q.put(res) print("厨师%s生产了:%s" %(name,food)) def consumer(name,q): while True: #这里用while循环是因为消费者不知道生产多少次 res = q.get() time.sleep((random.randint(1,3))) print("%s吃了%s" %(name,res)) q.task_done() #无论你生产多少次,我处理一次记录一次 if __name__ == '__main__': q = JoinableQueue() p1 = Process(target=producer,args=("egon","泔水",q)) p2 = Process(target=producer, args=("ALEX", "泔水", q)) p3 = Process(target=producer, args=("jerry", "泔水", q)) c1 = Process(target=consumer,args=("liuxx",q)) c2 = Process(target=consumer, args=("changxx", q)) c1.daemon=True #添加守护进程,让消费者子进程伴随主进程结束 c2.daemon=True p1.start() p2.start() p3.start() c1.start() c2.start() p1.join() p2.join() p3.join() q.join() print("zhu")
JoinableQueue()
可join的队列
JoinableQueue() q = JoinableQueue() q.put(1) q.put(2) q.get() q.task_done #告诉容器已经处理完了一个数据 q.task_done #有几个就要调用几次 q.join #也是一个阻塞函数,一直等到队列中的数据被处理完毕
使用场景;在生产者消费者模型中,当消费者不知道生产者生产多少数据时,就可是使用这个队列
线程
1、什么是线程
在传统操作系统中,每个进程有一个地址空间,而且默认就有一个控制线程
线程顾名思义,就是一条流水线工作的过程,一条流水线必须属于一个车间,一个车间的工作过程是一个进程
车间负责把资源整合到一起,是一个资源单位,而一个车间内至少有一个流水线
流水线的工作需要电源,电源就相当于cpu
所以,进程只是用来把资源集中到一起(进程只是一个资源单位,或者说资源集合),而线程才是cpu上的执行单位,是负责执行代码的
2、为什么用线程
线程VS进程
1、同一进程下的多个线程共享该进程的资源
2、创建线程的开销远远小于进程
3、线程的代码可不写在if __name__=='__main__'下
4、线程之间没有子父关系
3、如何用线程
开启线程的两种方式与开启进程相似
from threading import Thread import time def foo(name): print("%s开启"%name) time.sleep(1) print("%s结束"%name) if __name__ == '__main__': t = Thread(target=foo,args=("进程1",)) t.start() print("主进程")
from threading import Thread import time class MyThread(Thread): def __init__(self,name): super().__init__() self.name = name def run(self): print("%s开启"% self.name) time.sleep(1) print("%s结束"% self.name) if __name__ == '__main__': t = MyThread("线程") t.start() print("主线程")
4、在一个进程下开启多个线程/进程的区别
from threading import Thread import os,time def func(): print(os.getpid()) s = time.time() for i in range(10): t = Thread(target=func) t.start() print(time.time()-s) 0.001993894577026367 ------------------------------------------------------------------- if __name__ == '__main__': s = time.time() li = [] for i in range(10): t = Process(target=func) t.start() li.append(t) for t in li: t.join() print(time.time()-s) 1.2716023921966553
from threading import Thread n = 10 def func(): global n n = 0 print(n) t = Thread(target=func) t.start() print(n+1)
练习
from threading import Thread from socket import * def server(ip,port,back_long=5): s = socket() s.bind((ip,port)) s.listen(back_long) while True: conn, addr = s.accept() t2=Thread(target=comunicate,args=(conn,)) #该线程是完成通讯 t2.start() def comunicate(conn): while True: try: res = conn.recv(1024) if len(res) == 0:break conn.send(res.upper()) except ConnectionResetError: break if __name__ == '__main__': t = Thread(target=server, args=('127.0.0.1', 8888)) #该线程是完成链接 t.start()
from socket import * c = socket() c.connect(("127.0.0.1",8888)) while True: msg = input(">>:") if len(msg) == 0:break c.send(msg.encode()) data = c.recv(1024) print(data)
练习二
三个任务,一个接收用户输入,一个将用户输入的内容格式化成大写,一个将格式化后的结果存入文件
from threading import Thread msg_l = [] format_l = [] def get(): while True: res = input(">>:").strip() if not res:continue msg_l.append(res) def format_msg(): while True: if msg_l: res = msg_l.pop() format_l.append(res.upper()) def save(): while True: if format_l: with open("a.txt","at",encoding="utf-8")as f: res = format_l.pop() f.write('%s ' %res) # get() # format_msg() # save() # 此时如果直接调用三个函数,那么进程会在第一个循环中一直循环下去 if __name__ == '__main__': t1 = Thread(target=get) t2 = Thread(target=format_msg) t3 = Thread(target=save) t1.start() t2.start() t3.start()
5、线程相关的其他方法
Thread实例对象的方法 # isAlive(): 返回线程是否活动的。 # getName(): 返回线程名。 # setName(): 设置线程名。 threading模块提供的一些方法: # threading.currentThread(): 返回当前的线程变量。 # threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。 # threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
6、守护线程
需要强调的是
#1.对主进程来说,运行完毕指的是主进程代码运行完毕 #2.对主线程来说,运行完毕指的是主线程所在的进程内所有非守护线程统统运行完毕,主线程才算运行完毕
详细解释
#1 主进程在其代码结束后就已经算运行完毕了(守护进程在此时就被回收),然后主进程会一直等非守护的子进程都运行完毕后回收子进程的资源(否则会产生僵尸进程),才会结束, #2 主线程在其他非守护线程运行完毕后才算运行完毕(守护线程在此时就被回收)。因为主线程的结束意味着进程的结束,进程整体的资源都将被回收,而进程必须保证非守护线程都运行完毕后才能结束。
from threading import Thread import time def func(): print(123) time.sleep(2) print("over") def bar(): print(888) time.sleep(5) print("stop") t1 = Thread(target=func) t2 = Thread(target=bar) t1.daemon = True t1.start() t2.start() print("主进程") ---------------------------- 666 888 主进程 over stop
7、互斥锁
from threading import Thread,Lock import time s = Lock() a = 100 def func(): global a s.acquire() b = a #这里的目的是让100个进程sleep前都拿到a time.sleep(0.1) a = b-1 #a -= 1 s.release() if __name__ == '__main__': start = time.time() li = [] for i in range(100): t = Thread(target=func) li.append(t) t.start() for t in li: t.join() print(a) print(time.time()-start) 如果把b=a赋值去掉,直接用a-=1,此时不需要锁,计算时间0.00XXX秒 而现在使用了10秒,虽然降低了效率,但保证了数据的安全
8、信号量
from threading import Thread import os,time def func(): print(os.getpid()) li = [] for i in range(10): t = Thread(target=func) t.start() li.append(t) for t in li: t.join()
9、Event事件
线程之间数据是共享的,但是,当一个线程的执行需要另一个线程的执行结果时,可以使用事件来简化代码量,
比如,一个线程是服务器启动,另一个线程是连接服务器,此时,需要知道服务器已经启动,才能连接
from threading import Thread,Event import random,time boot = Event() def func(): print("正在启动") time.sleep(random.randint(3,5)) print("启动成功") boot.set() def task(): boot.wait() print("链接成功") if __name__ == '__main__': t1 = Thread(target=func) t1.start() t2 = Thread(target=task) t2.start()
10、线程queue
import queue # queue.Queue() #x先进先出 # queue.LifoQueue() #后进先出 q=queue.PriorityQueue()#优先级(数字越小,优先级越高,字母越小优先级越高)
q=queue.PriorityQueue(3)#优先级(数字越小,优先级越高) q.put((1,"asas")) q.put((2,"sdsad")) #注意是内部是个元组,如果鄙视元组,将只取第一个值 q.put((-3,"sdfsd")) print(q.get()) print(q.get()) print(q.get())
12、进程池,线程池
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor import os,random,time def func(name):
print(i)
return i**2if __name__ == '__main__': p = ProcessPoolExecutor() #不给定范围的话,默认CPU核数, print(os.cpu_count())获得CPU核数 for i in range(20): obj = p.submit(func,"进程pid:") #submit提交任务
res = obj.result() #得到任务的执行结果,这里是函数的返回值
print(res)
print("主")
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor import os,random,time def func(name,n): print("%s %s is running" %(name,os.getpid())) time.sleep(random.randint(1,2)) return n**2 if __name__ == '__main__': p = ProcessPoolExecutor() #不给定范围的话,默认CPU核数,print(os.cpu_count()) #获得CPU核数 for i in range(10): obj = p.submit(func,"进程pid:",i) #func函数需要参数的话,跟在func后边 res = obj.result() print(res) print("主")
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor import os,random,time def func(name,n): print("%s %s is running" %(name,os.getpid())) time.sleep(random.randint(1,2)) return n**2 if __name__ == '__main__': p = ProcessPoolExecutor() #不给定范围的话,默认CPU核数,print(os.cpu_count()) #获得CPU核数 li = [] for i in range(10): obj = p.submit(func,"进程pid:",i) #func函数需要参数的话,跟在func后边 li.append(obj) p.shutdown(wait=True) #关闭进程池入口,并等进程池内的所有任务执行完毕, # 类似 JoinableQueue的join,等队列中的值全部取完 for obj in li: res = obj.result() print(res) print("主")
同步调用:提交完一个任务后,就在原地等待,等待任务完完整整的运行完毕,拿到结果,再执行下一行代码,导致任务串行
异步调用:提交完一个任务后,不等待,直接执行下一行代码,任务是并发执行的
13、进程、线程池异步回调:回调函数add_done_callback
给异步任务绑定一个函数,当任务完成是自动调用该函数
# 使用方法:
给进程池或线程池提交一个异步任务(submit),会返回一个表示结果的对象,
给该对象绑定方法add_done_callback,用于绑定回调函数
注意:
回调函数有且只有一个参数,就是对象本身,通过对象,result来获取结果
回调函数是由子线程来执行,哪个子线程有空就来处理
from concurrent.futures import ThreadPoolExecutor import requests import threading def get_data(url): print("%s正在处理%s" %(threading.current_thread().name,url)) resp = requests.get(url) return resp.text def parser_data(f): res = f.result() print("解析长度为%s 地址为%s" %(len(res[0]),res[1])) urls = ["https://www.baidu.com/","https://csdn.net"] pool = ThreadPoolExecutor() for url in urls: f = pool.submit(get_data,url) #提交任务 f.add_done_callback(parser_data) #绑定一个回调函数
from multiprocessing import Process def func(callback): print("测试。。。") callback() def callback(): print("这是回调。。。") if __name__ == '__main__': p = Process(target=func,args=(callback,)) p.start()