zoukankan      html  css  js  c++  java
  • python多进程

    多进程

    • 进程:正在进行的过程或者说是一个任务,而负责执行任务则是cpu
    • 同一个程序执行两次是两次进程
    • 并发:
    • 并行:基于多核cpu

    unix开子进程的拷贝一份父进程的数据

    进行的三个状态:运行,阻塞,就绪

    同步与异步

    • 同步:提交一个任务,只有等待这个任务结束才会继续提交下一个任务
    • 异步:只管提交任务,不等待这个任务执行完毕就可以做其他事情
    • 阻塞:socket模块中的:recv,accept
    • 非阻塞:正常执行代码

    在python中如何开启子进程

    1. multiprocessing模块中的process类
    # 方式1
    from multiprocessing import Process
    import time
    
    def task(name):
    	print('%s is running'%name)
    	time.sleep(5)
    	print('%s is done'%name)
    
    if __name__ =='__main__':
    	p = Process(target=task,args=('子进程1',))
    	p.start() #仅仅只是给操作系统发送了一个信号
    
    #方式二:自定义类继承自process
    from multiprocessing import Process
    import time
    class MyProcess(Process):
    	def __init__(self,name):
    		super().__init__()
    		self.name = name
    
    	def run(self): #函数名必须是run
    		print('%s is running'%self.name)
    		time.sleep(5)
    		print('%s is done'%self.name)
    
    if __name__ == '__main__':
    	p = MyProcess('子进程1')
    	p.start()#本质就是在调用p.run
    	print('进')
    

    查看进程的pid与ppid(进程id)

    • 补充:
      • windows查看正在执行的进程:tasklist | findstr pycharm
      • Mac os查看正在执行的进程:ps aus|grep pycharm
    from multiprocessing import Process
    import time
    import os
    
    
    def task(name):
    	print('%s is running,parent id is <%s>'%(os.getpid(),os.getppid()))
    	time.sleep(5)
    	print('%s is done'%os.getpid())
    
    if __name__ =='__main__':
    	p = Process(target=task,args=('子进程1',))
    	p.start() #仅仅只是给操作系统发送了一个信号
    	print('主',os.getpid())
    

    process对象的其他属性或方法

    • join()等子进程结束完毕才会继续运行主进程,主进程结束后所有的僵尸进程结束
    • start()只是向操作系统发送信号,并不只是把进程立马开起来,如果连续有几个start有可能执行的先后顺序会错乱,如果start方法后面立马接一个join,多个子进程会变成串行
    • is_alive()查看子进程时候已经结束
    • terminate()杀死这个子进程
    • pid()查看进程id
    • name()查看这个对象的名字

    守护进程

    • 当主进程执行结束子进程跟着结束:将daemon属性设置为true,守护进程不能有子进程。代码执行到程序最后一行代表程序运行结束

    互斥锁

    • 把并发变成串行:使用multiprocessing模块下的Lock对象
    #牺牲效率实现子进程串行
    from multiprocessing import Process,Lock
    import time
    
    def tack(name,lock):
    	lock.acquire()#加锁
    	print('%s,1'%name)
    	time.sleep(1)
    	print('%s,2'%name)
    	time.sleep(1)
    	print('%s,3'%name)
    	lock.release()#释放锁
    if __name__ == '__main__':
    	lock = Lock()#实例化锁对象
    	for i in range(3):
    		p = Process(target=tack,args=('进程%s'%i,lock))#把锁传到子进程中
    		p.start()
    
    • 互斥锁与join的区别:join确实能实现代码串行,但join把整个代码变成串行,但互斥锁可以把部分代码变成串行

    事件

    • 一个信号可以使所有的进程都进入阻塞状态,也可以控制所有的进程解除阻塞
    • 一个事件被创建之后,默认是阻塞状态
    from multiprocessing import Event
    e = Event()# 创建了一事件
    print(e.is_set())# 查看一个事件的状态,默认是阻塞
    e.set() # 将这个事件状态改为True
    print(e.is_set())
    e.wait()# 根据e.is_set()的值决定是否阻塞,如果是True就是不阻塞,如果是False就是阻塞状态
    e.clear() # 将这个事件状态改为False
    
    • 事件模拟红绿灯:
    import time
    import random
    from multiprocessing import Process,Event
    
    def light(e):
    	while True:
    		if e.is_set():
    			e.clear()
    			print('红灯亮了')
    		else:
    			e.set()
    			print('绿灯亮了')
    		time.sleep(5)
    def cars(i,e):
    	if not e.is_set():
    		print('%s 在等待'%i)
    		e.wait()
    	print('%s 通行了'%i)
    
    
    if __name__ == '__main__':
    	e = Event()
    	p1 = Process(target=light,args=(e,))
    	p1.start()
    	for i in range(20):
    		p = Process(target=cars,args=(i,e))
    		p.start()
    		time.sleep(2)
    

    队列

    • multiprocessing有提供基于ipc通信类(inter-process communication)
    from multiprocessing import Queue
    
    q = Queue(4)#指定队列大小,如果不指定大小则大小为无穷尽
    
    q.put('hello')#插入数据到队列中
    print(q.full())#判断队列中数据是否满了
    q.get()#从队列读取并删除一个数据
    print(q.empty())#判断队列中数据是否空了
    
    • 生产者消费这模型:
      • 生产者:生产数据任务
      • 消费者:消费数据任务
    from multiprocessing import Process,Queue
    import time
    def producer(q):
    	for i in range(3):
    		res = '包子%s'%i
    		time.sleep(0.5)
    		q.put(res)
    		print('生产者生产了%s'%res)
    
    def consumer(q):
    	while True:
    		res = q.get()
    		if res == None:break
    		time.sleep(0.7)
    		print('消费者吃了%s'%res)
    
    
    if __name__ == '__main__':
    	q = Queue()
    	p1 = Process(target=producer,args=(q,))
    	c1 = Process(target=consumer,args=(q,))
    	p1.start()
    	c1.start()
    	p1.join()
    	c1.join()
    	q.put(None)
    
    • JoinableQueue:
      • 消费者:每次获取一个数据,处理一个数据,并且发送一个记号:标志一个数据被处理成功,计数器-1
      • 生产者:每一次生产一个数据,且每一次生产的数据放在队列中计数器+1,当生产者全部生产完毕之后发送一个join:已经停止生产数据且要等待之前生产的数据被消费完,当数据都被处理完时,join阻塞结束
    from multiprocessing import Process,JoinableQueue
    import time
    def producer(q):
    	for i in range(3):
    		res = '包子%s'%i
    		time.sleep(0.5)
    		q.put(res)
    		print('生产者生产了%s'%res)
    	q.join() #阻塞,直到一个队列中的所有数据,全部被处理完毕
    
    def consumer(q):
    	while True:
    		res = q.get()
    		time.sleep(0.7)
    		print('消费者吃了%s'%res)
    		q.task_done() # 处理完一个数据,队列中的计数器-1
    
    
    if __name__ == '__main__':
    	q = JoinableQueue()
    	p1 = Process(target=producer,args=(q,))
    	c1 = Process(target=consumer,args=(q,))
    	p1.start()
    	c1.daemon = True #守护进程,主进程中的代码执行完毕后,子进程自动结束
    	c1.start()
    	p1.join()
    

    管道

    • 在进程之间传递信息(数据)
    from multiprocessing import Pipe,Process
    
    def func(conn1,conn2):
    	conn1.close()
    	while True:
    		try:
    			msg = conn2.recv()
    			print(msg)
    		except EOFError:
    			print('子进程结束')
    			conn2.close()
    			break
    
    
    if __name__ == '__main__':
    	conn1,conn2 = Pipe()
    	Process(target=func,args=(conn1,conn2)).start()
    	conn2.close()
    	for i in range(5):
    		conn1.send('吃屎了你')
    	conn1.close()
    
    

    进程池

    • 创建进程池的过程
    1. 创建一个属于进程的池子
    2. 这个池子指定能存放多少个进程
    3. 开启进程
    from multiprocessing import Pool,Process
    import time
    import os
    def func(i):
    	print('%s 进程正在运行'%i,os.getpid())
    	time.sleep(2)
    	print('%s 进程运行结束'%i,os.getpid())
    
    
    if __name__ == '__main__':
    	pool = Pool(3)
    	for i in range(10):
    	    # pool.apply(func,args=(i,)) 同步调用
    		pool.apply_async(func,args=(i,)) # 异步调用
    	pool.close() # 结束进程池接收任务
    	pool.join() # 感知进程池中的任务执行结束
    	# apply_async必须是与close、join一起使用
    
    • 进程池的返回值
    from multiprocessing import Pool
    
    def func(i):
    
    	return i*i
    
    if __name__ == '__main__':
    	p = Pool(5)
    	res_lis = []
    	for i in range(10):
    		res = p.apply_async(func,args=(i,))
    		res_lis.append(res) 
    
    	for res in res_lis:
    		print(res.get())# res.get 默认是阻塞的,因为需要接收进程处理的结果,之后接收到结果之后才可以继续执行,所以
    							# 所以将进程的返回结果放在一个列表中,然后循环这个列表,再执行get就不会阻塞了
    	p.close()
    	p.join()
    
    • 进程池的回调函数
    from multiprocessing import Pool
    import os
    def func1(i):
    	print('in func1',os.getpid())
    	return i*i
    
    def func2(ii):
    	print('in func2',os.getpid())
    	print(ii)
    
    if __name__ == '__main__':
    	'''
    	1. func1执行结果当作func2的参数,func1进程执行结束后返回结果,通过callback调用func2,将func1的执行返回结果当作参数传递给func2执行
    	2. func1是一个新的进程
    	3. func2是在主进程中执行的
    	'''
    	p = Pool(5)
    	p.apply_async(func1,args=(2,),callback=func2) 
    	p.close()
    	p.join()
    
  • 相关阅读:
    DataTables中自增序号的实现
    MVC中调用模态框之后导致JS失效
    teamviewer13破解版
    屏幕录制专家破解版
    Navicat Premium 12 (内含破解补丁)
    Navicat新建连接出现10060 "Unknown error" 错误
    Windows添加永久路由
    VMware虚拟机下Linux网络配置——桥接模式
    文件基本属性
    ifconfig: command not found
  • 原文地址:https://www.cnblogs.com/wualin/p/9956900.html
Copyright © 2011-2022 走看看