开启进程的方式一:
from multiprocessing import Process import time def work(name): print('task <%s> is running' % name) time.sleep(2) print('task <%s> is done' % name) if __name__ == '__main__': p1=Process(target=work,args=('tom',)) # 或 kwargs={'name':'tom'} p2=Process(target=work,args=('jerry',)) #生成一个进程对象 p1.start() #开启进程对象,告诉操作系统要开一个p1进程 p2.start() #告诉操作系统要开一个p2进程 print('主') #主 #task <tom> is running #task <jerry> is running #task <tom> is done #task <jerry> is done # #乱的原因是多个进程并发运行,在共享同一个终端,谁先抢到这个终端,就先打印
在windows中Process()必须放到# if __name__ == '__main__': 下
开启进程的方式二:
from multiprocessing import Process class MyProcess(Process): def __init__(self,name): super().__init__() self.name=name def run(self): # 必须 print('task <%s> is running' % self.name) time.sleep(2) print('task <%s> is done' % self.name) if __name__ == '__main__': p=MyProcess('tom') p.start() print('主') #主 #task <tom> is running
并发的套接字通信
服务端
from multiprocessing import Process from socket import * s=socket(AF_INET,SOCK_STREAM) s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) s.bind(('127.0.0.1',8080)) s.listen(5) def talk(conn,addr): while True: try: data=conn.recv(1024) if not data: break conn,send(data.upper()) except Exception: break conn.close() if __name__ == '__main__': while True: conn,addr=s.accept() p=Process(target=talk,args=(conn,addr)) p.start() s.close()
客户端
from socket import * c=socket(AF_INET,SOCK_STREAM) c.bind(('127.0.0.1',8080)) while True: msg=input('>>: ').strip() if not msg:continue c.send(msg.encode('utf-8')) data=c.recv(1024) print(data.decode('utf-8')) c.close()
问题:
如果客户端非常多,并发量非常大,来一个客户端,服务端要开一个进程
要限制一下不能无限制开进程。可以用进程池限制
join方法
主进程等着子进程都执行完,才执行主进程自己
from multiprocessing import Process import time def work(name): print('task <%s> is running' % name) time.sleep(2) print('task <%s> is done' % name) if __name__ == '__main__': p1=Process(target=work,args=('tom',)) p2=Process(target=work,args=('jerry',)) p3=Process(target=work,args=('peter',)) p_l=[p1,p2,p3] for p in p_l: p.start() for p in p_l: p.join() #以下重复代码用上面的方法 #p1.start() #p2.start() #p3.start() #p1.join() #主进程等待p1运行结束 #p2.join() #主进程等待p2运行结束 #p3.join() #主进程等待p3运行结束 print('主') #task <peter> is running #task <jerry> is running #task <tom> is running #task <peter> is done #task <jerry> is done #task <tom> is done #主
守护进程
1.守护进程会在主进程代码执行结束后就终止
2.守护进程内无法再开启子进程,否则抛出异常
from multiprocessing import Process import time def work(name): print('task <%s> is running' % name) time.sleep(2) print('task <%s> is done' % name) if __name__ == '__main__': p1=Process(target=work,args=('tom',)) p1.daemon = True p1.start() print('主') #主 from multiprocessing import Process import time def foo(): print(123) time.sleep(1) print('end123') def bar(): print(456) time.sleep(3) print('end456') if __name__ == '__main__': p1=Process(target=foo) p2=Process(target=bar) p1.daemon=True p1.start() p2.start() print('--main--') #--main-- #456 #end456
p1没有打印的原因是 主进程代码结束后直接把p1结束掉了
也就是--main--一出现就把p1结束掉了,但也有可能p2比--main--
运行的快,提前出现,但只要主进程一结束,就被结束掉了。
同步锁
from multiprocessing import Process,Lock import time def work(name, mutex): mutex.acquire() # 加锁 print('task <%s> is running' % name) time.sleep(2) print('task <%s> is done' % name) mutex.release() # 解锁 if __name__ == '__main__': mutex=Lock() p1=Process(target=work,args=('tom',mutex)) p2=Process(target=work,args=('jerry',mutex)) p1.start() p2.start() print('主') #主 #task <tom> is running #task <tom> is done #task <jerry> is running #task <jerry> is done
模拟抢票
#db.tex {"count":1}
#qiangpiao.py import os import json import time from multiprocessing import Process,Lock def search(): # 查看余票 dic=json.load(open('db.txt')) print('[%s] 看到剩余票数<%s> ' % (os.getpid(),dic['count']) def get_ticket(): # 购票 dic = json.load(open('db.txt')) time.sleep(0.5) # 模拟读数据库的网络延迟 if dic['count'] > 0: dic['count'] -=1 time.sleep(0.5) # 模拟写数据库的网络延迟 json.dump(dic,open('db.txt','w')) print('%s 购票成功' % os.getpid()) def task(mutex): search() mutex.acquire() get_ticket() mutex.release() if __name__ == '__main__': mutex=Lock() for i in range(10): p=Process(target=task,args=(mutex,)) p.start()
共享数据(不推荐)
基于共享内存实现进程间通信
Manager:产生共享的内存空间
from multiprocessing import Process,Manager,Lock def task(dic,mutex): with mutex: dic['count'] -= 1 #同下面 #mutex.acquire() #dic['count']-=1 #mutex.release() if __name__ == '__main__': mutex=Lock() # m=Manager() dic=m.dict({'count':100}) # 产生共享字典 {'count':100} p_l=[] for i in range(100): p=Process(target=task,args=(dic,mutex)) p_l.append(p) p.start() for p in p_l: p.join() print(dic) # {'count':0}
队列(推荐)-----先进先出
from multiprocessing import Queue q=Queue(3) q.put('first') q.put('second') q.put('third') print(q.get()) print(q.get()) print(q.get()) #如果放入队列的数据大于队列的长度或者取出的数据大于队列的长度时 #队列就会卡住 from multiprocessing import Queue q=Queue(3) q.put('first',block=False) q.put('second',block=False) q.put('third',block=False) q.put('fourth',block=False) # 队列满了就抛异常 #q.put_nowait('fourth',block=False) 和上面的效果一样 q.put('fourth',timeout=3) # 3秒后队列还是满的话就抛出异常
生产者消费者模型
from multiprocessing import Process,Queue import time,os def producer(q): for i in range(10): time.sleep(1) res='包子%s' %i q.put(res) print('<%s> 生产了 [%s]' %(os.getpid(),res)) def consumer(q): while True: res=q.get() time.sleep(1.5) print('<%s> 吃了 [%s]' % (os.getpid(),res)) if __name__ == '__main__': q=Queue() #生产者 p1=Process(target=producer,args=(q,)) #消费者 c1=Process(target=consumer,args=(q,)) p1.start() c1.start() print('主')
#上面程序的问题是在运行时主进程会卡住
#原因是有子进程没有结束,主进程卡住在等子进程结束
#p1在造完10次后就结束掉了,c1有个死循环一直在get,如果没有
#就会一直卡住
解决办法是生产者发一个结束信号告诉消费者不用继续get了
from multiprocessing import Process,Queue import time,os def producer(q): for i in range(3): time.sleep(1) res='包子%s' %i q.put(res) print('<%s> 生产了 [%s]' %(os.getpid(),res)) q.put(None) def consumer(q): while True: res=q.get() if res is None:break time.sleep(1.5) print('<%s> 吃了 [%s]' % (os.getpid(),res)) if __name__ == '__main__': q=Queue() #生产者 p1=Process(target=producer,args=(q,)) #消费者 c1=Process(target=consumer,args=(q,)) p1.start() c1.start() print('主')
#这个程序也是有毛病的,目前只有一个生产者一个消费者,
#真实情况下是多个生产者多个消费者
from multiprocessing import Process,Queue import time,os def producer(q,name): for i in range(3): time.sleep(1) res='%s%s' %(name,i) q.put(res) print('<%s> 生产了 [%s]' % (os.getpid(),res)) q.put(None) #一个人生产完了直接发送结束信号,有可能别人还在生产, #一个消费者收到了结束信号,但是还有别的生产者生产的东西 #但这个消费者却不能继续消费了,不合理 #所以不能再这里写q.put(None) def consumer(): while True: res=q.get() if res is None:break time.sleep(1.5) print('<%s> 吃了 [%s]' % (os.getpid(),res)) if __name__ == '__main__': q=Queue() #生产者们:即厨师们 p1=Process(target=producer,args=(q,'包子')) p2=Process(target=producer,args=(q,'饺子')) p3=Process(target=producer,args=(q,'混沌')) #消费者们:即吃货们 c1=Process(target=consumer,args=(q,)) c2=Process(target=consumer,args=(q,)) p1.start() p2.start() p3.start() c1.start() c2.start() print('主')
from multiprocessing import Process,Queue import time,os def producer(q,name): for i in range(3): time.sleep(1) res='%s%s' %(name,i) q.put(res) print('<%s> 生产了 [%s]' % (os.getpid(),res)) q.put(None) #一个人生产完了直接发送结束信号,有可能别人还在生产, #一个消费者收到了结束信号,但是还有别的生产者生产的东西 #但这个消费者却不能继续消费了,不合理 #所以不能再这里写q.put(None) def consumer(): while True: res=q.get() if res is None:break time.sleep(1.5) print('<%s> 吃了 [%s]' % (os.getpid(),res)) if __name__ == '__main__': q=Queue() #生产者们:即厨师们 p1=Process(target=producer,args=(q,'包子')) p2=Process(target=producer,args=(q,'饺子')) p3=Process(target=producer,args=(q,'混沌')) #消费者们:即吃货们 c1=Process(target=consumer,args=(q,)) c2=Process(target=consumer,args=(q,)) p1.start() p2.start() p3.start() c1.start() c2.start() p1.join() p2.join() p3.join() # p1p2p3都结束掉然后发送结束信号 q.put(None) #put一个None,一个消费者收到了,另外的消费者还会继续卡住 q.put(None) #所以有几个消费者就put几次None .思路没有问题,但是写法low print('主')
from multiprocessing import Process,Queue,JoinableQueue import time,os def producer(q,name): for i in range(10): time.sleep(1) res='%s%s' %(name,i) q.put(res) print('<%s> 生产了 [%s]' % (os.getpid(),res)) q.join() #生产者等待q结束 def consumer(): while True: res=q.get() time.sleep(1.5) print('<%s> 吃了 [%s]' % (os.getpid(),res)) q.task_done() #取走一个包子,就通知生产者 #通知q.join()一个已经被取走了,直到q.join()变为0 if __name__ == '__main__': q=JoinableQueue() # 消费者给生产者发送结束信号 #生产者们:即厨师们 p1=Process(target=producer,args=(q,'包子')) p2=Process(target=producer,args=(q,'饺子')) p3=Process(target=producer,args=(q,'混沌')) #消费者们:即吃货们 c1=Process(target=consumer,args=(q,)) c2=Process(target=consumer,args=(q,)) c1.daemon=True #主进程一旦结束,c1就没有存在的必要了 c2.daemon=True p1.start() p2.start() p3.start() c1.start() c2.start() p1.join() #主进程等待一个子进程结束 print('主')
进程池
from multiprocessing import Pool import time,os def work(): print('task <%s> is running'% os.getpid()) time.sleep(2) print('task <%s> is done'% os.getpid()) if __name__ == '__main__': #print(os.cpu_count()) p=Pool(4) # 不填写的话默认使用cpu的个数 for i in range(10): p.apply(work) # 往进程池里面提交任务,apply是同步执行, # 提交一个任务,一直等着这个任务运行完毕了才执行 # 下一个任务,效率低
了解
from multiprocessing import Pool import time,os def work(n): print('task <%s> is running'% os.getpid()) time.sleep(2) print('task <%s> is done'% os.getpid()) return n**2 if __name__ == '__main__': #print(os.cpu_count()) p=Pool(4) # 不填写的话默认使用cpu的个数 for i in range(10): res=p.apply(work,args=(i,)) #得到work()的执行结果 print(res)
from multiprocessing import Pool import time,os def work(n): print('task <%s> is running'% os.getpid()) time.sleep(2) return n**2 if __name__ == '__main__': p=Pool(4) res_l=[] for i in range(10): res=p.apply_async(work,args=(i,)) # 异步提交任务 res_l.append(res) #print(res_l) # 生成的一堆对象 p.close() #不允许再往进程池里面提交新的任务了 p.join() # 主进程等p结束(4个进程把10个任务都循环完了才算结束) # 这样子res才能get到结果 #确保进程池里面的所有任务都运行完了,再来采集结果 for res in res_l: print(res.get())
进程池控制并发的套接字通信
服务端
from multiprocessing import Pool from socket import * import os s=socket(AF_INET,SOCK_STREAM) s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) s.bind(('127.0.0.1',8080)) s.listen(5) def talk(conn,addr): print(os.getpid()) while True: try: data=conn.recv(1024) if not data:break conn.send(data.upper()) except Exception: break conn.close() if __name__ == '__main__': p=Pool(4) while True: conn,addr=s.accept() p.apply_async(talk,args=(conn,addr)) s.close()
客户端
from socket import * c.socket(AF_INET,SOCK_STREAM) c.connetc(('127.0.0.1',8080)) while True: msg=input('>>:').strip() if not msg:continue c.send(msg.encode('utf-8')) data=c.recv(1024) print(data.decode('utf-8')) c.close()
进程池之回调函数
把耗时长的任务放到进程池中,耗时时间短的放到回调函数中
左边并发着执行一堆任务,谁好了谁就交给回调函数处理,
整体的效率就提升起来了
import requests import os,time def get_page(url): print('<%s> get: %s' % (os.getpid(),url)) response = request.get(url) if response.status_code==200: return {'url':url,'text':response.text} def parse_page(dic): print('<%s> parse: %s' %(os.getpid(),dic['url'])) time.sleep(0.5) res='url:%s size:%s '%(dic['url'],len(dic['text'])) #模拟解析网页内容 with open('db.txt','a') as f: f.write(res) if __name__ == '__main__': p=Pool(4) urls=[ 'https://www.baidu.com', 'https://www.python.org', 'https://www.openstack.org', 'https://help.github.com/', 'http://www.sina.com.cn/' ] for url in urls: p.apply_async(get_page,args=(url,),callback=parse_page) #异步的提交一堆任务,提交完后,一堆任务在进程池中轮询执行, #谁执行完就会通知主进程调用parse_page解析 p.close() p.join() print('主进程pid:',os.getpid())
paramiko模块
模拟SSH远程控制服务端,可以实现远程执行命令和上传下载文件
#创建SSH对象 ssh=paramiko.SSHClient() #允许链接不在know_hosts文件中的主机 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) #连接服务器 ssh.connect(hostname='192.111.111.111',port=22,username='root',password='123') #执行命令 stdin,stdout,stderr=ssh.exec_command('df') #获取命令结果 result=stdout.read() print(result.decode('utf-8')) #关闭连接 ssh.close()