同步线程
threading.Condition(),Condition使用了一个Lock,所以可以绑定一个共享资源,使多个线程等待这个资源的更新再启动。
当然Condition也可以显示地使用acquire()和release()方法。
一个简单的示例
1: import logging2: import threading3: import time
4: def consumer(cond):5: """6: 等待condition设置然后再使用资源7: :param cond:8: :return:
9: """10: logging.debug("开启consumer线程")
11: with cond:12: cond.wait()13: logging.debug("对consumer线程资源可用")
14: def producer(cond):15: """16: 配置资源17: :param cond:18: :return:
19: """20: logging.debug("开始producer线程")
21: with cond:22: logging.debug("使资源可用")
23: # 唤醒所有等待的线程,老的写法叫notifyAll()24: cond.notify_all()25: logging.basicConfig(26: level=logging.DEBUG,27: format="%(asctime)s %(threadName)-2s %(message)s"
28: )29: condition = threading.Condition()30: c1 = threading.Thread(name="c1", target=consumer,
31: args=(condition,))32: c2 = threading.Thread(name="c2", target=consumer,
33: args=(condition,))34: p = threading.Thread(name="p", target=producer,
35: args=(condition, ))36: c1.start()37: time.sleep(0.2)
38: c2.start()39: time.sleep(0.2)
40: p.start()
结果:
1: 2019-01-26 11:56:06,025 c1 开启consumer线程2: 2019-01-26 11:56:06,226 c2 开启consumer线程3: 2019-01-26 11:56:06,426 p 开始producer线程4: 2019-01-26 11:56:06,426 p 使资源可用5: 2019-01-26 11:56:06,426 c2 对consumer线程资源可用6: 2019-01-26 11:56:06,427 c1 对consumer线程资源可用
屏障barrier是另一种线程同步机制。Barrier建立一个控制点,阻塞所有的参与的线程,直到所有的线程都到达这一点,然后同时释放阻塞的线程。
1: import threading2: import time
3:4: def worker(barrier):5: print(threading.current_thread().name,6: "waiting for barrier with {} others.".format(barrier.n_waiting))
7: # 所有等待的线程都在等待时,所有的线程都被同时释放了。8: worker_id = barrier.wait()9: print(threading.current_thread().name, 'after barrier', worker_id)10: NUM_THREAD = 311: barrier = threading.Barrier(NUM_THREAD)12: # 推倒式13: threads = [14: threading.Thread(15: name="worker - %s" % i,
16: target=worker,17: args=(barrier, )18: )19: for i in range(NUM_THREAD)
20: ]21: for t in threads:
22: print(t.name, "starting")
23: t.start()24: time.sleep(0.1)
25: for t in threads:
26: t.join()
结果:
1: worker - 0 starting2: worker - 0 waiting for barrier with 0 others.
3: worker - 1 starting4: worker - 1 waiting for barrier with 1 others.
5: worker - 2 starting6: worker - 2 waiting for barrier with 2 others.
7: worker - 2 after barrier 28: worker - 1 after barrier 19: worker - 0 after barrier 0
abort()方法会使所有等待线程接收一个BrokenBarrierError。直到reset方法恢复,重新开始拦截。
限制资源的并发访问
如果多个线程同时访问一个资源,但要限制总数。这个可以使用Semaphore来管理。
使用方法:
1: s = threading.Semaphore(2)2: t = threading.Thread(3: target=worker,4: name="t1",
5: args=(s, )6: )
线程特定的数据
对于一些需要保护的资源,需要对这些并非资源所有者的线程隐藏。 threading.local()函数会创建一个对象,它能隐藏值,除非在某个线程中设置了这个属性,这个线程才能看到它。
1: import random2: import threading3: import logging4: def show_value(data):5: try:6: val = data.value7: except AttributeError:8: logging.debug("No value yet")
9: else:
10: logging.debug("value=%s" % val)
11: def worker(data):12: show_value(data)13: data.value = random.randint(1, 100)14: show_value(data)15: logging.basicConfig(16: level=logging.DEBUG,17: format="(%(threadName)-10s %(message)s)",
18: )19: local_data = threading.local()20: show_value(local_data)21: local_data.value = 100022: show_value(local_data)23:24: # 这个worker是看不到local_data的25: for i in range(2):
26: t = threading.Thread(target=worker, args=(local_data, ))27: t.start()28: t.join()29: # 使用子类,来初始化所有的线程开始时都有相同的值30: class MyLocal(threading.local):31: def __init__(self, value):32: super().__init__()33: logging.debug("Initializing %s" % self)
34: self.value = value35: local_data = MyLocal(1000)36: # 同样的worker调用__init__(),每调用一次以设置默认值37: for i in range(2):
38: t = threading.Thread(target=worker, args=(local_data, ))39: t.start()40: t.join()
结果:
1: (MainThread No value yet)2: (MainThread value=1000)3: (Thread-1 No value yet)4: (Thread-1 value=76)5: (Thread-2 No value yet)6: (Thread-2 value=88)7: (MainThread Initializing <__main__.MyLocal object at 0x0000022A8026E168>)8: (Thread-3 Initializing <__main__.MyLocal object at 0x0000022A8026E168>)9: (Thread-3 value=1000)10: (Thread-3 value=31)11: (Thread-4 Initializing <__main__.MyLocal object at 0x0000022A8026E168>)12: (Thread-4 value=1000)13: (Thread-4 value=7)