1共享 数据
2进程池与应用
1共享数据
#共享数据 from multiprocessing import Process,Lock,Manager def work(shar_dict,mutex): mutex.acquire() shar_dict['count']-=1 mutex.release() if __name__ == '__main__': mutex=Lock()#生成一把锁 m=Manager()#共享内存 shar_dict=m.dict({'count':100})#共享字典 P_1=[]#生成空的列表 for i in range(100):#循环5次 p=Process(target=work,args=(shar_dict,mutex))#实例化线程 P_1.append(p)#把线程放在列表里 p.start()#启动 for i in P_1: i.join()#等待线程运行完 print(shar_dict)#打印数据 #共享数据:因为共享会有竞争,需要加把锁
注:两个进程需要通过管道交互,队列相当于管道加锁,几个CPU最适合几个进程;等进程需要用join方法。
2进程池与它的应用
from multiprocessing import Pool,Process import os,time def task(n): print('<%s>is running'%os.getpid()) time.sleep(2) print('<%s>is done'%os.getpid()) return n**2 if __name__ == '__main__': # print(os.cpu_count())查看几个CPU的 p=Pool(4)#进程池里的进程数目 for i in range(6): res=p.apply(task,args=(i+1,))#起进程是同步进程 print('本次任务%s'%res)#打印本次进程的结果是同步的 p.close()#对于本次进程池不在往里面放任务 print('主程序')
#异步(结果最后出现) from multiprocessing import Pool, Process import os, time def task(n): print('<%s>is running' % os.getpid()) time.sleep(2) print('<%s>is done' % os.getpid()) return n ** 2 if __name__ == '__main__': p = Pool(4) obj_l=[] for i in range(6): res = p.apply_async(task, args=(i + 1,))#起进程是异步的 obj_l.append(res)#先把所有的结果放在一个列表里 p.close() p.join() print('主程序') print([i.get() for i in obj_l])#使用的是列表推导式 # for i in obj_l:#最后再答应 # print(i.get())
通信服务端使用进程池
#服务端 from socket import * from multiprocessing import Pool s=socket(AF_INET,SOCK_STREAM) s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) #就是它,在bind前加 s.bind(('127.0.0.1',8090)) s.listen(5) def talk(conn,addr): while True: #通信循环 try: data=conn.recv(1024) if not data:break conn.send(data.upper()) except Exception: break conn.close() if __name__ == '__main__': p=Pool(4)#进程池最大的进程数如果超过这这个链接数就会夯住,同时并发的只有的4个 while True:#链接循环 conn,addr=s.accept() p.apply_async(talk,args=(conn,addr))#起进程 s.close()
#客户端 from socket import * c=socket(AF_INET,SOCK_STREAM) c.connect(('127.0.0.1',8090)) while True: msg=input('>>: ').strip() if not msg:continue c.send(msg.encode('utf-8')) data=c.recv(1024) print(data.decode('utf-8')) c.close()
在爬虫方面的应用:
from multiprocessing import Pool import requests import os def get_page(url): print('<%s>get<%s>'%(os.getpid(),url)) response=requests.get(url) return {'url':url,'text':response.text} if __name__ == '__main__': p=Pool() urls=['https://www.baidu.com','http://www.openstack.org','https://www.python.org','https://help.github.com/','http://www.sina.com.cn/'] obj_l=[] for url in urls : obj=p.apply_async(get_page,args=(url,) ) obj_l.append(obj) p.close()#本次运行不在放任务 p.join()#等待子进程的结束 print([i.get()for i in obj_l])#列表推导式 # for i in obj_l: # print(i.get())
使用回调函数:
from multiprocessing import Pool import requests import os def get_page(url): print('<%s>get<%s>'%(os.getpid(),url)) response=requests.get(url) return {'url':url,'text':response.text} def parse_page(res): print('<%s> parse [%s]' %(os.getpid(),res['url'])) with open('db.txt','a') as f: parse_res='url:%s size:%s ' %(res['url'],len(res['text'])) f.write(parse_res) if __name__ == '__main__': p=Pool()#不写参数就是默认的cpu个数 urls=['https://www.baidu.com','http://www.openstack.org','https://www.python.org','https://help.github.com/','http://www.sina.com.cn/'] for url in urls : obj=p.apply_async(get_page,args=(url,),callback=parse_page )#使用回调函数起进程和回调都是主进程做的事情,从当前看回调函数没有参数 p.close() p.join() print('主',os.getpid())#打印主线程的id号可以看出运行回调函数的也是主线程
注:window下子进程不接受父进程的当前的值作为初始值
进程与线程的区别
#1 创建线程的开销比创建进程的开销小,因而创建线程的速度快 from multiprocessing import Process from threading import Thread import os import time def work(): print('<%s> is running' %os.getpid()) time.sleep(2) print('<%s> is done' %os.getpid()) if __name__ == '__main__': t=Thread(target=work,) # t=Process(target=work,) t.start() print('主',os.getpid()) #同一下的多个线程共享该进程的资源,而多个进程之间内存功空间是隔离的 # from multiprocessing import Process # from threading import Thread # import os # import time # n=100 # def work(): # global n # n-=100 # if __name__ == '__main__': # # n=100 # p=Process(target=work,) # # p=Thread(target=work,) # p.start() # p.join() # print('主',n)
我们使用进程实现多客户并发,用队列在实现一个一个的排队,但最后我们使用RabbitMQ来实现多客户并发。http://blog.csdn.net/column/details/rabbitmq.html