zoukankan      html  css  js  c++  java
  • [b0038] python 归纳 (二三)_多进程数据共享和同步_队列Queue

    1  队列读写

    # -*- coding: utf-8 -*-
    """
    多进程  共享  队列 multiprocessing.Process
    逻辑:
       一个进程往队列写数据,一个进程从读写读数据
       写进程完了后,主进程强行结束读进程
    
    使用:
        1. 创建队列 q = multiprocessing.Queue() ,默认无限大小,可以指定大小
        2. 把队列 q 当参数传给 子进程 执行代码, 猜测应该不能通过全局变量的方式访问
        3. 在子进程中读写队列数据   q.put(<data>)  q.get()
    
    参考:
    方法
    'cancel_join_thread', 'close', 'empty', 'full', 'get', 'get_nowait', 'join_thread', 'put', 'put_nowait', 'qsize'
    
    """
    
    from multiprocessing import Queue, Process
    import time
    
    def write(q):
        for i in ['a','b','c','d']:
            time.sleep(2)
            q.put(i)
            print ('put {0} to queue'.format(i))
    
    def read(q):
        while 1:
            time.sleep(2)
            result = q.get()
            print ("get {0} from queue".format(result))
    
    def main():
        q = Queue()
    
        pw = Process(target=write, args=(q,))
        pr = Process(target=read,  args=(q,))
        pw.start()
        pr.start()
        pw.join()
        pr.terminate()  # 强行终止读进程
    
    if __name__ == '__main__':
        main()
    
    """
    Out:
    
    put a to queue
    get a from queue
    put b to queue
    get b from queue
    put c to queue
    get c from queue
    put d to queue
    get d from queue
    """

    2 队列实现生产者、消费者

    # -*- coding: utf-8 -*-
    """
    多进程 生产者 消费者模型,使用队列实现 multiprocessing.Queue
    
    逻辑:
        1个生产者,1个消费者在2个不同的进程中操作同一个队列
        生产者的速度是消费者的3倍
    """
    import multiprocessing
    import random
    import time
    
    # 生产者
    class producer(multiprocessing.Process):
        def __init__(self, queue):
            multiprocessing.Process.__init__(self)  # 父类构造
            self.queue = queue
    
        def run(self):
            for i in range(10):
                item = random.randint(0, 256)
    
                #  往队列写数据
                self.queue.put(item)
    
                print("Process Producer: item %d appended to queue %s " 
                      %(item, self.name))
                time.sleep(1)
                print("The size of queue is %s" 
                      % self.queue.qsize())
    
    
    # 消费者
    class consumer(multiprocessing.Process):
        def __init__(self, queue):
            multiprocessing.Process.__init__(self)  # 父类构造
            self.queue = queue
    
        def run(self):
            while True:
                if (self.queue.empty()):
                    print("the queue is empty")
                    break
                else:
                    time.sleep(2)
    
                    # 从队列读取数据,队列为空会阻塞,这做了非空判断,只有一个进程读,不会阻塞
                    item = self.queue.get()
    
                    print("Process Consumer: item %d poped from by %s " 
                          % (item, self.name))
                    time.sleep(1)
    
    
    if __name__ == '__main__':
        #  多进程共享对列
        queue = multiprocessing.Queue()
    
        ## 启动生产者、消费者
        process_producer = producer(queue)
        process_consumer = consumer(queue)
        process_producer.start()
        process_consumer.start()
        process_producer.join()
        process_consumer.join()
    
    """
    Out:
    the queue is empty
    Process Producer: item 225 appended to queue producer-1
    The size of queue is 1
    Process Producer: item 101 appended to queue producer-1
    The size of queue is 2
    Process Producer: item 50 appended to queue producer-1
    The size of queue is 3
    Process Producer: item 217 appended to queue producer-1
    The size of queue is 4
    Process Producer: item 75 appended to queue producer-1
    The size of queue is 5
    Process Producer: item 45 appended to queue producer-1
    The size of queue is 6
    Process Producer: item 19 appended to queue producer-1
    The size of queue is 7
    Process Producer: item 157 appended to queue producer-1
    The size of queue is 8
    Process Producer: item 127 appended to queue producer-1
    The size of queue is 9
    Process Producer: item 223 appended to queue producer-1
    The size of queue is 10
    """
  • 相关阅读:
    Storm的并行度、Grouping策略以及消息可靠处理机制简介
    storm入门原理介绍
    Kafka学习笔记-Java简单操作
    批量复制word文档,并生成以日期为后缀名的批量文档攻略,批量生成word文档
    数组
    分支结构,循环结构学习整理
    java中的运算符
    Java中的变量和基本数据类型知识
    Java开发环境描述
    使用Map,统计字符串中每个字符出现的次数
  • 原文地址:https://www.cnblogs.com/sunzebo/p/9637384.html
Copyright © 2011-2022 走看看