zoukankan      html  css  js  c++  java
  • Python多进程的简单笔记。

    首先是通过os.fork创建多进程:

    参考链接:

    https://www.liaoxuefeng.com/wiki/1016959663602400/1017628290184064

    https://www.cnblogs.com/Lin-Yi/p/7360855.html

    import os
    print('主进程的pid:',os.getpid())
    pid1 = os.fork()
    print('子进程pid1:',pid1, os.getpid(),'',os.getppid())
    pid2 = os.fork()
    print('孙子进程pid2:',pid2,os.getpid(),'',os.getppid())
    print(123)
    主进程的pid: 44648
    子进程pid1: 44649 44648  1670
    孙子进程pid2: 44650 44648  1670
    123
    子进程pid1: 0 44649  44648
    孙子进程pid2: 0 44650  44648
    123
    孙子进程pid2: 44651 44649  44648
    123
    孙子进程pid2: 0 44651  44649
    123

     上面的代码,主要是分析os.fork创建子进程的方式,代码里面我两次os.fork进行了自进程的创建,打印了4次123,从进程编号可以看出来了。

    但pid1或pid2输出为0的时候,说明子进程或者孙子进程在工作。从打印出来的消息可以看出pid 44648创建了儿子进程44649与孙子44650,儿子44649创建了儿子44651。

    所以这里一共有4个进程并行在跑。

    os.fork虽然创建进程非常方便,但网上的资料也比较少,我看了很久也只琢磨了一点。重要一点,子进程必须要再父进程之前完成,后面我简单写一个。

    import os
    import time
    
    print(f'主进程:{os.getpid()}')
    pid = os.fork()
    if pid:
        print(f'主进程在运行{os.getpid()},子进程为{pid}')
    else:
        # time.sleep(1)
        print(f'子进程{os.getpid()}在运行,父进程为{os.getppid()}')
    主进程:44723
    主进程在运行44723,子进程为44724
    子进程44724在运行,父进程为44723
    
    import os
    import time
    
    print(f'主进程:{os.getpid()}')
    pid = os.fork()
    if pid:
        print(f'主进程在运行{os.getpid()},子进程为{pid}')
    else:
        time.sleep(1)
        print(f'子进程{os.getpid()}在运行,父进程为{os.getppid()}')
    
    主进程:44726
    主进程在运行44726,子进程为44727
    

     从运行可以看出,当子进程休息1秒的时间,主进程已经结束,主进程一旦结束,子进程将不在执行,由于该模块用的比较少,我就不仔细研究如何子进程守护了。

    后面主要介绍multiprocessing的使用,及通过Queue进行通讯。

    subprocess(独立开辟一个进程执行一些os的操作,暂时用不到,参考链接。)

    参考链接:https://www.520mwx.com/view/62337

    subprocess模块用于执行系统命令<!--more-->,其实有一个模块也支持执行系统命令,那个模块就是sys.system,但他执行系统命令会直接通过主进程去执行命令,那假如,该命令的执行需要耗费一个小时,那么主进程会卡一个小时,而不会去干别的事,这样就会导致程序的运行效率低下。

    如果由subprocess执行系统命令的时候并不会让主进程去执行,而是主进程会开辟出一个子进程去执行,并不会影响到主进程的运行,主进程该干嘛就干嘛,那么又有个问题,大家都知道进程之间的内存空间是独立的,也就是说进程之间是不能相互访问的,那么在subprocess中,有个管道的概念,既然固定死了进程之间不能相互访问,那么可以将执行命令的结果输出到管道里,该管道其实就是一块共享的内存空间,可以让主进程获取到该共享内存空间存放的数据

    参考:https://www.cnblogs.com/jiangfan95/p/11439207.html(感觉写的很仔细,比廖大的详细)。

     创建进程的类:
    
    Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,表示一个子进程中的任务(尚未启动)
    
    强调:
    1. 需要使用关键字的方式来指定参数
    2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号
    复制代码
    复制代码
    
    1 group参数未使用,值始终为None
    2 target表示调用对象,即子进程要执行的任务
    3 args表示调用对象的位置参数元组,args=(1,2,'anne',)
    4 kwargs表示调用对象的字典,kwargs={'name':'anne','age':18}
    5 name为子进程的名称
    
    复制代码
    复制代码

    创建并开启进程的两种方法

    第一种通过实例化的方式。

    import random
    import time
    import os
    from multiprocessing import Process

    def demo(name):
    time.sleep(random.random())
    print(f'进程:{name}正在执行,pid为{os.getpid()}, 父pid为{os.getppid()}')


    for i in range(5):
    t = Process(target=demo,args=(i,),name=f'进程{i}') # 参数为元祖
    print(t.name)
    t.start()

    print(f'主进程为{os.getpid()}')
    /usr/local/bin/python3.7 /Users/shijianzhong/study/multi_processing/n1.py
    进程0
    进程1
    进程2
    进程3
    进程4
    主进程为44853
    进程:4正在执行,pid为44858, 父pid为44853
    进程:2正在执行,pid为44856, 父pid为44853
    进程:1正在执行,pid为44855, 父pid为44853
    进程:0正在执行,pid为44854, 父pid为44853
    进程:3正在执行,pid为44857, 父pid为44853
    
    Process finished with exit code 0
    

     第二种通过继承的方法编写多进程。

    import random
    from multiprocessing import Process
    import time
    import os
    
    class MyProcess(Process):
        def __init__(self,name):
            super(MyProcess, self).__init__()
            self.name = f'我是进程{name}'
    
        def run(self) -> None:
            time.sleep(random.random())
            print(f'进程:{self.name}正在执行,pid为{os.getpid()}, 父pid为{os.getppid()}')
    
    
    for i in range(5):
        t = MyProcess('a' + str(i))
        print(t.name)
        t.start()
    
    
    print(f'主进程为{os.getpid()}')
    
    /usr/local/bin/python3.7 /Users/shijianzhong/study/multi_processing/n2.py
    我是进程a0
    我是进程a1
    我是进程a2
    我是进程a3
    我是进程a4
    主进程为44956
    进程:我是进程a1正在执行,pid为44958, 父pid为44956
    进程:我是进程a4正在执行,pid为44961, 父pid为44956
    进程:我是进程a2正在执行,pid为44959, 父pid为44956
    进程:我是进程a0正在执行,pid为44957, 父pid为44956
    进程:我是进程a3正在执行,pid为44960, 父pid为44956
    
    Process finished with exit code 0
    

     join让子进程产生阻塞,默认join为无限阻塞,但可以传入时间,设置阻塞的超时时间。

    import random
    from multiprocessing import Process
    import time
    import os
    
    class MyProcess(Process):
        def __init__(self,name):
            super(MyProcess, self).__init__()
            self.name = f'我是进程{name}'
    
        def run(self) -> None:
            time.sleep(random.random())
            print(f'进程:{self.name}正在执行,pid为{os.getpid()}, 父pid为{os.getppid()}')
    
    l_process = []
    for i in range(5):
        t = MyProcess('a' + str(i))
        print(t.name)
        t.start()
        l_process.append(t)
    
    for m in l_process:
        m.join()         # join可以等待子进程结束后再继续往下运行,通常用于进程间的同步。
    
    
    print(f'主进程为{os.getpid()}')
    
    /usr/local/bin/python3.7 /Users/shijianzhong/study/multi_processing/n2.py
    我是进程a0
    我是进程a1
    我是进程a2
    我是进程a3
    我是进程a4
    进程:我是进程a4正在执行,pid为45013, 父pid为45008
    进程:我是进程a1正在执行,pid为45010, 父pid为45008
    进程:我是进程a0正在执行,pid为45009, 父pid为45008
    进程:我是进程a3正在执行,pid为45012, 父pid为45008
    进程:我是进程a2正在执行,pid为45011, 父pid为45008
    主进程为45008
    
    Process finished with exit code 0
    

     这里上一个daemon守护进程的实例,还有如何在运行的进程中,获取进程的名称:

    #主进程代码运行完毕,守护进程就会结束
    from multiprocessing import Process
    from multiprocessing import current_process
    import time
    
    def foo():
        print(current_process().name)     # 调试的时候,通过current_process().name返回进程的姓名
        print(123)
        time.sleep(1)
        print("end123")                   # 没有输出
    
    def bar():
        print(456)
        time.sleep(3)
        print("end456")
    
    
    p1=Process(target=foo,name='foo')      # 给予姓名
    p2=Process(target=bar)
    
    p1.daemon=True                    # 默认为False作为非守护进程,设置为True为守护进程。守护进程会在主程序退出之前自动终止。
    p1.start()
    p2.start()
    time.sleep(0.5)                 # 阻塞主进行0.5秒,要不然,p1关闭了进程保护,都没机会执行任何代码。
    print("main-------") 
    
    /usr/local/bin/python3.7 /Users/shijianzhong/study/multi_processing/n3.py
    foo
    123
    456
    main-------
    end456
    
    Process finished with exit code 0
    

    终止进程,terminate终止进程。

    import multiprocessing
    import time
    
    def foo():
        print('Starting worker')
        time.sleep(1)
        print('Finished worker')
    
    
    t1 = multiprocessing.Process(target=foo)
    t1.start()
    print(t1,'is_alive',t1.is_alive())
    t1.terminate()
    # time.sleep(0.2)            # 手动设置阻塞也可以,但决定是join好。
    print(t1,'is_alive',t1.is_alive())
    t1.join()            # 终止进程要使用Join等待进程退出,使进程有足够的时间更新对象的状态,可以反应进程状态
    print(t1,'is_alive',t1.is_alive())
    
    /usr/local/bin/python3.7 /Users/shijianzhong/study/multi_processing/n4.py
    <Process(Process-1, started)> is_alive True
    <Process(Process-1, started)> is_alive True
    <Process(Process-1, stopped[SIGTERM])> is_alive False
    
    Process finished with exit code 0
    

     多个进程操作同个资源,也会如果不加锁,可能会引发冲突。

    import multiprocessing
    import time
    import random
    
    lock = multiprocessing.Lock()
    
    def work():
        print(multiprocessing.current_process().name + '开始正在工作')
        time.sleep(random.random())
        print(multiprocessing.current_process().name + '已经结束工作')
    
    def farm():
        print(multiprocessing.current_process().name + '开始正在工作')
        time.sleep(random.random())
        print(multiprocessing.current_process().name + '已经结束工作')
    
    for i in range(5):
        t1 = multiprocessing.Process(target=work, name=f'工人{i}')
        t2 = multiprocessing.Process(target=farm, name=f'农民{i}')
        t1.start()
        t2.start()
    
    /usr/local/bin/python3.7 /Users/shijianzhong/study/multi_processing/n5.py
    工人0开始正在工作
    农民0开始正在工作
    工人1开始正在工作
    农民1开始正在工作
    工人2开始正在工作
    农民2开始正在工作
    工人3开始正在工作
    农民3开始正在工作
    农民3已经结束工作
    工人4开始正在工作
    农民4开始正在工作
    工人2已经结束工作
    农民0已经结束工作
    农民4已经结束工作
    农民1已经结束工作
    工人0已经结束工作
    工人3已经结束工作
    工人1已经结束工作
    农民2已经结束工作
    工人4已经结束工作
    
    Process finished with exit code 0
    

     上面是不加锁的情况下,工作会进行穿插。

    import multiprocessing
    import time
    import random
    
    lock = multiprocessing.Lock()
    
    def work():
        with lock:    # 用with写,不用怕死锁
            print(multiprocessing.current_process().name + '开始正在工作')
            time.sleep(random.random())
            print(multiprocessing.current_process().name + '已经结束工作')
    
    def farm():
        lock.acquire()
        try:          # 用try写。
            print(multiprocessing.current_process().name + '开始正在工作')
            time.sleep(random.random())
            print(multiprocessing.current_process().name + '已经结束工作')
        finally:
            lock.release()
    
    for i in range(5):
        t1 = multiprocessing.Process(target=work, name=f'工人{i}')
        t2 = multiprocessing.Process(target=farm, name=f'农民{i}')
        t1.start()
        t2.start()
    
    /usr/local/bin/python3.7 /Users/shijianzhong/study/multi_processing/n5.py
    工人0开始正在工作
    工人0已经结束工作
    农民0开始正在工作
    农民0已经结束工作
    工人1开始正在工作
    工人1已经结束工作
    农民1开始正在工作
    农民1已经结束工作
    工人2开始正在工作
    工人2已经结束工作
    农民2开始正在工作
    农民2已经结束工作
    工人3开始正在工作
    工人3已经结束工作
    农民3开始正在工作
    农民3已经结束工作
    工人4开始正在工作
    工人4已经结束工作
    农民4开始正在工作
    农民4已经结束工作
    
    Process finished with exit code 0
    

     可以看出来,加了锁的,农民或工人只有一个能工作,另外一个必须休息,另外只有一把锁,虽然这样安全,但效率真的很低。

    进程间通信

    虽然可以用文件共享数据实现进程间通信,但问题是:

    1)效率低(共享数据基于文件,而文件是硬盘上的数据) 2)需要自己加锁处理

    因此我们最好找寻一种解决方案能够兼顾:1)效率高(多个进程共享一块内存的数据)2)帮我们处理好锁问题。

    这样看来进程间的通讯还是比较多,可以通过socket进行通讯,还可以文件共享数据,最后也是最方便的是队列,管道,其实就是共享内存。

    mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。

    1 队列和管道都是将数据存放于内存中

    2 队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来, 我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性

    简单的一些使用介绍

    import multiprocessing
    
    q = multiprocessing.Queue(maxsize=3)
    q.put('1')    # 放如元素
    print(q.empty())  # 查看容器是否有空间
    q.get()          # 取出元素
    q.get(block=True)     # 设置区块,如果为False取不到数据就报错
    

     一个简单的生产者,消费者关系:

    import multiprocessing
    import time
    
    
    def producer(q):
        for i in ['鸡腿','duck', 'milk', 'egg']:
            time.sleep(1)
            print('put the',i)
            q.put(i)
    
    
    def consumer(q):
        while True:
            print('eat:', q.get(timeout=3))    # 3秒取不到报错
    
    
    q = multiprocessing.Queue()
    
    t1 = multiprocessing.Process(target=producer,args=(q,))
    t2 = multiprocessing.Process(target=consumer,args=(q,))
    t1.start()
    t2.start()
    
    /usr/local/bin/python3.7 /Users/shijianzhong/study/multi_processing/n6.py
    put the 鸡腿
    eat: 鸡腿
    put the duck
    eat: duck
    put the milk
    eat: milk
    put the egg
    eat: egg
    Process Process-2:
    Traceback (most recent call last):
      File "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
        self.run()
      File "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/process.py", line 99, in run
        self._target(*self._args, **self._kwargs)
      File "/Users/shijianzhong/study/multi_processing/n6.py", line 14, in consumer
        print('eat:', q.get(timeout=3))
      File "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/queues.py", line 105, in get
        raise Empty
    _queue.Empty
    
    Process finished with exit code 0
    
    import multiprocessing
    import time
    
    
    def producer(q):
        for i in ['鸡腿','duck', 'milk', 'egg']:
            time.sleep(1)
            print('put the',i)
            q.put(i)
        q.put(None)
    
    def consumer(q):
        while True:
            if q.get() is None:     # 加一个信号,停止条件
                break
            print('eat:', q.get())    # 3秒取不到报错
    
    
    q = multiprocessing.Queue()
    
    t1 = multiprocessing.Process(target=producer,args=(q,))
    t2 = multiprocessing.Process(target=consumer,args=(q,))
    t1.start()
    t2.start()
    
    /usr/local/bin/python3.7 /Users/shijianzhong/study/multi_processing/n6.py
    put the 鸡腿
    put the duck
    eat: duck
    put the milk
    put the egg
    eat: egg
    
    Process finished with exit code 0
    

     通过在队列里面放如一个截止信号,可以让生产者知道后,结束进程。

    也可以通过JoinableQueue来实现当生产者停止后,消费者也停止,主要原理是通过阻塞队列来实现。

    import multiprocessing
    import time
    import random
    
    
    def producer(name, q):
        for i in ['鸡腿', 'duck', 'milk', 'egg']:
            time.sleep(random.random())
            print(name, 'put the', i)
            q.put(name + ':' + i)
        q.join()                      # 执行到这一步说明,生产加工已经完成,阻塞队列。
    
    
    def consumer(name, q):
        while True:
            time.sleep(random.random())
            print(name, 'eat:', q.get())  # 取数据
            q.task_done()               # 取一次数据,向队列产生一次取出的信号。
    
    
    q = multiprocessing.JoinableQueue()
    p1 = multiprocessing.Process(target=producer, args=('xm001', q,))
    p2 = multiprocessing.Process(target=producer, args=('xm002', q,))
    p3 = multiprocessing.Process(target=producer, args=('xm001', q,))
    
    t1 = multiprocessing.Process(target=consumer, args=('xibai01', q,))
    t2 = multiprocessing.Process(target=consumer, args=('xibai02', q,))
    
    t1.daemon = True    # 进程守护打开,消费者的停止,主要就是因为主进程的停止而停止。
    t2.daemon = True
    for i in (p1, p2, p3, t1, t2):
        i.start()
    
    p1.join()              # 先阻塞生产者的主函数,函数里面再阻塞队列,等两次阻塞都通过以后,执行主函数,主函数执行后,守护的进程退出。
    p2.join()
    p3.join()
    
    print('main')
    
    /usr/local/bin/python3.7 /Users/shijianzhong/study/multi_processing/n7.py
    xm001 put the 鸡腿
    xibai02 eat: xm001:鸡腿
    xm002 put the 鸡腿
    xibai01 eat: xm002:鸡腿
    xm002 put the duck
    xibai01 eat: xm002:duck
    xm001 put the duck
    xm001 put the 鸡腿
    xibai01 eat: xm001:duck
    xibai02 eat: xm001:鸡腿
    xm001 put the milk
    xibai02 eat: xm001:milk
    xm001 put the egg
    xibai01 eat: xm001:egg
    xm002 put the milk
    xibai02 eat: xm002:milk
    xm001 put the duck
    xibai01 eat: xm001:duck
    xm002 put the egg
    xibai02 eat: xm002:egg
    xm001 put the milk
    xibai02 eat: xm001:milk
    xm001 put the egg
    xibai01 eat: xm001:egg
    main
    
    Process finished with exit code 0
    

    2. 管道

    创建管道的类:

    Pipe([duplex]):在进程之间创建一条管道,并返回元组(conn1,conn2),其中conn1,conn2表示管道两端的连接对象,强调一点:必须在产生Process对象之前产生管道

    参数介绍:

    dumplex:默认管道是全双工的,如果将duplex射成False,conn1只能用于接收,conn2只能用于发送。

    Queue就是基础Pipe的,这个我就先不写了。

    3. 共享数据

    展望未来,基于消息传递的并发编程是大势所趋

    即便是使用线程,推荐做法也是将程序设计为大量独立的线程集合

    通过消息队列交换数据。这样极大地减少了对使用锁定和其他同步手段的需求,

    还可以扩展到分布式系统中

    进程间通信应该尽量避免使用本节所讲的共享数据的方式

    import multiprocessing
    
    def demo(l):
        l.pop()                # 没有加锁,数据处理还是可以的,安全起见,可以加锁。
    
    
    lock = multiprocessing.Lock()
    arg = multiprocessing.Manager()
    l_all = arg.list(range(20))        # 可以做字典,我这里做的列表
    print(l_all)
    
    l_arry = []
    for i in range(10):
        t = multiprocessing.Process(target=demo, args=(l_all,))
        t.start()
        l_arry.append(t)
    for i in l_arry:
        i.join()          # 必须阻塞,要不然主程序结束了,数据没了。
    
    
    print(l_all)
    
    /usr/local/bin/python3.7 /Users/shijianzhong/study/multi_processing/n9.py
    [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
    [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    
    Process finished with exit code 0
    

    控制资源的并发访问,有时候可能需要允许多个工作进程同时访问一个资源,但要限制总数。列如,连接池支持同时连接,但数目可能是固定的,或者一个网络应用可能支持固定数目的并发下载。

    这个时候就可以用Semaphore

    import random
    import multiprocessing
    import time
    
    class ActivePool:
    
        def __init__(self):            
            self.mgr = multiprocessing.Manager()
            self.active = self.mgr.list()        # 建立一个模型,开虚拟连接池,这里用列表
            self.lock = multiprocessing.Lock()
    
        def makeActive(self, name):
            with self.lock:                   # 为了防止数据错误,全部加锁了
                self.active.append(name)
    
        def makeInactive(self, name):
            with self.lock:
                self.active.remove(name)
    
        def __str__(self):
            with self.lock:
                return str(self.active)
    
    def worker(s , pool):
        name  = multiprocessing.current_process().name
        with s:
            pool.makeActive(name)                    # 通过s来限制进入的线程,进来的线程操作了都是全共享数据列表
            print(f'Activating {name} now running {pool}')
            time.sleep(random.random())
            pool.makeInactive(name)
    
    if __name__ == '__main__':
        pool = ActivePool()
        s = multiprocessing.Semaphore(3)
        # 初始化以后开始工作
        jobs = [multiprocessing.Process(target=worker, name=str(i), args=(s, pool)) for i in range(10)]
        for i in jobs:
            i.start()
    
        while True:
            alive = 0
            for j in jobs:
                # 死循环这个工作任务列表里面的任务,查看是否有任务,有任务就打印出来。
                if j.is_alive():
                    alive += 1
                    j.join(timeout=0.1)
                    print(f'Now running{pool}')
            # 没有任务的话,就结束这个循环。
            if alive == 0:
                break
    /usr/local/bin/python3.7 /Users/shijianzhong/study/multi_processing/n10.py
    Activating 0 now running ['0', '1']
    Activating 1 now running ['0', '1', '2']
    Activating 2 now running ['0', '1', '2']
    Activating 3 now running ['0', '1', '3']
    Now running['0', '1', '3']
    Now running['0', '1', '3']
    Now running['0', '1', '3']
    Now running['0', '1', '3']
    Now running['0', '1', '3']
    Now running['0', '1', '3']
    Activating 4 now running ['0', '3', '4']
    Now running['0', '3', '4']
    Activating 5 now running ['3', '4', '5']
    Now running['3', '4', '5']
    Activating 6 now running ['4', '5', '6']
    Now running['4', '5', '6']
    Now running['4', '5', '6']
    Now running['4', '5', '6']
    Now running['4', '5', '6']
    Now running['4', '5', '6']
    Now running['4', '5', '6']
    Now running['4', '5', '6']
    Now running['4', '5', '6']
    Activating 7 now running ['5', '6', '7']
    Activating 8 now running ['6', '7', '8']
    Now running['6', '7', '8']
    Now running['6', '7', '8']
    Activating 9 now running ['7', '8', '9']
    Now running['7', '8', '9']
    Now running['7', '9']
    Now running['7', '9']
    Now running['7', '9']
    Now running['7']
    Now running['7']
    Now running['7']
    Now running['7']
    Now running['7']
    Now running[]
    
    Process finished with exit code 0
    

     Event类是一种简单的方法,可以在进程之间传递状态信息。事件可以在设置状态和未设置状态之间切换。通过使用一个可选的超时值,时间对象的用户等待其状态从未设置变为设置。

    里面就4个参数['clear', 'is_set', 'set', 'wait']

    从字面也可以很好的辩识,后面我上一个简单的代码实例。

    # _*_coding:utf-8_*_
    # !/usr/bin/env python
    
    from multiprocessing import Process, Event
    import time, random
    
    
    def wait_for_event(e):
        print('wait_for_event: starting')
        e.wait()            # 默认e未设置,这里阻塞
        print('wait_for_event: e.is_set()->', e.is_set())
    
    
    def wait_for_event_timeout(e, t):
        print('wait_for_event_timeout: starting')
        e.wait(t)          # 可以设置阻塞时间
        print('wait_for_event_timeout: e.is_set()', e.is_set())
    
    if __name__ == '__main__':
        e = Event()
        w1 = Process(target=wait_for_event, name='block', args=(e,))
        w1.start()
        w2 = Process(target=wait_for_event_timeout, name='nonblock', args=(e,2)) # 等待2秒后,阻塞将取消。
        w2.start()
        print('main:waiting before calling Event.set()')          # 
        time.sleep(3)
        e.set()       # 3秒后,给时间设置。
        print('main:event is set')
    
    /usr/local/bin/python3.7 /Users/shijianzhong/study/multi_processing/n11.py
    main:waiting before calling Event.set()
    wait_for_event: starting
    wait_for_event_timeout: starting
    wait_for_event_timeout: e.is_set() False
    main:event is set
    wait_for_event: e.is_set()-> True
    
    Process finished with exit code 0
    

     事件就像一个全局的对象,传入各个进程里面当做信号进行传输。

    最后讲一个pool,进程池。

    1 Pool([numprocess  [,initializer [, initargs]]]):创建进程池 

     参数介绍:

    1 numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值
    2 initializer:是每个工作进程启动时要执行的可调用对象,默认为None
    3 initargs:是要传给initializer的参数组
     

    主要方法:

    复制代码
    1 p.apply(func [, args [, kwargs]])
    在一个池工作进程中执行func(*args,**kwargs),然后返回结果。
    需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async()
    2 p.apply_async(func [, args [, kwargs]]):
    在一个池工作进程中执行func(*args,**kwargs),然后返回结果。
    此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,
    将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。   
    3 p.close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成
    4 P.jion():等待所有工作进程退出。此方法只能在close()或teminate()之后调用
    5.P.map():在功能上等价于内置的map()的结果,只不过各个任务会并行运行。(感觉跟高阶函数map基本一样,只不过这个是平行操作,返回的时候列表,高阶函数map返回的是迭代器)



      Multiprocessing.Pool可以提供指定数量的进程供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来执行它。Pool类用于需要执行的目标很多,而手动限制进程数量又太繁琐时,如果目标少且不用控制进程数量则可以用Process类。

    class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])


        processes: 是要使用的工作进程数。如果进程是None,那么使用返回的数字os.cpu_count()。也就是说根据本地的cpu个数决定,processes小于等于本地的cpu个数;
        initializer: 如果initializer是None,那么每一个工作进程在开始的时候会调用initializer(*initargs)。
        maxtasksperchild:工作进程退出之前可以完成的任务数,完成后用一个新的工作进程来替代原进程,来让闲置的资源被释放。maxtasksperchild默认是None,意味着只要Pool存在工作进程就会一直存活。
        context: 用在制定工作进程启动时的上下文,一般使用 multiprocessing.Pool() 或者一个context对象的Pool()方法来创建一个池,两种方法都适当的设置了context。

    from multiprocessing import Process, Pool
    import time
    import multiprocessing
    
    
    def func(msg):
        print("msg:", msg)
        time.sleep(0.1)
        return msg
    
    if __name__ == '__main__':
        def s_time(func):
            def wrap():
                t1 = time.perf_counter()
                func()
                cost_time = time.perf_counter() - t1
                print(f'消耗事件{cost_time:0.5f}')
            return wrap
    
    
        @s_time                # 写了一个简单的装饰器,测试成勋跑的事件。
        def run():
            count = multiprocessing.cpu_count()
            pool = Pool(processes=count)
            res_l = []
            for i in range(10):
                msg = "hello %d" % (i)
                # res = pool.apply_async(func, (msg,))  # 维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
                res = pool.apply(func, (msg,))
                res_l.append(res)  # 同步执行,即执行完一个拿到结果,再去执行另外一个
            print("==============================>")
            pool.close()
            pool.join()  # 调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
            print(res_l)  # 看到的就是最终的结果组成的列表
            for i in res_l:  # apply是同步的,所以直接得到结果,没有get()方法
                # print(i.get())
                print(i)
    
    
        run()     
    
    
    
    
    /usr/local/bin/python3.7 /Users/shijianzhong/study/multi_processing/n13.py
    msg: hello 0
    msg: hello 1
    msg: hello 2
    msg: hello 3
    msg: hello 4
    msg: hello 5
    msg: hello 6
    msg: hello 7
    msg: hello 8
    msg: hello 9
    ==============================>
    ['hello 0', 'hello 1', 'hello 2', 'hello 3', 'hello 4', 'hello 5', 'hello 6', 'hello 7', 'hello 8', 'hello 9']
    hello 0
    hello 1
    hello 2
    hello 3
    hello 4
    hello 5
    hello 6
    hello 7
    hello 8
    hello 9
    消耗事件1.08404
    
    Process finished with exit code 0
    
    from multiprocessing import Process, Pool
    import time
    import multiprocessing
    
    
    def func(msg):
        print("msg:", msg)
        time.sleep(0.1)
        return msg
    
    if __name__ == '__main__':
        def s_time(func):
            def wrap():
                t1 = time.perf_counter()
                func()
                cost_time = time.perf_counter() - t1
                print(f'消耗事件{cost_time:0.5f}')
            return wrap
    
    
        @s_time                # 写了一个简单的装饰器,测试成勋跑的事件。
        def run():
            count = multiprocessing.cpu_count()
            pool = Pool(processes=count)
            res_l = []
            for i in range(10):
                msg = "hello %d" % (i)
                res = pool.apply_async(func, (msg,))  # 维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
                # res = pool.apply(func, (msg,))
                res_l.append(res)  # 同步执行,即执行完一个拿到结果,再去执行另外一个
            print("==============================>")
            pool.close()
            pool.join()  # 调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
            print(res_l)  # 看到的就是最终的结果组成的列表
            for i in res_l:  # apply是同步的,所以直接得到结果,没有get()方法
                print(i.get())
                # print(i)
    
    
        run()
    
    /usr/local/bin/python3.7 /Users/shijianzhong/study/multi_processing/n13.py
    ==============================>
    msg: hello 0
    msg: hello 1
    msg: hello 2
    msg: hello 3
    msg: hello 4
    msg: hello 5
    msg: hello 6
    msg: hello 7
    msg: hello 8
    msg: hello 9
    [<multiprocessing.pool.ApplyResult object at 0x109f69450>, <multiprocessing.pool.ApplyResult object at 0x109f69590>, <multiprocessing.pool.ApplyResult object at 0x109f696d0>, <multiprocessing.pool.ApplyResult object at 0x109f69810>, <multiprocessing.pool.ApplyResult object at 0x109f69950>, <multiprocessing.pool.ApplyResult object at 0x109f69b10>, <multiprocessing.pool.ApplyResult object at 0x109f69c50>, <multiprocessing.pool.ApplyResult object at 0x109f69d90>, <multiprocessing.pool.ApplyResult object at 0x109f69ed0>, <multiprocessing.pool.ApplyResult object at 0x109f69a50>]
    hello 0
    hello 1
    hello 2
    hello 3
    hello 4
    hello 5
    hello 6
    hello 7
    hello 8
    hello 9
    消耗事件0.17523
    
    Process finished with exit code 0
    

     最后上演一个进程池的回调函数,还真好用,输入函数的返回值,直接传入了回调函数里面。

    from multiprocessing import Pool
    import requests
    import os
    import multiprocessing
    
    
    def get_page(url):
        print('<进程%s> get %s' % (os.getpid(), url))        # 获取进程号,打印网址
        respone = requests.get(url)
        respone.encoding = 'utf-8'
        if respone.status_code == 200:
            return {'url': url, 'text': respone.text}        # 返回网址,网页源码
    
    
    def pasrse_page(res):
        print('<进程%s> parse %s' % (os.getpid(), res['url']))    # 回调函数进程
        parse_res = 'url:<%s> size:[%s]
    ' % (res['url'], len(res['text']))
        with open('db.txt', 'a') as f:     # 写入本地文件
            f.write(parse_res)
    
    
    if __name__ == '__main__':
        urls = [
            'https://www.baidu.com',
            'https://www.python.org',
            'https://www.openstack.org',
            'https://help.github.com/',
            'http://www.sina.com.cn/'
        ]
        count = multiprocessing.cpu_count()
        p = Pool(count)
        res_l = []
        for url in urls:
            res = p.apply_async(get_page, args=(url,), callback=pasrse_page)   # 异步进行操作
            res_l.append(res)
    
        p.close()      # 关闭池子,不允许新的进程进入
        p.join()       # 阻塞主进程
        # print([res.get() for res in res_l])
        print('main')
    
    /usr/local/bin/python3.7 /Users/shijianzhong/study/multi_processing/n14.py
    <进程48832> get https://www.baidu.com
    <进程48833> get https://www.python.org
    <进程48834> get https://www.openstack.org
    <进程48835> get https://help.github.com/
    <进程48836> get http://www.sina.com.cn/
    <进程48831> parse https://www.baidu.com
    <进程48831> parse http://www.sina.com.cn/
    <进程48831> parse https://www.openstack.org
    <进程48831> parse https://www.python.org
    <进程48831> parse https://help.github.com/
    main
    
    Process finished with exit code 0
    

     从进程编号可以看出来,有多个进程参与了解析网页,就一个进程负责写入信息。还是非常不错的工作任务关系。



  • 相关阅读:
    大数据第59天—MySQL之员工奖金-杨大伟
    大数据第58天—MySQL常用命令-杨大伟
    大数据第56天—Mysql练习题12道之十一-查出销售表中所有会员购买金额,同时分组查出退货表中所有会员的退货金额-杨大伟
    大数据第55天—Mysql练习题12道之十-查询各自区组的money排名前十的账号-杨大伟
    大数据第54天—Mysql练习题12道之九-充值日志表-杨大伟
    大数据第53天—Mysql练习题12道之八-线上服务器访问日志-杨大伟
    大数据第52天—Mysql练习题12道之七-图书管理数据库-杨大伟
    mac 破解pycharm2020.2.3
    mac连接oracle数据库
    python DPI-1047:Cannot locate a 64-bit Oracle Client library:The specified module could not be found.
  • 原文地址:https://www.cnblogs.com/sidianok/p/12020006.html
Copyright © 2011-2022 走看看