zoukankan      html  css  js  c++  java
  • 守护进程、互斥锁、ipc机制、生产者消费者模型

    一、补充:
    from multiprocessing import Process
    import time, os

    def task():
    print('%s is running' % os.getpid())
    time.sleep(3)

    if __name__ == '__main__':
    p = Process(target=task)
    p.start()
    p.join() # 等待进程p结束后,join函数内部会发送系统调用wait,去告诉操作系统回收掉进程p的id号

    print(p.pid) # ???此时能否看到子进程p的id号
    print('主')

    答案:可以
    分析:
    p.join()是像操作系统发送请求,告知操作系统p的id号不需要再占用了,回收就可以,
    此时在父进程内还可以看到p.pid,但此时的p.pid是一个无意义的id号,因为操作系统已经将该编号回收


    二、守护进程

    1、什么守护进程
    守护进程其实就是一个‘子进程’,守护进程会伴随主进程的代码运行完毕后而死掉

    2、为何用守护进程
    关键字就两个:
    什么时候需要开启子进程?
    当父进程需要将一个任务并发出去执行,需要将该任务放到一个子进程里

    什么时候需要将子进程设置为守护进程?
    当该子进程内的代码在父进程代码运行完毕后就没有存在的意义了,就应该
    将该子进程设置为守护进程,会在父进程代码结束后死掉


    from multiprocessing import Process
    import time,os

    def task(name):
    print('%s is running' %name)
    time.sleep(3)

    if __name__ == '__main__':
    p1=Process(target=task,args=('守护进程',))
    p2=Process(target=task,args=('正常的子进程',))

    p1.daemon = True # 一定要放到p.start()之前
    p1.start()
    p2.start()

    print('主')


    #主进程代码运行完毕,守护进程就会结束
    from multiprocessing import Process

    import time
    def foo():
    print(123)
    time.sleep(1)
    print("end123")

    def bar():
    print(456)
    time.sleep(3)
    print("end456")

    if __name__ == '__main__':
    p1=Process(target=foo)
    p2=Process(target=bar)

    p1.daemon=True
    p1.start()
    p2.start()
    print("main-------")


    一般电脑
    '''
    main-------
    456
    enn456
    '''
    电脑性能稍微快
    '''
    main-------
    123
    456
    enn456
    '''
    电脑性能特别快
    '''
    123
    main-------
    456
    end456
    '''

    永远看不到 end123



    三、互斥锁
    互斥锁:可以将要执行任务的部分代码(只涉及到修改共享数据的代码)变成串行
    牺牲了效率,但保证数据安全。

    互斥锁 vs p.join()
    1.互斥锁是局部串行
    2.p.join()是要执行任务的所有代码整体串行

    #文件db的内容为:{"count":1}
    #注意一定要用双引号,不然json无法识别

    from multiprocessing import Process,Lock
    import json
    import os
    import time
    import random

    def check():
    time.sleep(1) # 模拟网路延迟
    with open('db.txt','rt',encoding='utf-8') as f:
    dic=json.load(f)
    print('%s 查看到剩余票数 [%s]' %(os.getpid(),dic['count']))

    def get():
    with open('db.txt','rt',encoding='utf-8') as f:
    dic=json.load(f)
    time.sleep(2)
    if dic['count'] > 0:
    # 有票
    dic['count']-=1
    time.sleep(random.randint(1,3))
    with open('db.txt','wt',encoding='utf-8') as f:
    json.dump(dic,f)
    print('%s 购票成功' %os.getpid())
    else:
    print('%s 没有余票' %os.getpid())


    def task(mutex):
    # 查票
    check()

    #购票
    mutex.acquire() # 互斥锁不能连续的acquire,必须是release以后才能重新acquire
    get()
    mutex.release()


    # with mutex:
    # get()

    if __name__ == '__main__':
    mutex=Lock()
    for i in range(10):
    p=Process(target=task,args=(mutex,))
    p.start()
    # p.join()


    四、IPC机制(*****)
    IPC:进程间通信,有两种实现方式
    1、pipe:
    2、queue:pipe+锁

    q=Queue() #先进先出
    注意:
    1、队列占用的是内存空间
    2、不应该往队列中放大数据,应该只存放数据量较小的消息

    掌握的
    q.put('first')
    q.put({'k':'sencond'})
    q.put(['third',])
    # q.put(4)

    print(q.get())
    print(q.get())
    print(q.get())
    print(q.get())

    了解的
    q=Queue(3) #先进先出
    q.put('first',block=True,timeout=3)
    q.put({'k':'second'},block=True,timeout=3)
    q.put(['third',],block=True,timeout=3)
    print('===>')
    # q.put(4,block=True,timeout=3)

    print(q.get(block=True,timeout=3))
    print(q.get(block=True,timeout=3))
    print(q.get(block=True,timeout=3))
    print(q.get(block=True,timeout=3))

    block=True时才能与timeout一起用



    q=Queue(3) #先进先出
    q.put('first',block=False,)
    q.put({'k':'sencond'},block=False,)
    q.put(['third',],block=False,)
    print('===>')
    # q.put(4,block=False,) # block=False 队列满了直接抛出异常,不会阻塞

    通用方式:
    for i in range(10):
    q.put(i,block=False)

    print(q.get(block=False))
    print(q.get(block=False))
    print(q.get(block=False))
    print('get over')
    # print(q.get(block=False))



    q=Queue(3) #先进先出

    q.put_nowait('first') #相当于q.put('first',block=False,)
    q.put_nowait(2)
    q.put_nowait(3)
    # q.put_nowait(4)

    print(q.get_nowait()) #相当于q.get(block=False)
    print(q.get_nowait())
    print(q.get_nowait())
    print(q.get_nowait())



    五、生产者消费者模型(******)

    1 什么是生产者消费者模型
    生产者:比喻的是程序中负责产生数据的任务
    消费者:比喻的是程序中负责处理数据的任务

    生产者->共享的介质(队列)<-消费者

    2 为何用
    实现了生产者与消费者的解耦和,生产者可以不停地生产,消费者也可以不停地消费
    从而平衡了生产者的生产能力与消费者消费能力,提升了程序整体运行的效率

    什么时候用?
    当我们的程序中存在明显的两类任务,一类负责产生数据,另外一类负责处理数据
    此时就应该考虑使用生产者消费者模型来提升程序的效率


    简单实现:
    from multiprocessing import Queue,Process
    import time,os,random

    def producer(q):
    for i in range(10):
    res='包子%s'%i
    time.sleep(random.randint(1,3))
    # 往队列里丢
    q.put(res)
    print('33[45m%s生产了%s33[0m'%(os.getpid(),res))


    def consumer(q):
    while True:
    #从队列里取走
    res=q.get()
    if res==None:break
    time.sleep(random.randint(1,3))
    print('33[43m%s 吃了%s33[0m'%(os.getpid(),res))

    if __name__ == '__main__':
    q=Queue()
    # 生产者们
    p1=Process(target=producer,args=(q,))
    # 消费者们
    c1=Process(target=consumer,args=(q,))

    p1.start()
    c1.start()

    print('主')

    问题:程序没有结束,因为p1结束了而c1没有结束。


    完全实现:
    from multiprocessing import Queue,Process
    import time,random

    def producer(name,food,q):
    for i in range(3):
    res='%s%s'%(food,i)
    time.sleep(random.randint(1,3))
    # 往队列里丢
    q.put(res)
    print('33[45m%s 生产了 %s33[0m'%(name,res))
    # q.put(None) 不应该在这里放none,如果有消费者取走none会结束了(不应该结束,后面还有数据)

    def consumer(name,q):
    while True:
    #从队列里取走
    res=q.get()
    if res is None:break
    time.sleep(random.randint(1,3))
    print('33[41m%s 吃了 %s33[0m'%(name,res))

    if __name__ == '__main__':
    q=Queue()
    # 生产者们
    p1=Process(target=producer,args=('egon','包子',q))
    p2=Process(target=producer,args=('wxx','饺子',q))
    p3=Process(target=producer,args=('lxx','混沌',q))
    # 消费者们
    c1=Process(target=consumer,args=('alex',q))
    c2=Process(target=consumer,args=('per',q))

    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()

    p1.join()
    p2.join()
    p3.join()
    # 在p1p2p3都结束后,才应该往队列里放结束信号,有几个消费者就应该放几个None
    q.put(None)
    q.put(None)

    终极版:


    from multiprocessing import JoinableQueue,Process
    import time
    import os
    import random

    def producer(name,food,q):
    for i in range(3):
    res='%s%s' %(food,i)
    time.sleep(random.randint(1,3))
    # 往队列里丢
    q.put(res)
    print('33[45m%s 生产了 %s33[0m' %(name,res))
    # q.put(None)

    def consumer(name,q):
    while True:
    #从队列里取走
    res=q.get()
    if res is None:break
    time.sleep(random.randint(1,3))
    print('33[46m%s 吃了 %s33[0m' %(name,res))
    q.task_done() #发送一次信号,证明一个数据已经被取走了

    if __name__ == '__main__':
    q=JoinableQueue()
    # 生产者们
    p1=Process(target=producer,args=('egon','包子',q,))
    p2=Process(target=producer,args=('八戒','泔水',q,))
    p3=Process(target=producer,args=('猴子','翔',q,))
    # 消费者们
    c1=Process(target=consumer,args=('周',q,))
    c2=Process(target=consumer,args=('吴',q,))
    c1.daemon=True
    c2.daemon=True 守护进程(一定要放在start上面)

    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()

    p1.join()
    p2.join()
    p3.join()

    q.join() #等待队列被取干净
    # q.join() 结束意味着:主进程的代码运行完毕--->(生产者运行完毕)+队列中的数据也被取干净了->消费者没有存在的意义
  • 相关阅读:
    HDU 2852 KiKi's K-Number (主席树)
    HDU 2089 不要62
    Light oj 1140 How Many Zeroes?
    Bless You Autocorrect!
    HDU 6201 transaction transaction transaction
    HDU1561 The more ,The better (树形背包Dp)
    CodeForces 607B zuma
    POJ 1651 Mulitiplication Puzzle
    CSUOJ 1952 合并石子
    Uva 1599 Ideal path
  • 原文地址:https://www.cnblogs.com/kingyanan/p/9334750.html
Copyright © 2011-2022 走看看