zoukankan      html  css  js  c++  java
  • 并发编程之进程进阶

    一、进程之间的通信

      进程之间的通信称为IPC

      多个进程之间有一些固定的通信内容,socket基于文件家族通信

      进程之间虽然内存不共享,但是是可以通信的

      Lock Semaphore Event 都是进行进程之间的通信的,但是通信的内容不能改变

      进程之间的通信可以通过进程队列和管道来控制

     

    二、进程队列

    队列的特点:先进先出

    使用到的类 from multiprocessing import Queue

    方法:put()      往队列里添加值(队列满了,会进入阻塞状态)

          get()      往队列里取值(队列为空,会进入阻塞状态)

          get_nowait()    往队列里取值(非阻塞事件,队列为空时会报queue.Empty异常)—— 用异常处理

          put_nowait()    往队列里添加值 (非阻塞事件,队列已满会报queue.Full异常,而且数据会丢失)

          empty()       判断队列是否为空(多线程下不准确)

        full()      判断队列是否满了(多线程下不准确)

     

    使用put_nowait(),当队列存满时,queue.Full异常

    from multiprocessing import Queue
    
    q = Queue(2)
    q.put_nowait(1)
    q.put_nowait(2)
    q.put_nowait(3)
    
    '''
    报错:
    queue.Full
    '''
    
    # 解决方案
    from multiprocessing import Queue
    import queue
    q = Queue(2)            # 限制队列的数量
    try:
        q.put_nowait(1)
        q.put_nowait(2)
        q.put_nowait(3)
    except queue.Full:
        print('Full')
    
    '''
    结果
    Full
    '''

     

    使用put_nowait()取值,当队列为空时,抛出queue.Empty异常

    from multiprocessing import Queue
    
    q = Queue()
    print(q.get_nowait())   # 使用put_nowait()取值,当队列为空时,抛出queue.Empty异常
    
    '''
    报错:
        raise Empty
    queue.Empty
    '''
    
    from multiprocessing import Queue
    import queue
    
    q = Queue()
    try:
        print(q.get_nowait())  # 处理异常
    except queue.Empty:
        print('Empty')
        
    '''
    结果:
    Empty
    '''

     

    主进程和子进程的通信

    一般情况下,一个进程负责put,另一个进程负责get

    ① 主进程put,子进程get

    ② 主进程get,子进程put

    ③ 主进程和子进程同时put和get,需要通过join方法,不然容易阻塞

    from multiprocessing import Queue
    from multiprocessing import Process
    
    def getvalue(q):
        while 1:
            ret = q.get()           # 取值
            if ret is None:break    # 值为None,证明已经取完,结束循环
            print(ret)
    
    if __name__ == '__main__':
        q = Queue()
        Process(target=getvalue,args=(q,)).start()
    
        for i in range(5):
            q.put(i)                # 添加值
        q.put(None)                 # 添加完再添加None,不是添加完了
        
    '''
    0
    1
    2
    3
    4
    '''

     

    三、生产者消费者模型(基于队列)

    某些模块负责生产数据,这些数据由其他模块来负责处理(此处的模块可能是:函数、线程、进程等)。产生数据的模块称为生产者,而处理数据的模块称为消费者。在生产者与消费者之间的缓冲区称之为仓库。生产者负责往仓库运输商品,而消费者负责从仓库里取出商品,这就构成了生产者消费者模式

     

    并发
    由于生产者与消费者是两个独立的并发体,他们之间是用缓冲区通信的,生产者只需要往缓冲区里丢数据,就可以继续生产下一个数据,而消费者只需要从缓冲区拿数据即可,这样就不会因为彼此的处理速度而发生阻塞。

     

    支持忙闲不均
    当生产者制造数据快的时候,消费者来不及处理,未处理的数据可以暂时存在缓冲区中,慢慢处理掉。而不至于因为消费者的性能造成数据丢失或影响生产者生产

     

    保持生产者和消费者的平衡,使用队列,若队列为空,则增加生产者数量或者减少消费者数量,若队列为满,减少生产者数量或者增加消费者数量

    from multiprocessing import Queue
    from multiprocessing import Process
    
    def consumer(q,name):       # 定义消费者
        while 1:
            fruits = q.get()
            if fruits is None:break
            print('%s 购买了 %s'% (name,fruits))
    
    def producer(q,name,fruits):
        for i in range(1,11):
            print('%s 采摘了 %s%s' % (name,fruits,i))
            q.put(fruits + str(i))
    
    if __name__ == '__main__':
        q = Queue()
        c1 = Process(target=consumer,args=(q,'小明'))
        c1.start()
    
        p1 = Process(target=producer,args=(q,'果农','西瓜'))
        p1.start()
        p1.join()                       # join阻塞当前进程,待生产者执行完毕才执行下面的代码
        q.put(None)

     

    四、生产者消费者模型(基于JoinableQueue类)

    from multiprocessing import JoinableQueue

    方法:

    put()          # 往队列添加值

    get()          # 往队列取值

    join()          # 队列阻塞直到计数为0,标记队列的数据处理完才结束阻塞

    task_done()         # 通知队列的一个数据处理完,触发计算-1

    join()和task_done() 配合使用,而且消费者要设置为守护进程

     

    方法一:

    from multiprocessing import JoinableQueue
    from multiprocessing import Process
    
    def consumer(q,name):       # 定义消费者
        while 1:
            fruits = q.get()
            if fruits is None:break
            print('%s 购买了 %s'% (name,fruits))
    
    def producer(q,name,fruits):    # 定义生产者
        for i in range(1,6):
            print('%s 采摘了 %s%s' % (name,fruits,i))
            q.put(fruits + str(i))
    
    if __name__ == '__main__':
        q = JoinableQueue()
        c1 = Process(target=consumer,args=(q,'小白'))
        c1.start()
    
        p1 = Process(target=producer,args=(q,'果农','苹果'))
        p1.start()
        p1.join()
        q.put(None)

     

    方法二:

    from multiprocessing import JoinableQueue
    from multiprocessing import Process
    
    def consumer(q,name):       # 定义消费者
        while 1:
            fruits = q.get()
            print('%s 购买了 %s'% (name,fruits))
            q.task_done()
    def producer(q,name,fruits):    # 定义生产者
        for i in range(1,4):
            print('%s 采摘了 %s%s' % (name,fruits,i))
            q.put(fruits + str(i))
    
    if __name__ == '__main__':
        q = JoinableQueue()
        c1 = Process(target=consumer,args=(q,'小白'))
        c2 = Process(target=consumer,args=(q,'小黑'))
        c1.daemon = True        # 主进程结束全部结束
        c2.daemon = True        # 主进程结束全部结束
        c1.start()
        c2.start()
    
        p1 = Process(target=producer,args=(q,'果农','苹果'))
        p2 = Process(target=producer,args=(q,'农民','西瓜'))
        p1.start()
        p2.start()
        p1.join()
        p2.join()
        q.join()            # task_done()为0时结束进程

     

    五、管道

    队列是基于管道实现的,管道是基于socket实现的

    队列中进程之间的数据安全,因为队列不会同时取同一个数据,也不会同时放一个数据,队列就是管道+锁实现的。简便的IPC机制,使得进程之间数据安全

    管道中进程之间数据不安全,存储数据复杂,基于socket + pickle实现的

     

    Pipe的对象是一个元组,元组里有两个元素

    from multiprocessing import Pipe
    p = Pipe()
    print(p)
    
    # (<multiprocessing.connection.PipeConnection object at 0x000001C6C19B8A58>, <multiprocessing.connection.PipeConnection object at 0x000001C6C19B89B0>)

     

    如果不使用的端口就要手动关闭,left.close() 或者 right.close()

    Pipe的端口管理不会随着某个进程的关闭就关闭。

    操作系统来管理进程对这些端口的使用,每关闭一个端口,计数器-1,直到所有的端口都关闭只剩下一个端口,recv就会抛出异常,需要用异常处理。

    只要关闭所有send的端口,recv会认为没有信息给它发送,recv就会抛出异常。

    端口是此操作系统管理的,要关闭不使用的端口,触发recv报错,即使通过发送信号停止recv的循环,也需要手动关闭端口,归还资源给操作系统

    第一种方式:

    from multiprocessing import Pipe
    def func(left,right):
        left.close()
        while True:
            try:
                print(right.recv())     # 只剩下一个端口时,recv会抛出异常EOFError
            except EOFError:
                break
    
    if __name__ == '__main__':
        left,right = Pipe()
        Process(target=func,args=(left,right)).start()
        right.close()
        for i in range(5):
            left.send(i)
        left.close()
    
    '''
    0
    1
    2
    3
    4
    '''

     

    第二种方式:

    from multiprocessing import Pipe,Process
    
    def func(left,right):
        left.close()
        while 1:
            ret = right.recv()
            if ret is None:
                right.close()
                break
            print(ret)
    
    if __name__ == '__main__':
        left,right = Pipe()
        Process(target=func,args=(left,right)).start()
        right.close()
        for i in range(3):
            left.send(i)
        left.send(None)         # 给子进程发送讯号结束循环
        left.close()

     

    六、进程池

    CPU的核心数是有限的,开启过多的进程不能提高效率,反而降低了效率

    进程的启动、调度、关闭都通过操作系统分配,需要一定的时间,如果任务超过5个,使用进程池,而不是开启过多的进程

     

    程序的分类:

      计算密集型:充分占用CPU,多进程可以充分利用多核,适合开启多进程,但是不适合开启过多的进程

      OS密集型:大部分时间都在阻塞队列,而不是在运行状态中,不适合开启多进程

     

    开启进程池的步骤:

    from multiprocessing import pool

    p = pool()              可以传数量,不传默认是CPU的核心数

    p.apply_async(函数,参数)    异步提交函数到一个子进程中执行

    p.close()             关闭进程池,用户不能再向这个池中提交任务

    p.join()             阻塞,必须先close()才能执行,join()直到线程池的所有任务结束才结束阻塞

    p.apply()             同步提交任务到一个子进程中执行

    不能传队列、文件句柄作为子进程的参数,可以传管道 

     

    提交任务

    ① 同步提交 apply

    子进程对应函数的返回值,一个一个顺序执行的,并没有任何的并发效果

    ② 异步提交 apply_async

    没有返回值的情况下,要想所有任务能够顺利的执行完毕,必须使用close()和join()的方法

    有返回值的i情况下,通过get()方法取值,get不能再提交任务之后立即执行,应该是先提交所有的任务再通过get获取结果

     

    map() 方法

      异步提交的简化版本,自带close和join方法,直接拿到返回值的可迭代对象,循环获取返回值

     

    一般情况下,多进程少拿到返回值,在回调函数用的多。

    使用第三方的工具使进程池在多个进程之间通信,保证了数据安全、有良好的分发数据、redis是能够共享的大字典、kafaka、memocache等

     

    多进程和进程池

    多进程:

    ①操作系统来统一调度

    ②创建的进程越多,占用的操作系统资源越多,操作系统的效率就会不断下降

     

    进程池:

    进程中的进程是有限的,如果用户量太大,进程池的限制会导致用户的响应及时性受影响

     

    apply   

    同步提交任务,所有的子进程执行完毕才执行主进程

    from multiprocessing import Pool
    import time,os
    
    def func(args):
        time.sleep(0.5)
        print('子进程号:%s,%s'%(os.getpid(),args))
    
    
    if __name__ == '__main__':
        p = Pool(os.cpu_count()+1)          # 线程数为cpu的核心数+1
        for i in range(10):
            p.apply(func,(i,))              # 同步提交任务
        print('主进程%s'% os.getpid())       # 子进程执行完毕才执行主进程

     

    同步提交任务,获取返回值

    from multiprocessing import Pool
    import os
    
    def func(args):
        return args ** 2                    # 子进程返回结果
    
    
    if __name__ == '__main__':
        p = Pool(os.cpu_count()+1)          # 线程数为cpu的核心数+1
        for i in range(10):
            ret = p.apply(func,(i,))        # 同步提交任务,获得返回值
            print('result:%s'% ret)         # 打印返回值
        print('主进程%s'% os.getpid())

     

    apply_async

    异步提交任务,使用close()和join()的方法阻塞主程序,否则主程序结束后会关闭进程池。

    from multiprocessing import Pool
    import os,time
    
    def func(args):
        time.sleep(2)
        print('子进程号:%s,%s'% (os.getpid(),args))
    
    if __name__ == '__main__':
        p = Pool(4)
        for i in range(10):
            p.apply_async(func,(i,))
        p.close()
        p.join()
        print(666)

     

    获取返回值,前提是需要提交所有任务,并且将所有返回值存储到容器中,循环容器通过get()获取值

    from multiprocessing import Pool
    def func(args):
        return args**2
    
    if __name__ == '__main__':
        p = Pool(2)
        ret_li = []                             # 储存返回值的列表
        for i in range(1,11):
            ret = p.apply_async(func,(i,))
            ret_li.append(ret)                  # 将结果添加至列表
        for ret in ret_li:                      # 所有任务提交后在循环

     

    map() 

    异步提交任务,自带close和join方法,参数:子进程函数,iterable

    from multiprocessing import Pool
    import time
    
    def func(args):
        time.sleep(2)
        print(args)
    
    if __name__ == '__main__':
        p = Pool()
        p.map(func,range(20))           # map的参数:子进程函数,iterable

     

    获取返回值,返回一返回值的列表,通过循环返回值列表获取值

    from multiprocessing import Pool
    
    def func(args):
        return args ** 2
    
    if __name__ == '__main__':
        p = Pool()
        ret = p.map(func,range(20))           # 获取一个返回值的列表
        for r in ret:                         # 循环返回值的列表即可获取值
            print(r)

     

    七、数据共享

    from multiprocessing import Manager

    将所有实现了数据共享的比较便捷的类重新又封装了一边,并且在原有的multiprocessing增加了新的机制

    count += 1 不是原子性操作,实际经历了三步:①读入count变量指向的值,② +1,③让count变量指向新的结果值

    当多个线程运行时,很可能某些线程执行第一步时读入的是同一个值,这样再加1和写入就相当于重复劳动了。

     

    with... 上下文管理,不只用于文件处理,在执行一大段语句之前自动做某个操作,并且执行一大段语句之后,自动做某个操作

    例如 with lock 包含了lock.acquire() 和 lock.release()

     

    以Manager为例,由于多进程下数据不安全,需要通过加锁来保证数据的安全

    from multiprocessing import Manager,Lock,Process
    import time
    def func(d,lock):
        with lock:                  # 自动加锁
            d['count'] -= 1
    
    if __name__ == '__main__':
        lock = Lock()
        with Manager() as m:                # 相当于 m = Manager()
            dic = m.dict({'count':300})     # 使用类封装的数据结构
            p_li = []
            for i in range(300):
                p = Process(target=func,args=(dic,lock))
                p.start()
                time.sleep(0.1)
                p_li.append(p)              # 将进程对象添加至列表
            for p in p_li:          
                p.join()                    # 阻塞,保证全部子进程完成
            print(dic)

     

    数据共享的机制:

    ① 支持数据类型非常有限

    ② list dict等都不是数据安全的,需要加锁来保证数据安全

     

    八、回调函数

    子进程执行完的返回值调至主进程执行,称为回调函数

    回调函数一般基于数据池的apply_async 异步提交任务 进行的

    from multiprocessing import Pool
    import os
    
    def func(args):
        print('子进程:%s' % os.getpid())
        return args ** 3
    
    def call_back(ret):
        print('回调函数:%s' % os.getpid())
        print(ret)
    
    if __name__ == '__main__':
        p = Pool()
        print('主进程:%s' % os.getpid())
        p.apply_async(func,args=(1,),callback=call_back)        # 设置回调函数,将子进程的结果返回给回调函数
        p.close()
        p.join()
        
    '''
    主进程:1360
    子进程:6928
    回调函数:1360
    1
    '''

    func执行完毕之后执行callback函数,func的返回值会作为callback的的参数传入,回调函数通过主进程实现

    使用场景:

    ①只用主进程统一获取结果,这样的话在主进程之间没有数据隔离

    ②多个子进程之间所做的事情是比较长时间的,而在主进程中一般情况下回调函数的执行速度都比较快

    例如:子进程中做网络访问、数据分析;主进程拿着访问结果或者数据分析的结果来做一些汇总的工作

    回调函数和同故宫ret.get()的区别

    ① 代码的简洁度不同

    ② 效率问题,子进程执行完立即调用callback

  • 相关阅读:
    .Net Standard(.Net Core)实现获取配置信息
    C# 自定义异常
    C# 表达式树Lambda扩展(四)
    C# 表达式树分页扩展(三)
    C# 表达式树遍历(二)
    C# 表达式树讲解(一)
    C#委托(delegate、Action、Func、predicate)和事件
    搭建Nuget服务器(Nuget私服)
    ORM之Dapper运用
    CentOS7 安装 redise redis-6.0.1
  • 原文地址:https://www.cnblogs.com/st-st/p/9681402.html
Copyright © 2011-2022 走看看