zoukankan      html  css  js  c++  java
  • Python进程间通信

    通信方式

    进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块主要通过队列方式

    队列:队列类似于一条管道,元素先进先出

    需要注意的一点是:队列都是在内存中操作,进程退出,队列清空,另外,队列也是一个阻塞的形态

    Queue介绍:

    创建队列的类(底层就是以管道和锁定的方式实现):

    Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,

    可以使用Queue实现多进程之间的数据传递。maxsize是队列中允许最大项数,省略则无大小限制。

    方法介绍:

    def put(self, obj, block=True, timeout=None):插入数据到队列中
    Block值默认为True,代表当队列已满时,会阻塞。如果block为False,则队列满会报异常Queue.Full
    timeout表示会阻塞到指定时间,直到有剩余的空间供插入,如果时间超时,则报异常Queue.Full
     
    def get(self, block=True, timeout=None):从队列中取出数据
    Block值默认为True,代表当队列为空时,会阻塞。如果block为False,则队列空会报异常Queue.Empty
    timeout表示会等待到指定时间,直到取出数据,如果时间超时,则报异常Queue.Empty

    def empty(self): 判断队列是否为空,如果空返回True
    def full(self): 判断队列是否已满,如果满返回True
    def qsize(self): 返回队列的大小

    应用举例:

    from multiprocessing import Process, Manager
    q = Manager().Queue(2)
    q.put(1)
    q.put(2,block=False,timeout=2)
    def func():
        print(q.get())
    
    p = Process(target=func)
    print("size",q.qsize())
    print("full",q.full())
    p.start()
    p.join()
    print("empty",q.empty())
    print("get", q.get())
    print("get", q.get(block=False,timeout=2))

    输出结果

    生产者和消费者模型

    
    

    在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。

    
    

    为什么要使用生产者和消费者模式

    
    

    在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

    
    

    什么是生产者消费者模式

    
    

    生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯:

    生产者,只需要往队列里面丢东西(生产者不需要关心消费者)

    消费者,只需要从队列里面拿东西(消费者也不需要关心生产者)

    阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。


    实现方式一:Queue
    from multiprocessing import Process,Manager,active_children
    import random
    import queue
    import time
    
    class Producer(Process):
    
        def __init__(self,queue):
            super().__init__()
            self.queue = queue
    
        def run(self):
            for i in range(6):
                r = random.randint(0, 99)
                time.sleep(1)
                self.queue.put(r)
                print("add data{}".format(r))
    
    class Consumer(Process):
    
        def __init__(self,queue):
            super().__init__()
            self.queue = queue
    
        def run(self):
            while True:
              if not self.queue.empty():
                    data = self.queue.get()
                    print("minus data{}".format(data))
    
    
    if __name__ == '__main__':
        q = Manager().Queue() # 创建队列
        p = Producer(q)
        c = Consumer(q)
        p.start()
        c.start()
        print(active_children())  # 查看现有的进程
        p.join()
        c.join()
        print("结束")
    
    
    >>>输出
    [<ForkProcess(SyncManager-1, started)>, <Producer(Producer-2, started)>, <Consumer(Consumer-3, started)>]
    add data83
    minus data83
    add data72
    minus data72
    add data8
    minus data8
    add data63
    minus data63
    add data75
    minus data75
    add data52
    minus data52
    
    
    
     

    实现方式二:利用JoinableQueue
     JoinableQueue([maxsize]):一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。
         JoinableQueue的实例除了与Queue对象相同的方法之外还具有:
         task_done():使用者使用此方法发出信号,表示get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
         join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用task_done()方法为止
     
    from multiprocessing import Process,JoinableQueue
    import os
    import time
    import random
    
    
    def print_log(msg, log_type="prod"):
        if log_type == 'prod':
            print("33[32;1m%s33[0m" %msg)
        elif log_type == 'con':
            print("33[31;1m%s33[0m" %msg)
    
    def producer(q):
        """
        生产者
        :param q: 
        :return: 
        """
        for i in range(10):
            data = random.randint(1,200)
            time.sleep(2)
            q.put(data)  # 放入队列
            msg = "add data {}".format(data)
            print_log(msg)
        q.join()  # 生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。
        # 阻塞将持续到队列中的每个项目均调用q.task_done()方法为止
    
    
    
    
    def consumer(q):
        """
        消费者
        :param q: 
        :return: 
        """
        while True:
            if not q.empty():
                time.sleep(5)
                data = q.get()
                msg = "minus data{}".format(data)
                print_log(msg,"con")
                q.task_done()  # q.get()的返回项目已经被处理
    
    
    if __name__ == '__main__':
        q = JoinableQueue()
        prod = Process(target=producer, args=(q,))
        con = Process(target=consumer, args=(q,))
        con.daemon = True  # 设置为守护进程,但是不用担心,producer内调用q.join保证了consumer已经处理完队列中的所有元素
        # 开启进程
        prod.start()
        con.start()
    
        prod.join()  # 等待生产和消费完成,主线程结束
        print("结束")

     输出结果

  • 相关阅读:
    UVALIVE 4819 最大流
    Directx 3D编程实例:随机绘制的立体图案旋转
    PHP漏洞全解(四)-xss跨站脚本攻击
    PHP漏洞全解(三)-客户端脚本植入
    PHP漏洞全解(二)-命令注入攻击
    PHP漏洞全解(一)-PHP网站的安全性问题
    BT5下安装Metasploit4.5方法
    Ubuntu使用apt-get安装本地deb包
    Linux按照时间查找文件
    Linux系统备份与还原
  • 原文地址:https://www.cnblogs.com/xiao-apple36/p/8655073.html
Copyright © 2011-2022 走看看