zoukankan      html  css  js  c++  java
  • 进程同步&进程间通信

    主要内容:

    • 1.进程同步
    • 2.进程间通信

    1.进程同步

    (1)锁

    引出:虽然我们实现了程序的异步,让多个任务可以同时在几个进程中并发处理,他们之间的运行没有顺序,一旦开启也不受我们控制。尽管并发编程让我们能更加充分的利用IO资源,但是也给我们带来了新的问题:进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的,而共享带来的是竞争,竞争带来的结果就是错乱,如何控制,就是加锁处理。

    import time
    import json
    import random
    from multiprocessing import Process
    
    def get_ticket(i):
        print("我已到达,要开始抢票了")
        with open("ticket","r") as f:
            last_ticket_info = json.load(f)
        last_ticket = last_ticket_info["count"]
        if last_ticket > 0:
            time.sleep(random.random())
            last_ticket = last_ticket-1
            last_ticket_info["count"] = last_ticket
            with open("ticket","w") as f:
                json.dump(last_ticket_info,f)
            print("%s号抢到了,Ynb" % i)
        else:
            print("%s傻缺没有抢到票,明年再来吧" %i)
    
    if __name__ =="__main__":
        for i in range(10):
            p = Process(target=get_ticket,args=(i,))
            p.start()
    模拟抢票 - 没加锁
    import time
    import json
    import random
    from multiprocessing import Process,Lock
    
    def get_ticket(i,ticket_lock):
        print("我已到达,要开始抢票了")
        ticket_lock.acquire()      #加锁,保证每次只有一个进程在执行里边的程序,这一段程序对于所有写上这个锁的进程,大家都变程了串行
        with open("ticket","r") as f:
            last_ticket_info = json.load(f)
        last_ticket = last_ticket_info["count"]
        if last_ticket > 0:
            time.sleep(random.random())
            last_ticket = last_ticket-1
            last_ticket_info["count"] = last_ticket
            with open("ticket","w") as f:
                json.dump(last_ticket_info,f)
            print("%s号抢到了,Ynb" % i)
        else:
            print("%s傻缺没有抢到票,明年再来吧" %i)
        ticket_lock.release()        #解锁,解锁后才能去执行自己的额其他程序
    
    if __name__ =="__main__":
        ticket_lock = Lock()         #在创建进程之前先创建一个锁
        for i in range(10):
            p = Process(target=get_ticket,args=(i,ticket_lock))  #模拟10个人来进行抢票,此时并将锁作为参数传给要执行的函数
            p.start()
    模拟抢票 - 加锁

    可以看出:加锁后只有一个人能抢到最后一张票,符合常理.,此时虽然创建了多个进程,但是,由并发变为了串行.牺牲了运行效率,但是避免了竞争.

    (2) 信号量

    互斥锁同时只允许一个线程更改数据,而信号量Semaphore是同时允许一定数量的线程更改数据 。
    假设商场里有4个迷你唱吧,所以同时可以进去4个人,如果来了第五个人就要在外面等待,等到有人出来才能再进去玩。
    实现:
    信号量同步基于内部计数器,每调用一次acquire(),计数器减1;每调用一次release(),计数器加1.当计数器为0时,acquire()调用被阻塞。这是迪科斯彻(Dijkstra)信号量概念P()和V()的Python实现。信号量同步机制适用于访问像服务器这样的有限资源。
    信号量与进程池的概念很像,但是要区分开,信号量涉及到加锁的概念
    信号量的概述
    import time
    import random
    from multiprocessing import Process,Semaphore
    
    def dbj(i,s):
        s.acquire()                                 # 此时进来的人获取房间
        print("%s号男同志来洗脚"%i)
        time.sleep(random.randrange(3,6))           #模拟他们的洗脚时间
        s.release()                                 #出房间,此时下个人可以进入
        print("_____________________")
    
    if __name__ =="__main__":
        s = Semaphore(4)                             # 创建一个信号量(此时容量为4)
        for i in range (10):
            p = Process(target=dbj,args = (i,s))     #将信号量作为参数传给进程要执行的函数
            p.start()
    信号量例子--dbj

    (3)事件

    from multiprocessing import Process,Event
    
    e = Event()         #创建一个事件对象
    print(e.is_set())   #is_set()查看一个事件状态,默认为    False
    print("111111")
    e.set()              # 将is_set()的状态改为True
    print(e.is_set())    #查看此时的事件状态    True
    e.clear()            #将事件的状态修改为    False(清空,回到初始的状态)
    print(e.is_set())    #False
    e.wait()             #根据is_set的状态来决定是否在此处阻塞住  ,当is_set() = False 就阻塞住,如果是True就不阻塞
    print("00000")
    
    #set和clear  修改事件的状态 set-->True   clear-->False
    #is_set     用来查看一个事件的状态
    #wait       依据事件的状态来决定是否阻塞 False-->阻塞  True-->不阻塞
    事件
    import time
    import random
    from multiprocessing import Process,Event
    
    def traffic_light(e):
        while 1:
            print("红灯啦")
            time.sleep(5)
            e.set()                         #修改为True
            print("绿灯亮")
            time.sleep(4)
            e.clear()                       #修改为True
    
    def car(i,e):
        if not e.is_set():
            print("我们在等")
            e.wait()                       #判断此时是否要阻塞
            print("走你")
        else:
            print("可以走了")
    
    if __name__ =="__main__":
        e = Event()                         #创建一个事件对象
        hld = Process(target=traffic_light,args=(e,))
        hld.start()
        while 1:
            time.sleep(2)
            for i in range(3):
                time.sleep(random.random())
                p1= Process(target=car,args=(i,e))
                p1.start()
    模拟一个简易的红绿灯

    (4)进程锁小结

    #加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。
    虽然可以用文件共享数据实现进程间通信,但问题是:
    1.效率低(共享数据基于文件,而文件是硬盘上的数据)
    2.需要自己加锁处理
    
    #因此我们最好找寻一种解决方案能够兼顾:1、效率高(多个进程共享一块内存的数据)2、帮我们处理好锁问题。这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。
    队列和管道都是将数据存放于内存中
    队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,
    我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。
    
    IPC通信机制(了解):IPC是intent-Process Communication的缩写,含义为进程间通信或者跨进程通信,是指两个进程之间进行数据交换的过程。IPC不是某个系统所独有的,任何一个操作系统都需要有相应的IPC机制,
    比如Windows上可以通过剪贴板、管道和邮槽等来进行进程间通信,而Linux上可以通过命名共享内容、信号量等来进行进程间通信。Android它也有自己的进程间通信方式,Android建构在Linux基础上,继承了一
    部分Linux的通信方式。
    进程锁

    2.进程之间的通信

    (1)队列(重要)

    进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的。队列就像一个特殊的列表,但是可以设置固定长度,并且从前面插入数据,从后面取出数据,先进先出

    (1)Queue方法介绍

    q = Queue([maxsize]) 
    创建共享的进程队列。maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。底层队列使用管道和锁定实现。另外,还需要运行支持线程以便队列中的数据传输到底层管道中。 
    Queue的实例q具有以下方法:
    
    q.get( [ block [ ,timeout ] ] ) 
    返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为True. 如果设置为False,将引发Queue.Empty异常(定义在Queue模块中)。timeout是可选超时时间,用在阻塞模式中。如果在制定的时间间隔内没有项目变为可用,将引发Queue.Empty异常。
    
    q.get_nowait( ) 
    同q.get(False)方法。
    
    q.put(item [, block [,timeout ] ] ) 
    将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默认为True。如果设置为False,将引发Queue.Empty异常(定义在Queue库模块中)。timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。
    
    q.qsize() 
    返回队列中目前项目的正确数量。此函数的结果并不可靠,因为在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。在某些系统上,此方法可能引发NotImplementedError异常。
    
    
    q.empty() 
    如果调用此方法时 q为空,返回True。如果其他进程或线程正在往队列中添加项目,结果是不可靠的。也就是说,在返回和使用结果之间,队列中可能已经加入新的项目。
    
    q.full() 
    如果q已满,返回为True. 由于线程的存在,结果也可能是不可靠的(参考q.empty()方法)。。
    
    方法介绍
    Queue中方的方法
    from multiprocessing import Queue
    q = Queue(3)        #创建一个队列对象,队列长度为3
    
    q.put(1)         #往队列中添加数据
    q.put(2)
    q.put(3)
    # q.put(4)           #如果队列已满,程序就会停在这里,等待数据被别人取走,再将数据放入队列
                        #如果队列中的数据一直不被取走,程序就会永远停在这
    try:
        q.put_nowait(3)   #可以使用put_nowait,如果队列满了不会阻塞,但是会因为队列满了就报错.
    except:               #因此,我们用一个try语句来处理这个错误,这样程序不会一直阻塞下去,但是会丢掉这个消息
        print("队列已经满了")
    
    #因此,我们在放入数据之前,可以先看一下队列的状态,如果已经满了就不在put了
    print(q.full())            #查看是否满了,满了返回True,不满返回False
    
    print(q.get())
    print(q.get())
    print(q.get())
    # print(q.get())              #同put方法一样,如果队列已经空了,那么读取就会阻塞
    try:
        q.get_nowait(3) # 可以使用get_nowait,如果队列满了不会阻塞,但是会因为没取到值而报错。
    except:             # 因此我们可以用一个try语句来处理这个错误。这样程序不会一直阻塞下去。
        print('队列已经空了')
    方法示例

    **关于q.empty()**

    import time
    from multiprocessing import Queue
    q = Queue(3)
    q.put(1)
    q.put(2)
    print('----------',q.qsize())                  #q.qsize()这个返回的是当前队列中已经占位的元素个数
    q.put(3)
    
    q.get()
    q.get()
    q.get()
    print(q.empty())                         #此时队列已经空了,因此结果为True
    q.put(4)
    # print('>>>>>',q.empty())          #结果为True
    # print(q.get())                    #  4
    time.sleep(0.1)                     #休眠一会儿
    print('>>>>>',q.empty())          #结果为False
    print(q.get()) 
    p.empty 的注意事项

    在空队列上放置对象之后,在队列的p.empty()方法返回False之前,可能有无限小的延迟,导致返回的结果是True

     (2) 队列之间的通信

    import time
    from multiprocessing import Process,Queue
    
    def girl(q):
        print("来自boy的消息",q.get())
        print("来自校领导的凝视",q.get())
    
    
    def boy(q):
        q.put("约吗")
    
    if __name__ == "__main__":
        q = Queue(5)
        girl_p = Process(target=girl, args=(q,))
        boy_p = Process(target=boy, args=(q,))
    
        girl_p.start()
        boy_p.start()
        time.sleep(1)
        q.put("好好工作,别捣乱")
    子进程之间的通信&子进程与主进程之间的通信

    (3)生产者与消费者模型

    import time
    from multiprocessing import Process,Queue
    
    def producer(q):
        for i in range(1,11):
            time.sleep(1)
            print("生产了包子%s"%i)
            q.put(i)
        q.put(None)  # 针对第三个版本的消费者,往队列里面加了一个结束信号
    
    #版本一
    # def consumer(q):
    #     while 1:
    #         time.sleep(2)
    #         s = q.get()
    #         print("消费者吃了包子%s" %s)               #当队列空了时候会阻塞(等待包子)
    
    #版本二
    # def consumer(q):
    #     while 1:
    #         time.sleep(0.5)
    #         try :
    #             s = q.get(False)
    #             print("消费者吃了包子%s" %s)
    #         except:
    #             break                                  #此时只会生产包子,不会吃包子(吃的太快,还没放运行就结束了)
    
    
    #版本三
    def consumer(q):
        while 1:
            time.sleep(0.5)
            s = q.get()
            if s == None:
                break
            else:
                print('消费者吃了包子%s' %s)
    if __name__ == "__main__":
        q = Queue(20)
        pro_p = Process(target=producer,args=(q,))
        pro_p.start()
        con_p = Process(target=consumer, args=(q,))
        con_p.start()
    生产者&消费者模型

    (4) JoinableQueue([maxsize]) 

    #JoinableQueue([maxsize]):这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。
    
       #参数介绍:
        maxsize是队列中允许最大项数,省略则无大小限制。    
      #方法介绍:
        JoinableQueue的实例p除了与Queue对象相同的方法之外还具有:
        q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
        q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止,也就是队列中的数据全部被get拿走了。
    JoinableQueue概述
    import time
    from multiprocessing import Process,JoinableQueue
    
    def prodeucer(q):
        for i in range(1,11):
            time.sleep(0.5)
            print("生产了包子%s号" %i)
            q.put(i)
        q.jion()  #生产者调用此方法进行阻塞,知道队列中所有的项目均被处理,阻塞将持续到队列中每
        # 个项目均调用到q.task_done 也即队列中所有数据都被get拿走了
        print("在这里等你")
    
    def consumer(q):
        while 1:
            time.sleep(1)
            s = q.get()
            print("消费者吃了包子%s"%s)
            q.task_done()                        #给生产者发送了一个任务结束的信号
    
    
    if __name__ =="__main__":
        q = JoinableQueue(10)                   #通过队列来模拟缓冲区,设置大小为10
        #生产者进程
        pro_p = Process(target= prodeucer,args= (q,))
        pro_p.start()
        #消费者进程
        con_p =Process(target= consumer,args = (q,))
        con_p.daemon = True #                   #将消费者进程设置为保护进程
        con_p.start()
        pro_p.join()    #此时给生产者进程设置join,当生产者进程结束后,主进程才会结束,
                        #而生产者进程需要等消费者进程发送了p.task_done信号完毕才会结束,将消费者进设置为保护进程,
                        #而只有当主进程结束后消费者进程才能结束
        print('主进程结束')
    JoinableQueue下的消费者和生产者模型
  • 相关阅读:
    canvas beginPath()的初步理解
    高效取余运算(n-1)&hash原理探讨
    EntityUtils.toString(entity)处理字符集问题解决
    python计算不规则图形面积算法
    VMware与 Device/Credential Guard 不兼容,解决办法及心得
    Java爬取51job保存到MySQL并进行分析
    纯C语言实现循环双向链表创建,插入和删除
    纯C语言实现顺序队列
    纯C语言实现链队
    纯C语言实现链栈
  • 原文地址:https://www.cnblogs.com/wcx666/p/9845493.html
Copyright © 2011-2022 走看看