zoukankan      html  css  js  c++  java
  • 进程间通信 Queue队列使用 生产者消费者模型 线程理论 创建及对象属性方法 线程互斥锁 守护线程

    进程补充

    进程通信

    要想实现进程间通信,可以用管道或者队列

    队列比管道更好用(队列自带管道和锁)

    管道和队列的共同特点:数据只有一份,取完就没了 无法重复获取用一份数据

    队列特点:先进先出

    堆栈特点:先进后出

    我们采用队列来实现进程间数据通信,下面先介绍一下队列

    Queue队列的基本使用

    基本方法:q.put(元素) q.get() q.get_nowait() q.full() q.empty()

    基础版

            from multiprocessing import Queue
            q = Queue()  # 括号内支持传数字 限制的是队列的大小
            # 队列先进先出
            # 堆栈先进后出
            q.put()放队列中放数据
            q.get()从队列中取数据
            # 上面两个方法都会阻塞
            q.full()判断队列是否存满
            q.empty()判断队列是否取空
            q.get_nowait()取值一旦没有值不会原地等待直接报错
            # 上面三个方法在多进程/多线程不适用
    

    进阶版

    from multiprocessing import Process, Queue
    
    q = Queue(5)  # 实例化出一个对象
    # --------------------------------------
    # q.put(元素) 往队列里放东西
    #   如果队列满了还往里面放,就会等在这里
    # --------------------------------------
    # q.put(1)
    # q.put(2)
    # q.put(3)
    # --------------------------------------
    # # q.full() 判断队列有没有满
    # --------------------------------------
    # print(q.full())  # q.full 判断队列有没有满
    # # False
    # q.put(4)
    # q.put(5)
    # # q.put(6)  # 如果队列满了还往里面放,就会等在这里
    # print(q.full())
    # # True
    
    for i in range(5):
        q.put(i)
    print(q.full())
    # True
    # --------------------------------------
    # q.get() 从队列头取一个值
    #   如果队列空了,就会等在这里,等数据过来
    # --------------------------------------
    print(q.get())
    print(q.full())
    # 0
    # False
    print(q.get())
    print(q.get())
    # print(q.get())
    # --------------------------------------
    # q.get_nowait() 从队列头取一个值
    #   在队列有数据的情况下,与get取值一样
    #   当队列没有值的情况下,取值直接报错
    # --------------------------------------
    print(q.get_nowait())  # 在队列有数据的情况下,与get取值一样,当队列没有值的情况下,取值直接报错
    # --------------------------------------
    # q.empty() 判断队列是否为空
    #   在并发的情况下,这个方法不准确
    # --------------------------------------
    print(q.empty())  # 判断队列是否为空,需要注意的是在并发的情况下,这个方法不准确
    print(q.get())
    # 1
    # 2
    # 3
    # False
    # 4
    # print(q.get())  # 如果队列空了,就会等在这里,等数据过来
    print(q.empty())
    # True
    # print(q.get_nowait())
    # 直接报错 queue.Empty
    

    通过Queue队列实现进程间通信( IPC机制 )

    数据的互通可实现主进程与子进程之间的互通,子进程与子进程之间的互通

    from multiprocessing import Queue, Process
    
    
    def producer(q):
        q.put('hello baby.')
    
    
    def consumer(q):
        print(q.get())
    
    
    if __name__ == '__main__':
        q = Queue()  # 生成一个队列对象
        p1 = Process(target=producer, args=(q,))
        c1 = Process(target=consumer, args=(q,))
        p1.start()
        c1.start()  # 子进程获取到了另一个子进程的数据
        # hello baby.
        # print(q.get())  # 主进程获取到了子进程的数据
        # hello baby.
    

    生产者消费者模型

    生产者:生产/制造数据的

    消费者:消费/处理数据的

    两者之间的沟通介质:队列

    例子:做包子的,卖包子的
        1.做的包子远比买包子的多
        2.做的包子远比买包子的少
        --> 供需不平衡
    

    用处:(利用队列)解决供需不平衡的问题

    需求:以卖包子为例,实现当包子卖完了就停止消费行为

    也就是做到消费者消费完数据之后代码立即结束

    1.利用join等待生产者生产完数据 再往队列中添加特定信息(None)
    	有几个消费者就必须有几个None(对应下面的方式一)
                
    2.JoinableQueue 能够被join的q
    	q.task_done()  告诉队列数据被取出
    	q.join()  等待队列数据完全被取完
    	将所有的消费者设置为守护进程(对应下面的方式二)
    

    方式一

    from multiprocessing import Process, Queue
    import time
    import random
    
    
    def producer(name, food, q: Queue):
        for i in range(10):
            data = f'{name} 生产了 {food}{i}'
            time.sleep(random.random())
            q.put(data)
            print(data)
    
    
    def consumer(name, q):
        while True:
            res = q.get()
            if not res:  # 已经把生产者做的东西全部吃完了,那么本消费者也结束食用
                break
            data = res.split(' ')[2]
            data = f'{name} 吃了 {data}'
            print(data)
            time.sleep(random.random())
    
    
    if __name__ == '__main__':
        q = Queue()
        p = Process(target=producer, args=('大厨egon', '馒头', q))
        p2 = Process(target=producer, args=('跟班tank', '生蚝', q))
        c = Process(target=consumer, args=('jason', q))
        c2 = Process(target=consumer, args=('吃货kevin', q))
        p.start()
        p2.start()
        c.start()
        c2.start()
        # 不知道什么时候生产者什么时候生成完
        p.join()
        p2.join()
        q.put(None)  # 通过 None来标志生产者已生产完成
        q.put(None)
        # 可以实现,但是不好
    

    方式二

    改用JoinableQueue模块的队列守护进程来实现

    from multiprocessing import Process, JoinableQueue
    import time
    import random
    
    
    def producer(name, food, q: JoinableQueue):
        for i in range(10):
            data = f'{name} 生产了 {food}{i}'
            time.sleep(random.random())
            q.put(data)
            print(data)
    
    
    def consumer(name, q):
        while True:
            res = q.get()
            if not res:
                break
            data = res.split(' ')[2]
            data = f'{name} 吃了 {data}'
            print(data)
            time.sleep(random.random())
            q.task_done()  # 告诉队列,你已经从队列中取出了一个数据,并且处理完毕了
    
    
    if __name__ == '__main__':
        q = JoinableQueue()
        p = Process(target=producer, args=('大厨egon', '馒头', q))
        p2 = Process(target=producer, args=('跟班tank', '生蚝', q))
        c = Process(target=consumer, args=('jason', q))
        c2 = Process(target=consumer, args=('吃货kevin', q))
        p.start()
        p2.start()
        c.daemon = True  # 配合join,结束程序消费者也结束(注意join是主进程的最后一句代码)
        c.start()
        c2.daemon = True
        c2.start()
        # 不知道什么时候生产者什么时候生成完
        p.join()
        p2.join()
    
        q.join()  # 等待队列中数据全部取出,执行完了这句话,也就意味着队列中没有数据了(消费者那里还是会卡住,get不到东西等待)
        # 配合上 守护进程 来实现....
    

    线程

    什么是线程?

    进程和线程其实都是虚拟单位,都是用来帮助我们形象的描述某种事物

    进程:资源单位(一块独立的内存空间)(车间)

    线程:执行单位(流水线)

    将内存比喻成工厂,那么进程就相当于工厂里的车间,而你的线程就相当于是车间里面的流水线

    CPU其实运行的其实是线程,进程只是资源单位

    线程执行时需要的资源单位都跟进程要

    ps:每个进程都自带一个“主线程”,线程才是真正的执行单位,进线程在运行的时候所需要和产生的数据都是来源于当前线程所在的进程

    一个进程下可以开设多个线程

    线程没有主次之分,只不过我们默认就把主进程自带的那个线程叫做主线程

    为什么要有线程?

    开进程

    • 申请内存空间 ---> 耗资源
    • “拷贝代码” ---> 耗资源

    开线程

    • 进程间数据是隔离但是同一个进程下的多个线程数据是共享的

    ps:开启线程的开销要远远小于开启进程的开销(可能刚执行完创建线程的代码线程就创建好了)

    开启线程的两种方式

    方式一(1.利用类名里面传target,args参数来开设线程)

    from threading import Thread
    import time
    
    
    def task(name):
        print(f"{name} is running")
        time.sleep(3)
        print(f"{name} is over")
    
    
    t = Thread(target=task, args=('egon', ))  # 开线程不需要在 __main__ 代码块内,但是习惯性的还是写在 __main__ 内
    t.start()  # 告诉操作系统开启一个线程
    # 线程的开销远远小于进程,小到以至于可以代码执行完,线程就已经开启了
    print("主")  # 线程没有主次之分,都在同一个进程的名称空间里,只是人为把进程自带的线程叫做主线程
    # egon is running
    # 主线程  # 进程的时候这个主线程可能会是最先打印的
    # egon is over
    

    方式二(2.自定义类进程线程类 通过定义暴露的接口run方法)

    ps:windows中开设进程必须在__main__代码块内 而开线程不需要

    from threading import Thread
    import time
    
    
    class MyThread(Thread):
        def __init__(self, name):
            super().__init__()
            self.name = name
    
        def run(self):
            print(f"{self.name} is running")
            time.sleep(1)
            print(f"{self.name} is over")
    
    
    if __name__ == '__main__':
        t = MyThread('jason')
        t.start()  # 开启线程的速度非常快,几乎代码执行完线程就已经开启
        print("主")
    
    # jason is running
    # 主
    # jason is over
    

    线程之间的数据共享

    from threading import Thread
    
    money = 666
    
    
    def task():
        global money
        money = 999
    
    
    t = Thread(target=task)
    t.start()
    t.join()  # 确保是线程运行结束后
    print(money)
    # 999  # 主线程与子线程之间数据是通用的
    

    线程间想要实现数据通信,不需要借助于队列(线程间支持数据通信)

    线程对象的其他属性和方法

    import time
    from threading import Thread, active_count, current_thread
    import os
    
    
    def task(name):
        print(f"{name} is running {os.getpid()}")
        # # ------------------------------------------------
        # # current_thread().name current_thread().getname() 当前线程名
        # #   记得导入模块
        # # ------------------------------------------------
        # print(f"current_thread().name:{current_thread().name}")
        # current_thread().name:Thread-1
        time.sleep(1)
        print(f"{name} is over")
    
    
    # t = Thread(target=task, args=('jason', ))
    # t.start()
    # # ------------------------------------------------
    # # os.getpid() os.getppid() 获取进程号 父进程号
    # #   多个线程属于同一进程
    # # ------------------------------------------------
    # print(f"pid {os.getpid()}")
    # # jason is running 5572
    # # pid 5572
    # # jason is over
    
    
    t = Thread(target=task, args=('jason', ))
    t.start()
    # ------------------------------------------------
    # active_count()  统计当前存活的线程数
    #   记得导入模块
    # ------------------------------------------------
    print(active_count())
    print(f"pid {os.getpid()}")
    # jason is running 5728
    # 2
    # pid 5728
    print(f"主 current_thread().name:{current_thread().name}")
    # 主 current_thread().name:MainThread
    
    t.join()  # 主线程等待子线程运行结束
    # jason is over
    print("主 active_count", active_count())  # 可能会有问题,多线程是异步,可能join的线程结束了,其他线程也正好结束了(多个线程时)
    # 主 active_count 1
    # Thread.join(t)  # 可以考虑用类调用对象方法,传入对象来在循环里对线程对象进行操作
    

    线程运用互斥锁

    # # 将这个100 每个线程减一
    # import random
    # from threading import Thread
    # import time
    #
    # n = 100
    #
    #
    # def task():
    #     global n
    #     tmp = n
    #     time.sleep(random.randint(1, 3))
    #     n = tmp - 1
    #
    #
    # threading_list = []
    # for i in range(100):
    #     t = Thread(target=task)
    #     t.start()
    #     threading_list.append(t)
    #
    # for t in threading_list:
    #     t.join()
    #
    # print(n)
    
    # 利用互斥锁来实现
    import random
    from threading import Thread, Lock  # 和multi...模块里的是一样的,所以用法也一样
    import time
    
    n = 100
    
    
    def task(mutex):
        global n
        mutex.acquire()
        tmp = n
        time.sleep(random.random())
        n = tmp - 1
        mutex.release()
    
    
    threading_list = []
    mutex = Lock()
    for i in range(100):
        t = Thread(target=task, args=(mutex,))
        t.start()
        threading_list.append(t)
    
    for t in threading_list:
        t.join()
    
    print(n)
    # 0  # 等待一段时间后才会打印出结果
    

    守护线程

    主线程要等待所有非守护线程结束后才会结束(不是主线程的代码执行完了就立马结束了)

    主线程结束后,守护(子)线程也会立即结束

    主线程运行结束之后为什么需要等待子线程结束才能结束呢?

    主线程的结束也就意味着进程的结束
    主线程必须等待其他非守护线程的结束才能结束
    因为子线程在运行的时候需要使用进程中的资源,而主线程一旦结束了,资源也就销毁了

    # from threading import Thread, current_thread
    # import time
    #
    #
    # def task(i):
    #     print(f"{current_thread().name}")
    #     time.sleep(i)
    #     print("GG")
    #
    #
    # for i in range(3):
    #     t = Thread(target=task, args=(i, ))
    #     t.start()
    #
    #
    # print("主")
    # # 循环的时候就已经打印了部分数据了(异步)
    # # Thread-1
    # # GG
    # # Thread-2
    # # Thread-3
    # # 主
    # # GG
    # # GG
    
    # 主线程运行结束之后为什么需要等待子线程结束才能结束呢?
    '''
    主线程的结束也就意味着进程的结束
        主线程必须等待其他非守护线程的结束才能结束
    因为子线程在运行的时候需要使用进程中的资源,而主线程一旦结束了,资源也就销毁了
    '''
    from threading import Thread, current_thread
    import time
    
    
    def task(i):
        print(f"{current_thread().name}")
        time.sleep(i)
        print("GG")
    
    
    for i in range(3):
        t = Thread(target=task, args=(i,))
        t.daemon = True
        t.start()
    
    print("主")
    # Thread-1
    # GG
    # Thread-2
    # Thread-3
    # 主
    

    测试

    下面程序的执行结果是什么?

    from threading import Thread
    import time
    
    
    def foo():
        print(123)
        time.sleep(1)
        print("end123")
    
    
    def bar():
        print(456)
        time.sleep(3)
        print("end456")
    
    
    t1 = Thread(target=foo)
    t2 = Thread(target=bar)
    
    t1.daemon = True
    t1.start()
    t2.start()
    print("main-------")
    
    
    # 123
    # 456
    # main-------
    # end123
    # end456
    

    线程互斥锁

    基础版

    多个线程修改同一份数据会造成数据错乱的问题 所以需要加锁
    加了锁,保证了数据的安全 但是降低了代码的运行效率 因为你将并发变成了串行
    ps:针对不同的数据 应该加不同的锁进行处理
    抢锁 acquire()
    释放锁 release()

    进阶版

    从线程间通信那里的案例可以看出,线程间数据是相通的,那么多个线程对同一份数据进行操作会产生问题

    下面同样模拟一个网络延迟来对数据进行操作(确保所有线程都执行完的操作可以记一下)

    不加锁遇到延迟的情况

    # 模拟网络延迟的现象
    #   多个线程操作同一个数据,也会造成数据不安全
    import time
    from threading import Thread
    
    n = 10
    
    
    def task():
        global n
        tmp = n
        time.sleep(1)
        n = tmp - 1
    
    
    # -------------------------------
    t_list = []
    for i in range(10):
        t = Thread(target=task)
        t.start()
        t_list.append(t)
    
    # 确保其他线程都执行完了之后再打印
    for t in t_list:
        t.join()
    # -------------------------------
    
    print(n)
    # 9
    

    加锁后遇到延迟

    # 加锁解决问题
    import time
    from threading import Thread, Lock
    
    n = 10
    
    
    def task(mutex):
        mutex.acquire()  # 抢锁
        global n
        tmp = n
        time.sleep(1)
        n = tmp - 1
        mutex.release()  # 释放锁
    
    
    t_list = []
    mutex = Lock()
    for i in range(10):
        t = Thread(target=task, args=(mutex, ))
        t.start()
        t_list.append(t)
    
    # 确保其他线程都执行完了之后再打印
    for t in t_list:
        t.join()
    print(n)
    # 0  # 等10s多点 后打印出结果,数据未受延迟影响,保证了数据安全
    

    线程和进程的用户大同小异,可以对比着来记

  • 相关阅读:
    Windows2003 Webshell默认权限
    Windows安装Centos7双系统后Windows启动项消失
    GCC编译流程及常用编辑命令
    swoole的websockte例子
    PhpStorm 增加Swoole智能提示
    Centos7/RHEL 7 配置静态路由
    webpack介绍和使用
    Webpack中的sourcemap以及如何在生产和开发环境中合理的设置
    什么是 PWA
    php实现excel单元格合并,字体加粗居中等操作
  • 原文地址:https://www.cnblogs.com/PowerTips/p/11342297.html
Copyright © 2011-2022 走看看