zoukankan      html  css  js  c++  java
  • python之进程---从小白到老鸟(一)

    一、进程简介

     什么是进程

    进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。在早期面向进程设计的计算机结构中,进程是程序的基本执行实体;在当代面向线程设计的计算机结构中,进程是线程的容器。程序是指令、数据及其组织形式的描述,进程是程序的实体。

    狭义定义:进程是正在运行的程序的实例(an instance of a computer program that is being executed)。
    广义定义:进程是一个具有一定独立功能的程序关于某个数据集合的一次运行活动。它是操作系统动态执行的基本单元,在传统的操作系统中,进程既是基本的分配单元,也是基本的执行单元。

    二、python中的进程

    multiprocess不是一个模块而是python中一个操作、管理进程的包,几乎包含了和进程有关的所有子模块。大致分为四类:

    • 创建进程部分
    • 进程同步部分
    • 进程池部分
    • 进程之间数据共享

    2.1进程创建部分

    2.1.1进程创建的类

    class Process(object):
        def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
            self.name = ''
            self.daemon = False
            self.authkey = None
            self.exitcode = None
            self.ident = 0
            self.pid = 0
            self.sentinel = None
    
        def run(self):
            pass
    
        def start(self):
            pass
    
        def terminate(self):
            pass
    
        def join(self, timeout=None):
            pass
    
        def is_alive(self):
            return False

    从pycharm点开源码,我么可以看到:

    Process([group [, target [, name [, args [, kwargs]]]]])
    
    group参数未使用,值始终为None
    name为子进程的名称
    target表示调用对象,即子进程要执行的任务
    下面两个是target的参数
        args表示调用对象的位置参数元组,args=(1,2)
        kwargs表示调用对象的字典,kwargs={'a':1,'b':2}
    
    强调:
    1. 需要使用关键字的方式来指定参数
    2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号
    p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置
    p.name:进程的名称
    p.pid:进程的pid
    p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可)
    p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功(了解即可)
    
    属性介绍
    p.start():启动进程,并调用该子进程中的p.run() 
    p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法  
    p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁
    p.is_alive():如果p仍然运行,返回True
    p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程  
    
    方法介绍

    注意事项:
    在Windows操作系统中由于没有fork(linux操作系统中创建进程的机制),在创建子进程的时候会自动 import 启动它的这个文件,而在 import 的时候又执行了整个文件。
    因此如果将process()直接写在文件中就会无限递归创建子进程报错。所以必须把创建子进程的部分使用if __name__ =='__main__' 判断保护起来,import 的时候 ,就不会递归运行了。

    2.1.2新版process有改动:

    2.1.3创建和调用


    创建和调用用两个方法

    继承法,通过继承process类,同时重写run方法。然后实例化子类,通过子类对象调用start方法完成。

    函数调用法,直接实例化process类同时把函数当参数直接传入target参数,返回相应的process对象,直接调用start方法完成。

    1.函数调用法

     

    import os
    import time
    import random
    from multiprocessing import Process
    
    def f(x):
        print(x,'子进程id 开始:',os.getpid(),'父进程id :',os.getppid())
        time.sleep(random.random()*5)
        print(x, '子进程id 结束:', os.getpid(), '父进程id :', os.getppid())
        return x
    
    if __name__ == '__main__':
        print('主进程id :', os.getpid())
        p_lst = []
        for i in range(5):
            p = Process(target=f, args=(i,))
            p.start()
        print('主进程')

     

    主进程id : 26068
    主进程
    1 子进程id 开始: 15168 父进程id : 26068
    0 子进程id 开始: 13148 父进程id : 26068
    3 子进程id 开始: 9864 父进程id : 26068
    2 子进程id 开始: 16748 父进程id : 26068
    4 子进程id 开始: 28476 父进程id : 26068
    4 子进程id 结束: 28476 父进程id : 26068
    1 子进程id 结束: 15168 父进程id : 26068
    0 子进程id 结束: 13148 父进程id : 26068
    3 子进程id 结束: 9864 父进程id : 26068
    2 子进程id 结束: 16748 父进程id : 26068
    结果

     2.继承法

    import os
    from multiprocessing import Process
    
    class MyProcess(Process):
        def __init__(self,name):
            super().__init__()
            self.name=name
        #需要重写run方法
        def run(self):
    
            print('%s 子进程正在执行' %self.name,"进程id"+str(os.getpid()))
    
    if __name__ == '__main__':
    
        p1=MyProcess('1')
        p1.start() #start会自动调用run

     

    2.2 join方法

    2.2.1 join使用方法原理

     

    2.2.2 join两种使用方法

    1.每层循环启动的时候,都单个进程join加入,会发生 进程“完全串行”的状况

     

    import os
    import time
    import random
    from multiprocessing import Process
    
    def f(x):
        print(x,'子进程id 开始:',os.getpid(),'父进程id :',os.getppid())
        time.sleep(random.random()*5)
        print(x, '子进程id 结束:', os.getpid(), '父进程id :', os.getppid())
        return x
    
    if __name__ == '__main__':
        print('主进程id :', os.getpid())
    
        for i in range(5):
            p = Process(target=f, args=(i,))
            p.start()
            p.join()
    
        print("我是父进程")
    主进程id : 10040
    0 子进程id 开始: 11508 父进程id : 10040
    0 子进程id 结束: 11508 父进程id : 10040
    1 子进程id 开始: 9136 父进程id : 10040
    1 子进程id 结束: 9136 父进程id : 10040
    2 子进程id 开始: 18332 父进程id : 10040
    2 子进程id 结束: 18332 父进程id : 10040
    3 子进程id 开始: 104 父进程id : 10040
    3 子进程id 结束: 104 父进程id : 10040
    4 子进程id 开始: 11584 父进程id : 10040
    4 子进程id 结束: 11584 父进程id : 10040
    我是父进程

     

    2.在循环启动后,列表中的进程依次join,会发生开始和执行,发生有序但是结束不一定有序,但必定在被join进程(父进程)的代码(print("我是父进程")之前。

     

    import os
    import time
    import random
    from multiprocessing import Process
    
    def f(x):
        print(x,'子进程id 开始:',os.getpid(),'父进程id :',os.getppid())
        time.sleep(random.random()*5)
        print(x, '子进程id 结束:', os.getpid(), '父进程id :', os.getppid())
        return x
    
    if __name__ == '__main__':
        print('主进程id :', os.getpid())
        p_lst = []
        for i in range(5):
            p = Process(target=f, args=(i,))
            p.start()
            p_lst.append(p)
        [p.join() for p in p_lst]   
        print("我是父进程")
    主进程id : 15804
    0 子进程id 开始: 6556 父进程id : 15804
    2 子进程id 开始: 19300 父进程id : 15804
    1 子进程id 开始: 10176 父进程id : 15804
    3 子进程id 开始: 17164 父进程id : 15804
    4 子进程id 开始: 1944 父进程id : 15804
    0 子进程id 结束: 6556 父进程id : 15804
    1 子进程id 结束: 10176 父进程id : 15804
    4 子进程id 结束: 1944 父进程id : 15804
    3 子进程id 结束: 17164 父进程id : 15804
    2 子进程id 结束: 19300 父进程id : 15804
    我是父进程

     3.思考为什么明明p.join的代码在后面却最终能被加入执行列表,是因为资源的申请的时间没有,代码让进程进展的时间快??

    import os
    import time
    import random
    from multiprocessing import Process
    
    def f(x):
        print(x,'子进程id 开始:',os.getpid(),'父进程id :',os.getppid())
        time.sleep(random.random()*5)
        print(x, '子进程id 结束:', os.getpid(), '父进程id :', os.getppid())
        return x
    
    if __name__ == '__main__':
        print('主进程id :', os.getpid())
    
    
        p1 = Process(target=f, args=(1,))
        p2 = Process(target=f, args=(2,))
        p3 = Process(target=f, args=(3,))
        p4 = Process(target=f, args=(4,))
        p5 = Process(target=f, args=(5,))
        p6 = Process(target=f, args=(6,))
        for i in [p1,p2,p3,p4,p5,p6]:
            i.start()
    
        for i in [p1,p2,p3,p4,p5,p6]:
            i.join()
    开始与执行有序,结束无序

    结果:

    主进程id : 22588
    1 子进程id 开始: 30048 父进程id : 22588
    2 子进程id 开始: 14176 父进程id : 22588
    3 子进程id 开始: 30412 父进程id : 22588
    4 子进程id 开始: 29788 父进程id : 22588
    5 子进程id 开始: 2252 父进程id : 22588
    6 子进程id 开始: 30760 父进程id : 22588
    5 子进程id 结束: 2252 父进程id : 22588
    3 子进程id 结束: 30412 父进程id : 22588
    1 子进程id 结束: 30048 父进程id : 22588
    6 子进程id 结束: 30760 父进程id : 22588
    2 子进程id 结束: 14176 父进程id : 22588
    4 子进程id 结束: 29788 父进程id : 22588
    我是父进程
    结果

     

      4.自定义类继承Process类的形式开启进程的方式

        

    
    
    # encoding=utf8
    import os
    from multiprocessing import Process
    
    
    class MyProcess(Process):
        def __init__(self,name):
            super().__init__()
            self.name=name
        #需要重写run方法
        def run(self):
    
            print('%s 子进程正在执行' %self.name,"进程id"+str(os.getpid()))
    
    if __name__ == '__main__':
    
        p1=MyProcess('1')
        p2=MyProcess('2')
        p3=MyProcess('3')
    
        p1.start() #start会自动调用run
        p2.start()
        # p2.run()
        p3.start()
    
        p1.join()
        p2.join()
        p3.join()
    
        print('主线程')
    2 子进程正在执行 进程id2516
    3 子进程正在执行 进程id10100
    1 子进程正在执行 进程id3308
    主线程

    进程之间的数据隔离:

    from multiprocessing import Process
    
    def work():
        global n
        n=0
        print('子进程内: ',n)
    
    
    if __name__ == '__main__':
        n = 100
        p=Process(target=work)
        p.start()
        print('主进程内: ',n)
    主进程内:  100
    子进程内:  0

    2.3 Daemon 守护进程

            守护进程会随着主进程的结束而结束。

    主进程创建守护进程

      其一:守护进程会在主进程代码执行结束后就终止。

      其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children

    1.进程之间是互相独立的,主进程代码运行结束,守护进程随即终止

    import os
    import time
    from multiprocessing import Process
    
    class Myprocess(Process):
        def __init__(self,person):
            super().__init__()
            self.person = person
        def run(self):
            time.sleep(1)
            print('%s 子进程正在执行' %self.name,"进程id"+str(os.getpid()))
    
    
    if __name__ == '__main__':
    
        p=Myprocess('1')
        #一定要在p.start()前设置,设置p为守护进程,禁止p创建子进程,并且父进程代码执行结束,p即终止运行
        p.daemon=True
        p.start()
        time.sleep(10) # 在sleep时查看进程id对应的进程ps -ef|grep id
    Myprocess-1 子进程正在执行 进程id2856

    如果是随着主进程代码的结束而结束。就会打印不出来。

    class Myprocess(Process):
        def __init__(self,person):
            super().__init__()
            self.person = person
        def run(self):
            time.sleep(1)
            print('%s 子进程正在执行' %self.name,"进程id"+str(os.getpid()))
    
    
    if __name__ == '__main__':
    
        p=Myprocess('1')
        #一定要在p.start()前设置,设置p为守护进程,禁止p创建子进程,并且父进程代码执行结束,p即终止运行
        p.daemon=True
        p.start()
    下面代码并不会打印

    2.主进程结束守护进程也会立即结束,不会等待。

    import time
    from multiprocessing import Process
    
    
    def f1():
        while(True):
            time.sleep(0.1)
            print ("end123")
    
    
    if  __name__ == '__main__':
        p1=Process(target=f1)
        p1.daemon=True
        p1.start()
        time.sleep(1)
        print("主进程结束")

       #打印该行则主进程代码结束,则守护进程p1应该被终止.

    end123
    end123
    end123
    end123
    end123
    main-------

     进程对象的其他方法:

    进程对象的其他方法:terminate,is_alive
    from multiprocessing import Process
    import time
    import random
    
    
    class Myprocess(Process):
        def __init__(self,person):
            self.name=person
            super().__init__()
    
        def run(self):
            print('%s正在和刘亦菲聊天' %self.name)
            time.sleep(random.randrange(1,5))
            print('%s还在和刘亦菲聊天' %self.name)
    
    
    if __name__ == '__main__':
    
        p1=Myprocess('斌哥')
        p1.start()
    
        p1.terminate()#关闭进程,不会立即关闭,所以is_alive立刻查看的结果可能还是存活
        print(p1.is_alive()) #结果为True
        time.sleep(1)
        print(p1.is_alive()) #结果为False


    属性探查 name属性

    import random
    import time
    from multiprocessing import Process
    
    
    class Myprocess(Process):
    
        def __init__(self,person):
            self.name=person   # name属性是Process中的属性,标示进程的名字
            super().__init__() # 执行父类的初始化方法会覆盖name属性
            # self.name = person # 在这里设置就可以修改进程名字了
            # self.person = person #如果不想覆盖进程名,就修改属性名称就可以了
        def run(self):
            print('%s正在和刘亦菲聊天' %self.name)
            # print('%s正在聊天' %self.person)
            time.sleep(random.randrange(1,5))
            print('%s正在和刘亦菲聊天' %self.name)
            # print('%s还在聊天' %self.person)
    
    if __name__ == '__main__':
    
        p1=Myprocess('斌哥')
        p1.start()
        print(p1.pid)    #可以查看子进程的进程id

    自定义进程有默认name,也可以自己设置。

    3 进程同步

    1. multiprocess.Lock    锁
    2. multiprocess.Semaphore   信号量
    3. multiprocess.Event    事件

    3.1.锁--lock

    让多个任务可以同时在几个进程中并发处理,他们之间的运行没有顺序,一旦开启也不受我们控制。尽管并发编程让我们能更加充分的利用IO资源,但是也给我们带来了新的问题。

    当多个进程使用同一份数据资源的时候,就会引发数据安全或顺序混乱问题。加锁的形式实现了顺序的执行,但是程序又重新变成串行了,这样确实会浪费了时间,却保证了数据的安全。

    没有加锁的情况下

    #文件db的内容为:{"count":5}
    #注意一定要用双引号,不然json无法识别
    #并发运行,效率高,但竞争写同一文件,数据写入错乱
    from multiprocessing import Process,Lock
    import time,json,random
    def search():
        dic=json.load(open('db'))
        print('33[43m剩余票数%s33[0m' %dic['count'])
    
    def get():
        dic=json.load(open('db'))
        time.sleep(0.1) #模拟读数据的网络延迟
        if dic['count'] >0:
            dic['count']-=1
            time.sleep(0.2) #模拟写数据的网络延迟
            json.dump(dic,open('db','w'))
            print('33[43m购票成功33[0m')
    
    def task():
        search()
        get()
    
    if __name__ == '__main__':
        for i in range(10): #模拟并发100个客户端抢票
            p=Process(target=task)
            p.start()

    加锁的情况下

    #文件db的内容为:{"count":5}
    #注意一定要用双引号,不然json无法识别
    #并发运行,效率高,但竞争写同一文件,数据写入错乱
    from multiprocessing import Process,Lock
    import time,json,random
    def search():
        dic=json.load(open('db'))
        print('33[43m剩余票数%s33[0m' %dic['count'])
    
    def get():
        dic=json.load(open('db'))
        time.sleep(random.random()) #模拟读数据的网络延迟
        if dic['count'] >0:
            dic['count']-=1
            time.sleep(random.random()) #模拟写数据的网络延迟
            json.dump(dic,open('db','w'))
            print('33[32m购票成功33[0m')
        else:
            print('33[31m购票失败33[0m')
    
    def task(lock):
        search()
        lock.acquire()
        get()
        lock.release()
    
    if __name__ == '__main__':
        lock = Lock()
        for i in range(10): #模拟并发100个客户端抢票
            p=Process(target=task,args=(lock,))
            p.start()
    #加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。
    虽然可以用文件共享数据实现进程间通信,但问题是:
    1.效率低(共享数据基于文件,而文件是硬盘上的数据)
    2.需要自己加锁处理
    
    #因此我们最好找寻一种解决方案能够兼顾:
    1、效率高(多个进程共享一块内存的数据)
    2、帮我们处理好锁问题。
    这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:
    队列和管道。
    队列和管道都是将数据存放于内存中
    队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,
    我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。

    3.2.信号量 —— multiprocess.Semaphore

    在semaphore信号量有一个内置计数器,控制进程的数量,acquire()会消耗信号量,计数器会自动减一;

    release()会释放信号量,计数器会自动加一;当计数器为零时,acquire()调用被阻塞,直到release()释放信号量为止。

    所以可以把锁lock理解为Semaphore(1)时的特殊情况。

    from multiprocessing import Process,Semaphore
    import time,random
    
    
    def incar(sem,user):
        sem.acquire()
        print('%s 进到四人小车' %user)
        time.sleep(random.randint(0,3)) #模拟每个人在车里中待的时间不同
        sem.release()
    
    if __name__ == '__main__':
        sem=Semaphore(4)
        p_l=[]
        for i in range(13):
            p=Process(target=incar,args=(sem,'user%d' %(i+1),))
            p.start()
            p_l.append(p)
    
        for i in p_l:
            i.join()
        # print('============》')

    3.3.事件 —— multiprocess.Event

    python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。

    事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。

    is_set() 查看event的状态

    clear:将“Flag”设置为False

    set:将“Flag”设置为True

    from multiprocessing import Process, Event
    import time, random
    
    
    def car(e, n):
        while True:
            if not e.is_set():  # 进程刚开启,is_set()的值是Flase,模拟信号灯为红色
                print('33[31m红灯亮33[0m,car%s等着' % n)
                e.wait()    # 阻塞,等待is_set()的值变成True,模拟信号灯为绿色
                print('33[32m车%s 看见绿灯亮了33[0m' % n)
                time.sleep(random.randint(3, 6))
                if not e.is_set():   #如果is_set()的值是Flase,也就是红灯,仍然回到while语句开始
                    continue
                print('车开远了,car', n)
                break
    
    
    def police_car(e, n):
        while True:
            if not e.is_set():# 进程刚开启,is_set()的值是Flase,模拟信号灯为红色
                print('33[31m红灯亮33[0m,car%s等着' % n)
                e.wait(0.1) # 阻塞,等待设置等待时间,等待0.1s之后没有等到绿灯就闯红灯走了
                if not e.is_set():
                    print('33[33m红灯,警车先走33[0m,car %s' % n)
                else:
                    print('33[33;46m绿灯,警车走33[0m,car %s' % n)
            break
    
    
    
    def traffic_lights(e, inverval):
        while True:
            time.sleep(inverval)
            if e.is_set():
                print('######', e.is_set())
                e.clear()  # ---->将is_set()的值设置为False
            else:
                e.set()    # ---->将is_set()的值设置为True
                print('***********',e.is_set())
    
    
    if __name__ == '__main__':
        e = Event()
        
        for i in range(10):
            p=Process(target=car,args=(e,i,))  # 创建是个进程控制10辆车
            p.start()
    
        for i in range(5):
            p = Process(target=police_car, args=(e, i,))  # 创建5个进程控制5辆警车
            p.start()
        t = Process(target=traffic_lights, args=(e, 10))  # 创建一个进程控制红绿灯
        t.start()
    
        print('============》')

     4.进程通信

    进程通信是指进程之间的信息交换。
    进程是分配系统资源的单位(包括内存地址),因此各进程拥有的内存地址空间相互独立。
    为了保证安全,一个进程不能直接访问另一个进程的地址空间
    但是进程之间的信息交换又是必须实现的。为了保证进程间的安全通信,操作系统提供了一些方法。

    主要有三种形式

    1. 共享存储
    2. 消息传递
    3. 管道通信

    4.1.管道通信

    1.设置一个特殊的共享文件(管理),其实就是一个缓冲区
    2.一个管道只能实现半双工通信
    3.实现双向同时通信需要建立两个管道
    4.各进程要互斥访问管道
    5.写满时,不能再写。读空时,不能再读。
    6.没写满,不能读,没读空,不能写

    进程间通信--管道

    我们先讲管道,因为队列是基于管道实现的。

    创建的管道基于文件描述符的通信(不懂的朋友可以百度一下文件描述符),即建立管道的同时创建了fd[0](读管道描述符)和fd[1](写管道描述符)

    由此可得,如果在父进程中创建管道,则产生了fd[0]和fd[1]文件描述符。那么,fork的子进程便继承了父进程中的fd[0]和fd[1]文件描述符。

    如果想实现父子进程间的通信,例如:父进程写、子进程读,可以关闭父进程的fd[0](读管道描述符)和fd[1](写管道描述符)。

    Pipe([duplex]):在进程之间创建一条管道,并返回元组(conn1,conn2),其中conn1,conn2表示管道两端的连接对象,强调一点:必须在产生Process对象之前产生管道。

    Pipe方法返回一个二元元组(conn1, conn2),两个元素分别是两个连接对象,代表管道的两端

    Pipe(duplex=True) 函数有一个默认参数duplex

      • 默认等于True,表示这个管道是全双工模式,也就是说conn1和conn2均可收发;
      • 如果duplex=False,那么conn2只负责发消息到消息队列,conn1只负责从消息队列中读取消息

     方法:

      主要方法:
        conn1.recv():接收conn2.send(obj)发送的对象。如果没有消息可接收,recv方法会一直阻塞。如果连接的另外一端已经关闭,那么recv方法会抛出EOFError。
        conn1.send(obj):通过连接发送对象。obj是与序列化兼容的任意对象
      其他方法:
        conn1.close():关闭连接。如果conn1被垃圾回收,将自动调用此方法
        conn1.fileno():返回连接使用的整数文件描述符
        conn1.poll([timeout]):如果连接上的数据可用,返回True。timeout指定等待的最长时限。如果省略此参数,方法将立即返回结果。如果将timeout射成None,操作将无限期地等待数据到达。

        conn1.recv_bytes([maxlength]):接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。如果进入的消息,超过了这个最大值,将引发IOError异常,并且在连接上无法进行进一步读取。

                      如果连接的另外一端已经关闭,再也不存在任何数据,将引发EOFError异常。

        conn.send_bytes(buffer [, offset [, size]]):通过连接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。

                            结果数据以单条消息的形式发出,然后调用c.recv_bytes()函数进行接收

        conn1.recv_bytes_into(buffer [, offset]):接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或类似的对象)。

                          offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。如果消息长度大于可用的缓冲区空间,将引发BufferTooShort异常。
    案例:

    from multiprocessing import Process, Pipe
    def f(conn,i):
        conn.send("Hello The_Third_Wave"+"=>"+str(i))
        conn.close()
    if __name__ == '__main__':
        parent_conn, child_conn = Pipe()
        for i in range(5):
            p = Process(target=f, args=(child_conn,i))
            p.start()
            # p.join()
            print(parent_conn.recv())

     结果:

    Hello The_Third_Wave=>0
    Hello The_Third_Wave=>1
    Hello The_Third_Wave=>2
    Hello The_Third_Wave=>3
    Hello The_Third_Wave=>4
    结果

    应该特别注意管道端点的正确管理问题。如果是生产者或消费者中都没有使用管道的某个端点,就应将它关闭。

    这也说明了为何在生产者中关闭了管道的输出端,在消费者中关闭管道的输入端。如果忘记执行这些步骤,程序可能在消费者中的recv()操作上挂起。

    管道是由操作系统进行引用计数的,必须在所有进程中关闭管道后才能生成EOFError异常。

    因此,在生产者中关闭管道不会有任何效果,除非消费者也关闭了相同的管道端点。

    import time
    from multiprocessing import Process, Pipe
    
    
    def rec(parent_conn_w, child_conn_r):
        # parent_conn_w.close() #不写close将不会引发EOFError
        while True:
            try:
                print(child_conn_r.recv())
            except EOFError  as e:
                print(e)
                child_conn_r.close()
    
    
    if __name__ == '__main__':
        parent_conn_w, child_conn_r = Pipe()
        p = Process(target=rec, args=(parent_conn_w, child_conn_r,))
        p.start()
    
        for i in range(10):
            parent_conn_w.send('hello'+str(i))
            time.sleep(1)
        parent_conn_w.close()
        parent_conn_w.send('hello' + str(100))

    结果:

    hello0
    hello1
    hello2
    hello3
    hello4
    hello5
    hello6
    hello7
    hello8
    hello9
    Process Process-1:
    Traceback (most recent call last):
      File "D:appAnaconda3libmultiprocessingconnection.py", line 136, in _check_closed
        raise OSError("handle is closed")
    OSError: handle is closed
    结果与报错

     双工通道验证:

    import time
    from multiprocessing import Process, Pipe
    
    
    def rec(parent_conn, child_conn):
        # parent_conn.close() #不写close将不会引发EOFError
        while True:
            try:
                print(child_conn.recv())
                child_conn.send('儿子发的')
                print(parent_conn.recv())
            except EOFError  as e:
                print(e)
                child_conn.close()
    
    
    if __name__ == '__main__':
        parent_conn, child_conn = Pipe(duplex=True)
    
    
        for i in range(10):
            parent_conn.send('hello'+str(i)+" from parent")
            time.sleep(0.5)
        for i in range(10):
            child_conn.send('hello'+str(i)+" from child")
            time.sleep(0.5)
    
        p = Process(target=rec, args=(parent_conn, child_conn,))
        p.start()
        p.join()
        # p1.start()
        parent_conn.close()
        child_conn.close()

    结果:

    hello0 from parent
    hello0 from child
    hello1 from parent
    hello1 from child
    hello2 from parent
    hello2 from child
    hello3 from parent
    hello3 from child
    hello4 from parent
    hello4 from child
    hello5 from parent
    hello5 from child
    hello6 from parent
    hello6 from child
    hello7 from parent
    hello7 from child
    hello8 from parent
    hello8 from child
    hello9 from parent
    hello9 from child
    结果--双工通道

    进程间通信--队列(multiprocess.Queue)

    那我们有了管道为什么还需要队列??

    这里我么就要谈到管道和消息队列的区别,如下

    管道(PIPE)

         管道通信方式的中间介质是文件,通常称这种文件为管道文件。两个进程利用管道文件进行通信时,一个进程为写进程,另一个进程为读进程。写进程通过写端(发送端)往管道文件中写入信息;读进程通过读端(接收端)从管道文件中读取信息。两个进程协调不断地进行写、读,便会构成双方通过管道传递信息的流水线。

    管道分为匿名管道和命名管道。

    (1)匿名管道:管道是半双工的,数据只能单向通信;需要双方通信时,需要建立起两个管道;只能用于父子进程或者兄弟进程之间(具有亲缘关系的进程)。

    (2)命名管道:可在同一台计算机的不同进程之间或在跨越一个网络的不同计算机的不同进程之间,支持可靠的、单向或双向的数据通信。

          不同于匿名管道之处在于它提供一个路径名与之关联,以FIFO的文件形式存在于文件系统中。这样,即使与FIFO的创建进程不存在亲缘关系的进程,只要可以访问该路径,就能够彼此通过FIFO相互通信(能够访问该路径的进程以及FIFO的创建进程之间),因此,通过FIFO不相关的进程也能交换数据。值得注意的是,FIFO严格遵循先进先出(first in first out),对管道及FIFO的读总是从开始处返回数据,对它们的写则把数据添加到末尾。

         利用系统调用pipe()创建一个无名管道文件,通常称为无名管道或PIPE;利用系统调用mknod()创建一个命名管道文件,通常称为有名管道或FIFO。PIPE是一种非永久性的管道通信机构,当它访问的进程全部终止时,它也将随之被撤消;它也不能用于不同族系的进程之间的通信。而FIFO是一种永久的管道通信机构,它可以弥补PIPE的不足。管道文件被创建后,使用open()将文件进行打开,然后便可对它进行读写操作,通过系统调用write()和read()来实现。通信完毕后,可使用close()将管道文件关闭。因为匿名管道的文件是内存中的特殊文件,而且是不可见的,命名管道的文件是硬盘上的设备文件,是可见的。

    消息队列(message queue)

          消息队列与命名管道类似,但少了打开和关闭管道方面的复杂性。使用消息队列并未解决我们在使用命名管道时遇到的一些问题,如管道满时的阻塞问题。

    消息队列提供了一种在两个不相关进程间传递数据的简单有效的方法。与命名管道相比:消息队列的优势在于,它独立于发送和接收进程而存在,这消除了在同步命名管道的打开和关闭时可能产生的一些困难。

    消息队列提供了一种从一个进程向另一个进程发送一个数据块的方法。而且,每个数据块被认为含有一个类型,接收进程可以独立地接收含有不同类型值的数据块。

    优点:

    1.它提供有格式字节流,有利于减少开发人员的工作量;

    2.消息具有类型,在实际应用中,可作为优先级使用。

    3.消息队列可以在几个进程间复用,而不管这几个进程是否具有亲缘关系,这一点与有名管道很相似;

           但消息队列是随内核持续的(删除方法msgctl(msqid, IPC_RMID,...)),与有名管道(随进程持续)相比,生命力更强,应用空间更大。

    其他优点:
          A. 我们可以通过发送消息来几乎完全避免命名管道的同步和阻塞问题。
          B. 我们可以用一些方法来提前查看紧急消息。

    缺点:
          A. 与管道一样,每个数据块有一个最大长度的限制。
          B. 系统中所有队列所包含的全部数据块的总长度也有一个上限。

    限制:
          由于消息缓冲机制中所使用的缓冲区为共用缓冲区,因此使用消息缓冲机制传送数据时,两通信进程必须满足如下条件。
         (1)在发送进程把写入消息的缓冲区挂入消息队列时,应禁止其他进程对消息队列的访问,否则,将引起消息队列的混乱。同理,当接收进程正从消息队列中取消息时,也应禁止其他进程对该队列的访问。
         (2)当缓冲区中无消息存在时,接收进程不能接收任何消息;而发送进程是否可以发送消息,则只由发送进程是否能够申请缓冲区决定。

    以上结论都是从其他博客看的,我感觉python的pipe与队列应该和其他的有些差异。

    1.python的Queue是基于单工Pipe的,同时Pipe也可以发送各种数据类型。

    2.pipe无法限定长度,但是Queue是可以限定长度的。同时Queue多了timeout时间问题。以及多了几个方法而已。所以python中的队列和管道其实差异并没有那么大。

    3.消息缓冲机制是Queue类以及子类所有的,SimpleQueue与pipe是没有的。

     IPC(Inter-Process Communication)

    from multiprocessing import Queue,JoinableQueue,SimpleQueue

    multiprocessing提供了三种队列,分别是Queue、SimpleQueue、JoinableQueue。

    multiprocessing.Queue既是线程安全也是进程安全的,相当于queue.Queue的多进程克隆版。和threading.Queue很像,multiprocessing.Queue支持put和get操作,底层结构是multiprocessing.Pipe。

    multiprocessing.Queue底层是基于Pipe构建的,但是数据传递时并不是直接写入Pipe,而是写入进程本地buffer,通过一个feeder线程写入底层Pipe。

    这样做是为了实现超时控制和非阻塞put/get,所以Queue提供了join_thread、cancel_join_thread、close函数来控制feeder的行为,close函数用来关闭feeder线程、join_thread用来join feeder线程,cancel_join_thread用来在控制在进程退出时,不自动join feeder线程,使用cancel_join_thread有可能导致部分数据没有被feeder写入Pipe而导致的数据丢失。

    和threading.Queue不同的是,multiprocessing.Queue默认不支持join()和task_done操作,这两个支持需要使用mp.JoinableQueue对象。

    SimpleQueue是一个简化的队列,去掉了Queue中的buffer,没有了使用Queue可能出现的问题,但是put和get方法都是阻塞的并且没有超时控制。

     

    Queue类

    初始化:

      创建共享的进程队列。maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。底层队列使用管道和锁定实现。另外,还需要运行支持线程以便队列中的数据传输到底层管道中。 

    Queue的实例化对象具有以下方法:

      • queue.get( [ block [ ,timeout ] ] )

          返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为True. 如果设置为False,将引发Queue.Empty异常(定义在Queue模块中)。

          timeout是可选超时时间,用在阻塞模式中。如果在制定的时间间隔内没有项目变为可用,将引发Queue.Empty异常。

      • queue.get_nowait( )

          同q.get(False)方法。

      • queue.put(item [, block [,timeout ] ] )

          将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默认为True。如果设置为False,将引发Queue.Empty异常(定义在Queue库模块中)。

          timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。

      • queue.size()

          返回队列中目前项目的正确数量。此函数的结果并不可靠,因为在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。在某些系统上,此方法可能引发NotImplementedError异常。

      • queue.empty()

          如果调用此方法时 q为空,返回True。如果其他进程或线程正在往队列中添加项目,结果是不可靠的。也就是说,在返回和使用结果之间,队列中可能已经加入新的项目。

      • queue.full()

          如果q已满,返回为True. 由于线程的存在,结果也可能是不可靠的(参考q.empty()方法)。。

    其他方法介绍:

    q.close() 
    关闭队列,防止队列中加入更多数据。调用此方法时,后台线程将继续写入那些已入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果q被垃圾收集,将自动调用此方法。关闭队列不会在队列使用者中生成任何类型的数据结束信号或异常。例如,如果某个使用者正被阻塞在get()操作上,关闭生产者中的队列不会导致get()方法返回错误。
    
    q.cancel_join_thread() 
    不会再进程退出时自动连接后台线程。这可以防止join_thread()方法阻塞。
    
    q.join_thread() 
    连接队列的后台线程。此方法用于在调用q.close()方法后,等待所有队列项被消耗。默认情况下,此方法由不是q的原始创建者的所有进程调用。调用q.cancel_join_thread()方法可以禁止这种行为。
    其他方法,了解一下

     

    2.共享存储

    1.设置一个共享空间
    2.要互斥地访问共享空间
    3.两种方式

      • 基于数据结构(低级)
      • 基于存储区的共享

    共享内存比管道和消息队列等效率高的原因

        共享内存是进程间通信中最简单的方式之一。共享内存允许两个或更多进程访问同一块内存, 例如某些函数向不同进程返回了指向同一个物理内存区域的指针。当一个进程改变了这块地址中的内容的时候,其它进程都会察觉到这个更改。

      •     因为所有进程共享同一块内存,共享内存在各种进程间通信方式中具有最高的效率。访问共享内存区域和访问进程独有的内存区域一样快,并不需要通过系统调用或者其它需要切入内核的过程来完成。同时它也避免了对数据的各种不必要的复制。
      •     因为系统内核没有对访问共享内存进行同步,您必须提供自己的同步措施。例如,在数据被写入之前不允许进程从共享内存中读取信息、不允许两个进程同时向同一个共享内存地址写入数据等。解决这些问题的常用方法是通过使用信号量进行同步。
      •     共享内存块提供了在任意数量的进程之间进行高效双向通信的机制。每个使用者都可以读取写入数据,但是所有程序之间必须达成并遵守一定的协议,以防止诸如在读取信息之前覆写内存空间等竞争状态的出现。
      •     不幸的是,Linux无法严格保证提供对共享内存块的独占访问,甚至是在您通过使用IPC_PRIVATE创建新的共享内存块的时候也不能保证访问的独占性。 同时,多个使用共享内存块的进程之间必须协调使用同一个键值。     

    共享内存区是最快的可用IPC形式,一旦这样的内存区映射到共享它的进程的地址空间,这些进程间数据的传递就不再通过执行任何进入内核的系统调用来传递彼此的数据,节省了时间。
    共享内存和消息队列,FIFO,管道传递消息的区别:

    消息队列,FIFO,管道的消息传递方式一般为
      1:服务器得到输入
      2:通过管道,消息队列写入数据,通常需要从进程拷贝到内核。
      3:客户从内核拷贝到进程
      4:然后再从进程中拷贝到输出文件
    上述过程通常要经过4次拷贝,才能完成文件的传递。
    共享内存只需要
       1:从输入文件到共享内存区
       2:从共享内存区输出到文件

    进程间共享状态

    在进行并发编程时,通常最好尽量避免使用共享状态。使用多个进程时尤其如此。

    但是,如果你真的需要使用一些共享数据,那么 multiprocessing 提供了两种方法。

    一个是基于内存的但是是固定的数据结构,另一种是基于服务进程是由Manager() 返回的管理器对象,这个可以提供更为复杂的数据结构。

    共享内存

    可以使用 Value 或 Array 将数据存储在共享内存映射中。

    不过我看代码,本质上还是用内存+lock的方式实现的。

    Manager:

    Manager()返回一个manager对象。它控制一个服务器进程,这个进程会管理Python对象并允许其他进程通过代理的方式来操作这些对象。

    manager对象支持多种类型。

    管理器的特点:

    1.支持把多进程分布到多台机器上。一个服务进程可以作为调度者,将任务分布到其他多个进程中,依靠网络通信。简单的说,一个单独的manager可以被网络上的不同计算机的进程共享。

    2.服务器进程管理器比使用共享内存对象更灵活,它们支持二进制对象类型。

    3.缺点是比使用shared memory慢。

    Manager() 返回的管理器支持类型: list 、 dict 、 Namespace 、 Lock 、 RLock 、 Semaphore 、 BoundedSemaphore 、 Condition 、 Event 、 Barrier 、 Queue 、 Value 和 Array 。

    from multiprocessing import Process, Manager
    import os
    
    def f(mdict1,mdict2, mlist,i ):
        mdict1[os.getpid()]=i
        mdict2[os.getpid()]=i+1
        mlist.append("id"+str(os.getpid())+"_"+str(i))
    
    
    if __name__ == '__main__':
        with Manager() as manager:
            mdict1 = manager.dict() #产生一个代理对象d
            mdict2 = manager.dict() #产生一个代理对象d
            mlist = manager.list()
            print("type of manager.dict():",type(mdict1))
            for i  in range(5):
                p = Process(target=f, args=(mdict1,mdict2,mlist,i))
                p.start()
                p.join()
    
            print(mdict1)
            print(mdict2)
            print(mlist)

    我们会发现这数据类型其实是代理数据类型。

    代理对象:

    代理是一个 指向 其他共享对象的对象,这个对象(很可能)在另外一个进程中。共享对象也可以说是代理 指涉 的对象。多个代理对象可能指向同一个指涉对象。

    代理对象代理了指涉对象的一系列方法调用(虽然并不是指涉对象的每个方法都有必要被代理)。

        • 指向其他共享对象的对象。
        • 共享对象也可以说是代理 指涉 的对象。
        • 多个代理对象可能指向同一个指涉对象。

    通过这种方式,代理的使用方法可以和它的指涉对象一样。

    模式动机 通过引入一个新的对象(如小图片和远程代理对象)来实现对真实对象的操作或者将新的对象作为真实对象的一个替身,这种实现机制即为代理模式,通过引入代理对象来间接访问一个对象,这就是代理模式的模式动机。
    
    常见的有:分析 代购商品:顾客 ->代购网站 -> 商品 ,  软件开发:客户端 -> 代理对象-> 真实对象。客户端通过一个代理对象来实现对真是对象的访问。
    代理模式知识点补充

                   代码

    >>> from multiprocessing import Manager
    >>> manager = Manager()
    >>> l = manager.list([i*i for i in range(10)])
    >>> print(l)
    [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
    >>> print(repr(l))
    <ListProxy object, typeid 'list' at 0x...>
    >>> l[4] #16

    1.对代理使用 str() 函数会返回指涉对象的字符串表示,但是 repr() 却会返回代理本身的内部字符串表示。

    2.被代理的对象很重要的一点是必须可以被序列化,这样才能允许他们在进程间传递。

    3.指涉对象可以包含 代理对象 。这允许管理器中列表、字典或者其他 代理对象 对象之间的嵌套。

    4.如果指涉对象包含了普通 list 或 dict 对象,对这些内部可变对象的修改不会通过管理器传播,因为代理无法得知被包含的值什么时候被修改了。

       但是把存放在容器代理中的值本身是会通过管理器传播的(会触发代理对象中的 __setitem__ )从而有效修改这些对象,所以可以把修改过的值重新赋值给容器代理:

     

    3.消息传递

    1.系统结构化的消息(消息头/消息体)
    2.系统提供“发送、接受原语”
    3.两种方式

      1. 直接通信方式:消息直接挂到接受方的消息队列里
      2. 间接(信箱)通信方式:消息先发送中间体(信箱)

     4.进程池

    进程池是由服务器预先创建的一组子进程,这些子进程的数目在 3~10 个之间(当然这只是典型情况)。线程池中的线程数量应该和 CPU 数量差不多。

    进程池中的所有子进程都运行着相同的代码,并具有相同的属性,比如优先级、 PGID 等。

    当有新的任务来到时,主进程将通过某种方式选择进程池中的某一个子进程来为之服务。相比于动态创建子进程,选择一个已经存在的子进程的代价显得小得多。至于主进程选择哪个子进程来为新任务服务,则有两种方法:

    • 主进程使用某种算法来主动选择子进程。最简单、最常用的算法是随机算法和 Round Robin (轮流算法)。
    • 主进程和所有子进程通过一个共享的工作队列来同步,子进程都睡眠在该工作队列上。当有新的任务到来时,主进程将任务添加到工作队列中。                                                                                                                                                                               这将唤醒正在等待任务的子进程,不过只有一个子进程将获得新任务的“接管权”,它可以从工作队列中取出任务并执行之,而其他子进程将继续睡眠在工作队列上。

      当选择好子进程后,主进程还需要使用某种通知机制来告诉目标子进程有新任务需要处理,并传递必要的数据。最简单的方式是,在父进程和子进程之间预先建立好一条管道,然后通过管道来实现所有的进程间通信。

      在父线程和子线程之间传递数据就要简单得多,因为我们可以把这些数据定义为全局,那么它们本身就是被所有线程共享的。

    线程池主要用于:

    1)需要大量的线程来完成任务,且完成任务的时间比较短。 比如WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大。但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。

    2)对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。

    3)接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。

    multiprocess.Pool模块

    def __init__(self, processes=None, initializer=None, initargs=(),
                     maxtasksperchild=None, context=None)
    • numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值
    • initializer:是每个工作进程启动时要执行的可调用对象,默认为None
    • initargs:是要传给initializer的参数组
    • maxtaskperchild表示每个工作进程在退出/被其他新的进程替代前,需要完成的工作任务数,默认为None,表示工作进程存活时间与pool相同,即不会自动退出/被替换。

    简单上手:

    import multiprocessing
    import time
    
    def func(msg):
        print("start:", msg)
        time.sleep(5)
        print("end",msg)
    
    if __name__ == "__main__":
        start=time.time()
        pool = multiprocessing.Pool(processes = 5)
        for i in range(5):
            msg = "hello %d" %(i)
            pool.apply_async(func, (msg, ))   #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
    
        print("start~ start~ start~~~~~~~~~~~~~~~~~~~~~~")
        pool.close()
        pool.join()   #调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
        print("Sub-process(es) done.")
        print("耗时:",time.time()-start)

    结果:

    start~ start~ start~~~~~~~~~~~~~~~~~~~~~~
    start: hello 0
    start: hello 1
    start: hello 2
    start: hello 3
    start: hello 4
    end hello 1
    end hello 0
    end hello 3
    end hello 2
    end hello 4
    Sub-process(es) done.
    耗时: 5.418787002563477
    结果:开始有序,结束无序

    换成:

    调用apply是一个串行的效果,任务会被进程一个一个的处理,直接得到结果

     pool.apply(func, (msg, ))   #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
    start: hello 0
    end hello 0
    start: hello 1
    end hello 1
    start: hello 2
    end hello 2
    start: hello 3
    end hello 3
    start: hello 4
    end hello 4
    start~ start~ start~~~~~~~~~~~~~~~~~~~~~~
    Sub-process(es) done.
    耗时: 25.363667726516724
    结果:进程完全按顺序执行

     其实map与map_async是相同的道理。

    下面是回调函数:

    callback参数

     通过函数指针调用的函数。如果你把函数的指针(地址)作为参数传递给另一个函数,当这个指针被用来调用其所指向的函数时,这就是回调函数。

           软件模块之间总是存在着一定的接口,从调用方式上,可以把他们分为三类:同步调用、回调和异步调用。

    • 同步调用是一种阻塞式调用,调用方要等待对方执行完毕才返回,它是一种单向调用;
    • 回调是一种双向调用模式,也就是说,被调用方在接口被调用时也会调用对方的接口;
    • 异步调用是一种类似消息或事件的机制,不过它的调用方向刚好相反,接口的服务在收到某种讯息或发生某种事件时,会主动通知客户方(即调用客户方的接口)。
    • 回调和异步调用的关系非常紧密,通常我们使用回调来实现异步消息的注册,通过异步调用来实现消息的通知。同步调用是三者当中最简单的,而回调又常常是异步调用的基础。
    #coding: utf-8
    import os
    import time
    import requests
    from multiprocessing import Pool
    
    
    
    def get_page(url):
        ret = requests.get(url).text
        time.sleep(3)
        return {'url': url, 'ret': ret}
    
    
    def parse_page(ret):
        print("执行parse_page()进程号:",os.getpid(),"==>",time.time())
        with open('ab.txt', 'a') as f:
            f.write('%s - %s
    ' % (ret['url'], len(ret['ret'])))
    
    
    if __name__ == '__main__':
        print("主进程号为:",os.getpid(),"==>",time.time())
        urls = [
          'https://www.baidu.com',
          'http://www.openstack.org',
          'https://www.python.org',
          'https://help.github.com/',
          'http://www.sina.com.cn/'
        ]
        p = Pool(3)
        for url in urls:
      #使用回调函数,当get_page下载完后,主进程调用parse_page自动处理get_page下载的结果,节省了parse_page的时间,
      # 该场景用于一个函数为耗时操作并且产生数据,另一个函数是非耗时操作,这样就节省了非耗时操作函数的时间
            p.apply_async(get_page, args=(url,), callback=parse_page)
        p.close()
        p.join()
        print('')
    callback参数

    join是要让子进程全部处理完之后得到结果统一处理,还有一个非常重要的原因是进程池依附于主进程,主进程结束,进程池消失。进程池的任务没有被处理程序就结束了。

     这里可以明确的看到,callback函数是由主进程执行。所以不存在同时写入报错。

    想一次分配多个任务到进程池中,可以使用map/map_async方法。

    下面是进程池的源码解读:

    pool函数返回的进程池对象中有下面一些数据结构:

    • self._inqueue 接收任务队列(SimpleQueue),用于主进程将任务发送给worker进程
    • self._outqueue 发送结果队列(SimpleQueue),用于worker进程将结果发送给主进程
    • self._taskqueue 同步的任务队列,保存线程池分配给主进程的任务
    • self._cache = {} 任务缓存
    • self._processes worker进程个数
    • self._pool = [] woker进程队列

    进程池工作时,任务的接收、分配。结果的返回,均由进程池内部的各个线程合作完成,来看看进程池内部由那些线程:

    _work_handler线程

    负责保证进程池中的worker进程在有退出的情况下,创建出新的worker进程,并添加到进程队列(pools)中,保持进程池中的worker进程数始终为processes个。_worker_handler线程回调函数为Pool._handler_workers方法,在进程池state==RUN时,循环调用_maintain_pool方法,监控是否有进程退出,并创建新的进程,append到进程池pools中,保持进程池中的worker进程数始终为processes个。

    self._worker_handler = threading.Thread(
                target=Pool._handle_workers,
                args=(self, )
    )
     
    Pool._handle_workers方法在_worker_handler线程状态为运行时(status==RUN),循环调用_maintain_pool方法:
    def _maintain_pool(self):
        if self._join_exited_workers():
            self._repopulate_pool()
     
    _join_exited_workers()监控pools队列中的进程是否有结束的,有则等待其结束,并从pools中删除,当有进程结束时,调用_repopulate_pool(),创建新的进程:
    w = self.Process(target=worker,
                    args=(self._inqueue, self._outqueue,
                          self._initializer, self._initargs,                 
                           self._maxtasksperchild)
                     )
    self._pool.append(w)
     
    w是新创建的进程,它是用来处理实际任务的进程,worker是它的回调函数:
    def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
        assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0)
        put = outqueue.put
        get = inqueue.get
        if hasattr(inqueue, '_writer'):
            inqueue._writer.close()
            outqueue._reader.close()
     
        if initializer is not None:
            initializer(*initargs)
     
        completed = 0
        while maxtasks is None or (maxtasks and completed < maxtasks):
            try:
                task = get()
            except (EOFError, IOError):
                debug('worker got EOFError or IOError -- exiting')
                break
     
            if task is None:
                debug('worker got sentinel -- exiting')
                break
     
            job, i, func, args, kwds = task
            try:
                result = (True, func(*args, **kwds))
            except Exception, e:
                result = (False, e)
            try:
                put((job, i, result))
            except Exception as e:
                wrapped = MaybeEncodingError(e, result[1])
                debug("Possible encoding error while sending result: %s" % (
                    wrapped))
                put((job, i, (False, wrapped)))
            completed += 1
        debug('worker exiting after %d tasks' % completed)
     
    所有worker进程都使用worker回调函数对任务进行统一的处理,从源码中可以看出:
    它的功能是从接入任务队列中(inqueue)读取出task任务,然后根据任务的函数、参数进行调用(result = (True, func(*args, **kwds),
    再将结果放入结果队列中(outqueue),如果有最大处理上限的限制maxtasks,那么当进程处理到任务数上限时退出。
    View Code

    _task_handler线程

    负责从进程池中的task_queue中,将任务取出,放入接收任务队列(Pipe)

    self._task_handler = threading.Thread(
                target=Pool._handle_tasks,
                args=(self._taskqueue, self._quick_put, self._outqueue, self._pool)
    )
    Pool._handle_tasks方法不断从task_queue中获取任务,并放入接受任务队列(in_queue),以此触发worker进程进行任务处理。当从task_queue读取到None元素时,
    表示进程池将要被终止(terminate),不再处理之后的任务请求,同时向接受任务队列和结果任务队列put None元素,通知其他线程结束。
    View Code

    _handle_results线程

    负责将处理完的任务结果,从outqueue(Pipe)中读取出来,放在任务缓存cache中

  • 相关阅读:
    PHP输出日志,json美化
    php获取项目路径
    16进制颜色,正则
    doctrine/instantiator
    cn.archive.ubuntu.com 慢的问题
    yzalis/identicon 像素头像
    Shell 判断进程是否存在
    shell 2>&1
    shell 判断是否继续
    shell
  • 原文地址:https://www.cnblogs.com/wqbin/p/10238970.html
Copyright © 2011-2022 走看看