并发编程之多线程
1.多线程
首先,之前我们所学的进程是操作系统可以调度已经进行资源分配的基本单位,是一个资源单位,其中包含了运行这个程序所需的资源
线程是操作系统可以运算调度的最小单位,是真正的执行单位,其包含在进程中, 一个线程就是一条固定的控制流程。
一个进程可以包含多个线程,同一进程中的线程共享进程内的资源
在开启的每一个进程中会自动创建一条线程,称之为主线程,后续通过代码开启的线程称之为子线程。
2.进程与线程的区别
- 进程是资源单位,线程是执行单位
- 创建进程开销远大于线程
- 多个进程之间内存隔离,一个进程的多个线程是共享所有资源的
- 进程之间是竞争关系,线程是协作关系
- 进程之间有层级关系,线程之间是平等的
- 当要并发处理的任务有很多的时,不能使用进程,进程资源开销太大,线程开销非常小,适用于任务数非常多的情况
3.线程使用方法
- 直接实例化Thread类。
- 继承Thread类,把子进程执行的代码放入重写的run方法中。
from threading import Thread
def task():
print('zi run')
t=Thread(target=task)
t.start()
print('zu over')
4.线程的安全问题
只要并发访问了同一资源一定会产生安全问题 ,解决方案和多进程一致就是给操作公共资源代码加锁
import os
from threading import Thread,Lock
l=Lock()
def task():
l.acquire()
print('zi run',os.getpid())
l.release()
for i in range(5):
t=Thread(target=task)
t.start()
print('zu over')
5.线程中的常用属性和方法
和进程中的类似
from threading import Thread,currentThread,enumerate,activeCount
import time
t.daemon # 为守护线程
t = Thread()
t.start()
t.join()
t.is_alive()
t.isAlive()
t.ident # 线程标识符 id
# 获取当前线程对象
print(currentThread())
t = Thread(target=lambda :print(currentThread()))
t.start()
t = Thread(target=lambda :time.sleep(1))
t.start()
t = Thread(target=lambda :time.sleep(1))
t.start()
t.join()
#获取正在运行的所有线程对象 是一个列表
print(enumerate())
#存活的线程数量
print(activeCount())
6.死锁现象
死锁指的是某个资源被占用后一直得不到释放导致其他需要这个资源的线程进入阻塞状态
产生死锁的情况:
- 对同一把互斥锁多次执行acquire将导致死锁
- 有多把锁被不同线程或进程持有,会导致相互等待对方释放 ,从而会造成程序卡死的状态(解决办法如下)
- 按照相同顺序设置锁,则相同顺序去抢
- 给抢锁加上时间限制,如果超时则放弃执行
import time
from threading import Thread,Lock
# lock=Lock()
#
# lock.acquire()
# print('haha')
# lock.acquire()
l1=Lock()
l2=Lock()
def task1():
l1.acquire()
time.sleep(0.1)
l2.acquire()
print('task1')
l1.release()
l2.release()
def task2():
l2.acquire()
l1.acquire()
print('task2')
l1.release()
l2.release()
t1=Thread(target=task1)
t2=Thread(target=task2)
t1.start()
t2.start()
7.递归锁
与普通的锁相比
- 多线程之间都有互斥的效果
- 不同在于同意个线程可以多次执行acquire
- 同一线程必须保证加锁次数等于解锁的次数,其它线程才能抢到这把锁
from threading import RLock
l=RLock()
l.acquire()
l.acquire()
print('hahaha')
l.release()
8.信号量
信号不是用来解决安全问题,是用来限制最大的并发量
如果限制数量为1 则与普通互斥锁没有区别
import time
from threading import Semaphore,Thread
def task(name,s):
s.acquire()
time.sleep(1)
print(name,'run')
s.release()
s=Semaphore(5)
for i in range(10):
t=Thread(target=task,args=(i,s))
t.start()
9.GIL锁
什么是GIL锁
GIL锁是cpython中特有的全局解释器锁,因为cpython的内存管理是非线程安全的,通过GIL这个互斥锁,阻止多个本地线程在同一时间执行python字节码。
为什么需要这把锁
比如:python会自动帮我们处理垃圾,清扫垃圾也是一堆代码,也需要开启一个线程来执行 ,也就是说就算程序没有自己开启线程,内部也有多个线程, 这是因为Python有自带的内存管理机制,简称GC。GC线程与我们程序中的线程就会产生安全问题,当我们定义一个变量,还没有与变量名绑定时引用计数为0,则有可能切换到GC线程被当作垃圾回收。所以这个锁是非常必要的。
效率问题
但是我们知道GIL锁是一把互斥锁 ,互斥锁将导致效率降低。在python中,只能尽可能的避免GIL锁影响我们的效率。
- 使用多进程实现并行,从而更好的利用多核CPU
- 对任务进行区分
- 对于计算密集型:由于多线程不能并行,可以使用多进程将任务分给不同CPU执行
- 对于IO密集型:由于网络IO速度对比CPU处理速度非常慢,多线程并不会造成太大的影响,另外如有大量客户端连接服务,进程根本开不起来,我们只能使用多线程
性能的讨论
- 加锁是为了解决线程安全问题,由于有了锁导致Cpython中多线程不能并行只能并发
- 但是我们不能因此否认python,GIL锁是Cpython解释器特有的,除此之外还有Jpython,pypy等
- 如果是单核CPU,GIL不会造成任何影响
- 由于目前大多数程序都是基于网络的,网络速度对比CPU是非常慢的, 导致即使多核CPU也无法提高效率
- 对于IO密集型任务,不会有太大的影响
GIL与自定义锁的区别
GIL锁住的是解释器级别的数据
自定义锁比如lock类,锁的是解释器以外的共享资源例如:硬盘上的文件,控制台
对于这种不属于解释器的数据资源就应该自己加锁处理
10.进程池和线程池
池表示容器,进程池和线程池就是装进程和线程的容器
装到容器中有哪些好处?
- 可以避免频繁的创建和销毁(进程/线程)来的资源开销
- 可以限制同时存在的线程数量 以保证服务器不会应为资源不足而导致崩溃
- 帮我们管理了线程的生命周期
- 管理了任务的分配
import time
from concurrent.futures import ThreadPoolExecutor
pool=ThreadPoolExecutor(3)
def task(name,i):
time.sleep(i)
print(name,'run')
pool.submit(task,'kk',1)
pool.submit(task,'dd',0.1)
pool.submit(task,'mm',0.2)
pool.submit(task,'ll',0.9)
11.同步异步
之前所提到的阻塞和非阻塞使程序的状态,异步同步指的是提交任务的方式 。
- 同步 :指的是 提交任务后必须在原地等待 直到任务结束
- 异步:提交任务后不需要在原地等待 可以继续往下执行代码
异步效率高于同步,异步任务将导致一个问题:就是任务的发起方不知道任务何时 处理完毕
解决方法:
-
轮询:重复的隔一段时间就问一次
效率低,无法及时获取结果(不推荐)
-
让任务的执行方主动通知(异步回调)
通过设置回调函数,参数为任务的结果,
则可以及时拿到任务的结果(推荐方式)
import time
#===============同步=================
def task():
for i in range(10000000):
a=1+1
start_time=time.time()
task()
print(time.time()-start_time)
#================异步=================
#异步回调
from threading import Thread
def task(callback):
print('run')
time.sleep(1)
print(callback())
def callback():
return ('任务结束')
t=Thread(target=task,args=(callback,))
t.start()
#线程池中的回调
from concurrent.futures import ThreadPoolExecutor
def task(callback):
print('run')
time.sleep(0.5)
callback()
def callback():
print('over')
pool=ThreadPoolExecutor(5)
print(pool)
res=pool.submit(task,callback)
print(res)
from concurrent.futures import ThreadPoolExecutor
def task():
print('run')
time.sleep(0.5)
return 'over'
def callback(obj):
print(obj.result())
pool=ThreadPoolExecutor(5)
res=pool.submit(task)
res.add_done_callback(callback)
12.Event事件
通过threading模块中的Event类设置bool值标记来同步线程间状态,具体方法是:
- 首先实例化一个事件
- 使用set函数标记其中某一个线程状态为True
- 其它线程通过wait函数可以阻塞当前线程,直到标记的状态从False变为True才执行即可以达到同步的结果。
这样可以不采用轮询的方法,从而节省了CPU资源
import time
from threading import Thread,Event
e=Event()
def task1():
print('1,run')
time.sleep(1)
e.set()
print('1,over')
def task2():
e.wait()
print('2,run')
t1=Thread(target=task1)
t2=Thread(target=task2)
t1.start()
t2.start()
13.线程队列
queue队列:使用import queue
,用法与进程中Queue一样
1.先进先出:
import queue
q=queue.Queue()
q.put('first')
q.put('second')
q.put('third')
print(q.get())
q.task_done()
print(q.get())
q.task_done()
print(q.get())
q.task_done()
q.join() # 其原理等同于joinableQueue
print("over")
'''
结果(先进先出):
first
second
third
'''
2.后进先出
import queue
q=queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third')
print(q.get())
print(q.get())
print(q.get())
'''
结果(后进先出):
third
second
first
'''
3.设置优先级
import queue
q=queue.PriorityQueue()
#put进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高
q.put((20,'a'))
q.put((10,'b'))
q.put((30,'c'))
print(q.get())
print(q.get())
print(q.get())
'''
结果(数字越小优先级越高,优先级高的优先出队):
(10, 'b')
(20, 'a')
(30, 'c')
''
# 运算符重载 使得对象可以被比较
class Person:
def __init__(self,name,age):
self.age = age
self.name = name
# 覆盖比较运算符 当在两个对象之间使用比较运算符时 会自动执行该方法
def __lt__(self, other):
# print("xxxxxxxx")
if self.age == other.age: # 如果年龄相同则按照姓名比较
return self.name < other.name
return self.age < other.age
p1 = Person("ajack",18)
p2 = Person("brose",17)
# print(p1 < p2)
# 要让Person对象 支持比较 并且 先按照年龄来比较
# 具备优先级的队列 取出数据时 会比较大小 越小优先级越高
q = PriorityQueue()
# q.put((10,2))
# q.put((10,20))
# q.put((10,))
# print(q.get())
#
# p1 = Person()
# p2 = Person()
q.put(p1)
q.put(p2)
print(q.get().name)