zoukankan      html  css  js  c++  java
  • Python之旅.第九章.并发编程

    一、上节课复习

    if __name__ == '__main__': 放在最后面

    obj.join(1)  #只等1

     

    二、守护进程

    from multiprocessing import Process

    import time

     

    def task(name):

        print('%s is running' % name)

        time.sleep(3)

     

    if __name__ == '__main__':

        obj = Process(target=task, args=('egon',))

        obj.daemon=True    #obj变成守护进程,主进程执行完毕后子进程跟着结束

        obj.start()  # 发送信号给操作系统

    print('')

     

    三、互斥锁

    强调:必须是lock.acquire()一次,然后 lock.release()释放一次,才能继续lock.acquire(),不能连续的lock.acquire()。否者程序停在原地。

     

    互斥锁vs join 

    大前提:二者的原理都是一样,都是将并发变成串行,从而保证有序(在多个程序共享一个资源时,为保证有序不乱,需将并发变成串行)

    区别一:join是按照人为指定的顺序执行,而互斥锁是所以进程平等地竞争,谁先抢到谁执行

    区别二:互斥锁可以让一部分代码(修改共享数据的代码)串行,而join只能将代码整体串行(详见抢票系统)

     

    from multiprocessing import Process,Lock

    import time,random

     

    mutex=Lock()

     

    def task1(lock):

        lock.acquire() 

        print('task1:名字是egon')

        time.sleep(random.randint(1,3))

        print('task1:性别是male')

        time.sleep(random.randint(1,3))

        print('task1:年龄是18')

        lock.release()

     

    def task2(lock):

        lock.acquire()

        print('task2:名字是alex')

        time.sleep(random.randint(1,3))

        print('task2:性别是male')

        time.sleep(random.randint(1,3))

        print('task2:年龄是78')

        lock.release()

     

    def task3(lock):

        lock.acquire()

        print('task3:名字是lxx')

        time.sleep(random.randint(1,3))

        print('task3:性别是female')

        time.sleep(random.randint(1,3))

        print('task3:年龄是30')

        lock.release()

     

    if __name__ == '__main__':

        p1=Process(target=task1,args=(mutex,))

        p2=Process(target=task2,args=(mutex,))

        p3=Process(target=task3,args=(mutex,))

     

        p1.start()

        p2.start()

        p3.start()

     

    四、抢票系统

    import json

    import time

    import random

    import os

    from multiprocessing import Process,Lock

     

    mutex=Lock()

     

    def search():

        time.sleep(random.randint(1,3))

        with open('db.json','r',encoding='utf-8') as f:

            dic=json.load(f)

            print('%s 剩余票数:%s' %(os.getpid(),dic['count']))

     

    def get():

        with open('db.json','r',encoding='utf-8') as f:

            dic=json.load(f)

        if dic['count'] > 0:

            dic['count']-=1

            time.sleep(random.randint(1,3))

            with open('db.json','w',encoding='utf-8') as f:

                json.dump(dic,f)

            print('%s 购票成功' %os.getpid())

     

    def task(lock):

        search()

        lock.acquire()

        get()

        lock.release()

     

    if __name__ == '__main__':

        for i in range(10):

            p=Process(target=task,args=(mutex,))

            p.start()

     

    五、IPC通信机制

    进程之间通信必须找到一种介质,该介质必须满足

    1、是所有进程共享的

    2、必须是内存空间

    附加:帮我们自动处理好锁的问题

     

    a from multiprocessing import Manager(共享内存,但要自己解决锁的问题)

    b IPC中的队列(Queue 共享,内存,自动处理锁的问题(最常用)

    c IPC中的管道(Pipe),共享,内存,需自己解决锁的问题

     

    a Manager

    from multiprocessing import Process,Manager,Lock

    import time

     

    mutex=Lock()

     

    def task(dic,lock):

        lock.acquire()

        temp=dic['num']

        time.sleep(0.1)

        dic['num']=temp-1

        lock.release()

     

    if __name__ == '__main__':

        m=Manager()

        dic=m.dict({'num':10})

     

        l=[]

        for i in range(10):

            p=Process(target=task,args=(dic,mutex))

            l.append(p)

            p.start()

     

        for p in l:

            p.join()

    print(dic)

     

    b 用队列Queue

    1)共享的空间

    2)是内存空间

    3)自动帮我们处理好锁定问题

     

    from multiprocessing import Queue

    q=Queue(3)  #设置队列中maxsize个数为三

    q.put('first')

    q.put({'second':None})

    q.put('')

    # q.put(4)   #阻塞。不报错,程序卡在原地等待队列中清出一个值。默认blok=True

    print(q.get())

    print(q.get())

    print(q.get())

     

    强调:

    1、队列用来存成进程之间沟通的消息,数据量不应该过大

    2maxsize的值超过的内存限制就变得毫无意义

                                                                  

                                                                                                                                                                                                          

    了解:

    q=Queue(3)

    q.put('first',block=False)

    q.put('second',block=False)

    q.put('third',block=False)

    q.put('fourth',block=False)  #报错 queue.Full

     

    q.put('first',block=True)

    q.put('second',block=True)

    q.put('third',block=True)

    q.put('fourth',block=True,timeout=3)  #等待3秒后若还进不去报错。注意timeout不能和block=False连用

     

    q.get(block=False)

    q.get(block=False)

    q.get(block=False)

    q.get(block=False)           #报错 queue.Empty

     

    q.get(block=True)

    q.get(block=True)

    q.get(block=True)

    q.get(block=True,timeout=2)    #等待2秒后还取不出东西则报错。注意timeout不能和block=False连用

     

    六、生产者消费者模型

    该模型中包含两类重要的角色:

    1、生产者:将负责造数据的任务比喻为生产者

    2、消费者:接收生产者造出的数据来做进一步的处理,该类人物被比喻成消费者

     

    实现生产者消费者模型三要素

    1、生产者

    2、消费者

    3、队列

     

    什么时候用该模型:

    程序中出现明显的两类任何,一类任务是负责生产,另外一类任务是负责处理生产的数据的

     

    该模型的好处:

    1、实现了生产者与消费者解耦和

    2、平衡了生

     

    import time

    import random

    from multiprocessing import Process,Queue

     

    def consumer(name,q):

        while True:

            res=q.get()

            time.sleep(random.randint(1,3))

            print('33[46m消费者===%s 吃了 %s33[0m' %(name,res))

     

    def producer(name,q,food):

        for i in range(5):

            time.sleep(random.randint(1,2))

            res='%s%s' %(food,i)

            q.put(res)

            print('33[45m生产者者===%s 生产了 %s33[0m' %(name,res))

     

    if __name__ == '__main__':

        #1、共享的盆

        q=Queue()

     

        #2、生产者们

        p1=Process(target=producer,args=('egon',q,'包子'))

        p2=Process(target=producer,args=('刘清政',q,'泔水'))

        p3=Process(target=producer,args=('杨军',q,'米饭'))

     

        #3、消费者们

        c1=Process(target=consumer,args=('alex',q))

        c2=Process(target=consumer,args=('梁书东',q))

     

        p1.start()

        p2.start()

        p3.start()

        c1.start()

    c2.start()

     

    生产者消费者模型是解决问题的思路不是技术。可以用进程和队列来实现,也可以用其他的来实现。

  • 相关阅读:
    Windows下临界区的使用CRITICAL_SECTION
    MFC中消息映射的实现
    Oracle中提供的事件触发机制
    CreateEvent()详解
    内核参数优化之2-1 tcp/ip 标志位报文解析
    内核参数优化之1 keepalive解析
    python之7-3对象的信息/方法获取
    python之7-2类的继承与多态
    python之7-1类
    python之6-3嵌套函数
  • 原文地址:https://www.cnblogs.com/yangli0504/p/8945737.html
Copyright © 2011-2022 走看看