一、线程理论
什么是线程
线程是cpu调度的最小单位,线程更小,更轻量级。
打个比方,计算机如果是工厂,那个工厂里有很多车间,这个是进程,在每个车间中又有很多工作的人,这些人就是线程,真正干活的是这些线程。
什么是进程什么是线程(常见面试题)
进程是资源分配的最小单位,线程是CPU调度的最小单位。每一个进程中至少有一个线程
二、开启线程的两种方式
与开启进程完全相同,只是导入的类不同
# 第一种
from threading import Thread
import time
def task():
print('开始')
time.sleep(1)
print('结束')
if __name__ == '__main__':
t=Thread(target=task,) # 实例化得到一个对象
t.start() # 对象.start()启动线程
print('主')
# 第二种,通过类继承的方式
from threading import Thread
import time
class MyThread(Thread):
def run(self):
print('开始')
time.sleep(1)
print('结束')
if __name__ == '__main__':
t=MyThread()
t.start()
print('主')
三、线程对象的join方法
线程join方法也是等待子线程执行结束,进程的join方法是等待子进程执行结束
from threading import Thread
import time
def task(n):
print('开始')
time.sleep(n)
print('结束')
if __name__ == '__main__':
t=Thread(target=task,args=(2,))
t.start()
t1=Thread(target=task,args=(3,))
t1.start()
t.join() # 等待子进程执行结束
t1.join()
print('主')
四、同一进程下的多个线程数据共享
不同线程数据交互的两种方式
-共享变量:不同线程修改同一份数据要加锁(互斥锁)
-通过queue:不需要考虑数据安全问题(线程安全了)
线程与线程之间是不存在物理隔离的,因为多个线程在同一个进程下,而不同进程之间是存在物理隔离的。
import time
from threading import Thread
money = 99
def task(n):
global money
money=n
print('开始')
print('结束')
if __name__ == '__main__':
t = Thread(target=task, args=(2,))
t.start()
t1 = Thread(target=task, args=(66,))
t1.start()
t.join()
t1.join()
print(money)
print('主')
五、线程对象及其他方法
必须知道的方法:
t.name,t.getName():线程名字
current_thread().name:线程名字,需要导入current_thread类
active_count():需要先导入类cative_count,不需要参数,查看当前进程下有几个线程存活
t.is_alive():当前线程是否存活
t.ident:是一个属性,不要加括号。是python人为加上的线程id号,可以认为是线程id号,但是在操作系统里线程是没有id号的。
from threading import Thread, current_thread,active_count
import time
def task():
print('开始')
print(current_thread().name) # 线程名字
time.sleep(1)
print('结束')
if __name__ == '__main__':
t1 = Thread(target=task,name='线程一')
t2 = Thread(target=task) # 若未命名,则使用默认的命名
t1.start()
t2.start()
print(active_count()) # 打印出3 ,开了两个线程,还有一个主线程
------------------------
开始
线程一
开始
Thread-1
3
结束
结束
线程在操作系统里只有名字,没有编号,ident方法人为的为这些线程编了一个编号,且在同一进程下,不同线程的进程号是一样的。
from threading import Thread, current_thread,active_count
import time
import os
def task(n):
print('开始')
print(current_thread().name) # 线程名字
print(os.getpid())
time.sleep(n)
print('结束')
if __name__ == '__main__':
t1 = Thread(target=task,name='yang',args=(2,))
t2 = Thread(target=task,args=(8,))
t1.start()
t2.start()
t1.join()
print('---------',t1.is_alive())
print('---------',t2.is_alive())
# 线程本来是没有id号的,该方法人为的创建了编号当作线程id号
print('*********',t1.ident)
print('*********',t2.ident)
print(os.getpid())
print(active_count()) # 打印出3 ,开了两个线程,还有一个主线程
---------------------------------
开始
开始
Thread-1
3619
yang
3619
结束
--------- False
--------- True
********* 123145450909696
********* 123145456164864
3619
2
结束
六、守护线程
守护线程存在的意义是,在一个进程中,把线程中的所有线程分为守护线程和非守护线程,当非守护线程都运行结束,那么就表示主线程结束,守护线程要跟着结束。而不是所有线程结束主线程才结束。
守护线程与守护进程的区别:
1.守护进程是在主进程代码运行结束的时候立即挂掉,守护进程本身是一个子进程。即主进程在其代码结束后就已经算运行完毕了(守护进程在此时就被回收),然后主进程会一直等非守护的子进程都运行完毕后回收子进程的资源(否则会产生僵尸进程),才会结束。
2.守护线程会在该进程内所有非守护线程全部运行完毕之后,才挂掉。并不是主线程运行完毕后守护线程挂掉。
# 守护线程
from threading import Thread, current_thread,active_count
import time
import os
def task(n):
print('开始')
time.sleep(n)
# print('-----',active_count())
print('结束')
if __name__ == '__main__':
t1 = Thread(target=task,name='yang',args=(10,))
# t1.daemon = True
t1.setDaemon(True)
t1.start()
t2 = Thread(target=task,name='yang',args=(4,))
t2.start()
print('主')
七、线程互斥锁
线程互斥锁与进程互斥锁虽然类名都叫Lock,但两者不是相同的,从导入的模块不同就可以知道了。
1.不同线程要修改同一个数据,要加锁
2.让并行变成串行,牺牲了效率,保证了数据安全
后面讲到的悲观锁,乐观锁,分布式锁面试几率极高,必须完全掌握。
线程互斥锁,进程互斥锁其实也属于悲观锁
# 实例化
l = Lock()
# 加锁
l.acquire()
# 解锁
l.release()
from threading import Thread,Lock
import time
import random
money = 99
def task(n,mutex):
global money
# 在修改数据的时候,枷锁
mutex.acquire()
temp = money
time.sleep(0.1)
money = temp - 1
# 修改完以后,释放锁,其它线程就能再次抢到锁
mutex.release()
if __name__ == '__main__':
ll=[]
mutex=Lock()
for i in range(10):
t = Thread(target=task, args=(i,mutex))
t.start()
ll.append(t)
for i in ll:
i.join()
print(money)
八、GIL全局解释器锁理论
GIL跟互斥锁的区别:GIL不能保证我们自己的数据安全,自己的互斥锁是保证自己产生的数据安全(背)
GIL全局解释器锁只在cpython解释器中有,如jpython,pypy,都是没有这个特性的
1 python的解释器有很多,cpython,jpython,pypy(python写的解释器,利用python写python解释器叫自举
2 cpython的库多,很多库都是基于cpython写起来的,其他解释器没有那么多的库。所以即使cpython有GIL全局锁导致比较慢绝大部分人还是使用cpython
3 cpython中有一个全局大锁,每条线程要执行,必须获取到这个锁
4 为什么会有这个锁呢?python的垃圾回收机制可能会导致,当我们定义一个变量时,这个变量还没来得及被引用直接就被回收了。比如我们定义x = 1,在python内部,有可能我们刚拿到x,没来记得把1赋值给他,此时的引用计数为0,垃圾回收的扫描直接过来把x回收了。
5 python的多线程其实还是单线程
6 某个线程想要执行,必须先拿到GIL,我们可以把GIL看作是“通行证”,并且在一个python进程中,GIL只有一个。拿不到通行证的线程,就不允许进入CPU执行
7 因为python是动态语言,代码是一行一行解释的,所以我们在刚定义一个变量时候,根本不知道后期是否要使用,所以有可能会被垃圾回收掉,而编译型语言是在编译时,代码已经都跑完了,知道变量是否有用。
8(了解)cpython多核多线程比单核多线程更差:原因是单核下多线程,每次释放GIL,唤醒的那个线程都能获取到GIL锁,所以能够无缝执行,但多核下,CPU0释放GIL后,其他CPU上的线程都会进行竞争,但GIL可能会马上又被CPU0拿到,导致其他几个CPU上被唤醒后的线程会醒着等待到切换时间后又进入待调度状态,这样会造成线程颠簸(thrashing),导致效率更低
9 总结:cpython解释器中有一个全局锁(GIL),线程必须获取到GIL才能执行,我们开的多线程,不管有几个cpu,同一时刻,只有一个线程在执行(python的多线程,不能利用多核优势,这就是慢的其中一个原因)
针对cpython解释器:
1.在单核情况下:
不管什么情况,都是多开线程,因为开进程资源消耗更多。
2.在多核情况下:
如果是io密集型操作:开多线程,cpu遇到io会切换到其他线程执行
如果是计算密集型:开多进程,能被多个cpu调度执行
九、验证GIL锁的存在方式
要验证GIL锁的存在,即在同一时刻,只有一条线程执行,所以对于我们多核的机器,无论如何CPU都不一定不会满
from threading import Thread
from multiprocessing import Process
def task():
while True:
pass
if __name__ == '__main__':
for i in range(6): # 测试机子为6核cpu
# t=Thread(target=task) # 因为有GIL锁,同一时刻,只有一条线程执行,所以cpu不会满
t=Process(target=task) # 由于是多进程,进程中的线程会被cpu调度执行,6个cpu全在工作,就会跑满
t.start()
十、GIL与普通互斥锁的区别
GIL锁是不能保证数据的安全,普通互斥锁来保证数据安全。我们平时在编写程序时,就当做GIL锁不存在即可。
from threading import Thread, Lock
import time
mutex = Lock()
money = 100
def task():
global money
mutex.acquire()
temp = money
time.sleep(1)
money = temp - 1
mutex.release()
if __name__ == '__main__':
ll=[]
for i in range(10):
t = Thread(target=task)
t.start()
# t.join() # 会怎么样?变成了串行,不能这么做
ll.append(t)
for t in ll:
t.join()
print(money)
十一、io密集型和计算密集型
以下只针对于cpython解释器:
-在单核情况下:
-开多线程还是开多进程?不管干什么都是开线程
-在多核情况下:
-如果是计算密集型,需要开进程,能被多个cpu调度执行
-如果是io密集型,需要开线程,cpu遇到io会切换到其他线程执行
以下代码为测试两种操作在线程与进程下的效率
from threading import Thread
from multiprocessing import Process
import time
# 计算密集型
def task():
count = 0
for i in range(100000000):
count += i
if __name__ == '__main__':
ctime = time.time()
ll = []
for i in range(10):
t = Thread(target=task) # 开线程:用时42.68658709526062
# t = Process(target=task) # 开进程:用时9.04949426651001
t.start()
ll.append(t)
for t in ll:
t.join()
print(time.time()-ctime)
## io密集型
def task():
time.sleep(2) # sleep模拟io操作,因为本质一样
if __name__ == '__main__':
ctime = time.time()
ll = []
for i in range(400):
t = Thread(target=task) # 开线程:用时2.0559656620025635
# t = Process(target=task) # 开进程:用时9.506720781326294
t.start()
ll.append(t)
for t in ll:
t.join()
print(time.time()-ctime)
十二、死锁现象
形象点来说,就是进程或线程1拿到了A锁,等B锁,进程或线程2拿到B锁等A锁,现在就会导致两者都在等。
死锁现象:是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程,如下就是死锁
23种设计模式:https://www.cnblogs.com/liuqingzheng/p/10038958.html
单例模式,工厂模式都属于设计模式
死锁
注意:不是一定要两把锁才会产生死锁,就算只有一把锁,当
# 死锁现象,张三拿到了A锁,等B锁,李四拿到了B锁,等A锁
from threading import Thread, Lock
import time
mutexA = Lock()
mutexB = Lock()
def eat_apple(name):
mutexA.acquire()
print('%s 获取到了a锁' % name)
mutexB.acquire()
print('%s 获取到了b锁' % name)
print('开始吃苹果,并且吃完了')
mutexB.release()
print('%s 释放了b锁' % name)
mutexA.release()
print('%s 释放了a锁' % name)
def eat_egg(name):
mutexB.acquire()
print('%s 获取到了b锁' % name)
time.sleep(2)
mutexA.acquire()
print('%s 获取到了a锁' % name)
print('开始吃鸡蛋,并且吃完了')
mutexA.release()
print('%s 释放了a锁' % name)
mutexB.release()
print('%s 释放了b锁' % name)
if __name__ == '__main__':
ll = ['egon', 'yang', 'lqz']
for name in ll:
t1 = Thread(target=eat_apple, args=(name,))
t2 = Thread(target=eat_egg, args=(name,))
t1.start()
t2.start()
# 结果为egon吃完苹果后,拿鸡蛋的锁A,此时yang会来拿苹果的锁B,然后两者都卡住了
------------------------------
egon 获取到了a锁
egon 获取到了b锁
开始吃苹果,并且吃完了
egon 释放了b锁
egon 释放了a锁
egon 获取到了b锁
yang 获取到了a锁
解决方法
现在的问题是有两把锁,当一个锁被释放之后立刻被人拿走,导致双方都被卡主,那么只要我们设置只有一把锁,且这把锁能被多次获取,在它完全被释放前,谁都不能再获取这把锁。
# 递归锁(可重入),同一个人可以多次acquire,每acquire一次,内部计数器加1,每relaese一次,内部计数器减一
# 只有计数器不为0,其他人都不获得这把锁
from threading import Thread, Lock,RLock
import time
mutexA = mutexB =RLock()
def eat_apple(name):
mutexA.acquire()
print('%s 获取到了a锁' % name)
mutexB.acquire()
print('%s 获取到了b锁' % name)
print('开始吃苹果,并且吃完了')
mutexB.release()
print('%s 释放了b锁' % name)
mutexA.release()
print('%s 释放了a锁' % name)
def eat_egg(name):
mutexB.acquire()
print('%s 获取到了b锁' % name)
time.sleep(2)
mutexA.acquire()
print('%s 获取到了a锁' % name)
print('开始吃鸡蛋,并且吃完了')
mutexA.release()
print('%s 释放了a锁' % name)
mutexB.release()
print('%s 释放了b锁' % name)
if __name__ == '__main__':
ll = ['egon', 'alex', '铁蛋']
for name in ll:
t1 = Thread(target=eat_apple, args=(name,))
t2 = Thread(target=eat_egg, args=(name,))
t1.start()
t2.start()
十三、Semaphore信号量
# Semaphore:信号量可以理解为多把锁,同时允许多个线程来更改数据
from threading import Thread,Semaphore
import time
import random
sm=Semaphore(3) # 数字表示可以同时有多少个线程操作
def task(name):
sm.acquire()
print('%s 正在蹲坑'%name)
time.sleep(random.randint(1,5))
sm.release()
if __name__ == '__main__':
for i in range(10):
t = Thread(target=task,args=('%s号'%i,))
t.start()
------------------------------------------------
# 最先显示的只有0-2号
0号 正在蹲坑
1号 正在蹲坑
2号 正在蹲坑
3号 正在蹲坑
4号 正在蹲坑
5号 正在蹲坑
6号 正在蹲坑
7号 正在蹲坑
8号 正在蹲坑
9号 正在蹲坑
十四、Event事件
一些线程需要等到其他线程执行完成之后才能执行,类似于发射信号
比如一个线程等待另一个线程执行结束再继续执行
# 练习:起两个线程,第一个线程读文件的前半部分,读完发一个信号,另一个进程读后半部分,并打印
from threading import Thread, Event
import time
import os
event = Event()
# 获取文件总大小
size = os.path.getsize('a.txt')
def read_first():
with open('a.txt', 'r', encoding='utf-8') as f:
n = size // 2 # 取文件一半,整除
data = f.read(n)
print(data)
print('我一半读完了,发了个信号')
event.set()
def read_last():
event.wait() # 等着发信号
with open('a.txt', 'r', encoding='utf-8') as f:
n = size // 2 # 取文件一半,整除
# 光标从文件开头开始,移动了n个字节,移动到文件一半
f.seek(n, 0)
data = f.read()
print(data)
if __name__ == '__main__':
t1=Thread(target=read_first)
t1.start()
t2=Thread(target=read_last)
t2.start()
十五、线程queue
与进程的queue不一样,因为导入的类不同,进程导入的类是multiprocessing下的Queue,而线程导入的是queue下的Queue,但是方法使用一样
使用线程queue(线程中通信)的目的也是为了线程中间通信,我们使用共享变量也可以实现,但是会有数据安全的问题,用线程queue,使用线程queue就不需要加锁了,因为类中自带了
三种线程Queue
-Queue:队列,先进先出
-PriorityQueue:优先级队列,谁小谁先出
-LifoQueue:栈,后进先出
线程Queue
from queue import Queue,LifoQueue,PriorityQueue
from queue import Queue
q=Queue(5)
q.put("lqz")
q.put("egon")
q.put("铁蛋")
q.put("钢弹")
q.put("金蛋")
# 取值
print(q.get())
print(q.get())
print(q.get())
print(q.get())
print(q.get())
print(q.full())
print(q.empty())
PriorityQueue:优先级队列,谁权重小谁先出
from queue import PriorityQueue
q=PriorityQueue(4)
q.put((1,"lqz"))
q.put((12,"egon"))
q.put((100,"yang"))
q.put((21,"alex"))
print(q.get())
print(q.get())
print(q.get())
print(q.get())
----------------------
(1, 'lqz')
(12, 'egon')
(21, 'alex')
(100, 'yang')
LifoQueue:栈,后进先出
from queue import LifoQueue
q=LifoQueue(5)
q.put("lqz")
q.put("egon")
q.put("铁蛋")
q.put("钢弹")
q.put("金蛋")
print(q.get())
print(q.get())
print(q.get())
print(q.get())
print(q.get())
十六、线程池与进程池***
总所周知,进程和线程都是不可被重复利用的,在实现高并发中,这会极大的浪费资源。为什么会出现池?不管是开进程还是开线程,都不能无限制开,因为需要耗费资源。通过池就可以很好的解决这个问题,假设池子里我们规定就只能有10个,不管再怎么开,永远是这10个,其他的人进来都得在外等着。
线程在进入Stopped状态后, 就会被销毁, 是不可能再次Start的.要再次Start只能重新建立一个. 所以在线程池
这两个池非常重要,使用也非常简单。
使用 submit() 方法来提交任务括号内的第一个参数是执行的任务(函数)第二个是参数用逗号隔开.且 submit是异步朝线程池中提交任务。
submit 提交任务之后 任务完成 会有一个返回值是一个对象 ,使用result可以查看对象的结果。
但是不能直接在submit后直接使用result,因为任务提交之后是不会直接有返回结果的,而result就相当于等待这个结果那么异步就会变成串行了。
shutdown()方法 关闭进程/线程池入口 等待池子中所有的任务执行完毕之后 才会往下运行代码相当于join
异步回调机制:当异步提交的任务有返回结果之后,会自动触发回调函数的执行,.add_done_callback(),括号内填一个函数名,函数需要有一个参数,这个参数会自动传入,他是 submit的返回对象,提交任务的时候 绑定一个回调函数 一旦该任务有结果 立刻执行对于的回调函数
线程池和进程池的操作几乎一样 唯一一点就是,进程的回调函数是,主线程执行的,而线程池的回调函数是谁腾出手来谁来干
# 复习同步异步
同步: 提交任务之后 原地等待任务的返回结果 期间不做任何事
异步: 提交任务之后 不原地等待返回结果 直接执行下一行代码 结果是要的 通过其他方式来拿
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from threading import Thread
from multiprocessing import Process
import time
import random
def task(name):
print('start',name)
time.sleep(random.randint(1,4))
print('end')
return '%s end' %name
def call_back(obj):
print(obj)
print(obj.result())
if __name__ == '__main__':
pool = ThreadPoolExecutor(5)
for i in range(10):
pool.submit(task,'thread%s号' %i).add_done_callback(call_back)
爬虫小案例
from concurrent.futures import ThreadPoolExecutor
import requests # 爬虫会学到的模块
pool = ThreadPoolExecutor(2)
def get_pages(url):
# https://www.baidu.com
res = requests.get(url) # 向这个地址发送请求
name = url.rsplit('/')[-1] + '.html'
print(name) # www.baidu.com.html
# res.content拿到页面的二进制
return {'name': name, 'text': res.content}
def call_back(f):
dic = f.result()
with open(dic['name'], 'wb') as f:
f.write(dic['text'])
if __name__ == '__main__':
ll = ['https://www.baidu.com', 'https://www.mzitu.com', 'https://www.cnblogs.com']
for url in ll:
pool.submit(get_pages, url).add_done_callback(call_back)
补充:kafka,rabbitMQ
后期会详细提到
kafka的吞吐量高,但是缺陷是不支持消息确认,即东西丢了,被谁消费了不会知道。在写日志是时候使用较多,因为日志稍微丢一两个问题不大
rabbitMQ吞吐量较小,但是支持消息确认。在订单使用较多,因为都是和金钱相关的信息,很重要,需要有消息确认
十七、线程池与进程池shutdown
一般线程也不需要开太多,一条进程下开十个线程已经比较多了。在工作中一般不要开进程,除非你知道需要计算密集型。
主线程等待所有任务执行完成:
from concurrent.futures import ThreadPoolExecutor
import time
pool = ThreadPoolExecutor(3)
def task(name):
print('%s 开始'%name)
time.sleep(1)
print('%s 结束'%name)
if __name__ == '__main__':
for i in range(20):
pool.submit(task, '屌丝%s' % i)
# 放到for外面
pool.shutdown(wait=True) # 等待所有任务完成,并且把池关闭
# 问题,关了还能提交任务吗?不能再提交了
pool.submit(task,'sdddd')
print('主') # 立马执行,20个线程都执行完了,再执行
十八、定时器
from threading import Timer
# 本质是一个线程,因为继承了Thread类,延迟一秒去执行
具体应用:
# 多长时间之后执行一个任务
from threading import Timer
def task(name):
print('我是大帅比--%s'%name)
if __name__ == '__main__':
# t = Timer(2, task,args=('yang',)) # 本质是开两个线程,延迟一秒执行
t = Timer(2, task,kwargs={'name':'yang'}) # 本质是开两个线程,延迟一秒执行
t.start()