  • 60% of people don't understand Python's Process. Do you?

     


    Basic usage

    When using multiprocessing, put the startup code under the if __name__ == '__main__': guard; otherwise you may get exceptions or warnings (notably on platforms that use the spawn start method, such as Windows).

    Basic usage of Process(): similar to Thread().
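
    A minimal sketch of that usage (the work() function below is a placeholder for illustration, not from the original post):

    from multiprocessing import Process

    def work(n):
        print(n * 2)

    if __name__ == '__main__':
        # same target/args style as threading.Thread
        p = Process(target=work, args=(10,))
        p.start()
        p.join()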

    Basic usage of Pool():

    Its map method is used just like the built-in map function, but with multiprocessing support.

    from multiprocessing import Pool
    pool = Pool(2)
    pool.map(fib, [35] * 2)

    The multiprocessing.dummy module:

    multiprocessing.dummy replicates the API of multiprocessing but is no more than a wrapper around the threading module.

    I have not used some of the points above in real work; I have only studied them and practiced with demos, so my understanding is not very deep.

    # -*- coding: utf-8 -*-
    from multiprocessing import Process, Pool
    from multiprocessing.dummy import Pool as DummyPool
    import time
    import datetime
    
    def log_time(method_name):
        def decorator(f):
            def wrapper(*args, **kwargs):
                start_time = time.time()
                res = f(*args, **kwargs)
                end_time = time.time()
                print('%s cost %ss' % (method_name, (end_time - start_time)))
                return res
            return wrapper
        return decorator
    
    def fib(n):
        if n <= 2:
            return 1
        return fib(n-1) + fib(n-2)
    
    @log_time('single_process')
    def single_process():
        fib(33)
        fib(33)
    
    @log_time('multi_process')
    def multi_process():
        jobs = []
        for _ in range(2):
            p = Process(target=fib, args=(33, ))
            p.start()
            jobs.append(p)
        for j in jobs:
            j.join()
    
    
    @log_time('pool_process')
    def pool_process():
        pool = Pool(2)
        pool.map(fib, [33]*2)
    
    
    @log_time('dummy_pool')
    def dummy_pool():
        pool = DummyPool(2)
        pool.map(fib, [33]*2)
    
    
    if __name__ == '__main__':
        single_process()
        multi_process()
        pool_process()
        dummy_pool()

    A parmap based on Pipe

    This one is a bit harder to grasp; a sketch of the idea follows.
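
    The referenced code is not reproduced here, so below is a minimal sketch of the Pipe-based parmap idea (my own reconstruction, not the original post's code): spawn one child process per element and have each child send its result back through a Pipe. It assumes the fork start method, so the inner closures do not need to be picklable; under the spawn start method this would not work as written.

    # -*- coding: utf-8 -*-
    from multiprocessing import Process, Pipe

    def spawn(f):
        # wrap f so the child computes f(x) and sends it back through its end of the pipe
        def fun(pipe, x):
            pipe.send(f(x))
            pipe.close()
        return fun

    def parmap(f, xs):
        pipes = [Pipe() for _ in xs]            # one pipe per element
        procs = [Process(target=spawn(f), args=(child, x))
                 for (parent, child), x in zip(pipes, xs)]
        for p in procs:
            p.start()
        results = [parent.recv() for parent, child in pipes]  # receive before join
        for p in procs:
            p.join()
        return results

    if __name__ == '__main__':
        print(parmap(lambda x: x * 2, range(5)))  # [0, 2, 4, 6, 8]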


    Queues

    Implement a producer-consumer model: one queue holds tasks and another holds results.
    The multiprocessing module also provides a Queue, but it has no task_done() or join() methods, so a plain Queue is used for the results and a JoinableQueue() for the tasks.
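
    For reference, a minimal sketch (separate from the demo below) of how JoinableQueue's task_done()/join() pair is normally used; the worker function and values are placeholders:

    # -*- coding: utf-8 -*-
    from multiprocessing import Process, JoinableQueue

    def worker(q):
        while 1:
            item = q.get()
            if item is None:
                q.task_done()   # the sentinel also counts as a task
                break
            print('handled %s' % item)
            q.task_done()       # mark this task as done

    if __name__ == '__main__':
        q = JoinableQueue()
        w = Process(target=worker, args=(q,))
        w.start()
        for i in range(3):
            q.put(i)
        q.put(None)             # sentinel
        q.join()                # blocks until every put() has a matching task_done()
        w.join()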

    A demo adapted from elsewhere, with one producer process and one consumer process:

    # -*- coding: utf-8 -*-
    from multiprocessing import Process, Queue, JoinableQueue
    import time
    import random
    
    def double(n):
        return n * 2 
    
    def producer(name, task_q):
        while 1:
            n = random.random()
            if n > 0.8:  # stop producing when n > 0.8
                task_q.put(None)
                print('%s break.' % name)
                break
            print('%s produce %s.' % (name, n))
            task_q.put((double, n))
    
    
    def consumer(name, task_q, result_q):
        while 1:
            task = task_q.get()
            if task is None:
                print('%s break.' % name)
                break
            func, arg = task
            res = func(arg)
            time.sleep(0.5)  # simulate a slow, blocking operation
            task_q.task_done()
            result_q.put(res)
            print('%s consume %s, result %s' % (name, arg, res))
    
    def run():
        task_q = JoinableQueue()
        result_q = Queue()
        processes = []
        p1 = Process(name='p1', target=producer, args=('p1', task_q))
        c1 = Process(name='c1', target=consumer, args=('c1', task_q, result_q))
        p1.start()
        c1.start()
        processes.append(p1)
        processes.append(c1)
    
        # join() blocks the main process until both children finish
        for p in processes:
            p.join()
    
        # after the child processes exit, print the values left in result_q
        while 1:
            if result_q.empty():
                break
            result = result_q.get()
            print('result is: %s' % result)
    
    if __name__ == '__main__':
        run()

    If there are multiple consumer() processes, only one of them will get the None sentinel and break; the others will hang forever on task_q.get(). One fix is to add a timeout to consumer() so it can exit:

    import queue
    
    def consumer(name, task_q, result_q):
        while 1:
            try:
                task = task_q.get(timeout=1)  # wait at most 1 second for a task
            except queue.Empty:
                print('%s time out, break.' % name)
                break
            if task is None:
                print('%s break.' % name)
                break
            func, arg = task
            res = func(arg)
            time.sleep(0.5)  # simulate a slow, blocking operation
            task_q.task_done()
            result_q.put(res)
            print('%s consume %s, result %s' % (name, arg, res))
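
    Another option (my own suggestion, not from the original post) is to avoid the timeout entirely: have the producer put one None sentinel per consumer, e.g. for _ in range(num_consumers): task_q.put(None), so that every consumer receives its own sentinel and exits cleanly.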

    Shared memory

    Use Value and Array from multiprocessing.sharedctypes to share memory.
    The example below is adapted from an existing demo.

    # -*- coding: utf-8 -*-
    
    from pprint import pprint
    
    # shared memory
    from multiprocessing import sharedctypes, Process, Lock
    from ctypes import Structure, c_bool, c_double
    
    pprint(sharedctypes.typecode_to_type)
    
    lock = Lock()
    
    
    class Point(Structure):
        _fields_ = [('x', c_double), ('y', c_double)]  # fields of the ctypes Structure
    
    
    def modify(n, b, s, arr, A):
        n.value **= 2
        b.value = True
        s.value = s.value.upper()
        arr[0] = 10
        for a in A:
            a.x **= 2
            a.y **= 2
    
    if __name__ == '__main__':
    
        n = sharedctypes.Value('i', 7)
        b = sharedctypes.Value(c_bool, False, lock=False)
        s = sharedctypes.Array('c', b'hello world', lock=lock)  # bytes
        arr = sharedctypes.Array('i', range(5), lock=True)
        A = sharedctypes.Array(Point, [(1.875, -6.25), (-5.75, 2.0)], lock=lock)
        p = Process(target=modify, args=(n, b, s, arr, A))
        p.start()
        p.join()
        print(n.value)
        print(b.value)
        print(s.value)
        print(arr[:])
        print([(a.x, a.y) for a in A])

    In an actual project, Value was used to monitor the task status of a child process, with memcached used to store, update, and delete that status (the memcached calls are commented out below).

    # -*- coding: utf-8 -*-
    
    from multiprocessing import Process, Value
    import time
    import datetime
    import random
    
    
    FINISHED = 3
    FAILED = 4
    INPROCESS = 2
    WAITING = 1
    
    def execute_method(status, process):
        time.sleep(1)
        status.value = INPROCESS  # test
        time.sleep(1)
        status.value = FINISHED  # test
        time.sleep(0.5)
    
    def run(execute_code):
        status = Value('i', WAITING)
        process = Value('f', 0.0)
        # mem_cache.set('%s_status' % execute_code, status.value, 0)
        # mem_cache.set('%s_process' % execute_code, process.value, 0)
        p = Process(target=execute_method, args=(status, process))
        p.start()
        start_time = datetime.datetime.now()
        while True:
            print(status.value)
            now_time = datetime.datetime.now()
            if (now_time - start_time).seconds > 30:  # give up after 30 seconds
                # mem_cache.delete('%s_status' % execute_code)
                # mem_cache.delete('%s_process' % execute_code)
                print('execute failed')
                p.terminate()
                break
            if status.value == FINISHED:
                # mem_cache.delete('%s_status' % execute_code)
                # mem_cache.delete('%s_process' % execute_code)
                print('end execute')
                break
            else:
                # mem_cache.set('%s_status' % execute_code, status.value, 0)
                # mem_cache.set('%s_process' % execute_code, process.value, 0)
                print('waiting or executing')
            time.sleep(0.5)
        p.join()

    Server process

    The following example, adapted from a blog post, briefly demonstrates the common ways a Manager shares objects.

    A multiprocessing.Manager object controls a server process, and other processes access that server process through proxies. The common shared types are:
    1. Namespace: creates a shareable namespace.
    2. Value/Array: the same as sharing ctypes objects above.
    3. dict/list: creates a shareable dict or list that supports the methods of the corresponding data structure.
    4. Condition/Event/Lock/Queue/Semaphore: creates a shareable object for the corresponding synchronization primitive.

    # -*- coding: utf-8 -*-
    from multiprocessing import Manager, Process
    
    def modify(ns, lproxy, dproxy):
        ns.name = 'new_name'
        lproxy.append('new_value')
        dproxy['new'] = 'new_value'
    
    def run():
        # prepare the shared data
        manager = Manager()
        ns = manager.Namespace()
        ns.name = 'origin_name'
        lproxy = manager.list()
        lproxy.append('origin_value')
        dproxy = manager.dict()
        dproxy['origin'] = 'origin_value'
    
        # child process modifies the proxies
        p = Process(target=modify, args=(ns, lproxy, dproxy))
        p.start()
        print(p.pid)
        p.join()
    
        print('ns.name: %s' % ns.name)
        print('lproxy: %s' % lproxy)
        print('dproxy: %s' % dproxy)
    
    if __name__ == '__main__':
        run()

    The example above mainly shows the shared object types and their proxies in Manager; a look at the source shows they are wired up via the register() method.

    multiprocessing/managers.py:

    #
    # Definition of SyncManager
    #
    
    class SyncManager(BaseManager):
        '''
        Subclass of `BaseManager` which supports a number of shared object types.
    
        The types registered are those intended for the synchronization
        of threads, plus `dict`, `list` and `Namespace`.
    
        The `multiprocessing.Manager()` function creates started instances of
        this class.
        '''
    
    SyncManager.register('Queue', queue.Queue)
    SyncManager.register('JoinableQueue', queue.Queue)
    SyncManager.register('Event', threading.Event, EventProxy)
    SyncManager.register('Lock', threading.Lock, AcquirerProxy)
    SyncManager.register('RLock', threading.RLock, AcquirerProxy)
    SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
    SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
                         AcquirerProxy)
    SyncManager.register('Condition', threading.Condition, ConditionProxy)
    SyncManager.register('Barrier', threading.Barrier, BarrierProxy)
    SyncManager.register('Pool', pool.Pool, PoolProxy)
    SyncManager.register('list', list, ListProxy)
    SyncManager.register('dict', dict, DictProxy)
    SyncManager.register('Value', Value, ValueProxy)
    SyncManager.register('Array', Array, ArrayProxy)
    SyncManager.register('Namespace', Namespace, NamespaceProxy)
    
    # types returned by methods of PoolProxy
    SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
    SyncManager.register('AsyncResult', create_method=False)

    Besides sharing with child processes, Manager() can also be used for communication between unrelated processes, as in the simple distributed-process implementation below.


    Distributed processes

    The main difference from the previous example is that the communicating processes are not parent and child.

    manager_server.py:

    # -*- coding: utf-8 -*-
    
    from multiprocessing.managers import BaseManager
    
    host = '127.0.0.1'
    port = 8080
    authkey = b'python'
    
    shared_list = []
    
    class ServerManager(BaseManager):
        pass
    
    ServerManager.register('get_list', callable=lambda: shared_list)
    server_manager = ServerManager(address=(host, port), authkey=authkey)
    server = server_manager.get_server()
    server.serve_forever()

    manager_client.py:

    # -*- coding: utf-8 -*-
    
    from multiprocessing.managers import BaseManager
    
    host = '127.0.0.1'
    port = 8080
    authkey = b'python'
    
    class ClientManager(BaseManager):
        pass
    
    ClientManager.register('get_list')
    client_manager = ClientManager(address=(host, port), authkey=authkey)
    client_manager.connect()
    
    l = client_manager.get_list()
    print(l)
    
    l.append('new_value')
    print(l)

    After running the client several times, 'new_value' keeps getting appended to shared_list.

    The following is adapted, with minor modifications, from the distributed-process example in Liao Xuefeng's tutorial.

    manager_server.py:

    # -*- coding: utf-8 -*-
    
    from multiprocessing.managers import BaseManager
    from multiprocessing import Condition, Value
    import queue
    
    host = '127.0.0.1'
    port = 8080
    authkey = b'python'
    
    
    task_q = queue.Queue(10)
    result_q = queue.Queue(20)
    cond = Condition()
    done = Value('i', 0)
    
    def double(n):
        return n * 2
    
    class ServerManager(BaseManager):
        pass
    
    ServerManager.register('get_task_queue', callable=lambda: task_q)
    ServerManager.register('get_result_queue', callable=lambda: result_q)
    ServerManager.register('get_cond', callable=lambda: cond)
    ServerManager.register('get_done', callable=lambda: done)
    ServerManager.register('get_double', callable=double)
    
    server_manager = ServerManager(address=(host, port), authkey=authkey)
    server = server_manager.get_server()
    
    print('start server')
    server.serve_forever()
    

    manager_producer.py:

    # -*- coding: utf-8 -*-
    
    from multiprocessing.managers import BaseManager
    import random
    import time
    
    host = '127.0.0.1'
    port = 8080
    authkey = b'python'
    
    class ProducerManager(BaseManager):
        pass
    
    ProducerManager.register('get_task_queue')
    ProducerManager.register('get_cond')
    ProducerManager.register('get_done')
    producer_manager = ProducerManager(address=(host, port), authkey=authkey)
    
    producer_manager.connect()
    task_q  = producer_manager.get_task_queue()
    cond = producer_manager.get_cond()
    # done = producer_manager.get_done()
    count = 20  # produce at most 20 tasks
    
    while count > 0:
        if cond.acquire():
            if not task_q.full():
                n = random.randint(0, 10)
                task_q.put(n)
                print("Producer:deliver one, now tasks:%s" % task_q.qsize())
                cond.notify()
                count -= 1
                time.sleep(0.5)
            else:
                print("Producer:already full, stop deliver, now tasks:%s" % task_q.qsize())
                cond.wait() 
            cond.release()
    # done.value = 1
    print('Producer break')

    manager_consumer.py:

    # -*- coding: utf-8 -*-
    
    from multiprocessing.managers import BaseManager
    
    host = '127.0.0.1'
    port = 8080
    authkey = b'python'
    
    class ConsumerManager(BaseManager):
        pass
    
    ConsumerManager.register('get_task_queue')
    ConsumerManager.register('get_result_queue')
    ConsumerManager.register('get_cond')
    # ConsumerManager.register('get_done')
    ConsumerManager.register('get_double')
    
    consumer_manager = ConsumerManager(address=(host, port), authkey=authkey)
    consumer_manager.connect()
    
    task_q = consumer_manager.get_task_queue()
    result_q = consumer_manager.get_result_queue()
    cond = consumer_manager.get_cond()
    # done = consumer_manager.get_done()
    
    while 1:
        if result_q.full():
            print('result queue is full')
            break
        if cond.acquire():
            if not task_q.empty():
                arg = task_q.get()
                res = consumer_manager.get_double(arg)
                print("Consumer:consume one, now tasks:%s" % task_q.qsize())
                result_q.put(res)
                cond.notify()
            else:
                print("Consumer:only 0, stop consume, products")
                cond.wait()
            cond.release()
    
    while 1:
        if result_q.empty():
            break
        result = result_q.get()
        print('result is: %s' % result)
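
    To try the distributed demo, start manager_server.py first, then run manager_producer.py and manager_consumer.py in separate terminals; once the producer has delivered its 20 tasks and the consumer has filled result_q, the consumer prints the doubled results.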
  • Original post: https://www.cnblogs.com/chengxuyuanaa/p/11961696.html