管道

#创建管道的类: Pipe([duplex]):在进程之间创建一条管道,并返回元组(conn1,conn2),其中conn1,conn2表示管道两端的连接对象,强调一点:必须在产生Process对象之前产生管道 #参数介绍: dumplex:默认管道是全双工的,如果将duplex设置成False,conn1只能用于接收,conn2只能用于发送。 #主要方法: conn1.recv():接收conn2.send(obj)发送的对象。如果没有消息可接收,recv方法会一直阻塞。如果连接的另外一端已经关闭,那么recv方法会抛出EOFError。 conn1.send(obj):通过连接发送对象。obj是与序列化兼容的任意对象 #其他方法: conn1.close():关闭连接。如果conn1被垃圾回收,将自动调用此方法 conn1.fileno():返回连接使用的整数文件描述符 conn1.poll([timeout]):如果连接上的数据可用,返回True。timeout指定等待的最长时限。如果省略此参数,方法将立即返回结果。如果将timeout设成None,操作将无限期地等待数据到达。 conn1.recv_bytes([maxlength]):接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。如果进入的消息,超过了这个最大值,将引发IOError异常,并且在连接上无法进行进一步读取。如果连接的另外一端已经关闭,再也不存在任何数据,将引发EOFError异常。 conn.send_bytes(buffer [, offset [, size]]):通过连接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,然后调用c.recv_bytes()函数进行接收 conn1.recv_bytes_into(buffer [, offset]):接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或类似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。如果消息长度大于可用的缓冲区空间,将引发BufferTooShort异常。 管道介绍
from multiprocessing import Process, Pipe conn1, conn2 = Pipe() conn1.send('你好') print('>>>>>>>>>>>') msg = conn2.recv() print(msg)
from multiprocessing import Process, Pipe def func1(conn2): msg = conn2.recv() print(msg) if __name__ == '__main__': conn1, conn2 = Pipe() p = Process(target=func1, args=(conn2,)) p.start() conn1.send('你好啊,我叫赛利亚')
管道错误模拟: 管道关闭, 异常处理
from multiprocessing import Process, Pipe def func(conn2): while 1: try: # 如果管道一端关闭了, 另外一端接收消息时会报错, 要使用异常处理 msg = conn2.recv() print(msg) except EOFError: print('对方管道已关闭') conn2.close() break if __name__ == '__main__': conn1, conn2 = Pipe() p = Process(target=func, args=(conn2,)) p.start() conn1.send('你好啊') conn1.close() # conn1.recv() # OSError: handle is closed
数据共享
Manager
进程间数据是独立的,可以借助于队列或管道实现通信,二者都是基于消息传递的 虽然进程间数据独立,但可以通过Manager实现数据共享,事实上Manager的功能远不止于此 A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies. A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array.
from multiprocessing import Process, Manager def func(m_dic): m_dic['name'] = '大猪蹄子' # 修改共享数据 if __name__ == '__main__': m = Manager() m_dic = m.dict({'name': '大佬'}) # 创建共享数据 print('原始>>>', m_dic) # 打印初始共享数据 p = Process(target=func, args=(m_dic,)) p.start() p.join() print('主进程>>>>', m_dic) # 打印的是修改后的共享数据
''' 多进程同时获取数据, 修改后重新赋值, 可能同时拿到100, 减1后都把99赋值回去, 得到的数据不准确,数据不安全 可以通过加锁解决 ''' from multiprocessing import Process, Manager def func(m_dic): m_dic['count'] -= 1 if __name__ == '__main__': m = Manager() m_dic = m.dict({'count': 100}) lst = [] for i in range(50): p = Process(target=func, args=(m_dic,)) p.start() lst.append(p) [p.join() for p in lst] print('主进程>>>', m_dic)
# 加锁, 解决数据错乱问题 from multiprocessing import Process, Manager, Lock def func(m_dic, ml): # with ml: 下面的缩进内容等同于 ml.acquire() ml.release() 之间的内容, 作用: 加锁 with ml: m_dic['count'] -= 1 if __name__ == '__main__': m = Manager() ml = Lock() m_dic = m.dict({'count': 100}) lst = [] for i in range(50): p = Process(target=func, args=(m_dic, ml)) p.start() lst.append(p) [p.join() for p in lst] print(m_dic)
进程池
什么是进程池?进程池的作用. 并行 并发 同步 异步 阻塞 非阻塞 互斥 死锁.
import time from multiprocessing import Process, Pool def func(n): time.sleep(1) print(n) if __name__ == '__main__': pool = Pool(4) # 设置进程数量, 如果不设置, 默认是CPU数量 pool.map(func, range(100)) # map自带join功能, 异步执行任务, 参数是可迭代对象
进程池比多进程效率高

import time from multiprocessing import Process, Pool def func(n): for i in range(100): n += 1 print(n) if __name__ == '__main__': pool_start_time = time.time() pool = Pool() pool.map(func, range(100)) pool_end_time = time.time() pool_dif_time = pool_end_time - pool_start_time lst = [] p_s_time = time.time() for i in range(100): p = Process(target=func, args=(i,)) p.start() lst.append(p) [p.join() for p in lst] p_e_time = time.time() pd_time = p_e_time - p_s_time print('进程池执行时间>>>', pool_dif_time) print('多进程执行时间>>>', pd_time)

