zoukankan      html  css  js  c++  java
  • day034 锁,信号量,事件,队列,子进程与子进程通信,生产者消费者模型,joinableQueue

    进程锁

    #加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。
    虽然可以用文件共享数据实现进程间通信,但问题是:
    1.效率低(共享数据基于文件,而文件是硬盘上的数据)
    2.需要自己加锁处理

    异步运行到某段程序时转换成同步:降低了效率但是确保了数据的安全性(串行),引入multiprocessing中的Lock:

    from multiprocessing import Process,Lock
    import time import json import random def get_ticket(i,ticket_lock): print("大家都到齐了") ticket_lock.acquire() #抢钥匙 以下变成串行 with open("ticket","r") as f: last_ticket_info=json.load(f) last_ticket=last_ticket_info["count"] if last_ticket>0: time.sleep(random.random()) last_ticket=last_ticket-1 last_ticket_info["count"]=last_ticket with open("ticket","w") as f: json.dump(last_ticket_info,f) print("%s号抢到了" % i) else: print("%s号没抢到了" % i) ticket_lock.release() #交钥匙给下一个人 if __name__=="__main__": ticket_lock=Lock() #创建锁得的对象 for i in range(10): p=Process(target=get_ticket,args=(i,ticket_lock)) # 传参 p.start()

    #因此我们最好找寻一种解决方案能够兼顾:1、效率高(多个进程共享一块内存的数据)2、帮我们处理好锁问题。这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。

    队列和管道都是将数据存放于内存中
    队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,
    我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。

    队列

    进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的。队列就像一个特殊的列表,但是可以设置固定长度,并且从前面插入数据,从后面取出数据,先进先出。

    from multiprocessing import Queue
    q=Queue(3) #创建一个队列对象,队列长度为3
    
    #put ,get ,put_nowait,get_nowait,full,empty
    q.put(3)   #往队列中添加数据
    q.put(2)
    q.put(1)
    # q.put(4)   # 如果队列已经满了,程序就会停在这里,等待数据被别人取走,再将数据放入队列。
               # 如果队列中的数据一直不被取走,程序就会永远停在这里。
    try:
        q.put_nowait(4) # 可以使用put_nowait,如果队列满了不会阻塞,但是会因为队列满了而报错。
    except: # 因此我们可以用一个try语句来处理这个错误。这样程序不会一直阻塞下去,但是会丢掉这个消息。
        print('队列已经满了')
    
    # 因此,我们再放入数据之前,可以先看一下队列的状态,如果已经满了,就不继续put了。
    print(q.full()) #查看是否满了,满了返回True,不满返回False
    
    print(q.get())  #取出数据
    print(q.get())
    print(q.get())
    # print(q.get()) # 同put方法一样,如果队列已经空了,那么继续取就会出现阻塞。
    try:
        q.get_nowait(3) # 可以使用get_nowait,如果队列满了不会阻塞,但是会因为没取到值而报错。
    except: # 因此我们可以用一个try语句来处理这个错误。这样程序不会一直阻塞下去。
        print('队列已经空了')
    
    print(q.empty()) #空了
    
    队列的简单用法

    队列是进程安全的:同一时间只能一个进程拿到队列中的一个数据,你拿到了一个数据,这个数据别人就拿不到了。

    生产者和消费者模型:

    import time
    from multiprocessing import Process,Queue
    def produce(q):
        for i in range(1,11):
            time.sleep(1)
            print("生产了%s包子"% i)
            q.put(i)
    def consumer(q):
        while 1:
            time.sleep(2)
            s=q.get()
            if s==None:    #判断是否还有数据(接收完成信号)
                break
            else:
                print("消费者吃了%s包子" % s)
    if __name__=="__main__":
        q= Queue(20)      #设置队列长度
        pro_p=Process(target=produce,args=(q,))   #创建进程
        pro_p.start()
        con_p=Process(target=consumer,args=(q,))  #创建进程
        con_p.start()
        pro_p.join()
        q.put(None)      #在主程序中加信号也可以在生产者中添加完成信号

    利用JoinableQueue来解决:

    JoinableQueue([maxsize]):这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。
     参数介绍:
     maxsize是队列中允许最大项数,省略则无大小限制。   
    方法介绍:
      JoinableQueue的实例p除了与Queue对象相同的方法之外还具有:
      q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
      q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止,也就是队列中的数据全部被get拿走了。
     
    import time
    from multiprocessing import Process,JoinableQueue
    def produce(q):
        for i in range(1,11):
            time.sleep(1)
            print("生产了%s包子"% i)
            q.put(i)
        q.join()       #接收完成的信号结束进程
    def consumer(q):
        while 1:
            time.sleep(2)
            s=q.get()
            print("消费者吃了%s包子" % s)
            q.task_done()#给共享的进程发送完成的信号
    if __name__=="__main__":
        q= JoinableQueue(20)   #设置队列长度
        pro_p=Process(target=produce,args=(q,)) #创建对象
        pro_p.start()
        con_p=Process(target=consumer,args=(q,)) #创建对象
        con_p.daemon=True
        con_p.start()
        pro_p.join()  #等待生产者进程完成
        print("主程序运行完成")

    信号量

    互斥锁同时只允许一个线程更改数据,而信号量Semaphore是同时允许一定数量的线程更改数据 。

    import time
    import random
    from multiprocessing import Process,Semaphore
    def dbj(i,s):
        s.acquire()   #进程锁 串行
        print('%s号男主人公来洗脚'%i)
        print('-------------')
        time.sleep(random.randrange(3,6))
        s.release()
    
    if __name__ == '__main__':
        s = Semaphore(4) #创建一个计数器,每次acquire就减1,直到减到0,那么上面的任务只有4个在同时异步的执行,后面的进程需要等待.
        for i in range(10):
            p1 = Process(target=dbj,args=(i,s,))
            p1.start()

    事件

    python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。
     事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。
    clear:将“Flag”设置为False
    set:将“Flag”设置为True

    from multiprocessing import Process,Semaphore,Event
    import time,random
    
    e = Event() #创建一个事件对象
    print(e.is_set())  #is_set()查看一个事件的状态,默认为False,可通过set方法改为True
    print('look here!')
    # e.set()          #将is_set()的状态改为True。
    # print(e.is_set())#is_set()查看一个事件的状态,默认为False,可通过set方法改为Tr
    # e.clear()        #将is_set()的状态改为False
    # print(e.is_set())#is_set()查看一个事件的状态,默认为False,可通过set方法改为Tr
    e.wait()           #根据is_set()的状态结果来决定是否在这阻塞住,is_set()=False那么就阻塞,is_set()=True就不阻塞
    print('give me!!')
    
    #set和clear  修改事件的状态 set-->True   clear-->False
    #is_set     用来查看一个事件的状态
    #wait       依据事件的状态来决定是否阻塞 False-->阻塞  True-->不阻塞
    
    事件方法的使用
  • 相关阅读:
    ubuntu18.04安装dash-to-dock出错的问题
    使用SVN+Axure RP 8.0创建团队项目
    软件工程实践专题第一次作业
    C#单问号(?)与双问号(??)
    词根 ten 展开 持有 /tin/tent/tain “to hold”
    vscode 对js文件不格式化的修正方案 settings.json
    open cv java 可以 对图片进行分析,得到数据。考试答题卡 2B铅笔涂黑嘎达 识别
    bounties 赏金 bon = good 来自法语 bonjour 早上好
    class cl表示 汇聚 集合 ss表示 阴性 这里表示抽象
    git svn 提交代码日志填写规范 BUG NEW DEL CHG TRP gitz 日志z
  • 原文地址:https://www.cnblogs.com/litieshuai/p/9844722.html
Copyright © 2011-2022 走看看