zoukankan      html  css  js  c++  java
  • 扯扯python的多线程的同步锁 Lock RLock Semaphore Event Condition

    我想大家都知道python的gil限制,记得刚玩python那会,知道了有pypy和Cpython这样的解释器,当时听说是很猛,也就意味肯定是突破了gil的限制,最后经过多方面测试才知道,还是那德行…. 如果你的应用英语那种cpu密集运算的,p大部分情况都推荐使用多进程。

    有些扯远了,我个人很喜欢用gevent这种协程的框架,但是不是所有的模块都可以这种用户态的线程… 不得已会用threading… 常用的模块一般都附带线程安全的问题.. 但是如果你自己的扩展模块,有时候会遇到线程安全,也就是线程锁的应用… … 

    python的多线程的同步与其他语言基本相同,主要包含:

    Lock & RLock :用来确保多线程多共享资源的访问。

    Semaphore :简单理解可以理解Lock互斥锁的加强版,他一个锁,可以控制多个thread的访问。。      Lock的话,只能是让一个线程来访问,Semaphore可以控制数目… 

    Event : event线程间通信的方式,一个线程可以发送信号,其他的线程接收到信号后执行操作。 

    Condition : 虽然他有wait notify这样的机制,实现的效果其实跟Event差不多

    咱们先说下Lock和Rlock 

    下面是锁定和释放的过程
     
    请求锁定 — 进入锁定池等待 — 获取锁 — 已锁定 — 释放锁
     
     
    锁 Lock()
     
    Lock(指令锁)是可用的最低级的同步指令。Lock处于锁定状态时,不被特定的线程拥有。Lock包含两种状态——锁定和非锁定,以及两个基本的方法。
    可以认为Lock有一个锁定池,当线程请求锁定时,将线程至于池中,直到获得锁定后出池。池中的线程处于状态图中的同步阻塞状态。
    构造方法: 
    Lock()
    实例方法: 
    acquire([timeout]): 使线程进入同步阻塞状态,尝试获得锁定。 
    release(): 释放锁。使用前线程必须已获得锁定,否则将抛出异常。

    # from xiaorui.cc
     
    import threading
    lock = threading.Lock()
    if mutex.acquire():
        counter += 1
        print "I am %s, set counter:%s" % (self.name, counter)
        mutex.release()

    重入锁 RLock()

    RLock(可重入锁)是一个可以被同一个线程请求多次的同步指令。RLock使用了“拥有的线程”和“递归等级”的概念,处于锁定状态时,RLock被某个线程拥有。拥有RLock的线程可以再次调用acquire(),释放锁时需要调用release()相同次数。
    可以认为RLock包含一个锁定池和一个初始值为0的计数器,每次成功调用 acquire()/release(),计数器将+1/-1,为0时锁处于未锁定状态。
    构造方法: 
    RLock()
    实例方法: 

    acquire([timeout])/release(): 跟Lock差不多。

    Semaphore 信号量对象

    信号量是一个更高级的锁机制。信号量内部有一个计数器而不像锁对象内部有锁标识,而且只有当占用信号量的线程数超过信号量时线程才阻塞。这允许了多个线程可以同时访问相同的代码区。

    Semaphore管理一个内置的计数器,每当调用acquire()时内置计数器-1;调用release() 时内置计数器+1;

    计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。 

    直接上代码,我们把semaphore控制为3,也就是说,同时有3个线程可以用这个锁,剩下的线程也之只能是阻塞等待了…

    #coding:utf-8
    #blog xiaorui.cc
    import time
    import threading
     
    semaphore = threading.Semaphore(3)
     
    def func():
        if semaphore.acquire():
            for i in range(3):
                time.sleep(1)
                print (threading.currentThread().getName() + '获取锁')
            semaphore.release()
            print (threading.currentThread().getName() + ' 释放锁')
     
     
    for i in range(5):
      t1 = threading.Thread(target=func)
      t1.start()

    Event事件

    Event内部包含了一个标志位,初始的时候为false。
    可以使用使用set()来将其设置为true;
    或者使用clear()将其从新设置为false;
    可以使用is_set()来检查标志位的状态;
    另一个最重要的函数就是wait(timeout=None),用来阻塞当前线程,直到event的内部标志位被设置为true或者timeout超时。如果内部标志位为true则wait()函数理解返回。

     
    实例: (线程间相互通信)

    import threading
    import time
     
    class MyThread(threading.Thread):
        def __init__(self, signal):
            threading.Thread.__init__(self)
            self.singal = signal
     
        def run(self):
            print "I am %s,I will sleep ..."%self.name
            self.singal.wait()
            print "I am %s, I awake..." %self.name
     
    if __name__ == "__main__":
        singal = threading.Event()
        for t in range(0, 3):
            thread = MyThread(singal)
            thread.start()
     
        print "main thread sleep 3 seconds... "
        time.sleep(3)
     
        singal.set()

    threading.Condition
    可以把Condition理解为一把高级的琐,它提供了比Lock, RLock更高级的功能,允许我们能够控制复杂的线程同步问题。threadiong.Condition在内部维护一个琐对象(默认是RLock),可以在创建Condigtion对象的时候把琐对象作为参数传入。Condition也提供了acquire, release方法,其含义与琐的acquire, release方法一致,其实它只是简单的调用内部琐对象的对应的方法而已。Condition还提供了如下方法(特别要注意:这些方法只有在占用琐(acquire)之后才能调用,否则将会报RuntimeError异常。):

    Condition.wait([timeout]):  
    wait方法释放内部所占用的琐,同时线程被挂起,直至接收到通知被唤醒或超时(如果提供了timeout参数的话)。当线程被唤醒并重新占有琐的时候,程序才会继续执行下去。

    Condition.notify():
    唤醒一个挂起的线程(如果存在挂起的线程)。注意:notify()方法不会释放所占用的琐。

    Condition.notify_all() 
    Condition.notifyAll()
    唤醒所有挂起的线程(如果存在挂起的线程)。注意:这些方法不会释放所占用的琐。

    对于Condition有个例子,大家可以观摩下。

    from threading import Thread, Condition
    import time
    import random
     
    queue = []
    MAX_NUM = 10
    condition = Condition()
     
    class ProducerThread(Thread):
        def run(self):
            nums = range(5)
            global queue
            while True:
                condition.acquire()
                if len(queue) == MAX_NUM:
                    print "Queue full, producer is waiting"
                    condition.wait()
                    print "Space in queue, Consumer notified the producer"
                num = random.choice(nums)
                queue.append(num)
                print "Produced", num
                condition.notify()
                condition.release()
                time.sleep(random.random())
     
     
    class ConsumerThread(Thread):
        def run(self):
            global queue
            while True:
                condition.acquire()
                if not queue:
                    print "Nothing in queue, consumer is waiting"
                    condition.wait()
                    print "Producer added something to queue and notified the consumer"
                num = queue.pop(0)
                print "Consumed", num
                condition.notify()
                condition.release()
                time.sleep(random.random())
     
     
    ProducerThread().start()
    ConsumerThread().start()

    http://xiaorui.cc/2015/07/10/%E6%89%AF%E6%89%AFpython%E7%9A%84%E5%A4%9A%E7%BA%BF%E7%A8%8B%E7%9A%84%E5%90%8C%E6%AD%A5%E9%94%81-lock-rlock-semaphore-event-condition/?utm_source=tuicool&utm_medium=referral

  • 相关阅读:
    "rm f xxx"不起作用? 还是需要确认删除?
    (转)C# 3.0语言的新特性——Lambda表达式
    (转)依赖注入的思想(目前见过最好的对DI的描述)
    #import、#include、#import<>和#import””的区别
    Cocoa设计模式之委托
    详解MAC硬盘中各个文件夹
    Cocoa设计模式之单例
    ObjecticeC之关联对象
    UDID被禁用后的集中替代品
    Cocoa设计模式之KVO
  • 原文地址:https://www.cnblogs.com/leijiangtao/p/4009308.html
Copyright © 2011-2022 走看看