相关概念
线程:是轻量级(轻型)进程,比进程开销小得多
进程和线程的关系:
线程是进程的一部分,线程必须依赖于进程存在
一个进程之内至少有一条线程,默认存在的线程叫做主线程,也可以在一个进程中开启多个子线程来完成更多的任务
进程是计算机中最小的资源分配单位
线程是计算机中能被CPU调度的最小单位,是计算机中最小的执行单位
进程负责分配资源和资源隔离,线程负责真正的执行代码
正常进程与线程之间的区别:
进程: 数据隔离,开销大,能利用多核
线程: 数据共享,开销小,能利用多核
python中进程与线程的区别:
进程: 数据隔离,开销大,能利用多核
线程: 数据共享,开销小,不能利用多核(GIL全局解释器锁)
全局解释器锁:
锁的是线程,让同一个进程中的多个线程同一时间只能由一个被CPU调度
threading模块
import os import time from threading import Thread from multiprocessing import Process # multiprocessing完全仿照了threading模块来进行封装 def func(i): # print('start',i) i+=1 # print(i,os.getpid()) if __name__ == '__main__': start = time.time() for i in range(20): Thread(target=func,args=(i,)).start() print(time.time() - start) start = time.time() for i in range(20): Process(target=func,args=(i,)).start() print(time.time() - start)
数据共享
from threading import Thread n = 100 def func(): global n n -= 1 t_l = [] for i in range(100): t = Thread(target=func) t.start() t_l.append(t) for t in t_l: t.join() print(n) #此时打印n为0,主线程中的值被改变
使用函数的方法启动线程
import time from threading import Thread,currentThread def func(): time.sleep(1) print('in func:',currentThread().name,currentThread().ident) #在函数内部获取当前前程名和线程id t = Thread(target=func) t.start() print(t.name) # 在函数外部获取线程名和线程id print(t.ident)
使用类的方法启动线程
import time from threading import Thread class MyThread(Thread): def __init__(self,i): self.i = i super().__init__() def run(self): time.sleep(1) print('in MyThread:',self.i,self.name,self.ident) #self.name 线程名 self.ident 线程id for i in range(10): t = MyThread(i) t.start()
active_count和enumerate
import time from threading import Thread,active_count,enumerate def func(): time.sleep(1) print('in func') t = Thread(target=func) t.start() print(enumerate()) # 当前所有活着的线程对象组成的列表 内存地址 print(active_count()) #统计当前所有或者的线程对象的格式 相当于len(enumerate)
守护线程
守护进程 会等到主进程的代码结束而结束
守护线程 会等待主线程的结束而结束,如果主线程还开启了其他子线程,那么守护线程会守护到最后
线程是数据进程的,如果进程结束了,线程也就结束了
import time from threading import Thread def func(): while True: time.sleep(1) print('in func') def son(): print('son start') time.sleep(8) print('son end') t1 = Thread(target=func) t1.setDaemon(True) # 设置守护状态 t1.start() t2 = Thread(target=son) t2.start() time.sleep(5) # 守护进程会等到子进程8秒之后运行结束再结束
数据安全问题
列别/字典 自带的方法基本都是数据安全的,但是对其中的元素进行 += -= *= /= 都是数据不安全的
由于append等方法是一个完整的指令过程,执行了就相当于添加,不执行就是还没添加
而+=这种方法,是先执行+,再对结果进行赋值,如果执行完+之后,时间片就切换到另一个线程了,那么就会出现数据不安全的情况
互斥锁
通过加锁的方式,保证+=执行完毕后,再释放锁让其他线程能够执行
from threading import Lock,Thread import time def work(lock): global n with lock: temp = n time.sleep(0.1) n = temp - 1 if __name__ == '__main__': n = 100 l = [] lock = Lock() #互斥锁 for i in range(100): p = Thread(target=work,args=(lock,)) p.start() l.append(p) for p in l: p.join() print(n)
死锁现象
科学家吃面问题(对叉子和面分别上锁,同时规定一个人必须同时拿到叉子和面才可以吃到面)
import time from threading import Lock,Thread noodle_lock = Lock() # 对面加锁 fork_lock = Lock() # 对叉子加锁 fork = 10 noodle = 20 def eat1(name): noodle_lock.acquire() print('%s拿到面了'%name) fork_lock.acquire() print('%s拿到叉子了' % name) noodel =fork + noodle time.sleep(0.1) print('%s吃到面了'%name) fork_lock.release() print('%s放下叉子了' % name) noodle_lock.release() print('%s放下面了'%name) def eat2(name): fork_lock.acquire() print('%s拿到叉子了' % name) noodle_lock.acquire() print('%s拿到面了'%name) noodel =fork + noodle time.sleep(0.1) print('%s吃到面了'%name) noodle_lock.release() print('%s放下面了' % name) fork_lock.release() print('%s放下叉子了' % name) Thread(target=eat1,args=('yuan',)).start() Thread(target=eat2,args=('alex',)).start() Thread(target=eat1,args=('wusir',)).start() Thread(target=eat2,args=('taibai',)).start() # 会出现一个人拿到叉子,一个人拿到面的情况,出现死锁现象
递归锁--快速解决死锁现象
import time from threading import RLock,Thread fork_lock = noodle_lock = RLock() #将面和叉子统一加一个递归锁 fork = 10 noodle = 20 def eat1(name): noodle_lock.acquire() print('%s拿到面了'%name) fork_lock.acquire() print('%s拿到叉子了' % name) noodel =fork + noodle time.sleep(0.1) print('%s吃到面了'%name) fork_lock.release() print('%s放下叉子了' % name) noodle_lock.release() print('%s放下面了'%name) def eat2(name): fork_lock.acquire() print('%s拿到叉子了' % name) noodle_lock.acquire() print('%s拿到面了'%name) noodel =fork + noodle time.sleep(0.1) print('%s吃到面了'%name) noodle_lock.release() print('%s放下面了' % name) fork_lock.release() print('%s放下叉子了' % name) Thread(target=eat1,args=('yuan',)).start() Thread(target=eat2,args=('alex',)).start() Thread(target=eat1,args=('wusir',)).start() Thread(target=eat2,args=('taibai',)).start()
用互斥锁将两个资源当成一个资源锁起来,效率会更高,在实际开发中更推荐用互斥锁
import time from threading import Lock,Thread lock = Lock() # 只用一个锁来锁资源 fork = 10 noodle = 20 def eat1(name): lock.acquire() print('%s拿到面了'%name) print('%s拿到叉子了' % name) noodel =fork + noodle time.sleep(0.1) print('%s吃到面了'%name) print('%s放下叉子了' % name) print('%s放下面了'%name) lock.release() def eat2(name): lock.acquire() print('%s拿到叉子了' % name) print('%s拿到面了'%name) noodel =fork + noodle time.sleep(0.1) print('%s吃到面了'%name) print('%s放下面了' % name) print('%s放下叉子了' % name) lock.release() Thread(target=eat1,args=('yuan',)).start() Thread(target=eat2,args=('alex',)).start() Thread(target=eat1,args=('wusir',)).start() Thread(target=eat2,args=('taibai',)).start()
总结:
死锁现象是由于多个锁对多个变量管理不当造成的
互斥锁在一个进程内只能被acquire一次
递归锁在一个进程内可以被acquire多次,但是release个数必须和acquire个数相同
线程队列
队列 先进先出 FIFO
栈 后进先出 LIFO
import queue lifoq = queue.LifoQueue() #栈 lifoq.put(1) lifoq.put(2) lifoq.put(3) print(lifoq.get()) print(lifoq.get()) print(lifoq.get()) 结果: 3 2 1
优先级队列
import queue pq = queue.PriorityQueue() pq.put((2,'wusir')) pq.put((1,'yuan')) pq.put((1,'alex')) pq.put((3,'taibai')) print(pq.get()) print(pq.get()) print(pq.get()) print(pq.get()) 结果: # 按照优先级,如果第一个相同,会根据第二个元素的ascii码的顺序排序 (1, 'alex') (1, 'yuan') (2, 'wusir') (3, 'taibai')
线程池
oncurrent.futures 模块提供了高度封装的异步调用接口 ThreadPoolExecutor 线程池,提供异步调用 ProcessPoolExecutor 进程池,提供异步调用 方法: submit 异步提交任务 map 代替for循环中submit的操作 shutdown 相当于进程池中close()+join()操作 result 取得结果 add_done_callback 回调函数 done 判断某一个线程是否完成 cancel 取消某个任务
import time from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor def func(i): time.sleep(1) print('in son thread',i) if __name__ == '__main__': tp = ThreadPoolExecutor(4) # 开启4个线程池 # tp.map(func,range(20)) # 可以使用map,也可以使用for循环+submit提交任务 for i in range(20): tp.submit(func,i) tp.shutdown() print('所有的任务都执行完了')
获取返回值
import time from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor def func(i): time.sleep(1) return '*'* i if __name__ == '__main__': tp = ThreadPoolExecutor(4) #开启4个线程池 # 使用map提交任务,获取返回值 # ret_l = tp.map(func,range(20)) # for ret in ret_l: # print(ret) #使用submit提交任务,获取返回值 ret_l = [] for i in range(20): ret = tp.submit(func,i) ret_l.append(ret) for ret in ret_l: print(ret.result())
回调函数
import time import random from concurrent.futures import ThreadPoolExecutor def back(ret): print(ret.result()) def son_func(i): time.sleep(random.random()) return i ** i tp = ThreadPoolExecutor(4) for i in range(20): ret = tp.submit(son_func,i) ret.add_done_callback(back)