zoukankan      html  css  js  c++  java
  • 39.创建多进程及进程通讯 -- Queue--Pipe--Event

    创建多进程

    • windows:进程、线程
    • linux:进程、线程(做了进程通信的多进程实现的线程)
    • 进程之间内存彼此独立,不管是父子进程还是单个独立进程
    • multiprocessing:Process 创建多进程python内置的模块
    • current_process().name 返回的是当前的进程是哪个
    • from multiprocessing import process,current_process
      def work():
          print('我是进程:%s' % current_process().name)
      work()
      
      运行结果:我是进程:MainProcess
      
    • dir(current_process())  :current_process的内置函数
      • ['_Popen', '__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_bootstrap', '_config', '_identity',
        '_name', '_parent_pid', '_popen', 'authkey', 'daemon', 'exitcode', 'ident', 'is_alive', 'join', 'name', 'pid', 'run', 'sentinel', 'start', 'terminate']
        
      • current_process().pid:是打印当前进程id:返回的是当前进程的id
      • current_process().name:返回的是当前的进程名称
      • current_process().terminate:直接终止进程
      • current_process().is_alive():返回进程的存活状态 True/False
      • current_process().exitcode:0代表进程死亡 None代表进程运行
      • current_process().ident:和pid类似
    • 创建多进程其实就是把一个任务(工作函数)绑定在一个子进程上,然后将任务分配多个多个子进程上去执行
    • 大家记得一定要把多进程的创建放在main函数里面
    • help(process)  :创建多进程函数帮助文档,target:调用函数名 ,name:子进程命名,args:元组传参,kwargs:字典传参
      •  __init__(self, group=None, target=None, name=None, args=(), kwargs={}, *
        , daemon=None)
    • p1 = Process(target=func,name=,args=(),kwargs={}):创建子进程传参模型
      • p1.start():开启进程
      • p1.join():回收子进程
      • p1.name :#获取进程名字
      • p1.pid :获取进程ID之
      • p1.terminate :直接终止进程
      • p1.is_alive() :返回进程的存活状态 True/False
      • p1.exitcode: 0代表进程死亡 None代表进程运行
      • p1.ident :和pid类似
    • import sys  sys.stdout.flush() #刷新缓存区
    • #创建多进程1.py
      from multiprocessing import Process,current_process
      import sys
      def work():
          print('我是子进程:%s进程id:%s' % (current_process().name,current_process().pid))
          sys.stdout.flush() #刷新缓存
      def main():
          print('当前父进程:%s进程id:%s' % (current_process().name,current_process().pid))
          p1 = Process(target=work,name='子进程一号')   #创建子进程
          p2 = Process(target=work,name='子进程二号')    #创建子进程
          p1.start() #开启进程
          p2.start()
          print(p1.name,p1.pid,p1.is_alive(),p1.exitcode,p1.ident)
          print(p2.name,p2.pid,p2.is_alive(),p2.exitcode,p2.ident)
          p1.join()   #回收子进程
          p2.join()
      if __name__ == "__main__":
          main()
      ##当前进程数是3个,一个父进程两个子进程
      

      运行结果:

      当前父进程:MainProcess进程id:134216
      子进程一号 129748 True None 129748
      子进程二号 134780 True None 134780
      我是子进程:子进程一号进程id:129748
      我是子进程:子进程二号进程id:134780
      
    • #创建多进程2.py
      from multiprocessing import Process,current_process
      import sys
      def work(a,char):
          print('-----------------------------')
          print('当前子进程:%s' % current_process().name)
          a[0] = char
          print('当前子进程修改完毕:',a)
          sys.stdout.flush() #刷新缓存
      def main():
          a = [1,2,3]
          print('当前父进程:',a)
          p1 = Process(target=work,name='进程1',args=(a,'a'))
          p2 = Process(target=work,name='进程2',args=(a,'zhang'))
          p1.start()
          p2.start()
          p1.join()
          p2.join()
      if __name__ == "__main__":
          main()
      

      运行结果:

      当前父进程: [1, 2, 3]
      -----------------------------
      当前子进程:进程1
      当前子进程修改完毕: ['a', 2, 3]
      -----------------------------
      当前子进程:进程2
      当前子进程修改完毕: ['zhang', 2, 3]

    僵尸进程

    • 如果父进程没有对子进程合理回收
      • 父进程没有在子进程工作结束之后立即回收
      • PCB控制块 PID继续占用  僵尸进程
    • 子进程结束 ,父进程依然存活
    • from multiprocessing import Process,current_process
      from time import sleep
      def work():
      	print('子进程开始了')
      	print('PID:%s' % current_process().pid )
      	print('子进程结束了')
      def main():
      	p = Process(target=work)
      	p.start() #父进程开启了子进程
      	#p.join() #回收子进程 或者忘记回收
      	#父进程退出了
      	sleep(10) #退出
      	#p.join()
      if __name__ == '__main__':
      	main()  

    孤儿进程

    • Python代码中如果在父进程没有明确写出join回收子进程,在父进程结束之后,父进程回自动帮助回收
    • kill -9 pid  强制杀死

    多进程通讯--Queue

    • 》》》》Python中任何普通的基础数据类型,都不可以在多进程下通信 
    • **Queue**(size):共享通讯队列,阻塞的行为,默认为阻塞,block=False设置为非阻塞
      • 值满了不能放:q.put(block=True) 默认为True
      • 值空了不能取,会一直等:q.get(block=True)
      • 当修改了拿取的方式为非阻塞,那么数据在取不到或者立即放不进去的时候会直接报错
        • queue.Full 满了
        • queue.Empty 空的
      • q.empty():判断队列是否为空
      • q.full():判断队列是否为满
      • q.qsize():返回队列数据个数
    • #创建共享队列1.py
      from multiprocessing import Process,current_process,Queue
      def main():
          q = Queue() #创建共享队列
          print('当前父进程:',q)
          for var in range(5):
              q.put(var)
          print(q.get())
          print(q.get())
          print(q.get())
          print(q.get())
          print(q.get())
      if __name__ == "__main__":
          main()
      

      运行结果:

      当前父进程: <multiprocessing.queues.Queue object at 0x000000000277A2E8>
      0
      1
      2
      3
      4    
    • #创建共享队列2.py
      from multiprocessing import Process,current_process,Queue
      def product(q):#生产数据
          for var in range(5):
              q.put(var,block=False)#非阻塞 
      def custom(q):#消费数据
          for var in range(5):
              i = q.get(block=False)#非阻塞
              print('%s取到的数据:%s' % (current_process().name,i))
      def main():
          q = Queue(5) #创建共享队列
          p1 = Process(target=product,name='生产进程',args=(q,))
          p2 = Process(target=custom,name='消费进程',args=(q,))
          p1.start(),p2.start(),p1.join(),p2.join()
      if __name__ == "__main__":
          main()
      

      运行结果:

      消费进程取到的数据:0
      消费进程取到的数据:1
      消费进程取到的数据:2
      消费进程取到的数据:3
      消费进程取到的数据:4   
    • #创建共享队列3.py
      from multiprocessing import Process,Queue,current_process
      from time import sleep
      import sys
      def product(q):#某个进程生产数据
      	for var in range(10):
      		q.put(var)
      		sleep(1)
      def custom(q):#这个进程消费数据
          for var in range(5):
              i = q.get()
              print('%s进程取到的数据:%s' % (current_process().name,i))
              sys.stdout.flush()
      def main():
      	q = Queue() #共享队列
      	p1 = Process(target=product,args=(q,))
      	p2 = Process(target=custom,name='消费进程1',args=(q,))
      	p3 = Process(target=custom,name='消费进程2',args=(q,))
      	p1.start(),p2.start(),p3.start(),p1.join(),p2.join(),p3.join()
      	print('------------')
      if __name__ == '__main__':
      	main()
      

      运行结果:

      消费进程1进程取到的数据:0
      消费进程1进程取到的数据:1
      消费进程2进程取到的数据:2
      消费进程1进程取到的数据:3
      消费进程2进程取到的数据:4
      消费进程1进程取到的数据:5
      消费进程2进程取到的数据:6
      消费进程1进程取到的数据:7
      消费进程2进程取到的数据:8
      消费进程2进程取到的数据:9
      ------------
      
    • 下面的是子进程while循环取值,在阻塞行为下值空了不能取,会一直等,那么如何才能不让它退出,也不报错呢?如果改为非阻塞那么值空了就会报错,看下面的方法
    • from multiprocessing import Process,Queue,current_process
      from time import sleep
      import sys
      def product(q,sig):#某个进程生产数据
      	for var in range(8):
      		sig.put(True) #每次生产,都放一个True
      		q.put(var)
      		sleep(1)
      	else:
      		sig.put(False)
      def custom(q,sig):#这个进程消费数据
      	while True:
      		if sig.get(): #代表还有数据要生产
      			i = q.get() 
      			print('%s进程取到的数据:%s' % (current_process().name,i))
      			sys.stdout.flush()
      		else: #没有数据要生产了
      			print('子进程结束,数据生产完毕')
      			sig.put(False)
      			break
      def main():
      	q = Queue() #共享队列
      	sig = Queue()
      	p1 = Process(target=product,args=(q,sig))
      	p2 = Process(target=custom,name='消费进程1',args=(q,sig))
      	p3 = Process(target=custom,name='消费进程2',args=(q,sig))
      	p1.start(),p2.start(),p3.start(),p1.join(),p3.join(),p2.join()
      	print('------------')
      if __name__ == '__main__':
      	main()
      

      运行结果为:

      消费进程1进程取到的数据:0
      消费进程1进程取到的数据:1
      消费进程2进程取到的数据:2
      消费进程1进程取到的数据:3
      消费进程2进程取到的数据:4
      消费进程1进程取到的数据:5
      消费进程2进程取到的数据:6
      消费进程1进程取到的数据:7
      消费进程2:子进程结束,数据生产完毕
      消费进程1:子进程结束,数据生产完毕
      ------------

    多进程通讯--pipe

    • Pipe:管道两端, duplex=True 存储的都是pickle数据类型
    • left,right = Pipe(duplex)
    • pickle = p.recv() 向管道这一侧取出来
    • p.send(pickle) 向管道另一侧去发
    • 管道只能存放和拿取出来pickle数据类型
      • pickle 二进制数据 Python中维持数据类型保存 解析
      • pickle.loads:解析数据
      • pickle.dumps:封装为二进制
    • 管道在使用的时候,是有两端的,要注意使用顺序
    • 首先:多个进程可以享用同一个端,也就是一个管道支持多个进程通信,非常安全的操作(要记住)
    • #pipe 单个消费者生产者模型
      from multiprocessing import Process,current_process,Pipe
      import pickle
      def work_a(p):
              for var in range(5):
                      p.send(pickle.dumps(var))
                      print('%s:生产数据|%s' % (current_process().name,var))
      def work_b(p):
              for var in range(5):
                      pickle_obj = pickle.loads(p.recv()) #阻塞
                      print('%s:消费数据|%s' % (current_process().name,pickle_obj))
      def main():
              a,b = Pipe() #创建管道
              p1 = Process(target=work_a,name='生产者',kwargs={'p':a})
              p2 = Process(target=work_b,name='消费者',kwargs={'p':b})
              p1.start()
              p2.start()
              p1.join()
              p2.join()
      if __name__ == "__main__":
          main()
    • 运行结果:
      生产者:生产数据|0
      生产者:生产数据|1
      生产者:生产数据|2
      生产者:生产数据|3
      生产者:生产数据|4
      消费者:消费数据|0
      消费者:消费数据|1
      消费者:消费数据|2
      消费者:消费数据|3
      消费者:消费数据|4   
    • #pipe 多个消费者生产者模型  一个消费者只消费一个数据,公司里面常用的模型,一个管道支持多个进程通信,非常安全的操作
      from multiprocessing import Process,current_process,Pipe
      import pickle
      def work_a(p):
              for var in range(5):
                      p.send(pickle.dumps(var))
                      print('%s:生产数据|%s' % (current_process().name,var))
      def work_b(p):       
              pickle_obj = pickle.loads(p.recv()) #阻塞
              print('%s:消费数据|%s' % (current_process().name,pickle_obj))
      def main():
              a,b = Pipe() #创建管道
              p1 = Process(target=work_a,name='生产者',args=(a,))
              p2 = [] #消费者进程队列
              for var in range(5):
                      p2.append(Process(target=work_b,name='消费者%d' % var,args=(b,)))
              p1.start() #下开启生产者
              for var in p2:
                      var.start()
              p1.join()
              for var in p2:
                      var.join()
      if __name__ == "__main__":
          main()
      

      运行结果:

      生产者:生产数据|0
      生产者:生产数据|1
      生产者:生产数据|2
      生产者:生产数据|3
      生产者:生产数据|4
      消费者1:消费数据|0
      消费者2:消费数据|1
      消费者3:消费数据|2
      消费者0:消费数据|3
      消费者4:消费数据|4    

    多进程状态通讯--Event

    • from multiprocessing import Event
    • e = Event() e:状态实例
    • e.set() #设置当前的实例状态为True
    • e.clear() #设置当前的实例状态为False
    • e.wait()
      • False:阻塞等待
      • True:向下执行
    • from multiprocessing import Event,Process,current_process
      from time import sleep
      import sys
      def work(e):
      	print('%s已开启' % current_process().name)
      	sys.stdout.flush() #刷新输出缓冲区,立竿见影看到打印
      	e.wait() #Sleep 可中断睡眠
      	print('%s正式开始工作' % current_process().name)
      	sys.stdout.flush()
      	print('%s|pid:%s' % (current_process().name,current_process().pid))
      def main():
      	print('主进程开启')
      	e = Event()
      	p1 = Process(target=work,name='进程1',args=(e,))
      	p2 = Process(target=work,name='进程2',args=(e,))
      	p1.start() #进程已经开启
      	p2.start()
      	for var in range(3):
      		print(var)
      		sleep(1) #- e.wait()
      	e.set() #信号变True
      	p1.join()
      	p2.join()
      if __name__ == '__main__':
      	main()  
    • 运行结果:

      主进程开启
      0
      进程2已开启
      进程1已开启
      1
      2
      进程1正式开始工作
      进程2正式开始工作
      进程1|pid:3108
      进程2|pid:6068
      

        

        

        


     

  • 相关阅读:
    一个表对应另一个表中多个主键的查询方法(把一个表当成两个表用)
    可以切换数据库的SqlHelper
    win7安装后的用户选择
    如何删除 Windows.old 文件夹
    Windows Server 2008磁盘清理工具
    sqlserver express版PRIMARY 大小不能超过4G
    一交换机,一光猫、一路由器组internet网的方法
    公司部门职责清晰
    IIS下载EXE(拾遗)
    win2008 IIS 7.0中WebDAV
  • 原文地址:https://www.cnblogs.com/zhangan/p/10214157.html
Copyright © 2011-2022 走看看