zoukankan      html  css  js  c++  java
  • python3之threading模块(下)

    同步线程

    threading.Condition(),Condition使用了一个Lock,所以可以绑定一个共享资源,使多个线程等待这个资源的更新再启动。

    当然Condition也可以显示地使用acquire()和release()方法。

    一个简单的示例

      1: import logging
    
      2: import threading
    
      3: import time
    
      4: def consumer(cond):
    
      5:     """
    
      6:     等待condition设置然后再使用资源
    
      7:     :param cond:
    
      8:     :return:
    
      9:     """
    
     10:     logging.debug("开启consumer线程")
    
     11:     with cond:
    
     12:         cond.wait()
    
     13:         logging.debug("对consumer线程资源可用")
    
     14: def producer(cond):
    
     15:     """
    
     16:     配置资源
    
     17:     :param cond:
    
     18:     :return:
    
     19:     """
    
     20:     logging.debug("开始producer线程")
    
     21:     with cond:
    
     22:         logging.debug("使资源可用")
    
     23:         # 唤醒所有等待的线程,老的写法叫notifyAll()
    
     24:         cond.notify_all()
    
     25: logging.basicConfig(
    
     26:     level=logging.DEBUG,
    
     27:     format="%(asctime)s %(threadName)-2s %(message)s"
    
     28: )
    
     29: condition = threading.Condition()
    
     30: c1 = threading.Thread(name="c1", target=consumer,
    
     31:                       args=(condition,))
    
     32: c2 = threading.Thread(name="c2", target=consumer,
    
     33:                       args=(condition,))
    
     34: p = threading.Thread(name="p", target=producer,
    
     35:                      args=(condition, ))
    
     36: c1.start()
    
     37: time.sleep(0.2)
    
     38: c2.start()
    
     39: time.sleep(0.2)
    
     40: p.start()

    结果:

      1: 2019-01-26 11:56:06,025 c1 开启consumer线程
    
      2: 2019-01-26 11:56:06,226 c2 开启consumer线程
    
      3: 2019-01-26 11:56:06,426 p  开始producer线程
    
      4: 2019-01-26 11:56:06,426 p  使资源可用
    
      5: 2019-01-26 11:56:06,426 c2 对consumer线程资源可用
    
      6: 2019-01-26 11:56:06,427 c1 对consumer线程资源可用

    屏障barrier是另一种线程同步机制。Barrier建立一个控制点,阻塞所有的参与的线程,直到所有的线程都到达这一点,然后同时释放阻塞的线程。

      1: import threading
    
      2: import time
    
      3: 
    
      4: def worker(barrier):
    
      5:     print(threading.current_thread().name,
    
      6:           "waiting for barrier with {} others.".format(barrier.n_waiting))
    
      7:     # 所有等待的线程都在等待时,所有的线程都被同时释放了。
    
      8:     worker_id = barrier.wait()
    
      9:     print(threading.current_thread().name, 'after barrier', worker_id)
    
     10: NUM_THREAD = 3
    
     11: barrier = threading.Barrier(NUM_THREAD)
    
     12: # 推倒式
    
     13: threads = [
    
     14:     threading.Thread(
    
     15:         name="worker - %s" % i,
    
     16:         target=worker,
    
     17:         args=(barrier, )
    
     18:     )
    
     19:     for i in range(NUM_THREAD)
    
     20: ]
    
     21: for t in threads:
    
     22:     print(t.name, "starting")
    
     23:     t.start()
    
     24:     time.sleep(0.1)
    
     25: for t in threads:
    
     26:     t.join()

    结果:

      1: worker - 0 starting
    
      2: worker - 0 waiting for barrier with 0 others.
    
      3: worker - 1 starting
    
      4: worker - 1 waiting for barrier with 1 others.
    
      5: worker - 2 starting
    
      6: worker - 2 waiting for barrier with 2 others.
    
      7: worker - 2 after barrier 2
    
      8: worker - 1 after barrier 1
    
      9: worker - 0 after barrier 0

    abort()方法会使所有等待线程接收一个BrokenBarrierError。直到reset方法恢复,重新开始拦截。

    限制资源的并发访问

    如果多个线程同时访问一个资源,但要限制总数。这个可以使用Semaphore来管理。

    使用方法:

      1: s = threading.Semaphore(2)
    
      2: t = threading.Thread(
    
      3:     target=worker,
    
      4:     name="t1",
    
      5:     args=(s, )
    
      6: )

    线程特定的数据

    对于一些需要保护的资源,需要对这些并非资源所有者的线程隐藏。 threading.local()函数会创建一个对象,它能隐藏值,除非在某个线程中设置了这个属性,这个线程才能看到它。

      1: import random
    
      2: import threading
    
      3: import logging
    
      4: def show_value(data):
    
      5:     try:
    
      6:         val = data.value
    
      7:     except AttributeError:
    
      8:         logging.debug("No value yet")
    
      9:     else:
    
     10:         logging.debug("value=%s" % val)
    
     11: def worker(data):
    
     12:     show_value(data)
    
     13:     data.value = random.randint(1, 100)
    
     14:     show_value(data)
    
     15: logging.basicConfig(
    
     16:     level=logging.DEBUG,
    
     17:     format="(%(threadName)-10s %(message)s)",
    
     18: )
    
     19: local_data = threading.local()
    
     20: show_value(local_data)
    
     21: local_data.value = 1000
    
     22: show_value(local_data)
    
     23: 
    
     24: # 这个worker是看不到local_data的
    
     25: for i in range(2):
    
     26:     t = threading.Thread(target=worker, args=(local_data, ))
    
     27:     t.start()
    
     28:     t.join()
    
     29: # 使用子类,来初始化所有的线程开始时都有相同的值
    
     30: class MyLocal(threading.local):
    
     31:     def __init__(self, value):
    
     32:         super().__init__()
    
     33:         logging.debug("Initializing %s" % self)
    
     34:         self.value = value
    
     35: local_data = MyLocal(1000)
    
     36: # 同样的worker调用__init__(),每调用一次以设置默认值
    
     37: for i in range(2):
    
     38:     t = threading.Thread(target=worker, args=(local_data, ))
    
     39:     t.start()
    
     40:     t.join()

    结果:

      1: (MainThread No value yet)
    
      2: (MainThread value=1000)
    
      3: (Thread-1   No value yet)
    
      4: (Thread-1   value=76)
    
      5: (Thread-2   No value yet)
    
      6: (Thread-2   value=88)
    
      7: (MainThread Initializing <__main__.MyLocal object at 0x0000022A8026E168>)
    
      8: (Thread-3   Initializing <__main__.MyLocal object at 0x0000022A8026E168>)
    
      9: (Thread-3   value=1000)
    
     10: (Thread-3   value=31)
    
     11: (Thread-4   Initializing <__main__.MyLocal object at 0x0000022A8026E168>)
    
     12: (Thread-4   value=1000)
    
     13: (Thread-4   value=7)
  • 相关阅读:
    什么是垃圾回收
    Oracle GoldenGate学习之Goldengate介绍
    JVM虚拟机选项:Xms Xmx PermSize MaxPermSize区别
    查看linux系统版本命令
    Case when 的用法,简单Case函数
    case when then else end
    ORACLE视图添加备注
    修改 tomcat 内存
    Linux 内存机制详解宝典
    oracle正则表达式regexp_like的用法详解
  • 原文地址:https://www.cnblogs.com/haoqirui/p/10323633.html
Copyright © 2011-2022 走看看