一、管道
管道:进程与进程之间能相互通信
通信原理:是基于管道双向通信
from multiprocessing import Pipe, Process conn1, conn2 = Pipe() conn1.send("123456") print(conn2.recv())
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
from multiprocessing import Pipe, Process def func(conn): while True: msg = conn.recv() if msg == None: break print(msg) if __name__ == '__main__': conn1, conn2 = Pipe() Process(target=func, args=(conn1,)).start() for i in range(10): conn2.send("吃了么") conn2.send(None) """ #第二种: def func(conn1, conn2): conn2.close() while True: try: msg = conn1.recv() print(msg) except EOFError : conn1.close() break if __name__ == '__main__': conn1, conn2 = Pipe() Process(target=func, args=(conn1,conn2)).start() conn1.close() for i in range(10): conn2.send("吃了么") conn2.close() # 疑问:主进程关闭通道不会影响子进程接收数据 # # ********应该特别注意管道端点的正确管理问题,如果是生产者或者消费者中都没有使用管道的某个端点,就应该将它关闭 这也说明为何在生产中关闭了管道的输出端,在消费者中关闭管道的输入端。如果忘记执行这些步骤,程序可能在消费者中 的recv()操作阻塞。管道是由操作系统进行引用计数的,必须在所有进程中关闭管道后才能生成EOFError异常 因此,在生产中关闭管道不会有任何效果,除非消费者也关闭了相同的管道端点 # """
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
from multiprocessing import Pipe, Process, Lock import random, time def producer(con, pro, name, food): con.close() for i in range(4): time.sleep(random.randint(1, 3)) f = "%s生产%s%s" % (name, food, i) print(f) pro.send(f) pro.close() def consumer(con, pro, name): pro.close() while True: try: # lock.acquire() food = con.recv() # lock.release() print("%s吃了%s" % (name, food)) time.sleep(random.randint(1, 3)) except EOFError: print("%s吃完啦" % name) con.close() break if __name__ == '__main__': con, pro = Pipe() # lock = Lock() p = Process(target=producer, args=(con, pro, "猪狗", "泔水")) p.start() c = Process(target=consumer, args=(con, pro, "逗比")) c1 = Process(target=consumer, args=(con, pro, "坦克" )) c.start() c1.start() con.close() pro.close() # Pipe 管道数据不安全(可能发生多进程抢一个数据)枷锁来控制抢资源对象 管道属于最底层的东西 # 队列是 管道+锁 所以比较安全,一般用队列 """ 应该特别注意管道端点的正确管理问题,如果是生产者或者消费者中都没有使用管道的某个端点,就应该将它关闭 这也说明为何在生产中关闭了管道的输出端,在消费者中关闭管道的输入端。如果忘记执行这些步骤,程序可能在消费者中 的recv()操作阻塞。管道是由操作系统进行引用计数的,必须在所有进程中关闭管道后才能生成EOFError异常 因此,在生产中关闭管道不会有任何效果,除非消费者也关闭了相同的管道端点 """
二、进程之间的数据共享
模块:Manager
from multiprocessing import Manager, Process, Lock
def main(dic, lock): lock.acquire() dic["count"] -= 1 lock.release() print(dic) if __name__ == '__main__': lock = Lock() # 牺牲效率变成串行(同步),但是安全 m = Manager() # 可以把数据共用多个进程共享,但是会发生多个子进程抢同一个资源,造成数据混乱,使用必须加锁 dic = m.dict({"count": 100}) p_lis = [] for i in range(50): p = Process(target=main, args=(dic, lock)) p_lis.append(p) # 把对象添加到列表 p.start() for i in p_lis: i.join() # 把每个对象添加join,所有子进程结束,主进程才运行 print("主进程:", dic)
三、进程池
效率:
每次开启进程,开启属于这个进程的内存空间
寄存器 堆栈 文件
进程过多 操作系统的调度
进程池:
python中 先创建一个属于进程的池子
这个池子指定能存放n个进程
先将这些进程创建好,如果有n个任务,就让n个进程去执行,这样减少cpu的占用率,提高效率(防止一起去执行,占用资源过大)
1.Pool模块进程池
from multiprocessing import Pool, Process import time def fun(n): for i in range(10): print(n + 1) if __name__ == '__main__': start1 = time.time() pool = Pool(5) # 5个进程 pool.map(fun, range(100)) # 100个任务 自带join 和close map参数必须是可迭代 t1 = time.time() - start1 start2 = time.time() p_lis = [] for i in range(100): p = Process(target=fun, args=(i,)) p_lis.append(p) p.start() for i in p_lis: i.join() t2 = time.time() - start2 print(t1, t2) # 0.1535499095916748 2.3756473064422607 # 进程池:是同时并行执行5个程序,差不多执行20次 多进程:是100个子进程分别(异步)执行一次,100次
# 异步apply_async用法:如果使用异步提交的任务,主进程需要使用join,等待进程池内任务都处理完,然后可以用get收集结果
# 否则,主进程结束,进程池可能还没来得及执行,也就跟着一起结束了"""
from multiprocessing import Pool import time, os def func(n): print("start func %s" % n, os.getpid()) time.sleep(1) print("end func%s" % n, os.getpid()) if __name__ == '__main__': pool = Pool(5) # pool.map(func,) for i in range(20): pool.apply_async(func, args=(i,)) # apply同步提交任务 apply_async异步提交任务 time.sleep(5) # pool.close() # 结束进程池接收任务 关闭进程池,防止进一步操作。如果所有操作持续阻塞,它们将在工作进程终止前完成 # pool.join() # 感知进程池中的任务执行结束 # 5个进程交替执行任务 # apply_async()需要先close 后join 来保持子进程和主进程代码的同步性
2.第二种方法:from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import ProcessPoolExecutor import time import os def task(n): print(n, os.getppid()) time.sleep(2) return n * 2 def call_back(n): print("拿到异步提交任务返回结果:", n.result()) if __name__ == '__main__': pool = ProcessPoolExecutor(5) t_lis = [] for i in range(10): res = pool.submit(task, i).add_done_callback(call_back) #异步多进程 t_lis.append(res) pool.shutdown() # 相当于p.join和p.close() 等待子进程结束 print("主进程:", os.getpid())
三、进程池的返回值
第一种:
from multiprocessing import Pool import time def func(i): time.sleep(1) return i * i if __name__ == '__main__': p = Pool(5) # res = p.map(func, range(10)) # 自带close和join 进程结束后,一次性打印 # print(res) res_lis = [] for i in range(10): res = p.apply_async(func, args=(i,)) res_lis.append(res) # print(res.get()) # get()阻塞等待结果 p.close() p.join()for i in res_lis:
print(i.get()) # 拿到返回值
第二种:
from concurrent.futures import ProcessPoolExecutor import time def func(i): time.sleep(1) return i ** 2 if __name__ == '__main__': pool = ProcessPoolExecutor(5) res_list = [] for i in range(10): res = pool.submit(func, i) res_list.append(res) pool.shutdown() # 相当于close()和join() for i in res_list: print(i.result()) # 不同于上面是get()换成result()
*****模块不一样方式不同
四、回调函数
回调函数会在主函数中进行
第一种:
from concurrent.futures import ProcessPoolExecutor import os def func1(n): print("in func1 ", os.getpid()) return n * n def func2(nn): print("in func2 ", os.getpid()) print(nn.result()) if __name__ == '__main__': pool =ProcessPoolExecutor(5) pool.submit(func1,10).add_done_callback(func2) pool.shutdown() # 关机 print("主进程:",os.getpid())
第二种:
from multiprocessing import Pool import os def func1(n): print("in func1", os.getpid()) return n * n def func2(nn): print("in func2", os.getpid()) print(nn) if __name__ == '__main__': p = Pool(5) # for i in range(10): p.apply_async(func1, args=(10,), callback=func2) p.close() p.join() print("主进程:", os.getpid()) # func2进程数和主进程一样,证明回调
五、进程池socket 通信高并发
#服务端 from multiprocessing import Pool import os def func1(n): print("in func1", os.getpid()) return n * n def func2(nn): print("in func2", os.getpid()) print(nn) if __name__ == '__main__': p = Pool(5) # for i in range(10): p.apply_async(func1, args=(10,), callback=func2) p.close() p.join() print("主进程:", os.getpid()) # func2进程数和主进程一样,证明回调
#客户端 import socket client = socket.socket() client.connect(("127.0.0.1", 8080)) ret = client.recv(1024).decode("utf8") print(ret) msg = input(">>>>>>>:").encode("utf8") client.send(msg) client.close()
六、利用多线程,爬虫例子
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
""" 装模块方法:cmd中: pip3 install 模块名""" import requests from multiprocessing import Pool def get(url): response = requests.get(url) if response.status_code == 200: return url, response.content.decode("utf8") def call_back(args): url, content = args print(url, len(content)) if __name__ == '__main__': url_lst = ["http://www.baidu.com/", "https://www.cnblogs.com", "https://www.sogou.com/"] p = Pool(5) for url in url_lst: p.apply_async(get, args=(url,),callback=call_back) p.close() p.join()