zoukankan      html  css  js  c++  java
  • 进程模块multiprocessing

    进程模块multiprocessing

    进程的创建 -用Process

    • 注册进程Precess类

        #导入Process类
        from multiprocessing import Process
        
        #创建一个函数
        def fun():pass	 
        #将这个函数的运行注册进一个新的进程中
        p = Process(target=fun)	#注册进程,传入的是地址
        p.start()	#开始这个进程
      

    进程号的查看 -os.getpid() -os.getppid()

    • 查看进程的进程号os.getpid()

        import os
        from multiprocessing improt Process
        #创建一个函数,获取这个进程的进程号
        def func():
        	print('子进程:'os.getpid())
      
        p = Process(target=fun)	#注册进程
        p.start	#开始进程
        print('父进程: ', os.getpid())	#打印主进程的进程号
        
        #结果
        子进程: 1234
        父进程: 6789
      
    • 查看父进程os.getppid()

        import os
        from multiprocessing improt Process
        #创建一个函数,获取这个进程的进程号
        def func():
        	print('子进程的父进程:'os.getpiid())
      
        p = Process(target=fun)	#注册进程
        p.start	#开始进程
        print('父进程的父进程: ', os.getpiid())	#打印主进程的进程号
        
        #结果
        子进程的父进程: 6789	#当前进程 
        父进程的父进程: 1470	#pycharm运行的进程
      

    给进程传入参数 -args=(tuple)

    • 新进程给函数传参数

        '''注册进程'''
        from multiprocessing import Process
        def fun(args):pass		#函数需要传入一个参数 
        
        p = Process(target=fun, args=(canshu1,))	#给args传入就行,但是传入的是一个元祖	
        	
        p.start()	#开始这个进程
      

    进程的开始结束 -主进程和子进程的存活关系

    • 默认情况下

      • 个进程互不干扰,主进程的结束不影响子进程的结束

          import time
          from multiprocessing import Process
          def fun():
              time.sleep(3)
              print('子进程结束')
          if __name__ == '__main__':	#启动进程必须判断这个
          	p = Process(target=fun)
          	p.start()
          	
          	print('主进程结束')
          #结果
          主进程结束
          子进程结束
        
    • join()待子进程结束后程序继续进行

        import time
        from multiprocessing import Process
        def fun():
            print(1234)
            time.sleep(3)
        if __name__ == '__main__':    
        	p = Process(target=fun)
        	p.start()
        	p.join()	#这里是子进程的结束位置,父进程等待子进程结束后继续运行
        	print('您好')
        #结果
        1234
        您好	#停顿3秒后您好才被显示出来
      

    开始多个子进程

    • 开始多个子进程

        from multiprocessing import Process
        def fun(num):
        	print('进程{}正在运行'.format(num))
        	
        if __name__ == '__main__':	
        	for i in range(3):
        		p = Process(target=fun, i)
        		p.start()
        #结果
        进程0正在运行
        进程1正在运行
        进程2正在运行
      
    • 等待全部子进程结束后运行下面的代码

        from multiprocessing import Process
        def fun(num):
        	print('进程{}正在运行'.format(num))
        	
        if __name__ == '__main__':
        	p_list = []
        	for i in range(3):
        		p = Process(target=fun, i)
        		p_list.append(p)
        		p.start()
        	[p.join() for p in p_list]	#关键之处
        	prnt('所有的子进程全部结束')
        #结果	#__结果的输出进程不一定是按照顺序输出的__
        进程2正在运行
        进程0正在运行
        进程1正在运行
        所有的子进程全部结束
      

    利用类创建进程

    	from multiprocessing import Process
    	import os
    	class MyProcess(Process):
    		def __init__(self, arg1, arg2):
    			super().__init__()	#必须继承父类的__init__()
    			self.arg1 = arg1
    			self.arg2 = arg2
    		
    	    def run(self):	#必须有一个run()方法
    	        print(os.getpid())
    	if __name__ == '__main__':
    		p = MyProcess()
    		p.start()		#调用run()方法
    

    多进程之间的数据隔离问题

    • 父进程和子进程之间的数据是隔离开的

    守护进程

    • 子进程-->守护进程 -设置daemon = True
      • 守护进程:主进程结束,守护进程强制被结束
      • 守护进程结束看主进程是否结束,不看其他子进程是否结束

    进程类的其他方法及其属性

    方法

    • p.is_alive() 检测子进程是否存在
    • p.terminate() 结束一个子进程,并不是立即结束,需要操作系统响应
    • p.start() 开始一个进程

    属性

    • p.name 查看进程的名字
    • p.pid 查看进程号
    • p.daemon = Ture 将进程转换成守护进程

    进程锁 - lock模块 -涉及数据安全问题

    • 给一段代码加锁之后只能被一个进程同时调用

        import lock
        lock = lock() 
        
        lock.acquire()	#钥匙
        '''被加锁的代码块'''
        lock.release()	#上交钥匙
      

    进阶系列

    进程的进阶

    信号量 -multiprocessing.Semaphore()

    • 限制进程访问的代码块

    • 有多把钥匙的房间

        from multiprocessing import Semaphore
        from multiprocessing import Process
        from time import sleep
        
        def func(i, sem):
            sem.acquire()
            print('{}进去唱歌了'.format(i))
            sleep(1)
            print('{}唱歌结束出来了'.format(i))
            sem.release()
        
        if __name__ == '__main__':
            sem = Semaphore(4)
            for i in range(10):
                p = Process(target=func, args=(i, sem))
                p.start()
        #结果
        0进去唱歌了
        1进去唱歌了
        2进去唱歌了
        3进去唱歌了
        0唱歌结束出来了
        4进去唱歌了
        1唱歌结束出来了
        5进去唱歌了
        2唱歌结束出来了
        6进去唱歌了
        3唱歌结束出来了
        7进去唱歌了
        4唱歌结束出来了
        8进去唱歌了
        5唱歌结束出来了
        9进去唱歌了
        6唱歌结束出来了
        7唱歌结束出来了
        8唱歌结束出来了
        9唱歌结束出来了
      

    事件 -multiprocessing.Event()

    • 一个信号可以使得所有的进程进入阻塞状态

    • 也可以控制所有的进程接触阻塞

    • 一个事件被创建出来默认是阻塞状态

        #事件创建默认是阻塞的
        from multiprocessing import Event
        
        event = Event()	#创建一个事件叫event
        print(event.is_set())	#检查事件的阻塞,False为阻塞,True为非阻塞状态
        event.wait()	#阻塞状态会等待,非阻塞状态会运行下面代码
        print('这句话不能被打印')
        #结果
        False
        ————————————————————————————————————————————————
        #下面是通过设置改变阻塞状态
      
      • .is_set()用来查看事件的阻塞状态
    • .set().is_set()设置成True,变成非阻塞状态

        #关闭事件的阻塞
        from multiprocessing import Event
        
        event = Event()
        event.set()
        event.wait()
        print(event.is_set())
        print('现在阻塞被关闭')
        #结果
        True
        现在阻塞被关闭
      
    • enent.clear() 再次打开阻塞

        #再次打开阻塞
        from multiprocessing import Event
        
        event = Event()
        event.set()
        event.wait()
        print('正常打印消息')
        
        event.clear()	#开启阻塞
        event.wait()
        print('这条消息将不被输出')
        #结果
        正常打印消息	
      

    红绿灯案例

    	from multiprocessing import Event, Process
    	import time, random
    	
    	def light(e,):
    	    while 1:
    	        if e.is_set():
    	            e.clear()  # 开启阻塞
    	            print('33[31mO33[0m')  # 红灯
    	        else:
    	            e.set()
    	            print('33[32mO33[0m')  # 绿灯
    	
    	        time.sleep(2)
    	
    	
    	def car(e, i):
    	    if not e.is_set():
    	        print('{}正在等待'.format(i))
    	        e.wait()
    	    print('{}通过路口'.format(i))
    	
    	if __name__ == '__main__':
    	
    	    e = Event()
    	    tra = Process(target=light, args=(e,))
    	    tra.start()
    	    i = 0
    	    while 1:
    	        p = Process(target=car, args=(e, i))
    	        p.start()
    	        time.sleep(random.random())
    	        i += 1
        #结果
        绿灯
    	0通过路口
    	1通过路口
    	2通过路口
    	3通过路口
    	4通过路口
    	红灯
    	5正在等待
    	6正在等待
    	7正在等待
    	8正在等待
    	绿灯
    	5通过路口
    	7通过路口
    	8通过路口
    	6通过路口
    	9通过路口
    	10通过路口
    	11通过路口
    	12通过路口
    	红灯
    	13正在等待
    	14正在等待
    	15正在等待
    	16正在等待
    

    进程之间的通信 -队列和管道

    进程间的通信 -IPC

    队列

    • 先进先出
    • 满了或者队列空了都会发生阻塞
      • from multiprocessing import Queue 导入队列

      • q = Queue() 创建队列对象

      • .put() 放数据,满了就阻塞

      • .get() 取数据,取空就阻塞

      • .full() 判断队列是否满了

      • .enpty() 判断队列是否为空

      • .put_nowait() 向队列放元素,队列满了就报错

      • .get_nowait() 取元素,取空就报错

          from multiprocessing import Queue
          
          q = Queue(5)	#队列允许放置5个元素,可选参数
          q.put(1)
          q.put(2)
          q.put(3)
          q.put(4)
          q.put(5)
          print(q.pull())	#判断队列是否满了
          print(q.empty())	#判断队列是否空了
          for i in range(5):
          	print(q.get())
        

    利用队列完成两个进程的数据交换 -IPC

    from multiprocessing import Queue, Process
    
    def put_thing(q):
        for i in range(5):
            q.put(i)
    
    def get_thing(q): 
        print(q.get())
    
    if __name__ == '__main__':
    
        q = Queue()
        p_put = Process(target=put_thing, args=(q,))
        p_put.start()
    
        for i in range(5):
            p_get = Process(target=get_thing, args=(q,))
            p_get.start()
    

    队列模型 -生产者消费者模型

    模型一 Queue模块

    • 需要向消费者告知结束关键字

        from multiprocessing import Process, Queue
        import time
        import random
        
        def producer(q, name, food):
            for i in range(3):
                time.sleep(random.random())
                msg = '{}生产了第{}泡{}'.format(name, i, food)
                print(msg)
                q.put(i)
        
        def customer(q, name, food):
            while 1:
                i = q.get()
                print(i)
                if i == 'nihao':
                    print('getNoneR')
                    break
        
                time.sleep(random.random())
                print('{}消费了第{}泡{}'.format(name, i, food))
        
        if __name__ == '__main__':
            q = Queue()
            pname = 'Alex'
            cname = '你'
            food = '狗屎'
            produ_proce = Process(target=producer, args=(q, pname, food))
            cust_proce = Process(target=customer, args=(q, cname, food))
        
            produ_proce.start()
            cust_proce.start()
            produ_proce.join()
            q.put('nihao')  #生产者消费结束向队列添加一个结束关键字 
      

    模型二 -改进版JoinableQueue()

    • 改进版结合守护进程使用
    • JoinableQueue()Queue()的区别
      • JoinableQueue()加入了两个方法
        • .task_done() 消费者每消费一次就要运行一次,用来减去模块中的计数
        • .join() 等待队列中元素被取完,否则阻塞
      • from multiprocessing import Process, JoinableQueue
        import time
        import random
        	
        def producer(q, name, food):
            for i in range(10):
                time.sleep(random.random())
                msg = '33[31m{}生产了第{}泡{}33[0m'.format(name, i, food)
                print(msg)
                q.put(i)
            q.join()	#判断队列是否被取完,取完后才执行,否则阻塞
        	
        def customer(q, name, food):
            while 1:
                i = q.get()
                time.sleep(random.random())
                print('{}消费了第{}泡{}'.format(name, i, food))
                q.task_done()	#每取一个元素,让队列的计数器减1
        	
        if __name__ == '__main__':
            q = JoinableQueue()
            pname = 'Alex'
            cname = '你'
            food = '狗屎'
            produ_proce = Process(target=producer, args=(q, pname, food))
            cust_proce = Process(target=customer, args=(q, cname, food))
            produ_proce.daemon = True	#将子进程设置成守护进程
            cust_proce.daemon = True	#将子进程设置成守护进程
            
            produ_proce.start()
            cust_proce.start()
        	
            produ_proce.join()
  • 相关阅读:
    [转]Greenplum的工作负载及资源管理
    [转]Tomcat中的Session小结
    [转]Class.forName()的作用与使用总结
    [转]如何在 Git 里撤销(几乎)任何操作
    [转]session和cookie的区别和联系,session的生命周期,多个服务部署时session管理
    piwik获取访客头像,自定义显示访问者头像(URL)和描述(标题和替代)
    php解析url并得到url中的参数及获取url参数
    php结合phantomjs实现网页截屏、抓取js渲染的页面
    利用PhantomJS进行网页截屏,完美解决截取高度的问题
    多线程编程
  • 原文地址:https://www.cnblogs.com/liliudong/p/9732488.html
Copyright © 2011-2022 走看看