zoukankan      html  css  js  c++  java
  • 多线程(二)

    多线程(二)

    1.死锁现象与递归锁

    死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程,如下就是死锁
    
    from threading import Thread,Lock
    import time
    
    lock_A = Lock()
    lock_B = Lock()
    
    class MyThread(Thread):
        def run(self):
            self.f1()
            self.f2()
            
        def f1(self):
            lock_A.acquire()
            print(f'{self.name}拿到了A锁')
            lock_B.acquire()
            print(f'{self.name}拿到了B锁')
            lock_B.release()
            lock_A.release()
            
        def f2(self):
            lock_B.acquire()
            print(f'{self.name}拿到了B锁')
            time.sleep(0.1)
            lock_A.acquire()
            print(f'{self.name}拿到了A锁')
            lock_A.release()
            lock_B.release()
            
    if __name__ == '__main__':
        for i in range(3):
            t = MyThread()
            t.start()
    
    #Thread-1拿到了A锁
    #Thread-1拿到了B锁
    #Thread-1拿到了B锁
    #Thread-2拿到了A锁
    解决方法,递归锁,在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock。
    
    这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁:
            
            
    from threading import Thread,RLock
    import time
    
    lock_A = lock_B = RLock()
    #递归锁有一个计数的功能,原数字为0,上一次锁,计数+1,释放一次锁,计数-1
    #只要递归锁上面的数字不为0,其他线程就不能抢锁
    
    class MyThread(Thread):
        def run(self):
            self.f1()
            self.f2()
            
        def f1(self):
            lock_A.acquire()
            print(f'{self.name}拿到了A锁')
            lock_B.acquire()
            print(f'{self.name}拿到了B锁')
            lock_B.release()
            lock_A.release()
            
        def f2(self):
            lock_B.acquire()
            print(f'{self.name}拿到了B锁')
            time.sleep(0.1)
            lock_A.acquire()
            print(f'{self.name}拿到了A锁')
            lock_A.release()
            lock_B.release()
            
    if __name__ == '__main__':
        for i in range(3):
            t = MyThread()
            t.start()        
            
    #Thread-1拿到了A锁
    #Thread-1拿到了B锁
    #Thread-1拿到了B锁
    #Thread-1拿到了A锁
    #Thread-2拿到了A锁
    #Thread-2拿到了B锁
    #Thread-2拿到了B锁
    #Thread-2拿到了A锁
    #Thread-3拿到了A锁
    #Thread-3拿到了B锁
    #Thread-3拿到了B锁
    #Thread-3拿到了A锁
    

    2.信号量

    Semaphore管理一个内置的计数器,
    每当调用acquire()时内置计数器-1;
    调用release() 时内置计数器+1;
    计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。
    
    实例:(同时只有5个线程可以获得semaphore,即可以限制最大连接数为5):
    
    from threading import Thread, Semaphore, current_thread
    import time
    import random
    sem = Semaphore(5)
    
    def task():
        sem.acquire()
    
        print(f'{current_thread().name} 厕所ing')
        time.sleep(random.randint(1,3))
    
        sem.release()
    
    
    if __name__ == '__main__':
        for i in range(20):
            t = Thread(target=task,)
            t.start()
    

    3.GIL全局解释器锁

    理论上:单个进程的多线程可以利用多核,但是Cpython解释器给进入解释器的线程加了锁

    加锁的原因:

    1.当时处于单核时代,且cpu价格昂贵

    2.如果不加全局解释器锁,则程序员需要在源码内部各种主动加锁,解锁,出现死锁现象,则直接在进入解释器时给线程加了锁

    优点:保证了Cpython解释器的数据资源的安全

    缺点:单个进程的多线程不能利用多核

    结论:在Cpython解释器中,同一个进程下开启的多线程,同一时刻只能有一个线程执行,无法利用多核优势
    
    首先需要明确的一点是GIL并不是Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念。就好比C++是一套语言(语法)标准,但是可以用不同的编译器来编译成可执行代码。有名的编译器例如GCC,INTEL C++,Visual C++等。Python也一样,同样一段代码可以通过CPython,PyPy,Psyco等不同的Python执行环境来执行。像其中的JPython,PyPy就没有GIL。然而因为CPython是大部分环境下默认的Python执行环境。所以在很多人的概念里CPython就是Python,也就想当然的把GIL归结为Python语言的缺陷。所以这里要先明确一点:GIL并不是Python的特性,Python完全可以不依赖于GIL
    
    
    GIL介绍:
        GIL本质就是一把互斥锁,既然是互斥锁,所有互斥锁的本质都一样,都是将并发运行变成串行,以此来控制同一时间内共享数据只能被一个任务所修改,进而保证数据安全。
    
      可以肯定的一点是:保护不同的数据的安全,就应该加不同的锁。
    	要想了解GIL,首先确定一点:每次执行python程序,都会产生一个独立的进程。例如python test.py,python aaa.py,python bbb.py会产生3个不同的python进程
    
    #验证python test.py只会产生一个进程
    #test.py内容
    import os,time
    print(os.getpid())
    time.sleep(1000)
    
    python3 test.py 
    #在windows下
    tasklist |findstr python
    #在linux下
    ps aux |grep python
    
    在一个python的进程内,不仅有test.py的主线程或者由该主线程开启的其他线程,还有解释器开启的垃圾回收等解释器级别的线程,总之,所有线程都运行在这一个进程内,毫无疑问
    
    #1 所有数据都是共享的,这其中,代码作为一种数据也是被所有线程共享的(test.py的所有代码以及Cpython解释器的所有代码)
    例如:test.py定义一个函数work(代码内容如下图),在进程内所有线程都能访问到work的代码,于是我们可以开启三个线程然后target都指向该代码,能访问到意味着就是可以执行。
    
    #2 所有线程的任务,都需要将任务的代码当做参数传给解释器的代码去执行,即所有的线程要想运行自己的任务,首先需要解决的是能够访问到解释器的代码。
    
    综上:
    
      如果多个线程的target=work,那么执行流程是
    
      多个线程先访问到解释器的代码,即拿到执行权限,然后将target的代码交给解释器的代码去执行
    
      解释器的代码是所有线程共享的,所以垃圾回收线程也可能访问到解释器的代码而去执行,这就导致了一个问题:对于同一个数据100,可能线程1执行x=100的同时,而垃圾回收执行的是回收100的操作,解决这种问题没有什么高明的方法,就是加锁处理,如下图的GIL,保证python解释器同一时间只能执行一个任务的代码
            
    IO密集型:适合单个进程的多线程,并发执行
    计算密集型:适合多进程的并行
    

    4.GIL锁与lock锁的区别

    GIL保护的是解释器级的数据,保护用户自己的数据则需要自己加锁处理
    
    相同点:都是同种锁(互斥锁)
    不同点:
        1.GIL全局解释器锁保护解释器内部的资源数据安全
        2.GIL锁的上锁释放无需手动操作
        3.自己代码中定义的互斥锁保护进程中的资源数据的安全
        4.自己定义的互斥锁必须手动上锁和释放
    

    5.验证计算密集型与IO密集型的效率

    计算密集型:
    
    from threading import Thread
    from multiprocessing import Process
    import time,random
    
    def task():
        count = 0
        for i in range(100000000):
            count += 1
    
    #多进程的并发,并行
    if __name__ == '__main__':
        start_time = time.time()
        l1 = []
        for i in range(4):
            p = Process(target = task)
            l1.append(p)
            p.start()
        for i in l1:
            p.join()
        print(f"执行效率:{time.time() - start_time}")
        
    #执行效率:6.620556354522705
        
    #多线程的并发
    if __name__ == '__main__':
        start_time = time.time()
        l1 = []
        for i in range(4):
            p = Thread(target = task)
            l1.append(p)
            p.start()
        for i in l1:
            p.join()
        print(f"执行效率:{time.time() - start_time}")
        
    #执行效率:17.61694884300232
    
    总结:计算密集型:多进程的并发并行执行效率高
            
            
            
    IO密集型:
        
    from threading import Thread
    from multiprocessing import Process
    import time,random
    def task():
        count = 0
        time.sleep(random.randint(1,3))
        count += 1
    
    #多进程的并发,并行
    if __name__ == '__main__':
        start_time = time.time()
        l1 = []
        for i in range(50):
            p =Process(target = task)
            l1.append(p)
            p.start()
        for i in l1:
            p.join()
        print(f"执行效率:{time.time() - start_time}")
        
    #执行效率:3.101606607437134
    
    if __name__ == '__main__':
        start_time = time.time()
        l1 = []
        for i in range(50):
            p =Thread(target = task)
            l1.append(p)
            p.start()
        for i in l1:
            p.join()
        print(f"执行效率:{time.time() - start_time}")
        
    #执行效率:2.010244846343994
    
    总结:IO密集型:单个进程的多线程的并发执行效率高
            
            
            
    

    6.多线程实现socket通信

    server端:
        
    import socket
    from threading import Thread
    
    def communicate(conn,addr):
        while 1:
            try:
                from_client_data = conn.recv(1024)
                print(f'来自客户端{addr[1]}的消息: {from_client_data.decode("utf-8")}')
                to_client_data = input('>>>').strip()
                conn.send(to_client_data.encode('utf-8'))
            except Exception:
                break
        conn.close()
    
    
    
    def _accept():
        server = socket.socket()
    
        server.bind(('127.0.0.1', 8848))
    
        server.listen(5)
    
        while 1:
            conn, addr = server.accept()
            t = Thread(target=communicate,args=(conn,addr))
            t.start()
    
    if __name__ == '__main__':
        _accept()    
    
    client端:
        
    import socket
    
    client = socket.socket()
    
    client.connect(('127.0.0.1',8848))
    
    while 1:
        try:
            to_server_data = input('>>>').strip()
            client.send(to_server_data.encode('utf-8'))
    
            from_server_data = client.recv(1024)
            print(f'来自服务端的消息: {from_server_data.decode("utf-8")}')
    
        except Exception:
            break
    client.close()
    

    7.进程池和线程池

    创建进程池的类:如果指定numprocess为3,则进程池会从无到有创建三个进程,然后自始至终使用这三个进程去执行所有任务(高级一些的进程池可以根据你的并发量,搞成动态增加或减少进程池中的进程数量的操作),不会开启其他进程,提高操作系统效率,减少空间的占用等。
    
    线程池: 一个容器,这个容器限制住你开启线程的数量,比如4个,第一次肯定只能并发的处理4个任务,只要有任务完成,线程马上就会接下一个任务.
    #以时间换空间
    
    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    import os,time,random
    
    def task():
        print(f"{os.getpid()}工作")
        time.sleep(random.randint(1,3))
    
    #开启进程池(并发或并行)
    if __name__ == '__main__':
        p = ProcessPoolExecutor() #默认不写,进程池里面的进程数与本机cpu数量相等
        for i in range(20):
            p.submit(task)
            
            
    #开启线程池(并发)
    if __name__ == '__main__':
        t = ThreadPoolExecutor() #默认不写,线程池里的线程数为本机cpu个数*5
        for i in range(20):
            t.submit(task)
    
  • 相关阅读:
    openOPC与监控页面二
    Node教程——Gulp前端构建工具-教程
    回到顶部插件
    《软件测试52讲》——测试基础知识篇
    计算贝塞尔曲线上点坐标
    少年,不要滥用箭头函数啊
    JS属性defer
    leetcode-572-另一个树的子树
    leetcode-9.-回文数
    leetcode-300-最长上升子序列
  • 原文地址:https://www.cnblogs.com/tutougold/p/11402553.html
Copyright © 2011-2022 走看看