zoukankan      html  css  js  c++  java
  • Python 第三十六章 生产者模式+递归锁+信号量+GIL全局+IO、计算密集型验证+线程池和进程池

    生产者和消费者模型

    """
    1.进程:生产者消费者模型
    编程思想:模型,设计模式,理论等等
    都是交给你一种编程的方法,以后遇到类似的情况,套用即可
    生产者消费者模型三要素
        生产者:产生数据的
        消费者:接收数据做进一步处理的
        容器:队列
    队列容器起到的作用:
    缓冲作用,平衡了生产力和消费力,解耦
    """
    from multiprocessing import Process
    from multiprocessing import Queue
    import time
    import random
    
    def producer(q,name):
        """
        生产者:生产包子
        :param q:
        :param name:
        :return:
        """
        for i in range(5):
            time.sleep(random.randint(1,2))
            res = f"{i}号"
            q.put(res)
            print(f'生产者{name}生产了{res}')
    
    def consumer(q,name):
        """
        消费者:吃包子
        :param q:
        :param name:
        :return:
        """
        while 1:
            try:
                food = q.get()
                time.sleep(random.randint(1,3))
                print(f'33[31;0m消费者{name}吃了{food}33[0m')
            except Exception:
                return
    
    if __name__ == '__main__':
        q = Queue()
        p1 = Process(target=producer,args=(q,'zs'))
        p2 = Process(target=consumer,args=(q,'ls'))
        p1.start()
        p2.start()
    # 输出:
    # 生产者zs生产了0号
    # 生产者zs生产了1号
    # 生产者zs生产了2号
    # 消费者ls吃了0号
    # 消费者ls吃了1号
    # 生产者zs生产了3号
    # 消费者ls吃了2号
    # 生产者zs生产了4号
    # 消费者ls吃了3号
    # 消费者ls吃了4号
    

    递归锁

    # 递归锁可以解决死锁的现象,业务需要多个锁时,优先考虑递归锁
    from threading import Thread
    from threading import Lock
    import time
    
    lock_A = Lock()
    lock_B = Lock()
    
    class MyThread(Thread):
        def run(self): # 必须是run方法
            self.f1()
            self.f2()
        def f1(self):
            lock_A.acquire()
            print(f'{self.name}拿到A锁') # 拿到A锁
            lock_B.acquire()
            print(f'{self.name}拿到B锁')  # 拿到B锁
            lock_B.release()
            lock_A.release()
    
        def f2(self):
            lock_B.acquire()
            print(f'{self.name}拿到B锁') # 拿到A锁
            time.sleep(0.1)
            lock_A.acquire()
            print(f'{self.name}拿到A锁')  # 拿到B锁
            lock_A.release()
            lock_B.release()
    
    if __name__ == '__main__':
        for i in range(3):
            t=MyThread()
            t.start()
    # 输出
    # Thread-1拿到A锁
    # Thread-1拿到B锁
    # Thread-1拿到B锁
    # Thread-2拿到A锁
    
    """
    thread 先抢到了A锁,此时t2和t3也想抢A锁,但是只能等待,等待t1释放A锁
    t1又抢到了B锁,此时t1有AB两把锁,也没有释放,t2和t3 继续等待
    当t1依次释放BA锁,线程t2和t3争抢A锁,t1争抢B锁
    结果分析,t2抢到了A锁,t1拿到B锁
    但是接下来,t1睡了0.01秒,t1拥有者B锁,但想要A锁,t2拥有A锁,想要B锁,形成了死锁现象
    """
    
    
    # 递归锁可以解决死锁现象
    
    from threading import Thread
    from threading import RLock # 递归锁
    import time
    
    lock_A = lock_B = RLock()
    # 递归锁有一个计数的功能,原数字为0,上一次锁计数加一,释放一次数,计数-1
    # 只要递归锁上面的数字不为零,其他线程不能抢锁
    
    class MyThread(Thread):
        def run(self): # 必须是run方法
            self.f1()
            self.f2()
        def f1(self):
            lock_A.acquire()
            print(f'{self.name}拿到A锁') # 拿到A锁
            lock_B.acquire()
            print(f'{self.name}拿到B锁')  # 拿到B锁
            lock_B.release()
            lock_A.release()
    
        def f2(self):
            lock_B.acquire()
            print(f'{self.name}拿到B锁') # 拿到A锁
            time.sleep(0.1)
            lock_A.acquire()
            print(f'{self.name}拿到A锁')  # 拿到B锁
            lock_A.release()
            lock_B.release()
    
    if __name__ == '__main__':
        for i in range(3):
            t=MyThread()
            t.start()
    
    # 输出
    # Thread-1拿到A锁
    # Thread-1拿到B锁
    # Thread-2拿到A锁
    # Thread-2拿到B锁
    # Thread-2拿到B锁
    # Thread-2拿到A锁
    # Thread-1拿到B锁
    # Thread-1拿到A锁
    # Thread-3拿到A锁
    # Thread-3拿到B锁
    # Thread-3拿到B锁
    # Thread-3拿到A锁
    

    信号量

    """
    信号量:也是锁的一种
    """
    from threading import Thread
    from threading import  Semaphore
    from threading import current_thread
    # 同一个模块可以引用多个功能加.
    
    import time
    import random
    
    sem = Semaphore(5) # 并发数量是5
    # 控制并发任务
    def task():
        sem.acquire() # 上锁
        print(f'{current_thread().name}正在排队')
        time.sleep(random.randint(1,3))
        sem.release() # 解锁 鸭子类型
    
    if __name__ == '__main__':
        for i in range(20): # 同时开启20个进程
            t = Thread(target=task)
            t.start()
    
    

    GIL全局

    """
    GIL全局解释器锁
    进程空间:解释器 文件
    Python解释器:虚拟机 编译器
    编译器:c语言能识别的字节码
    虚拟机:字节码转化成机器码
    把机器码交给cpu去执行
    文件
    理论上说:单个进程的多线程可以利用多核
    实际上说:同一时刻,只能允许一个线程进入解释器
    
    给解释器加了一个锁
    为什么要加锁:
    1.是单核时代,CPU加个贵
    2.不加全局解释器锁,开发CPython解释器的程序员就会在源码内部各种主动加锁,解锁
    非常的麻烦,为了节省方便,直接在进入解释器时给线程加锁
    
    优点:保证了CPython解释器的数据资源的安全
    缺点:单个进程的多线程不能利用多核
    
    Jpython 没有GIL锁
    pypy 没有GIL锁
    
    现在多核时代,
    因为CPython解释器所有的业务逻辑都是围绕着单个线程实现的
    去掉GIL锁几乎不可能
    
    遇到IO阻塞,CPU就会无情的被操作系统切走
    GIL锁被释放,线程被挂起,另一个线程进入(可以进行并发)
    
    单个进程的多线程可以并发,但是不能利用多核不能并行
    多个线程可以并发,并行
    
    
    """
    
    """
    GIL与lock锁的区别
    相同点:都是同种锁,互斥锁
    不同点:
    1.保护的对象不一样 
    GIL 锁全局解释器锁,保护解释器内部的资源数据的安全
    GIL锁上锁,释放无需手动操作
    自己代码中定义的互斥锁保护进程中的资源数据的安全
    
    
    lock
    t1先进入解释器要抢到GIL锁,然后抢lock锁
    遇到IO阻塞,操作系统强行将CPU切走,立马释放了GIL锁
    t3抢到GIL锁,要抢lock锁,但是lock锁被t1占用,挂起
    直到t1阻塞完毕,加上GIL锁,继续执行执行完毕
    释放GIL锁,释放lock锁,下一个线程才能进入,周而复始
    """
    
    """
    验证
    IO密集型:单个进程的多线程合适,并发执行
    
    计算密集型:多进程的并行
    """
    

    IO密集型

    # IO密集型:单个进程的多线程并发vs多个进程的并发并行
    # 多进程并发并行
    def task():
        count = 0
        # time.sleep(random.randint(1,3))
        count += 1
    
    if __name__ == '__main__':
        start_time = time.time()
        l1 = []
        for i in range(4):
            p = Process(target=task,)
            l1.append(p)
            p.start()
        for p in l1:
            p.join()
        print(f'执行效率{time.time()-start_time}')
    # 输出 执行效率:0.08116006851196289
    
    # 多个线程的并发并行
    if __name__ == '__main__':
        start_time = time.time()
        l1 = []
        for i in range(50):
            p = Thread(target=task,)
            l1.append(p)
            p.start()
    
        for p in l1:
            p.join()
        print(f'执行效率{time.time() - start_time}')
    # 输出 执行效率执行效率:0.0043070316314697266
    """
    总结:
    对于IO密集型:单个进程的多线程的并发效率高
    """
    

    计算密集型

    from threading import Thread
    from multiprocessing import Process
    import time
    import random
    
    # 计算密集型:单个进程的多线程并发vs多个进程的并发并行
    # 多个进程的并发并行
    def task():
        count = 0
        for i in range(10000000):
            count += 1
    if __name__ == '__main__':
    
        start_time = time.time()
        l1 = []
        for i in range(4):
            p = Process(target=task,)
            l1.append(p)
            p.start()
    
        for p in l1:
            p.join()
        print(f'效率{time.time()-start_time}')
    # 输出 效率1.0616059303283691
    
    from threading import Thread
    from multiprocessing import Process
    import time
    
    
    
    # 多线程并发
    def task():
        count = 0
        for i in range(10000000):
            count += 1
    if __name__ == '__main__':
        start_time = time.time()
        l1 = []
        for i in range(4):
            p = Thread(target=task,)
            l1.append(p)
            p.start()
    
        for p in l1:
            p.join()
        print(f'效率{time.time()-start_time}')
    # 输出 效率2.1974008083343506
    
    """
    总结:
    计算密集型:多进程的并发并行效率高
    """
    

    线程池和进程池

    """
    线程池 进程池
    进程池:一个容器,限制住开启进程的数量
    
    线程池:一个容器,限制住开启线程的数量,比如4个
    第一次肯定只能并发的处理4个任务,只要有任务完成,线程马上会执行下一个任务
    
    以时间换空间
    """
    
    
    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    import os
    import time
    import random
    
    def task(n):
        print(f'{os.getpid()}开始')
        time.sleep(random.randint(1,3)) # 开启4个进程
    
    if __name__ == '__main__':
        # 开启进程池(并行(并发+并行))ProcessPoolExecutor()
        p = ProcessPoolExecutor() # 默认不写,进程池里面的进程数和CPU个数相等
        for i in range(20): # 处理20个任务
            p.submit(task,i) # 发布任务和参数 submit(任务,参数)
    # 输出
    # 20395开始
    # 20396开始
    # 20397开始
    # 20398开始
    # 20395开始
    # 20396开始
    # 20397开始
    # 20396开始
    # 20397开始
    # 20398开始
    # 20395开始
    # 20396开始
    # 20397开始
    # 20395开始
    # 20398开始
    # 20397开始
    # 20396开始
    # 20395开始
    # 20398开始
    # 20397开始
    
        # 开启线程池(并发)
        t = ThreadPoolExecutor() # 默认不写,CPU个数*5 线程数 写多少就是多少
        for i in range(20): # 开启线程100个
            t.submit(task,i) # 发布任务和参数submit(任务,参数)
    # 输出
    # 20403开始
    # 20403开始
    # 20403开始
    # 20403开始
    # 20403开始
    # 20403开始
    # 20403开始
    # 20403开始
    # 20403开始
    # 20403开始
    # 20403开始
    # 20403开始
    # 20403开始
    # 20403开始
    # 20403开始
    # 20403开始
    # 20403开始
    # 20403开始
    # 20403开始
    # 20403开始
    
    
    
    
  • 相关阅读:
    ARMV8 datasheet学习笔记3:AArch64应用级体系结构之Memory order
    ARMV8 datasheet学习笔记3:AArch64应用级体系结构之Atomicity
    ARMV8 datasheet学习笔记3:AArch64应用级体系结构
    ARMV8 datasheet学习笔记2:概述
    最短路径
    网络流
    二分图
    zabbix 3.4新功能值预处理
    zabbix 3.4新功能值解析——Preprocessing预处理
    Zabbix监控windows的CPU利用率和其他资源
  • 原文地址:https://www.cnblogs.com/zhangshan33/p/11402383.html
Copyright © 2011-2022 走看看