zoukankan      html  css  js  c++  java
  • Python-线程同步

    1、线程同步

      线程同步,线程间协同,通过某种技术,让一个线程访问某些数据时,其他线程不能访问这些数据,直到该线程完成对数的操作做。

       不同操作系统实现技术有所不同,有 临界区(Critical Section),互斥量(Mutex), 信号量(Semaphore),时间Event等

     2、Event

      Even事件,是线程间通信机制中最简单的实现,使用一个内部的标记flag,通过flag的True或False的变化来进行操作。

    名称 含义
    set() 标记设置为True
    clear() 标记设置为False
    is_set() 标记是否为True
    wait(timeout=None) 设置等待标记为True的时长,None为无限等待,等到返回True,未等到超时了,返回False

       测试:需求:老板雇了一个员工,让他生成杯子,老板一直等待这个工人,知道生产了10个杯子。

         1、普通线程实现:   

     1 import threading
     2 import time
     3 
     4 flag = False
     5 
     6 def worker(count=10):
     7     print('im working')
     8     global flag
     9     cpus = []
    10     while True:
    11         time.sleep(0.1)
    12         cpus.append(1)
    13         if len(cpus) > 10:
    14             flag = True
    15             break
    16     print(cpus)
    17 
    18 def boss():
    19     global flag
    20     while True:
    21         time.sleep(2)
    22         if flag:
    23             print('good job')
    24             break
    25 
    26 w = threading.Thread(target=worker)
    27 b = threading.Thread(target= boss)
    28 w.start()
    29 b.start()
    View Code

         结果:    

    1 im working
    2 [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
    3 good job
    4 
    5 Process finished with exit code 0
    View Code

        2、利用Event 类实现:  

     1 from threading import Event, Thread
     2 import logging
     3 import time
     4 
     5 FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
     6 logging.basicConfig(format=FORMAT, level=logging.INFO)
     7 
     8 def boss(event:Event):
     9     logging.info('boss')
    10     event.wait()
    11     logging.info('good job')
    12 
    13 def worker(event:Event, count=10):
    14     logging.info('working')
    15     cups = []
    16     while True:
    17         logging.info('make 1')
    18         time.sleep(0.5)
    19         cups.append(1)
    20         if len(cups) >= count:
    21             event.set()
    22             break
    23     logging.info('finished {}'.format(cups))
    24 
    25 event = Event()
    26 w = Thread(target=worker, args=(event,))
    27 b = Thread(target=boss, args=(event,))
    28 w.start()
    29 b.start()
    View Code

        结果:

     1 2018-10-12 10:58:11,515 Thread-1 8448 working
     2 2018-10-12 10:58:11,515 Thread-1 8448 make 1
     3 2018-10-12 10:58:11,515 Thread-2 1064 boss
     4 2018-10-12 10:58:12,030 Thread-1 8448 make 1
     5 2018-10-12 10:58:12,545 Thread-1 8448 make 1
     6 2018-10-12 10:58:13,060 Thread-1 8448 make 1
     7 2018-10-12 10:58:13,574 Thread-1 8448 make 1
     8 2018-10-12 10:58:14,089 Thread-1 8448 make 1
     9 2018-10-12 10:58:14,604 Thread-1 8448 make 1
    10 2018-10-12 10:58:15,119 Thread-1 8448 make 1
    11 2018-10-12 10:58:15,634 Thread-1 8448 make 1
    12 2018-10-12 10:58:16,148 Thread-1 8448 make 1
    13 2018-10-12 10:58:16,663 Thread-1 8448 finished [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
    14 2018-10-12 10:58:16,663 Thread-2 1064 good job
    15 
    16 Process finished with exit code 0
    View Code

      总结:使用同一个Event实例的标记flag,谁wait 就是等到flag变为 True,或等到超时返回False,不限等待个数。

      wait使用: 

     1 from threading import Event, Thread
     2 import logging
     3 import time
     4 
     5 FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
     6 logging.basicConfig(format=FORMAT, level=logging.INFO)
     7 
     8 def boss(event:Event):
     9     logging.info('boss')
    10     while event.wait(6):
    11         logging.info('good job')
    12         break
    13 
    14 
    15 def worker(event:Event, count=10):
    16     logging.info('working')
    17     cups = []
    18     while True:
    19         logging.info('make 1')
    20         time.sleep(0.5)
    21         cups.append(1)
    22         if len(cups) >= count:
    23             event.set()
    24             break
    25     logging.info('finished {}'.format(cups))
    26 
    27 event = Event()
    28 w = Thread(target=worker, args=(event,))
    29 b = Thread(target=boss, args=(event,))
    30 w.start()
    31 b.start()
    View Code

       Event练习:实现Timer,延时执行的线程,延时计add(x, y)

     1 from threading import Thread, Event
     2 
     3 class Timer:
     4     def __init__(self, interval, fn, args=(), kwargs={}):
     5         self.interval = interval
     6         self.fn = fn
     7         self.args = args
     8         self.kwargs = kwargs
     9         self.event = Event()
    10 
    11     def start(self):
    12         Thread(target=self.run).start()
    13 
    14     def cancel(self):
    15         self.event.set()
    16 
    17     def run(self):
    18         self.event.wait(self.interval)
    19         if not self.event.is_set():
    20             print(self.args, self.kwargs)
    21             self.fn(*self.args, **self.kwargs)
    22         self.event.set()
    23 def add(x, y):
    24     print(x + y)
    25 
    26 t = Timer(4, add,(4, 5) )
    27 print(t.args, t.kwargs)
    28 t.start()
    29 # t.cancel()
    30 
    31 print('--------------------------------')
    32 print('main thread exit')
    33 add(5, 6)
    test

      结果:

    1 (4, 5) {}
    2 --------------------------------
    3 main thread exit
    4 11
    5 (4, 5) {}
    6 9
    View Code

       测试:cancel

     1 from threading import Thread, Event
     2 
     3 class Timer:
     4     def __init__(self, interval, fn, args=(), kwargs={}):
     5         self.interval = interval
     6         self.fn = fn
     7         self.args = args
     8         self.kwargs = kwargs
     9         self.event = Event()
    10 
    11     def start(self):
    12         Thread(target=self.run).start()
    13 
    14     def cancel(self):
    15         self.event.set()
    16 
    17     def run(self):
    18         self.event.wait(self.interval)
    19         if not self.event.is_set():
    20             self.fn(*self.args, **self.kwargs)
    21         self.event.set() # 习惯上 做完了,不允许在做一次了
    22         # print(self.event.is_set())
    23 def add(x, y):
    24     print(x + y)
    25 
    26 t = Timer(2, add,(4, 5) )
    27 t.start()
    28 t.cancel()
    29 
    30 print('--------------------------------')
    31 print('main thread exit')
    32 add(5, 6)
    View Code

      结果: 

    1 --------------------------------
    2 main thread exit
    3 11
    4 
    5 Process finished with exit code 0
    View Code

     3、Lock

      锁,凡是存在共享资源争抢的地方都可以使用锁,从而保证只有一个使用这可以完全使用这个资源。

      Lock,锁,一旦线程获得锁,其他试图获取锁的线程将被阻塞。

    名称 含义
    acquire(blocking=True, timeout=-1) 默认阻塞,阻塞可以设置超时时间,非阻塞时,timeout禁止设置,成功获取锁,返回True,否则返回Fasle
    release() 释放锁,可以从任何线程调用释放,已上的锁,会被重置为unlock未上锁的锁上调用,抛RuntimeError异常

      测试:需求:订单生产1000个杯子,组织10个工人生产。

      1、非锁模式:

     1 import threading
     2 from threading import Thread , Lock
     3 import logging
     4 import time
     5 
     6 FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
     7 logging.basicConfig(format=FORMAT,level=logging.INFO)
     8 
     9 cups = []
    10 
    11 def worker(count=10):
    12     logging.info("I'm workin")
    13     while len(cups) < count:
    14         time.sleep(0.0001)
    15         cups.append(1)
    16     logging.info("i finished, cups={}".format(len(cups)))
    17 
    18 for _ in range(10):
    19     Thread(target=worker,args=(1000,)).start()
    代码

           结果:从结果看出不是自己想要的结果,因为if判断的时候,也会被其他的线程打断

     1 2018-10-14 17:16:52,317 Thread-1 5444 I'm workin
     2 2018-10-14 17:16:52,348 Thread-2 6896 I'm workin
     3 2018-10-14 17:16:52,348 Thread-3 2900 I'm workin
     4 2018-10-14 17:16:52,348 Thread-4 4104 I'm workin
     5 2018-10-14 17:16:52,348 Thread-5 7448 I'm workin
     6 2018-10-14 17:16:52,348 Thread-6 4276 I'm workin
     7 2018-10-14 17:16:52,348 Thread-7 3752 I'm workin
     8 2018-10-14 17:16:52,348 Thread-8 6156 I'm workin
     9 2018-10-14 17:16:52,348 Thread-9 3568 I'm workin
    10 2018-10-14 17:16:52,348 Thread-10 7428 I'm workin
    11 2018-10-14 17:16:53,941 Thread-1 5444 i finished, cups=1000
    12 2018-10-14 17:16:53,941 Thread-10 7428 i finished, cups=1001
    13 2018-10-14 17:16:53,941 Thread-6 4276 i finished, cups=1002
    14 2018-10-14 17:16:53,941 Thread-5 7448 i finished, cups=1003
    15 2018-10-14 17:16:53,941 Thread-9 3568 i finished, cups=1004
    16 2018-10-14 17:16:53,941 Thread-2 6896 i finished, cups=1005
    17 2018-10-14 17:16:53,956 Thread-8 6156 i finished, cups=1006
    18 2018-10-14 17:16:53,956 Thread-7 3752 i finished, cups=1007
    19 2018-10-14 17:16:53,956 Thread-4 4104 i finished, cups=1008
    20 2018-10-14 17:16:53,956 Thread-3 2900 i finished, cups=1009
    21 
    22 Process finished with exit code 0
    结果

       2、加锁测试 

     1 import threading
     2 from threading import Thread , Lock
     3 import logging
     4 import time
     5 
     6 FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
     7 logging.basicConfig(format=FORMAT,level=logging.INFO)
     8 
     9 cups = []
    10 lock = Lock()
    11 
    12 def worker(count=10):
    13     logging.info("I'm workin")
    14     flag = False
    15 
    16     while True:
    17         lock.acquire()
    18 
    19         if len(cups) >= count:
    20             flag = True
    21         # lock.release() # 注意释放 锁  的位置,这里不合适
    22 
    23         time.sleep(0.0001)
    24         if not flag:
    25             cups.append(1)
    26         lock.release() # 注意释放 锁  的位置
    27         if flag:
    28             break
    29 
    30     logging.info("i finished, cups={}".format(len(cups)))
    31 
    32 for _ in range(10):
    33     Thread(target=worker,args=(1000,)).start()
    代码

         结果:可以看到针对同一个资源,使用Lock

     1 D:python3.7python.exe E:/code_pycharm/复习/t1.py
     2 2018-10-14 17:43:42,748 Thread-1 1628 I'm workin
     3 2018-10-14 17:43:42,748 Thread-2 7260 I'm workin
     4 2018-10-14 17:43:42,748 Thread-3 1308 I'm workin
     5 2018-10-14 17:43:42,748 Thread-4 7280 I'm workin
     6 2018-10-14 17:43:42,748 Thread-5 7676 I'm workin
     7 2018-10-14 17:43:42,748 Thread-6 7288 I'm workin
     8 2018-10-14 17:43:42,749 Thread-7 276 I'm workin
     9 2018-10-14 17:43:42,749 Thread-8 1052 I'm workin
    10 2018-10-14 17:43:42,749 Thread-9 3144 I'm workin
    11 2018-10-14 17:43:42,749 Thread-10 5732 I'm workin
    12 2018-10-14 17:43:43,786 Thread-6 7288 i finished, cups=1000
    13 2018-10-14 17:43:43,787 Thread-10 5732 i finished, cups=1000
    14 2018-10-14 17:43:43,788 Thread-5 7676 i finished, cups=1000
    15 2018-10-14 17:43:43,789 Thread-3 1308 i finished, cups=1000
    16 2018-10-14 17:43:43,790 Thread-1 1628 i finished, cups=1000
    17 2018-10-14 17:43:43,791 Thread-8 1052 i finished, cups=1000
    18 2018-10-14 17:43:43,792 Thread-9 3144 i finished, cups=1000
    19 2018-10-14 17:43:43,793 Thread-4 7280 i finished, cups=1000
    20 2018-10-14 17:43:43,794 Thread-2 7260 i finished, cups=1000
    21 2018-10-14 17:43:43,795 Thread-7 276 i finished, cups=1000
    22 
    23 Process finished with exit code 0
    结果

        总结:每个线程获取锁之后,一定要释放锁(但是要注意释放的位置)。对于同一个资源,某线程获锁后,其他线程要不永久等待,要不设置过期时间,过期后,返回False,再跳过有锁的代码块,往下进行。永久等待,就是等获取锁的线程释放后,自己获取锁。锁的释放主要注意一点是 线程安全,不会产生不确定结果。

      测试:无锁,计算类,加减

      1、无锁测试

     1 import threading
     2 from threading import Thread , Lock
     3 import logging
     4 import time
     5 
     6 FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
     7 logging.basicConfig(format=FORMAT,level=logging.INFO)
     8 
     9 class Counter:
    10     def __init__(self):
    11         self._val = 0
    12 
    13     @property
    14     def value(self):
    15         return self._val
    16 
    17     def inc(self):
    18         self._val += 1
    19 
    20     def dec(self):
    21         self._val -= 1
    22 
    23 def run(c:Counter, count=100):
    24     for _ in range(count):
    25         for i in range(-50, 50):
    26             if i < 0:
    27                 c.dec()
    28             else:
    29                 c.inc()
    30 
    31 c = Counter()
    32 c1 = 100000 # 线程数
    33 c2 = 10
    34 for i in range(c1):
    35     Thread(target=run, args=(c, c2)).start()
    36 
    37 print(c.value)
    无锁测试

      结果:随着 计算量增多,导致出现 不确定结果,所以线程不安全(c2 大于100后,就会出现结果不是 0 的结果)  

    0
    
    Process finished with exit code 0

       2、加锁,减锁 

        一般来说,加锁就需要减锁,但是加锁后解锁前,还要有一些代码执行,就有可能抛异常,一旦异常出现,锁是无法释放,但是当前线程可能因为这个异常被终止,这就产生了死锁。

      加锁,解锁常用语句:

        1、使用try。。。finally语句保证锁的释放

        2、with上下文管理,锁对象支持上下文管理

      测试:锁,计算类,加减

     1 import threading
     2 from threading import Thread , Lock
     3 import logging
     4 import time
     5 
     6 FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
     7 logging.basicConfig(format=FORMAT,level=logging.INFO)
     8 
     9 class Counter:
    10     def __init__(self):
    11         self._val = 0
    12         self.lock = Lock()
    13 
    14     @property
    15     def value(self):
    16         with self.lock:
    17             return self._val
    18 
    19     def inc(self):
    20         with self.lock:
    21             self._val += 1
    22 
    23     def dec(self):
    24         try:
    25             self.lock.acquire()
    26             self._val -= 1
    27         finally:
    28             self.lock.release()
    29 
    30 
    31 def run(c:Counter, count=100):
    32     for _ in range(count):
    33         for i in range(-50, 50):
    34             if i < 0:
    35                 c.dec()
    36             else:
    37                 c.inc()
    38 
    39 c = Counter()
    40 c1 = 10 # 线程数
    41 c2 = 1000
    42 for i in range(c1):
    43     Thread(target=run, args=(c, c2)).start()
    44 
    45 print(c.value)
    46 # -35
    47 
    48 
    49 因为多线程,执行的时候,主线程中的print早就打印完了,而子线程还没结束,所以得到的值不是0, 是某一时刻值
    最后print(c.value)问题

      正对上面的问题作出修正,获取的值是0,也就是等子线程都结束了,在print

     1 import threading
     2 from threading import Thread , Lock
     3 import logging
     4 import time
     5 
     6 FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
     7 logging.basicConfig(format=FORMAT,level=logging.INFO)
     8 
     9 class Counter:
    10     def __init__(self):
    11         self._val = 0
    12         self.lock = Lock()
    13 
    14     @property
    15     def value(self):
    16         with self.lock:
    17             return self._val
    18 
    19     def inc(self):
    20         with self.lock:
    21             self._val += 1
    22 
    23     def dec(self):
    24         try:
    25             self.lock.acquire()
    26             self._val -= 1
    27         finally:
    28             self.lock.release()
    29 
    30 
    31 def run(c:Counter, count=100):
    32     for _ in range(count):
    33         for i in range(-50, 50):
    34             if i < 0:
    35                 c.dec()
    36             else:
    37                 c.inc()
    38 
    39 c = Counter()
    40 c1 = 10 # 线程数
    41 c2 = 1000
    42 
    43 # 方式 1:判断当前活着的线程是否为1
    44 for i in range(c1):
    45     Thread(target=run, args=(c, c2)).start()
    46 
    47 while True:
    48     time.sleep(1)
    49     if threading.active_count() == 1: # 只剩下主线程
    50         print(threading.enumerate(), '-----')
    51         print(c.value)
    52         break
    53     else:
    54         print(threading.enumerate())
    55 
    56 # 方式 2:使用join,只要有一个线程join  主线程就等待
    57 thread = []
    58 for i in range(c1):
    59     t = Thread(target=run, args=(c, c2))
    60     thread.append(t)
    61     t.start()
    62 
    63 for t in thread:
    64     t.join()
    65 
    66 print(c.value)
    两种方式实现结果为0

      锁的应用场景:

        锁适用于    访问和修改    同一个共享资源的时候,即读写同一资源的时候。

        如果全部都是取同一个资源 时不需要锁的。因为这时可以认为共享资源是不可变得,每一次读取都是一样的值,所以不用加锁。

      使用锁注意事项:

      1. 少用锁,必要时用锁。使用了锁,多线程访问被锁的资源的时候,就成了串行,要么排队,要么争抢。
      2. 加锁时间越短越好,不需要立即释放。
      3. 一定避免死锁。

       但是,不使用锁,提高了效率,但是结果如果不是正确的,效率也没用,所以正确性排第一

      测试:实现非阻塞的锁,也就是说,非等待,没有获取锁,就去干别的事: 

     1 import threading
     2 from threading import Thread , Lock
     3 import logging
     4 import time
     5 
     6 FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
     7 logging.basicConfig(format=FORMAT,level=logging.INFO)
     8 
     9 def worker(tasks):
    10     for task in tasks:
    11         time.sleep(0.001)
    12         if task.lock.acquire(False):# 非等待模式,没有获取就else 
    13             logging.info("{}{} begin to start".format(threading.current_thread(), task.name))
    14             # s适当的时机释放锁,为了演示 不释放了
    15 
    16         else:
    17             logging.info("{}{} is working".format(threading.current_thread(), task.name))
    18 
    19 class Task:
    20     def __init__(self, name):
    21         self.name = name
    22         self.lock = Lock()
    23 
    24 # 构造10 个任务
    25 tasks = [Task('task--{}'.format(x)) for x in range(10)]
    26 
    27 # 启动5 个线程
    28 for i in range(5):
    29     threading.Thread(target=worker, name='work -- {}'.format(i), args=(tasks,)).start()
    View Code

      结果:

      1 D:python3.7python.exe E:/code_pycharm/复习/t1.py
      2 1
      3 2018-10-14 20:23:57,046 work -- 0 9316 <Thread(work -- 0, started 9316)>task--0 begin to start
      4 2
      5 2018-10-14 20:23:57,046 work -- 1 10260 <Thread(work -- 1, started 10260)>task--0 is working
      6 2018-10-14 20:23:57,047 work -- 0 9316 <Thread(work -- 0, started 9316)>task--1 begin to start3
      7 4
      8 5
      9 6
     10 7
     11 
     12 2018-10-14 20:23:57,047 work -- 3 5076 <Thread(work -- 3, started 5076)>task--0 is working
     13 2018-10-14 20:23:57,047 work -- 4 8464 <Thread(work -- 4, started 8464)>task--0 is working
     14 2018-10-14 20:23:57,047 work -- 2 6196 <Thread(work -- 2, started 6196)>task--0 is working
     15 2018-10-14 20:23:57,047 work -- 1 10260 <Thread(work -- 1, started 10260)>task--1 is working
     16 2018-10-14 20:23:57,048 work -- 3 5076 <Thread(work -- 3, started 5076)>task--1 is working
     17 2018-10-14 20:23:57,048 work -- 0 9316 <Thread(work -- 0, started 9316)>task--2 begin to start
     18 2018-10-14 20:23:57,048 work -- 2 6196 <Thread(work -- 2, started 6196)>task--1 is working
     19 2018-10-14 20:23:57,048 work -- 1 10260 <Thread(work -- 1, started 10260)>task--2 is working
     20 2018-10-14 20:23:57,048 work -- 4 8464 <Thread(work -- 4, started 8464)>task--1 is working
     21 8
     22 9
     23 10
     24 11
     25 12
     26 13
     27 1415
     28 
     29 16
     30 17
     31 2018-10-14 20:23:57,049 work -- 4 8464 <Thread(work -- 4, started 8464)>task--2 is working
     32 2018-10-14 20:23:57,049 work -- 3 5076 <Thread(work -- 3, started 5076)>task--2 is working
     33 2018-10-14 20:23:57,049 work -- 0 9316 <Thread(work -- 0, started 9316)>task--3 begin to start
     34 2018-10-14 20:23:57,049 work -- 2 6196 <Thread(work -- 2, started 6196)>task--2 is working
     35 2018-10-14 20:23:57,049 work -- 1 10260 <Thread(work -- 1, started 10260)>task--3 is working
     36 18
     37 1920
     38 
     39 2122
     40 
     41 2018-10-14 20:23:57,050 work -- 3 5076 <Thread(work -- 3, started 5076)>task--3 is working
     42 2018-10-14 20:23:57,050 work -- 1 10260 <Thread(work -- 1, started 10260)>task--4 begin to start
     43 2018-10-14 20:23:57,050 work -- 4 8464 <Thread(work -- 4, started 8464)>task--3 is working
     44 2018-10-14 20:23:57,050 work -- 0 9316 <Thread(work -- 0, started 9316)>task--4 is working
     45 2018-10-14 20:23:57,050 work -- 2 6196 <Thread(work -- 2, started 6196)>task--3 is working
     46 2018-10-14 20:23:57,051 work -- 1 10260 <Thread(work -- 1, started 10260)>task--5 begin to start
     47 2018-10-14 20:23:57,051 work -- 4 8464 <Thread(work -- 4, started 8464)>task--4 is working
     48 23
     49 2018-10-14 20:23:57,051 work -- 0 9316 <Thread(work -- 0, started 9316)>task--5 is working
     50 24
     51 2018-10-14 20:23:57,051 work -- 3 5076 <Thread(work -- 3, started 5076)>task--4 is working
     52 25
     53 26
     54 2018-10-14 20:23:57,051 work -- 2 6196 <Thread(work -- 2, started 6196)>task--4 is working
     55 27
     56 2018-10-14 20:23:57,052 work -- 1 10260 <Thread(work -- 1, started 10260)>task--6 begin to start
     57 2018-10-14 20:23:57,052 work -- 3 5076 <Thread(work -- 3, started 5076)>task--5 is working
     58 28
     59 29
     60 30
     61 31
     62 2018-10-14 20:23:57,052 work -- 4 8464 <Thread(work -- 4, started 8464)>task--5 is working
     63 32
     64 2018-10-14 20:23:57,052 work -- 0 9316 <Thread(work -- 0, started 9316)>task--6 is working
     65 2018-10-14 20:23:57,052 work -- 2 6196 <Thread(work -- 2, started 6196)>task--5 is working
     66 2018-10-14 20:23:57,053 work -- 0 9316 <Thread(work -- 0, started 9316)>task--7 begin to start
     67 33
     68 2018-10-14 20:23:57,053 work -- 3 5076 <Thread(work -- 3, started 5076)>task--6 is working
     69 3435
     70 
     71 2018-10-14 20:23:57,053 work -- 4 8464 <Thread(work -- 4, started 8464)>task--6 is working
     72 36
     73 2018-10-14 20:23:57,053 work -- 1 10260 <Thread(work -- 1, started 10260)>task--7 is working
     74 37
     75 2018-10-14 20:23:57,053 work -- 2 6196 <Thread(work -- 2, started 6196)>task--6 is working
     76 2018-10-14 20:23:57,054 work -- 3 5076 <Thread(work -- 3, started 5076)>task--7 is working
     77 38
     78 2018-10-14 20:23:57,054 work -- 0 9316 <Thread(work -- 0, started 9316)>task--8 begin to start
     79 39
     80 40
     81 2018-10-14 20:23:57,054 work -- 4 8464 <Thread(work -- 4, started 8464)>task--7 is working
     82 41
     83 42
     84 2018-10-14 20:23:57,054 work -- 1 10260 <Thread(work -- 1, started 10260)>task--8 is working
     85 2018-10-14 20:23:57,054 work -- 2 6196 <Thread(work -- 2, started 6196)>task--7 is working
     86 43
     87 44
     88 45
     89 46
     90 47
     91 2018-10-14 20:23:57,055 work -- 3 5076 <Thread(work -- 3, started 5076)>task--8 is working
     92 2018-10-14 20:23:57,055 work -- 0 9316 <Thread(work -- 0, started 9316)>task--9 begin to start
     93 2018-10-14 20:23:57,055 work -- 1 10260 <Thread(work -- 1, started 10260)>task--9 is working
     94 2018-10-14 20:23:57,055 work -- 2 6196 <Thread(work -- 2, started 6196)>task--8 is working
     95 2018-10-14 20:23:57,055 work -- 4 8464 <Thread(work -- 4, started 8464)>task--8 is working
     96 48
     97 49
     98 50
     99 2018-10-14 20:23:57,056 work -- 2 6196 <Thread(work -- 2, started 6196)>task--9 is working
    100 2018-10-14 20:23:57,056 work -- 3 5076 <Thread(work -- 3, started 5076)>task--9 is working
    101 2018-10-14 20:23:57,056 work -- 4 8464 <Thread(work -- 4, started 8464)>task--9 is working
    102 
    103 Process finished with exit code 0
    加了一个count的结果

    4、可重入锁RLock,也叫递归锁

      可重入锁,是线程相关的锁。

      线程A 获得可重入锁,并可以多次成功获取,不会阻塞,最后要在线程A 中做 和acquire次数相同的release

      测试单线程:

     1 import threading
     2 from threading import Thread , Lock
     3 import logging
     4 import time
     5 
     6 FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
     7 logging.basicConfig(format=FORMAT,level=logging.INFO)
     8 
     9 lock = threading.RLock()
    10 
    11 print(lock.acquire())
    12 print('----------------')
    13 print(lock.acquire(False))
    14 print(lock.acquire())
    15 print(lock.acquire(timeout=4))
    16 print('lock in main thread{}'.format(lock))
    17 # ------------------------------------------------------
    18 lock.release()
    19 lock.release()
    20 lock.release()
    21 lock.release()
    22 print("main thread {}".format(threading.current_thread().ident))
    23 print('lock in main thread{}'.format(lock))
    24 # ------------------------------------------------------
    25 # lock.release() # 多release()一次,抛异常
    26 #
    27 # print('lock in main thread{}'.format(lock))
    28 
    29 # ------------------------------------------------------
    30 print(lock.acquire(False))
    31 print('lock in main thread{}'.format(lock))
    32 # threading.Thread(3, lambda x:x.release(), args=(lock,)).start() # 上一句的锁是主线程的,但是像现在夸线程了,所以抛异常
    33 lock.release()
    View Code

      结果:

     1 True
     2 ----------------
     3 True
     4 True
     5 True
     6 lock in main thread<locked _thread.RLock object owner=9060 count=4 at 0x0000000002930E68>
     7 main thread 9060
     8 lock in main thread<unlocked _thread.RLock object owner=0 count=0 at 0x0000000002930E68>
     9 True
    10 lock in main thread<locked _thread.RLock object owner=9060 count=1 at 0x0000000002930E68>
    11 
    12 Process finished with exit code 0
    View Code

      测试多线程

     1 import threading
     2 from threading import Thread , Lock
     3 import logging
     4 import time
     5 
     6 FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
     7 logging.basicConfig(format=FORMAT,level=logging.INFO)
     8 
     9 lock = threading.RLock()
    10 
    11 print(lock.acquire())
    12 print('----------------')
    13 print(lock.acquire(False))
    14 print(lock.acquire())
    15 print(lock.acquire(timeout=4))
    16 print('lock in main thread{}'.format(lock))
    17 # ------------------------------------------------------
    18 lock.release()
    19 lock.release()
    20 lock.release()
    21 # lock.release()
    22 print('==',  'lock in main thread{}'.format(lock))
    23 
    24 def sub(l):
    25     logging.info('sub thread')
    26     l.acquire()
    27     logging.info('end sub')
    28     print('2', lock)
    29 
    30 t = threading.Thread(target=sub, args=(lock,))
    31 t.start()
    32 
    33 print('1', lock)
    34 
    35 print('===== end ====')
    View Code

      结果:

     1 True
     2 ----------------
     3 True
     4 True
     5 True
     6 lock in main thread<locked _thread.RLock object owner=9720 count=4 at 0x00000000021F0E68>
     7 == lock in main thread<locked _thread.RLock object owner=9720 count=1 at 0x00000000021F0E68>
     8 1 <locked _thread.RLock object owner=9720 count=1 at 0x00000000021F0E68>
     9 ===== end ====
    10 2018-10-14 21:51:46,765 Thread-1 10840 sub thread
    11 
    12 
    13 可以看出:主线程的lock的count > 0, 子线程是无法获取锁,只能等待
    14     
    View Code

      可重入锁,与线程相关,可在一个线程中获取锁, 并可继续在同一线程中不阻塞获取锁。当锁未释放完,其他线程获取锁会被阻塞,直到当前持有锁的线程释放完锁。

    5、Condition

     构造方法Condition( lock = None),可以传入一个Lock或一个RLock对象,默认是RLock()

    名称                                含义
    acquire(*args) 获取锁
    wait(self,timeout=None) 等待或超时
    notify(n=1) 唤醒至多指定数目个数的等待线程,没有等待的线程就没有任何操作
    notify_all() 唤醒所有等待的线程

       Condition 用于生产者,消费者模型,为了解决生产者消费者速度匹配问题。

      测试:消费者消费速度大于生产者生产速度   

     1 import threading
     2 from threading import Thread , Lock, Event
     3 import logging, random
     4 import time
     5 
     6 FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
     7 logging.basicConfig(format=FORMAT,level=logging.INFO)
     8 
     9 class Disptcher:
    10     def __init__(self):
    11         self.data = None
    12         self.event = Event()
    13 
    14     def produce(self, total):
    15         for _ in range(total):
    16             data = random.randint(0, 100)
    17             logging.info(data)
    18             self.data = data
    19             self.event.wait(1)
    20         self.event.set()
    21 
    22     def consume(self):
    23         while not self.event.is_set():
    24             data = self.data
    25             logging.info('recieved {}'.format(data))
    26             self.data = None
    27             self.event.wait(0.5)
    28 
    29 d = Disptcher()
    30 p = Thread(target=d.produce, args=(10,), name='producer')
    31 c = Thread(target=d.consume, name='consumer')
    32 
    33 c.start()
    34 p.start()
    不考虑线程安全

      结果:

     1 2018-10-15 10:45:08,751 consumer 7676 recieved None
     2 2018-10-15 10:45:08,798 producer 8032 43
     3 2018-10-15 10:45:09,303 consumer 7676 recieved 43
     4 2018-10-15 10:45:09,824 consumer 7676 recieved None
     5 2018-10-15 10:45:09,824 producer 8032 68
     6 2018-10-15 10:45:10,339 consumer 7676 recieved 68
     7 2018-10-15 10:45:10,838 producer 8032 57
     8 2018-10-15 10:45:10,854 consumer 7676 recieved 57
     9 2018-10-15 10:45:11,371 consumer 7676 recieved None
    10 2018-10-15 10:45:11,854 producer 8032 71
    11 2018-10-15 10:45:11,886 consumer 7676 recieved 71
    12 2018-10-15 10:45:12,400 consumer 7676 recieved None
    13 2018-10-15 10:45:12,868 producer 8032 10
    14 2018-10-15 10:45:12,915 consumer 7676 recieved 10
    15 2018-10-15 10:45:13,430 consumer 7676 recieved None
    16 2018-10-15 10:45:13,882 producer 8032 51
    17 2018-10-15 10:45:13,945 consumer 7676 recieved 51
    18 2018-10-15 10:45:14,460 consumer 7676 recieved None
    19 2018-10-15 10:45:14,896 producer 8032 51
    20 2018-10-15 10:45:14,974 consumer 7676 recieved 51
    21 2018-10-15 10:45:15,489 consumer 7676 recieved None
    22 2018-10-15 10:45:15,910 producer 8032 26
    23 2018-10-15 10:45:16,004 consumer 7676 recieved 26
    24 2018-10-15 10:45:16,519 consumer 7676 recieved None
    25 2018-10-15 10:45:16,924 producer 8032 87
    26 2018-10-15 10:45:17,034 consumer 7676 recieved 87
    27 2018-10-15 10:45:17,548 consumer 7676 recieved None
    28 2018-10-15 10:45:17,938 producer 8032 99
    29 2018-10-15 10:45:18,063 consumer 7676 recieved 99
    30 2018-10-15 10:45:18,578 consumer 7676 recieved None
    31 
    32 Process finished with exit code 0
    View Code

          从上例可以看出,消费者浪费大量时间来主动查看有没有数据。

      测试:使用Condition ,匹配生产者和消费者速度:

     1 import threading
     2 from threading import Thread , Lock, Event, Condition
     3 import logging, random
     4 import time
     5 
     6 FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
     7 logging.basicConfig(format=FORMAT,level=logging.INFO)
     8 
     9 class Disptcher:
    10     def __init__(self):
    11         self.data = None
    12         self.event = Event()
    13         self.cond = Condition() # 默认是RLock,t 同一线程可以获取多个锁
    14 
    15     def produce(self, total):
    16         for _ in range(total):
    17             data = random.randint(0, 100)
    18             with self.cond: #  获取锁
    19                 logging.info(data)
    20                 self.data = data
    21                 self.cond.notify_all() # 通知所有等待的线程
    22             self.event.wait(1) # 模拟产生数据速度
    23 
    24 
    25     def consume(self):
    26         while True:
    27             print('==============')
    28             with self.cond:
    29                 self.cond.wait() # 阻塞等待通知
    30                 logging.info('recieved {}'.format(self.data))
    31                 self.data = None
    32             self.event.wait(0.5) # 模拟消费速度
    33             print('--------------')
    34 
    35 d = Disptcher()
    36 p = Thread(target=d.produce, args=(10,), name='producer')
    37 c = Thread(target=d.consume, name='consumer')
    38 
    39 c.start()
    40 p.start()
    不考虑线程安全

      结果:可以看到每产生一个数据,就通知一次消费者,最终没有数据产生,消费者一直等待。

     1 D:python3.7python.exe E:/code_pycharm/复习/t1.py
     2 2018-10-15 11:17:25,393 producer 7912 39
     3 ==============
     4 2018-10-15 11:17:25,393 consumer 4356 recieved 39
     5 --------------
     6 ==============
     7 2018-10-15 11:17:26,408 producer 7912 78
     8 2018-10-15 11:17:26,408 consumer 4356 recieved 78
     9 --------------
    10 ==============
    11 2018-10-15 11:17:27,422 producer 7912 89
    12 2018-10-15 11:17:27,422 consumer 4356 recieved 89
    13 --------------
    14 ==============
    15 2018-10-15 11:17:28,436 producer 7912 7
    16 2018-10-15 11:17:28,436 consumer 4356 recieved 7
    17 --------------
    18 ==============
    19 2018-10-15 11:17:29,450 producer 7912 7
    20 2018-10-15 11:17:29,450 consumer 4356 recieved 7
    21 --------------
    22 ==============
    23 2018-10-15 11:17:30,464 producer 7912 100
    24 2018-10-15 11:17:30,464 consumer 4356 recieved 100
    25 --------------
    26 ==============
    27 2018-10-15 11:17:31,478 producer 7912 17
    28 2018-10-15 11:17:31,478 consumer 4356 recieved 17
    29 --------------
    30 ==============
    31 2018-10-15 11:17:32,492 producer 7912 12
    32 2018-10-15 11:17:32,492 consumer 4356 recieved 12
    33 --------------
    34 ==============
    35 2018-10-15 11:17:33,506 producer 7912 100
    36 2018-10-15 11:17:33,506 consumer 4356 recieved 100
    37 --------------
    38 ==============
    39 2018-10-15 11:17:34,520 producer 7912 97
    40 2018-10-15 11:17:34,520 consumer 4356 recieved 97
    41 --------------
    42 ==============
    View Code

      测试:一个生产者,多个消费者,每次只通知两个线程

     1 import threading
     2 from threading import Thread , Lock, Event, Condition
     3 import logging, random
     4 import time
     5 
     6 FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
     7 logging.basicConfig(format=FORMAT,level=logging.INFO)
     8 
     9 class Disptcher:
    10     def __init__(self):
    11         self.data = None
    12         self.event = Event()
    13         self.cond = Condition() # 默认是RLock,t 同一线程可以获取多个锁
    14 
    15     def produce(self, total):
    16         for _ in range(total):
    17             data = random.randint(0, 100)
    18             with self.cond: #  获取锁
    19                 logging.info(data)
    20                 self.data = data
    21                 self.cond.notify(2) # 通知所有等待的线程
    22             self.event.wait(1) # 模拟产生数据速度
    23 
    24 
    25     def consume(self):
    26         while True:
    27             # print('==============')
    28             with self.cond:
    29                 self.cond.wait() # 阻塞等待通知
    30                 logging.info('recieved {}'.format(self.data))
    31                 self.data = None
    32             self.event.wait(0.5) # 模拟消费速度
    33             # print('--------------')
    34 
    35 d = Disptcher()
    36 
    37 p = Thread(target=d.produce, args=(10,), name='producer')
    38 
    39 for i in range(5):
    40     c = Thread(target=d.consume, name='consumer --{}'.format(i))
    41     c.start()
    42 
    43 p.start()
    View Code

      结果:可以看到 每次通知的线程都是随机的。

     1 D:python3.7python.exe E:/code_pycharm/复习/t1.py
     2 2018-10-15 11:24:08,780 producer 7220 64
     3 2018-10-15 11:24:08,780 consumer --0 7488 recieved 64
     4 2018-10-15 11:24:08,780 consumer --1 7556 recieved None
     5 2018-10-15 11:24:09,780 producer 7220 65
     6 2018-10-15 11:24:09,780 consumer --2 4824 recieved 65
     7 2018-10-15 11:24:09,781 consumer --3 7036 recieved None
     8 2018-10-15 11:24:10,794 producer 7220 31
     9 2018-10-15 11:24:10,794 consumer --4 6256 recieved 31
    10 2018-10-15 11:24:10,795 consumer --0 7488 recieved None
    11 2018-10-15 11:24:11,809 producer 7220 60
    12 2018-10-15 11:24:11,809 consumer --1 7556 recieved 60
    13 2018-10-15 11:24:11,809 consumer --2 4824 recieved None
    14 2018-10-15 11:24:12,826 producer 7220 44
    15 2018-10-15 11:24:12,827 consumer --3 7036 recieved 44
    16 2018-10-15 11:24:12,828 consumer --0 7488 recieved None
    17 2018-10-15 11:24:13,836 producer 7220 18
    18 2018-10-15 11:24:13,836 consumer --2 4824 recieved 18
    19 2018-10-15 11:24:13,837 consumer --4 6256 recieved None
    20 2018-10-15 11:24:14,850 producer 7220 3
    21 2018-10-15 11:24:14,850 consumer --3 7036 recieved 3
    22 2018-10-15 11:24:14,851 consumer --1 7556 recieved None
    23 2018-10-15 11:24:15,864 producer 7220 33
    24 2018-10-15 11:24:15,864 consumer --0 7488 recieved 33
    25 2018-10-15 11:24:15,864 consumer --4 6256 recieved None
    26 2018-10-15 11:24:16,878 producer 7220 44
    27 2018-10-15 11:24:16,878 consumer --2 4824 recieved 44
    28 2018-10-15 11:24:16,879 consumer --3 7036 recieved None
    29 2018-10-15 11:24:17,892 producer 7220 39
    30 2018-10-15 11:24:17,892 consumer --0 7488 recieved 39
    31 2018-10-15 11:24:17,892 consumer --1 7556 recieved None
    View Code

      注意:上面的例子只为理解,存在很多瑕疵。

        如果一个生产者,多个消费者,若只通知部分线程,就是多播,若都通知,就是广播。

        一般消费者在前,避免数据丢失,如果不在意 ,也就无所谓。

         正常情况下,消费者速度很慢,对数据做一定处理,所以匹配很难,所以增加消费者来解决。

       总结:

        Conditon 用于生产者消费者模型中,解决生产者消费者速度匹配问题

        采用了通知机制

        使用方式:

          使用Conditon,必须先acquire,用完了,release,因为内部使用了锁。默认是RLock,最好使用with上下文。

          消费者wait,等待通知。

          生产者生产好消息,对消费者法通知,可以使用notify或notify_all方法

    6、Barrier     

       叫栅栏,屏障,可以想成路障,道闸 。3.2引入的新功能。

    名称                                     含义
    Barrier(parties, action=None,timeout=None g构建Barrier对象,指定参与方数目,timeout是wait方法未指定超时的默认值。
    n_waiting 当前在屏障中等待的线程数
    parties 各方数,就是需要多少个等待
    wait(timeout=None) 等待通过屏障,返回0到线程数-1 的整数,每个线程返回不同,如果wait方法设置了超时,并超时发送,屏障将处于broken状态。

       测试:线程数  小于等于 parties

     1 import threading
     2 from threading import Thread , Lock, Event, Condition, Barrier
     3 import logging, random
     4 import time
     5 
     6 FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
     7 logging.basicConfig(format=FORMAT,level=logging.INFO)
     8 
     9 barrier = Barrier(3)
    10 
    11 def worker(bar:Barrier):
    12     print(' working', barrier.n_waiting)   # bar.n_waiting
    13     t = bar.wait()
    14     print(t)
    15     print('finished---', bar.n_waiting)
    16 
    17 ts = []
    18 for  i in range(2):
    19     t = Thread(target=worker, args=(barrier,))
    20     t.start()
    21     ts.append(t)
    22 
    23 
    24 while True:
    25     time.sleep(1)
    26     print(barrier.n_waiting)
    View Code

       结果:

    1  working 0
    2  working 1
    3 2
    4 2
    5 2
    6 2
    View Code

       从上述可以看出,当线程数少于 parties的时候,n_waitin 是当前线程数,在等待。

      而刚启动第一个线程的时候,此时n_waiting = 0,还没有一个线程在等待,因为等执行完wait后,在知道有一个在等待。

      所以,当线程数为2 的时候,n_waiting = 1,但是在主线程中,n_waiting = 2,因为当两个线程都处于wait时,主线程发现有两个哎等待,如果没有sleep(1),主线程中可能也会出现0 或 1.

      测试:如果出现abort,即损坏,需要异常处理来解决,正常的线程还是会继续执行

     1 import threading
     2 from threading import Thread , Lock, Event, Condition, Barrier
     3 import logging, random
     4 import time
     5 
     6 FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
     7 logging.basicConfig(format=FORMAT,level=logging.INFO)
     8 
     9 barrier = Barrier(3)
    10 
    11 def worker(bar:Barrier):
    12     print(' working', barrier.n_waiting)   # bar.n_waiting
    13     try:
    14         bar.wait()
    15     except threading.BrokenBarrierError:
    16         print('broken')
    17     print('finished---', bar.n_waiting)
    18 
    19 ts = []
    20 for  i in range(3):
    21     if i == 2:
    22         barrier.abort()
    23     # if i == 4:
    24     #     barrier.reset()
    25     t = Thread(target=worker, args=(barrier,))
    26     t.start()
    27     ts.append(t)
    28 
    29 
    30 while True:
    31     time.sleep(2)
    32     print(barrier.n_waiting, barrier.broken)
    View Code

      结果:从结果可看到 0,1线程也都完成了

     1  working 0
     2  working broken0 working 0
     3 broken
     4 broken
     5 finished--- 0
     6 
     7 
     8 finished---finished--- 0
     9  0
    10 0 True
    11 0 True
    12 0 True
    13 
    14 Process finished with exit code 1
    View Code

      测试:通过异常处理来解决abrot ---> broken

     1 import threading
     2 from threading import Thread , Lock, Event, Condition, Barrier
     3 import logging, random
     4 import time
     5 
     6 FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
     7 logging.basicConfig(format=FORMAT,level=logging.INFO)
     8 
     9 barrier = Barrier(3)
    10 
    11 def worker(bar:Barrier):
    12     print(' working', barrier.n_waiting)   # bar.n_waiting
    13     try:
    14         bar.wait()
    15     except threading.BrokenBarrierError:
    16         print('broken')
    17     print('finished---', bar.n_waiting)
    18 
    19 ts = []
    20 for  i in range(7):
    21     if i == 2:
    22         barrier.abort()
    23     if i == 4:
    24         barrier.reset()
    25     t = Thread(target=worker, args=(barrier,))
    26     t.start()
    27     ts.append(t)
    28 
    29 
    30 while True:
    31     time.sleep(0.5)
    32     print(barrier.n_waiting, barrier.broken)
    View Code

      结果:测试发现,只要线程是3 的倍数,即便abort了,也就是损坏了,还是可以往下执行

     1 D:python3.7python.exe E:/code_pycharm/复习/t2.py
     2  working 0
     3  working 1
     4 broken working
     5 finished---  0
     6 brokenbroken
     7 finished--- 0
     8 0
     9  working 0
    10 
    11  working 1
    12 finished--- 2
    13  working 2
    14 finished--- 0
    15 finished--- 0
    16  working 0
    17 finished--- 0
    18 1 False
    19 1 False
    20 1 False
    21 1 False
    22 1 False
    23 1 False
    24 1 False
    25 
    26 Process finished with exit code 1
    View Code

      测试:如果wait超时,就会将barrier  broken,而且一直处于broken,除非reset,也就是说,如果broken后,即便线程数不是partes的整数倍,都会往下走,不会处于等待,也就是说,来一个线程,不等,直接往下走。如下测试2

      没有broken之前正常等待,出现broken之后,就不用再等,直接执行后面的代码,直到修复

     1 import threading
     2 from threading import Thread , Lock, Event, Condition, Barrier
     3 import logging, random
     4 import time
     5 
     6 FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
     7 logging.basicConfig(format=FORMAT,level=logging.INFO)
     8 
     9 barrier = Barrier(3)
    10 
    11 def worker(bar:Barrier):
    12     print(' working', barrier.n_waiting)   # bar.n_waiting
    13     try:
    14         bar.wait(1)
    15     except threading.BrokenBarrierError:
    16         print('broken')
    17     print('finished---', bar.n_waiting)
    18 
    19 ts = []
    20 for  i in range(6):
    21     t = Thread(target=worker, args=(barrier,))
    22     t.start()
    23     time.sleep(1)
    24     ts.append(t)
    25 
    26 
    27 while True:
    28     time.sleep(0.5)
    29     print(barrier.n_waiting, barrier.broken)
    View Code

      结果:

     1 D:python3.7python.exe E:/code_pycharm/复习/t2.py
     2  working 0
     3  working 1
     4 broken
     5 broken
     6 finished--- 0finished--- 0
     7 
     8  working 0
     9 broken
    10 finished--- 0
    11  working 0
    12 broken
    13 finished--- 0
    14  working 0
    15 broken
    16 finished--- 0
    17  working 0
    18 broken
    19 finished--- 0
    20 0 True
    21 0 True
    22 0 True
    23 0 True
    24 0 True
    View Code

      手动reset()

      

      测试2:

     1 import threading
     2 from threading import Thread , Lock, Event, Condition, Barrier
     3 import logging, random
     4 import time
     5 
     6 FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
     7 logging.basicConfig(format=FORMAT,level=logging.INFO)
     8 
     9 barrier = Barrier(3)
    10 
    11 def worker(bar:Barrier):
    12     print(' working', barrier.n_waiting)   # bar.n_waiting
    13     try:
    14         bar.wait(1)
    15     except threading.BrokenBarrierError:
    16         print('broken')
    17     print('finished---', bar.n_waiting)
    18 
    19 ts = []
    20 for  i in range(6):
    21     t = Thread(target=worker, args=(barrier,))
    22     t.start()
    23     time.sleep(1)
    24     ts.append(t)
    25 
    26 time.sleep(3)
    27 print('--------------------------------------------------')
    28 
    29 for  _ in range(3):
    30     tt = threading.Thread(target=worker,args=(barrier,))
    31     tt.start()
    32 
    33 
    34 while True:
    35     time.sleep(0.5)
    36     print(barrier.n_waiting, barrier.broken)
    View Code

      结果:

     1 D:python3.7python.exe E:/code_pycharm/复习/t2.py
     2  working 0
     3  working 1
     4 broken
     5 finished--- 0
     6 broken
     7 finished--- 0
     8  working 0
     9 broken
    10 finished--- 0
    11  working 0
    12 broken
    13 finished--- 0
    14  working 0
    15 broken
    16 finished--- 0
    17  working 0
    18 broken
    19 finished--- 0
    20 --------------------------------------------------
    21  working 0
    22 broken
    23 finished--- 0
    24  working 0
    25 broken
    26 finished--- 0
    27  working 0
    28 broken
    29 finished--- 0
    30 0 True
    31 0 True
    32 0 True
    33 0 True
    34 0 True
    35 0 True
    36 0 True
    37 
    38 Process finished with exit code 1
    View Code

      

      

       Barrier应用:

        并发初始化:

        所有线程都必须初始化完成后,才能继续工作,例如运行前加载数据,检查,如果这些工作没有完成就开始运行,将不能正常进行。

        10个线程做10种工作准备,每个线程负责一种工作,只有这10个线程完成后,才能继续工作,先完成的要等待后完成的线程。

      例如:启动一个程序,需要先加载磁盘文件,缓存预热,初始化里链接池等工作,这些工作可以齐头并进,不过只有都满足了,成序才能继续向后执行。假设数据库链接失败,则初始化工作失败,就要abort,barrier置为broken,所有线程收到异常退出。

      工作量:

        有10个计算任务,完成6 个,就算完成工作。

    7、semaphore 信号量

      和 Lock很像,信号量对象内部维护一个倒计数器,每一次acquire都会减1,当acquire方法发现技术为0 就阻塞请求的线程,直到其他线程对信号量release后,计数大于 0,恢复阻塞的线程。

    名称                                                  含义
    Semaphore(value=1) 构造方法,value小于0,抛ValueError异常
    acquire(blocking=True, timeout=None) 获取信号量,计数器减一,获取成功返回True
    release() 释放信号量,计数器加1

      计数器永远不会低于0,因为acquire的时候,发现是0,都会被阻塞

      测试1:

     1 import threading
     2 from threading import Thread , Lock, Event, Condition, Semaphore
     3 import logging, random
     4 import time
     5 
     6 FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
     7 logging.basicConfig(format=FORMAT,level=logging.INFO)
     8 
     9 s = Semaphore(3)
    10 
    11 print(s._value)
    12 
    13 print(s.acquire())
    14 print(s.acquire())
    15 print(s._value)
    16 
    17 Thread(target=lambda s:s.acquire(), args=(s,)).start()
    18 time.sleep(2)
    19 print(s._value)
    20 
    21 s.release()
    22 s.release()
    23 s.release()
    24 s.release()
    25 s.release()
    26 
    27 print('=========== end ==============')
    28 
    29 print(s._value)
    release超过value定义数

      结果:可以看到release 超过了value后,在看_value的值,就变了, 所以一般用带边界的信号量、

     1 D:python3.7python.exe E:/code_pycharm/复习/t1.py
     2 3
     3 True
     4 True
     5 1
     6 0
     7 =========== end ==============
     8 5
     9 
    10 Process finished with exit code 0
    View Code

      在BoundedSemaphore 下,如果没有acquire信号量的时候,就release,就会报错

      BoundedSemaphore类:

         有界的信号量,不允许使用release查出初始化的范围,否则ValueError

      应用举例

        连接池:因为资源有限,且开启一个连接成本高,所以使用连接池。

        一个简单的链接池:

          链接池应该有容量(总数),有一个工厂方法可以获取链接,能够把不用的链接返回,供其他调用者使用。

      测试: 此代码是用来演示,并不能用于生产

     1 import threading
     2 from threading import Thread , Lock, Event, Condition, Semaphore,BoundedSemaphore
     3 import logging, random
     4 import time
     5 
     6 FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
     7 logging.basicConfig(format=FORMAT,level=logging.INFO)
     8 
     9 class Conn:
    10     def __init__(self, name):
    11         self.name = name
    12 
    13 class Pool:
    14     def __init__(self, count:int):
    15         self.count = count
    16         self.pool = [self.__connect('conn - {}'.format(x)) for x in range(self.count)]
    17         self.semaphore = Semaphore(count)
    18 
    19 
    20     def _connect(self, conn_name):
    21         return Conn(conn_name)
    22 
    23     def get_conn(self):
    24         print('------------------')
    25         self.semaphore.acquire()
    26         print('===================')
    27         conn = self.pool.pop()
    28         return conn
    29 
    30     def return_conn(self,conn:Conn):
    31         self.pool.append(conn)
    32         self.semaphore.release()
    33 
    34 pool = Pool(3)
    35 
    36 def worer(pool:Pool):
    37     conn = pool.get_conn()
    38     logging.info(conn)
    39 
    40     threading.Event().wait(random.randint(1, 4))
    41     pool.return_conn(conn)
    42 
    43 for i in range(6):
    44     threading.Thread(target=worer, name='w --{}'.format(i), args=(pool,)).start()
    View Code

      

      

      信号量和锁:

        锁,只允许同一个时间一个线程独占资源,它是特殊的信号量,即信号量计数器初始值为 1

        信号量,可以多个线程访问共享资源,但是这个共享资源是有限的。

        锁,可以看做特殊的信号量。

    8、数据结构 和 GIL:

      Queue是线程安全的,到目前为止。

      

      因为 if语句 执行 和put 、get执行,中间,其他线程也插进来

      GIL 全局解释器锁

      

      第一段话的意思就是说:如果多线程的时候,每个线程分配到不同的cpu核心上,但是,还是同一时间只有一个线程执行字节码。

      IO密集型:比如print,会调用os模块下的stdout,这个时间,就会有其他线程深入执行。

      事实上,通过测试一个cpu密集型的代码,CPython多线程是没有任何优势的。和一个线程执行时间相当。因为GIL存在,

      事实上,python支持使用单线程,因为里边有一个协程。效率也不亚于多进程

      所以多线程,相当于串行执行,因为GIL ,每次只能执行一个线程代码,只不过时间片不一样。

      墙上时间(手表看的时间)

        
      
        5个100那块 再看一次,什么时候不会出现500
        
    
    
    多线程-锁:
        共享锁,排他锁,锁力度
        线程相关:lock  Rlock
    
        一般来说消费者先启动。避免数据丢失
    
        condition:
            生产速度  匹配消费速度
            一份数据多人使用
            广播策略
            多播策略
    
        复习:正则出现的 切片
    
        Barrier:
    
            Bounded 边界
    
    
        一个资源 lock
        多个有限资源 信号量
        with不能夸函数
    
    
        Queue代码中
            互斥量!!!
    
        内置数据结构本身不是线程安全的。在GIL下,安全,仅在CPython
    
    
        
    为什么要坚持,想一想当初!
  • 相关阅读:
    结构体运算符重载出错分析
    已知空间三个点,解算外接圆圆心坐标,C++编程实现
    IQueryable与IEnumberable的区别
    Asp.net MVC中关于@Html标签Label、Editor使用
    MVC 基架不支持 Entity Framework 6 或更高版本 即 NuGet的几个小技巧
    MVC下拉框Html.DropDownList 和DropDownListFor 的常用方法
    js jq封装ajax方法
    httpSession的正确理解
    vs未能解析此远程名称: 'api.nuget.org'
    未能加载文件或程序集“EntityFramework, Version=6.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089”
  • 原文地址:https://www.cnblogs.com/JerryZao/p/9776961.html
Copyright © 2011-2022 走看看