zoukankan      html  css  js  c++  java
  • day 40(队列+ 管道+ 数据共享+进程池+回调函数)

    队列:

    from multiprocessing import Process,Queue
    q = Queue(10) # 创建一个只能放10个值的队列
    try:
    q.get_nowait() # web qq 长轮询
    except:
    print('queue.Empty')

    q.get()
    for i in range(10):
    q.put(i)
    print(q.qsize())
    print(q.full())
    q.put(1111)
    print('*'*10)
    print(q.empty())
    print(q.full())

    队列可以在创建的时候制定一个容量
    如果在程序运行的过程中,队列已经有了足够的数据,再put就会发生阻塞
    如果队列为空,在get就会发生阻塞
    内存 —— 制定容量
    put
    get
    qsize 不准
    full 不准
    empty 不准
    import time
    from multiprocessing import Process,Queue
    def wahaha(q):
    print(q.get())
    q.put(2)

    if __name__ == '__main__':
    q = Queue()
    p = Process(target=wahaha,args=[q,])
    p.start()
    q.put(1)
    time.sleep(0.1)
    print(q.get())
    在进程中使用队列可以完成双向通信

    import time
    import random
    from multiprocessing import Process,Queue
    生产者消费者模型
    解决数据供需不平衡的情况
    队列是进程安全的 内置了锁来保证队列中的每一个数据都不会被多个进程重复取
    def consumer(q,name):
    while True:
    food = q.get()
    if food == 'done':break
    time.sleep(random.random())
    print('%s吃了%s'%(name,food))

    def producer(q,name,food):
    for i in range(10):
    time.sleep(random.random())
    print('%s生产了%s%s'%(name,food,i))
    q.put('%s%s'%(food,i))

    if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=producer,args=[q,'Egon','泔水'])
    p2 = Process(target=producer,args=[q,'Yuan','骨头鱼刺'])
    p1.start()
    p2.start()
    Process(target=consumer,args=[q,'alex']).start()
    Process(target=consumer,args=[q,'wusir']).start()
    p1.join()
    p2.join()
    q.put('done')
    q.put('done')

    管道:

    # 管道
    # from multiprocessing import Pipe
    # left,right = Pipe()
    # left.send('1234')
    # print(right.recv())
    # left.send('1234')
    # print(right.recv())

    from multiprocessing import Process, Pipe

    def f(parent_conn,child_conn):
    parent_conn.close() #不写close将不会引发EOFError
    while True:
    try:
    print(child_conn.recv())
    except EOFError:
    child_conn.close()
    break

    if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(parent_conn,child_conn,))
    p.start()
    child_conn.close()
    parent_conn.send('hello')
    parent_conn.send('hello')
    parent_conn.send('hello')
    parent_conn.close()
    p.join()

    数据共享:

    from multiprocessing import Manager,Process,Lock
    def func(dic,lock):
    # lock.acquire()
    # dic['count'] = dic['count']-1
    # lock.release()
    with lock: # 上下文管理 :必须有一个开始动作 和 一个结束动作的时候
    dic['count'] = dic['count'] - 1

    if __name__ == '__main__':
    m = Manager()
    lock = Lock()
    dic = m.dict({'count':100})
    p_lst = []
    for i in range(100):
    p = Process(target=func,args=[dic,lock])
    p_lst.append(p)
    p.start()
    for p in p_lst:p.join()
    print(dic)

    # 同一台机器上 : Queue
    # 在不同台机器上 :消息中间件


    进程池:


    import time
    import random
    from multiprocessing import Pool
    def func(i):
    print('func%s' % i)
    time.sleep(random.randint(1,3))
    return i**2
    if __name__ == '__main__':
    p = Pool(5)
    ret_l = []
    for i in range(15):
    # p.apply(func=func,args=(i,)) # 同步调用
    ret = p.apply_async(func=func,args=(i,))# 异步调用
    ret_l.append(ret)
    for ret in ret_l : print(ret.get())
    # 主进程和所有的子进程异步了


    回调函数:


    import os
    from urllib.request import urlopen
    from multiprocessing import Pool
    def get_url(url):
    print('-->',url,os.getpid())
    ret = urlopen(url)
    content = ret.read()
    return url

    def call(url):
    # 分析
    print(url,os.getpid())

    if __name__ == '__main__':
    print(os.getpid())
    l = [
    'http://www.baidu.com', # 5
    'http://www.sina.com',
    'http://www.sohu.com',
    'http://www.sogou.com',
    'http://www.qq.com',
    'http://www.bilibili.com', #0.1
    ]
    p = Pool(5) # count(cpu)+1
    ret_l = []
    for url in l:
    ret = p.apply_async(func = get_url,args=[url,],callback=call)
    ret_l.append(ret)
    for ret in ret_l : ret.get()


    # 回调函数
    # 在进程池中,起了一个任务,这个任务对应的函数在执行完毕之后
    # 的返回值会自动作为参数返回给回调函数
    # 回调函数就根据返回值再进行相应的处理

    # 回调函数 是在主进程执行的















































  • 相关阅读:
    Permutation Sequence
    Sqrt(x)
    Search in Rotated Sorted Array ||
    [STL]list的erase正确与错误用法
    一个支持Git应用编程开发的第三方库(API)
    VC++生成full dump文件
    Maven构建C++工程的插件-NAR
    VC++ Watch窗口查看指针指向的数组
    Android SDK更新失败的解决方法
    ADT20新建项目Android Support library not installed问题
  • 原文地址:https://www.cnblogs.com/zsdbk/p/9038909.html
Copyright © 2011-2022 走看看