zoukankan      html  css  js  c++  java
  • python 多线程中的同步锁 Lock Rlock Semaphore Event Conditio

    摘要:在使用多线程的应用下,如何保证线程安全,以及线程之间的同步,或者访问共享变量等问题是十分棘手的问题,也是使用多线程下面临的问题,如果处理不好,会带来较严重的后果,使用python多线程中提供LockRlockSemaphoreEventCondition用来保证线程之间的同步,后者保证访问共享变量的互斥问题Lock&;RLock:互斥锁用来保证多线程访问共享变量的问题Semaphore对象:Lock互斥锁的加强版,可以被多个线程同时拥有,而Lock只能被某一个线程同时

    在使用多线程的应用下,如何保证线程安全,以及线程之间的同步,或者访问共享变量等问题是十分棘手的问题,也是使用多线程下面临的问题,如果处理不好,会带来较严重的后果,使用python多线程中提供Lock Rlock Semaphore Event Condition 用来保证线程之间的同步,后者保证访问共享变量的互斥问题

    Lock &; RLock:互斥锁 用来保证多线程访问共享变量的问题
    Semaphore对象:Lock互斥锁的加强版,可以被多个线程同时拥有,而Lock只能被某一个线程同时拥有。
    Event对象: 它是线程间通信的方式,相当于信号,一个线程可以给另外一个线程发送信号后让其执行操作。
    Condition对象:其可以在某些事件触发或者达到特定的条件后才处理数据

    1、Lock(互斥锁)

    请求锁定 — 进入锁定池等待 — 获取锁 — 已锁定 — 释放锁
    Lock(指令锁)是可用的最低级的同步指令。Lock处于锁定状态时,不被特定的线程拥有。Lock包含两种状态——锁定和非锁定,以及两个基本的方法。
    可以认为Lock有一个锁定池,当线程请求锁定时,将线程至于池中,直到获得锁定后出池。池中的线程处于状态图中的同步阻塞状态。
    构造方法:
    Lock()
    实例方法:
    acquire([timeout]): 使线程进入同步阻塞状态,尝试获得锁定。
    release(): 释放锁。使用前线程必须已获得锁定,否则将抛出异常。

    if mutex.acquire(): counter += 1 print

    if mutex.acquire(): 
        counter += 1 
        print( "I am %s, set counter:%s" % (self.name, counter) 
        mutex.release() 
    

      

    2、RLock(可重入锁)

    RLock(可重入锁)是一个可以被同一个线程请求多次的同步指令。RLock使用了“拥有的线程”和“递归等级”的概念,处于锁定状态时,RLock被某个线程拥有。拥有RLock的线程可以再次调用acquire(),释放锁时需要调用release()相同次数。
    可以认为RLock包含一个锁定池和一个初始值为0的计数器,每次成功调用 acquire()/release(),计数器将+1/-1,为0时锁处于未锁定状态。
    构造方法:
    RLock()
    实例方法:
    acquire([timeout])/release(): 跟Lock差不多。

    3、Semaphore(共享对象访问)

    咱们再聊聊Semaphore ,说实话Semaphore是我最晚使用的同步锁,以前类似的实现,是我用Rlock实现的,相对来说有些绕,毕竟Rlock 是需要成对的锁定和开锁的》。。。

    Semaphore管理一个内置的计数器,
    每当调用acquire()时内置计数器-1;
    调用release() 时内置计数器+1;
    计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。

    直接上代码,我们把semaphore控制为3,也就是说,同时有3个线程可以用这个锁,剩下的线程也之只能是阻塞等待了…

    #coding:utf-8
    import time
    import threading
    semaphore = threading.Semaphore(3)
    def func(): 
        if semaphore.acquire(): 
            for i in range(3): 
                  time.sleep(1) 
                  print (threading.currentThread().getName() + '获取锁') 
                  semaphore.release() 
                  print (threading.currentThread().getName() + ' 释放锁')
     
    for i in range(5): t1 = threading.Thread(target=func) t1.start()

     

    4、Event(线程间通信)

    Event内部包含了一个标志位,初始的时候为false。
    可以使用使用set()来将其设置为true;
    或者使用clear()将其从新设置为false;
    可以使用is_set()来检查标志位的状态;
    另一个最重要的函数就是wait(timeout=None),用来阻塞当前线程,直到event的内部标志位被设置为true或者timeout超时。如果内部标志位为true则wait()函数理解返回。

    import threading
    import time
    class MyThread(threading.Thread):
      def __init__(self, signal):
        threading.Thread.__init__(self)
        self.singal = signal
      def run(self):     print("I am %s,I will sleep ..."%self.name )     self.singal.wait()     print("I am %s, I awake..." %self.name)

    if __name__ == "__main__":   singal = threading.Event()   for t in range(0, 3):     thread = MyThread(singal)     thread.start()   print ("main thread sleep 3 seconds... " )   time.sleep(3)   singal.set()

     

    5、Condition(线程同步)

    可以把Condition理解为一把高级的琐,它提供了比Lock, RLock更高级的功能,允许我们能够控制复杂的线程同步问题。threadiong.Condition在内部维护一个琐对象(默认是RLock),可以在创建Condigtion对象的时候把琐对象作为参数传入。Condition也提供了acquire, release方法,其含义与琐的acquire, release方法一致,其实它只是简单的调用内部琐对象的对应的方法而已。Condition还提供了如下方法(特别要注意:这些方法只有在占用琐(acquire)之后才能调用,否则将会报RuntimeError异常。):

    Condition.wait([timeout]):
    wait方法释放内部所占用的琐,同时线程被挂起,直至接收到通知被唤醒或超时(如果提供了timeout参数的话)。当线程被唤醒并重新占有琐的时候,程序才会继续执行下去。

    Condition.notify():
    唤醒一个挂起的线程(如果存在挂起的线程)。注意:notify()方法不会释放所占用的琐。

    Condition.notify_all()
    Condition.notifyAll()
    唤醒所有挂起的线程(如果存在挂起的线程)。注意:这些方法不会释放所占用的琐。

    对于Condition有个例子,大家可以观摩下。

    from threading import Thread, Condition
    import time
    import random
    queue = []
    MAX_NUM = 10
    condition = Condition()
    class ProducerThread(Thread):
      def run(self):
        nums = range(5)
        global queue
        while True:
          condition.acquire()
          if len(queue) == MAX_NUM:
            print ("Queue full, producer is waiting" )
            condition.wait()
            print ("Space in queue, Consumer notified the producer" )
          num = random.choice(nums)
          queue.append(num)
          print ("Produced", num)
          condition.notify()
          condition.release()
          time.sleep(random.random())
    
    class ConsumerThread(Thread):
      def run(self):
        global queue
        while True:
          condition.acquire()
          if not queue:
            print ("Nothing in queue, consumer is waiting")
            condition.wait()
            print "Producer added something to queue and notified the consumer"
          num = queue.pop(0)
          print ("Consumed", num)
          condition.notify()
          condition.release()
          time.sleep(random.random())
    ProducerThread().start()
    ConsumerThread().start()
    

      

  • 相关阅读:
    工作流调度器azkaban
    日志采集框架Flume
    MAPREDUCE框架结构及核心运行机制
    Python爬虫开发系列之五》数据存储为TXT、JSON格式
    Python爬虫开发系列之二》请求库及解析库安装
    策略模式+简单工厂模式
    简单工厂模式
    单例模式
    5、计算高可用
    4、存储高可用
  • 原文地址:https://www.cnblogs.com/MY0213/p/8997275.html
Copyright © 2011-2022 走看看