zoukankan      html  css  js  c++  java
  • 多线程,生产者消费者模型

    1.生产者消费者模型

    模型就是解决某个问题的固定方法或套路

    1.1 要解决什么问题

    生产者: 泛指产生数据的一方

    消费者: 泛指处理数据的一方

    案例:

    ​ 食堂饭店是生产者

    ​ 我们吃饭的人就是消费者

    他们之间有什么问题

    ​ 效率低 因为双方的处理速度不同 一个快一个慢 则双方需要相互等待

    具体的解决方法:

    1.先将双方解开耦合,让不同的进程负责不同的任务

    2.提供一个共享的容器 来平衡双方的能力,之所用进程队列是因为队列 可以在进程间共享

    案例:

    from multiprocessing import Process,Queue
    import requests
    import re,os,time,random
    
    
    # 生产者任务
    def product(urls,q):
        i = 0
        for url in urls:
            response = requests.get(url)
            text = response.text
            # 将生产完成的数据放入队列中
            time.sleep(random.random())
            q.put(text)
            i += 1
            print(os.getpid(),"生产了第%s个数据" % i)
    
    
    def customer(q):
        i = 0
        while True:
            text = q.get()
            time.sleep(random.random())
            res = re.findall('src=//(.*?) width', text)
            i += 1
            print(" 第%s个任务获取到%s个img" % (i,len(res)))
    
            
    if __name__ == '__main__':
        urls = [
            "http://www.baidu.com",
            "http://www.baidu.com",
            "http://www.baidu.com",
            "http://www.baidu.com",
        ]
    
        # 创建一个双方能共享的容器
        q = Queue()
    
        # 生产者进程
        p1 = Process(target=product,args=(urls,q))
        p1.start()
    
    
        # 消费者进程
        c = Process(target=customer,args=(q,))
        c.start()
    

    问题: 消费不知道什么时候结束

    joinableQueue 继承自Queue 用法一致,

    增加了join 和taskDone

    join是个阻塞函数 会阻塞直到taskdone的调用次数等于 存入的元素个数 可以用于表示队列任务处理完成

    案例:

    from multiprocessing import Process,JoinableQueue
    import requests
    import re,os,time,random
    
    """
    生产者 负责生产热狗 
    消费者 负责吃热狗  
    
    
    """
    
    # 生产者任务
    def product(q,name):
        for i in range(5):
            dog = "%s的热狗%s" % (name,(i + 1))
            time.sleep(random.random())
            print("生产了",dog)
            q.put(dog)
    
    # 吃热狗
    def customer(q):
        while True:
            dog = q.get()
            time.sleep(random.random())
            print("消费了%s" % dog)
            q.task_done() # 标记这个任务处理完成
    
            
    if __name__ == '__main__':
    
        # 创建一个双方能共享的容器
        q = JoinableQueue()
    
        # 生产者进程
        p1 = Process(target=product,args=(q,"上海分店"))
        p2 = Process(target=product,args=(q,"北京分店"))
    
        p1.start()
        p2.start()
    
    
        # 消费者进程
        c = Process(target=customer,args=(q,))
        # c.daemon = True # 可以将消费者设置为守护进程 当主进程确认 任务全部完成时 可以随着主进程一起结束
        c.start()
    
    
        p1.join()
        p2.join()  # 代码走到这里意味着生产方完成
    
        q.join() # 意味着队列中的任务都处理完成了
    
        # 结束所有任务
        c.terminate() # 直接终止消费者进程
    
        # 如何判定今天的热狗真的吃完了
        # 1.确定生成者任务完成
        # 2.确定生出来的数据已经全部处理完成
    

    redis 消息队列

    MQ 消息队列

    常用来做流量削峰 保证服务不会因为高并发而崩溃

    2.多线程

    并发编程

    1.什么是线程

    回顾 进程是操作系统可以调度已经进行资源分配的基本单位,是一个资源单位,其中包含了运行这个程序所需的资源

    线程是操作系统可以运算调度的最小单位,是真正的执行单位,其包含在进程中, 一个线程就是一条固定的控制流程,

    一个进程可以包含多个线程,同一进程中的线程共享进程内的资源

    特点:系统会为每一个进程自动创建一条线程,称之为主线程, 后续通过代码开启的线程称之为子线程

    进程对比线程

    进程是一个资源单位 先是执行单位

    创建进程的开销远大于线程

    多个进程之间内存是相互隔离的 而线程是共享进程内的所有资源

    进程之间之间对于硬件资源是竞争关系 而线程是协作关系

    当然开启线程也是需要消耗资源的

    进程之间有层级关系 ,而线程之间没有是平等的

    比喻:

    ​ 计算机是工厂 进程是车间 线程是流水线

    为什么用线程

    1.有多个任务要并发处理

    2.当要并发处理的任务有很多的时,不能使用进程 进程资源开销太大 线程开销非常小 适用于任务数非常多的情况

    使用线程

    方式1 直接实例化Thread类

    from threading  import Thread
    
    # def task():
    #     print("子线程 run")
    #
    # # 与进程不同之处1   不需要加判断 开启线程的代码放哪里都可以
    # t = Thread(target=task)
    # t.start()
    # print("over")
    
    
    # 使用方法2  继承Thread类
    class MyThread(Thread):
    
        def run(self):
            # 把要在子线中执行的代码放入run中
            print("子 run")
    
    mt = MyThread()
    mt.start()
    print("over")
    

    线程安全问题

    只要并发访问了同一资源一定会产生安全问题 ,解决方案和多进程一致 就是给操作公共资源代码加锁

    from threading import  Thread,Lock
    import time
    a = 10
    
    l = Lock()
    
    def task():
        global a
        l.acquire()
        temp = a
        time.sleep(0.1)
        a = temp - 1
        l.release()
    
    ts = []
    for i in range(10):
        t = Thread(target=task)
        t.start()
        ts.append(t)
    for t in ts:t.join()
    
    print(a)
    

    守护线程

    一个线程a 设置为b的守护线程, a会随着b的结束而结束

    默认情况下 主线程即使代码执行完毕 也会等待所有非守护线程完毕后程序才能结束 因为多个线程之间是协作关系

    from threading import Thread
    import time
    
    
    # 妃子的一生
    def task():
        print("妃子 start")
        time.sleep(5)
        print("妃子 over")
    
    def task2():
        print("皇太后 start")
        time.sleep(3)
        print("皇太后 over")
    
    
    # 皇帝的一生
    print("主 start")
    
    t = Thread(target=task)
    t.daemon = True
    t.start()
    
    t2 = Thread(target=task2)
    t2.start()
    
    print("主 over")
    
    
    """结果
    主 start 
    妃子start 
    皇太后 start 
    主over 
    皇太后 over
    
    """
    

    线程中的常用属性和方法

    from threading import  Thread,currentThread,enumerate,activeCount
    import time
    
    # t = Thread()
    # t.start()
    # t.join()
    # t.is_alive()
    # t.isAlive()
    # t.ident  # 线程标识符   id
    # t.daemon
    
    # 获取当前线程对象
    # print(currentThread())
    # t = Thread(target=lambda :print(currentThread()))
    # t.start()
    
    t = Thread(target=lambda :time.sleep(1))
    t.start()
    
    t = Thread(target=lambda :time.sleep(1))
    t.start()
    t.join()
    # 获取正在运行的所有线程对象  是一个列表
    print(enumerate())
    
    # 存活的线程数量
    print(activeCount())
    

    死锁现象

    递归所

    信号量

    2.多线程

    什么是线程

    与进程的区别

    如何使用 两种方式

    守护线程

    常用属性

    线程安全问题

    一堆锁

    互斥锁 死锁现象 Rlock递归锁 型号量

  • 相关阅读:
    [转] JS报 “尚未实现” 错误
    分享到国内各SNS网站的代码
    IE8以下版本不支持动态创建的HTML5元素?
    一个较为个性化的出生日期选择
    关于拖拽上传 [一个拖拽上传修改头像的流程]
    用CSS写出漂亮的小三角
    用户自定义模块的实现方案
    纪录一则IE的bug
    onpropertychange 和 oninput
    判断脚本加载完毕
  • 原文地址:https://www.cnblogs.com/bladecheng/p/11132242.html
Copyright © 2011-2022 走看看