zoukankan      html  css  js  c++  java
  • 0514 队列 管道 进程池 回调函数

    进程间通信——队列和管道(multiprocess.Queue、multiprocess.Pipe)

    进程间通信

    IPC(Inter-Process Communication)

    队列:创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。

    Queue模块([maxsize]) 创建共享的进程队列。

    参数 :maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。 底层队列使用管道和锁定实现

    q.get([block[,timeout]]) 
    返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为True. 如果设置为False
    ,将引发Queue.Empty异常(定义在Queue模块中)。timeout是可选超时时间,用在阻塞模式中。如果在制定的时间间隔内没有项目变为
    可用,将引发Queue.Empty异常。 q.get_nowait( ) 同q.get(False)方法。 q.put(item[,block [,timeout]]) 将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默认为True。如果设置为False,将引发
    Queue.Empty异常(定义在Queue库模块中)。timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。

    不可靠的方法
    q.qsize() 
    返回队列中目前项目的正确数量。此函数的结果并不可靠,因为在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。在某些系统上,此方法可能引
    发NotImplementedError异常。 q.empty() 如果调用此方法时 q为空,返回True。如果其他进程或线程正在往队列中添加项目,结果是不可靠的。也就是说,在返回和使用结果之间,队列中可能已经加入新的项目。 q.full() 如果q已满,返回为True. 由于线程的存在,结果也可能是不可靠的(参考q.empty()方法)。。
    进程间的通信
    import
    time from multiprocessing import Process,Queue def fun(q): print(q.get()) q.put(2) if __name__ == '__main__': q=Queue() p = Process(target=fun,args=[q,]).start() q.put(1) time.sleep(2)#不睡的话会卡在这里,睡之后先执行子进程 print(q.get())

    from multiprocessing import Queue
    q = Queue(3)
    q.put(1)
    q.put(1)如果队列已经满了,程序就会停在这里,等待数据被别人取走,再将数据放入队列
    q.put(1)如果队列中的数据一直不被取走,程序就会永远停在这里。
    try: 可以使用put_nowait,如果队列满了不会阻塞,但是会因为队列满了而报错。
    q.put_nowait(1)因此我们可以用一个try语句来处理这个错误。这样程序不会一直阻塞下去,但是会丢掉这个消息。
    except:
    print('123')# 会打印123
    get_nowait()方法与q.put_nowait(1)一样
    消费者模型
    import
    time import random from multiprocessing import Process,Queue 生产者消费者模型 解决数据供需不平衡的情况 队列是进程安全的 内置了锁来保证队列中的每一个数据都不会被多个进程重复取 def consumer(q,name): while True: food = q.get() if food == 'done':break time.sleep(random.random()) print('%s吃了%s'%(name,food)) def producer(q,name,food): for i in range(10): time.sleep(random.random()) print('%s生产了%s%s'%(name,food,i)) q.put('%s%s'%(food,i)) if __name__ == '__main__': q = Queue() p1 = Process(target=producer,args=[q,'Egon','泔水']) p2 = Process(target=producer,args=[q,'Yuan','骨头鱼刺']) p1.start() p2.start() Process(target=consumer,args=[q,'alex']).start() Process(target=consumer,args=[q,'wusir']).start() p1.join() p2.join() q.put('done') q.put('done')

    进阶版 JoinableQueue模块
    from multiprocessing import Process,JoinableQueue
    def consumer(q,name,):
    while True:
    food = q.get() 消费食物
    print('%s吃了%s'%(name,food))
    q.task_done() consumer每完成一个任务就会给q发送一个taskdone

    def producer(q,name,food):
    for i in range(10):
    print('%s生产了%s%s'%(name,food,i))
    q.put('%s%s'%(food,i)) 生产食物
    q.join() 等到所有的数据都被taskdone才结束

    if __name__ == '__main__':
    q = JoinableQueue()
    p1 = Process(target=producer,args=[q,'egon','泔水'])
    p2 = Process(target=producer,args=[q,'alex','鱼刺'])
    p1.start()
    p2.start()
    c1 = Process(target=consumer,args=[q,'taibai',])
    c2 = Process(target=consumer,args=[q,'wusir',])
    c1.daemon = True
    c2.daemon = True
    c1.start()
    c2.start()
    p1.join()
    p2.join()
    JoinableQueue的实例p除了与Queue对象相同的方法之外,还具有以下方法:
    
    q.task_done() 
    使用者使用此方法发出信号,表示q.get()返回的项目已经被处理。如果调用此方法的次数大于从队列中删除的项目数量,将引发ValueError异常。
    
    q.join() 
    生产者将使用此方法进行阻塞,直到队列中所有项目均被处理。阻塞将持续到为队列中的每个项目均调用q.task_done()方法为止。 
    下面的例子说明如何建立永远运行的进程,使用和处理队列上的项目。生产者将项目放入队列,并等待它们被处理。
    
    
    producer
    put
    生产完全部的数据就没有其他工作了
    在生产数据方 : 允许执行q.join
    join会发起一个阻塞,直到所有当前队列中的数据都被消费
    consumer
    get 获取到数据
    处理数据
    q.task_done() 告诉q,刚刚从q获取的数据已经处理完了

    consumer每完成一个任务就会给q发送一个taskdone
    producer在所有的数据都生产完之后会执行q.join()
    producer会等待consumer消费完数据才结束
    主进程中对producer进程进行join
    主进程中的代码会等待producer执行完才结束
    producer结束就意味着主进程代码的结束
    consumer作为守护进程结束

    consumer中queue中的所有数据被消费
    producer join结束
    主进程的代码结束
    consumer结束
    主进程结束

    管道  Pipe模块

    from multiprocessing import Pipe
    A,B = Pipe()
    A.send('1234')
    print(B.recv())
    A.send('1234')
    print(B.recv())

    应该特别注意管道端点的正确管理问题。如果是生产者或消费者中都没有使用管道的某个端点,就应将它关闭
    这也说明了为何在生产者中关闭了管道的输出端,在消费者中关闭管道的输入端。如果忘记执行这些步骤,程序可能
    在消费者中的recv()操作上挂起。管道是由操作系统进行引用计数的,必须在所有进程中关闭管道后才能生成
    EOFError异常。因此,在生产者中关闭管道不会有任何效果,除非消费者也关闭了相同的管道端点。
    from multiprocessing import Process, Pipe

    def f(parent_conn,child_conn):
    parent_conn.close() #不写close将不会引发EOFError
    while True:
    try:
    print(child_conn.recv())
    except EOFError:
    child_conn.close()
    break

    if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(parent_conn,child_conn,))
    p.start()
    child_conn.close()
    parent_conn.send('hello')
    parent_conn.send('hello')
    parent_conn.send('hello')
    parent_conn.close()
    p.join()
    数据共享   Manager模块

    from multiprocessing import Manager,Process,Lock
    def func(dic,lock):
    with lock: 上下文管理 :必须有一个开始动作 和 一个结束动作的时候
    dic['count'] = dic['count'] - 1

    if __name__ == '__main__':
    m = Manager()
    lock = Lock()
    dic = m.dict({'count':100})
    p_lst = []
    for i in range(100):
    p = Process(target=func,args=[dic,lock])
    p_lst.append(p)
    p.start()
    for p in p_lst:p.join()
    print(dic)

    同一台机器上 : Queue
    在不同台机器上 :消息中间件

    进程池和multiprocess.Pool模块

    定义一个池子,在里面放上固定数量的进程,有需求来了,就拿一个池中的进程来处理任务,等到处理完毕,进程并不关闭,而是将进程再放回进程池中继续等待任务。如果有很多任务需要执行,池中的进程数量不够,任务就要等待之前的进程执行任务完毕归来,拿到空闲进程才能继续执行。也就是说,池中进程的数量是固定的,那么同一时间最多有固定数量的进程在运行。这样不会增加操作系统的调度难度,还节省了开闭进程的时间,也一定程度上能够实现并发效果

    Pool([numprocess  [,initializer [, initargs]]]):创建进程池

    参数介绍:

      1 numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值

      2 initializer:是每个工作进程启动时要执行的可调用对象,默认为None

      3 initargs:是要传给initializer的参数组

    主要方法:

    1 p.apply(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。
    2 '''需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,必须从不同
    线程调用p.apply()函数或者使用p.apply_async()''' 3 4 p.apply_async(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。 5 '''此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递
    给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。''' 6 7 p.close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成 8 9 P.jion():等待所有工作进程退出。此方法只能在close()或teminate()之后调用
    import time
    import random
    from multiprocessing import Pool
    def func(i):
    print('func%s' % i)
    time.sleep(random.randint(1,3))
    return i**2 # 平方
    if __name__ == '__main__':
    p = Pool(5)
    ret_l = []
    for i in range(15):
    # p.apply(func=func,args=(i,)) # 同步调用
    ret = p.apply_async(func=func,args=(i,))# 异步调用 返回函数的值
    ret_l.append(ret)
    for ret in ret_l : print(ret.get())
    # 主进程和所有的子进程异步了

    其他方法:

    1 方法apply_async()和map_async()的返回值是AsyncResul的实例obj。实例具有以下方法
    2 obj.get():返回结果,如果有必要则等待结果到达。timeout是可选的。如果在指定时间内还没有到达,将引发一场。如果远程
    操作中引发了异常,它将在调用此方法时再次被引发。 3 obj.ready():如果调用完成,返回True 4 obj.successful():如果调用完成且没有引发异常,返回True,如果在结果就绪之前调用此方法,引发异常 5 obj.wait([timeout]):等待结果变为可用。 6 obj.terminate():立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作。如果p被垃圾回收,将自动调用此函数

    回调函数(爬虫)

    需要回调函数的场景:进程池中任何一个任务一旦处理完了,就立即告知主进程:我好了额,你可以处理我的结果了。主进程则调用一个函数
    去处理该结果,该函数即回调函数我们可以把耗时间(阻塞)的任务放到进程池中,然后指定回调函数(主进程负责执行),这样主进程在执
    行回调函数时就省去了I/O的过程,直接拿到的是任务的结果。
    import os
    from urllib.request import urlopen
    from multiprocessing import Pool
    def get_url(url):
    print('-->',url,os.getpid())
    ret = urlopen(url)
    # content = ret.read()
    return url

    def call(url):
    # 分析
    print(url,os.getpid())

    if __name__ == '__main__':
    print(os.getpid())
    l = [
    'http://www.baidu.com', # 5
    'http://www.sina.com',
    'http://www.sohu.com',
    'http://www.sogou.com',
    'http://www.qq.com',
    'http://www.bilibili.com', #0.1
    ]
    p = Pool(5) # count(cpu)+1
    ret_l = []
    for url in l:
    ret = p.apply_async(func = get_url,args=[url,],callback=call)
    ret_l.append(ret)
    for ret in ret_l : ret.get()


    # 回调函数
    # 在进程池中,起了一个任务,这个任务对应的函数在执行完毕之后
    # 的返回值会自动作为参数返回给回调函数
    # 回调函数就根据返回值再进行相应的处理

    # 回调函数 是在主进程执行的
  • 相关阅读:
    织梦DEDEcms首页调用文档整篇内容
    dedecms专题列表页不显示标题的解决办法
    怎么让织梦文章按照权重排序
    Codeforces274B
    HDU5693
    HDU2476
    POJ3613
    「LibreOJ NOIP Round #1」旅游路线
    Educational Codeforces Round 48
    组合博弈学习笔记
  • 原文地址:https://www.cnblogs.com/Mr-Murray/p/9035273.html
Copyright © 2011-2022 走看看