zoukankan      html  css  js  c++  java
  • queue for process of Python

    Queue for multiple processes

    跟线程队列类似。

    有三种队列:

    (1)Queue -- 普通队列

    (2)SimpleQueue -- 简化队列,类似管道

    (3)JoinableQueue -- 可观测队列。

    https://docs.python.org/3.7/library/multiprocessing.html#pipes-and-queues

    When using multiple processes, one generally uses message passing for communication between processes and avoids having to use any synchronization primitives like locks.

    For passing messages one can use Pipe() (for a connection between two processes) or a queue (which allows multiple producers and consumers).

    The Queue, SimpleQueue and JoinableQueue types are multi-producer, multi-consumer FIFO queues modelled on the queue.Queue class in the standard library. They differ in that Queue lacks the task_done() and join() methods introduced into Python 2.5’s queue.Queue class.

    If you use JoinableQueue then you must call JoinableQueue.task_done() for each task removed from the queue or else the semaphore used to count the number of unfinished tasks may eventually overflow, raising an exception.

    class multiprocessing.Queue([maxsize])

    Returns a process shared queue implemented using a pipe and a few locks/semaphores. When a process first puts an item on the queue a feeder thread is started which transfers objects from a buffer into the pipe.

    class multiprocessing.SimpleQueue

    It is a simplified Queue type, very close to a locked Pipe.

    class multiprocessing.JoinableQueue([maxsize])

    JoinableQueue, a Queue subclass, is a queue which additionally has task_done() and join() methods.

    Queue - demo

    队列将一个大任务划分为小的任务,并分发到多个worker,并行处理。

    此案例为 单向通信案例,master向worker发送消息,worker状态master不管。

    其中发送消息的内容为一个类对象。

    消息队列通过pickle工具对对象进行 序列化 和 反序列化。

    https://pymotw.com/3/multiprocessing/communication.html#passing-messages-to-processes

    As with threads, a common use pattern for multiple processes is to divide a job up among several workers to run in parallel. Effective use of multiple processes usually requires some communication between them, so that work can be divided and results can be aggregated. A simple way to communicate between processes with multiprocessing is to use a Queue to pass messages back and forth. Any object that can be serialized with pickle can pass through a Queue.

    import multiprocessing
    
    
    class MyFancyClass:
    
        def __init__(self, name):
            self.name = name
    
        def do_something(self):
            proc_name = multiprocessing.current_process().name
            print('Doing something fancy in {} for {}!'.format(
                proc_name, self.name))
    
    
    def worker(q):
        obj = q.get()
        obj.do_something()
    
    
    if __name__ == '__main__':
        queue = multiprocessing.Queue()
    
        p = multiprocessing.Process(target=worker, args=(queue,))
        p.start()
    
        queue.put(MyFancyClass('Fancy Dan'))
    
        # Wait for the worker to finish
        queue.close()
        queue.join_thread()
        p.join()

    pickle

    https://pymotw.com/3/pickle/index.html#module-pickle

    The pickle module implements an algorithm for turning an arbitrary Python object into a series of bytes. This process is also called serializing the object. The byte stream representing the object can then be transmitted or stored, and later reconstructed to create a new object with the same characteristics.

    Encoding and Decoding Data in Strings

    This first example Uses dumps() to encode a data structure as a string, then prints the string to the console. It uses a data structure made up of entirely built-in types. Instances of any class can be pickled, as will be illustrated in a later example.

    pickle_string.py
    import pickle
    import pprint
    
    data = [{'a': 'A', 'b': 2, 'c': 3.0}]
    print('DATA:', end=' ')
    pprint.pprint(data)
    
    data_string = pickle.dumps(data)
    print('PICKLE: {!r}'.format(data_string))
    

    By default, the pickle will be written in a binary format most compatible when sharing between Python 3 programs.

    $ python3 pickle_string.py
    
    DATA: [{'a': 'A', 'b': 2, 'c': 3.0}]
    PICKLE: b'x80x03]qx00}qx01(Xx01x00x00x00cqx02G@x08x00
    x00x00x00x00x00Xx01x00x00x00bqx03Kx02Xx01x00x00x0
    0aqx04Xx01x00x00x00Aqx05ua.'
    

    After the data is serialized, it can be written to a file, socket, pipe, etc. Later, the file can be read and the data unpickled to construct a new object with the same values.

    pickle_unpickle.py
    import pickle
    import pprint
    
    data1 = [{'a': 'A', 'b': 2, 'c': 3.0}]
    print('BEFORE: ', end=' ')
    pprint.pprint(data1)
    
    data1_string = pickle.dumps(data1)
    
    data2 = pickle.loads(data1_string)
    print('AFTER : ', end=' ')
    pprint.pprint(data2)
    
    print('SAME? :', (data1 is data2))
    print('EQUAL?:', (data1 == data2))
    

    The newly constructed object is equal to, but not the same object as, the original.

    $ python3 pickle_unpickle.py
    
    BEFORE:  [{'a': 'A', 'b': 2, 'c': 3.0}]
    AFTER :  [{'a': 'A', 'b': 2, 'c': 3.0}]
    SAME? : False
    EQUAL?: True
    

    pickle protocol

    pickle是一种python专门的数据格式协议。

    只能用于python。

    目前演化到4.0版本。

    https://docs.python.org/3.7/library/pickle.html#pickle-protocols

    The data format used by pickle is Python-specific. This has the advantage that there are no restrictions imposed by external standards such as JSON or XDR (which can’t represent pointer sharing); however it means that non-Python programs may not be able to reconstruct pickled Python objects.

    By default, the pickle data format uses a relatively compact binary representation. If you need optimal size characteristics, you can efficiently compress pickled data.

    There are currently 5 different protocols which can be used for pickling. The higher the protocol used, the more recent the version of Python needed to read the pickle produced.

    • Protocol version 0 is the original “human-readable” protocol and is backwards compatible with earlier versions of Python.

    • Protocol version 1 is an old binary format which is also compatible with earlier versions of Python.

    • Protocol version 2 was introduced in Python 2.3. It provides much more efficient pickling of new-style classes. Refer to PEP 307 for information about improvements brought by protocol 2.

    • Protocol version 3 was added in Python 3.0. It has explicit support for bytes objects and cannot be unpickled by Python 2.x. This is the default protocol, and the recommended protocol when compatibility with other Python 3 versions is required.

    • Protocol version 4 was added in Python 3.4. It adds support for very large objects, pickling more kinds of objects, and some data format optimizations. Refer to PEP 3154 for information about improvements brought by protocol 4.

    https://www.python.org/dev/peps/pep-3154/

    Framing

    Traditionally, when unpickling an object from a stream (by calling load() rather than loads()), many small read() calls can be issued on the file-like object, with a potentially huge performance impact.

    Protocol 4, by contrast, features binary framing. The general structure of a pickle is thus the following:

    +------+------+
    | 0x80 | 0x04 |              protocol header (2 bytes)
    +------+------+
    |  OP  |                     FRAME opcode (1 byte)
    +------+------+-----------+
    | MM MM MM MM MM MM MM MM |  frame size (8 bytes, little-endian)
    +------+------------------+
    | .... |                     first frame contents (M bytes)
    +------+
    |  OP  |                     FRAME opcode (1 byte)
    +------+------+-----------+
    | NN NN NN NN NN NN NN NN |  frame size (8 bytes, little-endian)
    +------+------------------+
    | .... |                     second frame contents (N bytes)
    +------+
      etc.
    

    Queue 双向通信

    A more complex example shows how to manage several workers consuming data from a JoinableQueue and passing results back to the parent process. The poison pill technique is used to stop the workers. After setting up the real tasks, the main program adds one “stop” value per worker to the job queue. When a worker encounters the special value, it breaks out of its processing loop. The main process uses the task queue’s join() method to wait for all of the tasks to finish before processing the results.

    import multiprocessing
    import time
    
    
    class Consumer(multiprocessing.Process):
    
        def __init__(self, task_queue, result_queue):
            multiprocessing.Process.__init__(self)
            self.task_queue = task_queue
            self.result_queue = result_queue
    
        def run(self):
            proc_name = self.name
            while True:
                next_task = self.task_queue.get()
                if next_task is None:
                    # Poison pill means shutdown
                    print('{}: Exiting'.format(proc_name))
                    self.task_queue.task_done()
                    break
                print('{}: {}'.format(proc_name, next_task))
                answer = next_task()
                self.task_queue.task_done()
                self.result_queue.put(answer)
    
    
    class Task:
    
        def __init__(self, a, b):
            self.a = a
            self.b = b
    
        def __call__(self):
            time.sleep(0.1)  # pretend to take time to do the work
            return '{self.a} * {self.b} = {product}'.format(
                self=self, product=self.a * self.b)
    
        def __str__(self):
            return '{self.a} * {self.b}'.format(self=self)
    
    
    if __name__ == '__main__':
        # Establish communication queues
        tasks = multiprocessing.JoinableQueue()
        results = multiprocessing.Queue()
    
        # Start consumers
        num_consumers = multiprocessing.cpu_count() * 2
        print('Creating {} consumers'.format(num_consumers))
        consumers = [
            Consumer(tasks, results)
            for i in range(num_consumers)
        ]
        for w in consumers:
            w.start()
    
        # Enqueue jobs
        num_jobs = 10
        for i in range(num_jobs):
            tasks.put(Task(i, i))
    
        # Add a poison pill for each consumer
        for i in range(num_consumers):
            tasks.put(None)
    
        # Wait for all of the tasks to finish
        tasks.join()
    
        # Start printing results
        while num_jobs:
            result = results.get()
            print('Result:', result)
            num_jobs -= 1

    Although the jobs enter the queue in order, their execution is parallelized so there is no guarantee about the order they will be completed.

    $ python3 -u multiprocessing_producer_consumer.py
    
    Creating 8 consumers
    Consumer-1: 0 * 0
    Consumer-2: 1 * 1
    Consumer-3: 2 * 2
    Consumer-4: 3 * 3
    Consumer-5: 4 * 4
    Consumer-6: 5 * 5
    Consumer-7: 6 * 6
    Consumer-8: 7 * 7
    Consumer-3: 8 * 8
    Consumer-7: 9 * 9
    Consumer-4: Exiting
    Consumer-1: Exiting
    Consumer-2: Exiting
    Consumer-5: Exiting
    Consumer-6: Exiting
    Consumer-8: Exiting
    Consumer-7: Exiting
    Consumer-3: Exiting
    Result: 6 * 6 = 36
    Result: 2 * 2 = 4
    Result: 3 * 3 = 9
    Result: 0 * 0 = 0
    Result: 1 * 1 = 1
    Result: 7 * 7 = 49
    Result: 4 * 4 = 16
    Result: 5 * 5 = 25
    Result: 8 * 8 = 64
    Result: 9 * 9 = 81
    
  • 相关阅读:
    音频(一)_音频认知(1.音频释义)
    音频_写在前面的话
    SignInWithAppleId(Apple登录接入)_unity篇
    编程工具~用了都说好的快捷键大杂烩
    Unity的PlayerPrefs存储路径
    unity如何判断应用的运行平台
    Unity资源加载机制www的坑
    VSCode快捷键
    MD5加密字符串并转化为base64(C#和PHP代码相同实现)
    转载:关于 Google Chrome 中的全屏模式和 APP 模式
  • 原文地址:https://www.cnblogs.com/lightsong/p/13985485.html
Copyright © 2011-2022 走看看