import time from multiprocessing import Process, Pool def func(i): time.sleep(0.5) print(i**2) if __name__ == '__main__': pool = Pool(4) pool_s_time = time.time() pool.map(func, range(100)) pool_e_time = time.time() pool_dif_time = pool_e_time - pool_s_time p_lst = [] p_s_time = time.time() for i in range(100): p = Process(target=func, args=(i,)) p.start() p_lst.append(p) [p.join() for p in p_lst] p_e_time = time.time() p_dif_time = p_e_time - p_s_time print('数据池执行时间:', pool_dif_time) print('多进程运行时间:', p_dif_time)
进程池的同步方法: apply
import time from multiprocessing import Process, Pool def fun(i): time.sleep(0.5) # print(i) return i**2 if __name__ == '__main__': p = Pool(4) for i in range(10): res = p.apply(fun, args=(i,)) # 同步执行的方法, 它会等待任务的返回结果(return) print(res) # 打印的是fun的返回值(return)
print('主进程结束') # 子进程都结束后打印
进程池的异步方法: apply_async # [ apply()方法的变体,它返回一个结果对象。]
import time from multiprocessing import Process, Pool def fun(i): time.sleep(0.5) return i**2 if __name__ == '__main__': p = Pool(4) res_lst = [] for i in range(10): res = p.apply_async(fun, args=(i,)) # 异步执行, res是对象multiprocessing.pool.ApplyResult object 主进程代码执行完毕不会等待子进程, 直接关闭主进程. res_lst.append(res) for i in res_lst: print(i.get())
print('主进程结束') # 如果没有i.get()方法, 则主进程不会等待子进程执行完就会结束
get([timeout])
在产生结果时返回该结果。如果超时限制不是空, 而且结果没有在时限内返回, 则抛出多进程超时错误。 如果远程调用报出异常,那么get()方法将再次抛出这个异常。
# 如果不加close和join, 程序会直接随主进程结束运行,不会等待打印i. 加join后可以感知进程的运行 import time from multiprocessing import Process, Pool def fun(i): time.sleep(0.5) print(i) return i**2 if __name__ == '__main__': p = Pool(4) res_lst = [] for i in range(10): res = p.apply_async(fun, args=(i,)) res_lst.append(res) # print(res) # 异步执行, res是多个对象 <multiprocessing.pool.ApplyResult object at 0x000001B5BBD7C128> p.close() # 不是关闭进程池,而是不允许再有其他任务来使用进程池 p.join() # 这是感知进程池中任务的方法,等待进程池的任务全部执行完 for el in res_lst: print('结果>>>', el.get())
# time.sleep(4) # 如果把close和join还有for循环都注释掉, 此处等待几秒也可以打印出i print('主进程结束')
回调函数 callback
import os from multiprocessing import Process, Pool def func1(n): print('func1', os.getpid()) return n*n def func2(nn): print('func2', os.getpid()) print(nn) if __name__ == '__main__': print('主进程:', os.getpid()) p = Pool(4) p.apply_async(func1, args=(10,), callback=func2) #把func的返回结果传参给func2, func2 在主进程中运行 如果func1返回多个结果, 那么将以元组的形式传给func2 p.close() p.join()
线程切换

#什么是线程: #指的是一条流水线的工作过程,关键的一句话:一个进程内最少自带一个线程,其实进程根本不能执行,进程不是执行单位,是资源的单位,分配资源的单位 #线程才是执行单位 #进程:做手机屏幕的工作过程,刚才讲的 #我们的py文件在执行的时候,如果你站在资源单位的角度来看,我们称为一个主进程,如果站在代码执行的角度来看,它叫做主线程,只是一种形象的说法,其实整个代码的执行过程成为线程,也就是干这个活儿的本身称为线程,但是我们后面学习的时候,我们就称为线程去执行某个任务,其实那某个任务的执行过程称为一个线程,一条流水线的执行过程为线程 #进程vs线程 #1 同一个进程内的多个线程是共享该进程的资源的,不同进程内的线程资源肯定是隔离的 #2 创建线程的开销比创建进程的开销要小的多 #并发三个任务:1启动三个进程:因为每个进程中有一个线程,但是我一个进程中开启三个线程就够了 #同一个程序中的三个任务需要执行,你是用三个进程好 ,还是三个线程好? #例子: # pycharm 三个任务:键盘输入 屏幕输出 自动保存到硬盘 #如果三个任务是同步的话,你键盘输入的时候,屏幕看不到 #咱们的pycharm是不是一边输入你边看啊,就是将串行变为了三个并发的任务 #解决方案:三个进程或者三个线程,哪个方案可行。如果是三个进程,进程的资源是不是隔离的并且开销大,最致命的就是资源隔离,但是用户输入的数据还要给另外一个进程发送过去,进程之间能直接给数据吗?你是不是copy一份给他或者通信啊,但是数据是同一份,我们有必要搞多个进程吗,线程是不是共享资源的,我们是不是可以使用多线程来搞,你线程1输入的数据,线程2能不能看到,你以后的场景还是应用多线程多,而且起线程我们说是不是很快啊,占用资源也小,还能共享同一个进程的资源,不需要将数据来回的copy!
线程的创建方法一:
# 线程和进程很像, 一个进程中至少有一个线程, 进程是资源层面的, 线程负责实际的操作 import time from threading import Thread def func(n): time.sleep(1) # 子线程运行地太快了, 如果不加time.sleep,会在打印主线程之前跑完 print(123) if __name__ == '__main__': t = Thread(target=func, args=(1,)) t.start() t.join() # 等待子线程跑完再执行主线程下面的内容 print('主线程')
线程的创建方法二:
from threading import Thread class MyThread(Thread): def __init__(self, n): super().__init__() self.n = n def run(self): print('换汤不换药') print('self.n>>>', self.n) if __name__ == '__main__': t = MyThread('你好') t.start() t.join() print('主线程结束')