zoukankan      html  css  js  c++  java
  • 第五十五篇 死锁、GIL锁以及Pool

    一、死锁

    1.死锁现象

    1.定义:死锁指的是某个资源被占用后一直得不到释放,导致其他需要这个资源的线程进入阻塞状态

    2.产生死锁的原因:

    from threading import Lock, Thread
    import time
    
    • 1.对同一把互斥锁加锁(acquire)多次
    mutex = Lock()
    # 加锁两次,没有释放,程序无法向下执行
    mutex.acquire()
    mutex.acquire()
    
    • 2.一个共享资源,要访问必须同时具备多把锁,但是这些锁被不同的线程或进程所持有,就会导致相互等待对方释放,进而导致程序卡死
    w = Lock()
    k = Lock()
    
    def eat1():
    	# 抢到了其中一把锁
    	w.acquire()
    	time.sleep(0.2)   # 睡眠一下
    	# 由于时间太慢无法抢到第二把锁,而第一把锁也没有释放,所以两个线程都会等待
    	k.acquire()
    	print('我  开吃了')
    	k.release()
    	w.release()
    	
    def eat2():
    	# 抢到了另一把锁
    	k.acquire()
    	time.sleep(0.2)
    	# 只有同时拥有两把锁才能向下执行,等待对方释放
    	w.acquire()
    	print('你  开吃了')
    	k.release()
    	w.release()
    	
    t1 = Thread(target=eat1)
    t2 = Thread(target=eat2)
    
    t1.start()
    t2.start()
    
    # 最终都无法执行
    

    3.解决方法:

    • 1.给acquire加上超时限制,可以保证线程不会卡死
    l = Lock()
    l.acquire()
    l.acquire(timeout=3) 
    
    • 2抢锁一定要按照相同的顺序去抢
    • 3.递归锁:只是解决了多次给同一把锁加锁也能向下执行的问题

    2.递归锁(可重入锁)

    import time
    from threading import Thread, RLock, currentThread
    # currentThread = current_thread
    r = RLock()
    

    1.特点:同一个线程可以对这个锁执行多次acquire,而不会造成卡死

    # 对同一把锁加锁两次
    r.acquire()
    r.acquire()
    print('递归锁,可以锁多次')  # 任然可以执行全部代码
    

    2.注意:同一个线程必须保证,加锁的次数和解锁的次数相同,其他线程才能抢到这把锁

    def task1():
    	# 加锁几次就解锁几次,以便后面的线程使用控制台资源
    	time.sleep(2)
    	r.acquire()
    	r.acquire()
    	print(currentThread().name)
    	r.release()
    	r.release()
    	
    def task2():
    	time.sleep(2)
    	r.acquire()
    	r.acquire()
    	print(currentThread().name)
    	r.release()
    	r.release()
    
    # 多线程并发,谁先抢到锁,谁就先执行
    Thread(target=task1).start()
    Thread(target=task2).start()
    

    3.信号量

    1.信号量可以限制同时并发执行公共代码的线程数量
    2.如果限制数量为1,则与普通互斥锁没有区别
    3.注意:信号量不是用来解决安全问题的,而是用于限制最大的并发量

    from threading import Semaphore, currentThread, Thread
    import time
    
    # 限制同时访问公共代码的线程数量为5
    sp = Semaphore(5)
    
    def task():
    	sp.acquire()
    	time.sleep(1)
    	print(currentThread().name)
    	s.release()
    	
    for i in range(10)
    	# 开启十个线程,但是同一时间只有5个线程可以共同使用公共代码
    	Thread(target=task).start()
    

    二、GIL(全局解释器锁)

    1.什么是GIL

    1.GIL(全局解释器锁):在CPython中,防止多个线程在同一时间执行python字节码的一个互斥锁

    2.特点:GIL是非常重要的,因为CPython的内存管理是非线程安全的,很多其他特性都依赖于GIL锁,所以即使它影响了程序效率,也无法将其去除

    3.总结:在CPython中,GIL会把线程的并行变成并发,导致效率降低

    4.延申:需要知道的是,解释器并不只有CPython,还有PyPy 、JPython等等。GIL也仅存在于CPython中,这并不是python语言的问题,而是CPython解释器的问题

    2.GIL带来的问题

    1.单个线程开启流程

    1.执行python文件的三个步骤:

    • 1.从硬盘加载python解释器到内存
    • 2.从硬盘加载py文件到内存
    • 3.解释器解析py文件内容,交给CPU执行

    2.注意:每当执行一个py文件,就会立即启动一个python解释器

    3.执行python文件时的内存结构图,如下

    2.多个线程开启流程

    1.GIL叫做全局解释器锁,是用于加到解释器上的一把互斥锁,那么这把锁就会对应用程序有所影响

    2.py文件中的内容本质都是字符串,只有在被解释器解释时,才具备语法意义,解释器会将py代码翻译为当前系统支持的指令交给系统去执行

    3.当进程中仅存在一条线程时,GIL锁的存在不会影响效率,但是如果进程中有多个线程时,GIL锁就会发挥它的作用了

    4.解释:

    • 1.开启子线程时,给子线程指定了一个target,表示该子线程要处理的任务(即要执行的代码),代码要执行则必须交由解释器,即多个线程之间就需要共享解释器,为了避免共享带来的数据竞争问题,于是就需要给解释器加上互斥锁
    • 2.由于互斥锁的特性,程序串行,保证数据的安全,降低执行效率,GIL使得程序整体效率降低

    3.为什么需要GIL

    1.GC线程

    1.python程序(进程:python.exe)本质上就是一堆字符串,所以运行一个python程序时,必须开启一个解释器,但是在一个python程序中只有一个解释器,当有多个线程要执行时,就会产生线程安全问题

    2.是不是我们不开启子线程就没有问题呢,答案是否定的。在使用python解释器编程时,程序员无需参与内存的管理工作,这是因为python有自带的内存管理机制,简称GC

    3.python中的垃圾回收就是GC参与完成的,当内存占用达到某个阈值时,GC会将其他线程挂起,然后执行垃圾清理操作,执行这个操作的GC本身也是一串代码,也即需要开启一个线程来执行

    4.也就是说,就算程序没有自己开启线程,内部也会有多个线程,GC线程与我们程序中的线程竞争解释器资源,就会产生安全问题

    示例:
       1.假设线程A要定义一个变量 a = 100,那么它的步骤是:1.先申请一块内存空间,并把数据100放进去;2.将100的内存地址与变量名a进行绑定,引用计数加一
       2.如果线程A进行到第一步完成时,CPU切换到了GC线程,GC发现100的地址的引用计数为0,就会将它当成垃圾清理掉,等CPU再次切换到线程A时,刚刚保存的数据100就没有了,导致定义变量失败
       3.当然其他一些涉及到内存的操作同样可能产生问题,为了避免GC与其他线程竞争解释器带来的安全问题,CPython简单粗暴的给解释器加了互斥锁
    

    2.GIL带来的问题

    1.互斥锁的特性使得多线程无法并行,只能并发

    2.详细解释:GIL是以把互斥锁,互斥锁只能让线程来回切换,导致效率降低,因此,在CPython中即使开启了多线程,而且是多核CPU,也是无法执行多线程并行的,因为在一个进程中只有一个解释器,而且同一时间只能有一个任务在执行(由于GIL锁的缘故)

    3.如何解决GIL锁导致的效率问题

    • 1.没办法解决,只能尽可能的避免GIL锁的影响
    • 2.使用多进程能够实现并行,从而更好的利用多核CPU
    • 3.对任务进行区分:IO操作多的可以尽量使用多线程;计算密集型的任务尽量使用多进程

    4.我们不能因为CPython对于多线程无法实现并行,就否定python这门语言,因为:

    • 1.GIL仅仅在CPython解释器中存在,在其他的解释器中没有,并不是Python这门语言的缺点

    • 2.在单核处理器下,多线程之间本来就无法真正的并行执行

    • 3.在多核处理下,运算效率的确是比单核处理器高,但是要知道现代应用程序多数都是基于网络的(qq,微信,爬虫,浏览器等IO密集型程序),CPU的运行效率是无法决定网络速度的,而网络的速度是远远比不上处理器的运算速度,则意味着每次处理器在执行运算前都需要等待网络IO,这样一来多核优势也就没有那么明显了

    5.对于GIL锁产生的问题的总结:

    • 1.单核下无论是IO密集还是计算密集GIL都不会产生任何影响

    • 2.多核下对于IO密集任务,GIL会有细微的影响,基本可以忽略

    • 3.Cpython中IO密集任务应该采用多线程,计算密集型应该采用多进程

    • 4.之所以广泛采用CPython解释器,就是因为大量的应用程序都是IO密集型的,还有另一个很重要的原因是CPython可以无缝对接各种C语言实现的库,这对于一些数学计算相关的应用程序而言非常的happy,直接就能使用各种现成的算法

    3.GIL锁的作用

    有了GIL之后,多个线程将不可能在同一时间使用解释器,从而保证了解释器的数据安全,因此CPython中的内存管理就是线程安全的了

    4.关于GIL的性能

    1.GIL的加锁与解锁时机

    1.加锁时机:在调用解释器时立即加锁

    2.解锁时机:

    • 1.当前线程遇到了IO操作时,则释放GIL锁
    • 2.当前线程执行时间超过设定值(阈值),则释放锁

    2.性能测试

    from multiprocessing import Process
    from threading import Thread
    import time
    

    1.IO密集型(如浏览网页)

    • 1.计算任务非常少,大部分时间都在等待IO操作
    • 2.由于网络IO速度对比CPU处理速度非常慢,多线程并发并不会有太大影响,另外如果有大量客户端连接服务,多进程根本开不起来
    # 读写文件的操作也是IO操作,和输入输出类似
    def task():
    	# 利用多线程/进程循环打开文件
    	for i in range(150):
    		with open(r'test.py', 'r', encoding='utf-8') as f:
    			f.read()
    
    
    # 记录进程开始的时间
    start_time = time.time()
    	
    tl = list()   # 用于将实例化的进程对象/线程对象放入容器
    for i in range(10):
    	# p = Process(target=task)  # 使用多进程时,必须在main判断下进行
    	t = Thread(target=task)
    	t.start()
    	tl.append(t)
    		
    	# 遍历列表
    	for j in tl:
    		j.join()  # 无论是哪个进程对象/线程对象,主进程/主线程都会等待它们执行完,再运行自身
    		
    	# 计算从进程开始到进程结束所花费的时间
    	print(time.time() - start_time)
    

    2.计算密集型(比如人脸识别、图像处理)

    • 1.基本没有IO操作,大部分时间都在计算
    • 2.由于多线程不能并行,应当使用多进程,将计算任务分给不同的CPU
    • 3.其实更适合C语言这种运算速度极快的语言(适用于解析高清视频)
    def task():
    	for i in range(1000000):
    		a = 6 + 6
    		
    if __name__ == '__main__':
    	start_time = time.time()
    	
    	pl = list()
    	for i in range(6):
    		p = Process(target=task)
    		# t = Thread(target=task)   # 多线程不需要在main判断下进行
    		p.start()
    		pl.append(p)
    		
    	for j in pl:
    		j.join()
    		
    	print(time.time() - start_time)
    

    3.GIL与自定义锁的区别

    1.GIL保护的是解释器级别的数据安全,比如对象的引用计数、垃圾分代数据

    2.自定义锁保护的是解释器之外的共享资源的安全,比如硬盘上的文件、控制台,所以当程序中出现了共享自定义的数据时,就需要自己加锁

    from threading import Thread, Lock
    import time   # 为了模拟多线程竞争共享资源而导致数据错乱,需要导入时间模块来控制两个线程的速度
    
    a = 0
    mutex = Lock
    
    def task():
    	# mutex.acquire()
    	global a 
    	temp = a 
    	# 如果不加锁,则两个线程都会运行到这里等待,它们获取的temp都是0,因为第一个线程还没有改变a的值
    	time.sleep(0.1)  
    	a = temp + 1
    	# mutex.release()
    	
    t1 = Thread(target=task)
    t2 = Thread(target=task)
    
    t1.start()
    t2.start()
    
    t1.join()
    t2.join()
    
    print(a)
    
    # 如果不加锁,则a为1;加锁之后由于每次只有一个线程会执行共享代码(数据),所以a的值会被加两次,则a为2
    

    三、线程池与进程池

    import os
    import time
    from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
    from threading import activeCount, enumerate, currentThread
    

    1.线程池

    1.池表示一个容器,线程池本质上就是一个存储线程的列表

    2.如果是IO密集型任务,则使用线程池

    # 创建一个线程池,指定最多可以容纳的线程数量
    pool = ThreadPoolExecutor(10)  # 自定义一个最多可以容纳10个线程的线程池(如果不指定,则默认为CPU的个数乘以5)
    
    def task():
    	time.sleep(1)   # 让多个线程在同一起跑线,并打印自己的名字
    	print(currentThread().name)
    	
    # 提交任务到池子中
    pool.submit(task)
    pool.submit(task)
    pool.submit(task)
    
    print(active_count()) # 存活的线程数量(包括主线程)
    print(enumerate())  # 当前存活线程信息(列表形式)
    
    '''
    4
    [<_MainThread(MainThread, started 19612)>, <Thread(ThreadPoolExecutor-0_0, started daemon 42040)>, <Thread(ThreadPoolExecutor-0_1, started daemon 47792)>, <Thread(ThreadPoolExecutor-0_2, started daemon 47232)>]
    ThreadPoolExecutor-0_1
    ThreadPoolExecutor-0_0
    ThreadPoolExecutor-0_2
    '''
    

    2.进程池

    def task():
        time.sleep(1)
        print(os.getpid(), os.getppid())
    
    if __name__ == '__main__':
        pool = ProcessPoolExecutor(5)  # 如果不指定个数,则默认为CPU的个数
        pool.submit(task)
        pool.submit(task)
        pool.submit(task)
        pool.submit(task)
        pool.submit(task)
        print(os.getpid())
        
    '''
    51012
    50860 51012
    50512 51012
    51024 51012
    50540 51012
    51180 51012
    '''
    

    3.线程池与进程池

    1.为什么要使用线程池/进程池

    • 1.可以避免频繁的创建和销毁(进程/线程),来降低对资源的开销
    • 2.可以限制同时存在的线程数量,以保证服务器不会因为资源不足而崩溃
    • 3.帮我们管理线程/进程的生命周期
    • 4.管理任务的分配

    2.注意:如果进程不结束,池子里面的进程/线程也会一直存活

    四、同步与异步

    1.回顾

    1.程序的运行状态:阻塞与非阻塞

    2.处理任务的方式:并行、并发、串行

    3.提交任务的方式:同步、异步

    2.同步

    1.同步(指的是调用):提交任务后必须在原地等待,直到任务结束才能执行下面的代码

    2.同步会有等待的效果,但是和阻塞完全不同,阻塞时程序会被剥夺CPU执行权,而同步调用则不会

    def task():
    	for i in range(1000000):
    		6 + 6
    
    print('start...')
    task()  # 不是阻塞,而是在进行大量的计算,称为同步执行
    print('end')  # 要等到上一行代码执行完毕
    

    3.异步

    1.异步相关概念

    1.异步(异步调用):发起任务后不用等待任务执行完毕,可以立即开启执行其他操作

    2.异步效率高于同步,但是会出现另一个问题,就是任务发起方不知道任务合适处理完成

    2.解决异步无法知晓任务状态的问题

    1.轮询:每隔一段时间就询问一次(效率低、无法及时获取结果)

    # 不推荐轮询的方法
    from threading import Thread
    import time
    
    is_start = False
    
    def server_task():
    	global is_start
    	print('服务器正在启动...')
    	time.sleep(2)
    	print('服务器启动成功')
    	is_start = True
    	
    def client_task():
    	while True:
    		time.sleep(0.2)
    		if is_start:
    			print('连接成功')
                break
    		else:
    			print('请耐心等待...')
    			
    t1 = Thread(target=server_task)
    t2 = Thread(target=client_task)
    
    t1.start()
    t2.start()
    
    print('异步——轮询方法')
    

    2.异步回调:让任务的执行方主动通知任务的执行状态(可以及时拿到任务的结果)

    • 1.方法一:定义一个回调函数,用来反映异步调用的子线程完成之后的状态(结果)
    from threading import Thread
    
    # 具体的任务
    def task(callback):
    	print('子线程start')
    	for i in range(1000000):
    		6 + 7
    	
    	callback('子线程end')
    	
    # 回调函数(参数是表示任务的结果)
    def call_back(res):
    	print(res)
    	
    print('主线程 start')
    t = Thread(target=task, args=(call_back,))
    t.start()
    print('主线程 end')
    
    • 2.方法二:利用线程池中的add_done_callback方法来绑定回调函数
    from concurrent.futures import ThreadPoolExecutor
    import time
    
    # 具体的任务
    def task():
    	time.sleep(2)
    	print('子线程end')
    	return 'ok'
    	
    # 回调函数(参数是表示任务的结果)
    def call_back(arg):
    	print(arg)   # <Future at 0x1345dd8e9b0 state=finished returned str>
    	print(arg.result())  # ok
    
    pool = ThreadPoolExecutor(10)
    res = pool.submit(task)  # 异步提交方式
    print(res)
    # print(res.result()) # result是阻塞函数,它会阻塞到任务执行完毕为止
    res.add_done_callback(call_back)  # 为这个任务绑定回调函数
    
    print('主线程 end')
    
    
    # 使用案例
    def task(num):
    	time.sleep(1)
    	print(num)
    	return 'ok'  # 返回值就包含在res这个对象中
    	
    def callback(obj): 
    	print(obj.result())   # 绑定的回调函数会接收返回值对象res,它是一个对象,只能通过打印 对象.result() 才能得到任务的返回值
    	
    pool = ThreadPoolExecutor()
    res = pool.submit(task, 666)  # res接收的是一个返回值对象(区别于函数返回值)
    res.add_done_callback(callback)
    
    print('over')
    

    3.异步回调详解

    1.定义:在发起一个异步任务的同时指定一个函数,在异步任务完成时会自动的调用这个函数

    2.为什么需要异步回调

    • 之前在使用线程池或进程池提交任务时,如果想要处理任务的执行结果则必须调用result函数或是shutdown函数,而它们都是是阻塞的,会等到任务执行完毕后才能继续执行,这样一来在这个等待过程中就无法执行其他任务,降低了效率,所以需要一种方案,即保证解析结果的线程不用等待,又能保证数据能够及时被解析,该方案就是异步回调

    3.总结

    • 异步回调使用方法就是在提交任务后得到一个Futures对象,调用对象的add_done_callback来指定一个回调函数

    4.注意:

    • 1.使用进程池时,回调函数都是主进程中执行执行
    • 2.使用线程池时,回调函数的执行线程是不确定的,哪个线程空闲就交给哪个线程
    • 3.回调函数默认接收一个参数就是这个任务对象自己,再通过对象的result函数来获取任务的处理结果

    5.异步回调的应用

    import requests,re,os,random,time
    from concurrent.futures import ProcessPoolExecutor
    
    def get_data(url):
        print("%s 正在请求%s" % (os.getpid(),url))
        time.sleep(random.randint(1,2))
        response = requests.get(url)
        print(os.getpid(),"请求成功 数据长度",len(response.content))
        #parser(response) # 3.直接调用解析方法  哪个进程请求完成就那个进程解析数据  强行使两个操作耦合到一起了
        return response
    
    def parser(obj):
        data = obj.result()
        htm = data.content.decode("utf-8")
        ls = re.findall("href=.*?com",htm)
        print(os.getpid(),"解析成功",len(ls),"个链接")
    
    if __name__ == '__main__':
        pool = ProcessPoolExecutor(3)
        urls = ["https://www.baidu.com",
                "https://www.sina.com",
                "https://www.python.org",
                "https://www.tmall.com",
                "https://www.mysql.com",
                "https://www.apple.com.cn"]
        # objs = []
        for url in urls:
            # res = pool.submit(get_data,url).result() # 1.同步的方式获取结果 将导致所有请求任务不能并发
            # parser(res)
    
            obj = pool.submit(get_data,url) # 
            obj.add_done_callback(parser) # 4.使用异步回调,保证了数据可以被及时处理,并且请求和解析解开了耦合
            # objs.append(obj)
            
        # pool.shutdown() # 2.等待所有任务执行结束在统一的解析
        # for obj in objs:
        #     res = obj.result()
        #     parser(res)
        # 1.请求任务可以并发 但是结果不能被及时解析 必须等所有请求完成才能解析
        # 2.解析任务变成了串行,
    

    五、线程事件Event

    1.什么是事件

    1.事件表示在某个时间发生了某个事情的通知信号,用于线程间的协同工作

    2.作用:因为不同线程之间是独立运行的状态,不可预测,所以一个线程与另一个线程间的数据是不同步的,当一个线程需要利用另一个线程的状态来确定自己的下一步操作时,就必须保持线程间数据的同步,Event就可以实现线程间同步

    2.Event相关概念

    1.Event对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生
    2.在初始情况下,Event对象中的信号标志被设置为假,如果有线程等待一个Event对象,而这个Event对象的标志为假,那么这个线程将会被一直阻塞,直到该标志为真
    3.一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程
    4.如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件,继续执行

    3.Event对象的方法

    event.isSet():返回event的状态值;等价于event.is_set()
    event.wait():将阻塞线程;直到event的状态为True
    event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
    event.clear():恢复event的状态值为False。
    

    4.如何使用Event

    通过wait函数阻塞当前线程,直到Event对象的状态从False变为True

    
    from threading import Thread, Event
    import time
    
    e = Event()  # 实例化一个事件对象,它的初始值时False
    
    def start_server():
    	print('server loading...')
    	time.sleep(2)
    	print('server start')
    	e.set()  # 当服务器线程执行完时,事件对象通过set方法将其状态(bool值)标为True
    	
    def connect_task():
    	e.wait()  # 在并发的多线程中,连接线程由于事件对象的wait方法,会一直处于阻塞状态,直到事件对象的bool值为True时,才会变为非阻塞
    	if e.is_set():  # is_set方法可以获取事件对象的状态
    		print('connect sucessful')
    		
    t1 = Thread(target=start_server)
    t2 = Thread(target=connect_task)
    
    t1.start()
    t2.start()
    
  • 相关阅读:
    20169305 2016-2017-2《网络攻防技术与实践》课程总结
    20169305 2016-2017-2 《网络攻防技术与实践》第11周学习总结
    20169305 2016-2017-2 《网络攻防技术与实践》第10周学习总结
    20169305 2016-2017-2《网络攻防与实践》第九周 学习总结
    20169305 2016-2017-2 《网络攻防技术与实践》第8周学习总结
    20169305 2016-2017-2 《网络攻防技术与实践》第7周学习总结
    20169305 2016-2017-2 《网络攻防技术与实践》第6周学习总结
    20169305 2016-2017-2 《网络攻防技术与实践》第5周学习总结
    20169305 2016-2017-2《网络攻击与防范》第四周学习总结
    20169305 2016-2017-2《网络攻击与防范》第三周学习总结
  • 原文地址:https://www.cnblogs.com/itboy-newking/p/11185355.html
Copyright © 2011-2022 走看看