#进程与线程 #开进程的两种方法:一种 # import time ,random # from multiprocessing import Process # def piao(name): # print('%s is piaoing' % name) # time.sleep(random.randrange(1,5)) # print('%s piaoing end' % name) # # p1=Process(target=piao,args=('agen',)) #args传值为元组,最后必须加逗号 # p2=Process(target=piao,args=('alex',)) # p3=Process(target=piao,args=('yuanhao',)) # if __name__ == '__main__': # p1.start() # p2.start() # p3.start() # print('主进程') #首先会打印这句“主进程”,这是因为整个s1相当于是主进程,挨个往下进行 #而p1,p2,p3等进程的执行速度明显没有主进程的执行速度快,所以是先打印类,如果换一个配置好的机器,有可能打印是在某个进程打印之后才执行,主要就是各进程间抢占资源,谁快,谁先执行 #开进程的方法二: # import time,random # from multiprocessing import Process # class Piao(Process): #类继承与Process # def __init__(self,name): # super().__init__() # self.name=name # def run(self): #如果采用继承这种方式,则必须重写run方法,则调用该对象的start()方法就是调用run()方法 # print('%s is piaoing' % self.name) # time.sleep(random.randrange(1,5)) # print('%s piao end' % self.name) # # p1=Piao('alex') # p2=Piao('agen') # p3=Piao('yuanhao') # if __name__ == '__main__': # p1.start() #自动调用重写的run方法 # p2.start() # p3.start() # print('哈哈哈,结束类') #使用多进程的方式连接服务端和客户端 # from socket import * # from multiprocessing import Process # server=socket(AF_INET,SOCK_STREAM) # server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) # server.bind(('127.0.0.1',8080)) # server.listen(5) # # def talk(conn,client_addr): # while True: # try: # msg=conn.recv(1024) # if not msg: break # conn.send(msg.upper()) # except Exception: # break # # if __name__ == '__main__': # while True: # conn, client_addr = server.accept() # p=Process(target=talk,args=(conn,client_addr,)) # p.start() # # #客户端 # from socket import * # client=socket(AF_INET,SOCK_STREAM) # client.connect(('127.0.0.1',8080)) # # while True: # msg=input(">>: ").strip() # if not msg:continue # # client.send(msg.encode('utf-8')) # msg=client.recv(1024) # print(msg.decode('utf-8')) #进程对象的其它方法:terminate()关闭进程 is_alive 查看该进程是否存活 # from multiprocessing import Process # import time,random # class Piao(Process): # def __init__(self,name): # super().__init__() # self.name=name # def run(self): # print('%s is piaoing' %self.name) # time.sleep(random.randrange(1,5)) # print('%s is piao end' % self.name) # p1=Piao('alex') # if __name__ == '__main__': # print(p1.is_alive()) #False # p1.start() # print(p1.is_alive()) #True # p1.terminate() # print(p1.is_alive()) #True 因为进程调用terminate()方法,不会立即关闭,进程的关闭也是需要时间的,因此为True # time.sleep(1) # print(p1.is_alive()) #在等待1s后,再次查看就是False类 # #p.join()方法是父进程等待p进程的结束,是父进程阻塞在原地,而p仍然在后台运行 #p.daemon=True 必须在p.start()之前设置。设置进程为守护进程,禁止p创建子进程,并且父进程死,p也跟着一起死。 # from multiprocessing import Process # import time,random # class Piao(Process): # def __init__(self,name): # super().__init__() # self.name=name # def run(self): # print('%s is piaoing' % self.name) # time.sleep(random.randrange(1,3)) # print('%s is piao end' % self.name) # # p1=Piao('alex') # p1.daemon=True # p2=Piao('agen') # p2.daemon=True # p3=Piao('yuanhao') # p3.daemon=True # p_list=[p1,p2,p3] # if __name__ == '__main__': # for p in p_list: # p.start() # for p in p_list: # p.join() # print('程序执行结束') #子进程调用join,等子进程全部执行完成才执行该打印语句 #进程只要start就会在开始运行类,所以p1/p2/p3start时,系统中已经有四个并发的进程类 #join是让主进程等,而p1-p3仍然是并发执行的,p1.join执行的时候,其余p2/p3仍然在运行,等p1.join #结束,p2/p3也已结束类,这样p2.join/p3.join直接通过检测,无需等待 #所以三个join话费的总时间仍然是耗费时间最长的那个进程运行的时间 #进程对象的其它属性:name,pid from multiprocessing import Process import time,random class Piao(Process): def __init__(self,name): self.name=name super().__init__() #Process的__init__方法也有self.name=Piao-1,如果放在self.name=name 后面会覆盖 def run(self): print('%s is piaoing' % self.name) time.sleep(random.randrange(1,3)) print('%s is piao end' % self.name ) p=Piao('egon') if __name__ == '__main__': p.start() print('开始') print(p.name) #Piao-1 若上面初始化时super在下,则会覆盖name的赋值,打印出默认的进程的名字Piao-1 print(p.pid) #7652
进程锁lock
#进程锁,模拟实现抢票功能 from multiprocessing import Process,Lock import json,time,random,os def work(filename,name,lock): #买票 ,加入锁对象的传入参数 # lock.acquire() #锁使用时一种就是先acquire()后release()释放掉,等同于with lock: with lock: with open(filename,'r',encoding='utf-8',) as f: dic=json.loads(f.read()) if dic['count'] >0: dic['count'] -=1 time.sleep(random.randint(1,3)) #模拟网络延迟 with open(filename,'w',encoding='utf-8') as f: f.write(json.dumps(dic)) print(' 33[43m%s 抢票成功 33[0m' % name) else: print(' 33[45m%s 抢票失败 33[0m' % name) #lock.release() if __name__ == '__main__': lock=Lock() #在这里创建锁对象 p_l=[] for i in range(100): p=Process(target=work,args=('db','alex',lock,)) #将锁对象作为参数传入 p_l.append(p) p.start() for p in p_l: p.join print('主进程结束')
进程间通信:队列
队列常用方法
#进程间通信 #interprocess communication 这就是IPC通信:基于消息的通信机制:两种:管道和队列 #队列是管道枷锁实现的,队列是先进先出,堆栈是先进后出 # from multiprocessing import Queue # q=Queue(3) #指定队列的大小 # q.put({'name':'alex'}) # q.put(3) # q.put('哈哈哈') #往队列中添加任意的对象 #q.put(4) #如果超过指定的队列长度还要往里面填值,则不会报错,会在此处阻塞等待将值取走后才接着往下执行 #q.put('d',block=False) #默认block是True的,即等待,False后不等待直接报错queue.Full 队列已满 #q.put_nowait('f') #等同于将block=False #q.put('f',timeout=3) #设置3秒的等待时间,如果在该时间内还是满的,的报错 #响应的q.get()也有相应的方法 # print(q.get()) # print(q.get()) # print(q.get()) # print(q.empty()) #判断是否为空队列 # print(q.get(block=False)) # print(q.get_nowait()) # print(q.get(timeout=2)) #print(q.qsize())
生产者消费者模型
在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。
为什么要使用生产者和消费者模式
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。
什么是生产者消费者模式
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
基于队列实现生产者消费者模型
#################################################################### #列表生成式 # res1=[i for i in range(10)] # res2=[i for i in range(10) if i > 5] #注意,这里只能是if,不能有else语句,会产生歧义,可以加and # print(res1,res2) #[0, 1, 2, 3, 4, 5, 6, 7, 8, 9] [6, 7, 8, 9] #队列模拟生产者和消费者 # from multiprocessing import Process,Queue # import time,random # def consumer(q,name): #消费者 # while True: # time.sleep(random.randint(1,3)) # res=q.get() # if res is None:break # print(' 33[41m消费者%s拿到类%s 33[0m' %(name,res)) # # def producer(seq,q,name): #生产者 # for item in seq: # time.sleep(random.randint(1,3)) # q.put(item) # print(' 33生产者%s生产类%s[42m 33[0m' % (name,item)) # q.put(None) # # if __name__ == '__main__': # q=Queue() # c=Process(target=consumer,args=(q,'agen')) # c.start() # # seq=['包子%s' %i for i in range(10)] # #producer(seq,q,'alex') # p=Process(target=producer,args=(seq,q,'厨师',)) # p.start() # print('主进程') from multiprocessing import Process,JoinableQueue #JoinableQueue知道消费者是否消费完成 import time,random def consumer(q,name): #消费者 while True: time.sleep(random.randint(1,3)) res=q.get() q.task_done() #消费者全部消费后向生产者喊话说结束类,然后生产者的q.join()处不再阻塞,继续往下执行。 print(' 33[41m消费者%s拿到类%s 33[0m' %(name,res)) def producer(seq,q,name): #生产者 for item in seq: time.sleep(random.randint(1,3)) q.put(item) print(' 33[42m生产者%s生产类%s 33[0m' % (name,item)) q.join() print('------>>>>>') if __name__ == '__main__': q=JoinableQueue() c=Process(target=consumer,args=(q,'agen')) c.daemon=True #设置为守护进程,主进程结束则消费者结束,在这里,主进程在等待生产者结束,生产者中加入了q.join(),q.task_done, #一旦消费者全部接收完毕,则生产者不在等待,也运行结束,不再阻塞,打印------》》》则整个进程结束 c.start() seq=['包子%s' %i for i in range(10)] #producer(seq,q,'alex') p=Process(target=producer,args=(seq,q,'厨师',)) p.start() # c.join() p.join() print('主进程')
进程池:
开多进程的目的是为了并发,如果有多核,通常有几个核就开几个进程,进程开启过多,效率反而会下降(开启进程是需要占用系统资源的,而且开启多余核数目的进程也无法做到并行),但很明显需要并发执行的任务要远大于核数,这时我们就可以通过维护一个进程池来控制进程数目,比如httpd的进程模式,规定最小进程数和最大进程数...
当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。
而且对于远程过程调用的高级应用程序而言,应该使用进程池,Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,就重用进程池中的进程。
在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。
进程池Pool。 # from multiprocessing import Process,Pool # from socket import * # import os # server=socket(AF_INET,SOCK_STREAM) # server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) # server.bind(('127.0.0.1',8080)) # server.listen(5) # # def talk(conn,addr): # print(os.getpid()) #打印进程的pid # while True: # try: # msg=conn.recv(1024) # if not msg:break # conn.send(msg.upper()) # except Exception: # break # # if __name__ == '__main__': # pool=Pool() # res_l=[] # while True: # conn,addr=server.accept() # print(addr) # pool.apply(talk,args=(conn,addr,)) #同步执行 # res=pool.apply_async(talk,args=(conn,addr,)) #异步执行 # res_l.append(res) # print(res_l)
Pool([numprocess [,initializer [, initargs]]]):创建进程池
1 numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值
2 initializer:是每个工作进程启动时要执行的可调用对象,默认为None
3 initargs:是要传给initializer的参数组
p.apply(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async() p.apply_async(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。 p.close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成5 P.jion():等待所有工作进程退出。此方法只能在close()或teminate()之后调用
方法apply_async()和map_async()的返回值是AsyncResul的实例obj。实例具有以下方法 obj.get():返回结果,如果有必要则等待结果到达。timeout是可选的。如果在指定时间内还没有到达,将引发一场。如果远程操作中引发了异常,它将在调用此方法时再次被引发。 obj.ready():如果调用完成,返回True obj.successful():如果调用完成且没有引发异常,返回True,如果在结果就绪之前调用此方法,引发异常 obj.wait([timeout]):等待结果变为可用。 obj.terminate():立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作。如果p被垃圾回收,将自动调用此函数
#进程的回调函数:把一个任务的结果再交给另一个任务执行 # import os # from multiprocessing import Pool # def work(n): # return n**2 # # if __name__ == '__main__': # pool=Pool() #默认是开启的进程数是cpu颗粒数 # res_l=[] # for i in range(10): # res=pool.apply_async(work,args=(i,)) # res_l.append(res) # for res in res_l: # print(res.get()) #这里需要试用get()方法才能取到值
需要回调函数的场景:进程池中任何一个任务一旦处理完了,就立即告知主进程:我好了额,你可以处理我的结果了。主进程则调用一个函数去处理该结果,该函数即回调函数
我们可以把耗时间(阻塞)的任务放到进程池中,然后指定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果。
from multiprocessing import Pool import time,random,os def get_page(url): print('(进程 %s) 正在下载页面 %s' %(os.getpid(),url)) time.sleep(random.randint(1,3)) return url #用url充当下载后的结果 def parse_page(page_content): print('<进程 %s> 正在解析页面: %s' %(os.getpid(),page_content)) time.sleep(1) return '{%s 回调函数处理结果:%s}' %(os.getpid(),page_content) if __name__ == '__main__': urls=[ 'http://maoyan.com/board/1', 'http://maoyan.com/board/2', 'http://maoyan.com/board/3', 'http://maoyan.com/board/4', 'http://maoyan.com/board/5', 'http://maoyan.com/board/7', ] p=Pool() res_l=[] #异步的方式提交任务,然后把任务的结果交给callback处理 #注意:会专门开启一个进程来处理callback指定的任务(单独的一个进程,而且只有一个) for url in urls: res=p.apply_async(get_page,args=(url,),callback=parse_page) res_l.append(res) #异步提交完任务后,主进程先关闭p(必须先关闭),然后再用p.join()等待所有任务结束(包括callback) p.close() p.join() print('{主进程 %s}' %os.getpid()) #收集结果,发现收集的是get_page的结果 #所以需要注意了: #1. 当我们想要在将get_page的结果传给parse_page处理,那么就不需要i.get(),通过指定callback,就可以将i.get()的结果传给callback执行的任务 #2. 当我们想要在主进程中处理get_page的结果,那就需要使用i.get()获取后,再进一步处理 for i in res_l: #本例中,下面这两步是多余的 callback_res=i.get() print(callback_res) ''' 打印结果: (进程 52346) 正在下载页面 http://maoyan.com/board/1 (进程 52347) 正在下载页面 http://maoyan.com/board/2 (进程 52348) 正在下载页面 http://maoyan.com/board/3 (进程 52349) 正在下载页面 http://maoyan.com/board/4 (进程 52348) 正在下载页面 http://maoyan.com/board/5 <进程 52345> 正在解析页面: http://maoyan.com/board/3 (进程 52346) 正在下载页面 http://maoyan.com/board/7 <进程 52345> 正在解析页面: http://maoyan.com/board/1 <进程 52345> 正在解析页面: http://maoyan.com/board/2 <进程 52345> 正在解析页面: http://maoyan.com/board/4 <进程 52345> 正在解析页面: http://maoyan.com/board/5 <进程 52345> 正在解析页面: http://maoyan.com/board/7 {主进程 52345} http://maoyan.com/board/1 http://maoyan.com/board/2 http://maoyan.com/board/3 http://maoyan.com/board/4 http://maoyan.com/board/5 http://maoyan.com/board/7 '''
from multiprocessing import Pool import time,random import requests import re def get_page(url,pattern): response=requests.get(url) if response.status_code == 200: return (response.text,pattern) def parse_page(info): page_content,pattern=info res=re.findall(pattern,page_content) for item in res: dic={ 'index':item[0], 'title':item[1], 'actor':item[2].strip()[3:], 'time':item[3][5:], 'score':item[4]+item[5] } print(dic) if __name__ == '__main__': pattern1=re.compile(r'<dd>.*?board-index.*?>(d+)<.*?title="(.*?)".*?star.*?>(.*?)<.*?releasetime.*?>(.*?)<.*?integer.*?>(.*?)<.*?fraction.*?>(.*?)<',re.S) url_dic={ 'http://maoyan.com/board/7':pattern1, } p=Pool() res_l=[] for url,pattern in url_dic.items(): res=p.apply_async(get_page,args=(url,pattern),callback=parse_page) res_l.append(res) for i in res_l: i.get() # res=requests.get('http://maoyan.com/board/7') # print(re.findall(pattern,res.text)) 爬虫案例