zoukankan      html  css  js  c++  java
  • 并发编程之多进程模块、进程间通讯 -3

    多进程模块 multiprocessing

    进程的调用

    1

    from multiprocessing import Process
    import time
    def f(name):
        time.sleep(1)
        print('hello', name,time.ctime())
    
    if __name__ == '__main__':
        p_list=[]
        for i in range(3):
            p = Process(target=f, args=('tom',))
            p_list.append(p)
            p.start()
        for i in p_list:
            p.join()
        print('end')
    View Code

    2

    from multiprocessing import Process
    import time
    
    class MyProcess(Process):
        def __init__(self):
            super(MyProcess, self).__init__()
            #self.name = name
    
        def run(self):
            time.sleep(1)
            print ('hello', self.name,time.ctime())
    
    
    if __name__ == '__main__':
        p_list=[]
        for i in range(3):
            p = MyProcess()
            p.start()
            p_list.append(p)
    
        for p in p_list:
            p.join()
    
        print('end')
    View Code
    from multiprocessing import Process
    import os
    import time
    def info(title):
      
        print("title:",title)
        print('parent process:', os.getppid())
        print('process id:', os.getpid())
    
    def f(name):
        info('function f')
        print('hello', name)
    
    if __name__ == '__main__':
        info('main process line')
        time.sleep(1)
        print("------------------")
        p = Process(target=info, args=('jerry',))
        p.start()
        p.join()
    View Code

    Process类

    构造方法:

    Process([group [, target [, name [, args [, kwargs]]]]])

      group: 线程组,目前还没有实现,库引用中提示必须是None; 
      target: 要执行的方法; 
      name: 进程名; 
      args/kwargs: 要传入方法的参数。

    实例方法:

      is_alive():返回进程是否在运行。

      join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。

      start():进程准备就绪,等待CPU调度

      run():strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。

      terminate():不管任务是否完成,立即停止工作进程

    属性:

      daemon:和线程的setDeamon功能一样

      name:进程名字。

      pid:进程号。

    import time
    from  multiprocessing import Process
    
    def foo(i):
        time.sleep(1)
        print (p.is_alive(),i,p.pid)
        time.sleep(1)
    
    if __name__ == '__main__':
        p_list=[]
        for i in range(10):
            p = Process(target=foo, args=(i,))
            #p.daemon=True
            p_list.append(p)
    
        for p in p_list:
            p.start()
        # for p in p_list:
        #     p.join()
    
        print('main process end')
    View Code

    进程间通讯

    进程对列Queue

    from multiprocessing import Process, Queue
    import queue
    
    def f(q,n):
        #q.put([123, 456, 'hello'])
        q.put(n*n+1)
        print("son process",id(q))
    
    if __name__ == '__main__':
        q = Queue()  #try: q=queue.Queue()
        print("main process",id(q))
    
        for i in range(3):
            p = Process(target=f, args=(q,i))
            p.start()
    
        print(q.get())
        print(q.get())
        print(q.get())
    View Code

    管道

    from multiprocessing import Process, Pipe
    
    def f(conn):
        conn.send([12, {"name":"jerry"}, 'hello'])
        response=conn.recv()
        print("response",response)
        conn.close()
        print("q_ID2:",id(child_conn))
    
    if __name__ == '__main__':
    
        parent_conn, child_conn = Pipe()
        print("q_ID1:",id(child_conn))
        p = Process(target=f, args=(child_conn,))
        p.start()
        print(parent_conn.recv())   # prints "[42, None, 'hello']"
        parent_conn.send("hello!")
        p.join()
    View Code

    Managers

    Queue和pipe只是实现了数据交互,并没实现数据共享,即一个进程去更改另一个进程的数据。

    from multiprocessing import Process, Manager
    
    def f(d, l,n):
        d[n] = '1'
        d['2'] = 2
        d[0.25] = None
        l.append(n)
        #print(l)
    
        print("son process:",id(d),id(l))
    
    if __name__ == '__main__':
    
        with Manager() as manager:
    
            d = manager.dict()
    
            l = manager.list(range(5))
    
            print("main process:",id(d),id(l))
    
            p_list = []
    
            for i in range(10):
                p = Process(target=f, args=(d,l,i))
                p.start()
                p_list.append(p)
    
            for res in p_list:
                res.join()
    
            print(d)
            print(l)
    View Code

    进程同步

    from multiprocessing import Process, Lock
    
    def f(l, i):
      
        with l.acquire():
            print('hello world %s'%i)
    
    if __name__ == '__main__':
        lock = Lock()
    
        for num in range(10):
            Process(target=f, args=(lock, num)).start()

    进程池

    进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。

    进程池中有两个方法:

    • apply
    • apply_async
    from  multiprocessing import Process,Pool
    import time,os
    
    def Foo(i):
        time.sleep(1)
        print(i)
        return i+100
    
    def Bar(arg):
    
        print(os.getpid())
        print(os.getppid())
        print('logger:',arg)
    
    pool = Pool(5)
    
    Bar(1)
    print("----------------")
    
    for i in range(10):
        #pool.apply(func=Foo, args=(i,))
        #pool.apply_async(func=Foo, args=(i,))
        pool.apply_async(func=Foo, args=(i,),callback=Bar)
    
    pool.close()
    pool.join()
    print('end')
    View Code

    参考:http://www.cnblogs.com/yuanchenqi/articles/6248025.html

  • 相关阅读:
    [erlang 002]gen_server中何时会跑到terminate函数
    设计模式:桥接模式
    设计模式:组合模式
    Harbor:镜像上传和下载
    Harbor:简介和安装
    Docker:compose
    ThinkPHP的静态化页面方法
    php使用memcached详解
    大话PHP设计模式
    PHP魔术方法使用
  • 原文地址:https://www.cnblogs.com/MR-allen/p/10520032.html
Copyright © 2011-2022 走看看