zoukankan      html  css  js  c++  java
  • 守护模式,互斥锁,IPC通讯,生产者消费者模型

    '''
    1,什么是生产者消费者模型
    生产者:比喻的是程序中负责产生数据的任务
    消费者:比喻的是程序中负责处理数据的任务

    生产者->共享的介质(队列)<-消费者

    2,为何用
    实现了生产者与消费者的解耦合,生产者可以不停地生产,消费者也可以不停地消费
    从而平衡了消费者地生产能力与消费能力,提升了程序整体运行地效率

    什么时候用?
    当我们地程序中存在明显的两类任务,一类负责产生数据,另一类负责处理数据
    此时我们就应该考虑使用生产者消费者模型来提升程序地效率

    '''
    from multiprocessing import Queue ,Process
    import time,os, random
    def producer(q):
    for i in range(10):
    res='包子%s'%i
    time.sleep(random.randint(1,3))
    q.put(res)
    q.put(None)


    def consumer(q):
    while True:
    res=q.get()
    if res is None:break
    time.sleep(random.randint(1,3))
    print('%s 吃了%s'%(os.getpid(),res))

    if __name__ == '__main__':
    q=Queue()
    p1=Process(target=producer,args=(q,))
    c1=Process(target=consumer,args=(q,))

    p1.start()
    c1.start()
    print('主')

    ----------------------割割更健康-----------------

    from multiprocessing import Queue ,Process
    import time,os,random

    def producer(name,food,q):
    for i in range(3):
    res='%s%s'%(food,i)
    time.sleep(random.randint(1,3))
    #往队列里面丢
    q.put(res)
    print('33[45m%s 生产了 %s33[0m' %(name,res))

    def consumer(name,q):
    while True:
    #往队列里取走
    res=q.get()
    if res is None:break
    time.sleep(random.randint(1,3))
    print('33[46m%s 吃了 %s33[0m' %(name,res))
    q.task_done()
    if __name__ == '__main__':
    q=Queue()
    p1=Process(target=producer,args=('egon','包子',q,))
    p2=Process(target=producer,args=('杨军','水',q,))
    p3=Process(target=producer,args=('侯老师','线',q,))

    # 消费者
    c1=Process(target=consumer,args=('alex',q))
    c2=Process(target=consumer,args=('wupeiqidsb',q))

    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()

    p1.join()
    p2.join()
    p3.join()
    #再p1/p2/p3都结束后,才应该往队列里放结束信号,有几个消费者就应该放几个None
    q.put(None) #额,给消费者发结束信号
    q.put(None)

    print('主')


    # ---------------------------------终极解决方案---------------------

    from multiprocessing import JoinableQueue,Process
    import time,os,random

    def producer(name,food,q):
    for i in range(3):
    res='%s %s '%(food,i)
    time.sleep(random.randint(1,3))
    q.put(res)
    print('%s 生产了%s '%(name,res))

    def consumer(name,q):
    while True:
    res=q.get()

    if res is None:break
    time.sleep(random.randint(1,3))
    print('%s 吃了%s'%(name,res))
    #向q.join()发送一次信号,证明一个数据已经被取走了
    q.task_done()

    if __name__ == '__main__':
    q=JoinableQueue()
    p1=Process(target=producer,args=('egon','包子',q,))
    p2=Process(target=producer,args=('严峻','泔水',q,))
    p3=Process(target=producer,args=('侯老师','先',q,))

    c1=Process(target=consumer,args=('alex',q,))
    c2=Process(target=consumer,args=('wpx',q,))
    c1.daemon=True
    c2.daemon=True

    p1.start()
    p2.start()
    p3.start()

    c1.start()
    c2.start()

    p1.join()
    p2.join()
    p3.join()

    q.join()#等待队列被取赶紧
    #q.join()
    #结束意味着主进程代码运行完毕--->>(生产者运行完毕)+队列中地数据也被取赶紧了->消费者没有存在的意义
    print('主')

    >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>ICP通讯>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>

    进程之间通信必须找到一种介质,该介质必须满足
    1、是所有进程共享的
    2、必须是内存空间
    附加:帮我们自动处理好锁的问题
     
    a、 from multiprocessing import Manager(共享内存,但要自己解决锁的问题)
    b、 IPC中的队列(Queue) 共享,内存,自动处理锁的问题(最常用)
    c、  IPC中的管道(Pipe),共享,内存,需自己解决锁的问题
     
    a、 用队列Queue
    1)共享的空间
    2)是内存空间
    3)自动帮我们处理好锁定问题
     
    from multiprocessing import Queue
    q=Queue(3)  #设置队列中maxsize个数为三
    q.put('first')
    q.put({'second':None})
    q.put('三')
    # q.put(4)   #阻塞。不报错,程序卡在原地等待队列中清出一个值。默认blok=True
    print(q.get())
    print(q.get())
    print(q.get())
     
    强调:
    1、队列用来存成进程之间沟通的消息,数据量不应该过大
    2、maxsize的值超过的内存限制就变得毫无意义

    >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>



    >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>割割很健康>>>>>>>>>>>>>>>

    互斥锁vs join: 
    大前提:二者的原理都是一样,都是将并发变成串行,从而保证有序(在多个程序共享一个资源时,为保证有序不乱,需将并发变成串行)
    区别一:join是按照人为指定的顺序执行,而互斥锁是所以进程平等地竞争,谁先抢到谁执行
    区别二:互斥锁可以让一部分代码(修改共享数据的代码)串行,而join只能将代码整体串行(详见抢票系统)
     
    from multiprocessing import Process,Lock
    import time,random
     
    mutex=Lock()
     
    def task1(lock):
        lock.acquire() 
        print('task1:名字是egon')
        time.sleep(random.randint(1,3))
        print('task1:性别是male')
        time.sleep(random.randint(1,3))
        print('task1:年龄是18')
        lock.release()
     
    def task2(lock):
        lock.acquire()
        print('task2:名字是alex')
        time.sleep(random.randint(1,3))
        print('task2:性别是male')
        time.sleep(random.randint(1,3))
        print('task2:年龄是78')
        lock.release()
     
    def task3(lock):
        lock.acquire()
        print('task3:名字是lxx')
        time.sleep(random.randint(1,3))
        print('task3:性别是female')
        time.sleep(random.randint(1,3))
        print('task3:年龄是30')
        lock.release()
     
    if __name__ == '__main__':
        p1=Process(target=task1,args=(mutex,))
        p2=Process(target=task2,args=(mutex,))
        p3=Process(target=task3,args=(mutex,))
     
        p1.start()
        p2.start()
        p3.start()
     
    一、抢票系统
    import json
    import time
    import random
    import os
    from multiprocessing import Process,Lock
     
    mutex=Lock()
     
    def search():
        time.sleep(random.randint(1,3))
        with open('db.json','r',encoding='utf-8') as f:
            dic=json.load(f)
            print('%s 剩余票数:%s' %(os.getpid(),dic['count']))
     
    def get():
        with open('db.json','r',encoding='utf-8') as f:
            dic=json.load(f)
        if dic['count'] > 0:
            dic['count']-=1
            time.sleep(random.randint(1,3))
            with open('db.json','w',encoding='utf-8') as f:
                json.dump(dic,f)
            print('%s 购票成功' %os.getpid())
     
    def task(lock):
        search()
        lock.acquire()
        get()
        lock.release()
     
    if __name__ == '__main__':
        for i in range(10):
            p=Process(target=task,args=(mutex,))
            p.start()
    
    
    一、守护进程
    
    
    from multiprocessing import Process
    
    
    import time
    
    
     
    
    
    def task(name):
    
    
        print('%s is running' % name)
    
    
        time.sleep(3)
    
    
     
    
    
    if __name__ == '__main__':
    
    
        obj = Process(target=task, args=('egon',))
    
    
        obj.daemon=True    #将obj变成守护进程,主进程执行完毕后子进程跟着结束
    
    
        obj.start()  # 发送信号给操作系统
    
    
    print('主')
     
     


  • 相关阅读:
    mupdf-将pdf转png图片
    Teamcenter二次开发-客户端JAVA开发挂菜单HelloWorld例子
    SqlServer-导入excel表格数据
    PDFlib创建pdf文档
    计算机网络基础知识总结-转载
    到底什么是架构?回答你关于软件架构的所有疑惑。
    SQL语句-获得表的字段数量
    vue3.0---watch使用方法
    小程序picker自定义三级联动
    无法加载文件 D:Program Files odejs ode_globalvue.ps1,因为在此系统上禁止运行脚本。有关详细信息
  • 原文地址:https://www.cnblogs.com/lijieshi/p/9301384.html
Copyright © 2011-2022 走看看