zoukankan      html  css  js  c++  java
  • 生产者和消费者,锁,队列

    进程同步(锁)

    现在有这么情况,我们模仿了一个抢票软件,文件里有十张票,一共有二十个人去抢,抢走一张就少一张,二十个人我们为了达到并发的效果,显然是要以多进程来实现的,那么这时候就有一个问题,好几个人同时看到的都是还剩下10张票,每个人都抢了一张,结果只是剩下19。问题就是这个,应该是某个人在抢的时候,其他人无法抢才对。所以这时候我们就需要给这个步骤上锁,只要有进程在执行的时候,别的进程就无法执行这段代码,只能等待。

    这时候我们需要在multiprocessing中导入一个Lock类。

    #文件db的内容为:{"count":1}
    #注意一定要用双引号,不然json无法识别
    from multiprocessing import Process,Lock
    import time,json,random
    def search():
        dic=json.load(open('db.txt'))
        print('33[43m剩余票数%s33[0m' %dic['count'])
    
    def get():
        dic=json.load(open('db.txt'))
        time.sleep(0.1) #模拟读数据的网络延迟
        if dic['count'] >0:
            dic['count']-=1
            time.sleep(0.2) #模拟写数据的网络延迟
            json.dump(dic,open('db.txt','w'))
            print('33[43m购票成功33[0m')
    
    def task(lock):
        search()
        lock.acquire()
        get()
        lock.release()
    if __name__ == '__main__':
        lock=Lock()
        for i in range(100): #模拟并发100个客户端抢票
            p=Process(target=task,args=(lock,))
            p.start()
    
    加锁:购票行为由并发变成了串行,牺牲了运行效率,但保证了数据安全
    

    在我们主进程的代码里面生成一个锁(只能在这里,才是一个锁,在上面的方法里的话就是生成多个锁了),然后把这个锁的对象当做参数传进去。lock.acquire()就是开启锁,只有一个进程能进去执行,下一个进程想进去必须要等lock.release()释放才能进去。这就很好的实现了这段代码只有一个进程才能跑,操作文件同时只能一个进程。

    加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全

    那我速度降低了我肯定不乐意啊,有没有什么方法能够两全其美呢?还真有。

    那就是队列和管道

    队列≈管道+锁

    你懂我意思吧,管道废了,滚。

    队列

    举个栗子。

    凉轻松去一家包子店吃包子,厨师只管自己做包子,做完一个就放一个进大盘子里,凉轻松只管从大盘子里拿包子,没了就等着。这个大盘子就是所谓的队列。

    创建方式

    Queue([maxsize])
    

    maxsize是队列中允许最大项数,省略则无大小限制。

    1 q.put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。
    2 q.get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常.
      
    3 q.get_nowait():同q.get(False),就是说不等,有就拿,没有就砸店
    4 q.put_nowait():同q.put(False),同上(厨师:我为什么要砸店?)
     
    5 q.empty():调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目。
    6 q.full():调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。
    7 q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样
    

    栗子

    from multiprocessing import Process,Queue
    import time,random
    
    def producer(q,name,food):
        '''生产者'''
        for i in range(3):
            print(f'{name}生产了{food}{i}')
            time.sleep(random.randint(1, 3))
            res = f'{food}{i}'
            q.put(res)
        # q.put(None)
    
    def consumer(q,name):
        '''消费者'''
        while True:
            res = q.get(timeout=5)
            if res is None:break
            time.sleep(random.randint(1,3))
            print(f'{name}吃了{res}')
    
    if __name__ == '__main__':
        q = Queue()
        p1 = Process(target=producer,args=(q,'rocky','包子'))
        p2 = Process(target=producer,args=(q,'mac','韭菜'))
        p3 = Process(target=producer,args=(q,'nick','蒜泥'))
        c1 = Process(target=consumer,args=(q,'成哥'))
        c2 = Process(target=consumer,args=(q,'浩南哥'))
        p1.start()
        p2.start()
        p3.start()
        c1.start()
        c2.start()
        p1.join()# 写了三个生产者的join,保证了生产者生产完毕,也就是生产者进程全结束了。
        p2.join()#为什么这个要写在c1和c2 的start下面,因为这样才有边造边吃的效果
        p3.join() 
        q.put(None)# 几个消费者put几次,一旦消费者接收到None,就会挂掉。
        q.put(None)
    

    既然讲到了包子,顺便也讲一下生产者和消费者!

    生产者和消费者

    很直白,厨师就是生产者,凉轻松就是消费者。

    生产者: 生产数据的任务

    消费者: 处理数据的任务

    既然这样,我们就可以把上面的所有东西串起来了,用一个栗子来看

    不行不行,还要讲一下一个东西。

    JoinableQueue

    这玩意儿也是在multiprocessing里的。

    from multiprocessing import Process,Queue,JoinableQueue
    
    
    q = JoinableQueue()
    
    q.put('zhao') # 放队列里一个任务
    q.put('qian')
    
    print(q.get())
    q.task_done() # 完成了一次任务
    print(q.get())
    q.task_done() # 完成了一次任务
    q.join() #计数器不为0的时候 阻塞等待计数器为0后通过
    
    # 想象成一个计数器 :put +1   task_done -1
    

    每当往JoinableQueue队列里放一个东西,他的任务数就+1,每当拿走一个,我们就手动调用一下q的task_done方法,任务数就会-1,队列也是由join方法的,当任务数为0的时候,join才会不阻塞。

    现在可以上总的栗子了!

    综合实例

    from multiprocessing import Process,Queue,JoinableQueue
    import time,random
    
    def producer(q,name,food):
        '''生产者'''
        for i in range(3):
            print(f'{name}生产了{food}{i}')
            time.sleep(random.randint(1, 3))
            res = f'{food}{i}'
            q.put(res)
        # q.put(None)
    
    def consumer(q,name):
        '''消费者'''
        while True:
            res = q.get()
            # if res is None:break
            time.sleep(random.randint(1,3))
            print(f'{name}吃了{res}')
            q.task_done() #
    
    if __name__ == '__main__':
        q = JoinableQueue()
        p1 = Process(target=producer,args=(q,'rocky','包子'))
        p2 = Process(target=producer,args=(q,'mac','韭菜'))
        p3 = Process(target=producer,args=(q,'nick','蒜泥'))
        c1 = Process(target=consumer,args=(q,'成哥'))
        c2 = Process(target=consumer,args=(q,'浩南哥'))
        p1.start()
        p2.start()
        p3.start()
        c1.daemon = True
        c2.daemon = True
        c1.start()
        c2.start()
        p1.join()
        p2.join()
        p3.join() # 生产者生产完毕
        # q.put(None)# 几个消费者put几次
        # q.put(None)
        q.join() # 分析
        # 生产者生产完毕--这是主进程最后一行代码结束--q.join()消费者已经取干净了,没有存在的意义了.
        #这是主进程最后一行代码结束,消费者已经取干净了,没有存在的意义了.守护进程的概念.
    

    rocky生产了包子0
    mac生产了韭菜0
    nick生产了蒜泥0
    nick生产了蒜泥1
    mac生产了韭菜1
    rocky生产了包子1
    nick生产了蒜泥2
    浩南哥吃了韭菜0
    mac生产了韭菜2
    成哥吃了蒜泥0
    rocky生产了包子2
    成哥吃了蒜泥1
    浩南哥吃了包子0
    浩南哥吃了蒜泥2
    浩南哥吃了包子1
    成哥吃了韭菜1
    浩南哥吃了韭菜2
    成哥吃了包子2

    至于为什么这么乱,因为操作系统的调度谁也不知道他是怎样的,你不知道他先执行哪个进程等等等等等。最后程序结束。

    给力!

    在补充一点管道的知识(即使我们不用他了)

    #创建管道的类:
    Pipe([duplex]):在进程之间创建一条管道,并返回元组(conn1,conn2),其中conn1,conn2表示管道两端的连接对象,强调一点:必须在产生Process对象之前产生管道
    #参数介绍:
    dumplex:默认管道是全双工的,如果将duplex射成False,conn1只能用于接收,conn2只能用于发送。
    #主要方法:
        conn1.recv():接收conn2.send(obj)发送的对象。如果没有消息可接收,recv方法会一直阻塞。如果连接的另外一端已经关闭,那么recv方法会抛出EOFError。
        conn1.send(obj):通过连接发送对象。obj是与序列化兼容的任意对象
     #其他方法:
    conn1.close():关闭连接。如果conn1被垃圾回收,将自动调用此方法
    conn1.fileno():返回连接使用的整数文件描述符
    conn1.poll([timeout]):如果连接上的数据可用,返回True。timeout指定等待的最长时限。如果省略此参数,方法将立即返回结果。如果将timeout射成None,操作将无限期地等待数据到达。
     
    conn1.recv_bytes([maxlength]):接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。如果进入的消息,超过了这个最大值,将引发IOError异常,并且在连接上无法进行进一步读取。如果连接的另外一端已经关闭,再也不存在任何数据,将引发EOFError异常。
    conn.send_bytes(buffer [, offset [, size]]):通过连接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,然后调用c.recv_bytes()函数进行接收    
     
    conn1.recv_bytes_into(buffer [, offset]):接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或类似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。如果消息长度大于可用的缓冲区空间,将引发BufferTooShort异常。
    
    from multiprocessing import Process,Pipe
    
    import time,os
    def consumer(p,name):
        left,right=p
        left.close()
        while True:
            try:
                baozi=right.recv()
                print('%s 收到包子:%s' %(name,baozi))
            except EOFError:
                right.close()
                break
    def producer(seq,p):
        left,right=p
        right.close()
        for i in seq:
            left.send(i)
            # time.sleep(1)
        else:
            left.close()
    if __name__ == '__main__':
        left,right=Pipe()
    
        c1=Process(target=consumer,args=((left,right),'c1'))
        c1.start()
    
    
        seq=(i for i in range(10))
        producer(seq,(left,right))
    
        right.close()
        left.close()
    
        c1.join()
        print('主进程')
    

    看看就得了。

  • 相关阅读:
    错误:找不到或无法加载主类
    CentOS 7 命令
    CentOS 7 分区
    Pow(x, n)
    Sum Root to Leaf Numbers
    linux下intel 82579LM 网卡驱动安装
    printf打印字符耗时多少
    数组中移动0至后面
    SDL多线程问题之--Unknown request in queue while dequeuing
    java学习123>>IO
  • 原文地址:https://www.cnblogs.com/chanyuli/p/11527980.html
Copyright © 2011-2022 走看看