zoukankan      html  css  js  c++  java
  • python 多线程

    python 中的多线程模块有 _thread,threading 和 Queue 等

    _thread 模块

    _thread 模块中的 start_new_thread() 函数产生新线程,语法:

    _thread.start_new_thread(function, args[, kwargs])

    其中,function 为线程函数,args 为传递给线程函数的参数,必须是 tuple 类型,kwargs 为可选参数

    _thread 模块除了产生线程外,还提供基本同步数据结构锁对象 (lock object,也叫原语锁、简单锁、互斥锁、互斥量、二值信号量)

    代码:

     1 #!/usr/bin/python3
     2 
     3 import _thread
     4 from time import sleep
     5 from datetime import datetime
     6 
     7 date_time_format = '%y-%M-%d %H : %M : %S'
     8 
     9 def date_time_str(date_time):
    10     return datetime.strftime(date_time, date_time_format)
    11 
    12 def loop_one():
    13     print('线程 A 开始于:', date_time_str(datetime.now()))
    14     print('线程 A 休眠 4 秒')
    15     sleep(4)
    16     print('线程 A 休眠结束, 结束于:', date_time_str(datetime.now()))
    17 
    18 def loop_two():
    19     print('线程 B 开始于:', date_time_str(datetime.now()))
    20     print('线程 B 休眠 2 秒')
    21     sleep(2)
    22     print('线程 B 休眠结束, 结束于:', date_time_str(datetime.now()))
    23 
    24 def main():
    25     print('所有线程开始时间:', date_time_str(datetime.now()))
    26     _thread.start_new_thread(loop_one, ())
    27     _thread.start_new_thread(loop_two, ())
    28     sleep(6)#保证主线程退出前线程 A 和线程 B 已退出
    29     print('所有线程结束, 结束于:', date_time_str(datetime.now()))
    30 
    31 if __name__ == '__main__':
    32     main()
    33 
    34 # 输出:
    35 # 所有线程开始时间: 18-44-23 19 : 44 : 19
    36 # 线程 A 开始于: 18-44-23 19 : 44 : 19
    37 # 线程 B 开始于: 18-44-23 19 : 44 : 19
    38 # 线程 A 休眠 4 秒
    39 # 线程 B 休眠 2 秒
    40 # 线程 B 休眠结束, 结束于: 18-44-23 19 : 44 : 21
    41 # 线程 A 休眠结束, 结束于: 18-44-23 19 : 44 : 23
    42 # 所有线程结束, 结束于: 18-44-23 19 : 44 : 25
    View Code

    注意:sleep(6)#保证主线程退出前线程 A 和线程 B 已退出

    _thread 的锁机制:

    _thread.allocate_lock()#创建 lock 对象

    lock.acquire()#尝试获取lock,默认参数为阻塞至获得锁

    lock.locked()#获取锁状态

    lock.release()#释放锁,必须事先获得锁

    _thread.exit()#退出当前线程

    代码:

     1 #!/usr/bin/python3
     2 
     3 import _thread
     4 from time import sleep
     5 from datetime import datetime
     6 
     7 loops = [4, 2]
     8 date_time_format = '%y-%M-%d %H : %M : %S'
     9 
    10 def date_time_str(date_time):
    11     return datetime.strftime(date_time, date_time_format)
    12 
    13 def loop(n_loop, n_sec, lock):
    14     print('线程 (', n_loop, ') 开始执行:', date_time_str(datetime.now()), ', 先休眠 (', n_sec, ') 秒')
    15 
    16     sleep(n_sec)
    17 
    18     print('线程 (', n_loop, ') 休眠结束, 结束于:', date_time_str(datetime.now()))
    19 
    20     lock.release()#释放锁,必须事先获得锁
    21 
    22 
    23 def main():
    24     print('所有线程开始时间:', date_time_str(datetime.now()))
    25     locks = []
    26     n_loops = range(len(loops))
    27     
    28     for i in n_loops:
    29         lock = _thread.allocate_lock()#创建 lock 对象
    30         lock.acquire()#尝试获取lock,默认参数为阻塞至获得锁
    31         locks.append(lock)#将锁对象加入列表中
    32 
    33     for i in n_loops:
    34         _thread.start_new_thread(loop, (i, loops[i], locks[i]))#参数只能传元组
    35 
    36     for i in n_loops:
    37         while locks[i].locked(): pass#获取锁状态
    38 
    39     print('所有线程结束, 结束于:', date_time_str(datetime.now()))
    40 
    41 if __name__ == '__main__':
    42     main()
    43 
    44 # 输出:
    45 # 所有线程开始时间: 18-22-23 20 : 22 : 14
    46 # 线程 ( 0 ) 开始执行: 18-22-23 20 : 22 : 14 , 先休眠 ( 4 ) 秒
    47 # 线程 ( 1 ) 开始执行: 18-22-23 20 : 22 : 14 , 先休眠 ( 2 ) 秒
    48 # 线程 ( 1 ) 休眠结束, 结束于: 18-22-23 20 : 22 : 16
    49 # 线程 ( 0 ) 休眠结束, 结束于: 18-22-23 20 : 22 : 18
    50 # 所有线程结束, 结束于: 18-22-23 20 : 22 : 18
    View Code

    注意:应当避免使用 _thread 模块,原因有 3:

    1.threading 模块对线程的支持更为完善,而且使用 _thread 模块里的属性可能与 threading 冲突

    2._thread 模块的同步原语很少(实际上只有一个),而 threading 模块有很多

    3._thread 模块中在主线程结束时,所有线程都会被强制结束,没有警告也不会有正常的清楚工作,而 threading 模块能确保重要子线程退出后进程才退出

    threading 模块

    # run() 通常需要重写,编写代码实现 做需要的功能。定义线程功能的函数

    # getName() 获得线程对象名称

    # setName() 设置线程对象名称

    # start() 启动线程

    # jion([timeout]) 等待另 一线程结束后再运行。timeout设置等待时间

    # setDaemon(bool) 设置子线程是否随主线程一起结束,必须在start()之前调用。默认为False

    # isDaemon() 判断线程是否随主 线程一起结束。

    # isAlive() 检查线程是否在运行中

    代码:

     1 #!/usr/bin/python3
     2 
     3 import threading
     4 from time import sleep
     5 from datetime import datetime
     6 
     7 loops = [4, 2]
     8 date_time_format = '%y-%M-%d %H : %M : %S'
     9 
    10 def date_time_str(date_time):
    11     return datetime.strftime(date_time, date_time_format)
    12 
    13 def loop(n_loop, n_sec):
    14     print('线程 (', n_loop, ') 开始执行:', date_time_str(datetime.now()), ', 先休眠 (', n_sec, ') 秒')
    15 
    16     sleep(n_sec)
    17 
    18     print('线程 (', n_loop, ') 休眠结束, 结束于:', date_time_str(datetime.now()))
    19 
    20 
    21 def main():
    22     print('所有线程开始时间:', date_time_str(datetime.now()))
    23     threads = []
    24     n_loops = range(len(loops))
    25     
    26     for i in n_loops:
    27         t = threading.Thread(target = loop, args = (i, loops[i]))
    28         threads.append(t)#将锁对象加入列表中
    29 
    30     for i in n_loops:
    31         threads[i].start()
    32 
    33     for i in n_loops:
    34         threads[i].join()#子线程结束后才结束主线程
    35 
    36     print('所有线程结束, 结束于:', date_time_str(datetime.now()))
    37 
    38 if __name__ == '__main__':
    39     main()
    40 
    41 # 输出:
    42 # 所有线程开始时间: 18-44-23 22 : 44 : 29
    43 # 线程 ( 0 ) 开始执行: 18-44-23 22 : 44 : 29 , 先休眠 ( 4 ) 秒
    44 # 线程 ( 1 ) 开始执行: 18-44-23 22 : 44 : 29 , 先休眠 ( 2 ) 秒
    45 # 线程 ( 1 ) 休眠结束, 结束于: 18-44-23 22 : 44 : 31
    46 # 线程 ( 0 ) 休眠结束, 结束于: 18-44-23 22 : 44 : 33
    47 # 所有线程结束, 结束于: 18-44-23 22 : 44 : 33
    View Code

    在创建线程时,也可传入一个可调用的类的是实例供线程启动时执行:

     1 #!/usr/bin/python3
     2 
     3 import threading
     4 from time import sleep
     5 from datetime import datetime
     6 
     7 loops = [4, 2]
     8 date_time_format = "%y-%M-%d %H : %M : %S"
     9 
    10 class ThreadFunc(object):
    11     def __init__(self, func, args, name = ''):
    12         self.name = name
    13         self.func = func
    14         self.args = args
    15 
    16     def __call__(self):
    17         self.func(*self.args)#传入可变参数(元组)
    18 
    19 def date_time_str(date_time):
    20     return datetime.strftime(date_time, date_time_format)
    21 
    22 def loop(n_loop, n_sec):
    23     print('线程(', n_loop, ') 开始执行:', date_time_str(datetime.now()), ', 先休眠(', n_sec, ') 秒')
    24     sleep(n_sec)
    25     print('线程 (', n_loop, ') 休眠结束, 结束于:', date_time_str(datetime.now()))
    26 
    27 def main():
    28     print('所有线程开始时间:', date_time_str(datetime.now()))
    29     threads = []
    30     nloops = range(len(loops))
    31 
    32     for i in nloops:
    33         t = threading.Thread(target = ThreadFunc(loop, (i, loops[i]), loop.__name__))
    34         threads.append(t)
    35 
    36     for i in nloops:
    37         threads[i].start()
    38 
    39     for i in nloops:
    40         threads[i].join()
    41 
    42     print('所有线程结束, 结束于:', date_time_str(datetime.now()))
    43 
    44 if __name__ == '__main__':
    45     main()
    46 
    47 # 输出:
    48 # 所有线程开始时间: 18-43-29 21 : 43 : 24
    49 # 线程( 0 ) 开始执行: 18-43-29 21 : 43 : 24 , 先休眠( 4 ) 秒
    50 # 线程( 1 ) 开始执行: 18-43-29 21 : 43 : 24 , 先休眠( 2 ) 秒
    51 # 线程 ( 1 ) 休眠结束, 结束于: 18-43-29 21 : 43 : 26
    52 # 线程 ( 0 ) 休眠结束, 结束于: 18-43-29 21 : 43 : 28
    53 # 所有线程结束, 结束于: 18-43-29 21 : 43 : 28
    View Code

    线程同步

     1 #!/usr/bin/python3
     2 
     3 import threading
     4 from time import sleep
     5 from datetime import datetime
     6 
     7 date_time_format = "%y-%M-%d %H : %M : %S"
     8 
     9 class MyThread(threading.Thread):
    10     def __init__(self, thredID, name, counter):
    11         threading.Thread.__init__(self)
    12         self.thredID = thredID
    13         self.name = name
    14         self.counter = counter
    15 
    16     def run(self):
    17         print('开启线程: ' + self.name)
    18         #获取锁,用于线程同步
    19         threadLock.acquire()
    20         print_time(self.name, self.counter, 3)
    21         #释放锁,开启下一个线程
    22         threadLock.release()
    23 
    24 
    25 def date_time_str(date_time):
    26     return datetime.strftime(date_time, date_time_format)
    27 
    28 def print_time(threadName, delay, counter):
    29     while counter:
    30         sleep(delay)
    31         print('%s : %s' % (threadName, date_time_str(datetime.now())))
    32         counter -= 1
    33 
    34 def main():
    35     #创建新线程
    36     thread1 = MyThread(1, "thread-1", 1)
    37     thread2 = MyThread(2, "thread-2", 2)
    38 
    39     #开启新线程
    40     thread1.start()
    41     thread2.start()
    42 
    43     #添加线程到线程列表
    44     threads.append(thread1)
    45     threads.append(thread2)
    46 
    47     #等待所有线程完成
    48     for i in threads:
    49         i.join()
    50 
    51     print('退出主线程')
    52 
    53     print('所有线程结束, 结束于:', date_time_str(datetime.now()))
    54 
    55 if __name__ == '__main__':
    56     threadLock = threading.Lock()
    57     threads = []
    58     main()
    View Code

    线程优先级队列

    Queue 模块可以用来进行线程间的通信。python 的 Queue 模块提供了同步、线程安全的队列类,包括 FIFO (先进先出) Queue,LIFO (后进先出队列) LifoQueue 和优先级队列 PriorityQueue。这些队列都实现了锁原语,能够在多线程钟直接使用。可以使用队列实现线程间的同步。

    Queue   模块中的常用方法:

    qsize()   返回队列的大小

    empty()   如果队列为空,返回 True,反之返回 False

    full()   如果队列满了返回 True,反之返回 False

    get([block[, timeout]])   获取队列,timeout 等待时间

    get_nowait()   相当于 Queue.get(False)

    put(timeout)   写入队列,timeout 等待时间

    put_nowait(item)   相当于 Queue.put(item, False)

    task_done()   在完成一项工作后,函数向已经完成的队列发送一个信号

    join()   等到队列为空,再执行别的操作

    代码:

     1 #!/usr/bin/python3
     2 
     3 import threading
     4 import queue
     5 from time import sleep
     6 
     7 
     8 class MyThread(threading.Thread):
     9     def __init__(self, threadID, name, q):
    10         threading.Thread.__init__(self)
    11         self.threadID = threadID
    12         self.name = name
    13         self.q = q
    14 
    15     def run(self):
    16         print('开启线程: ' + self.name)
    17         process_data(self.name, self.q)
    18         print('退出线程: ' + self.name)
    19 
    20 
    21 def process_data(threadName, q):
    22     while not exitFlag:
    23         queueLock.acquire()
    24         if not workQueue.empty():
    25             data = q.get()
    26             queueLock.release()
    27             print('%s processing %s' % (threadName, data))
    28         else:
    29             queueLock.release()
    30         sleep(1)
    31 
    32 
    33 def main():
    34     global exitFlag
    35     exitFlag = 0
    36 
    37     threadList = ['thread-1', 'thread-2', 'thread-3']
    38     nameList = ['one', 'two', 'three', 'four', 'five']
    39 
    40     threads = []
    41     threadID = 1;
    42 
    43     #创建新线程
    44     for tName in threadList:
    45         thread = MyThread(threadID, tName, workQueue)
    46         thread.start()
    47         threads.append(thread)
    48         threadID += 1
    49 
    50     #填充队列
    51     queueLock.acquire()
    52     for word in nameList:
    53         workQueue.put(word)
    54     queueLock.release()
    55 
    56     #等待队列清空
    57     while not workQueue.empty():
    58         pass
    59 
    60     #通知线程退出
    61     exitFlag = 1
    62 
    63     #等待所有线程完成
    64     for i in threads:
    65         i.join()
    66 
    67     print('退出主线程')
    68 
    69 
    70 if __name__ == '__main__':
    71     queueLock = threading.Lock()#queueLock为锁对象
    72     workQueue = queue.Queue(10)
    73     main()
    74 
    75 # 输出:
    76 # 开启线程: thread-1
    77 # 开启线程: thread-2
    78 # 开启线程: thread-3
    79 # thread-3 processing one
    80 # thread-2 processing two
    81 # thread-1 processing three
    82 # thread-3 processing four
    83 # thread-2 processing five
    84 # 退出线程: thread-1
    85 # 退出线程: thread-3
    86 # 退出线程: thread-2
    87 # 退出主线程
    View Code

    生产者消费者模型

    一个生产者一个消费者

     1 #!/usr/bin/python3
     2 
     3 import threading
     4 import time
     5 import queue
     6 
     7 class Consumer(threading.Thread):
     8     """docstring for Consumer"""
     9     def __init__(self, queue):
    10         super(Consumer, self).__init__()
    11         self.__queue = queue
    12 
    13     def run(self):
    14         while True:
    15             # pass
    16             msg = self.__queue.get()#若队列为空会自动阻塞
    17             # insinstance(msg, str) 若msg为str(字符串)类型返回True,否则返回False
    18             if isinstance(msg, str) and msg == 'quit':
    19                 break;
    20 
    21             time.sleep(1)                
    22             print('consumer receive msg: %s' % msg)
    23         print('consumer finished')
    24 
    25 def Producer():
    26     q = queue.Queue()
    27     c = Consumer(q)
    28     c.start()
    29 
    30     i = 0
    31     while i < 10:
    32         print('Producer put msg: %s' % i)
    33         q.put(str(i))
    34         time.sleep(0.5)
    35         i += 1
    36 
    37     q.put('quit')
    38     c.join()
    39 
    40 if __name__ == '__main__':
    41     Producer()
    View Code

    一个生产者多个消费者

     1 #!/usr/bin/python3
     2 
     3 import threading
     4 import time
     5 import queue
     6 
     7 class Consumer(threading.Thread):
     8     """docstring for Consumer"""
     9     def __init__(self, queue, num):
    10         super(Consumer, self).__init__()
    11         self.__queue = queue
    12         self.__num = num
    13 
    14     def run(self):
    15         while True:
    16             # pass
    17             msg = self.__queue.get()#若队列为空会自动阻塞
    18             # insinstance(msg, str) 若msg为str(字符串)类型返回True,否则返回False
    19             if isinstance(msg, str) and msg == 'quit':
    20                 break;
    21 
    22             print('consumer %d receive msg: %s' % (self.__num, msg))
    23         print('consumer %d finished' % self.__num)
    24 
    25 
    26 def build_consumer_pool(size, queue):
    27     consumers = []
    28 
    29     for i in range(size):
    30         c = Consumer(num = i, queue = queue)
    31         c.start()
    32         consumers.append(c)
    33 
    34     return consumers
    35 
    36 
    37 def Producer():
    38     q = queue.Queue()
    39     consumers = build_consumer_pool(3, q)
    40 
    41     i = 0
    42     while i < 12:
    43         print('Producer put msg: %s' % i)
    44         q.put(str(i))
    45         i += 1
    46 
    47     for c in consumers:
    48         q.put('quit')
    49 
    50     for c in consumers:
    51         c.join()
    52 
    53 if __name__ == '__main__':
    54     Producer()
    View Code

    使用 python 提供的线程池,multiprocessing 模块中的 Pool 实现一个生产者多个消费者

     1 #!/usr/bin/python3
     2 
     3 from multiprocessing.dummy import Pool as ThreadPool
     4 import time
     5 
     6 def consumer(msg):
     7     print('consumer receive msg: %s' % msg)
     8     return msg
     9 
    10 
    11 def producer():
    12     items = []
    13     pool = ThreadPool(4)
    14 
    15     i = 0
    16     while i < 12:
    17         print('producer put msg: %s' % i)
    18         items.append(str(i))
    19         i += 1
    20 
    21     results = pool.map(consumer, items)
    22     pool.close()
    23     pool.join()
    24     print(results)
    25 
    26 
    27 if __name__ == '__main__':
    28     producer()
    View Code
  • 相关阅读:
    js工具库
    细说log4j之log4j 1.x
    细说log4j之概述
    细说RESTful API安全之概述
    【转】javascript代码混淆和压缩
    细说RESTful API之入门介绍
    j2ee应用开发调试工具
    java定时器实现总结
    浏览器书签同步工具
    简单备份mysql数据库策略及实现方式
  • 原文地址:https://www.cnblogs.com/geloutingyu/p/8921304.html
Copyright © 2011-2022 走看看