一 线程基础
1. 线程与进程的区别:
只有cpython解释器中含有GIL;因为线程的切换速度比进程块,又因为线程存在GIL,不存在多线程并行,所以计算密集采用多进程处理,而i/o密集采用多线程处理
线程无需if __name__ == '__main__':语句
- 进程是资源分配的最小单位,线程是程序执行的最小单位(资源调度的最小单位)
- 进程有自己的独立地址空间,建立数据表来维护代码段、堆栈段和数据段。而线程是共享进程中的数据的,使用相同的地址空间,因此CPU切换速度线程比进程块的多
- 多线程共享同一进程下的全局变量、静态变量等数据。
- 多线程程序只要有一个线程死掉,整个进程也死掉了,而一个进程死掉并不会对另外一个进程造成影响,因为进程有自己独立的地址空间。
- 因为GIL(同一时间,只允许一个线程占用cpu,只有一个线程在运行)锁的存在,线程没有真正的并行,而多进程能够在多核下实现并行
- 守护进程和守护线程的结束条件不一致
二 Thread模块
1. 模块介绍
multiprocess模块的完全模仿了threading模块的接口,二者在使用层面,有很大的相似性。
Thread实例对象的方法 # isAlive(): 返回线程是否活动的。 # getName(): 返回线程名。 # setName(): 设置线程名。 threading模块提供的一些方法: # threading.currentThread(): 返回当前的线程对象。 # threading.current_thread():返回当前的线程对象,和上述方法一样。 # threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。 # threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。 # threading.active_count():返回正在运行的线程数量,和上述方法一样。
2. 互斥锁Lock、递归锁RLock
因为GIL的存在,同一时间只能有一个线程运行,而多线程共享进程内的全局变量,修改数据时会混乱,GIL锁的是线程
from threading import Thread import time,os def func(): global num time.sleep(0.01) num -=1 num = 100 for i in range(100): th = Thread(target=func,) th.start() print(num)
42 #数据混乱
2.1线程锁的实质
进程互斥锁和线程互斥锁都是锁定的语句,加锁部分与未加锁部分还是并发的。
from threading import Thread,Lock import time,os def func(l): l.acquire() time.sleep(5) print('func') l.release() def func2(l): l.acquire() print('2222222222') l.release() l = Lock() th = Thread(target=func,args = (l,)) th2 = Thread(target=func2,args=(l,)) th.start() time.sleep(1) th2.start()
2.2 死锁与递归锁RLock
死锁:因争夺资源而造成的一种互相等待的现象
递归锁:相同的锁只需一把钥匙,互容;互斥锁,相同的锁多少人来开,需要多少把钥匙,不互容
from threading import Lock, RLock mutexA = Lock() mutexA = RLock() mutexA.acquire() mutexA.acquire() print(123) mutexA.release() mutexA.release()
死锁实例:
from threading import Thread,Lock,RLock import time def man(l_door,l_paper): l_door.acquire() print('man_door') time.sleep(1) l_paper.acquire() print('man_paper') time.sleep(1) l_paper.release() l_door.release() def woman(l_door,l_paper): l_paper.acquire() time.sleep(1) print('woman_paper') l_door.acquire() time.sleep(1) print('woman_door') l_door.release() l_paper.release() # l_door =l_paper = Lock() #同一把锁,不互容 # l_door = Lock() # l_paper = Lock() # l_door = RLock() # l_paper = RLock() l_door = l_paper = RLock() #同一把锁,互容 th_man = Thread(target=man,args=(l_door,l_paper)) th_woman = Thread(target=woman,args=(l_door,l_paper)) th_man.start() th_woman.start()
man_door
man_paper
woman_paper
woman_door
3. Semaphore信号量
参考进程信号量
4. Event事件
event.isSet():返回event的状态值; event.wait():如果 event.isSet()==False将阻塞线程; event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度; event.clear():恢复event的状态值为False。
5. Condition 条件
Condition被称为条件变量,除了提供与Lock类似的acquire和release方法外,还提供了wait和notify方法。
线程首先acquire一个条件变量,然后判断一些条件。如果条件不满足则wait;如果条件满足,进行一些处理改变条件后,通过notify方法通知其他线程,
其他处于wait状态的线程接到通知后会重新判断条件。不断的重复这一过程,从而解决复杂的同步问题。
条件实例,条件的本质就是人为的控制锁的开启时刻
from threading import Condition,Thread
import time # 条件被创建之初,wait()处于堵塞状态 # notify(int) 通知,允许多把锁打开 def func(con,i): con.acquire() con.wait() # 所有线程都阻塞在这里 print('在第%s个循环里'%i) con.release() #并不是还钥匙,而是把钥匙扔掉 if __name__ == '__main__': con = Condition() for i in range(10): t = Thread(target=func,args=(con,i)) t.start() while 1: #主线程会和子线程抢夺钥匙,易出错 num = int(input('>>>')) con.acquire() con.notify(num)# 允许几个线程可以执行了 con.release()
time.sleep(2)
6. Timer定时器
定时器,指定n秒后执行某个操作
from threading import Timer def func(): print("开始计时收费了") t = Timer(1, func) t.start() # after 1 seconds, "hello, world" will be printed
7. 守护线程
无论是进程还是线程,都遵循:守护xx会等待主xx运行完毕后被销毁。需要强调的是:运行完毕并非终止运行
- 对主进程来说,运行完毕指的是主进程代码运行完毕
- 对主线程来说,运行完毕指的是主线程所在的进程内所有非守护线程统统运行完毕
from threading import Thread from multiprocessing import Process import time def func(): time.sleep(2) print('这里是子线程,就是守护线程,就是这么diao,还活着呢!') def func1(): time.sleep(4) print('这里是子线程,并不是守护线程') if __name__ == '__main__': # t = Thread(target=func) t = Process(target=func) # t1 = Thread(target=func1) t1 = Process(target=func1) t.daemon=True t.start() t1.start() print('这里是父线程')
进程结果:
这里是父线程
这里是子线程,并不是守护线程
线程结果:
这里是父线程
这里是子线程,就是守护线程,就是这么diao,还活着呢!
这里是子线程,并不是守护线程
三 queue
进程中一般使用from multiprocessing import Queue,该队列能实现不同进程的数据共享;而import queue主要用在线程中,用法与Queue相同
1. Queue()
import queue q=queue.Queue() q.put('first') q.put('second') q.put('third') print(q.get()) print(q.get()) print(q.get())
2. LifoQueue()
import queue q=queue.LifoQueue() q.put('first') q.put('second') q.put('third') print(q.get()) print(q.get()) print(q.get())
3. PriorityQueue()
存储数据时可设置优先级的队列,数据类型不一致时,按照ascii码排序,越小等级越高,等级相同时遵循先进先出的原则。
import queue q=queue.PriorityQueue() #put进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高 q.put((20,'a')) q.put((10,'b')) q.put((30,'c')) print(q.get()) print(q.get()) print(q.get())
四 线程池
1. concurrent.futures模块介绍
#1 介绍 concurrent.futures模块提供了高度封装的异步调用接口 ThreadPoolExecutor:线程池,提供异步调用 ProcessPoolExecutor: 进程池,提供异步调用 Both implement the same interface, which is defined by the abstract Executor class. #2 基本方法 #submit(fn, *args, **kwargs) 异步提交任务 #map(func, *iterables, timeout=None, chunksize=1) 取代for循环submit的操作,返回的结果是生成器,用next()取值 #shutdown(wait=True) 相当于进程池的pool.close()+pool.join()操作 wait=True,等待池内所有任务执行完毕回收完资源后才继续 wait=False,立即返回,并不会等待池内的任务执行完毕 但不管wait参数为何值,整个程序都会等到所有任务执行完毕 submit和map必须在shutdown之前 #result(timeout=None) 取得结果,相当于进程中的get(),两者都会使异步变同步 #add_done_callback(fn) 回调函数
2. 线程池效率对比
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor from multiprocessing import Pool import time def func(num): sum = 0 for i in range(num): sum +=i**2 # print(sum) if __name__ == '__main__': # pool = ThreadPoolExecutor(20) # start = time.time() # for i in range(10000): # pool.submit(func,i) 等于pool.apply()或pool.apply_asycn() # pool.shutdown() 等于Pool中的pool.close()和pool.join() # print(time.time()-start) # pool = ProcessPoolExecutor(5) # start = time.time() # for i in range(10000): # pool.submit(func, i) # pool.shutdown() # print(time.time() - start) pool = Pool(5) start = time.time() for i in range(10000): pool.apply_async(func, (i,)) pool.close() pool.join() print(time.time() - start)
3. 回调函数
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor import os def func(num): sum = 0 for i in range(num): sum += i ** 2 return sum def call_back(var): print(os.getpid()) if __name__ == '__main__': print(os.getpid()) pool = ThreadPoolExecutor(2) for i in range(10): pool.submit(func, i).add_done_callback(call_back) pool.shutdown()
线程中的回调函数是由子线程调用的
from threading import current_thread from concurrent.futures import ThreadPoolExecutor import time def func(): time.sleep(0.5) print('son:',current_thread()) def call_back(var): print('call_back:',current_thread()) if __name__ == '__main__': th = ThreadPoolExecutor(2) for i in range(4): th.submit(func,).add_done_callback(call_back) th.shutdown() print('father',current_thread())