★概念
进程是由若干线程组成的,一个进程至少有一个线程;
线程是CPU调度的最小单位;
线程之间资源共享。
★全局解释器锁(GIL)
cpython的特性
同一时刻只能有一个线程访问CPU;
锁的是线程
在多线程环境中,Python 虚拟机按以下方式执行:
1,设置GIL;
2,切换到一个线程去执行;
3,运行指定数量字节码指令或线程主动让出控制;
4,把线程设置为睡眠状态;
5,解锁 GIL;
6,再次重复以上所有步骤。
★threading模块
◇线程的创建-threading.Thread
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
1 import time 2 import os 3 from threading import Thread 4 5 """多线程并发""" 6 7 8 def func(n): 9 time.sleep(1) 10 print('in func%s' % n) 11 print('子进程PID:%s' % os.getpid()) 12 13 14 for i in range(10): 15 t = Thread(target=func, args=(i+1,)) # 并发10个线程 16 t.start() 17 18 print('主进程PID:%s' % os.getpid())
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
1 import time 2 from threading import Thread 3 4 5 class MyThread(Thread): # 继承线程类 6 def __init__(self, arg): # 初始化MyThread的参数 7 super().__init__() 8 self.arg = arg 9 10 def run(self): # 重写run方法 11 time.sleep(1) 12 print('in func%s' % self.arg) 13 14 15 for i in range(10): 16 t = MyThread(i+1) 17 t.start()
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
1 import os 2 from threading import Thread 3 4 5 def func(a, b): 6 global g # 声明全局变量,主线程和子线程共享g 7 c = a + b 8 g = 0 # 这个g同时对主线程的g有作用 9 print('in func的结果:%s' % c) 10 11 12 g = 100 13 t_lis = [] 14 for i in range(10): 15 t = Thread(target=func, args=(i+1, 5)) 16 t.start() 17 t_lis.append(t) 18 19 for t in t_lis: 20 t.join() 21 22 print('主线程g的值:', g) # g的值已被子线程修改 23 print('主进程PID:%s' % os.getpid())
守护线程
守护线程会在主线程代码结束之后等待【其他子线程】的结束而结束
与进程相同:
△主线程会等待子线程结束才结束
△主进程会等待子进程结束才结束
与进程不同:
▽守护线程在【其他线程结束】才结束 -- >这时候主线程已经结束
▽守护进程在【代码结束】而结束 --> 这时候主进程还没结束
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
1 from threading import Thread 2 import time 3 4 5 def func1(): 6 while True: 7 print('**********') 8 time.sleep(1) 9 10 11 def func2(): 12 time.sleep(10) 13 print('in func2') 14 15 16 t = Thread(target=func1,) 17 t.daemon = True # 设置守护线程, 会一直等所有子线程执行结束,守护线程才结束 18 t.start() 19 t1 = Thread(target=func2,) 20 t1.start() 21 # t1.join() 22 print('in main thread')
◇线程的锁-threading.Lock
方法
acquire() 上锁
release() 解锁
Lock() 互斥锁
from threading import Lock lock = Lock() lock.acquire() # lock.release() lock.acquire() print('ok')
互斥锁(英语:Mutual exclusion,缩写 Mutex)是一种用于多线程编程中,防止两条线程同时对同一公共资源(比如全局变量)进行读写的机制。该目的通过将代码切片成一个一个的临界区域(critical section)达成。临界区域指的是一块对公共资源进行访问的代码,并非一种机制或是算法。一个程序、进程、线程可以拥有多个临界区域,但是并不一定会应用互斥锁。
有两个互斥锁时,会有可能产生死锁问题
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
1 from threading import Thread, Lock 2 import time 3 4 """互斥锁""" 5 6 7 def func(lock): 8 lock.acquire() # 一个线程执行完了,下一个线程才能进去执行 9 global n 10 temp = n 11 time.sleep(0.2) 12 n = temp - 1 13 lock.release() 14 15 16 n = 10 17 lock = Lock() 18 t_lst = [] 19 for i in range(10): 20 t = Thread(target=func, args=(lock,)) 21 t_lst.append(t) 22 t.start() 23 24 for t in t_lst: 25 t.join() 26 print(n)
ps:如果加锁,1,第一个线程执行,遇到time.sleep阻塞,时间片不切换,等待阻塞完成,执行完这个线程,2,(这时候第一个执行的线程执行完)CPU执行下一个线程,3,再下一个线程重复以上操作
ps:如果没有锁,1,第一个线程执行,遇到time.sleep阻塞;2,时间片切换,CPU执行下一个线程(这时候第一个执行的线程还没有执行完),3,再下一个线程重复以上操作 -->大家共享阻塞的时间(相当于同时等待)
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
1 from threading import Thread, Lock 2 import time 3 4 5 def eat1(name): 6 noodle_lock.acquire() 7 print('%s拿到了面条' % name) 8 fork_lock.acquire() 9 print('%s拿到了叉子' % name) 10 print('%s 吃面了' % name) 11 noodle_lock.release() 12 fork_lock.release() 13 14 15 def eat2(name): 16 fork_lock.acquire() 17 print('%s拿到了叉子' % name) 18 time.sleep(0.1) # 先阻塞一下(迫使时间片切换),让另一个线程去拿了面条的锁,然后就会产生死锁的问题 19 noodle_lock.acquire() # 其他线程拿了锁没有解锁,导致我这里无法拿到而一直等待, 20 print('%s拿到了面条' % name) 21 print('%s 吃面了' % name) 22 noodle_lock.release() 23 fork_lock.release() 24 25 26 noodle_lock = Lock() 27 fork_lock = Lock() 28 Thread(target=eat1, args=('alex', )).start() 29 Thread(target=eat2, args=('jin', )).start() 30 Thread(target=eat1, args=('boss', )).start() 31 Thread(target=eat2, args=('egg', )).start()
RLock() 递归锁
1,在一个线程能有多个锁
2,为了解决死锁问题
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
1 from threading import RLock 2 3 rlock = RLock() 4 5 rlock.acquire() 6 rlock.acquire() 7 rlock.acquire() 8 print('hello')
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
1 from threading import Thread, RLock 2 import time 3 4 5 def eat1(name): 6 noodle_lock.acquire() 7 print('%s拿到了面条' % name) 8 fork_lock.acquire() 9 print('%s拿到了叉子' % name) 10 print('%s 吃面了' % name) 11 noodle_lock.release() 12 fork_lock.release() 13 14 15 def eat2(name): 16 fork_lock.acquire() # 上锁了 17 print('%s拿到了叉子' % name) 18 time.sleep(2) # 阻塞,就算切换到其他线程也无法执行,因为是同一把锁,另外的线程外面也有锁 19 noodle_lock.acquire() 20 print('%s拿到了面条' % name) 21 print('%s 吃面了' % name) 22 noodle_lock.release() 23 fork_lock.release() # 已经完全解锁了,其他线程能去上锁了然后去执行 24 25 26 noodle_lock = fork_lock = RLock() 27 Thread(target=eat1, args=('alex', )).start() 28 Thread(target=eat2, args=('jin', )).start() 29 Thread(target=eat1, args=('boss', )).start() 30 Thread(target=eat2, args=('egg', )).start()
◇信号量-threading.Semaphore
方法
acquire() 上锁
release() 解锁
同一时间只有n个线程可以执行上锁代码
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
1 from threading import Semaphore, Thread 2 import time 3 4 5 def func(n): 6 sem.acquire() 7 time.sleep(1) 8 print(n) 9 sem.release() 10 11 12 sem = Semaphore(3) # 设置一个信号量, 只能有3个线程可以并发执行 13 for i in range(10): 14 t = Thread(target=func, args=(i,)) # 在线程里sem不用传进来,子线程共享主线程的变量 15 t.start()
◇事件-threading.Event
状态
False (默认) wait()阻塞
True wait()非阻塞
修改状态
set 设置为True
clear 设置为False
wait方法
wait(1) 状态为False的时候,只等待1s就往下执行
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
1 from threading import Event, Thread 2 import time 3 import random 4 5 6 def connect_db(): 7 count = 0 8 while count < 3: 9 e.wait(1) # 只阻塞1S,往下执行 10 if e.is_set(): 11 print('连接数据库成功!!!') 12 break 13 else: 14 count += 1 15 print('第%s次连接失败' % count) 16 else: 17 raise TimeoutError('连接超时!!!') 18 19 20 def check_web(): 21 time.sleep(random.randint(0, 3)) # 模拟网络延迟 22 e.set() 23 24 25 e = Event() 26 t1 = Thread(target=connect_db) # 直到事件状态被设置为True,这个线程才能连接上数据库 27 t2 = Thread(target=check_web) # 这个线程负责修改事件状态 28 t1.start() 29 t2.start()
◇条件-threading.Condition
使得线程等待,只有满足某条件时,才释放n个线程
方法
acquire()
release()
notify() 造钥匙(一次性钥匙)
wait() 等钥匙
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
1 from threading import Condition, Thread 2 3 4 def func(i): 5 con.acquire() 6 con.wait() # 等造钥匙出来 7 print('在%s循环里' % i) 8 con.release() 9 10 11 con = Condition() 12 for i in range(10): 13 t = Thread(target=func, args=(i, )) 14 t.start() 15 while True: 16 num = int(input('>>>')) 17 con.acquire() 18 con.notify(num) # 造钥匙(一次性钥匙) 19 con.release()
◇定时器-threading.Timer
指定n秒后执行某个线程
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
1 from threading import Timer 2 import time 3 4 5 def func(): 6 print('时间同步') 7 8 9 while True: 10 Timer(2, func).start() # 实例化一个定时器 11 time.sleep(2) # 等待2S,再重新进入循环
★队列【数据安全】-queue
◇先进先出-queue.Queue
方法
- put()
- get()
- put_nowait() 满值不阻塞,但报错
- get_nowait() 没值不阻塞,但报错
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
1 q = queue.Queue() 2 q.put('a') 3 q.put('b') 4 q.put('c') 5 print(q.get()) 6 print('ttt:%s' % q.get_nowait()) 7 print('ttt:%s' % q.get_nowait()) 8 print('ttt:%s' % q.get_nowait()) # 队列已空,不阻塞,但报错
◇先进后出-queue.LifoQueue(栈)
方法
- put()
- get()
- put_nowait() 满值不阻塞,但报错
- get_nowait() 没值不阻塞,但报错
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
1 q = queue.LifoQueue() 2 q.put('a') 3 q.put('b') 4 q.put('c') 5 print(q.get()) 6 print('ttt:%s' % q.get_nowait()) 7 print('ttt:%s' % q.get_nowait()) 8 print('ttt:%s' % q.get_nowait()) # 队列已空,不阻塞,但报错
◇优先级队列-queue.PriorityQueue
1, 数字越低,优先级越高;
2, 数字相同,ASCII越小,优先级越高
3, 返回的是一个元祖
方法
- put()
- get()
- put_nowait() 满值不阻塞,但报错
- get_nowait() 没值不阻塞,但报错
from queue import PriorityQueue q = PriorityQueue() q.put(30) q.put(40) q.put(2) q.put(2) q.put(3) q.put(1) print(q.get()) # 数字越小,优先级越高 print(q.get()) print(q.get()) print(q.get()) print(q.get()) print(q.get_nowait())
★标准模块-concurrent.futures
提供了高度封装的异步调用接口
子模块
ThreadPoolExecutor:线程池,提供异步调用
ProcessPoolExecutor: 进程池,提供异步调用
方法
submit 异步提交任务
map for循环submit操作
shutdown 相当于进程池pool.close() + pool.join()
result 取得结果
add_done_callbak 回调函数
""" map: 使用线程池快速实现并发 """ import time from concurrent.futures import ThreadPoolExecutor def fun(n): print('in fun %s' % n) time.sleep(2) if __name__ == '__main__': thread_pool = ThreadPoolExecutor(max_workers=5) # 实例化一个线程池,可以同时并发5个线程 thread_pool.map(fun, range(20))
""" 获取线程池的返回值 """ import time from concurrent.futures import ThreadPoolExecutor def fun(n): print('in fun %s' % n) time.sleep(2) return n * n if __name__ == '__main__': thread_pool = ThreadPoolExecutor(max_workers=5) # 实例化一个线程池,可以同时并发5个线程 ret_list = [] for n in range(20): ret = thread_pool.submit(fun, n) ret_list.append(ret) thread_pool.shutdown() print('主线程,打印一下所有线程的返回值', [ret.result() for ret in ret_list])
""" 回调函数的例子 """ import time from concurrent.futures import ThreadPoolExecutor def fun(n): print('in fun %s' % n) time.sleep(0.3) return n * n def fun_callback(m): """ 回调函数 :param m: 在此示例中为:fun函数的 `return 结果对象` :return: """ print('回调函数参数:', m.result()) if __name__ == '__main__': thread_pool = ThreadPoolExecutor(max_workers=3) # 实例化一个线程池,可以同时并发5个线程 for n in range(20): thread_pool.submit(fun, n).add_done_callback(fun_callback)