8.5 线程
进程:开辟空间,加载数据,资源单位
线程:流水线,执行代码,执行单位
8.5.1 线程的概念
是操作系统能够进行运算调度的最小单位,线程包含在进程中,是进程中的执行单元,一个进程至少包含一条线程。
用户级线程
切换不需要内核支持而在用户程序中实现
内核级线程
又可以叫轻量级线程或者内核支持的线程,他的切换是由内核控制
8.5.2 线程vs进程
- 开销:开启多进程时开销大,而开启线程的开销非常小
- 速度:开启多进程的速度慢,二开启线程的速度快
- 通信:进程间相互独立,且数据不能直接共享,同一个进程的线程之间的数据可以共享
8.5.3 多线程应用场景
并发:一个CPU(实际是多个线程间切换),多进程并发,多线程并发
多进程并发:开启多个进程,每个进程的主线程执行任务;
多线程并发:一个进程中,里面有多个线程执行任务
【使用】如遇并发,常用多线程并发
【要知道使用线程的几个理由】
- 共享进程资源,多线程共享一个进程的地址空间(下文会有代码实例)
- 线程是轻型实体,线程的实体包括程序、数据和TCB(Thread Control Block),线程比进程更容易创建和撤销
- 可并发执行,同一进程中的多个线程可以并发执行
- 独立调度和分派的基本单位
8.5.4 线程的特性
开启
使用threading模块的Thread类
方式一类:
from threading import Thread
class MyThread(Thread,):
def run(self):
print(f'{self.name} is runing')
if __name__ == '__main__':
t = MyThread()
t.start()
方式二函数:
from threading import Thread
import time
def f(name):
time.sleep(2)
print(f"{name} say hello ")
if __name__ == '__main__':
t = Thread(target=f,args=('gailun',))
t.start()
print('this is main process')
开启速度
from multiprocessing import Process
from threading import Thread
def f_process():
print(f'this is a process ')
def f_thread():
print(f'this is a thread ')
if __name__ == '__main__':
p = Process(target=f_process)
t = Thread(target=f_thread)
p.start()
t.start()
print('this in main ')
# 输出
this is a thread
this in main
this is a process
【结论】线程的开启速度要比进程快
pid
from multiprocessing import Process
from threading import Thread
import os
def f():
print(f"this pid is {os.getpid()}")
if __name__ == '__main__':
t1 = Thread(target=f)
t2 = Thread(target=f)
p1 = Process(target=f)
p2 = Process(target=f)
t1.start()
t2.start()
print('main process pid is',os.getpid())
p1.start()
p2.start()
#输出
this pid is 8100
this pid is 8100
main process pid is 8100
this pid is 1716
this pid is 10920
【结论】线程使用的是其进程的pid
通信
from multiprocessing import Process
from threading import Thread
import os
def f():
global n
n = 0
if __name__ == '__main__':
n = 100
t = Thread(target=f)
t.start()
print(n) # 0,线程已经修改了主进程的n
# 输出
0
【结论】线程共享进程的空间
8.5.5 线程的方法
.is_alive() .isAlive() -- 判断线程是否存活
.getName() -- 获取线程名 ,可以在t = Thread(target=f,name = '***')这里修改线程名
.setName() -- 修改线程名,可以在线程中使用,可以在线程外使用
threading.current_thread() -- 返回当前线程变量
threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程
threading.activeCount() threading.active_count() : 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果
from threading import Thread
import threading
import time
def f(name):
# threading.current_thread().setName(name)
print(threading.current_thread().getName())
time.sleep(3)
print(f'{threading.current_thread().getName()} is done')
if __name__ == '__main__':
t = Thread(target=f,args=('gailun',),name = 'jiawen')
t.start()
print(t.is_alive())
print(t.isAlive())
t.setName('zhaoxin')
print(t.getName())
print(threading.enumerate())
print(threading.active_count())
print(threading.activeCount())
8.5.6 守护线程
在进程中,主进程与子进程间相互独立,父进程与非守护子进程的执行没有太大关联,主主进程等到所有非守护子进程执行完毕后回收资源,主进程结束。而在线程中,各线程共享统一进程空间,而主线程的结束必须要等到所有非守线程结束之后再能算是真正的结束。
主线程的而结束意味着进程的结束,进程资源的整体将被回收,而进程必须要保证所有非守护线程结束之后才能结束。
守护线程要等到所有非守护线程消失之后才能结束
这里需要厘清一个事情:
- 对主进程来说,运行完毕指的是主进程代码运行完毕;主进程在其代码结束后就已经算运行完毕了(守护进程在此时就被回收),然后主进程会一直等非守护的子进程都运行完毕后回收子进程的资源(否则会产生僵尸进程),才会结束。S
- 对主线程来说,运行完毕指的是主线程所在的进程内所有非守护线程统统运行完毕,主线程才算运行完毕。主线程在其他非守护线程运行完毕后才算运行完毕(守护线程在此时就被回收)。因为主线程的结束意味着进程的结束,进程整体的资源都将被回收,而进程必须保证非守护线程都运行完毕后才能结束。
守护线程的两种方式:
【方式一】.setDaemon(True)
from threading import Thread
import time
def f_daemon():
print('this is daemon thread')
time.sleep(1)
print('daemon thread is over')
def f_thread():
print('this is child_thread')
time.sleep(2)
print('child_thread is over')
if __name__ == '__main__':
t1 = Thread(target=f_daemon)
t2 = Thread(target=f_thread)
t1.setDaemon(True) # 设置守护进程必须要在它启动之前进行设置
t1.start()
t2.start()
print('this is main Thread')
【方式二】.daemon=True
from threading import Thread
import time
def f_daemon():
print('this is daemon thread')
time.sleep(3)
print('daemon thread is over')
def f_thread():
print('this is child_thread')
time.sleep(2)
print('child_thread is over')
if __name__ == '__main__':
t1 = Thread(target=f_daemon)
t2 = Thread(target=f_thread)
t1.daemon = True # 设置守护进程必须要在它启动之前进行设置
t1.start()
t2.start()
time.sleep(1)
print('this is main Thread')
8.5.7 互斥锁
与多进程的互斥锁一致,对线程在对同一数据资源进行竞争时,应该要保证数据的安全和逻辑畅通。
抢占资源实例
from threading import Thread
import time
x = 100
def f():
global x
temp = x
time.sleep(0.1)
temp -= 1
x = temp
if __name__ == '__main__':
for i in range(100):
t = Thread(target=f)
t.start()
time.sleep(1)
print(x)
# 输出结果99
【分析】for循环的时候,100个线程同时开启,因为f函数中time.sleep了0.1秒,这使得100的线程都会得到 x = 100,之后就是100个进程分别运行,过程变成100个线程对100减1之后重复对x赋值99
使用加锁配合join解决
from threading import Thread,Lock
import time
x = 100
def f(l):
global x
l.acquire()
temp = x
time.sleep(0.1)
temp -= 1
x = temp
l.release()
if __name__ == '__main__':
l = Lock()
ls = []
for i in range(100):
t = Thread(target=f,args=(l,))
ls.append(t)
t.start()
for i in ls:
i.join()
print(x)
# 输出 0
8.5.8 死锁与递归锁
死锁
所谓死锁就是两个或者两个以上的进程或者线程在执行过程中,因争夺资源而造成的一种相互等待的状态,如果不加干预,双方都无法继续向下执行,此时这种状态成为死锁状态,这些处于死锁状态的进程或线程叫做死锁进程或线程。
线程死锁实例
from threading import Thread
from threading import Lock
import time
lock_a = Lock()
lock_b = Lock()
class MyThread(Thread):
def run(self): # 必须要有run函数
self.f1()
self.f2()
def f1(self):
lock_a.acquire()
print(f' 33[0;36m {self.name} get a lock 33[0m')
lock_b.acquire()
print(f' 33[0;35m {self.name} get b lock 33[0m')
lock_b.release()
lock_a.release()
def f2(self):
lock_b.acquire()
print(f' 33[0;35m {self.name} get b lock 33[0m')
time.sleep(0.1) #创造死锁条件
lock_a.acquire()
print(f' 33[0;36m {self.name} get a lock 33[0m')
lock_a.release()
lock_b.release()
if __name__ == '__main__':
for i in range(3):
t = MyThread(name=f"num {i} thread")
t.start()
print('this is main process')
如上图结果所以num 0号线程 和 num 1号线程互相等待获取对方的锁,只能夯在这里。
递归锁
要解决这类问题,可以使用递归锁,python为了支持同一进程中,多次请求同一资源,提供了可重入锁Rlock。
在Rlock内部,有一个Lock和一个counter变量,counter记录了acquire的次数,从而使资源可以被重复acquire,直到一个线程所有的acquire都被release之后,其他线程才能获得资源
递归锁实例:
from threading import Thread
from threading import RLock
import time
lock_a = lock_b = RLock() # 改的是这里哟,必须使用这种连等方式进行
class MyThread(Thread):
def run(self): # 必须要有run函数
self.f1()
self.f2()
def f1(self):
lock_a.acquire()
print(f' 33[0;36m {self.name} get a lock 33[0m')
lock_b.acquire()
print(f' 33[0;35m {self.name} get b lock 33[0m')
lock_b.release()
lock_a.release()
def f2(self):
lock_b.acquire()
print(f' 33[0;35m {self.name} get b lock 33[0m')
time.sleep(0.1)
lock_a.acquire()
print(f' 33[0;36m {self.name} get a lock 33[0m')
lock_a.release()
lock_b.release()
if __name__ == '__main__':
for i in range(3):
t = MyThread(name=f"num {i} thread")
t.start()
print('this is main process')
8.5.9 信号量Semaphore
Semaphore管理一个内置的计数器,设置一个初始值,每当调用acquire()时内置计数器-1;调用release() 时内置计数器+1;计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。适用于线程和进程
from threading import Thread
from threading import Semaphore
from threading import current_thread
import time,random
sem = Semaphore(5) #设置最大连接数为5
def fight():
sem.acquire()
print(f"{current_thread().getName()} get one seat")
time.sleep(random.randint(1,3))
sem.release()
if __name__ == '__main__':
for i in range(20):
t = Thread(target=fight)
t.start()
# 输出,边按enter键,可以看到大约看到线程的替换
Thread-1 get one seat
Thread-2 get one seat
Thread-3 get one seat
Thread-4 get one seat
Thread-5 get one seat
Thread-6 get one seat
Thread-7 get one seat
Thread-8 get one seat
Thread-9 get one seat
Thread-10 get one seat
Thread-11 get one seat
Thread-12 get one seat
Thread-13 get one seat
Thread-14 get one seat
Thread-15 get one seat
Thread-16 get one seat
Thread-17 get one seat
Thread-18 get one seat
Thread-19 get one seat
Thread-20 get one seat