zoukankan      html  css  js  c++  java
  • 多线程之生产消费模型,多线程知识

    进程:

    生产者,消费者模型

    在编程中,模型就是解决某个问题的固定方法或套路

    生产者:泛指产生数据的一方

    消费者:泛指处理数据的一方

    生活案例:

    ​ 食堂饭店是生产者,负责炒菜

    ​ 吃饭的人就是消费者,负责吃

    1. 要解决什么问题?

    效率低,双方的处理速度不同,一方慢,一方快,则双方需要相互等待,比如:生产方一分钟能10个包子,它有十个盘子;但是消费者4分钟才能吃完一个馒头。生产方做完10个馒头,就会停止生产,因为盘子不够用,用时10分钟。消费者4分钟吃完一个馒头,也就是说生产方要到4分钟之后才能再生产一个馒头,依次类推。
    
    在这个案例中,其实生产方的效率是很高的,但是由于消费者的效率低,导致整个生产消费链时间拉长,那么对于这样的情况,我们如何解决了?
    
    第一:增加更多的盘子。
    第二:增加更多的消费者。
    第三:找一个大型的容器,对于生产方来说,它只需要把生产的馒头放到容器中即可;对于消费者来说,它只管从容器中拿东西即可。(这个大型的容器,就是下面引入的内容:队列)
    

    2. 解决办法:在生产者和消费者之间加入队列

    import time,random
    import multiprocessing
    
    def product(q,name):
        for i in range(5):
            dog = "%s的热狗%s"%(name,(i+1))
            time.sleep(random.random())
            print("生产了",dog)
            q.put(i+1)
        q.put(None)		# 生产者生产完毕后,在队列最后发送一个None,表示生产者生产完毕
    
    def customer(q,name):
        while True:
            dog = q.get()
            if not dog:		# 判断生产何时结束
                break
            time.sleep(random.random())
            print("消费了%s的%s"%(name,dog))
    
    
    if __name__ == '__main__':
        q = multiprocessing.Queue()
    
        # 生产者
        p1 = multiprocessing.Process(target=product,args=(q,"上海"))
        # p2 = multiprocessing.Process(target=product,args=(q,))
        p1.start()
        # p2.start()
    
        # 消费者
        c = multiprocessing.Process(target=customer,args=(q,"北京"))
        c.start()
    
        p1.join()
        c.join()
        print("结束")
    

    问题:如果此时加入两个生产者,那么消费者如何判断两个生产者何时结束?

    解决办法:joinableQueue

    import multiprocessing
    import time, random
    
    
    # 生产者
    def product(q, name):
        for i in range(5):
            dog = "%s的热狗%s" % (name, (i + 1))
            time.sleep(random.random())
            print("生产了", dog)
            q.put(dog)
    
    
    # 消费者
    def customer(q):
        while True:
            dog = q.get()
            time.sleep(random.random())
            print("消费了%s" % (dog))
            q.task_done()
    
    
    # dliffy diagran
    
    if __name__ == '__main__':
        q = multiprocessing.JoinableQueue()
    
        # 生产者
        p1 = multiprocessing.Process(target=product, args=(q, "上海"))
        p2 = multiprocessing.Process(target=product, args=(q, "北京"))
        p1.start()
        p2.start()
    
        # 消费者
        c = multiprocessing.Process(target=customer, args=(q,))
        c.start()
    
        # 结束生产进程
        p1.join()
        p2.join()
    
        # 结束队列进程
        q.join()                # 阻塞,等待队列进程任务执行完毕,再结束
    
        # 由以上两点,可以确定消费者进程一定被消费完了
        print("结束")
    

    线程

    1. 什么是线程

    线程是操作系统可以运算调度的最小单位,是真正的执行单位,其包含在进程中。

    一个线程就是一条固定的控制流程,一个进程可以包含多个线程

    同一个进程中的线程共享进程内的资源

    进程是操作系统可以调度或进程资源分配的基本单位,是一个资源单位,其中包含了运行这个程序所需的资源

    2. 进程与线程的区别

    进程是一个资源单位,线程是执行单位

    创建进程的开销远大于线程

    多个进程是隔离的,通信需要借助其他模块。而线程是共享进程内的所有资源

    进程之间是竞争关系,而线程之间是协作关系
    进程由主次关系,线程都是平等关系

    3. 为什么用线程

    1. 有多个任务需要并发处理
    2. 线程占用资源少,线程适用于任务数非常多的情况;进程占用资源多。
    3. 线程是共有进程内的资源

    4. 如何使用多线程

    第一种:简单模式

    import threading
    
    def task():
        print("子线程 run")
    
    # 与进程不同之处,不需要加main判断,开启线程的代码放哪里都可以
    t = threading.Thread(target=task)
    t.start()
    print("over")
    

    第二种:自定义模式(继承)

    import threading
    
    class MyThread(threading.Thread):
        def run(self):
            print("子,run")
            print("子,over")
    
    mt = MyThread()
    mt.start()
    print("主,over")
    """
    

    根据代码来验证线程的一些功能

    第一:验证线程是否能进行并发

    import threading
    def task():
        while True:
            print("子")
            
    t = threading.Thread(target=task)
    t.start()
    
    while True:
        print("主")
    
    子
    子
    子
    子
    子
    子
    主
    主
    主
    主
    主
    主
    子
    子
    子
    主
    主
    ...
    

    以上现象,即可说明线程是可以用来并发的执行的。

    第二:验证线程和进程的开销问题

    if __name__ == '__main__':
    
        # 线程开销,线程开了1000个
        start_time = time.time()
        for i in range(1000):
            t = threading.Thread()
            t.start()
        print(time.time() - start_time)
    
    
        # 进程开销,进程只开了70个
        start_time = time.time()
        for i in range(70):
            p = multiprocessing.Process()
            p.start()
    
        print(time.time() - start_time)
    
    0.16990160942077637
    0.5576796531677246
    

    通过上面的示例,可以看出线程的开销要远远小于进程开销

    第三:验证线程数据是否共享

    import threading
    a = 100
    
    def task():
        global a
        for i in range(100):
            a -= 1
    
    t = threading.Thread(target=task)
    t.start()
    
    print(a)
    
    0
    

    可以看出,线程执行时,不会copy主进程的代码再从上自下执行,而是直接从主进程运行的地方开始执行。所以它能共享进程内的资源。

    第四:验证多线程的pid是否相同

    import threading
    import os
    
    def task():
        print(os.getpid())
    
    for i in range(100):
        a = threading.Thread(target=task)
        a.start()
    
    print(f"主  结束,{os.getpid()}")
    
    13600
    13600
    13600
    13600
    13600
    主  结束,13600
    

    由此可见,多线程的pid是与主线程的pid相同的。

    第五:多线程是否会存在争夺资源的安全问题

    import threading
    import random
    import time
    a = 10
    def task():
        global a
        temp = a
        time.sleep(random.random())
        a = temp -1
        
    
    a_list = []
    for i in range(10):
        t = threading.Thread(target=task)
        a_list.append(t)
        t.start()
    
    for i in a_list:
        i.join()
    
    print(a)
    
    9
    

    通过上面的实例,可以发现资源发生了争夺,那么我们应该避免了。

    方案一:加锁
    import threading
    import random
    import time
    
    a = 10
    
    s = threading.Lock()        # 创建锁
    
    def task():
        global a
        s.acquire()     # 加锁
        temp = a
        time.sleep(random.random())
        a = temp - 1
        s.release()     # 解锁
    
    a_list = []
    for i in range(10):
        t = threading.Thread(target=task)
        a_list.append(t)
        t.start()
    
    for i in a_list:
        i.join()
    
    print(a)
    
    0
    

    这样就解决了争夺资源的问题

    第六:验证多线程是否会随着主进程结束而结束

    import threading
    import time
    import random
    
    # 妃子的一生
    def task():
        print("妃子 start")
        time.sleep(3)
        print("妃子 over")
    
    
    # 皇帝的一生
    print("皇帝 start")
    
    t = threading.Thread(target=task)
    t.start()
    
    print("皇帝 over")
    
    皇帝 start
    妃子 start
    皇帝 over
    妃子 over
    

    这时,你会发现当皇帝over(主进程结束)之后,但是子进程依然没有消失。这种情况并不是我们期望的,我们希望主进程结束后,子进程也会跟随结束,那么对于这样的情况,我们的解决办法是什么了?

    解决方案1:给子进程加守护进程
    import threading
    import time
    import random
    
    # 妃子的一生
    def task():
        print("妃子 start")
        time.sleep(3)
        print("妃子 over")
    
    # 皇帝的一生
    print("皇帝 start")
    
    t = threading.Thread(target=task)
    t.daemon = True
    t.start()
    
    print("皇帝 over")
    
    皇帝 start
    妃子 start
    皇帝 over
    

    因此,我们知道了,默认情况下,主线程即使代码执行完毕,也会等待所有子线程(非守护线程)完毕后程序才能结束

    多线程常用的属性

    import threading
    import time
    
    t = threading.Thread()
    t.start()
    print(t.ident)			# 线程的标识符
       
    
    print(threading.currentThread())        # 打印当前进程对象
    t = threading.Thread(target=lambda : print(threading.currentThread()))
    t.start()
    
    t1 = threading.Thread(target=lambda : time.sleep(2))
    t1.start()
    
    print(threading.enumerate())        # 是一个列表容器,里面存放着所有存活的线程对象
    print(threading.active_count())     # 表示存活线程对象的个数
    
    12764
    <_MainThread(MainThread, started 8976)>
    <Thread(Thread-2, started 12904)>
    [<_MainThread(MainThread, started 8976)>, <Thread(Thread-3, started 12780)>]
    2
    
  • 相关阅读:
    推荐一个c++小巧开源且跨平台的图像解码库
    设计模式---桥接模式
    redis数据结构及其使用场景、持久化、缓存淘汰策略
    mysql------explain工具
    mysql索引数据结构
    java8(2)--- Stream API
    java8(1)--- lambda
    springboot自动装配(2)---实现一个自定义自动装配组件
    springboot自动装配(1)---@SpringBootApplication注解怎么自动装配各种组件
    自己挖的坑跪着也要填完---mapper配置文件和java源文件在同一包下
  • 原文地址:https://www.cnblogs.com/plf-Jack/p/11133870.html
Copyright © 2011-2022 走看看