zoukankan      html  css  js  c++  java
  • 多进程

    python3.6

    多进程

    多进程

    1 多进程

     

    1.1 linux/unix/win 启动方式对比

    在不同平台上系统使用的多进程机制是不一样的,所以在python实现中有三种不同的开启多进程的方式

    1.1.1 spawn win的默认方法,支持unix/win

    父进程开启一个新的python解释程序 子进程只获得足够运行run()方法的资源 父进程的文件描述符和句柄不被继承 此方式的速度在三种方式中最慢

    1. run() 该方法是target的参数,参数是一个可调用对象即可

      经测试,当run调用的是function时,资源仍无法保留,只有调用一个可调用对象且初始化方法(init)除self外有其他参数才行

      def hello():
          pass
      class Hello():
          def __init__(self,name):
      	print(name)
      hello.__init__()  # 不需要传参,所以资源无法保留
      hello_cla = Hello('name')
      hello_cla.__init__('name')  # 有传参,资源才能保留
      

      实例:

      from multiprocessing import Process, Queue
      import multiprocessing as mp
      import os
      
      def hello_procs(name):
          print("这是{}{}进程".format(name,os.getpid()))
          print("父进程ID%s"%os.getppid())
          try:
      	print(sourceA)
      	print("调用成功,因为未使用spawn方式")
          except Exception as e:
      	print("调用失败,因为该资源不是必须资源")
      	print(e)
      
      
      class Sou():
          def __init__(self,soua):
      	self.soua = soua
      
          def show_soua(self):
      	print(self.soua)
          def __call__(self):
      	self.show_soua()
      
      if __name__ == '__main__':
          sourceA = "AA"
          sourceB = "BB"
          # 使用forkserver在资源继承方面与spawn表现一致
          mp.set_start_method('spawn')
          print("当前进程的进程id是%s"%os.getpid())
          #p = Process(target=hello_procs, args=('第一个进程',))
          a = Sou(sourceA)
          p = Process(target=a)
          """
          上面的是不带参数的传递方式,如需要给可调用函数传递参数,那么需要修改为__call__(self,参数)
          再接下来就是修改Process参数为(target=a,args=('参数',))
          """
          print('开始进入子进程')
          p.start()
          p.join()
          print("进程结束")
      

    1.1.2 fork unix的默认方法,仅支持unix

    使用unix的fork()[os.fork()]来创建一个当前解释器的子进程 子进程获得父进程全部的资源 此方式的安全问题不好控制

    1.1.3 forkserver 当平台支持unix的管道文件时该方法可用

    在使用这种方式时,开启多进程会开启一个额外的服务进程 当需要一个子进程时父进程去请求服务进程并得到一个子进程 由于服务进程是单线程的,所以该方式是线程安全的

    1.1.4 启动子进程

     
    1. 创建进程对象 表示在单独进程中运行的活动

      Process(group=None,target=None,name=None,args=(),kwargs={},*,daemon=None) group一直为None即可,只是为了和threading.Thread兼容 target 是run()要调用的对象,需要是可执行的 name 无实意,名字 args 可调用对象的位置参数 kwargs 可调用对象的关键字参数 daemon True/False/None 与过程继承有关

    2. run() 表示进程活动的方法
    3. start() 启动进程的活动
    4. isalive() 检测子进程是否存活,只能检测子进程
    5. join([timeout]) 阻塞调用该方法的进程

      None 阻塞,直到进程完毕 正数 阻塞timeout秒

    6. daemon 标识该进程是否为守护进程,True是,False不是,None从上层继承

      主进程不是守护进程,所以只要不明确指定为True,那么创建的所有进程都不是守护进程

      from multiprocessing import Process as ps
      import sys
      
      def hello(*,a=None):
          if a == None:
      	pass
          else:
      	print(a)
      
      
      def not_sh():
          global pish,pfsh,ex
          print("非守护进程%s"%ex)
          pish = ps(target=is_sh, args=(1,), daemon=True)
          pfsh = ps(target=is_sh, args=('非守护进程',)) #daemon为None从上层继承属性
          pish.start()
          pfsh.start()
          for i in range(1): #当该值为1时可以看到守护进程在打印出消息前就退出了
      	print("第二个主进程")
      
          ex="有变化"
      
      def is_sh(jc_type):
          if jc_type == 1:
      	print("守护进程")
      	for i in range(1000):
      	    print("守护进程")
          else:
             print()
             print("不是守护进程")
             for i in range(100):
      	   print("非守护进程")
      
      
      
      if __name__ == '__main__':
          p = ps(target=hello, name='hello', args=(), kwargs={'a':'A'})
          print("进程是否存活%s"%p.is_alive())
          p.start()
          print("守护进程daemon? %s"%p.daemon)
          print("进程是否存活%s"%p.is_alive())
      
          # 创建一个非守护进程
          pish, pfsh =None,None
          ex = "原始"
      
          pnsh = ps(target=not_sh)
      
          pnsh.start()
          pnsh.join()
          print(ex)
      

    1.1.5 选择启动的方法

    import multiprocessing
    multiprocessing.set_start_method('spawn') #传入方式的名字
    # 该方法在程序中至多使用一次
    

    1.1.6 其他内容

     
    1. 使用global可以将数据发送到子进程,但是子进程对数据的修改不会反馈到父进程

    1.2 进程通信

     

    1.2.1 队列 Queue

     
    1. Queue([maxsize]) 创建并可以设置最大值
    2. qsize() 返回队列大致大小(不准确是因为并发) 在MAC上回引发异常NotImplementedError
    3. empty() 是否为空,不准确
    4. full() 队列是否已满,不准确
    5. put(obj[,block[,timeout]])
    6. putnowait(obj) 相当于put(obj,False)
    7. get()
    8. getnowite()
    9. close() 没有返回值,关闭队列,后台线程将数据一次性刷新到管道当关闭后仍去操作会抛出异常OSError: handle is closed
    10. jointhread() 只能在close()后调用,他阻塞直到后台线程退出,确保数据刷新到管道
    11. canceljointhread() 立即关闭队列,不等待后台线程将数据刷新到管道

    1.2.2 TODO 管道 Pipes

    1.2.3 实例

    """进程间通信"""
    from multiprocessing import Process, Queue, Pipe
    import os, random
    
    def write(m):
        print('进程:%s'%os.getpid())
        m.put('数据A')
        """
        put(obj[,block[,timeout]])
        将值放入队列
        当block为True(默认值),且timeout为None(默认值)时,不会抛出异常,会一直等到可以入队时将值入队
        当timeout为正值时,等待timeout秒,超时则抛出queue.Full异常
        当block为False时,一旦无法入队立即抛出异常
        """
    
    def read(m):    
        print('进程:%s'%os.getpid())
        try:
    	print(m.qsize())
        except Exception as e:
    	print("在MAC上会引发异常")
        finally:
    	value = m.get()
    
        """
        get([block[,timeout]])
        获取一个值后删除
        当block为True(默认值)且timeout为None(默认值),那么只有当队列中有内容时获取值
        timeout为正数时,当队列中无值时阻塞timeout秒,而后仍无值则抛出queue.Empty异常
        block为False时一旦无值立即抛出异常
        """
        print(value)
    
    if __name__ == '__main__':
        q = Queue()
    
        pw = Process(target=write, name='写进程', args=(q,))
        print('开始写入数据 %s'%pw.name, end=' :  ')
        pw.start()
    
        pr = Process(target=read, name='读进程', args=(q,))
        print('开始读取数据 %s'%pr.name, end=' :  ')
        pr.start()
    
    # TODO Pipe 通过管道传递消息
    

    1.3 进程同步

     

    1.3.1 锁Lock

    一旦进程,线程获得了锁,那么随后的任何进程,线程在获取锁时将阻塞

    1. acquire(block=True, timeout=None)

      当block为True,方法调用将阻塞,直到解锁 当block为True时,timeout为正数,那么最多只能被阻塞timeout秒,当timeout为负数,阻塞时长为0,当为None一直阻塞

    2. release() 释放锁

      一把锁可以被任意对象释放,未必是上锁的对象来解锁

    1.3.2 实例

    from multiprocessing import Process, Lock
    
    def show_lock(l):
        #l.release() 在try_get_lock中上的锁可以在这里解开
        l.acquire(True,-1)
        # 超时时长为负数,即使被锁定也会执行
        print("函数正常执行")
    
    def try_get_lock(l):
        l.acquire()
        print("获得了锁")
    #    l.release()
    
    if __name__ == '__main__':
        l = Lock()
        #l.acquire(True)
        pg = Process(target=try_get_lock,args=(l,))
        pg.start()
        ps = Process(target=show_lock,args=(l,))
        ps.start()
    

    1.4 进程池 Pool

     

    1.4.1 创建Pool([processes[,initalizer[,initargs[,maxtasksperchild[,context]]]])

    processes 进程的数量 initializer 如果不为None,则在每个工作进程启动时调用initializer(*initargs) maxtasksperchild context 工作进程的上下文

    该类实现了上下文管理

    1. apply(func[,args[,kwds]])

      使用args,kwds调用func,直到结果完成

    2. applyasync(func[,args[,kwds[,callback[,errorcallback]]]])

      返回一个结果对象 返回的对象是AsyncResult 当指定callback(一个接受单参数的可调用对象)时,完成时会调用callback,调用失败则调用errorcallback 回调应该立即完成,否则线程将会阻塞

    3. map(func,iterable[,chunksize])

      与内置函数map()相同,它阻塞直到map完成

    4. mapasync(func,iterable[,chunksize[,callback[,errorcallback]]])

      返回结果的map()

    5. imap(func,iterable[,chunkszie])

      惰性map() chunkszie参数与map()方法的参数相同

    6. starmap(func,iterable[,chunksize]) iterable必须为可迭代对象

      需要注意'abc'也是可迭代对象,一旦加上(),('abc')更不行 func,('abc',) 会给func传入三个参数,而不是一个整体 正确做法 传入(('abc',),),同理,传入其他可迭代内容也可以这样做

    7. starmapasync(func,iterable[,chuunksize[,callback[,errorback]]])

      将iterable拆分后调用func并返回一个结果对象

    8. close()

      一旦任务完成,退出进程

    9. terminate()

      立即停止进程并退出

    10. join()

      等待进程结束,在此之前必须调用close或terminate

    1.4.2 AsyncResult applyasync,mapasync与starmapasync的返回对象

     
    1. get([timeout]) 返回结果,并要求在timeout秒内到达,当timeout不为None时,N秒内未到达抛出异常TimeoutError
    2. wait([timeout]) 等待结果或直到N秒超时
    3. ready() 判断返回是否就绪
    4. successful() 返回调用是否完成且无异常

    1.4.3 实例

    from multiprocessing import Pool, TimeoutError, Process
    import time
    import os
    
    def proc_pool(name):
        print("asd")
        for i in range(5):
    	print(str(i)+' : %s')
        #return "返回的结果值","有两个会怎样?"  不要返回一个以上的值,会导致map调用产生歧义(使用map(func,[1,2])时会返回[返回值1,返回值2]而不是[(返回值1,返回值2),(返回值1,返回值2)]) 当需要返回两个值要显式的返回一个元组
        #return ("返回的结果值","第二个值")
        return "返回值"
    
    def proc_err(name):
        raise Exception
    
    def proc_mm(name):
        print('该函数被调用了%s%s'%(name,type(name)))
        return name
    
    if __name__ == '__main__':
        print("开始启动线程池")
        p = Pool(4)
    
        for i in range(5):
    	p.apply_async(proc_pool, args=('cc',))
    
        #p.map(proc_pool,['cc','dd'])
        def callback(name):
    	print("回调函数%s")
    
        def err_callback(err):
    	try:
    	    print("yc")
    	except Exception as e:
    	    print('发生异常')
    	finally:
    	    print("ww")
    
        mapr = p.map_async(proc_err, 'ee', 3, callback, err_callback)
        #mapr.get() 获得可调用对象的返回值
        #print("返回的结果值%s"%mapr.get())
    
        #mmap = p.starmap(proc_mm,[('abcd'),('a')])
        mmap = p.starmap(proc_mm,((('abc',),),))
        mmaps = p.starmap_async(proc_mm,((('abc',),),))
        list(mmap)
        print(mmaps.get())
        p.close()
        p.join()
    

    1.5 资源共享

     

    1.5.1 使用Array,Value作为存储空间来保存需要共享的资源

    Value(typecodeortype, *args, lock=True) Array(typecodeortype, sizeorinitializer, *, lock=True)

    1. Array,Value的共通之处

      在创建存储空间时,在lock的参数选取上,默认情况是自己创建一个资源锁 但是也可以选择使用一个已经存在的锁,当lock被传入一个已存在的锁时受该锁影响 当设置为False时,资源不被锁保护,导致线程不安全

      tyoecodeortype 都是array模块使用的类型代码 array 表示基本类型的数组,有:字符,整数,浮点数

    2. 区别

      Array存储一个队列,Value存储一个值 Array的sizeofinitializer就是保存的数组,同时该数组的长度也是Array的长度

    1.5.2 实例

    """
    进程共享内容
    使用Value,Array使内容共享
    """
    
    from multiprocessing import Process, Value, Array, Lock
    
    def f(n, a):
        n.value = 3.1415927
        for i in range(len(a)):
    	a[i] = -a[i]
    
    def fun(l, strr):
        # 获得锁,当被锁住,等待最多3秒,继续执行
        l.acquire(True,3)
        try:
    	print(num.value)
    	print(strr.value)
    	print(chr(strr.value))
        except Exception as ex:
    	print(ex)
        finally:
    	print('完成')
    
    if __name__ == '__main__':
        l = Lock()
        num = Value('d', 0.0)
        arr = Array('i', range(10))
        lisi = [1,2,3]
        arrs = Array('i',lisi)
        # 因为python中没有char类型,所以在这里只能转换为数字,最后在转回来
        strr = Value('b',ord('c'))
        p = Process(target=f, args=(num, arrs))
        pl = Process(target=fun, args=(l,strr))
        # 上锁
        l.acquire()
        p.start()
        p.join()
        pl.start()
        pl.join()
        print(num.value)
        print(arrs[:])
    

    1.5.3 使用服务进程server process

     
    1. 使用Manager()会返回一个管理对象

      该管理对象支持的类型更广泛,有: list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value, Array

      该类实现了上下文管理

    2. Manager的两个子类,Manager()返回的就是SyncManager
      1. BaseManager([adress[,authkey]])

        adress是管理器进程侦听新链接的地址,None为随机选一个 authkey是认证密匙,None为使用currentprocess().authkey,否则使用authkey,必须为字符串 currentprocess() 返回当前Process对象 authkey 进程的认证密钥(字节字符串) 当初始化multiprocessing时,使用os.urandom()为主进程分配一个随机字符串 当创建Process对象时,它将继承其父进程的认证密钥 但可以通过将authkey设置为另一个字节字符串来更改。

        1. start([initializer[,initargs]])

          启动子过程以启动管理器

        2. getserver()

          返回Server对象,他表示在manger控制下的实际服务器

        3. connect()

          本地管理器对象链接到远程管理器进程

        4. shutdown()

          停止manager进程,仅当启动使用start()时可用

        5. register(typeid[, callable[, proxytype[, exposed[, methodtotypeid[, createmethod]]]]])

          向Manager注册类型或可调用的类方法

          typeid 用于标识特定类型的共享对象的类型标识符,必须是字符串 callable 用于typeid类型的可调用选项, proxytype 是BaseProxy的子类,用于创建typeid的共享对象代理,None,自动创建 exposed 用于指定代理类型所使用的方法 methodtotypeid 返回代理类型的公开方法 createmethod 确定是否使用typeid创建方法,默认为True

      2. SyncManager

        BaseManager 主要用来创建自定义的Manager

        1. Queue([maxsize]) 创建queue.Queue对象返回其代理

          在进程通信中展示了部分Queue队列的使用方法

        2. Array(typecode,sequence) 创建一个数组并返回其代理

          在进程共享中展示了部分Array的用法

        3. Value(typecode,value) 创建一个值并返回其代理

          在进程共享中展示了部分Value的用法

        4. dict([dict]) 创建一个dict,并返回其代理
        5. list([list]) 创建一个list,并返回其代理
        6. Lock() 创建一个threading.Lock对象并返回其代理
      3. 实例
        from multiprocessing import Process, Manager
        def f(d, l, q, a, v, lo):
            d[1] = '1'
            d['2'] = 2
            d[0.25] = None
            q.put(100)
            lo.acquire(True,3)
            for i in range(len(a)):
        	a[i]=1
            v,value = 100
            l.reverse()
        
        
        if __name__ == '__main__':
            with Manager() as manager:
        	d = manager.dict()
        	l = manager.list(range(10))
        	q = manager.Queue(10)
        	a = manager.Array('i',[1,2,3])
        	v = manager.Value('i',3)
        	lo = manager.Lock()
        	lo.acquire(True)
        	p = Process(target=f, args=(d, l, q, a, v, lo))
        
        	p.start()
        	p.join()
        	print(d)
        	print(l)
        	print("********")
        	print(q.get())
        	print(a[:])
        	print(v.value)
  • 相关阅读:
    工作需求----表单多选框checkbox交互
    工作需求——JQ小效果分享下
    JQ返回顶部代码分享~~~~
    css 分享之background-attachment 属性
    CSS3学习之分享下transition属性
    css 伪元素分享!!!
    phpcm v9 任意调用分页/phpcm v9首页调用分页不起作用或者乱码
    facebook分享不能显示图片链接问题
    Fatal error: Uncaught SoapFault exception:解决办法
    VUE项目 启动提示 npn ERRT nissing script: dev解决办法
  • 原文地址:https://www.cnblogs.com/zhisy/p/6690699.html
Copyright © 2011-2022 走看看