zoukankan      html  css  js  c++  java
  • Python 线程|进程

    Python   线程

    Threading是用于提供线程相关的操作,线程是应用程序中工作的最小单元。线程与进程的关系下图所示:

      子线程是由主线程产生的,但两者并没有关联。

    利用threading创建线程:

     1 '''利用threading包创建'''
     2 import threading
     3 import time
     4 
     5 def run(n):
     6     time.sleep(2)
     7     print("task:",n)
     8 
     9 '''串行:一个运行完后,再运行另外一个'''
    10 run("t1")  #并不是线程,只是调用方法传参数
    11 run("t2")
    12 
    13 '''并发性'''
    14 t1 = threading.Thread(target=run,args=("T1",))  #t1是线程,args为元组
    15 t2 = threading.Thread(target=run,args=("T2",))
    16 t1.start()  #并发性地工作
    17 t2.start()
    18 
    19 
    20 '''运行结果'''
    21 task: t1         #t1运行后会间隔两秒,然后运行t2
    22 task: t2
    23 
    24 task: T2         #T1,T2同时运行
    25 task: T1

    上述创建了两个线程t1和t2,然后控制器就交给了CPU,CPU根据指定算法进行调度,分片执行指令。

    更多方法:

    • start    线程准备就绪,等待CPU调度;启动线程的活动,每个线程对象最多只能调用一次。
    • join     逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义。
    • run      表示线程活动的方法。可以在子类中重写此方法。标准run()方法调用传递给对象构造函数的可调用对象作为目标参数(如果有的话),分别使用args和kwargs参数中的顺序参数和关键字参数。线程被cpu调度后自动执行线程对象的run方法
    • get_ident()    获得线程地址
    • setName    为线程设置名称
    • getName    获取线程名称
    • daemon     一个布尔值,指示此线程是否为守护线程。这必须在调用start()之前设置,否则会引发运行时错误。它的初始值继承自创建线程;主线程不是守护进程线程,因此在主线程中创建的所有线程默认为守护进程= False。当没有存活的非守护进程线程时,整个Python程序退出。
    • setDaemon   设置为后台线程或前台线程(默认)如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止;如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止
     1 #子线程是由主线程产生的,但两者并没有关联
     2 import threading
     3 import time
     4 
     5 def run(n):
     6     print("task:",n)
     7     time.sleep(0.1)
     8     print("taskdone:",n)
     9 
    10 Start_time = time.time()
    11 for i in range(50):     #共有51个线程,代码本身是一个主线程
    12     t = threading.Thread(target=run,args=("t--%s" % i,))
    13     t.start()
    14     t.join()            #join使得主线程与子线程成串行运行
    15 
    16 print(time.time()-Start_time)  #print为创建子线程所产生的时间,而非运行时间
    多个线程的创建
     1 import threading
     2 import time
     3 
     4 class My_Thread(threading.Thread):
     5     def __init__(self,n):
     6         super(My_Thread,self).__init__()
     7         self.n = n
     8 
     9     def  run(self):
    10         print("task:",self.n)
    11         time.sleep(0.1)
    12 t_obj=[]
    13 start_time = time.time()
    14 for i in range(50):     #共有51个线程,代码本身是一个主线程
    15     t = My_Thread("t--%s" % i)
    16     t.setDaemon(True)   #监听端口,当主程序执行完毕,将不会执行其他线程(前提是去掉join方法)
    17     t.start()
    18     t_obj.append(t)
    19 print(time.time()-start_time)
    20 
    21 
    22 '''运行结果'''
    23 task: t--0
    24 task: t--1
    25 task: t--2
    26 task: t--3
    27 task: t--4
    28 task: t--5
    29 task: t--6
    30 task: t--7
    31 task: t--8
    32 task: t--9
    33 task: t--10
    34 task: t--11
    35 task: t--12
    36 task: t--13
    37 task: t--14
    38 task: t--15
    39 task: t--16
    40 task: t--17
    41 task: t--18
    42 task: t--19
    43 task: t--20
    44 task: t--21
    45 task: t--22
    46 task: t--23
    47 task: t--24
    48 task: t--25
    49 task: t--26
    50 task: t--27
    51 task: t--28
    52 task: t--29
    53 task: t--30
    54 task: t--31
    55 task: t--32
    56 task: t--33
    57 task: t--34
    58 task: t--35
    59 task: t--36
    60 task: t--37
    61 task: t--38
    62 task: t--39
    63 task: t--40
    64 task: t--41
    65 task: t--42
    66 task: t--43
    67 task: t--44
    68 task: t--45
    69 task: t--46
    70 task: t--47
    71 task: t--48
    72 task: t--49
    73 0.01196908950805664
    监听端口(setDaemon)

    线程锁(Lock):

     1     def acquire(self, blocking=True, timeout=None):
     2         """Acquire a semaphore, decrementing the internal counter by one.
     3         When invoked without arguments: if the internal counter is larger than
     4         zero on entry, decrement it by one and return immediately. If it is zero
     5         on entry, block, waiting until some other thread has called release() to
     6         make it larger than zero. This is done with proper interlocking so that
     7         if multiple acquire() calls are blocked, release() will wake exactly one
     8         of them up. The implementation may pick one at random, so the order in
     9         which blocked threads are awakened should not be relied on. There is no
    10         return value in this case.
    11         When invoked with blocking set to true, do the same thing as when called
    12         without arguments, and return true.
    13         When invoked with blocking set to false, do not block. If a call without
    14         an argument would block, return false immediately; otherwise, do the
    15         same thing as when called without arguments, and return true.
    16         When invoked with a timeout other than None, it will block for at
    17         most timeout seconds.  If acquire does not complete successfully in
    18         that interval, return false.  Return true otherwise.
    19         """
    20         #获得一个信号量,将内部计数器减1。在没有参数的情况下调用时:如果内部计数器在入口时
    21         # 大于0,则将其递减1并立即返回。如果进入时为零,阻塞,等待其他线程调用release()
    22         # 使其大于零。这是通过适当的联锁完成的,这样,如果多个acquire()调用被阻塞,
    23         # release()就会唤醒其中一个调用。实现可以随机选择一个线程,因此不应该依赖于被阻塞
    24         # 线程被唤醒的顺序。在本例中没有返回值。当阻塞集调用为true时,执行与没有参数调用
    25         # 时相同的操作,并返回true。当阻塞设置为false时,不要阻塞。如果一个没有参数的
    26         # 调用将阻塞,立即返回false;否则,执行与没有参数调用时相同的操作,并返回true。
    27         # 当使用除None以外的超时调用时,它最多将阻塞超时秒。如果在那段时间里收购没有成功
    28         # 完成,还假。否则返回true。
    29         if not blocking and timeout is not None:
    30             raise ValueError("can't specify timeout for non-blocking acquire")
    31         rc = False
    32         endtime = None
    33         with self._cond:
    34             while self._value == 0:
    35                 if not blocking:
    36                     break
    37                 if timeout is not None:
    38                     if endtime is None:
    39                         endtime = _time() + timeout
    40                     else:
    41                         timeout = endtime - _time()
    42                         if timeout <= 0:
    43                             break
    44                 self._cond.wait(timeout)
    45             else:
    46                 self._value -= 1
    47                 rc = True
    48         return rc
    49 
    50     __enter__ = acquire
    51 
    52     def release(self):
    53         """Release a semaphore, incrementing the internal counter by one.
    54         When the counter is zero on entry and another thread is waiting for it
    55         to become larger than zero again, wake up that thread.
    56         """
    57         #释放信号量,增加一个内部计数器。当进入时计数器为零,而另一个线程正在等待计数器
    58         # 再次大于零时,唤醒该线程。
    59         with self._cond:
    60             self._value += 1
    61             self._cond.notify()
    62 
    63     def __exit__(self, t, v, tb):
    64         self.release()
    acquire、release源代码
     1 import threading
     2 import time
     3 
     4 lock = threading.Lock()   #线程锁
     5 
     6 def run(n):
     7     lock.acquire()             #锁定
     8     global num
     9     num+=1
    10     lock.release()              #释放锁
    11     time.sleep(1)
    12 
    13 t_obj = []
    14 num = 0
    15 for i in range(50):
    16     t = threading.Thread(target=run,args=("t--%s" % i,))
    17     t.start()
    18     t_obj.append(t)
    19 
    20 for i in t_obj:
    21     i.join()
    22 
    23 print("num:",num)
    24 
    25 
    26 '''运行结果'''
    27 num: 50
    '''可用来做测试'''
    
    if __name__ == "__main__"  
    
    #表示函数的开始位置,判断自主运行与否
    

      线程池(信号量(semaphore)):

      信号量管理一个计数器,该计数器表示release()调用的数量减去acquire()调用的数量,再加上一个初始值。acquire()方法如果有必要会阻塞,直到它可以返回而不会使计数器变为负数为止。如果未指定,值默认为1。

    '''信号量'''
    import threading
    import time
    
    def run(n):
        Semaphore.acquire()
        print("task:",n)
        time.sleep(1)
        Semaphore.release()
    
    if __name__ == "__main__":
        Semaphore = threading.BoundedSemaphore(5)
        #每五个子进程运行一次,间隔一秒后,再运行下五个
        for i in range(20):
            t = threading.Thread(target=run,args=(i,))
    
            t.start()
        while threading.active_count()!=1:
            pass
        else:
            print("--all threading has done")
       1 """Thread module emulating a subset of Java's threading model."""
       2 #线程模块模拟Java线程模型的一个子集。
       3 import os as _os
       4 import sys as _sys
       5 import _thread
       6 
       7 from time import monotonic as _time
       8 from traceback import format_exc as _format_exc
       9 from _weakrefset import WeakSet
      10 from itertools import islice as _islice, count as _count
      11 try:
      12     from _collections import deque as _deque
      13 except ImportError:
      14     from collections import deque as _deque
      15 
      16 # Note regarding PEP 8 compliant names
      17 #  This threading model was originally inspired by Java, and inherited
      18 # the convention of camelCase function and method names from that
      19 # language. Those original names are not in any imminent danger of
      20 # being deprecated (even for Py3k),so this module provides them as an
      21 # alias for the PEP 8 compliant names
      22 # Note that using the new PEP 8 compliant names facilitates substitution
      23 # with the multiprocessing module, which doesn't provide the old
      24 # Java inspired names.
      25 
      26 __all__ = ['get_ident', 'active_count', 'Condition', 'current_thread',
      27            'enumerate', 'main_thread', 'TIMEOUT_MAX',
      28            'Event', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread',
      29            'Barrier', 'BrokenBarrierError', 'Timer', 'ThreadError',
      30            'setprofile', 'settrace', 'local', 'stack_size']
      31 
      32 # Rename some stuff so "from threading import *" is safe
      33 _start_new_thread = _thread.start_new_thread
      34 _allocate_lock = _thread.allocate_lock
      35 _set_sentinel = _thread._set_sentinel
      36 get_ident = _thread.get_ident
      37 ThreadError = _thread.error
      38 try:
      39     _CRLock = _thread.RLock
      40 except AttributeError:
      41     _CRLock = None
      42 TIMEOUT_MAX = _thread.TIMEOUT_MAX
      43 del _thread
      44 
      45 
      46 # Support for profile and trace hooks
      47 #支持配置文件和跟踪挂钩
      48 
      49 _profile_hook = None
      50 _trace_hook = None
      51 
      52 def setprofile(func):
      53     """Set a profile function for all threads started from the threading module.
      54     The func will be passed to sys.setprofile() for each thread, before its
      55     run() method is called.
      56     """
      57     #为从线程模块启动的所有线程设置一个配置文件函数。在调用其run()方法之前,
      58     # func将被传递给每个线程的sys.setprofile()。
      59 
      60     global _profile_hook
      61     _profile_hook = func
      62 
      63 def settrace(func):
      64     """Set a trace function for all threads started from the threading module.
      65     The func will be passed to sys.settrace() for each thread, before its run()
      66     method is called.
      67     """
      68     #为从线程模块启动的所有线程设置跟踪函数。在调用其run()方法之前,
      69     # func将被传递给每个线程的sys.settrace()。
      70 
      71     global _trace_hook
      72     _trace_hook = func
      73 
      74 # Synchronization classes
      75 # 同步类
      76 
      77 Lock = _allocate_lock
      78 
      79 def RLock(*args, **kwargs):
      80     """Factory function that returns a new reentrant lock.
      81     A reentrant lock must be released by the thread that acquired it. Once a
      82     thread has acquired a reentrant lock, the same thread may acquire it again
      83     without blocking; the thread must release it once for each time it has
      84     acquired it.
      85     """
      86     #返回一个新的可重入锁的工厂函数。可重入锁必须由获得它的线程释放。
      87     # 一旦一个线程获得了可重入锁,该线程可以在不阻塞的情况下再次获得该锁;
      88     # 线程每次获得它时都必须释放它一次。
      89 
      90     if _CRLock is None:
      91         return _PyRLock(*args, **kwargs)
      92     return _CRLock(*args, **kwargs)
      93 
      94 class _RLock:
      95     """This class implements reentrant lock objects.
      96     A reentrant lock must be released by the thread that acquired it. Once a
      97     thread has acquired a reentrant lock, the same thread may acquire it
      98     again without blocking; the thread must release it once for each time it
      99     has acquired it.
     100     """
     101     #该类实现可重入锁对象。可重入锁必须由获得它的线程释放。一旦一个线程获得了可重入锁,
     102     # 该线程可以在不阻塞的情况下再次获得该锁;线程每次获得它时都必须释放它一次。
     103 
     104     def __init__(self):
     105         self._block = _allocate_lock()
     106         self._owner = None
     107         self._count = 0
     108 
     109     def __repr__(self):
     110         owner = self._owner
     111         try:
     112             owner = _active[owner].name
     113         except KeyError:
     114             pass
     115         return "<%s %s.%s object owner=%r count=%d at %s>" % (
     116             "locked" if self._block.locked() else "unlocked",
     117             self.__class__.__module__,
     118             self.__class__.__qualname__,
     119             owner,
     120             self._count,
     121             hex(id(self))
     122         )
     123 
     124     def acquire(self, blocking=True, timeout=-1):
     125         """Acquire a lock, blocking or non-blocking.
     126         When invoked without arguments: if this thread already owns the lock,
     127         increment the recursion level by one, and return immediately. Otherwise,
     128         if another thread owns the lock, block until the lock is unlocked. Once
     129         the lock is unlocked (not owned by any thread), then grab ownership, set
     130         the recursion level to one, and return. If more than one thread is
     131         blocked waiting until the lock is unlocked, only one at a time will be
     132         able to grab ownership of the lock. There is no return value in this
     133         case.
     134         When invoked with the blocking argument set to true, do the same thing
     135         as when called without arguments, and return true.
     136         When invoked with the blocking argument set to false, do not block. If a
     137         call without an argument would block, return false immediately;
     138         otherwise, do the same thing as when called without arguments, and
     139         return true.
     140         When invoked with the floating-point timeout argument set to a positive
     141         value, block for at most the number of seconds specified by timeout
     142         and as long as the lock cannot be acquired.  Return true if the lock has
     143         been acquired, false if the timeout has elapsed.
     144         """
     145         #获得一个锁,阻塞或非阻塞。在没有参数的情况下调用时:如果这个线程已经拥有锁,
     146         # 那么将递归级别增加1,并立即返回。否则,如果另一个线程拥有锁,
     147         # 则阻塞直到锁被解锁。一旦锁被解锁(不属于任何线程),然后获取所有权,
     148         # 将递归级别设置为1,然后返回。如果有多个线程被阻塞,等待锁被解锁,
     149         # 每次只有一个线程能够获取锁的所有权。在本例中没有返回值。当阻塞参数设置
     150         # 为true时,执行与没有参数时相同的操作,并返回true。当阻塞参数设置为false时,
     151         # 不要阻塞。如果一个没有参数的调用将阻塞,立即返回false;否则,执行与没有
     152         # 参数调用时相同的操作,并返回true。当将浮点超时参数设置为正值时,如果获得
     153         # 了锁,则最多阻塞超时指定的秒数,如果超时已过,则返回true;如果超时已过,则返回false。
     154 
     155         me = get_ident()
     156         if self._owner == me:
     157             self._count += 1
     158             return 1
     159         rc = self._block.acquire(blocking, timeout)
     160         if rc:
     161             self._owner = me
     162             self._count = 1
     163         return rc
     164 
     165     __enter__ = acquire
     166 
     167     def release(self):
     168         """Release a lock, decrementing the recursion level.
     169         If after the decrement it is zero, reset the lock to unlocked (not owned
     170         by any thread), and if any other threads are blocked waiting for the
     171         lock to become unlocked, allow exactly one of them to proceed. If after
     172         the decrement the recursion level is still nonzero, the lock remains
     173         locked and owned by the calling thread.
     174         Only call this method when the calling thread owns the lock. A
     175         RuntimeError is raised if this method is called when the lock is
     176         unlocked.
     177         There is no return value.
     178         """
     179         #释放锁,降低递归级别。如果减量后为零,则将锁重置为解锁(不属于任何线程),
     180         # 如果任何其他线程被阻塞,等待锁解锁,则只允许其中一个线程继续执行。如果在递减
     181         # 之后递归级别仍然是非零,则锁仍然被锁定,并且由调用线程拥有。只有当调用线程拥有
     182         # 锁时才调用此方法。如果在解锁锁时调用此方法,将引发运行时错误。没有返回值。
     183 
     184         if self._owner != get_ident():
     185             raise RuntimeError("cannot release un-acquired lock")
     186         self._count = count = self._count - 1
     187         if not count:
     188             self._owner = None
     189             self._block.release()
     190 
     191     def __exit__(self, t, v, tb):
     192         self.release()
     193 
     194     # Internal methods used by condition variables
     195     #条件变量使用的内部方法
     196 
     197     def _acquire_restore(self, state):
     198         self._block.acquire()
     199         self._count, self._owner = state
     200 
     201     def _release_save(self):
     202         if self._count == 0:
     203             raise RuntimeError("cannot release un-acquired lock")
     204         count = self._count
     205         self._count = 0
     206         owner = self._owner
     207         self._owner = None
     208         self._block.release()
     209         return (count, owner)
     210 
     211     def _is_owned(self):
     212         return self._owner == get_ident()
     213 
     214 _PyRLock = _RLock
     215 
     216 
     217 class Condition:
     218     """Class that implements a condition variable.
     219     A condition variable allows one or more threads to wait until they are
     220     notified by another thread.
     221     If the lock argument is given and not None, it must be a Lock or RLock
     222     object, and it is used as the underlying lock. Otherwise, a new RLock object
     223     is created and used as the underlying lock.
     224     """
     225     #实现条件变量的类。条件变量允许一个或多个线程等待,直到另一个线程通知它们。
     226     # 如果锁参数是给定的而不是空的,那么它必须是一个锁或RLock对象,并且它被用作底层锁。
     227     # 否则,将创建一个新的RLock对象并将其用作底层锁。
     228 
     229     def __init__(self, lock=None):
     230         if lock is None:
     231             lock = RLock()
     232         self._lock = lock
     233         # Export the lock's acquire() and release() methods
     234         #导出锁的acquire()和release()方法
     235         self.acquire = lock.acquire
     236         self.release = lock.release
     237         # If the lock defines _release_save() and/or _acquire_restore(),
     238         # these override the default implementations (which just call
     239         # release() and acquire() on the lock).  Ditto for _is_owned().
     240         #如果锁定义了_release_save()和/或_acquire_restore(),就会覆盖默认的实现
     241         # (它只调用release()和acquire()对锁进行访问)。_is_owned同上()。
     242         try:
     243             self._release_save = lock._release_save
     244         except AttributeError:
     245             pass
     246         try:
     247             self._acquire_restore = lock._acquire_restore
     248         except AttributeError:
     249             pass
     250         try:
     251             self._is_owned = lock._is_owned
     252         except AttributeError:
     253             pass
     254         self._waiters = _deque()
     255 
     256     def __enter__(self):
     257         return self._lock.__enter__()
     258 
     259     def __exit__(self, *args):
     260         return self._lock.__exit__(*args)
     261 
     262     def __repr__(self):
     263         return "<Condition(%s, %d)>" % (self._lock, len(self._waiters))
     264 
     265     def _release_save(self):
     266         self._lock.release()           # No state to save 没有状态保存
     267 
     268     def _acquire_restore(self, x):
     269         self._lock.acquire()           # Ignore saved state 忽略保存的状态
     270 
     271     def _is_owned(self):
     272         # Return True if lock is owned by current_thread.
     273         #如果锁属于current_thread,则返回True。
     274         # This method is called only if _lock doesn't have _is_owned().
     275         #只有当_lock没有_is_owned()时才调用该方法。
     276         if self._lock.acquire(0):
     277             self._lock.release()
     278             return False
     279         else:
     280             return True
     281 
     282     def wait(self, timeout=None):
     283         """Wait until notified or until a timeout occurs.
     284         If the calling thread has not acquired the lock when this method is
     285         called, a RuntimeError is raised.
     286         This method releases the underlying lock, and then blocks until it is
     287         awakened by a notify() or notify_all() call for the same condition
     288         variable in another thread, or until the optional timeout occurs. Once
     289         awakened or timed out, it re-acquires the lock and returns.
     290         When the timeout argument is present and not None, it should be a
     291         floating point number specifying a timeout for the operation in seconds
     292         (or fractions thereof).
     293         When the underlying lock is an RLock, it is not released using its
     294         release() method, since this may not actually unlock the lock when it
     295         was acquired multiple times recursively. Instead, an internal interface
     296         of the RLock class is used, which really unlocks it even when it has
     297         been recursively acquired several times. Another internal interface is
     298         then used to restore the recursion level when the lock is reacquired.
     299         """
     300         #等待直到通知或超时发生。如果调用该方法时调用的线程没有获得锁,则会引发运行时错误。
     301         # 该方法释放底层锁,然后阻塞,直到它被另一个线程中的notify()或notify_all()调用
     302         # 唤醒,或者直到出现可选超时为止。一旦被唤醒或超时,它会重新获得锁并返回。
     303         # 当出现timeout参数而不是None时,它应该是一个浮点数,以秒(或几分之一)为单位指定
     304         # 操作的超时。当底层锁是RLock时,不会使用其release()方法释放它,因为当递归地多次
     305         # 获取锁时,这可能不会真正解锁它。相反,使用了RLock类的内部接口,即使递归地获得了
     306         # 多次,它也会真正地解锁它。然后使用另一个内部接口在重新获得锁时恢复递归级别。
     307 
     308         if not self._is_owned():
     309             raise RuntimeError("cannot wait on un-acquired lock")
     310         waiter = _allocate_lock()
     311         waiter.acquire()
     312         self._waiters.append(waiter)
     313         saved_state = self._release_save()
     314         gotit = False
     315         try:    # restore state no matter what (e.g., KeyboardInterrupt)
     316                 #无论如何都要恢复状态(例如,键盘中断)
     317             if timeout is None:
     318                 waiter.acquire()
     319                 gotit = True
     320             else:
     321                 if timeout > 0:
     322                     gotit = waiter.acquire(True, timeout)
     323                 else:
     324                     gotit = waiter.acquire(False)
     325             return gotit
     326         finally:
     327             self._acquire_restore(saved_state)
     328             if not gotit:
     329                 try:
     330                     self._waiters.remove(waiter)
     331                 except ValueError:
     332                     pass
     333 
     334     def wait_for(self, predicate, timeout=None):
     335         """Wait until a condition evaluates to True.
     336         predicate should be a callable which result will be interpreted as a
     337         boolean value.  A timeout may be provided giving the maximum time to
     338         wait.
     339         """
     340         #等待,直到条件的值为True。谓词应该是可调用的,其结果将被解释为布尔值。
     341         # 可能会提供一个超时,以提供最长的等待时间。
     342 
     343         endtime = None
     344         waittime = timeout
     345         result = predicate()
     346         while not result:
     347             if waittime is not None:
     348                 if endtime is None:
     349                     endtime = _time() + waittime
     350                 else:
     351                     waittime = endtime - _time()
     352                     if waittime <= 0:
     353                         break
     354             self.wait(waittime)
     355             result = predicate()
     356         return result
     357 
     358     def notify(self, n=1):
     359         """Wake up one or more threads waiting on this condition, if any.
     360         If the calling thread has not acquired the lock when this method is
     361         called, a RuntimeError is raised.
     362         This method wakes up at most n of the threads waiting for the condition
     363         variable; it is a no-op if no threads are waiting.
     364         """
     365         #唤醒在此条件下等待的一个或多个线程(如果有的话)。如果调用该方法时调用的线程没有获得锁,
     366         # 则会引发运行时错误。该方法最多唤醒n个等待条件变量的线程;如果没有线程在等待,那么
     367         # 这是一个no-op。
     368         if not self._is_owned():
     369             raise RuntimeError("cannot notify on un-acquired lock")
     370         all_waiters = self._waiters
     371         waiters_to_notify = _deque(_islice(all_waiters, n))
     372         if not waiters_to_notify:
     373             return
     374         for waiter in waiters_to_notify:
     375             waiter.release()
     376             try:
     377                 all_waiters.remove(waiter)
     378             except ValueError:
     379                 pass
     380 
     381     def notify_all(self):
     382         """Wake up all threads waiting on this condition.
     383         If the calling thread has not acquired the lock when this method
     384         is called, a RuntimeError is raised.
     385         """
     386         #唤醒在此条件下等待的所有线程。如果调用该方法时调用的线程没有获得锁,
     387         # 则会引发运行时错误。
     388         self.notify(len(self._waiters))
     389 
     390     notifyAll = notify_all
     391 
     392 
     393 class Semaphore:
     394     """This class implements semaphore objects.
     395     Semaphores manage a counter representing the number of release() calls minus
     396     the number of acquire() calls, plus an initial value. The acquire() method
     397     blocks if necessary until it can return without making the counter
     398     negative. If not given, value defaults to 1.
     399     """
     400     #这个类实现信号量对象。信号量管理一个计数器,该计数器表示release()调用的数量减去
     401     # acquire()调用的数量,再加上一个初始值。acquire()方法如果有必要会阻塞,直到它可以
     402     # 返回而不会使计数器变为负数为止。如果未指定,值默认为1。
     403 
     404     # After Tim Peters' semaphore class, but not quite the same (no maximum)
     405     #在Tim Peters的信号量类之后,但不完全相同(没有最大值)
     406 
     407     def __init__(self, value=1):
     408         if value < 0:
     409             raise ValueError("semaphore initial value must be >= 0")
     410         self._cond = Condition(Lock())
     411         self._value = value
     412 
     413     def acquire(self, blocking=True, timeout=None):
     414         """Acquire a semaphore, decrementing the internal counter by one.
     415         When invoked without arguments: if the internal counter is larger than
     416         zero on entry, decrement it by one and return immediately. If it is zero
     417         on entry, block, waiting until some other thread has called release() to
     418         make it larger than zero. This is done with proper interlocking so that
     419         if multiple acquire() calls are blocked, release() will wake exactly one
     420         of them up. The implementation may pick one at random, so the order in
     421         which blocked threads are awakened should not be relied on. There is no
     422         return value in this case.
     423         When invoked with blocking set to true, do the same thing as when called
     424         without arguments, and return true.
     425         When invoked with blocking set to false, do not block. If a call without
     426         an argument would block, return false immediately; otherwise, do the
     427         same thing as when called without arguments, and return true.
     428         When invoked with a timeout other than None, it will block for at
     429         most timeout seconds.  If acquire does not complete successfully in
     430         that interval, return false.  Return true otherwise.
     431         """
     432         #获得一个信号量,将内部计数器减1。在没有参数的情况下调用时:如果内部计数器在入口时
     433         # 大于0,则将其递减1并立即返回。如果进入时为零,阻塞,等待其他线程调用release()
     434         # 使其大于零。这是通过适当的联锁完成的,这样,如果多个acquire()调用被阻塞,
     435         # release()就会唤醒其中一个调用。实现可以随机选择一个线程,因此不应该依赖于被阻塞
     436         # 线程被唤醒的顺序。在本例中没有返回值。当阻塞集调用为true时,执行与没有参数调用
     437         # 时相同的操作,并返回true。当阻塞设置为false时,不要阻塞。如果一个没有参数的
     438         # 调用将阻塞,立即返回false;否则,执行与没有参数调用时相同的操作,并返回true。
     439         # 当使用除None以外的超时调用时,它最多将阻塞超时秒。如果在那段时间里收购没有成功
     440         # 完成,还假。否则返回true。
     441         if not blocking and timeout is not None:
     442             raise ValueError("can't specify timeout for non-blocking acquire")
     443         rc = False
     444         endtime = None
     445         with self._cond:
     446             while self._value == 0:
     447                 if not blocking:
     448                     break
     449                 if timeout is not None:
     450                     if endtime is None:
     451                         endtime = _time() + timeout
     452                     else:
     453                         timeout = endtime - _time()
     454                         if timeout <= 0:
     455                             break
     456                 self._cond.wait(timeout)
     457             else:
     458                 self._value -= 1
     459                 rc = True
     460         return rc
     461 
     462     __enter__ = acquire
     463 
     464     def release(self):
     465         """Release a semaphore, incrementing the internal counter by one.
     466         When the counter is zero on entry and another thread is waiting for it
     467         to become larger than zero again, wake up that thread.
     468         """
     469         #释放信号量,增加一个内部计数器。当进入时计数器为零,而另一个线程正在等待计数器
     470         # 再次大于零时,唤醒该线程。
     471         with self._cond:
     472             self._value += 1
     473             self._cond.notify()
     474 
     475     def __exit__(self, t, v, tb):
     476         self.release()
     477 
     478 
     479 class BoundedSemaphore(Semaphore):
     480     """Implements a bounded semaphore.
     481     A bounded semaphore checks to make sure its current value doesn't exceed its
     482     initial value. If it does, ValueError is raised. In most situations
     483     semaphores are used to guard resources with limited capacity.
     484     If the semaphore is released too many times it's a sign of a bug. If not
     485     given, value defaults to 1.
     486     Like regular semaphores, bounded semaphores manage a counter representing
     487     the number of release() calls minus the number of acquire() calls, plus an
     488     initial value. The acquire() method blocks if necessary until it can return
     489     without making the counter negative. If not given, value defaults to 1.
     490     """
     491     #实现有界信号量。有界信号量检查其当前值是否不超过初始值。如果是,则会引发ValueError。
     492     # 在大多数情况下,信号量被用来保护有限容量的资源。如果信号量被释放了太多次,这是错误
     493     # 的信号。如果未指定,值默认为1。与常规信号量一样,有界信号量管理一个计数器,
     494     # 表示release()调用的数量减去acquire()调用的数量,再加上一个初始值。acquire()方法
     495     # 如果有必要会阻塞,直到它可以返回而不会使计数器变为负数为止。如果未指定,值默认为1。
     496 
     497     def __init__(self, value=1):
     498         Semaphore.__init__(self, value)
     499         self._initial_value = value
     500 
     501     def release(self):
     502         """Release a semaphore, incrementing the internal counter by one.
     503 
     504         When the counter is zero on entry and another thread is waiting for it
     505         to become larger than zero again, wake up that thread.
     506 
     507         If the number of releases exceeds the number of acquires,
     508         raise a ValueError.
     509         """
     510         #释放信号量,增加一个内部计数器。当进入时计数器为0,而另一个线程正在等待i再次
     511         # 大于0时,唤醒那个线程。如果发布的数量超过了获得的数量,则引发一个ValueError。
     512         with self._cond:
     513             if self._value >= self._initial_value:
     514                 raise ValueError("Semaphore released too many times")
     515             self._value += 1
     516             self._cond.notify()
     517 
     518 
     519 class Event:
     520     """Class implementing event objects.
     521 
     522     Events manage a flag that can be set to true with the set() method and reset
     523     to false with the clear() method. The wait() method blocks until the flag is
     524     true.  The flag is initially false.
     525     """
     526     #类实现事件对象。事件管理的标志可以用set()方法设置为true,用clear()方法重置为false。
     527     # wait()方法将阻塞,直到标记为true。标志最初是假的。
     528 
     529     # After Tim Peters' event class (without is_posted())
     530     #在Tim Peters的事件类之后(没有is_post ())
     531 
     532     def __init__(self):
     533         self._cond = Condition(Lock())
     534         self._flag = False
     535 
     536     def _reset_internal_locks(self):
     537         # private!  called by Thread._reset_internal_locks by _after_fork()
     538         #私人!调用线程._reset_internal_locks _after_fork()
     539         self._cond.__init__(Lock())
     540 
     541     def is_set(self):
     542         """Return true if and only if the internal flag is true."""
     543         #当且仅当内部标志为true时返回true。
     544         return self._flag
     545 
     546     isSet = is_set
     547 
     548     def set(self):
     549         """Set the internal flag to true.
     550         All threads waiting for it to become true are awakened. Threads
     551         that call wait() once the flag is true will not block at all.
     552         """
     553         #将内部标志设置为true。等待它成真的所有线程都被唤醒。一旦标志为true,
     554         # 调用wait()的线程将不会阻塞。
     555         with self._cond:
     556             self._flag = True
     557             self._cond.notify_all()
     558 
     559     def clear(self):
     560         """Reset the internal flag to false.
     561         Subsequently, threads calling wait() will block until set() is called to
     562         set the internal flag to true again.
     563         """
     564         #将内部标志重置为false。随后,调用wait()的线程将阻塞,直到调用set()将内部标志再次设置为true。
     565         with self._cond:
     566             self._flag = False
     567 
     568     def wait(self, timeout=None):
     569         """Block until the internal flag is true.
     570         If the internal flag is true on entry, return immediately. Otherwise,
     571         block until another thread calls set() to set the flag to true, or until
     572         the optional timeout occurs.
     573         When the timeout argument is present and not None, it should be a
     574         floating point number specifying a timeout for the operation in seconds
     575         (or fractions thereof).
     576         This method returns the internal flag on exit, so it will always return
     577         True except if a timeout is given and the operation times out.
     578         """
     579         #阻塞,直到内部标志为true。如果进入时内部标志为true,则立即返回。否则,阻塞直到
     580         # 另一个线程调用set()将标志设置为true,或者直到出现可选超时。当出现timeout参数
     581         # 而不是None时,它应该是一个浮点数,以秒(或几分之一)为单位指定操作的超时。这个
     582         # 方法在退出时返回内部标志,因此它总是返回True,除非超时和操作超时。
     583         with self._cond:
     584             signaled = self._flag
     585             if not signaled:
     586                 signaled = self._cond.wait(timeout)
     587             return signaled
     588 
     589 
     590 # A barrier class.  Inspired in part by the pthread_barrier_* api and
     591 # the CyclicBarrier class from Java.  See
     592 '''一个障碍类。部分灵感来自于pthread_barrier_* api和来自Java的循环屏障类。看到'''
     593 # http://sourceware.org/pthreads-win32/manual/pthread_barrier_init.html and
     594 # http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/
     595 #        CyclicBarrier.html
     596 # for information.     ##获取信息
     597 # We maintain two main states, 'filling' and 'draining' enabling the barrier
     598 # to be cyclic.  Threads are not allowed into it until it has fully drained
     599 # since the previous cycle.  In addition, a 'resetting' state exists which is
     600 # similar to 'draining' except that threads leave with a BrokenBarrierError,
     601 # and a 'broken' state in which all threads get the exception.
     602 '''我们维持两种主要状态,“填充”和“排水”,使屏障是循环的。线程不允许进入它,直到它从
     603 上一个循环中完全耗尽为止。此外,存在一种“重置”状态,类似于“耗尽”状态,只是线程留下了
     604 一个故障的barriererror错误,以及所有线程都得到异常的“中断”状态。'''
     605 class Barrier:
     606     """Implements a Barrier.
     607     Useful for synchronizing a fixed number of threads at known synchronization
     608     points.  Threads block on 'wait()' and are simultaneously once they have all
     609     made that call.
     610     """
     611     #实现了一个障碍。用于在已知同步点同步固定数量的线程。线程阻塞在'wait()'上,
     612     # 并且一旦它们都进行了该调用,就会同时阻塞。
     613 
     614     def __init__(self, parties, action=None, timeout=None):
     615         """Create a barrier, initialised to 'parties' threads.
     616         'action' is a callable which, when supplied, will be called by one of
     617         the threads after they have all entered the barrier and just prior to
     618         releasing them all. If a 'timeout' is provided, it is uses as the
     619         default for all subsequent 'wait()' calls.
     620         """
     621         #创建一个障碍,初始化为“party”线程。“action”是一个可调用的线程,当它被提供时,
     622         # 它将被其中一个线程在它们全部进入壁垒并释放它们之前调用。如果提供了'timeout',
     623         # 那么它将用作所有后续'wait()'调用的默认值。
     624         self._cond = Condition(Lock())
     625         self._action = action
     626         self._timeout = timeout
     627         self._parties = parties
     628         self._state = 0 #0 filling, 1, draining, -1 resetting, -2 broken
     629         self._count = 0
     630 
     631     def wait(self, timeout=None):
     632         """Wait for the barrier.
     633         When the specified number of threads have started waiting, they are all
     634         simultaneously awoken. If an 'action' was provided for the barrier, one
     635         of the threads will have executed that callback prior to returning.
     636         Returns an individual index number from 0 to 'parties-1'.
     637         """
     638         #等待障碍。当指定数量的线程开始等待时,它们都同时被唤醒。如果为barrier提供了一个
     639         # “操作”,其中一个线程将在返回之前执行该回调。返回从0到“parties-1”的单个索引号。
     640 
     641         if timeout is None:
     642             timeout = self._timeout
     643         with self._cond:
     644             self._enter() # Block while the barrier drains. 隔离墙排水时要进行隔离。
     645             index = self._count
     646             self._count += 1
     647             try:
     648                 if index + 1 == self._parties:
     649                     # We release the barrier
     650                     self._release()
     651                 else:
     652                     # We wait until someone releases us
     653                     self._wait(timeout)
     654                 return index
     655             finally:
     656                 self._count -= 1
     657                 # Wake up any threads waiting for barrier to drain.
     658                 #唤醒任何等待屏障耗尽的线程。
     659                 self._exit()
     660 
     661     # Block until the barrier is ready for us, or raise an exception
     662     # if it is broken.
     663     #阻止,直到障碍为我们准备好,或提出一个例外,如果它被打破。
     664     def _enter(self):
     665         while self._state in (-1, 1):
     666             # It is draining or resetting, wait until done正在排水或重置,等待完成
     667             self._cond.wait()
     668         #see if the barrier is in a broken state看看势垒是否处于破碎状态
     669         if self._state < 0:
     670             raise BrokenBarrierError
     671         assert self._state == 0
     672 
     673     # Optionally run the 'action' and release the threads waiting
     674     # in the barrier.
     675     #可以选择运行“action”,并释放等待在barrier中的线程。
     676 
     677     def _release(self):
     678         try:
     679             if self._action:
     680                 self._action()
     681             # enter draining state  进入排水状态
     682             self._state = 1
     683             self._cond.notify_all()
     684         except:
     685             #an exception during the _action handler.  Break and reraise
     686             #_action处理程序期间的异常。打破和reraise
     687             self._break()
     688             raise
     689 
     690     # Wait in the barrier until we are released.  Raise an exception
     691     # if the barrier is reset or broken.
     692     #在障碍物里等着,直到我们被释放。如果障碍被重置或破坏,则引发异常。
     693     def _wait(self, timeout):
     694         if not self._cond.wait_for(lambda : self._state != 0, timeout):
     695             #timed out.  Break the barrier
     696             self._break()
     697             raise BrokenBarrierError
     698         if self._state < 0:
     699             raise BrokenBarrierError
     700         assert self._state == 1
     701 
     702     # If we are the last thread to exit the barrier, signal any threads
     703     #     # waiting for the barrier to drain.
     704     #如果我们是最后一个退出屏障的线程,那么向等待屏障流出的线程发出信号。
     705     def _exit(self):
     706         if self._count == 0:
     707             if self._state in (-1, 1):
     708                 #resetting or draining
     709                 self._state = 0
     710                 self._cond.notify_all()
     711 
     712     def reset(self):
     713         """Reset the barrier to the initial state.
     714         Any threads currently waiting will get the BrokenBarrier exception
     715         raised.
     716         """
     717         #将势垒重置为初始状态。当前等待的任何线程都将引发故障障碍异常。
     718         with self._cond:
     719             if self._count > 0:
     720                 if self._state == 0:
     721                     #reset the barrier, waking up threads 重置障碍,唤醒线程
     722                     self._state = -1
     723                 elif self._state == -2:
     724                     #was broken, set it to reset state 被破坏,设置为重置状态
     725                     #which clears when the last thread exits 最后一个线程退出时哪个线程清除
     726                     self._state = -1
     727             else:
     728                 self._state = 0
     729             self._cond.notify_all()
     730 
     731     def abort(self):
     732         """Place the barrier into a 'broken' state.
     733         Useful in case of error.  Any currently waiting threads and threads
     734         attempting to 'wait()' will have BrokenBarrierError raised.
     735         """
     736         #将障碍设置为“破碎”状态。在发生错误时很有用。任何当前正在等待的线程和
     737         # 试图“wait()”的线程都会出现故障障碍。
     738         with self._cond:
     739             self._break()
     740 
     741     def _break(self):
     742         # An internal error was detected.  The barrier is set to
     743         # a broken state all parties awakened.
     744         #检测到内部错误。障碍被设置为一个破碎的国家,所有各方都觉醒了。
     745         self._state = -2
     746         self._cond.notify_all()
     747 
     748     @property
     749     def parties(self):
     750         """Return the number of threads required to trip the barrier."""
     751         #返回跳闸所需的线程数。
     752         return self._parties
     753 
     754     @property
     755     def n_waiting(self):
     756         """Return the number of threads currently waiting at the barrier."""
     757         #返回阻塞处当前等待的线程数。
     758         # We don't need synchronization here since this is an ephemeral result
     759         # anyway.  It returns the correct value in the steady state.
     760         #我们不需要同步,因为这是一个短暂的结果。它在稳定状态下返回正确的值。
     761         if self._state == 0:
     762             return self._count
     763         return 0
     764 
     765     @property
     766     def broken(self):
     767         """Return True if the barrier is in a broken state."""
     768         #如果屏障处于破坏状态,返回True。
     769         return self._state == -2
     770 
     771 # exception raised by the Barrier class
     772 #由Barrier类引发的异常
     773 class BrokenBarrierError(RuntimeError):
     774     pass
     775 
     776 
     777 # Helper to generate new thread names
     778 #帮助程序生成新的线程名称
     779 _counter = _count().__next__
     780 _counter() # Consume 0 so first non-main thread has id 1.
     781            #消耗0,所以第一个非主线程id为1。
     782 def _newname(template="Thread-%d"):
     783     return template % _counter()
     784 
     785 # Active thread administration  #活动线程管理
     786 _active_limbo_lock = _allocate_lock()
     787 _active = {}    # maps thread id to Thread object 将线程id映射到线程对象
     788 _limbo = {}
     789 _dangling = WeakSet()
     790 
     791 # Main class for threads
     792 '''线程的主类'''
     793 
     794 class Thread:
     795     """A class that represents a thread of control.
     796     This class can be safely subclassed in a limited fashion. There are two ways
     797     to specify the activity: by passing a callable object to the constructor, or
     798     by overriding the run() method in a subclass.
     799     """
     800     #表示控制线程的类。这个类可以以有限的方式安全地子类化。有两种方法可以指定活动:
     801     # 通过将可调用对象传递给构造函数,或者在子类中重写run()方法。
     802 
     803     _initialized = False
     804     # Need to store a reference to sys.exc_info for printing
     805     # out exceptions when a thread tries to use a global var. during interp.
     806     # shutdown and thus raises an exception about trying to perform some
     807     # operation on/with a NoneType
     808     #需要存储对sys的引用。exc_info用于在interp期间线程试图使用全局变量时打印异常。
     809     # 关闭,因此引发了一个异常,即试图对/使用非etype执行某些操作
     810     _exc_info = _sys.exc_info
     811     # Keep sys.exc_clear too to clear the exception just before
     812     # allowing .join() to return.
     813     #Keep sys.ex_clear也可以在allowing.join()返回之前清除异常。
     814     #XXX __exc_clear = _sys.exc_clear
     815 
     816     def __init__(self, group=None, target=None, name=None,
     817                  args=(), kwargs=None, *, daemon=None):
     818         """This constructor should always be called with keyword arguments. Arguments are:
     819         *group* should be None; reserved for future extension when a ThreadGroup
     820         class is implemented.
     821         *target* is the callable object to be invoked by the run()
     822         method. Defaults to None, meaning nothing is called.
     823         *name* is the thread name. By default, a unique name is constructed of
     824         the form "Thread-N" where N is a small decimal number.
     825         *args* is the argument tuple for the target invocation. Defaults to ().
     826         *kwargs* is a dictionary of keyword arguments for the target
     827         invocation. Defaults to {}.
     828         If a subclass overrides the constructor, it must make sure to invoke
     829         the base class constructor (Thread.__init__()) before doing anything
     830         else to the thread.
     831         """
     832         #这个构造函数应该总是使用关键字参数调用。论点是:*group*不应该是;在实现
     833         # ThreadGroup类时为将来的扩展保留。*target*是run()方法调用的可调用对象。
     834         # 默认为None,表示不调用任何东西。*name*是线程名。默认情况下,唯一的名称
     835         # 是由“Thread-N”的形式构造的,其中N是一个小数。*args*是目标调用的参数元组。
     836         # 默认为()。*kwargs*是目标调用的关键字参数字典。默认为{}。如果子类重写构造
     837         # 函数,它必须确保在对线程执行其他操作之前调用基类构造函数(thread. __init__())。
     838 
     839         assert group is None, "group argument must be None for now"
     840         if kwargs is None:
     841             kwargs = {}
     842         self._target = target
     843         self._name = str(name or _newname())
     844         self._args = args
     845         self._kwargs = kwargs
     846         if daemon is not None:
     847             self._daemonic = daemon
     848         else:
     849             self._daemonic = current_thread().daemon
     850         self._ident = None
     851         self._tstate_lock = None
     852         self._started = Event()
     853         self._is_stopped = False
     854         self._initialized = True
     855         # sys.stderr is not stored in the class like
     856         # sys.exc_info since it can be changed between instances
     857         self._stderr = _sys.stderr
     858         # For debugging and _after_fork()
     859         _dangling.add(self)
     860 
     861     def _reset_internal_locks(self, is_alive):
     862         # private!  Called by _after_fork() to reset our internal locks as
     863         # they may be in an invalid state leading to a deadlock or crash.
     864         #私人!由_after_fork()调用,以重置内部锁,因为它们可能处于无效状态,导致死锁或崩溃。
     865         self._started._reset_internal_locks()
     866         if is_alive:
     867             self._set_tstate_lock()
     868         else:
     869             # The thread isn't alive after fork: it doesn't have a tstate anymore.
     870             #在fork之后,线程不再是活的:它不再有tstate。
     871             self._is_stopped = True
     872             self._tstate_lock = None
     873 
     874     def __repr__(self):
     875         assert self._initialized, "Thread.__init__() was not called"
     876         status = "initial"
     877         if self._started.is_set():
     878             status = "started"
     879         self.is_alive() # easy way to get ._is_stopped set when appropriate
     880                         #在适当的情况下,获得._is_stopped设置的简单方法
     881         if self._is_stopped:
     882             status = "stopped"
     883         if self._daemonic:
     884             status += " daemon"
     885         if self._ident is not None:
     886             status += " %s" % self._ident
     887         return "<%s(%s, %s)>" % (self.__class__.__name__, self._name, status)
     888 
     889     def start(self):
     890         """Start the thread's activity.
     891         It must be called at most once per thread object. It arranges for the
     892         object's run() method to be invoked in a separate thread of control.
     893         This method will raise a RuntimeError if called more than once on the
     894         same thread object.
     895         """
     896         #启动线程的活动。每个线程对象最多只能调用一次。它安排在一个单独的控制线程中
     897         # 调用对象的run()方法。如果在同一个线程对象上调用多次,此方法将引发运行时错误。
     898         if not self._initialized:
     899             raise RuntimeError("thread.__init__() not called")
     900 
     901         if self._started.is_set():
     902             raise RuntimeError("threads can only be started once")
     903         with _active_limbo_lock:
     904             _limbo[self] = self
     905         try:
     906             _start_new_thread(self._bootstrap, ())
     907         except Exception:
     908             with _active_limbo_lock:
     909                 del _limbo[self]
     910             raise
     911         self._started.wait()
     912 
     913     def run(self):
     914         """Method representing the thread's activity.
     915         You may override this method in a subclass. The standard run() method
     916         invokes the callable object passed to the object's constructor as the
     917         target argument, if any, with sequential and keyword arguments taken
     918         from the args and kwargs arguments, respectively.
     919         """
     920         #表示线程活动的方法。您可以在子类中重写此方法。标准run()方法调用传递给对象
     921         # 构造函数的可调用对象作为目标参数(如果有的话),分别使用args和kwargs参数
     922         # 中的顺序参数和关键字参数。
     923         try:
     924             if self._target:
     925                 self._target(*self._args, **self._kwargs)
     926         finally:
     927             # Avoid a refcycle if the thread is running a function with
     928             # an argument that has a member that points to the thread.
     929             #如果线程正在运行一个具有指向线程的成员的参数的函数,请避免使用refcycle。
     930             del self._target, self._args, self._kwargs
     931 
     932     def _bootstrap(self):
     933         # Wrapper around the real bootstrap code that ignores
     934         # exceptions during interpreter cleanup.  Those typically
     935         # happen when a daemon thread wakes up at an unfortunate
     936         # moment, finds the world around it destroyed, and raises some
     937         # random exception *** while trying to report the exception in
     938         # _bootstrap_inner() below ***.  Those random exceptions
     939         # don't help anybody, and they confuse users, so we suppress
     940         # them.  We suppress them only when it appears that the world
     941         # indeed has already been destroyed, so that exceptions in
     942         # _bootstrap_inner() during normal business hours are properly
     943         # reported.  Also, we only suppress them for daemonic threads;
     944         # if a non-daemonic encounters this, something else is wrong.
     945         '''包装真正的引导代码,在解释器清理期间忽略异常。这通常发生在守护进程线程
     946         在一个不幸的时刻醒来,发现它周围的世界被破坏,并在试图报告***下面的异常
     947         in_bootstrap_inner()时引发一些随机异常时。这些随机的异常对任何人都没有
     948         帮助,而且它们混淆了用户,所以我们抑制了它们。只有当世界似乎确实已经被破坏
     949         时,我们才会抑制它们,以便在正常工作时间内正确报告_bootstrap_inner()中
     950         的异常。而且,我们只对daemonic线程禁止它们;如果一个非daemonic遇到了这个
     951         问题,就会出现其他问题'''
     952         try:
     953             self._bootstrap_inner()
     954         except:
     955             if self._daemonic and _sys is None:
     956                 return
     957             raise
     958 
     959     def _set_ident(self):
     960         self._ident = get_ident()
     961 
     962     def _set_tstate_lock(self):
     963         """
     964         Set a lock object which will be released by the interpreter when
     965         the underlying thread state (see pystate.h) gets deleted.
     966         """
     967         #设置一个锁对象,当底层线程状态(请参阅pystate.h)被删除时,解释器将释放这个锁对象。
     968         self._tstate_lock = _set_sentinel()
     969         self._tstate_lock.acquire()
     970 
     971     def _bootstrap_inner(self):
     972         try:
     973             self._set_ident()
     974             self._set_tstate_lock()
     975             self._started.set()
     976             with _active_limbo_lock:
     977                 _active[self._ident] = self
     978                 del _limbo[self]
     979 
     980             if _trace_hook:
     981                 _sys.settrace(_trace_hook)
     982             if _profile_hook:
     983                 _sys.setprofile(_profile_hook)
     984 
     985             try:
     986                 self.run()
     987             except SystemExit:
     988                 pass
     989             except:
     990                 # If sys.stderr is no more (most likely from interpreter
     991                 # shutdown) use self._stderr.  Otherwise still use sys (as in
     992                 # _sys) in case sys.stderr was redefined since the creation of
     993                 # self.
     994                 #如果系统。stderr不再使用self._stderr(很可能是由于解释器关闭)。否则,
     995                 # 在case sys中仍然使用sys(如in_sys)。stderr自自我创造以来被重新定义。
     996                 if _sys and _sys.stderr is not None:
     997                     print("Exception in thread %s:
    %s" %
     998                           (self.name, _format_exc()), file=_sys.stderr)
     999                 elif self._stderr is not None:
    1000                     # Do the best job possible w/o a huge amt. of code to
    1001                     # approximate a traceback (code ideas from Lib/traceback.py)
    1002                     #尽最大的努力做最好的工作。近似回溯的代码(来自Lib/traceback.py的代码思想)
    1003                     exc_type, exc_value, exc_tb = self._exc_info()
    1004                     try:
    1005                         print((
    1006                             "Exception in thread " + self.name +
    1007                             " (most likely raised during interpreter shutdown):"), file=self._stderr)
    1008                         print((
    1009                             "Traceback (most recent call last):"), file=self._stderr)
    1010                         while exc_tb:
    1011                             print((
    1012                                 '  File "%s", line %s, in %s' %
    1013                                 (exc_tb.tb_frame.f_code.co_filename,
    1014                                     exc_tb.tb_lineno,
    1015                                     exc_tb.tb_frame.f_code.co_name)), file=self._stderr)
    1016                             exc_tb = exc_tb.tb_next
    1017                         print(("%s: %s" % (exc_type, exc_value)), file=self._stderr)
    1018                         self._stderr.flush()
    1019                     # Make sure that exc_tb gets deleted since it is a memory
    1020                     # hog; deleting everything else is just for thoroughness
    1021                     #确保exc_tb被删除,因为它占用内存;删除所有其他内容只是为了彻底
    1022                     finally:
    1023                         del exc_type, exc_value, exc_tb
    1024             finally:
    1025                 # Prevent a race in
    1026                 # test_threading.test_no_refcycle_through_target when
    1027                 # the exception keeps the target alive past when we
    1028                 # assert that it's dead.
    1029                 #防止test_threading中的竞争。test_no_refcycle_through_target,
    1030                 # 当异常断言目标已死时,该异常将使目标保持存活。
    1031                 #XXX self._exc_clear()
    1032                 pass
    1033         finally:
    1034             with _active_limbo_lock:
    1035                 try:
    1036                     # We don't call self._delete() because it also
    1037                     # grabs _active_limbo_lock.
    1038                     #我们不调用self._delete(),因为它也抓取_active_limbo_lock。
    1039                     del _active[get_ident()]
    1040                 except:
    1041                     pass
    1042 
    1043     def _stop(self):
    1044         # After calling ._stop(), .is_alive() returns False and .join() returns
    1045         # immediately.  ._tstate_lock must be released before calling ._stop().
    1046         #调用._stop()后,.is_alive()返回False, .join()立即返回。
    1047 
    1048         # Normal case:  C code at the end of the thread's life
    1049         # (release_sentinel in _threadmodule.c) releases ._tstate_lock, and
    1050         # that's detected by our ._wait_for_tstate_lock(), called by .join()
    1051         # and .is_alive().  Any number of threads _may_ call ._stop()
    1052         # simultaneously (for example, if multiple threads are blocked in
    1053         # .join() calls), and they're not serialized.  That's harmless -
    1054         # they'll just make redundant rebindings of ._is_stopped and
    1055         # ._tstate_lock.  Obscure:  we rebind ._tstate_lock last so that the
    1056         # "assert self._is_stopped" in ._wait_for_tstate_lock() always works
    1057         # (the assert is executed only if ._tstate_lock is None).
    1058         #正常情况:线程生命周期结束时的C代码(_threadmodule.c中的release_sentinel)
    1059         # 释放了._tstate_lock,我们的._wait_for_tstate_lock()检测到这一点,
    1060         # 它被.join()和.is_alive()调用。同时调用任意数量的线程_may_ ._stop()
    1061         # (例如,如果多个线程在.join()调用中被阻塞,并且它们没有被序列化)。这是无害的,
    1062         # 他们只会对._is_stopped和._tstate_lock进行冗余的重绑定。晦涩的:
    1063         # 我们将._tstate_lock绑定到最后,以便“断言self”。_is_stopped()中
    1064         # 的._wait_for_tstate_lock()总是有效的(只有当._tstate_lock为空时才执行断言)。
    1065 
    1066         # Special case:  _main_thread releases ._tstate_lock via this
    1067         # module's _shutdown() function.
    1068         #特殊情况:_main_thread通过这个模块的_shutdown()函数释放._tstate_lock。
    1069         lock = self._tstate_lock
    1070         if lock is not None:
    1071             assert not lock.locked()
    1072         self._is_stopped = True
    1073         self._tstate_lock = None
    1074 
    1075     def _delete(self):
    1076         "Remove current thread from the dict of currently running threads."
    1077         with _active_limbo_lock:
    1078             del _active[get_ident()]
    1079             # There must not be any python code between the previous line
    1080             # and after the lock is released.  Otherwise a tracing function
    1081             # could try to acquire the lock again in the same thread, (in
    1082             # current_thread()), and would block.
    1083             #前一行和锁释放后之间不应该有任何python代码。否则,跟踪函数可以尝试在相
    1084             # 同的线程(在current_thread()中)中再次获取锁,并将阻塞。
    1085 
    1086     def join(self, timeout=None):
    1087         """Wait until the thread terminates.
    1088         This blocks the calling thread until the thread whose join() method is
    1089         called terminates -- either normally or through an unhandled exception
    1090         or until the optional timeout occurs.
    1091         When the timeout argument is present and not None, it should be a
    1092         floating point number specifying a timeout for the operation in seconds
    1093         (or fractions thereof). As join() always returns None, you must call
    1094         isAlive() after join() to decide whether a timeout happened -- if the
    1095         thread is still alive, the join() call timed out.
    1096         When the timeout argument is not present or None, the operation will
    1097         block until the thread terminates.
    1098         A thread can be join()ed many times.
    1099         join() raises a RuntimeError if an attempt is made to join the current
    1100         thread as that would cause a deadlock. It is also an error to join() a
    1101         thread before it has been started and attempts to do so raises the same
    1102         exception.
    1103         """
    1104         #等待直到线程终止。这将阻塞调用线程,直到调用join()方法的线程终止——通常或通过
    1105         # 未处理的异常终止,或直到出现可选超时为止。当出现timeout参数而不是None时,
    1106         # 它应该是一个浮点数,以秒(或几分之一)为单位指定操作的超时。因为join()总是
    1107         # 返回None,所以必须在join()之后调用isAlive(),以决定是否发生超时——如果线程
    1108         # 仍然活着,则join()调用超时。当timeout参数不存在或不存在时,操作将阻塞,
    1109         # 直到线程终止。一个线程可以多次连接()ed。如果尝试连接当前线程,join()将引发
    1110         # 一个运行时错误,因为这会导致死锁。在线程启动之前连接()线程也是一个错误,
    1111         # 试图这样做会引发相同的异常。
    1112         if not self._initialized:
    1113             raise RuntimeError("Thread.__init__() not called")
    1114         if not self._started.is_set():
    1115             raise RuntimeError("cannot join thread before it is started")
    1116         if self is current_thread():
    1117             raise RuntimeError("cannot join current thread")
    1118 
    1119         if timeout is None:
    1120             self._wait_for_tstate_lock()
    1121         else:
    1122             # the behavior of a negative timeout isn't documented, but
    1123             # historically .join(timeout=x) for x<0 has acted as if timeout=0
    1124             #没有记录消极超时的行为,但是在历史上,x<0时的.join(timeout=x)就像timeout=0一样
    1125             self._wait_for_tstate_lock(timeout=max(timeout, 0))
    1126 
    1127     def _wait_for_tstate_lock(self, block=True, timeout=-1):
    1128         # Issue #18808: wait for the thread state to be gone.
    1129         # At the end of the thread's life, after all knowledge of the thread
    1130         # is removed from C data structures, C code releases our _tstate_lock.
    1131         # This method passes its arguments to _tstate_lock.acquire().
    1132         # If the lock is acquired, the C code is done, and self._stop() is
    1133         # called.  That sets ._is_stopped to True, and ._tstate_lock to None.
    1134         #问题#18808:等待线程状态消失。在线程生命周期结束时,在从C数据结构中删除所有
    1135         # 线程知识之后,C代码释放我们的_tstate_lock。该方法将其参数
    1136         # 传递给_tstate_lock.acquire()。如果获得了锁,则完成C代码,
    1137         # 并调用self._stop()。这将._is_stopped设置为True,._tstate_lock设置为None。
    1138         lock = self._tstate_lock
    1139         if lock is None:  # already determined that the C code is done 已经确定C代码已经完成
    1140             assert self._is_stopped
    1141         elif lock.acquire(block, timeout):
    1142             lock.release()
    1143             self._stop()
    1144 
    1145     @property
    1146     def name(self):
    1147         """A string used for identification purposes only.
    1148         It has no semantics. Multiple threads may be given the same name. The
    1149         initial name is set by the constructor.
    1150         """
    1151         #仅用于识别目的的字符串。它没有语义。多个线程可能被赋予相同的名称。初始名称由构造函数设置。
    1152         assert self._initialized, "Thread.__init__() not called"
    1153         return self._name
    1154 
    1155     @name.setter
    1156     def name(self, name):
    1157         assert self._initialized, "Thread.__init__() not called"
    1158         self._name = str(name)
    1159 
    1160     @property
    1161     def ident(self):
    1162         """Thread identifier of this thread or None if it has not been started.
    1163         This is a nonzero integer. See the get_ident() function. Thread
    1164         identifiers may be recycled when a thread exits and another thread is
    1165         created. The identifier is available even after the thread has exited.
    1166         """
    1167         #此线程的线程标识符,如果没有启动,则为空。这是非零整数。请参阅get_ident()函数。
    1168         # 当线程退出并创建另一个线程时,可以回收线程标识符。即使线程已经退出,标识符也是可用的。
    1169         assert self._initialized, "Thread.__init__() not called"
    1170         return self._ident
    1171 
    1172     def is_alive(self):
    1173         """Return whether the thread is alive.
    1174         This method returns True just before the run() method starts until just
    1175         after the run() method terminates. The module function enumerate()
    1176         returns a list of all alive threads.
    1177         """
    1178         #返回线程是否存在。这个方法在run()方法开始之前返回True,直到run()方法终止之后。
    1179         # 模块函数enumerate()返回一个包含所有活线程的列表。
    1180         assert self._initialized, "Thread.__init__() not called"
    1181         if self._is_stopped or not self._started.is_set():
    1182             return False
    1183         self._wait_for_tstate_lock(False)
    1184         return not self._is_stopped
    1185 
    1186     isAlive = is_alive
    1187 
    1188     @property
    1189     def daemon(self):
    1190         """A boolean value indicating whether this thread is a daemon thread.
    1191         This must be set before start() is called, otherwise RuntimeError is
    1192         raised. Its initial value is inherited from the creating thread; the
    1193         main thread is not a daemon thread and therefore all threads created in
    1194         the main thread default to daemon = False.
    1195         The entire Python program exits when no alive non-daemon threads are
    1196         left.
    1197         """
    1198         #一个布尔值,指示此线程是否为守护线程。这必须在调用start()之前设置,否则会引发
    1199         # 运行时错误。它的初始值继承自创建线程;主线程不是守护进程线程,因此在主线程中
    1200         # 创建的所有线程默认为守护进程= False。当没有存活的非守护进程线程时,
    1201         # 整个Python程序退出。
    1202         assert self._initialized, "Thread.__init__() not called"
    1203         return self._daemonic
    1204 
    1205     @daemon.setter
    1206     def daemon(self, daemonic):
    1207         if not self._initialized:
    1208             raise RuntimeError("Thread.__init__() not called")
    1209         if self._started.is_set():
    1210             raise RuntimeError("cannot set daemon status of active thread")
    1211         self._daemonic = daemonic
    1212 
    1213     def isDaemon(self):   #Daemon:守护进程
    1214         return self.daemon
    1215 
    1216     def setDaemon(self, daemonic):
    1217         self.daemon = daemonic
    1218 
    1219     def getName(self):
    1220         return self.name
    1221 
    1222     def setName(self, name):
    1223         self.name = name
    1224 
    1225 # The timer class was contributed by Itamar Shtull-Trauring
    1226 #计时器类由Itamar Shtull-Trauring贡献
    1227 
    1228 class Timer(Thread):
    1229     """Call a function after a specified number of seconds:
    1230             t = Timer(30.0, f, args=None, kwargs=None)
    1231             t.start()
    1232             t.cancel()     # stop the timer's action if it's still waiting
    1233     """
    1234     #在指定的秒数后调用一个函数:t = Timer(30.0, f, args=None, kwargs=None)
    1235     #t.start() t.cancel()如果计时器仍在等待,则停止计时器的操作
    1236 
    1237     def __init__(self, interval, function, args=None, kwargs=None):
    1238         Thread.__init__(self)
    1239         self.interval = interval
    1240         self.function = function
    1241         self.args = args if args is not None else []
    1242         self.kwargs = kwargs if kwargs is not None else {}
    1243         self.finished = Event()
    1244 
    1245     def cancel(self):
    1246         """Stop the timer if it hasn't finished yet."""
    1247         #如果计时器还没有结束,请停止。
    1248         self.finished.set()
    1249 
    1250     def run(self):
    1251         self.finished.wait(self.interval)
    1252         if not self.finished.is_set():
    1253             self.function(*self.args, **self.kwargs)
    1254         self.finished.set()
    1255 
    1256 
    1257 # Special thread class to represent the main thread
    1258 '''表示主线程的特殊线程类'''
    1259 
    1260 class _MainThread(Thread):
    1261 
    1262     def __init__(self):
    1263         Thread.__init__(self, name="MainThread", daemon=False)
    1264         self._set_tstate_lock()
    1265         self._started.set()
    1266         self._set_ident()
    1267         with _active_limbo_lock:
    1268             _active[self._ident] = self
    1269 
    1270 
    1271 # Dummy thread class to represent threads not started here.
    1272 # These aren't garbage collected when they die, nor can they be waited for.
    1273 # If they invoke anything in threading.py that calls current_thread(), they
    1274 # leave an entry in the _active dict forever after.
    1275 # Their purpose is to return *something* from current_thread().
    1276 # They are marked as daemon threads so we won't wait for them
    1277 # when we exit (conform previous semantics).
    1278 #伪线程类来表示这里没有启动的线程。它们死后不会被垃圾收集,也不会被等待。如果它们在
    1279 # 线程中调用任何东西。调用current_thread()的py在_active dict中永远留下一个条目。
    1280 # 它们的目的是从current_thread()返回*something*。它们被标记为守护线程,因此在退出
    1281 # 时我们不会等待它们(符合前面的语义)。
    1282 
    1283 class _DummyThread(Thread):
    1284 
    1285     def __init__(self):
    1286         Thread.__init__(self, name=_newname("Dummy-%d"), daemon=True)
    1287 
    1288         self._started.set()
    1289         self._set_ident()
    1290         with _active_limbo_lock:
    1291             _active[self._ident] = self
    1292 
    1293     def _stop(self):
    1294         pass
    1295 
    1296     def is_alive(self):
    1297         assert not self._is_stopped and self._started.is_set()
    1298         return True
    1299 
    1300     def join(self, timeout=None):
    1301         assert False, "cannot join a dummy thread"
    1302 
    1303 
    1304 # Global API functions
    1305 #全球API函数
    1306 
    1307 def current_thread():
    1308     """Return the current Thread object, corresponding to the caller's thread of control.
    1309     If the caller's thread of control was not created through the threading
    1310     module, a dummy thread object with limited functionality is returned.
    1311     """
    1312     #返回当前线程对象,对应于调用方的控制线程。如果没有通过线程模块创建调用者的控制线
    1313     # 程,则返回具有有限功能的虚拟线程对象。
    1314     try:
    1315         return _active[get_ident()]
    1316     except KeyError:
    1317         return _DummyThread()
    1318 
    1319 currentThread = current_thread
    1320 
    1321 def active_count():
    1322     """Return the number of Thread objects currently alive.
    1323     The returned count is equal to the length of the list returned by
    1324     enumerate().
    1325     """
    1326     #返回当前存活的线程对象的数量。返回的计数等于enumerate()返回的列表的长度。
    1327     with _active_limbo_lock:
    1328         return len(_active) + len(_limbo)
    1329 
    1330 activeCount = active_count
    1331 
    1332 def _enumerate():
    1333     # Same as enumerate(), but without the lock. Internal use only.
    1334     #与enumerate()相同,只是没有锁。内部使用。
    1335     return list(_active.values()) + list(_limbo.values())
    1336 
    1337 def enumerate():
    1338     """Return a list of all Thread objects currently alive.
    1339     The list includes daemonic threads, dummy thread objects created by
    1340     current_thread(), and the main thread. It excludes terminated threads and
    1341     threads that have not yet been started.
    1342     """
    1343     #返回当前所有线程对象的列表。该列表包括daemonic线程、current_thread()创建的虚拟
    1344     # 线程对象和主线程。它排除终止的线程和尚未启动的线程。
    1345     with _active_limbo_lock:
    1346         return list(_active.values()) + list(_limbo.values())
    1347 
    1348 from _thread import stack_size
    1349 
    1350 # Create the main thread object,
    1351 # and make it available for the interpreter
    1352 # (Py_Main) as threading._shutdown.
    1353 #创建主线程对象,并将其作为thread ._shutdown提供给解释器(Py_Main)。
    1354 
    1355 _main_thread = _MainThread()
    1356 
    1357 def _shutdown():
    1358     # Obscure:  other threads may be waiting to join _main_thread.  That's
    1359     # dubious, but some code does it.  We can't wait for C code to release
    1360     # the main thread's tstate_lock - that won't happen until the interpreter
    1361     # is nearly dead.  So we release it here.  Note that just calling _stop()
    1362     # isn't enough:  other threads may already be waiting on _tstate_lock.
    1363     #晦涩:其他线程可能正在等待加入_main_thread。这很可疑,但有些代码可以做到。
    1364     # 我们不能等待C代码释放主线程的tstate_lock——这要等到解释器快死的时候才会发生。
    1365     # 我们在这里释放它。注意,仅仅调用_stop()是不够的:其他线程可能已经在
    1366     # 等待_tstate_lock了。
    1367     if _main_thread._is_stopped:
    1368         # _shutdown() was already called
    1369         return
    1370     tlock = _main_thread._tstate_lock
    1371     # The main thread isn't finished yet, so its thread state lock can't have
    1372     # been released.
    1373     #主线程尚未完成,因此它的线程状态锁无法释放。
    1374     assert tlock is not None
    1375     assert tlock.locked()
    1376     tlock.release()
    1377     _main_thread._stop()
    1378     t = _pickSomeNonDaemonThread()
    1379     while t:
    1380         t.join()
    1381         t = _pickSomeNonDaemonThread()
    1382 
    1383 def _pickSomeNonDaemonThread():
    1384     for t in enumerate():
    1385         if not t.daemon and t.is_alive():
    1386             return t
    1387     return None
    1388 
    1389 def main_thread():
    1390     """Return the main thread object.
    1391     In normal conditions, the main thread is the thread from which the
    1392     Python interpreter was started.
    1393     """
    1394     #返回主线程对象。在正常情况下,主线程是Python解释器启动的线程。
    1395     return _main_thread
    1396 
    1397 # get thread-local implementation, either from the thread
    1398 # module, or from the python fallback
    1399 #从线程模块或python回退中获取线程本地实现
    1400 
    1401 try:
    1402     from _thread import _local as local
    1403 except ImportError:
    1404     from _threading_local import local
    1405 
    1406 
    1407 def _after_fork():
    1408     """
    1409     Cleanup threading module state that should not exist after a fork.
    1410     """
    1411     # Reset _active_limbo_lock, in case we forked while the lock was held
    1412     # by another (non-forked) thread.  http://bugs.python.org/issue874900
    1413     #Reset _active_limbo_lock,以防我们分叉而锁被另一个(非分叉的)线程持有。
    1414     global _active_limbo_lock, _main_thread
    1415     _active_limbo_lock = _allocate_lock()
    1416 
    1417     # fork() only copied the current thread; clear references to others.
    1418     #fork()只复制当前线程;明确提及他人。
    1419     new_active = {}
    1420     current = current_thread()
    1421     _main_thread = current
    1422     with _active_limbo_lock:
    1423         # Dangling thread instances must still have their locks reset,
    1424         # because someone may join() them.
    1425         #悬空线程实例必须重新设置它们的锁,因为有人可能会加入()它们。
    1426         threads = set(_enumerate())
    1427         threads.update(_dangling)
    1428         for thread in threads:
    1429             # Any lock/condition variable may be currently locked or in an
    1430             # invalid state, so we reinitialize them.
    1431             #任何锁/条件变量可能当前被锁定或处于无效状态,因此我们重新初始化它们。
    1432             if thread is current:
    1433                 # There is only one active thread. We reset the ident to
    1434                 # its new value since it can have changed.
    1435                 #只有一个活动线程。我们将ident重置为它的新值,因为它可能已经更改。
    1436                 thread._reset_internal_locks(True)
    1437                 ident = get_ident()
    1438                 thread._ident = ident
    1439                 new_active[ident] = thread
    1440             else:
    1441                 # All the others are already stopped.
    1442                 thread._reset_internal_locks(False)
    1443                 thread._stop()
    1444 
    1445         _limbo.clear()
    1446         _active.clear()
    1447         _active.update(new_active)
    1448         assert len(_active) == 1
    1449 
    1450 
    1451 if hasattr(_os, "register_at_fork"):
    1452     _os.register_at_fork(after_in_child=_after_fork)
    threading源代码

      队列:

      Python的Queue模块中提供了同步的、线程安全的队列类,包括FIFO(先入先出)队列Queue,LIFO(后入先出)队列LifoQueue,和优先级队列PriorityQueue。这些队列都实现了锁原语,能够在多线程中直接使用。可以使用队列来实现线程间的同步。

     1 '''队列'''
     2 import queue
     3 q =queue.Queue()    #设置队列
     4 q.put("q1")              #队列中放入数据
     5 q.put("q2")
     6 q.put("q3")
     7 
     8 # print(q.qsize())  #获取队列大小
     9 
    10 '''队列中获取数据,取出的数据超出存入数据时会等待,不会报错'''
    11 print(q.get())
    12 print(q.get())
    13 print(q.get())
    14 # print(q.get())
    15 
    16 '''获取队列,但不会等待,超出后直接报错'''
    17 print(q.get_nowait())
    18 print(q.get_nowait())
    19 print(q.get_nowait())
    20 # print(q.get_nowait())
    21 
    22 '''设置优先级排序的依据'''
    23 q = queue.PriorityQueue(maxsize=0)
    24 q.put((3,"q1"))        #当maxsizie<=0时,队列无限大,>0时,给定数据即为队列大小
    25 q.put((1,"q2"))
    26 q.put((-4,"q3"))
    27 print(q.get())         #获取时会从小到大按顺序获取
    28 print(q.get())
    29 print(q.get())

    上述代码只是队列的应用,下面将队列应用与线程之中:

     1 import queue
     2 import time
     3 import threading
     4 
     5 q = queue.Queue(maxsize=10)
     6 def gave(name):
     7     count = 1
     8     while True:
     9         q.put("--骨头--%s" % count)
    10         print("%s 生产骨头 %s" % (name,count))
    11         time.sleep(1)
    12         count+=1
    13 
    14 def consumer(name):
    15     while q.qsize()>0:
    16     # while True:
    17         print("%s 吃掉 %s" % (name,q.get()))
    18         # time.sleep(10)
    19 
    20 g = threading.Thread(target=gave,args=("王二小",))
    21 c = threading.Thread(target=consumer,args=("旺财",))
    22 g.start()
    23 c.start()
    对垒事件
      1 #print('33[41;1m--red light on---33[0m')    #红灯
      2 #print('33[43;1m--yellow light on---33[0m') #黄灯
      3 #print('33[42;1m--green light on---33[0m') #绿灯
      4 '''主要用在数据同步上'''
      5 '''红绿灯事件'''
      6 
      7 # import threading
      8 # import time
      9 # # import queue
     10 # event = threading.Event()
     11 # # q = queue.Queue()
     12 #
     13 # def light():
     14 #     count = 1
     15 #     while True:
     16 #         if count<=5:
     17 #             event.set()
     18 #             print('33[42;1m--green light on---33[0m')
     19 #         elif 5<count<=10:
     20 #             event.clear()
     21 #             print('33[43;1m--yellow light on---33[0m')
     22 #         else:
     23 #             print('33[41;1m--red light on---33[0m')
     24 #             if count>=15:
     25 #                 count = 0
     26 #         time.sleep(1)
     27 #         count+=1
     28 #
     29 # def car(name):
     30 #     while True:
     31 #         if event.is_set():
     32 #             time.sleep(1)
     33 #             print("%s is running..." % name)
     34 #         else:
     35 #             print("car is waiting...")
     36 #             event.wait()  #等待事件event对象发生变化
     37 #
     38 #
     39 # Light = threading.Thread(target=light,)
     40 # Light.start()
     41 # Car = threading.Thread(target=car,args=("BENZ",))
     42 # Car.start()
     43 
     44 
     45 import threading
     46 import time
     47 import queue
     48 
     49 event=threading.Event()
     50 q=queue.PriorityQueue(maxsize=20)
     51 #在循环之前先放入十辆车:
     52 for i in range(10):
     53     q.put("旧车辆,%s" % "QQ")
     54 
     55 def light():
     56     count=0
     57     while True:
     58         if count<10:
     59             event.set()
     60             print("33[42;1m--green light on---33[0m",10-count)
     61         elif 10<=count<15:
     62             event.clear()
     63             print("33[43;1m--yellow light on---33[0m",15-count)
     64         else:
     65             event.clear()
     66             if count>=25:
     67                 count=0
     68                 continue
     69             print("33[41;1m--red light on---33[0m",25-count)
     70         time.sleep(1)
     71         count+=1
     72 
     73 def car(name):
     74     while True:
     75         if event.is_set() and q.qsize()>=1:
     76             print("%s is running..." % name)
     77             time.sleep(1)
     78             print("道路还有【%s】辆车" % q.qsize())
     79         else:
     80             print("car is waiting...")
     81             print("现在道路中有车%s辆" % q.qsize())
     82             event.wait()   #等待事件event对象发生变化
     83 
     84 #路口停车
     85 def Put():
     86     n=0
     87     while q.qsize()<20:
     88         time.sleep(2)
     89         q.put("新车辆%s车辆" % n)
     90         n+=1
     91         print("车辆已驶入")
     92     else:
     93         event.wait()
     94         print("停止驶入")
     95         print("停止驶入后道路中有车%s" % q.qsize())
     96 
     97 #车辆行驶
     98 def Get():
     99     while True:
    100         if event.is_set():
    101             time.sleep(2)
    102             print("%s车辆--------通过" % q.get())
    103         else:
    104             print("禁止通行!!")
    105             event.wait()
    106 
    107 
    108 
    109 C=threading.Thread(target=car,args=("...T...",))
    110 L=threading.Thread(target=light)
    111 P=threading.Thread(target=Put)
    112 G=threading.Thread(target=Get)
    113 L.start()
    114 C.start()
    115 P.start()
    116 G.start()
    红绿灯事件
     1 import threading
     2 
     3 money = 0
     4 lock = threading.Lock()
     5 
     6 #存钱
     7 def get_money(Sum):
     8     global money
     9     money+=Sum     #x=money+sum;money=x
    10 
    11 #取钱
    12 def put_money(Sum):
    13     global money
    14     money-=Sum
    15 
    16 def run(Sum):
    17     lock.acquire()
    18     for i in range(10000):
    19         put_money(Sum)
    20         get_money(Sum)
    21     lock.release()
    22 
    23 #单线程中不会存在问题
    24 #然而在多线程中,操作系统交叉处理赋值语句,导致
    25 # 全局变量被一个线程修改,而另一个线程却不知情。
    26 m1 = threading.Thread(target=run,args=(100,))
    27 m2 = threading.Thread(target=run,args=(1000,))
    28 m1.start()
    29 m2.start()
    30 m1.join()
    31 m2.join()
    32 print(money)
    银行事件
      1 '''A multi-producer, multi-consumer queue.'''
      2 #多生产者、多消费者队列。
      3 import threading
      4 from collections import deque
      5 from heapq import heappush, heappop
      6 from time import monotonic as time
      7 try:
      8     from _queue import SimpleQueue
      9 except ImportError:
     10     SimpleQueue = None
     11 
     12 __all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue', 'SimpleQueue']
     13 
     14 
     15 try:
     16     from _queue import Empty
     17 except AttributeError:
     18     class Empty(Exception):
     19         'Exception raised by Queue.get(block=0)/get_nowait().'
     20         pass
     21 
     22 class Full(Exception):
     23     'Exception raised by Queue.put(block=0)/put_nowait().'
     24     pass
     25 
     26 
     27 class Queue:
     28     '''Create a queue object with a given maximum size.
     29     If maxsize is <= 0, the queue size is infinite.
     30     '''
     31     #创建一个具有给定最大大小的队列对象。如果maxsize <= 0,则队列大小为无穷大。
     32 
     33     def __init__(self, maxsize=0):
     34         self.maxsize = maxsize
     35         self._init(maxsize)
     36 
     37         # mutex must be held whenever the queue is mutating.  All methods
     38         # that acquire mutex must release it before returning.  mutex
     39         # is shared between the three conditions, so acquiring and
     40         # releasing the conditions also acquires and releases mutex.
     41         #当队列发生变化时,必须持有互斥锁。所有获得互斥锁的方法都必须在返回之前释放它。
     42         # 互斥锁在这三个条件之间是共享的,因此获取和释放条件也获得和释放互斥锁。
     43         self.mutex = threading.Lock()
     44 
     45         # Notify not_empty whenever an item is added to the queue; a
     46         # thread waiting to get is notified then.
     47         #当一个项目被添加到队列中时,通知not_empty;然后会通知等待获取的线程。
     48         self.not_empty = threading.Condition(self.mutex)
     49 
     50         # Notify not_full whenever an item is removed from the queue;
     51         # a thread waiting to put is notified then.
     52         #当一个项目从队列中删除时,通知not_full;然后会通知等待放置的线程。
     53         self.not_full = threading.Condition(self.mutex)
     54 
     55         # Notify all_tasks_done whenever the number of unfinished tasks
     56         # drops to zero; thread waiting to join() is notified to resume
     57         #当未完成任务的数量降为零时,通知all_tasks_done;等待加入()的线程被通知恢复
     58         self.all_tasks_done = threading.Condition(self.mutex)
     59         self.unfinished_tasks = 0
     60 
     61     def task_done(self):
     62         '''Indicate that a formerly enqueued task is complete.
     63         Used by Queue consumer threads.  For each get() used to fetch a task,
     64         a subsequent call to task_done() tells the queue that the processing
     65         on the task is complete.
     66         If a join() is currently blocking, it will resume when all items
     67         have been processed (meaning that a task_done() call was received
     68         for every item that had been put() into the queue).
     69         Raises a ValueError if called more times than there were items
     70         placed in the queue.
     71         '''
     72         #指示以前加入队列的任务已经完成。由队列使用者线程使用。对于用于获取任务的每个
     73         # get(),对task_done()的后续调用将告诉队列任务的处理已经完成。如果一个join()
     74         # 当前处于阻塞状态,那么当所有项都被处理完时(这意味着对于每个已将()放入队列的
     75         # 项都接收了task_done()调用),它将恢复。如果调用的次数超过了队列中放置的项的
     76         # 次数,就会引发ValueError。
     77         with self.all_tasks_done:
     78             unfinished = self.unfinished_tasks - 1
     79             if unfinished <= 0:
     80                 if unfinished < 0:
     81                     raise ValueError('task_done() called too many times')
     82                 self.all_tasks_done.notify_all()
     83             self.unfinished_tasks = unfinished
     84 
     85     def join(self):
     86         '''Blocks until all items in the Queue have been gotten and processed.
     87         The count of unfinished tasks goes up whenever an item is added to the
     88         queue. The count goes down whenever a consumer thread calls task_done()
     89         to indicate the item was retrieved and all work on it is complete.
     90         When the count of unfinished tasks drops to zero, join() unblocks.
     91         '''
     92         #阻塞,直到获取和处理队列中的所有项。当一个项目被添加到队列中时,未完成任务的计数
     93         # 就会上升。每当使用者线程调用task_done()时,计数就会下降,以指示检索了项目并完成
     94         # 了对其的所有工作。当未完成任务的计数降为0时,join()将解块。
     95         with self.all_tasks_done:
     96             while self.unfinished_tasks:
     97                 self.all_tasks_done.wait()
     98 
     99     def qsize(self):
    100         '''Return the approximate size of the queue (not reliable!).'''
    101         #返回队列的大致大小(不可靠!)
    102         with self.mutex:
    103             return self._qsize()
    104 
    105     def empty(self):
    106         '''Return True if the queue is empty, False otherwise (not reliable!).
    107         This method is likely to be removed at some point.  Use qsize() == 0
    108         as a direct substitute, but be aware that either approach risks a race
    109         condition where a queue can grow before the result of empty() or
    110         qsize() can be used.
    111         To create code that needs to wait for all queued tasks to be
    112         completed, the preferred technique is to use the join() method.
    113         '''
    114         #如果队列为空,返回True,否则返回False(不可靠!)这种方法可能会在某个时候被删除。
    115         # 使用qsize() == 0作为直接的替代,但是要注意,在使用empty()或qsize()的结果之前,
    116         # 队列可能会增长,这可能会带来竞争条件的风险。要创建需要等待所有排队任务完成的代码,
    117         # 首选技术是使用join()方法。
    118         with self.mutex:
    119             return not self._qsize()
    120 
    121     def full(self):
    122         '''Return True if the queue is full, False otherwise (not reliable!).
    123         This method is likely to be removed at some point.  Use qsize() >= n
    124         as a direct substitute, but be aware that either approach risks a race
    125         condition where a queue can shrink before the result of full() or
    126         qsize() can be used.
    127         '''
    128         #如果队列满了,返回True,否则返回False(不可靠!)这种方法可能会在某个时候被删除。
    129         # 使用qsize() >= n作为直接替代,但是要注意,在使用full()或qsize()的结果之前,
    130         # 队列可能会收缩,这可能会导致竞争条件的风险。
    131         with self.mutex:
    132             return 0 < self.maxsize <= self._qsize()
    133 
    134     def put(self, item, block=True, timeout=None):
    135         '''Put an item into the queue.
    136         If optional args 'block' is true and 'timeout' is None (the default),
    137         block if necessary until a free slot is available. If 'timeout' is
    138         a non-negative number, it blocks at most 'timeout' seconds and raises
    139         the Full exception if no free slot was available within that time.
    140         Otherwise ('block' is false), put an item on the queue if a free slot
    141         is immediately available, else raise the Full exception ('timeout'
    142         is ignored in that case).
    143         '''
    144         #将项目放入队列中。如果可选的args 'block'为true,而'timeout'为None(默认值),
    145         # 那么如果有必要,阻塞直到空闲的插槽可用为止。如果“timeout”是非负数,它最多会
    146         # 阻塞“timeout”秒,如果在这段时间内没有可用的空闲时间,它就会引发完全异常。
    147         # 否则(‘block’为false),如果有空闲的插槽立即可用,就在队列中放置一个项目,
    148         # 否则引发完整的异常(在这种情况下忽略‘timeout’)。
    149         with self.not_full:
    150             if self.maxsize > 0:
    151                 if not block:
    152                     if self._qsize() >= self.maxsize:
    153                         raise Full
    154                 elif timeout is None:
    155                     while self._qsize() >= self.maxsize:
    156                         self.not_full.wait()
    157                 elif timeout < 0:
    158                     raise ValueError("'timeout' must be a non-negative number")
    159                 else:
    160                     endtime = time() + timeout
    161                     while self._qsize() >= self.maxsize:
    162                         remaining = endtime - time()
    163                         if remaining <= 0.0:
    164                             raise Full
    165                         self.not_full.wait(remaining)
    166             self._put(item)
    167             self.unfinished_tasks += 1
    168             self.not_empty.notify()
    169 
    170     def get(self, block=True, timeout=None):
    171         '''Remove and return an item from the queue.
    172         If optional args 'block' is true and 'timeout' is None (the default),
    173         block if necessary until an item is available. If 'timeout' is
    174         a non-negative number, it blocks at most 'timeout' seconds and raises
    175         the Empty exception if no item was available within that time.
    176         Otherwise ('block' is false), return an item if one is immediately
    177         available, else raise the Empty exception ('timeout' is ignored
    178         in that case).
    179         '''
    180         #从队列中删除并返回项。如果可选的args 'block'为true,而'timeout'为None
    181         # (默认值),则在项可用之前,如果有必要,阻塞。如果“timeout”是非负数,它最多
    182         # 会阻塞“timeout”秒,如果在这段时间内没有可用项,就会引发空异常。
    183         # 否则(‘block’为false),如果一个项立即可用,返回一个项,否则引发空异常
    184         # (在这种情况下忽略'timeout')。
    185         with self.not_empty:
    186             if not block:
    187                 if not self._qsize():
    188                     raise Empty
    189             elif timeout is None:
    190                 while not self._qsize():
    191                     self.not_empty.wait()
    192             elif timeout < 0:
    193                 raise ValueError("'timeout' must be a non-negative number")
    194             else:
    195                 endtime = time() + timeout
    196                 while not self._qsize():
    197                     remaining = endtime - time()
    198                     if remaining <= 0.0:
    199                         raise Empty
    200                     self.not_empty.wait(remaining)
    201             item = self._get()
    202             self.not_full.notify()
    203             return item
    204 
    205     def put_nowait(self, item):
    206         '''Put an item into the queue without blocking.
    207         Only enqueue the item if a free slot is immediately available.
    208         Otherwise raise the Full exception.
    209         '''
    210         #将项目放入队列中而不阻塞。只有当一个空闲的插槽立即可用时,才将项目加入队列。否则引发完全异常。
    211         return self.put(item, block=False)
    212 
    213     def get_nowait(self):
    214         '''Remove and return an item from the queue without blocking.
    215         Only get an item if one is immediately available. Otherwise
    216         raise the Empty exception.
    217         '''
    218         #在不阻塞的情况下从队列中删除并返回项。只有当一个项目是立即可用的。否则引发空异常。
    219         return self.get(block=False)
    220 
    221     # Override these methods to implement other queue organizations
    222     # (e.g. stack or priority queue).
    223     # These will only be called with appropriate locks held
    224     #重写这些方法以实现其他队列组织(例如堆栈或优先队列)。只有在持有适当的锁时才会调用这些函数
    225 
    226     # Initialize the queue representation
    227     '''初始化队列表示'''
    228     def _init(self, maxsize):
    229         self.queue = deque()
    230 
    231     def _qsize(self):
    232         return len(self.queue)
    233 
    234     # Put a new item in the queue
    235     def _put(self, item):
    236         self.queue.append(item)
    237 
    238     # Get an item from the queue
    239     def _get(self):
    240         return self.queue.popleft()
    241 
    242 
    243 class PriorityQueue(Queue):
    244     '''Variant of Queue that retrieves open entries in priority order (lowest first).
    245     Entries are typically tuples of the form:  (priority number, data).
    246     '''
    247     #按优先级顺序(最低优先级)检索打开项的队列的变体。条目通常是表单的元组(优先级号、数据)。
    248 
    249     def _init(self, maxsize):
    250         self.queue = []
    251 
    252     def _qsize(self):
    253         return len(self.queue)
    254 
    255     def _put(self, item):
    256         heappush(self.queue, item)
    257 
    258     def _get(self):
    259         return heappop(self.queue)
    260 
    261 
    262 class LifoQueue(Queue):
    263     '''Variant of Queue that retrieves most recently added entries first.'''
    264     #队列的变体,它首先检索最近添加的条目。
    265 
    266     def _init(self, maxsize):
    267         self.queue = []
    268 
    269     def _qsize(self):
    270         return len(self.queue)
    271 
    272     def _put(self, item):
    273         self.queue.append(item)
    274 
    275     def _get(self):
    276         return self.queue.pop()
    277 
    278 
    279 class _PySimpleQueue:
    280     '''Simple, unbounded FIFO queue.
    281     This pure Python implementation is not reentrant.
    282     '''
    283     #简单、无界的FIFO队列。这个纯Python实现是不可重入的。
    284 
    285     # Note: while this pure Python version provides fairness
    286     # (by using a threading.Semaphore which is itself fair, being based
    287     #  on threading.Condition), fairness is not part of the API contract.
    288     # This allows the C version to use a different implementation.
    289     #注意:虽然这个纯Python版本提供了公平性(通过使用线程)。信号量本身是公平的,
    290     # 基于thread . condition),公平不是API契约的一部分。这允许C版本使用不同的实现。
    291 
    292     def __init__(self):
    293         self._queue = deque()
    294         self._count = threading.Semaphore(0)
    295 
    296     def put(self, item, block=True, timeout=None):
    297         '''Put the item on the queue.
    298         The optional 'block' and 'timeout' arguments are ignored, as this method
    299         never blocks.  They are provided for compatibility with the Queue class.
    300         '''
    301         #将项目放到队列中。可选的“block”和“timeout”参数被忽略,因为这个方法从不阻塞。
    302         # 它们是为了与队列类兼容而提供的。
    303         self._queue.append(item)
    304         self._count.release()
    305 
    306     def get(self, block=True, timeout=None):
    307         '''Remove and return an item from the queue.
    308         If optional args 'block' is true and 'timeout' is None (the default),
    309         block if necessary until an item is available. If 'timeout' is
    310         a non-negative number, it blocks at most 'timeout' seconds and raises
    311         the Empty exception if no item was available within that time.
    312         Otherwise ('block' is false), return an item if one is immediately
    313         available, else raise the Empty exception ('timeout' is ignored
    314         in that case).
    315         '''
    316         #从队列中删除并返回项。如果可选的args 'block'为true,而'timeout'为None
    317         # (默认值),则在项可用之前,如果有必要,阻塞。如果“timeout”是非负数,它最多
    318         # 会阻塞“timeout”秒,如果在这段时间内没有可用项,就会引发空异常。否则
    319         # (‘block’为false),如果一个项立即可用,返回一个项,否则引发空异常
    320         # (在这种情况下忽略'timeout')。
    321         if timeout is not None and timeout < 0:
    322             raise ValueError("'timeout' must be a non-negative number")
    323         if not self._count.acquire(block, timeout):
    324             raise Empty
    325         return self._queue.popleft()
    326 
    327     def put_nowait(self, item):
    328         '''Put an item into the queue without blocking.
    329         This is exactly equivalent to `put(item)` and is only provided
    330         for compatibility with the Queue class.
    331         '''
    332         #将项目放入队列中而不阻塞。这完全等同于‘put(item)’,并且只提供与队列类的兼容性。
    333         return self.put(item, block=False)
    334 
    335     def get_nowait(self):
    336         '''Remove and return an item from the queue without blocking.
    337         Only get an item if one is immediately available. Otherwise
    338         raise the Empty exception.
    339         '''
    340         #在不阻塞的情况下从队列中删除并返回项。只有当一个项目是立即可用的。否则引发空异常。
    341         return self.get(block=False)
    342 
    343     def empty(self):
    344         '''Return True if the queue is empty, False otherwise (not reliable!).'''
    345         #如果队列为空,返回True,否则返回False(不可靠!)
    346         return len(self._queue) == 0
    347 
    348     def qsize(self):
    349         '''Return the approximate size of the queue (not reliable!).'''
    350         #返回队列的大致大小(不可靠!)
    351         return len(self._queue)
    352 
    353 
    354 if SimpleQueue is None:
    355     SimpleQueue = _PySimpleQueue
    queue源代码
    Python进程

    进程(multiprocessing):

    线程是进程最小的数据单元;每个进程都是相互独立的,它们之间不能共享数据。

    启动单个进程:

     1 '''启动一个进程'''
     2 import multiprocessing
     3 import time
     4 
     5 def run(name):
     6     time.sleep(2)
     7     print("hello",name)
     8 
     9 if __name__ == "__main__":
    10     p = multiprocessing.Process(target=run,args=("pp",))
    11     p.start()
    12     p.join()
    13 
    14 
    15 '''运行结果'''
    16 hello pp

    启动多个进程:

     1 import multiprocessing
     2 import time
     3 
     4 def run(name):
     5     time.sleep(2)
     6     print("hello",name)
     7 
     8 for i in range(10):
     9     if __name__ == "__main__":
    10         p = multiprocessing.Process(target=run, args=("pp",))
    11         p.start()
    12         p.join()

    在进程中创建进程:

     1 import multiprocessing
     2 import threading
     3 import time
     4 
     5 def thread_run():
     6     print(threading.get_ident())  #get_ident()获得线程地址
     7 
     8 def run(name):
     9     time.sleep(2)
    10     print("hello",name)
    11     t = threading.Thread(target=thread_run)
    12     t.start()
    13 
    14 if __name__ == "__main__":
    15     p = multiprocessing.Process(target=run,args=("pp",))
    16     p.start()
    17     p.join()
    18 
    19 
    20 '''运行结果'''
    21 hello pp
    22 2404

    数据共享:

     1 '''通过中间介质(pickle)使两个进程实现数据共享,实质上并不是完全的数据共享,
     2 只是将子进程的对象(队列Queue)进行克隆'''
     3 # import threading
     4 # import queue
     5 # 
     6 # def f():
     7 #     q.put("jfkdsljfkdls")
     8 # 
     9 # if __name__=="__main__":
    10 #     q=queue.Queue()
    11 #     p=threading.Thread(target=f)
    12 #     p.start()
    13 #     print(q.get())
    14 #     p.join()
    15 
    16 '''进程'''
    17 from multiprocessing import Process,Queue
    18 import queue
    19 
    20 def f(q2):
    21     q2.put("hkshhdjskajdksa")
    22 
    23 if __name__=="__main__":
    24     q=Queue()
    25     p=Process(target=f,args=(q,))
    26     p.start()
    27     print(q.get())
    28     p.join()
    通过中间介质(pickle)
     1 '''manager是用来传递对象'''
     2 from multiprocessing import Process,Manager
     3 import os
     4 
     5 def f(d,l,l1):
     6     d[os.getpid()] = os.getpid()   #os.getpid():Return the current process id.
     7     l.append(os.getpid())
     8     l1.append(os.getpid())  #l1属于直接传递,不能回传
     9     # print(l)
    10     # print("l1:***",l1)   #普通列表在子进程中每次会获得一个新值,但都会被下一个值覆盖
    11     # print(d)
    12 
    13 if __name__ == "__main__":
    14     with Manager() as mager:
    15         d = mager.dict()         #由manager生成的字典
    16         l = mager.list(range(5)) #由manager生成的列表
    17         l1 = []                  #普通列表无法共享数据,最后仍旧是空列表
    18         p_list = []
    19         for i in range(10):
    20             p = Process(target=f,args=(d,l,l1))
    21             p.start()
    22             p.join()
    23         print("l:",l)
    24         print(l1)
    25         print("d:",d)
    数据共享(manager)

    数据传递:

     1 '''主进程(父类进程) 子进程'''
     2 '''管道通信实现数据之间的传递'''
     3 # from multiprocessing import Process,Pipe
     4 #
     5 # def f(con):
     6 #     con.send("hello from child1")
     7 #     con.send("hello from child2")
     8 #     print("parent news:",con.recv())
     9 #     con.close()
    10 #
    11 # if __name__ == "__main__":
    12 #     Parent_con,Child_con = Pipe()
    13 #     p = Process(target=f,args=(Child_con,))
    14 #     p.start()
    15 #     print(Parent_con.recv())
    16 #     print(Parent_con.recv())
    17 #     Parent_con.send("from parent")
    18 #     p.join()
    19 
    20 
    21 
    22 '''运行结果'''
    23 hello from child1
    24 hello from child2
    25 parent news: from parent
    管道通信

     进程锁(Lock):

    屏幕存在共享,多进程可以同时使用屏幕,进程加锁的目的在于,确保屏幕被独个进程使用。

    from multiprocessing import Process,Lock
    
    def f(l,i):
        l.acquire()
        print("+++",i)
        l.release()
    
    if __name__ == "__main__":
        lock = Lock()   #加锁的目的是为了确保屏幕被单独占用
        for num in range(100):
            Process(target=f,args=(lock,num)).start()
    
    
    '''运行结果'''
    +++ 0    
    +++ 1
    +++ 2
    +++ 3
    +++ 4
    +++ 5
    .
    .
    .(不在这里演示完所有内容)

    进程池(pool):

    python中,进程池内部会维护一个进程序列。当需要时,程序会去进程池中获取一个进程。

    如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止。

    from multiprocessing import Process,Pool
    import time
    import os
    
    def foo(i):
        time.sleep(2)
        print("in the process:",os.getpid())
        return i+100
    
    
    def bar(args):
        print("system done",args)
    
    if __name__ == "__main__":
        pool = Pool(5)
        for i in range(10):
            # pool.apply(func=foo,args=(i,))
            #生成进程,把pool放入容器
            #apply本身是一个串行方法,不受join影响
    
            pool.apply_async(func=foo,args=(i,),callback=bar)
            #apply_async是一个并行方法,受join影响
            #callback()回调函数为主进程操作,进程池一旦开始运行,回调函数会自动执行
    
    
        print("end")
        pool.close()  #pool关闭是需要时间的,所以在close之后再join
        pool.join()

    pool的内置方法:

    • apply    串行方法。从进程池里取一个进程并同步执行,不受join影响
    • apply_async    并行方法。从进程池里取出一个进程并异步执行,受join影响
    • terminate    立刻关闭进程池
    • join 主进程等待所有子进程执行完毕,必须在close或terminete之后(如上述代码)
    • close 等待所有进程结束才关闭线程池
      1 #
      2 # Module providing the `Pool` class for managing a process pool
      3 #模块提供用于管理进程池的“池”类
      4 # multiprocessing/pool.py
      5 #
      6 # Copyright (c) 2006-2008, R Oudkerk
      7 # Licensed to PSF under a Contributor Agreement.
      8 #
      9 
     10 __all__ = ['Pool', 'ThreadPool']
     11 
     12 #
     13 # Imports
     14 #
     15 
     16 import threading
     17 import queue
     18 import itertools
     19 import collections
     20 import os
     21 import time
     22 import traceback
     23 
     24 # If threading is available then ThreadPool should be provided.  Therefore
     25 # we avoid top-level imports which are liable to fail on some systems.
     26 from . import util
     27 from . import get_context, TimeoutError
     28 
     29 #
     30 # Constants representing the state of a pool
     31 #表示池状态的常数
     32 
     33 RUN = 0
     34 CLOSE = 1
     35 TERMINATE = 2
     36 
     37 #
     38 # Miscellaneous
     39 #
     40 
     41 job_counter = itertools.count()
     42 
     43 def mapstar(args):
     44     return list(map(*args))
     45 
     46 def starmapstar(args):
     47     return list(itertools.starmap(args[0], args[1]))
     48 
     49 #
     50 # Hack to embed stringification of remote traceback in local traceback
     51 #
     52 
     53 class RemoteTraceback(Exception):
     54     def __init__(self, tb):
     55         self.tb = tb
     56     def __str__(self):
     57         return self.tb
     58 
     59 class ExceptionWithTraceback:
     60     def __init__(self, exc, tb):
     61         tb = traceback.format_exception(type(exc), exc, tb)
     62         tb = ''.join(tb)
     63         self.exc = exc
     64         self.tb = '
    """
    %s"""' % tb
     65     def __reduce__(self):
     66         return rebuild_exc, (self.exc, self.tb)
     67 
     68 def rebuild_exc(exc, tb):
     69     exc.__cause__ = RemoteTraceback(tb)
     70     return exc
     71 
     72 #
     73 # Code run by worker processes
     74 #
     75 
     76 class MaybeEncodingError(Exception):
     77     """Wraps possible unpickleable errors, so they can be
     78     safely sent through the socket."""
     79     #包装可能出现的无法拾取的错误,以便通过套接字安全地发送这些错误。
     80 
     81     def __init__(self, exc, value):
     82         self.exc = repr(exc)
     83         self.value = repr(value)
     84         super(MaybeEncodingError, self).__init__(self.exc, self.value)
     85 
     86     def __str__(self):
     87         return "Error sending result: '%s'. Reason: '%s'" % (self.value,
     88                                                              self.exc)
     89 
     90     def __repr__(self):
     91         return "<%s: %s>" % (self.__class__.__name__, self)
     92 
     93 
     94 def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None,
     95            wrap_exception=False):
     96     if (maxtasks is not None) and not (isinstance(maxtasks, int)
     97                                        and maxtasks >= 1):
     98         raise AssertionError("Maxtasks {!r} is not valid".format(maxtasks))
     99     put = outqueue.put
    100     get = inqueue.get
    101     if hasattr(inqueue, '_writer'):
    102         inqueue._writer.close()
    103         outqueue._reader.close()
    104 
    105     if initializer is not None:
    106         initializer(*initargs)
    107 
    108     completed = 0
    109     while maxtasks is None or (maxtasks and completed < maxtasks):
    110         try:
    111             task = get()
    112         except (EOFError, OSError):
    113             util.debug('worker got EOFError or OSError -- exiting')
    114             break
    115 
    116         if task is None:
    117             util.debug('worker got sentinel -- exiting')
    118             break
    119 
    120         job, i, func, args, kwds = task
    121         try:
    122             result = (True, func(*args, **kwds))
    123         except Exception as e:
    124             if wrap_exception and func is not _helper_reraises_exception:
    125                 e = ExceptionWithTraceback(e, e.__traceback__)
    126             result = (False, e)
    127         try:
    128             put((job, i, result))
    129         except Exception as e:
    130             wrapped = MaybeEncodingError(e, result[1])
    131             util.debug("Possible encoding error while sending result: %s" % (
    132                 wrapped))
    133             put((job, i, (False, wrapped)))
    134 
    135         task = job = result = func = args = kwds = None
    136         completed += 1
    137     util.debug('worker exiting after %d tasks' % completed)
    138 
    139 def _helper_reraises_exception(ex):
    140     'Pickle-able helper function for use by _guarded_task_generation.'
    141     #用于_guarded_task_generation的可选择助手函数。
    142     raise ex
    143 
    144 #
    145 # Class representing a process pool  类表示进程池
    146 #
    147 
    148 class Pool(object):
    149     '''
    150     Class which supports an async version of applying functions to arguments.
    151     '''
    152     #类,该类支持将函数应用于参数的异步版本。
    153     _wrap_exception = True
    154 
    155     def Process(self, *args, **kwds):
    156         return self._ctx.Process(*args, **kwds)
    157 
    158     def __init__(self, processes=None, initializer=None, initargs=(),
    159                  maxtasksperchild=None, context=None):
    160         self._ctx = context or get_context()
    161         self._setup_queues()
    162         self._taskqueue = queue.SimpleQueue()
    163         self._cache = {}
    164         self._state = RUN
    165         self._maxtasksperchild = maxtasksperchild
    166         self._initializer = initializer
    167         self._initargs = initargs
    168 
    169         if processes is None:
    170             processes = os.cpu_count() or 1
    171         if processes < 1:
    172             raise ValueError("Number of processes must be at least 1")
    173 
    174         if initializer is not None and not callable(initializer):
    175             raise TypeError('initializer must be a callable')
    176 
    177         self._processes = processes
    178         self._pool = []
    179         self._repopulate_pool()
    180 
    181         self._worker_handler = threading.Thread(
    182             target=Pool._handle_workers,
    183             args=(self, )
    184             )
    185         self._worker_handler.daemon = True
    186         self._worker_handler._state = RUN
    187         self._worker_handler.start()
    188 
    189 
    190         self._task_handler = threading.Thread(
    191             target=Pool._handle_tasks,
    192             args=(self._taskqueue, self._quick_put, self._outqueue,
    193                   self._pool, self._cache)
    194             )
    195         self._task_handler.daemon = True
    196         self._task_handler._state = RUN
    197         self._task_handler.start()
    198 
    199         self._result_handler = threading.Thread(
    200             target=Pool._handle_results,
    201             args=(self._outqueue, self._quick_get, self._cache)
    202             )
    203         self._result_handler.daemon = True
    204         self._result_handler._state = RUN
    205         self._result_handler.start()
    206 
    207         self._terminate = util.Finalize(
    208             self, self._terminate_pool,
    209             args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
    210                   self._worker_handler, self._task_handler,
    211                   self._result_handler, self._cache),
    212             exitpriority=15
    213             )
    214 
    215     def _join_exited_workers(self):
    216         """Cleanup after any worker processes which have exited due to reaching
    217         their specified lifetime.  Returns True if any workers were cleaned up.
    218         """
    219         #在由于达到指定的生存期而退出的任何工作进程之后进行清理。如果有工人被清理干净,
    220         # 返回True。
    221         cleaned = False
    222         for i in reversed(range(len(self._pool))):
    223             worker = self._pool[i]
    224             if worker.exitcode is not None:
    225                 # worker exited
    226                 util.debug('cleaning up worker %d' % i)
    227                 worker.join()
    228                 cleaned = True
    229                 del self._pool[i]
    230         return cleaned
    231 
    232     def _repopulate_pool(self):
    233         """Bring the number of pool processes up to the specified number,
    234         for use after reaping workers which have exited.
    235         """
    236         #将池进程的数量增加到指定的数量,以便在收割已退出的工人后使用。
    237         for i in range(self._processes - len(self._pool)):
    238             w = self.Process(target=worker,
    239                              args=(self._inqueue, self._outqueue,
    240                                    self._initializer,
    241                                    self._initargs, self._maxtasksperchild,
    242                                    self._wrap_exception)
    243                             )
    244             self._pool.append(w)
    245             w.name = w.name.replace('Process', 'PoolWorker')
    246             w.daemon = True
    247             w.start()
    248             util.debug('added worker')
    249 
    250     def _maintain_pool(self):
    251         """Clean up any exited workers and start replacements for them.
    252         """
    253         #清理所有离职的员工,并开始替换他们。
    254         if self._join_exited_workers():
    255             self._repopulate_pool()
    256 
    257     def _setup_queues(self):
    258         self._inqueue = self._ctx.SimpleQueue()
    259         self._outqueue = self._ctx.SimpleQueue()
    260         self._quick_put = self._inqueue._writer.send
    261         self._quick_get = self._outqueue._reader.recv
    262 
    263     def apply(self, func, args=(), kwds={}):
    264         '''
    265         Equivalent of `func(*args, **kwds)`.
    266         Pool must be running.
    267         '''
    268         #相当于“func(*args, ** kwds)”。池必须正在运行。
    269         return self.apply_async(func, args, kwds).get()
    270 
    271     def map(self, func, iterable, chunksize=None):
    272         '''
    273         Apply `func` to each element in `iterable`, collecting the results
    274         in a list that is returned.
    275         '''
    276         #对“iterable”中的每个元素应用“func”,在返回的列表中收集结果。
    277         return self._map_async(func, iterable, mapstar, chunksize).get()
    278 
    279     def starmap(self, func, iterable, chunksize=None):
    280         '''
    281         Like `map()` method but the elements of the `iterable` are expected to
    282         be iterables as well and will be unpacked as arguments. Hence
    283         `func` and (a, b) becomes func(a, b).
    284         '''
    285         #方法类似于“map()”,但“iterable”的元素也应该是可迭代的,并将作为参数解压缩。
    286         # 因此“func”和(a, b)变成了func(a, b)。
    287         return self._map_async(func, iterable, starmapstar, chunksize).get()
    288 
    289     def starmap_async(self, func, iterable, chunksize=None, callback=None,
    290             error_callback=None):
    291         '''
    292         Asynchronous version of `starmap()` method.
    293         '''
    294         #异步版本的“starmap()”方法。
    295         return self._map_async(func, iterable, starmapstar, chunksize,
    296                                callback, error_callback)
    297 
    298     def _guarded_task_generation(self, result_job, func, iterable):
    299         '''Provides a generator of tasks for imap and imap_unordered with
    300         appropriate handling for iterables which throw exceptions during
    301         iteration.'''
    302         #为imap和imap_unordered提供任务生成器,并为迭代期间抛出异常的迭代提供适当的处理。
    303         try:
    304             i = -1
    305             for i, x in enumerate(iterable):
    306                 yield (result_job, i, func, (x,), {})
    307         except Exception as e:
    308             yield (result_job, i+1, _helper_reraises_exception, (e,), {})
    309 
    310     def imap(self, func, iterable, chunksize=1):
    311         '''
    312         Equivalent of `map()` -- can be MUCH slower than `Pool.map()`.
    313         '''
    314         #等价于“map()”——可能比“Pool.map()”慢得多。
    315         if self._state != RUN:
    316             raise ValueError("Pool not running")
    317         if chunksize == 1:
    318             result = IMapIterator(self._cache)
    319             self._taskqueue.put(
    320                 (
    321                     self._guarded_task_generation(result._job, func, iterable),
    322                     result._set_length
    323                 ))
    324             return result
    325         else:
    326             if chunksize < 1:
    327                 raise ValueError(
    328                     "Chunksize must be 1+, not {0:n}".format(
    329                         chunksize))
    330             task_batches = Pool._get_tasks(func, iterable, chunksize)
    331             result = IMapIterator(self._cache)
    332             self._taskqueue.put(
    333                 (
    334                     self._guarded_task_generation(result._job,
    335                                                   mapstar,
    336                                                   task_batches),
    337                     result._set_length
    338                 ))
    339             return (item for chunk in result for item in chunk)
    340 
    341     def imap_unordered(self, func, iterable, chunksize=1):
    342         '''
    343         Like `imap()` method but ordering of results is arbitrary.
    344         '''
    345         #Like `imap()`方法,但结果的顺序是任意的。
    346         if self._state != RUN:
    347             raise ValueError("Pool not running")
    348         if chunksize == 1:
    349             result = IMapUnorderedIterator(self._cache)
    350             self._taskqueue.put(
    351                 (
    352                     self._guarded_task_generation(result._job, func, iterable),
    353                     result._set_length
    354                 ))
    355             return result
    356         else:
    357             if chunksize < 1:
    358                 raise ValueError(
    359                     "Chunksize must be 1+, not {0!r}".format(chunksize))
    360             task_batches = Pool._get_tasks(func, iterable, chunksize)
    361             result = IMapUnorderedIterator(self._cache)
    362             self._taskqueue.put(
    363                 (
    364                     self._guarded_task_generation(result._job,
    365                                                   mapstar,
    366                                                   task_batches),
    367                     result._set_length
    368                 ))
    369             return (item for chunk in result for item in chunk)
    370 
    371     def apply_async(self, func, args=(), kwds={}, callback=None,
    372             error_callback=None):
    373         '''
    374         Asynchronous version of `apply()` method.  “apply()”方法的异步版本。
    375         '''
    376         if self._state != RUN:
    377             raise ValueError("Pool not running")
    378         result = ApplyResult(self._cache, callback, error_callback)
    379         self._taskqueue.put(([(result._job, 0, func, args, kwds)], None))
    380         return result
    381 
    382     def map_async(self, func, iterable, chunksize=None, callback=None,
    383             error_callback=None):
    384         '''
    385         Asynchronous version of `map()` method. 方法的异步版本
    386         '''
    387         return self._map_async(func, iterable, mapstar, chunksize, callback,
    388             error_callback)
    389 
    390     def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
    391             error_callback=None):
    392         '''
    393         Helper function to implement map, starmap and their async counterparts.
    394         '''
    395         #帮助函数实现映射,星图和他们的异步对等。
    396         if self._state != RUN:
    397             raise ValueError("Pool not running")
    398         if not hasattr(iterable, '__len__'):
    399             iterable = list(iterable)
    400 
    401         if chunksize is None:
    402             chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
    403             if extra:
    404                 chunksize += 1
    405         if len(iterable) == 0:
    406             chunksize = 0
    407 
    408         task_batches = Pool._get_tasks(func, iterable, chunksize)
    409         result = MapResult(self._cache, chunksize, len(iterable), callback,
    410                            error_callback=error_callback)
    411         self._taskqueue.put(
    412             (
    413                 self._guarded_task_generation(result._job,
    414                                               mapper,
    415                                               task_batches),
    416                 None
    417             )
    418         )
    419         return result
    420 
    421     @staticmethod
    422     def _handle_workers(pool):
    423         thread = threading.current_thread()
    424 
    425         # Keep maintaining workers until the cache gets drained, unless the pool
    426         # is terminated.
    427         #继续维护worker,直到缓存耗尽,除非池终止。
    428         while thread._state == RUN or (pool._cache and thread._state != TERMINATE):
    429             pool._maintain_pool()
    430             time.sleep(0.1)
    431         # send sentinel to stop workers
    432         pool._taskqueue.put(None)
    433         util.debug('worker handler exiting')
    434 
    435     @staticmethod
    436     def _handle_tasks(taskqueue, put, outqueue, pool, cache):
    437         thread = threading.current_thread()
    438 
    439         for taskseq, set_length in iter(taskqueue.get, None):
    440             task = None
    441             try:
    442                 # iterating taskseq cannot fail
    443                 #迭代taskseq不会失败
    444                 for task in taskseq:
    445                     if thread._state:
    446                         util.debug('task handler found thread._state != RUN')
    447                         break
    448                     try:
    449                         put(task)
    450                     except Exception as e:
    451                         job, idx = task[:2]
    452                         try:
    453                             cache[job]._set(idx, (False, e))
    454                         except KeyError:
    455                             pass
    456                 else:
    457                     if set_length:
    458                         util.debug('doing set_length()')
    459                         idx = task[1] if task else -1
    460                         set_length(idx + 1)
    461                     continue
    462                 break
    463             finally:
    464                 task = taskseq = job = None
    465         else:
    466             util.debug('task handler got sentinel')
    467 
    468         try:
    469             # tell result handler to finish when cache is empty
    470             #告诉结果处理程序在缓存为空时结束
    471             util.debug('task handler sending sentinel to result handler')
    472             outqueue.put(None)
    473 
    474             # tell workers there is no more work
    475             util.debug('task handler sending sentinel to workers')
    476             for p in pool:
    477                 put(None)
    478         except OSError:
    479             util.debug('task handler got OSError when sending sentinels')
    480 
    481         util.debug('task handler exiting')
    482 
    483     @staticmethod
    484     def _handle_results(outqueue, get, cache):
    485         thread = threading.current_thread()
    486 
    487         while 1:
    488             try:
    489                 task = get()
    490             except (OSError, EOFError):
    491                 util.debug('result handler got EOFError/OSError -- exiting')
    492                 return
    493 
    494             if thread._state:
    495                 assert thread._state == TERMINATE, "Thread not in TERMINATE"
    496                 util.debug('result handler found thread._state=TERMINATE')
    497                 break
    498 
    499             if task is None:
    500                 util.debug('result handler got sentinel')
    501                 break
    502 
    503             job, i, obj = task
    504             try:
    505                 cache[job]._set(i, obj)
    506             except KeyError:
    507                 pass
    508             task = job = obj = None
    509 
    510         while cache and thread._state != TERMINATE:
    511             try:
    512                 task = get()
    513             except (OSError, EOFError):
    514                 util.debug('result handler got EOFError/OSError -- exiting')
    515                 return
    516 
    517             if task is None:
    518                 util.debug('result handler ignoring extra sentinel')
    519                 continue
    520             job, i, obj = task
    521             try:
    522                 cache[job]._set(i, obj)
    523             except KeyError:
    524                 pass
    525             task = job = obj = None
    526 
    527         if hasattr(outqueue, '_reader'):
    528             util.debug('ensuring that outqueue is not full')
    529             # If we don't make room available in outqueue then
    530             # attempts to add the sentinel (None) to outqueue may
    531             # block.  There is guaranteed to be no more than 2 sentinels.
    532             #如果我们不在outqueue中留出可用的空间,那么尝试将sentinel (None)
    533             # 添加到outqueue可能会阻塞。保证不超过2个哨兵。
    534             try:
    535                 for i in range(10):
    536                     if not outqueue._reader.poll():
    537                         break
    538                     get()
    539             except (OSError, EOFError):
    540                 pass
    541 
    542         util.debug('result handler exiting: len(cache)=%s, thread._state=%s',
    543               len(cache), thread._state)
    544 
    545     @staticmethod
    546     def _get_tasks(func, it, size):
    547         it = iter(it)
    548         while 1:
    549             x = tuple(itertools.islice(it, size))
    550             if not x:
    551                 return
    552             yield (func, x)
    553 
    554     def __reduce__(self):
    555         raise NotImplementedError(
    556               'pool objects cannot be passed between processes or pickled'
    557             #不能在进程之间传递池对象或pickle池对象
    558               )
    559 
    560     def close(self):
    561         util.debug('closing pool')
    562         if self._state == RUN:
    563             self._state = CLOSE
    564             self._worker_handler._state = CLOSE
    565 
    566     def terminate(self):
    567         util.debug('terminating pool')
    568         self._state = TERMINATE
    569         self._worker_handler._state = TERMINATE
    570         self._terminate()
    571 
    572     def join(self):
    573         util.debug('joining pool')
    574         if self._state == RUN:
    575             raise ValueError("Pool is still running")
    576         elif self._state not in (CLOSE, TERMINATE):
    577             raise ValueError("In unknown state")
    578         self._worker_handler.join()
    579         self._task_handler.join()
    580         self._result_handler.join()
    581         for p in self._pool:
    582             p.join()
    583 
    584     @staticmethod
    585     def _help_stuff_finish(inqueue, task_handler, size):
    586         # task_handler may be blocked trying to put items on inqueue
    587         #试图将项放入inqueue时可能阻塞task_handler
    588         util.debug('removing tasks from inqueue until task handler finished')
    589         inqueue._rlock.acquire()
    590         while task_handler.is_alive() and inqueue._reader.poll():
    591             inqueue._reader.recv()
    592             time.sleep(0)
    593 
    594     @classmethod
    595     def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
    596                         worker_handler, task_handler, result_handler, cache):
    597         # this is guaranteed to only be called once 这保证只调用一次
    598         util.debug('finalizing pool')
    599 
    600         worker_handler._state = TERMINATE
    601         task_handler._state = TERMINATE
    602 
    603         util.debug('helping task handler/workers to finish')
    604         cls._help_stuff_finish(inqueue, task_handler, len(pool))
    605 
    606         if (not result_handler.is_alive()) and (len(cache) != 0):
    607             raise AssertionError(
    608                 "Cannot have cache with result_hander not alive")
    609 
    610         result_handler._state = TERMINATE
    611         outqueue.put(None)                  # sentinel
    612 
    613         # We must wait for the worker handler to exit before terminating
    614         # workers because we don't want workers to be restarted behind our back.
    615         #我们必须在终止工人之前等待工人处理程序退出,因为我们不希望工人在我们背后重新启动。
    616         util.debug('joining worker handler')
    617         if threading.current_thread() is not worker_handler:
    618             worker_handler.join()
    619 
    620         # Terminate workers which haven't already finished.
    621         if pool and hasattr(pool[0], 'terminate'):
    622             util.debug('terminating workers')
    623             for p in pool:
    624                 if p.exitcode is None:
    625                     p.terminate()
    626 
    627         util.debug('joining task handler')
    628         if threading.current_thread() is not task_handler:
    629             task_handler.join()
    630 
    631         util.debug('joining result handler')
    632         if threading.current_thread() is not result_handler:
    633             result_handler.join()
    634 
    635         if pool and hasattr(pool[0], 'terminate'):
    636             util.debug('joining pool workers')
    637             for p in pool:
    638                 if p.is_alive():
    639                     # worker has not yet exited
    640                     util.debug('cleaning up worker %d' % p.pid)
    641                     p.join()
    642 
    643     def __enter__(self):
    644         return self
    645 
    646     def __exit__(self, exc_type, exc_val, exc_tb):
    647         self.terminate()
    648 
    649 #
    650 # Class whose instances are returned by `Pool.apply_async()`
    651 #
    652 
    653 class ApplyResult(object):
    654 
    655     def __init__(self, cache, callback, error_callback):
    656         self._event = threading.Event()
    657         self._job = next(job_counter)
    658         self._cache = cache
    659         self._callback = callback
    660         self._error_callback = error_callback
    661         cache[self._job] = self
    662 
    663     def ready(self):
    664         return self._event.is_set()
    665 
    666     def successful(self):
    667         if not self.ready():
    668             raise ValueError("{0!r} not ready".format(self))
    669         return self._success
    670 
    671     def wait(self, timeout=None):
    672         self._event.wait(timeout)
    673 
    674     def get(self, timeout=None):
    675         self.wait(timeout)
    676         if not self.ready():
    677             raise TimeoutError
    678         if self._success:
    679             return self._value
    680         else:
    681             raise self._value
    682 
    683     def _set(self, i, obj):
    684         self._success, self._value = obj
    685         if self._callback and self._success:
    686             self._callback(self._value)
    687         if self._error_callback and not self._success:
    688             self._error_callback(self._value)
    689         self._event.set()
    690         del self._cache[self._job]
    691 
    692 AsyncResult = ApplyResult       # create alias -- see #17805
    693 
    694 #
    695 # Class whose instances are returned by `Pool.map_async()`
    696 #
    697 
    698 class MapResult(ApplyResult):
    699 
    700     def __init__(self, cache, chunksize, length, callback, error_callback):
    701         ApplyResult.__init__(self, cache, callback,
    702                              error_callback=error_callback)
    703         self._success = True
    704         self._value = [None] * length
    705         self._chunksize = chunksize
    706         if chunksize <= 0:
    707             self._number_left = 0
    708             self._event.set()
    709             del cache[self._job]
    710         else:
    711             self._number_left = length//chunksize + bool(length % chunksize)
    712 
    713     def _set(self, i, success_result):
    714         self._number_left -= 1
    715         success, result = success_result
    716         if success and self._success:
    717             self._value[i*self._chunksize:(i+1)*self._chunksize] = result
    718             if self._number_left == 0:
    719                 if self._callback:
    720                     self._callback(self._value)
    721                 del self._cache[self._job]
    722                 self._event.set()
    723         else:
    724             if not success and self._success:
    725                 # only store first exception
    726                 self._success = False
    727                 self._value = result
    728             if self._number_left == 0:
    729                 # only consider the result ready once all jobs are done
    730                 if self._error_callback:
    731                     self._error_callback(self._value)
    732                 del self._cache[self._job]
    733                 self._event.set()
    734 
    735 #
    736 # Class whose instances are returned by `Pool.imap()`
    737 #
    738 
    739 class IMapIterator(object):
    740 
    741     def __init__(self, cache):
    742         self._cond = threading.Condition(threading.Lock())
    743         self._job = next(job_counter)
    744         self._cache = cache
    745         self._items = collections.deque()
    746         self._index = 0
    747         self._length = None
    748         self._unsorted = {}
    749         cache[self._job] = self
    750 
    751     def __iter__(self):
    752         return self
    753 
    754     def next(self, timeout=None):
    755         with self._cond:
    756             try:
    757                 item = self._items.popleft()
    758             except IndexError:
    759                 if self._index == self._length:
    760                     raise StopIteration from None
    761                 self._cond.wait(timeout)
    762                 try:
    763                     item = self._items.popleft()
    764                 except IndexError:
    765                     if self._index == self._length:
    766                         raise StopIteration from None
    767                     raise TimeoutError from None
    768 
    769         success, value = item
    770         if success:
    771             return value
    772         raise value
    773 
    774     __next__ = next                    # XXX
    775 
    776     def _set(self, i, obj):
    777         with self._cond:
    778             if self._index == i:
    779                 self._items.append(obj)
    780                 self._index += 1
    781                 while self._index in self._unsorted:
    782                     obj = self._unsorted.pop(self._index)
    783                     self._items.append(obj)
    784                     self._index += 1
    785                 self._cond.notify()
    786             else:
    787                 self._unsorted[i] = obj
    788 
    789             if self._index == self._length:
    790                 del self._cache[self._job]
    791 
    792     def _set_length(self, length):
    793         with self._cond:
    794             self._length = length
    795             if self._index == self._length:
    796                 self._cond.notify()
    797                 del self._cache[self._job]
    798 
    799 #
    800 # Class whose instances are returned by `Pool.imap_unordered()`
    801 #类,其实例由' Pool.imap_unordered() '返回
    802 
    803 class IMapUnorderedIterator(IMapIterator):
    804 
    805     def _set(self, i, obj):
    806         with self._cond:
    807             self._items.append(obj)
    808             self._index += 1
    809             self._cond.notify()
    810             if self._index == self._length:
    811                 del self._cache[self._job]
    812 
    813 #
    814 #
    815 #
    816 
    817 class ThreadPool(Pool):
    818     _wrap_exception = False
    819 
    820     @staticmethod
    821     def Process(*args, **kwds):
    822         from .dummy import Process
    823         return Process(*args, **kwds)
    824 
    825     def __init__(self, processes=None, initializer=None, initargs=()):
    826         Pool.__init__(self, processes, initializer, initargs)
    827 
    828     def _setup_queues(self):
    829         self._inqueue = queue.SimpleQueue()
    830         self._outqueue = queue.SimpleQueue()
    831         self._quick_put = self._inqueue.put
    832         self._quick_get = self._outqueue.get
    833 
    834     @staticmethod
    835     def _help_stuff_finish(inqueue, task_handler, size):
    836         # drain inqueue, and put sentinels at its head to make workers finish
    837         #排干队伍内的水,并在其头部放置哨兵,使工人完成工作
    838         try:
    839             while True:
    840                 inqueue.get(block=False)
    841         except queue.Empty:
    842             pass
    843         for i in range(size):
    844             inqueue.put(None)
    pool.py
  • 相关阅读:
    我们可以用JAX-WS轻松实现JAVA平台与其他编程环境(.net等)的互操作
    Eclipse 枚举类报错
    eclipse 遇关键字enum编译问题解决
    接口测试框架开发(二):extentreports报告中文乱码问题
    接口测试框架开发(一):rest-Assured_接口返回数据验证
    使用ant运行testng的testng.xml并且使用testng-results.xsl美化结果
    TestNG简单的学习-TestNG运行
    ExtentReports 结合 TestNg 生成自动化 html 报告 (支持多 suite)
    TestNG+ReportNG+Maven优化测试报告
    IDEA+MAVEN+testNG(reportNG)
  • 原文地址:https://www.cnblogs.com/Chestnut-g/p/10011614.html
Copyright © 2011-2022 走看看