来自 :https://zhuanlan.zhihu.com/p/101434151
线程
- 线程与进程的联系:都是为了解决并发
- 线程与进程的区别:
- 进程:计算机中最小的资源分配单位
- 线程:进程中的一员,同一个进程之间的几个线程共享一个进程的资源
- 线程可以直接被CPU调度,因此线程是计算机中能被CPU调度的最小单位
- 比如一个 qq 是一个进程两个好友可以同时给你发送消息,你可以同时接收,还可以和多个人聊天等等(并行)比如视频 app,在线观看的同时可以缓存,这种情况下可以看作是两条线程
- 进程:数据隔离(不同的业务,应放在不同的进程中,比如 qq 与微信)
- 线程:数据共享、效率高
- 进程可以利用多个 cpu
- 理论上,线程也能利用多个 cpu,但是 Python 的线程是不能利用多核的
- 比如一个进程里有一个列表 l = [],进程里有三个线程
- 如果三个线程同时利用多个 cpu 给列表添加数字 1,即三个线程都执行 l.append(1)操作
- 它们很有可能一开始都认为这个列表是空的,那么结果很可能 l = [1],这样就造成数据丢失了
- 而如果不能利用多个 cpu,即如果一个线程利用了一个 cpu 其他的线程只能等它从 cpu 返回后再一个一个利用 cpu
- 这样列表 l 就能确保每次都添加数字1,结果就是 l = [1, 1, 1]
3. 由刚才的示例,可以引申出全局解释器锁(GIL):
- Cpython 解释器设置的锁早期 Python刚出现时,没有多核的概念
- 所以没有考虑给数据加锁这件事后来有了多核的概念
- 而 Python 是解释型语言,考虑到数据的安全及各种数据的记录
- 为了保证数据的安全,便设置了全局解释器锁
- 这个锁导致了同一个进程之间的多个线程同一时刻只能有一个线程访问 cpu
4. 为什么要有线程
- 线程的开启和销毁的速度都比进程要快
- cpu在进程之间切换和在线程之间切换的效率,线程更高
5. 线程即然不能同时利用多个 cpu,它的影响有:
- cpu 主要是用来做计算的
- 程序中除了计算还有 IO 操作
6. 网络延迟的本质是 IO 操作
7. 线程与进程在代码里运行的区别:
from threading import Thread
def func():
print("我是子线程")
t = Thread(target=func)
t.start()
print("我是主线程")
# 运行结果:
我是子线程
我是主线程
# 注意:如果你看过我之前的文章,就会发现:
# 进程的运行顺序是是先执行主进程,再执行子进程
from multiprocessing import Process
def func():
print("我是子进程")
if __name__ == "__main__":
p = Process(target=func) # 这里并不代表开启了子进程
p.start() # 开启了一个子进程,并执行func()
print("我是主进程")
# 运行结果:
我是主进程
我是子进程
8. 线程的特点
import os
import time
from threading import Thread
def func():
time.sleep(1)
print("我是子线程", os.getpid())
for i in range(10):
t = Thread(target=func)
t.start()
print("我是主线程")
# 运行结果:
我是主线程
我是子线程 29530
我是子线程 29530
我是子线程 29530
我是子线程 29530
我是子线程 29530
我是子线程 29530
我是子线程 29530
我是子线程 29530
我是子线程 29530
我是子线程 29530
上面的示例可以看出线程的两个特点:
- 线程只能利用单个 cpu,通过 os.getpid() 可证明
- 子线程的 10 次打印是一次性全部出来的,说明线程效率很高
9. 线程也有三个状态:就绪 阻塞 运行,有上面的示例来分析:
- 运行是在 cpu 里面的,全局锁也是针对这里
- 第一个子线程去 cpu 里运行时,很快会到阻塞状态,在这个状态里要休眠 1 秒
- 而在这个 1s 时间里,第一个线程已经从 cpu 返回,然后第二个、第三个...又依次去 cpu
- 然后返回,速度非常快,1s 内 10 个子线程已经都到阻塞状态了
- 所以运行发现 10 个子线程几乎同一时间内打印出结果
10. 注意:程序中 IO 操作是不占用全局解释器锁和 cpu 的
11. 线程中的传参
import time
from threading import Thread
def func(i):
time.sleep(1)
print("我是子线程%s" % i)
for i in range(10):
t = Thread(target=func, args=(i,))
t.start()
# 运行结果:
我是子线程0
我是子线程3
我是子线程5
我是子线程7
我是子线程9
我是子线程4
我是子线程1
我是子线程6
我是子线程8
我是子线程2
12. 线程中的 join() 用法
import time
from threading import Thread
def func(i):
time.sleep(1)
print("我是子线程%s" % i)
t_lst = []
for i in range(10):
t = Thread(target=func, args=(i,))
t.start()
t_lst.append(t)
for t in t_lst:
t.join() # 阻塞,直到子线程中的代码执行
print("所有的线程都执行完了")
# 运行结果:
我是子线程0
我是子线程4
我是子线程6
我是子线程3
我是子线程1
我是子线程8
我是子线程2
我是子线程5
我是子线程7
我是子线程9
所有的线程都执行完了
13. 线程中的数据共享
from threading import Thread
n = 100
def func():
global n
n -= 1
t_lst = []
for i in range(100):
t = Thread(target=func)
t.start()
t_lst.append(t)
for t in t_lst:
t.join()
print(n) # 0
14. 守护线程
import time
from threading import Thread
def main():
print("主线程开始运行")
time.sleep(3)
print("主线程运行结束")
def daemon_func():
while 1:
time.sleep(1)
print("我是守护线程")
t = Thread(target=daemon_func)
t.setDaemon(True)
t.start()
main()
# 运行结果:
主线程开始运行
我是守护线程
我是守护线程
主线程运行结束
import time
from threading import Thread
def main():
print("主线程开始运行")
time.sleep(3)
print("主线程运行结束")
def daemon_func():
while 1:
time.sleep(1)
print("守护线程")
def thread_son():
print("子线程开始运行")
time.sleep(5)
print("子线程运行结束")
t = Thread(target=daemon_func)
t.setDaemon(True)
t.start()
Thread(target=thread_son).start() # 子线程
main()
# 运行结果:
子线程开始运行
主线程开始运行
守护线程
守护线程
主线程运行结束
守护线程
守护线程
子线程运行结束
# 注意,上面的 Thread(target=thread_son).start() 如果改为
# Thread(target=thread_son()).start()
# 那么会先执行 thread_son(),运行结果就会是同步效果了
# 运行发现守护线程和守护进程不同
# 守护线程会守护主线程直到主线程结束
# 如果主线程要等待其他子线程,那么守护线程在这段时间中仍然发挥守护作用
15. 开启线程的第二种方式
from threading import Thread
class MyThread(Thread):
def run(self):
print("子线程", self.ident)
for i in range(10):
t = MyThread()
t.start()
# 运行结果:
子线程 139970079782656
子线程 139970071389952
子线程 139970079782656
子线程 139970071389952
子线程 139970079782656
子线程 139970071389952
子线程 139970054604544
子线程 139970079782656
子线程 139970071389952
子线程 139970062997248
16. 查看线程 id 的第二种方法
import time
from threading import Thread, currentThread, enumerate, active_count
def func(i):
time.sleep(1)
print("我是子线程%s" % i, currentThread().ident)
for i in range(10):
t = Thread(target=func, args=(i, ))
t.start()
print(enumerate())
print(active_count())
# 运行结果:
[<_MainThread(MainThread, started 139970964113152)>,
<Thread(Thread-1, started 139970935011072)>,
<Thread(Thread-2, started 139970926618368)>,
<Thread(Thread-3, started 139970918225664)>,
<Thread(Thread-4, started 139970702472960)>,
<Thread(Thread-5, started 139970694080256)>,
<Thread(Thread-6, started 139970685687552)>,
<Thread(Thread-7, started 139970677294848)>,
<Thread(Thread-8, started 139970668902144)>,
<Thread(Thread-9, started 139970660509440)>,
<Thread(Thread-10, started 139970652116736)>]
11
我是子线程6 139970677294848
我是子线程8 139970660509440
我是子线程3 139970702472960
我是子线程1 139970926618368
我是子线程0 139970935011072
我是子线程7 139970668902144
我是子线程9 139970652116736
我是子线程5 139970685687552
我是子线程2 139970918225664
我是子线程4 139970694080256
17. 用多线程实现 socket server 基于 tcp 的并发
# server.py
import socket
from threading import Thread
sk = socket.socket()
sk.bind(("127.0.0.1", 8080))
sk.listen()
def talk(conn):
while 1:
conn.send("我会一直向客户端发送这个信息".encode())
while 1:
conn, addr = sk.accept()
Thread(target=talk, args=(conn, )).start()
# 因为有 accept、send 等各种阻塞,因此实现 socket server 基于 tcp 的并发
# 只用多线程来实现即可,因为运行效率快
# 可创建多个一样的客户端同时执行
# client.py
import socket
sk = socket.socket()
sk.connect(("127.0.0.1", 8080))
while True:
msg = sk.recv(1024)
print(msg.decode())
# 我会一直向客户端发送这个信息
# 我会一直向客户端发送这个信息
# 我会一直向客户端发送这个信息
# ...
18. 线程锁
import time
from threading import Thread
n = 0
def func():
global n
tmp = n
time.sleep(0.1) # 延迟,相当于时间片轮转
n = tmp + 1
t_lst = []
for i in range(100):
t = Thread(target=func)
t.start()
t_lst.append(t)
for t in t_lst:
t.join()
print(n) # 1
19. 注意:如果不加上时间延迟,结果就不一样了
import time
from threading import Thread
n = 0
def func():
global n
tmp = n
# time.sleep(0.1) # 延迟,相当于时间片轮转
n = tmp + 1
t_lst = []
for i in range(100):
t = Thread(target=func)
t.start()
t_lst.append(t)
for t in t_lst:
t.join()
print(n) # 100
20. 为了保证数据安全,加锁
import time
from threading import Thread, Lock # 互斥锁
n = 0
def func(lock):
global n
with lock:
n += 1
lock = Lock()
t_lst = []
for i in range(100):
t = Thread(target=func, args=(lock, ))
t.start()
t_lst.append(t)
for t in t_lst:
t.join()
print(n) # 100
# 1.如果没有多个线程操作同一变量的时候,就可以不用加锁(在这里就是不使用全局变量)。因此写程序代码时,为了保证数据的安全,尽量不要使用全局变量
# 2.如果是执行基础数据类型的内置方法,都是线程安全的。list.append, list.pop, list.extend, list.remove, dic.get["key"]等
21. 有趣的实验:科学家吃面问题
from threading import Lock, Thread
import time
noodle_lock = Lock()
fork_lock = Lock()
def eat1(name):
noodle_lock.acquire()
print("%s拿到面条了" % name)
fork_lock.acquire()
print("%s拿到叉子了" % name)
print("%s开始吃面了" % name)
time.sleep(0.2)
fork_lock.release()
print("%s将叉子放回" % name)
noodle_lock.release()
print("%s将面条放回" % name)
def eat2(name):
fork_lock.acquire()
print("%s拿到叉子了" % name)
noodle_lock.acquire()
print("%s拿到面条了" % name)
print("%s开始吃面了" % name)
time.sleep(0.2)
noodle_lock.release()
print("%s将面条放回" % name)
fork_lock.release()
print("%s将叉子放回" % name)
Thread(target=eat1, args=("小黑", )).start()
Thread(target=eat2, args=("小明", )).start()
Thread(target=eat1, args=("小红", )).start()
Thread(target=eat2, args=("小花", )).start()
# 运行结果:
小黑拿到面条了
小黑拿到叉子了
小黑开始吃面了
小黑将叉子放回
小明拿到叉子了
小红拿到面条了
小黑将面条放回
# 注意:程序没有停止,而是卡在某一点,这就是一种死锁现象
# 形成该现象的本质原因:做某件事,需要同时拿到这两个资源,但是现在却有两个锁,锁了这两个资源
22. 递归锁
from threading import Thread, RLock # 递归锁
import time
noodle_lock = fork_lock = RLock()
def eat1(name):
noodle_lock.acquire()
print("%s拿到面条了" % name)
fork_lock.acquire()
print("%s拿到叉子了" % name)
print("%s开始吃面了" % name)
time.sleep(0.2)
fork_lock.release()
print("%s将叉子放回" % name)
noodle_lock.release()
print("%s将面条放回" % name)
def eat2(name):
fork_lock.acquire()
print("%s拿到叉子了" % name)
noodle_lock.acquire()
print("%s拿到面条了" % name)
print("%s开始吃面了" % name)
time.sleep(0.2)
noodle_lock.release()
print("%s将面条放回" % name)
fork_lock.release()
print("%s将叉子放回" % name)
Thread(target=eat1, args=("小黑", )).start()
Thread(target=eat2, args=("小明", )).start()
Thread(target=eat1, args=("小红", )).start()
Thread(target=eat2, args=("小花", )).start()
# 运行结果:
小黑拿到面条了
小黑拿到叉子了
小黑开始吃面了
小黑将叉子放回
小黑将面条放回
小明拿到叉子了
小明拿到面条了
小明开始吃面了
小明将面条放回
小明将叉子放回
小红拿到面条了
小红拿到叉子了
小红开始吃面了
小红将叉子放回
小红将面条放回
小花拿到叉子了
小花拿到面条了
小花开始吃面了
小花将面条放回
小花将叉子放回
23. 单线程演示
from threading import Lock
lock = Lock()
lock.acquire()
print(1)
lock.acquire()
print(2)
# 运行结果:
1
# 注意:程序同样没有结束,但可以说明
# 递归锁可以快速解决死锁问题,缺点是占用资源
24. 不用递归锁的解决方案
from threading import Lock, Thread
import time
lock = Lock()
def eat1(name):
lock.acquire()
print("%s拿到面条了" % name)
print("%s拿到叉子了" % name)
print("%s开始吃面了" % name)
time.sleep(0.2)
print("%s将叉子放回" % name)
print("%s将面条放回" % name)
lock.release()
def eat2(name):
lock.acquire()
print("%s拿到叉子了" % name)
print("%s拿到面条了" % name)
print("%s开始吃面了" % name)
time.sleep(0.2)
print("%s将叉子放回" % name)
lock.release()
Thread(target=eat1, args=("小黑", )).start()
Thread(target=eat2, args=("小明", )).start()
Thread(target=eat1, args=("小红", )).start()
Thread(target=eat2, args=("小花", )).start()
# 运行结果:
小黑拿到面条了
小黑拿到叉子了
小黑开始吃面了
小黑将叉子放回
小黑将面条放回
小明拿到叉子了
小明拿到面条了
小明开始吃面了
小明将叉子放回
小红拿到面条了
小红拿到叉子了
小红开始吃面了
小红将叉子放回
小红将面条放回
小花拿到叉子了
小花拿到面条了
小花开始吃面了
小花将叉子放回
25. 死锁现象:
- 使用了多把锁在一个线程内进行了多次 Acquire 导致了不可恢复的阻塞
- 形成原因——两个锁锁了两个资源,要做某件事需要同时拿到这两个资源,多个线程同时执行这个步骤
26. 递归锁与互斥锁总结:
- 递归锁——不容易发生死锁现象
- 互斥锁——使用不当容易发生死锁
- 递归锁可以快速帮我们解决死锁问题
- 死锁的真正问题不在于互斥锁,而在于对互斥锁的混乱使用
- 要想真正的解决死锁问题,还是要找出互斥锁的问题进行修正才能解决根本问题
27. 线程中的队列
- 为什么线程之间要有队列,它不像进程,进程是需要数据共享时才用到队列
- 但是线程之间用队列是为了在多个线程之间维持一个数据先后的秩序
- 线程模块的队列是线程之间数据安全的
import queue
q = queue.Queue()
# q.put() —— 队列满时会阻塞
# q.get() —— 队列空时会阻塞
q.put(1)
print(q.get_nowait()) # 1
28. 异常情况
import queue
q = queue.Queue()
print(q.get_nowait())
# 运行结果:
Traceback (most recent call last):
File "test01.py", line 4, in <module>
print(q.get_nowait())
File "/home/yanfa/anaconda3/lib/python3.7/queue.py", line 198, in get_nowait
return self.get(block=False)
File "/home/yanfa/anaconda3/lib/python3.7/queue.py", line 167, in get
raise Empty
_queue.Empty
29. 异常处理
import queue
try:
print(q.get_nowait()) # 在队列为空时也不阻塞,这时会抛异常
except queue.Empty:
pass
import queue
q = queue.Queue(3)
try:
print(q.get_nowait())
except queue.Empty:
pass
try:
q.put_nowait(1)
except queue.Full: # 这样会造成数据丢失
pass
# q.qsize():当前队列中有多少个值
# q.empty():当前队列是否为空
# q.full():当前队列是否为满
# 这三个没什么意义,不够准确
# 比如线程队列中有一个进程使用 q.empty() 询问队列是否为空
# 这个时候队列是空的,所以返回信息是空的
# 但是在这个信息返回到这个线程之前,又有一个线程往队列里添加东西了
# 那么队列不为空了,但是之后第一个线程却以为队列是空的
# 其他两个的道理一样
30. 队列
import queue
q = queue.LifoQueue()
q.put("a")
q.put("b")
q.put("c")
print(q.get()) # c
print(q.get()) # b
print(q.get()) # a
31. 优先级队列
import queue
q = queue.PriorityQueue()
q.put((2, "a"))
q.put((1, "b"))
q.put((3, "ac"))
print(q.get()) # (1, 'b')
print(q.get()) # (2, 'a')
print(q.get()) # (3, 'ac')
# 元组的第一个元素的数值越小,越先取出来(按 ascii 码的值)
# 如果第一个元素一样,则比较第二个元素的 ascii 码的值
# 总结:一共有三个队列
# queue:一般的队列,先进先出
# queue.LifoQueue():后进先出
# queue.PriorityQueue():优先级队列
32. 线程池
import time
from threading import currentThread
from concurrent.futures import ThreadPoolExecutor
def func(i):
time.sleep(1)
print('子线程%s' % i,currentThread().ident)
tp = ThreadPoolExecutor(5)
for i in range(20):
tp.submit(func,i)
tp.shutdown()
# 运行结果:
子线程0 140264401258240
子线程1 140264392865536
子线程3 140264169535232
子线程2 140264384472832
子线程4 140264161142528
子线程5 140264401258240
子线程6 140264392865536
子线程8 140264384472832
子线程9 140264161142528
子线程7 140264169535232
子线程10 140264401258240
子线程12 140264384472832
子线程13 140264161142528
子线程11 140264392865536
子线程14 140264169535232
子线程18 140264392865536
子线程15 140264401258240
子线程17 140264161142528
子线程16 140264384472832
子线程19 140264169535232
import time
import os
from concurrent.futures import ProcessPoolExecutor
def func(i):
time.sleep(1)
print('子进程%s'%i,os.getpid())
if __name__ == '__main__':
tp = ProcessPoolExecutor(5)
for i in range(20):
tp.submit(func,i)
tp.shutdown()
# 运行结果:
子进程0 3182
子进程1 3183
子进程3 3185
子进程2 3184
子进程4 3186
子进程5 3182
子进程6 3183
子进程9 3186
子进程7 3185
子进程8 3184
子进程10 3182
子进程12 3186
子进程11 3183
子进程13 3185
子进程14 3184
子进程15 3186
子进程16 3182
子进程17 3183
子进程18 3185
子进程19 3184
import