zoukankan      html  css  js  c++  java
  • Python--(并发编程之线程Part2)

    
    

    GIL只能保证垃圾回收机制的安全,进程中的数据安全还是需要自定义锁

    线程执行代码首先要抢到GIL全局锁,假设线程X首先抢到,以为要抢到自定义锁要执行代码,所以这个线程在执行代码的时候就很容抢到了自定义锁,当线程在执行代码的的时候遇到IO操作就会被CPU检测到,并且夺回CPU的执行权限,这个线程就释放了GIL全局锁,其他线程就开始抢GIL全局锁,但是即便是抢到了GIL全局锁,但是自定义的锁还在那个线程那里,所以那个线程做完IO操作,其他线程还是要把GIL全局锁还回去,他才能执行剩下的代码释放自定义锁,于是这个线程又拿到了GIL全局锁,执行完代码后又释放了GIL锁,在释放自定义锁的时候又加入了抢GIL锁的大军中.....

    这样子就保证了进程中数据的安装

    1. GIL全局解释器锁(******)
        2. 死锁与递归锁
        3. 信号量
        4. Event事件
    5. 线程queue
    一、GIL全局锁
    运行test.py的流程:
    a、将python解释器的代码从硬盘读入内存
    b、将test.py的代码从硬盘读入内存  (一个进程内装有两份代码)
    c、将test.py中的代码像字符串一样读入python解释器中解析执行
    1 、GIL:全局解释器锁 (CPython解释器的特性)
    In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple
    native threads from executing Python bytecodes at once. This lock is necessary mainly
    because CPython’s memory management (垃圾回收机制,由解释器定期执行)is not thread-safe(如果不是串行改数据,当x=10的过程中内存中产生一个10,还没来的及绑定x,就有可能被垃圾回收机制回收).However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)
    GIL本质就是一把夹在解释器身上的互斥锁(执行权限)。同一个进程内的所有线程都需要先抢到GIL锁,才能执行解释器代码
    
    2、GIL的优缺点:
    优点:保证Cpython解释器内存管理的线程安全
    缺点:在Cpython解释器中,同一个进程下开启的多线程,同一时刻只能有一个线程执行,也就说Cpython解释器的多线程无法实现并行无法利用多核优势
    
    注意:
    a、GIL不能并行,但有可能并发,不一定为串行。因为串行是一个任务完完全全执行完毕后才进行下一个;而cpython中,一个线程在io时,被CPU释放时,会被强行取消GIL的使用权限
    b、多核(多CPU)的优势是提升运算效率
    c、计算密集型--》使用多进程,以用上多核
    d、IO密集型--》使用多线程
    二、Cpython解释器并发效率验证
    1、计算密集型应该使用多进程
    from multiprocessing import Process
    from threading import Thread
    
    import time
    # import os
    # print(os.cpu_count())  #查看cpu个数
    
    def task1():
        res=0
        for i in range(1,100000000):
            res+=i
    
    def task2():
        res=0
        for i in range(1,100000000):
            res+=i
    
    def task3():
        res=0
        for i in range(1,100000000):
            res+=i
    
    def task4():
        res=0
        for i in range(1,100000000):
            res+=i
    
    if __name__ == '__main__':
        # p1=Process(target=task1)
        # p2=Process(target=task2)
        # p3=Process(target=task3)
        # p4=Process(target=task4)
    
        p1=Thread(target=task1)
        p2=Thread(target=task2)
        p3=Thread(target=task3)
        p4=Thread(target=task4)
        start_time=time.time()
        p1.start()
        p2.start()
        p3.start()
        p4.start()
        p1.join()
        p2.join()
        p3.join()
        p4.join()
        stop_time=time.time()
        print(stop_time - start_time)
    
    2、IO密集型应该使用多线程
    from multiprocessing import Process
    from threading import Thread
    
    import time
    
    def task1():
        time.sleep(3)
    
    def task2():
        time.sleep(3)
    
    def task3():
        time.sleep(3)
    
    def task4():
        time.sleep(3)
    
    if __name__ == '__main__':
        # p1=Process(target=task1)
        # p2=Process(target=task2)
        # p3=Process(target=task3)
        # p4=Process(target=task4)
    
        # p1=Thread(target=task1)
        # p2=Thread(target=task2)
        # p3=Thread(target=task3)
        # p4=Thread(target=task4)
        # start_time=time.time()
        # p1.start()
        # p2.start()
        # p3.start()
        # p4.start()
        # p1.join()
        # p2.join()
        # p3.join()
        # p4.join()
        # stop_time=time.time()
        # print(stop_time - start_time) #3.138049364089966
    
        p_l=[]
        start_time=time.time()
    
        for i in range(500):
            p=Thread(target=task1)
            p_l.append(p)
            p.start()
    
        for p in p_l:
            p.join()
    
    print(time.time() - start_time)
    三、线程互斥锁与GIL对比
    GIL能保护解释器级别代码(和垃圾回收机制有关)但保护不了其他共享数据(比如自己的代码)。所以在程序中对于需要保护的数据要自行加锁
    
    from threading import Thread,Lock
    import time
    
    mutex=Lock()
    count=0
    
    def task():
        global count
        mutex.acquire()
        temp=count
        time.sleep(0.1)
        count=temp+1
        mutex.release()
    
    if __name__ == '__main__':
        t_l=[]
        for i in range(2):
            t=Thread(target=task)
            t_l.append(t)
            t.start()
        for t in t_l:
            t.join()
    
        print('',count)
    
    四、基于多线程实现并发的套接字通信
    服务端:
    from socket import *
    from threading import Thread
    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    
    tpool=ThreadPoolExecutor(3)  #进程和线程都不能无限多,导入模块来限制进程和线程池重点数目;进程线程池中封装了Process、Thread模块的功能
    
    def communicate(conn,client_addr):
        while True:  # 通讯循环
            try:
                data = conn.recv(1024)
                if not data: break
                conn.send(data.upper())
            except ConnectionResetError:
                break
        conn.close()
    
    def server():
        server=socket(AF_INET,SOCK_STREAM)
        server.bind(('127.0.0.1',8080))
        server.listen(5)
    
        while True: # 链接循环
            conn,client_addr=server.accept()
            print(client_addr)
            # t=Thread(target=communicate,args=(conn,client_addr))
            # t.start()
            tpool.submit(communicate,conn,client_addr)
    
        server.close()
    
    if __name__ == '__main__':
        server()
    
    客户端:
    from socket import *
    
    client=socket(AF_INET,SOCK_STREAM)
    client.connect(('127.0.0.1',8080))
    
    while True:
        msg=input('>>>: ').strip()
        if not msg:continue
        client.send(msg.encode('utf-8'))
        data=client.recv(1024)
        print(data.decode('utf-8'))
    
    client.close()
    六、死锁与递归锁
    进程也有死锁与递归锁,在进程那里忘记说了,放到这里一切说了额
    所谓死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程,如下就是死锁
    from threading import Thread,Lockimport time
    mutexA=Lock()
    mutexB=Lock()
    class MyThread(Thread):
        def run(self):
            self.func1()
            self.func2()
        def func1(self):
            mutexA.acquire()
            print('33[41m%s 拿到A锁33[0m' %self.name)
    
            mutexB.acquire()
            print('33[42m%s 拿到B锁33[0m' %self.name)
            mutexB.release()
    
            mutexA.release()
    
        def func2(self):
            mutexB.acquire()
            print('33[43m%s 拿到B锁33[0m' %self.name)
            time.sleep(2)
    
            mutexA.acquire()
            print('33[44m%s 拿到A锁33[0m' %self.name)
            mutexA.release()
    
            mutexB.release()
    if __name__ == '__main__':
        for i in range(10):
            t=MyThread()
            t.start()
    '''
    Thread-1 拿到A锁
    Thread-1 拿到B锁
    Thread-1 拿到B锁
    Thread-2 拿到A锁
    然后就卡住,死锁了'''
    
    解决方法,递归锁,在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock。
    这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁:
    mutexA=mutexB=threading.RLock() #一个线程拿到锁,counter加1,该线程内又碰到加锁的情况,则counter继续加1,这期间所有其他线程都只能等待,等待该线程释放所有锁,即counter递减到0为止
    
    七、信号量
    同进程的一样
    Semaphore管理一个内置的计数器,
    每当调用acquire()时内置计数器-1;
    调用release() 时内置计数器+1;
    计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。
    实例:(同时只有5个线程可以获得semaphore,即可以限制最大连接数为5):
    from threading import Thread,Semaphoreimport threadingimport time
    # def func():
    #     if sm.acquire():
    #         print (threading.currentThread().getName() + ' get semaphore')
    #         time.sleep(2)
    #         sm.release()
    def func():
        sm.acquire()
        print('%s get sm' %threading.current_thread().getName())
        time.sleep(3)
        sm.release()if __name__ == '__main__':
        sm=Semaphore(5)
        for i in range(23):
            t=Thread(target=func)
            t.start()
    
    与进程池是完全不同的概念,进程池Pool(4),最大只能产生4个进程,而且从头到尾都只是这四个进程,不会产生新的,而信号量是产生一堆线程/进程
    八、Event事件
    同进程的一样
    线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其 他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在 初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行
    九、线程queue
    queue队列 :使用import queue,用法与进程Queue一样
    queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.
    class queue.Queue(maxsize=0) #先进先出
    
    
    import queue
    
    q=queue.Queue()
    q.put('first')
    q.put('second')
    q.put('third')
    print(q.get())print(q.get())print(q.get())'''
    结果(先进先出):
    first
    second
    third'''
    
    class queue.LifoQueue(maxsize=0) #last in fisrt out 
    import queue
    
    q=queue.LifoQueue()
    q.put('first')
    q.put('second')
    q.put('third')
    print(q.get())print(q.get())print(q.get())'''
    结果(后进先出):
    third
    second
    first'''
    
    class queue.PriorityQueue(maxsize=0) #存储数据时可设置优先级的队列
    
    import queue
    
    q=queue.PriorityQueue()#put进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高
    q.put((20,'a'))
    q.put((10,'b'))
    q.put((30,'c'))
    print(q.get())print(q.get())print(q.get())'''
    结果(数字越小优先级越高,优先级高的优先出队):
    (10, 'b')
    (20, 'a')
    (30, 'c')'''

  • 相关阅读:
    金融量化分析【day110】:Pandas-DataFrame读取与写入
    金融量化分析【day110】:Pandas-DataFrame索引和切片
    金融量化分析【day110】:Pandas的Series对象
    金融量化分析【day110】:NumPy-切片和索引
    金融量化分析【day110】:NumPy通用函数
    金融量化分析【day110】:NumPy多维数组
    金融量化分析【day110】:IPython介绍及简单操作
    金融量化分析【day110】:金融基础知识
    设计模式:单例模式
    设计模式:工厂方法模式
  • 原文地址:https://www.cnblogs.com/zhaijihai/p/9605425.html
Copyright © 2011-2022 走看看