zoukankan      html  css  js  c++  java
  • 『Python』进程同步

    1. Lock(互斥锁)

    是可用的最低级的同步指令。Lock处于锁定状态时,不被其他的线程拥有。

    from multiprocessing import Process, Value, Lock
    
    
    def func1(num, lock: Lock):
        lock.acquire()
        print(num.value)
        num.value += 1
        lock.release()
    
    
    if __name__ == '__main__':
        lock = Lock()  # 创建锁对象
        val = Value('i', 0)
        proc = [Process(target=func1, args=(val, lock)) for i in range(10)]
        for p in proc:
            p.start()
        for p in proc:
            p.join()
    
        print(val.value)    # 10
    

    上面获取锁和释放锁可以用with语句简化,它可以自动获取和释放锁

    def func1(num, lock): 
        with lock:
            num.value += 1
    
    

    输出:

    0
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    

    2. RLock(递归锁)

    是一个可以被同一个进程请求多次的同步指令。RLock使用了“拥有的进程”和“递归等级”的概念,处于锁定状态时,RLock被某个进程拥有。拥有RLock的进程可以多次调用acquire(),释放锁时需要调用release()相同次数,其他进程在此期间得不到锁。

    主要就是对于进程处理中会有一些比较复杂的代码逻辑过程,比如很多层的函数调用,而这些函数其实都需要进行加锁保护数据访问。

    这样就可能会反复的多次加锁,因而用RLock就可以进行多次加锁、解锁,直到最终锁被释放;而如果用普通的Lock,当你一个函数A已经加锁,它内部调用另一个函数B,如果B内部也会对同一个锁加锁,那么这种情况就也会导致死锁。而RLock可以解决这个问题。

    from multiprocessing import Process
    from multiprocessing import RLock, Lock, Value
    
    
    def f(v, lock):
        with lock:
            g(v, lock)
    
    
    def g(v, lock):
        with lock:
            for _ in range(10):
                v.value += 1
                print(f"g()===>{v.value}")
    
    
    if __name__ == '__main__':
        # lock = Lock() # 如果用Lock,就会出现死锁
        lock = RLock()
        v = Value("i", 0)
        p = Process(target=f, args=(v, lock))
        p.start()
        p.join()
        print(v.value)  # 10
    
    

    输出:

    g()===>1
    g()===>2
    g()===>3
    g()===>4
    g()===>5
    g()===>6
    g()===>7
    g()===>8
    g()===>9
    g()===>10
    10
    

    3. Semaphore(信号量同步)

    互斥锁同时只允许一个进程更改数据,而信号量Semaphore是同时允许一定数量的进程更改数据 。

    信号量同步基于内部计数器,每调用一次acquire(),计数器减1;每调用一次release(),计数器加1,当计数器为0时,acquire()调用被阻塞。

    这是Dijkstra信号量概念PV操作的Python实现。信号量同步机制适用于访问像服务器这样的有限资源,它与进程池的概念很像,但是要区分开,信号量涉及到加锁的概念。

    【示例·KTV】

    假设商场里有4个迷你唱吧,所以同时可以进去4个人,如果来了第五个人就要在外面等待,等到有人出来才能再进去玩。

    image-20200424212652256

    import random
    import time
    from multiprocessing import Process, Semaphore
    
    
    def ktv(i, mutex):
        with mutex:
            print(f"person{i}进来唱歌了")
            time.sleep(random.randint(1, 5))  # 唱歌时间
            print(f"person{i}从ktv出去了")
    
    
    if __name__ == '__main__':
        mutex = Semaphore(4)
        for i in range(5):
            Process(target=ktv, args=(i, mutex)).start()
    
    

    输出:

    person2进来唱歌了
    person0进来唱歌了
    person1进来唱歌了
    person3进来唱歌了
    person1从ktv出去了
    person4进来唱歌了
    person3从ktv出去了
    person0从ktv出去了
    person2从ktv出去了
    person4从ktv出去了
    

    信号量和锁有点类似,它们之间的区别,信号量,相当于计算器

    信号量: 锁 + 计数器

    acquire(): 计数器 - 1,计数器减为0时阻塞

    release(): 计数器+ 1

    4. Condition(条件同步)

    Condition被称为条件变量,除了提供与Lock类似的acquire()release()方法外,还提供了wait()notify()方法,还有notify_all()wait_for()方法。

    【基本原理】

    可以认为Condition对象维护了一个锁(Lock/RLock)和一个waiting池。进程通过acquire()获得Condition对象,当调用wait()方法时,进程会释放Condition内部的锁并进入blocked状态,同时在waiting池中记录这个进程。当调用notify()方法时,Condition对象会从waiting池中挑选一个进程,通知其调用acquire()方法尝试取到锁。

    Condition对象的构造函数可以接受一个Lock/RLock对象作为参数,如果没有指定,则Condition对象会在内部自行创建一个RLock

    除了notify()方法外,Condition对象还提供了notify_all()方法,可以通知waiting池中的所有进程尝试acquire内部锁。由于上述机制,处于waiting状态的进程只能通过notify()方法唤醒,所以notify_all()的作用在于防止有的线程永远处于沉默状态。

    【示例·一人问一人答】

    import time
    from multiprocessing import Process, Condition
    
    
    class Male(Process):
        def __init__(self, cond):
            super().__init__()
            self.__cond = cond
    
        def run(self):
            with self.__cond:
                time.sleep(2)  # 防止死锁
                print("昔日龌龊不足夸")
                self.__cond.notify()
                self.__cond.wait()
                print("春风得意马蹄疾")
                self.__cond.notify()
    
    
    class Female(Process):
        def __init__(self, cond):
            super().__init__()
            self.__cond = cond
    
        def run(self):
            with self.__cond:
                self.__cond.wait()
                print("今朝放荡思无涯")
                self.__cond.notify()
                self.__cond.wait()
                print("一日看尽长安花")
    
    
    if __name__ == '__main__':
        cond = Condition()
        male = Male(cond)
        female = Female(cond)
        female.start()
        male.start()
    
    

    输出:

    昔日龌龊不足夸
    今朝放荡思无涯
    春风得意马蹄疾
    一日看尽长安花
    

    5. Event(事件)

    Python进程的事件用于主进程控制其他进程的执行,事件对象管理一个内部标志,默认为False,主要提供了以下方法:

    • is_set()

      当且仅当内部标志为True时,返回True

    • set()

      将内部标志设置为True。所有正在等待这个事件的线程将被唤醒。

    • clear()

      将内部标志设置为False

    • wait(timeout=None)

      阻塞进程直到内部变量为True。如果调用时内部标志为True,将立即返回,否则将阻塞进程,直到调用set()方法将标志设置为True或者超过设置的超时时间(秒)。

    【示例·模拟红绿灯】

    车看作是一个进程,wait()等红灯,根据状态变化,wait()遇到True信号,就非阻塞,遇到False,就阻塞;交通灯也是一个进程 红灯:False 绿灯:True

    import random
    import time
    from multiprocessing import Process, Event
    
    
    class TrafficLight(Process):
        def __init__(self, e):
            super().__init__(name="TrafficLight")
            self.e = e
    
        def run(self):
            print('33[1;31m红灯亮33[0m')
            time.sleep(2)
            while True:
                if not self.e.is_set():
                    print('33[1;32m绿灯亮33[0m')
                    self.e.set()
                else:
                    print('33[1;31m红灯亮33[0m')
                    self.e.clear()
                time.sleep(2)
    
    
    class Car(Process):
        def __init__(self, i, e):
            super().__init__(name="Car")
            self.i = i
            self.e = e
    
        def run(self):
            if not self.e.is_set():
                print(f"car{self.i}正在等待")
            self.e.wait()
            print(f"car{self.i}通过路口")
    
    
    if __name__ == '__main__':
        e = Event()
        TrafficLight(e).start()
        for i in range(50):
            time.sleep(random.randrange(0, 5, 2))
            Car(i, e).start()
    
    

    部分输出:

    红灯亮
    car0正在等待
    绿灯亮
    car0通过路口
    红灯亮
    绿灯亮
    car1通过路口
    红灯亮
    car2正在等待
    绿灯亮
    car2通过路口
    红灯亮
    car4正在等待
    car3正在等待
    绿灯亮
    car3通过路口
    car4通过路口
    car5通过路口
    

    6. IPC的解决方案

    当使用多个进程时,通常使用消息传递来进行进程之间的通信(IPC),并必须避免使用任何同步原语(如锁)。对于传递消息,可以使用Pipe()(用于两个进程之间的连接)或队列Queue(允许多个生产者和消费者)。

    6.1 Pipe(不推荐使用)

    Pipe常用来在两个进程间通信,两个进程分别位于管道的两端,默认是双向的。

    • 创建管道

      语法:Pipe(duplex=True)

      功能:在两个进程之间创建一条管道,并返回元组(conn1,conn2),其中conn1conn2表示管道两端的连接对象,强调一点:必须在产生Process对象之前产生管道

      参数:duplex默认为True,表示管道是全双工的,如果duplexFalse,则conn1只能用于接收,conn2只能用于发送。

      from multiprocessing import Pipe
      
      # 创建双向的Pipe
      conn1, conn2 = Pipe(True)
      # 创建单向Pipe,conn4只能send(),conn3只能recv()
      conn3, conn4 = Pipe(False)
      
      
    • conn1.recv()
      接收conn2.send(obj)发送的对象。如果没有消息可接收,recv方法会一直阻塞。如果连接的另外一端已经关闭,那么recv方法会抛出EOFError

    • conn1.send(obj)
      通过连接发送对象。obj是与序列化兼容的任意对象。

    • conn1.close()
      关闭连接。如果conn1被垃圾回收,将自动调用此方法

    • conn1.poll([timeout])
      如果连接上的数据可用,返回Truetimeout指定等待的最长时限。如果省略此参数,方法将立即返回结果。如果将timeout设为None,操作将无限期地等待数据到达。

    【示例】

    from multiprocessing import Process, Pipe
    import time
    
    
    class Sever(Process):
        def __init__(self, sever):
            super().__init__()
            self.sever = sever
    
        def run(self):
            n = 0
            print("服务器开启...")
            while self.sever.poll(0.01):
                obj = self.sever.recv()
                n += 1
                print(f"第{n}行:{obj!r}")
            else:
                print("服务器关闭...")
    
    
    class Client(Process):
        def __init__(self, client):
            super().__init__()
            self.client = client
    
        def run(self):
            self.client.send(True)
            self.client.send(1)
            self.client.send("我自俯察世人,你亦怜惜众生")
            self.client.send({"称号": "无极剑圣", "姓名": "易"})
            self.client.close()
    
    
    if __name__ == '__main__':
        sever, client = Pipe()
        Sever(sever).start()
        Client(client).start()
    
    

    输出:

    服务器开启...
    第1行:True
    第2行:1
    第3行:'我自俯察世人,你亦怜惜众生'
    第4行:{'称号': '无极剑圣', '姓名': '易'}
    服务器关闭...
    

    6.2 Queue(推荐使用)

    • 创建队列

      语法:Queue(maxsize=0)

      maxsize是一个整数,用于设置可以放入队列的项目数的上限。达到此大小后,插入将阻止,直到消耗队列项。如果 maxsize小于或等于零,则队列大小为无限大。

    • q.get(block=True, timeout=None)
      返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为True。 如果设置为False,将引发Queue.Empty异常(定义在Queue模块中)。timeout是可选超时时间,用在阻塞模式中,如果在指定的时间间隔内没有项目变为可用,将引发Queue.Empty异常。

    • q.get_nowait()

      q.get(False)

    • q.put(obj, block=True, timeout=None)
      item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默认为True。如果设置为False,将引发Queue.Full异常(定义在Queue库模块中)。timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。

    • q.put_nowait(obj)
      q.put(obj,False)

    • q.qsize()

      返回队列中目前项目的正确数量。此函数的结果并不可靠,因为在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。在某些系统上,此方法可能引发NotImplementedError异常。

    • q.empty()

      如果调用此方法时 q 为空,返回 True。如果其他进程或线程正在往队列中添加项目,结果是不可靠的。也就是说,在返回和使用结果之间,队列中可能已经加入新的项目。

    • q.full()

      如果q已满,返回为True. 由于并发的存在,结果也可能是不可靠的(参考q.empty()方法)。

    • q.close()

      关闭队列,防止队列中加入更多数据。调用此方法时,后台线程将继续写入那些已入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果q被垃圾收集,将自动调用此方法。关闭队列不会在队列使用者中生成任何类型的数据结束信号或异常,例如,如果某个使用者正被阻塞在get()操作上,关闭生产者中的队列不会导致get()方法返回错误。

    • q.cancel_join_thread()

      不会再进程退出时自动连接后台线程。这可以防止join_thread()方法阻塞。

    • q.join_thread()

      连接队列的后台线程。此方法用于在调用q.close()方法后,等待所有队列项被消耗。

    【示例·生产者消费者模型】

    import os
    import random
    import time
    from multiprocessing import Process, Queue
    
    
    class Producer(Process):
        def __init__(self, q):
            super().__init__()
            self.queue = q
    
        def run(self):
            for i in range(10):
                time.sleep(random.randint(1, 3))
                res = f"包子{i}"
                self.queue.put(res)
                print(f'33[1;31m{os.getpid()} 生产了 {res}33[0m')
            self.queue.put(None)
    
    
    class Consumer(Process):
        def __init__(self, q):
            super().__init__()
            self.queue = q
    
        def run(self):
            while True:
                res = self.queue.get()
                if res is None: break
                time.sleep(random.randint(2, 4))
                print(f'33[1;32m{os.getpid()} 吃了 {res}33[0m')
    
    
    if __name__ == '__main__':
        q = Queue()
        Producer(q).start()
        Consumer(q).start()
    
    
  • 相关阅读:
    [转]SIFT特征提取分析
    OSGEARTH三维地形开源项目
    使用C#改变鼠标的指针形状
    检测到 LoaderLock:DLL"XXXX"正试图在OS加载程序锁内执行
    未能加载文件或程序集“XXXXX”或它的某一个依赖项。试图加载格式不正确的程序。
    未能进入中断模式,原因如下:源文件“XXXXXX”不属于正在调试的项目。
    C# 版本的 计时器类:精确到微秒 秒后保留一位小数 支持年月日时分秒带单位的输出
    OpenGL2.0及以上版本中glm,glut,glew,glfw,mesa等部件的关系
    OpenGL 4.3配置教程
    ubuntu maven环境安装配置
  • 原文地址:https://www.cnblogs.com/ice-coder/p/12770529.html
Copyright © 2011-2022 走看看