zoukankan      html  css  js  c++  java
  • joinablequeue模块 生产者消费者模型 Manager模块 进程池 管道

    一、生产者消费者

      主要是为解耦(借助队列来实现生产者消费者模型)

      import queue  # 不能进行多进程之间的数据传输

      (1)from multiprocessing import Queue    借助Queue解决生产者消费者模型,队列是安全的。

        q = Queue(num)

        num :为队列的最大长度

        q.get() # 阻塞等待获取数据,如果有数据直接获取,如果没有数据,阻塞等待

        q.put() # 阻塞,如果可以继续往队列中放数据,就直接放,不能放就阻塞等待

        q.get_nowait() # 不阻塞,如果有数据直接获取,没有数据就报错

        q.put_nowait() # 不阻塞,如果能往队列中放数据直接放,不可以就报错

        

      (2)from multiprocessing import JoinableQueue  # 可连接的队列

        JoinableQueue 是继承Queue  ,所以可以使用Queue中的方法

        并且JoinableQueue 又多了两个方法

        q.join() # 用于生产者。等待q.task_done的返回结果,通过返回结果,生产者就能获得消费者当前消费了多少个数据。

        q.task_done() # 用于消费者,是指每消费队列中的一个数据,就给join返回一个标识。

     1 from multiprocessing import Queue,Process,Pool,JoinableQueue
     2 
     3 
     4 def consumer(q,lis):
     5     while 1:
     6         for i in lis:
     7             print(i + '拿走了' + q.get())
     8             q.task_done() # get() 一次就会给生产者的join返回一次数据
     9 
    10 
    11 def producer(q,name1):
    12     for i in range(1,9):
    13         q.put(name1 + '第%s号剑'% i)
    14     q.join() # 记录了生产者往队列中添加了8个数据,此时会阻塞,等待消费返回8次数据,后生产者进程才会结束
    15 
    16 
    17 if __name__ == '__main__':
    18     q = JoinableQueue() # 实例化一个队列
    19     p = Process(target=consumer,args=(q,['盖聂','卫庄','高渐离','胜七','掩日']))
    20     p1 = Process(target=producer,args=(q,'越王八剑'))
    21     p.daemon = True # 注意是把消费者设置为守护进程,会随着主进程的结束而结束。
    22     p.start()
    23     p1.start()
    24     p1.join() # 主进程会等待生产者进程结束后才结束,而生产者进程又会等待消费者进程消费完以后才结束。

    二、进程之间的共享内存

      from multiprocessing import Manager,Value

      m = Manager() 

      num = m.dict({键 :值})

      num = m.list([1,2,3])

    from multiprocessing import Process,Manager,Value
    
    def func(num):
        for i in num:
            print(i - 1) # 结果为:0,1,2
    
    if __name__ == '__main__':
        m = Manager() # 用来进程之间共享数据的
        num = m.list([1,2,3])
        p = Process(target=func,args=(num,))
        p.start()
        p.join() # 等待func子进程执行完毕后结束
    
    
    
    #################Value################
    
    from multiprocessing import Process,Manager,Value
    
    def func1(num):
        print(num)
        num.value += 1 # 和Manager用法不一样
        print(num.value)
    
    if __name__ == '__main__':
        num = Value('i',123) # Manager里面不需要传参数
        p = Process(target=func1,args=(num,))
        p.start()
        p.join()

    三、进程池

      进程池:一个池子,里边有固定数量的进程。这些进程一直处于待命状态,一旦有任务来,马上就去处理。

      进程池还会帮程序员去管理池中的进程。

      from multiprocessing import Pool

      p = Pool(os.cpu_count() + 1)

      进程池有三个方法:

        map(func,iterable)     有返回值

        iterable:可迭代对象,是把可迭代对象中的每个元素一次传给任务函数当参数 

    from multiprocessing import Pool
    
    
    def func(num):
        num += 1
        print(num)
        return num # 返回给map方法
    
    
    if __name__ == '__main__':
        p = Pool()
        res = p.map(func,[i for i in range(10)]) # 参数为目标对象和可迭代对象
        p.close()
        p.join() # 等待子进程结束
        print('主进程',res) # res是一个列表

        apply(func,args=()) :apply的实现是进程之间是同步的,池中的进程一个一个的去执行。

        func :进程池中的进程执行的任务函数。

        args :可迭代对象型的参数,是传给任务函数的参数。

        同步处理任务时,不需要close和join

        同步处理任务时,进程池中的所有进程是普通进程(主进程需要等待其子进程执行结束)

    from multiprocessing import Pool
    
    def func(num):
        num += 1
        return num
    
    
    if __name__ == '__main__':
        p = Pool(5) # 实例化5个进程
        for i in range(100):
            res = p.apply(func,args=(i,)) # 这里传的参数是元祖,这里是同步执行
            print(res)

        apply_async(func,args=(),callback=None) :进城之间是异步的,

        func :进程池中的进程执行的任务函数。

        args :可迭代对象型的参数,是传给任务函数的参数

    from multiprocessing import Pool
    
    
    def func(num):
        num += 1
        return num
    
    
    if __name__ == '__main__':
        p = Pool(5) # 实例化5个进程
        lis = []
        for i in range(100):
            res = p.apply_async(func,args=(i,)) # 异步执行,5个进程同时去调用func
            lis.append(res)
            print(res) # 打印结果为 <multiprocessing.pool.ApplyResult object at 0x0347F3D0>
        p.close() # Pool中用apply_async异步执行时必须关闭进程
        p.join() # 因为是异步执行所以需要等待子进程结束
        print(lis) # 100个<multiprocessing.pool.ApplyResult object at 0x0347F3D0> 这种存放在列表中
        [print(i.get()) for i in lis] # 输出100个数字[1......100]

        callback :回调函数,就是说每当进程池中有进程处理完任务,返回的结果可以交给回调函数,由回调函数进行进一步的处理,回调函数只有异步才有,同步是没有的

        异步处理任务时,进程池中的所有进程是守护进程(主进程代码执行完毕守护进程就结束)

        异步处理任务时,必须要加上close和join

        回调函数的使用:

          进程的任务函数的返回值,被当成回调函数的形参接收到,以此进行进一步的处理操作

          回调函数是由主进程调用的,而不是子进程,子进程只负责把结果传递给回调函数。

    from multiprocessing import Pool
    import requests
    import os
    
    
    def func(ulr):
        res = requests.get(ulr)
        print('func进程的pid:%s' % os.getpid(),'父进程的pid:%s' % os.getppid())
        if res.status_code == 200:
            return ulr,res.text
    
    
    def cal_back(sta): # func中返回的值被自动调用,并当成形参传进来
        ulr,text = sta
        print('callback回调函数的pid:%s'% os.getpid(),'父进程的pid:%s' % os.getppid())
        # 回调函数的pid和父进程的pid一样
    
    if __name__ == '__main__':
        p = Pool(5)
        lis = ['https://www.baidu.com',
             'http://www.jd.com',
             'http://www.taobao.com',
             'http://www.mi.com',
             'http://www.cnblogs.com',
             'https://www.bilibili.com',
             ]
        print('父进程的pid:%s' % os.getpid())
        for i in lis:
            p.apply_async(func,(i,),callback=cal_back)
        # 异步的执行每一个进程,这里的传参和Process不同,这里必须这样写callback=cal_back
        # 异步执行程序func,在每个任务结束后,在func中return回一个结果,这个结果会自动的被callback函数调用,并当成形参来接收。
        p.close() # 进程间异步必须加上close()
        p.join()  # 等待子进程的结束

    四、管道机制

      from multiprocessing import Pipe

      con1,con2 = Pipe()

      管道是不安全的

      管道是用于多进程之间通信的一种方式。

      如果在单进程中使用管道,con1发数据,那么就用con2来收数据

                  con2发数据,那么就用con1来收数据

      如果在多进程中使用管道,那么就必须是父进程使用con1收,子进程就必须使用con2发

                        父进程使用con1发,子进程就必须使用con2收

                        父进程使用con2收,子进程就必须使用con1发

                        父进程使用con2发,子进程就必须使用con1收

      在管道中有一个著名的错误叫做EOFError。是指,父进程如果关闭了发送端,子进程还继续收数据,那么就会引发EOFError。 

    # 单进程中管道的应用
    from multiprocessing import Pipe
    
    con1,con2 = Pipe() # 管道机制
    
    con1.send('123') # con1发送,需要con2来接收   是固定
    print(con2.recv())
    con2.send('456') # con2发送,需要con1来接收   是固定
    print(con1.recv())
    # 多进程中管道的应用
    from multiprocessing import Process,Pipe
    
    
    def func(con):
        con1,con2 = con
        con1.close() # 因为子进程只用con2与父进程通信,所以关闭了
        while 1:
            try:
                print(con2.recv()) # 接收父进程con1发来的数据
            except EOFError: # 如果父进程的con1发完数据,并关闭管道,子进程的con2继续接收数据,就会报错。
                con2.close() # 当接到报错,此时数据已经接收完毕,关闭con2管道。
                break # 退出循环
    
    
    if __name__ == '__main__':
        con1,con2 = Pipe()
        p = Process(target=func,args=(con1,con2))
        p.start()
        con2.close() # 因为父进程是用con1来发数据的,con2提前关闭。
        for i in range(10): # 生产数据
            con1.send('郭%s' % i) # 给子进程的con2发送数据
        con1.close() # 生产完数据,关闭父进程的con1管道
  • 相关阅读:
    Single Number II
    Pascal's Triangle
    Remove Duplicates from Sorted Array
    Populating Next Right Pointers in Each Node
    Minimum Depth of Binary Tree
    Unique Paths
    Sort Colors
    Swap Nodes in Pairs
    Merge Two Sorted Lists
    Climbing Stairs
  • 原文地址:https://www.cnblogs.com/wjs521/p/9520988.html
Copyright © 2011-2022 走看看