zoukankan      html  css  js  c++  java
  • python threading模块

      1 # http://www.runoob.com/python3/python3-multithreading.html
      2 # https://docs.python.org/3/library/threading.html
      3 # http://yoyzhou.github.io/blog/2013/02/28/python-threads-synchronization-locks/
      4 # http://www.jb51.net/article/119414.htm
      5 # http://blog.csdn.net/tomato__/article/details/46606045
      6 # https://www.cnblogs.com/Ajen-lq/p/5325827.html
      7 
      8 
      9 import threading  # module
     10 import socket
     11 import random
     12 import time
     13 import queue
     14 import requests
     15 
     16 
     17 # ['get_ident',           1、返回线程号,一个数字
     18 #  'active_count',        2、存活的线程数
     19 #  'Condition',           3、条件锁
     20 #  'current_thread',      4、返回当前线程的对象
     21 #  'enumerate',           5、存活线程的列表
     22 #  'main_thread',         6、返回主线程对象,python解释器启动的线程
     23 #  'TIMEOUT_MAX',
     24 #  'Event',               8、事件锁
     25 #  'Lock',                9、互斥锁
     26 #  'RLock',               10、重入锁,递归锁
     27 #  'Semaphore',           11、信号量锁
     28 #  'BoundedSemaphore',    12、有限信号量类
     29 #  'Thread',              13、线程
     30 #  'Barrier',             14、栅栏
     31 #  'BrokenBarrierError',
     32 #  'Timer',               16、定时任务
     33 #  'ThreadError',
     34 #  'setprofile',
     35 #  'settrace',
     36 #  'local',               20、线程局部数据 (thread-local data)
     37 #  'stack_size']
     38 # Python线程同步机制: Locks, RLocks, Semaphores, Conditions, Events和Queues
     39 
     40 
     41 # 1、get_ident
     42 def foo():
     43     print('threading identify no: %s' % threading.get_ident())
     44 
     45 
     46 t1 = threading.Thread(target=foo)
     47 t2 = threading.Thread(target=foo)
     48 t3 = threading.Thread(target=foo)
     49 t1.start()
     50 t2.start()
     51 t3.start()
     52 t1.join()
     53 t2.join()
     54 t3.join()
     55 
     56 
     57 # 2、active_count
     58 def foo():
     59     time.sleep(1)
     60 
     61 
     62 t1 = threading.Thread(target=foo)
     63 t2 = threading.Thread(target=foo)
     64 t3 = threading.Thread(target=foo)
     65 t1.start()
     66 print(threading.active_count())    # 2
     67 t2.start()
     68 print(threading.active_count())    # 3
     69 t3.start()
     70 print(threading.active_count())    # 4
     71 t1.join()
     72 t2.join()
     73 t3.join()
     74 
     75 
     76 # 3、Condition 一个线程等待特定条件,而另一个线程发出特定条件满足的信号
     77 # class Condition(builtins.object) Class that implements a condition variable.
     78 # A condition variable allows one or more threads to wait until they are notified by another thread.
     79 # notify(self, n=1) Wake up one or more threads waiting on this condition, if any.
     80 # notify_all(self) Wake up all threads waiting on this condition.
     81 # wait(self, timeout=None) Wait until notified or until a timeout occurs.
     82 # wait_for(self, predicate, timeout=None) Wait until a condition evaluates to True.
     83 # wait()方法释放锁,然后阻塞直到另一个线程调用notify()或者notify_all()唤醒它。唤醒后,wait()重新获取锁并返回。它也可以指定一个超时时间。
     84 # notify()方法唤醒等待线程中的一个;notify_all()方法唤醒所有等待线程。
     85 # 注意:notify()和notify_all()方法不释放锁;这意味着唤醒的线程或者线程组将不会从wait()调用中立即返回。
     86 # 一个条件变量允许一个或多个线程等待,直到他们被另一个线程通知。
     87 # wait()一旦唤醒或者超时,它重新请求锁并返回。
     88 
     89 
     90 class Producer(threading.Thread):
     91 
     92     def __init__(self, integers, condition):
     93         threading.Thread.__init__(self)
     94         self.integers = integers
     95         self.condition = condition
     96 
     97     def run(self):
     98         while True:
     99             integer = random.randint(0, 256)
    100             with self.condition:  # 获取条件锁
    101                 print('condition acquired by %s' % self.name)
    102                 self.integers.append(integer)
    103                 print('%d appended to list by %s' % (integer, self.name))
    104                 print('condition notified by %s' % self.name)
    105                 self.condition.notify()    # 唤醒消费者线程
    106                 print('condition released by %s' % self.name)
    107             time.sleep(1)  # 暂停1秒钟
    108 
    109 
    110 class Consumer(threading.Thread):
    111     def __init__(self, integers, condition):
    112         threading.Thread.__init__(self)
    113         self.integers = integers
    114         self.condition = condition
    115 
    116     def run(self):
    117         while True:
    118             with self.condition:  # 获取条件锁
    119                 print('condition acquired by %s' % self.name)
    120                 while True:
    121                     if self.integers:  # 判断是否有整数
    122                         integer = self.integers.pop()
    123                         print('%d popped from list by %s' % (integer, self.name))
    124                         break
    125                     print('condition wait by %s' % self.name)
    126                     self.condition.wait()  # 等待商品,并且释放资源让生产者执行
    127                 print('condition released by %s' % self.name)
    128 
    129 
    130 def main():
    131     integers = []
    132     condition = threading.Condition()
    133     t1 = Producer(integers, condition)
    134     t2 = Consumer(integers, condition)
    135     t1.start()
    136     t2.start()
    137     t1.join()
    138     t2.join()
    139 
    140 
    141 main()
    142 
    143 
    144 # 4、current_thread
    145 def foo():
    146     print(threading.current_thread())
    147 
    148 
    149 t1 = threading.Thread(target=foo)
    150 t1.start()
    151 t1.join()
    152 
    153 
    154 # 5、enumerate
    155 def foo():
    156     time.sleep(1)
    157 
    158 
    159 t1 = threading.Thread(target=foo)
    160 t2 = threading.Thread(target=foo)
    161 t3 = threading.Thread(target=foo)
    162 t1.start()
    163 t2.start()
    164 t3.start()
    165 print(threading.enumerate())
    166 t1.join()
    167 t2.join()
    168 t3.join()
    169 
    170 
    171 # 6、main_thread
    172 def foo():
    173     time.sleep(1)
    174 
    175 
    176 t1 = threading.Thread(target=foo)
    177 t1.start()
    178 print(threading.main_thread())
    179 t1.join()
    180 
    181 
    182 # 8、Event 事件锁 一个线程发送/传递事件,另外的线程(多线程)等待事件的触发。
    183 # class Event(builtins.object) Class implementing event objects.
    184 # Events manage a flag that can be set to true with the set() method and reset to false with the clear() method.
    185 # The wait() method blocks until the flag is true.  The flag is initially false.
    186 # set(self) Set the internal flag to true. All threads waiting for it to become true are awakened.
    187 #           Threads that call wait() once the flag is true will not block at all.
    188 # clear(self) Reset the internal flag to false.
    189 # is_set(self) Return true if and only if the internal flag is true.
    190 # wait(self, timeout=None) Block until the internal flag is true.
    191 # isSet = is_set(self)
    192 
    193 
    194 class Producer(threading.Thread):
    195     def __init__(self, integers, event):
    196         threading.Thread.__init__(self)
    197         self.integers = integers
    198         self.event = event
    199 
    200     def run(self):
    201         while True:
    202             integer = random.randint(0, 256)
    203             self.integers.append(integer)
    204             print('%d appended to list by %s' % (integer, self.name))
    205             print('event set by %s' % self.name)
    206             self.event.set()            # 设置事件
    207             self.event.clear()          # 发送事件
    208             print('event cleared by %s' % self.name)
    209             time.sleep(1)
    210 
    211 
    212 class Consumer(threading.Thread):
    213     def __init__(self, integers, event):
    214         threading.Thread.__init__(self)
    215         self.integers = integers
    216         self.event = event
    217 
    218     def run(self):
    219         while True:
    220             self.event.wait()      # 等待事件被触发
    221             # self.event.wait(timeout=2)  # 等待事件被触发,超时之后,不再阻塞,继续执行
    222             try:
    223                 integer = self.integers.pop()
    224                 print('%d popped from list by %s' % (integer, self.name))
    225             except IndexError:
    226                 # catch pop on empty list
    227                 time.sleep(1)
    228 
    229 
    230 def main():
    231     integers = []
    232     event = threading.Event()
    233     t1 = Producer(integers, event)
    234     t2 = Consumer(integers, event)
    235     t1.start()
    236     t2.start()
    237     t1.join()
    238     t2.join()
    239 
    240 
    241 main()
    242 
    243 
    244 # 9、Lock互斥锁
    245 # class lock(builtins.object) A lock object is a synchronization primitive. To create a lock, call threading.Lock().
    246 # acquire() -- lock the lock, possibly blocking until it can be obtained
    247 # release() -- unlock of the lock
    248 # locked() -- test whether the lock is currently locked
    249 # acquire(...) acquire_lock(...) acquire(blocking=True, timeout=-1)
    250 # locked(...) locked_lock(...) locked() -> bool
    251 # release(...) release_lock(...) release()
    252 
    253 
    254 class FetchUrls(threading.Thread):
    255     def __init__(self, urls, output, lock):
    256         threading.Thread.__init__(self)
    257         self.urls = urls
    258         self.output = output
    259         self.lock = lock
    260 
    261     def run(self):
    262         while self.urls:
    263             url = self.urls.pop()
    264             req = requests.get(url)
    265             with self.lock:
    266                 self.output.write(req.content)
    267             print('write done by %s' % self.name)
    268             print('URL %s fetched by %s' % (url, self.name))
    269 
    270 
    271 def main():
    272     lock = threading.Lock()
    273     urls1 = ['http://www.baidu.com', 'http://www.sina.com']
    274     urls2 = ['http://www.sohu.com', 'http://www.youku.com']
    275     f = open('output.txt', 'wb+')
    276     t1 = FetchUrls(urls1, f, lock)
    277     t2 = FetchUrls(urls2, f, lock)
    278     t1.start()
    279     t2.start()
    280     t1.join()
    281     t2.join()
    282     f.close()
    283 
    284 
    285 main()
    286 
    287 
    288 # 10、RLock可重入锁
    289 # RLock是可重入锁(reentrant lock),acquire()能够不被阻塞的被同一个线程调用多次。要注意的是release()需要调用与acquire()相同的次数才能释放锁。
    290 
    291 
    292 # 11、Semaphore信号量锁
    293 # 信号量同步基于内部计数器,每调用一次acquire(),计数器减1;每调用一次release(),计数器加1.当计数器为0时,acquire()调用被阻塞。
    294 # class Semaphore(builtins.object) This class implements semaphore objects.
    295 # Semaphores manage a counter representing the number of release() calls minus the number of acquire() calls, plus an initial value.
    296 # The acquire() method blocks if necessary until it can return without making the counter negative.
    297 # If not given, value defaults to 1.
    298 # acquire(self, blocking=True, timeout=None) Acquire a semaphore, decrementing the internal counter by one. 超时或者不阻塞返回false
    299 # release(self) Release a semaphore, incrementing the internal counter by one.
    300 
    301 
    302 class MyThread(threading.Thread):
    303     def __init__(self, semaphore, list_):
    304         threading.Thread.__init__(self)
    305         self.semaphore = semaphore
    306         self.list_ = list_
    307 
    308     def run(self):
    309         with self.semaphore:
    310             self.list_.append(2)
    311 
    312 
    313 def main():
    314     list_ = [1, 2, 3]
    315     semaphore = threading.Semaphore(2)    # 允许同时2个线程访问共享资源,默认是1
    316     t1 = MyThread(semaphore, list_)
    317     t2 = MyThread(semaphore, list_)
    318     t3 = MyThread(semaphore, list_)
    319     t1.start()
    320     t2.start()
    321     t3.start()
    322     t1.join()
    323     t2.join()
    324     t3.join()
    325     print(list_)
    326 
    327 
    328 main()
    329 
    330 
    331 # 12、BoundedSemaphore有限信号量类
    332 # “有限”(bounded)信号量类,可以确保release()方法的调用次数不能超过给定的初始信号量数值(value参数)
    333 # 重写release方法
    334 # If the number of releases exceeds the number of acquires, raise a ValueError.
    335 
    336 
    337 # 13、Thread
    338 # class Thread(builtins.object) A class that represents a thread of control.
    339 # This class can be safely subclassed in a limited fashion. There are two ways to specify the activity:
    340 # by passing a callable object to the constructor, or by overriding the run() method in a subclass.
    341 # __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None)
    342 # group 为将来线程组扩展留的
    343 # daemon 默认非守护线程
    344 # getName(self)  方法
    345 # isDaemon(self) 方法
    346 # is_alive(self) 是否存活,run开始之后到run结束之前是True
    347 # join(self, timeout=None) Wait until the thread terminates.
    348 # run(self) Method representing the thread's activity.
    349 # setDaemon(self, daemonic)
    350 # setName(self, name)
    351 # start(self)
    352 # daemon 属性
    353 # ident  属性
    354 # name   属性
    355 
    356 # 无join,无daemon
    357 def foo():
    358     time.sleep(3)
    359 
    360 
    361 t1 = threading.Thread(target=foo)
    362 t1.start()
    363 print('main thread go on')  # 最后再等待线程,释放线程的资源,这里阻塞
    364 
    365 
    366 # 有join
    367 def foo():
    368     time.sleep(3)
    369 
    370 
    371 t1 = threading.Thread(target=foo)
    372 t1.start()
    373 t1.join()   # 等待线程结束,抓线程再继承执行,这里阻塞
    374 print('main thread not go on')
    375 
    376 
    377 # 有daemon
    378 def foo():
    379     while True:
    380         print('abc')
    381 
    382 
    383 t1 = threading.Thread(target=foo)
    384 t1.setDaemon(True)
    385 t1.start()
    386 print('main thread go on')   # 主线程结束,要求子线程同时也结束,这里不阻塞了
    387 
    388 
    389 # 14、Barrier
    390 # class Barrier(builtins.object) Implements a Barrier. Useful for synchronizing a fixed number
    391 # of threads at known synchronization points.
    392 # Threads block on 'wait()' and are simultaneously awoken once they have all made that call.
    393 # __init__(self, parties, action=None, timeout=None)
    394 # abort(self) Place the barrier into a 'broken' state.     方法,终止栅栏,wait会抛出异常
    395 # reset(self) Reset the barrier to the initial state.      方法,恢复栅栏
    396 # wait(self, timeout=None) Wait for the barrier.           方法,等待数量达到parties,一起通过栅栏
    397 # broken Return True if the barrier is in a broken state.                    属性,检查栅栏的状态,True是坏
    398 # n_waiting Return the number of threads currently waiting at the barrier.   属性,正在wait的数量
    399 # parties Return the number of threads required to trip the barrier.         属性,parties的数量
    400 
    401 
    402 def server(b):
    403     s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    404     s.bind(("192.168.99.156", 5003))
    405     s.listen(5)
    406 
    407     while True:
    408         # print(b.broken)   # False
    409         # b.abort()
    410         # print(b.broken)   # True
    411         # b.reset()
    412 
    413         b.wait()
    414         print(threading.get_ident())
    415         new_socket, client_addr = s.accept()
    416         new_socket.sendall(bytes("Hello world", encoding="utf-8"))
    417         new_socket.close()
    418         time.sleep(3)
    419 
    420 
    421 def client(b):
    422     while True:
    423         b.wait()       # 超时 raise BrokenBarrierError
    424         print(threading.get_ident())
    425         s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    426         s.connect(("192.168.99.156", 5003))
    427         ret = str(s.recv(1024), encoding="utf-8")
    428         print(ret)
    429         time.sleep(0.1)
    430 
    431 
    432 def callback():
    433     print('Barrier')
    434 
    435 
    436 def main():
    437     b = threading.Barrier(2, action=callback, timeout=5)        # 超时 raise BrokenBarrierError, 通过一次栅栏调用一次action
    438     t1 = threading.Thread(target=server, args=(b, ))
    439     t2 = threading.Thread(target=client, args=(b, ))
    440     t1.start()
    441     t2.start()
    442     t1.join()
    443     t2.join()
    444 
    445 
    446 main()
    447 
    448 
    449 # 16、Timer定时任务
    450 # class Timer(Thread) Call a function after a specified number of seconds.
    451 # __init__(self, interval, function, args=None, kwargs=None) 继承Thread
    452 # 新增方法:cancel(self)
    453 # 重写方法:run(self)
    454 # 其他方法同Thread
    455 
    456 
    457 def func():
    458     print('abc')
    459 
    460 
    461 def main():
    462     t = threading.Timer(2, func)
    463     t.start()
    464 
    465 
    466 main()
    467 
    468 
    469 # 20、local
    470 localVal = threading.local()
    471 localVal.val = "Main-Thread"   # 每个线程单独创建内存空间来存储
    472 a = 10
    473 
    474 
    475 def print_student():
    476     global a
    477     print('%s (in %s)' % (localVal.val, threading.current_thread().name))
    478     a += 100                   # 操作的是主线程的变量
    479 
    480 
    481 def print_thread(name):
    482     localVal.val = name        # 每个线程单独创建内存空间来存储
    483     print_student()
    484 
    485 
    486 def main():
    487     t1 = threading.Thread(target=print_thread, args=('One',), name='Thread-A')
    488     t2 = threading.Thread(target=print_thread, args=('Two',), name='Thread-B')
    489     t1.start()
    490     t2.start()
    491     t1.join()
    492     t2.join()
    493     print(localVal.val)
    494     print(a)
    495 
    496 
    497 main()
    498 
    499 
    500 # 21、使用队列同步线程间共享资源
    501 # 队列 Queue
    502 class Producer(threading.Thread):
    503     def __init__(self, q):
    504         threading.Thread.__init__(self)
    505         self.queue_ = q
    506 
    507     def run(self):
    508         while True:
    509             integer = random.randint(0, 256)
    510             self.queue_.put(integer)  # 将生成的整数添加到队列
    511             print('%d put to queue by %s' % (integer, self.name))
    512             time.sleep(1)
    513 
    514 
    515 class Consumer(threading.Thread):
    516     def __init__(self, q):
    517         threading.Thread.__init__(self)
    518         self.queue_ = q
    519 
    520     def run(self):
    521         while True:
    522             integer = self.queue_.get()
    523             print('%d popped from list by %s' % (integer, self.name))
    524             self.queue_.task_done()
    525 
    526 
    527 def main():
    528     q = queue.Queue(20)
    529     t1 = Producer(q)
    530     t2 = Consumer(q)
    531     t1.start()
    532     t2.start()
    533     t1.join()
    534     t2.join()
    535 
    536 
    537 main()
  • 相关阅读:
    hdu 5151 Sit sit sit
    hdu 5150 Sit sit sit
    cf C. Arithmetic Progression
    hdu 5125 magic balls
    Pots(bfs)
    Prime Path(素数筛选+bfs)
    A Knight's Journey(dfs)
    Colored Sticks (字典树哈希+并查集+欧拉路)
    Squares(哈希)
    Gold Balanced Lineup(哈希表)
  • 原文地址:https://www.cnblogs.com/gundan/p/8301824.html
Copyright © 2011-2022 走看看