zoukankan      html  css  js  c++  java
  • multiprocessing 同步类型,如锁,条件和队列官方案例:

    官方文档:https://docs.python.org/2/library/multiprocessing.html#module-multiprocessing

    1。 同步类型,如锁,条件和队列官方案例:

    #
    # A test file for the `multiprocessing` package
    #
    # Copyright (c) 2006-2008, R Oudkerk
    # All rights reserved.
    #
    
    import time, sys, random
    from Queue import Empty
    
    import multiprocessing               # may get overwritten
    
    
    #### TEST_VALUE
    
    def value_func(running, mutex):
        """Sleep for a random time, then report completion and decrement the counter.

        running -- shared integer (accessed through ``.value``) that counts the
                   tasks still running.
        mutex   -- lock guarding both the message and the update of ``running``.
        """
        # Reseed inside this process (presumably so that children do not all
        # reuse the parent's random state -- confirm for the start method used).
        random.seed()
        time.sleep(random.random()*4)

        # `with` releases the mutex even if printing or the update raises.
        with mutex:
            print('\n\t\t\t' + str(multiprocessing.current_process()) + ' has finished')
            running.value -= 1
    
    def test_value():
        TASKS = 10
        running = multiprocessing.Value('i', TASKS)
        mutex = multiprocessing.Lock()
    
        for i in range(TASKS):
            p = multiprocessing.Process(target=value_func, args=(running, mutex))
            p.start()
    
        while running.value > 0:
            time.sleep(0.08)
            mutex.acquire()
            print running.value,
            sys.stdout.flush()
            mutex.release()
    
        print
        print 'No more running processes'
    
    
    #### TEST_QUEUE
    
    def queue_func(queue):
        """Put the squares of 0..29 on `queue` at random intervals, then 'STOP'."""
        for n in range(30):
            delay = 0.5 * random.random()
            time.sleep(delay)
            queue.put(n * n)
        # Sentinel value marking the end of the stream.
        queue.put('STOP')
    
    def test_queue():
        q = multiprocessing.Queue()
    
        p = multiprocessing.Process(target=queue_func, args=(q,))
        p.start()
    
        o = None
        while o != 'STOP':
            try:
                o = q.get(timeout=0.3)
                print o,
                sys.stdout.flush()
            except Empty:
                print 'TIMEOUT'
    
        print
    
    
    #### TEST_CONDITION
    
    def condition_func(cond):
        """Child side of the condition demo: hold `cond` for 2 s, then notify.

        The condition is printed (tab-indented) right after acquiring it and
        again just before notifying.
        """
        cond.acquire()
        print('\t%s' % (cond,))
        time.sleep(2)
        print('\tchild is notifying')
        print('\t%s' % (cond,))
        cond.notify()
        cond.release()
    
    def test_condition():
        """Show a parent waiting on a condition that a child process notifies.

        The condition is printed after every state change so the output traces
        the effect of each acquire, wait and release.
        """
        cond = multiprocessing.Condition()

        p = multiprocessing.Process(target=condition_func, args=(cond,))
        print cond

        # Acquire twice from the same process before the child is started.
        # NOTE(review): this relies on the condition's underlying lock being
        # re-entrant -- confirm for every namespace this test is run with.
        cond.acquire()
        print cond
        cond.acquire()
        print cond

        # The child is started only once the parent holds the condition; the
        # child's first step is its own cond.acquire().
        p.start()

        print 'main is waiting'
        cond.wait()
        print 'main has woken up'

        # One release() for each acquire() above.
        print cond
        cond.release()
        print cond
        cond.release()

        p.join()
        print cond
    
    
    #### TEST_SEMAPHORE
    
    def semaphore_func(sema, mutex, running):
        """Run one task while holding a slot of `sema`.

        sema    -- semaphore held for the whole task.
        mutex   -- lock guarding `running` and the progress messages.
        running -- shared integer (``.value``) counting tasks currently running.
        """
        sema.acquire()

        # Register this task and report how many are active.
        mutex.acquire()
        running.value += 1
        print('%s tasks are running' % (running.value,))
        mutex.release()

        # Simulated work: a random pause of up to two seconds.
        random.seed()
        pause = random.random()*2
        time.sleep(pause)

        # Deregister and announce completion.
        mutex.acquire()
        running.value -= 1
        print('%s has finished' % (multiprocessing.current_process(),))
        mutex.release()

        sema.release()
    
    def test_semaphore():
        """Run ten `semaphore_func` tasks sharing a semaphore created with value 3."""
        sema = multiprocessing.Semaphore(3)
        mutex = multiprocessing.RLock()
        running = multiprocessing.Value('i', 0)

        workers = []
        for _ in range(10):
            workers.append(
                multiprocessing.Process(target=semaphore_func,
                                        args=(sema, mutex, running))
            )

        # Start everything first, then wait for all of them.
        for w in workers:
            w.start()

        for w in workers:
            w.join()
    
    
    #### TEST_JOIN_TIMEOUT
    
    def join_timeout_func():
        """Child task for `test_join_timeout`: announce, sleep 5.5 s, announce exit."""
        print('\tchild sleeping')
        time.sleep(5.5)
        # The leading newline ends the row of dots the parent prints (without
        # a newline) while it polls for this process to finish.
        print('\n\tchild terminating')
    
    def test_join_timeout():
        p = multiprocessing.Process(target=join_timeout_func)
        p.start()
    
        print 'waiting for process to finish'
    
        while 1:
            p.join(timeout=1)
            if not p.is_alive():
                break
            print '.',
            sys.stdout.flush()
    
    
    #### TEST_EVENT
    
    def event_func(event):
        """Block on `event`, printing the current process before and after the wait."""
        before = '\t%r is waiting' % (multiprocessing.current_process(),)
        print(before)
        event.wait()
        after = '\t%r has woken up' % (multiprocessing.current_process(),)
        print(after)
    
    def test_event():
        """Start five processes waiting on one event, set it after 2 s, then join them."""
        event = multiprocessing.Event()

        waiters = []
        for _ in range(5):
            waiters.append(
                multiprocessing.Process(target=event_func, args=(event,)))

        for w in waiters:
            w.start()

        print('main is sleeping')
        time.sleep(2)

        print('main is setting event')
        event.set()

        for w in waiters:
            w.join()
    
    
    #### TEST_SHAREDVALUES
    
    def sharedvalues_func(values, arrays, shared_values, shared_arrays):
        """Check that shared objects hold the data they were created from.

        values        -- list of (typecode, value) pairs.
        arrays        -- list of (typecode, sequence) pairs.
        shared_values -- objects whose ``.value`` must equal the paired value.
        shared_arrays -- sliceable objects whose contents must equal the
                         paired sequence.

        Raises AssertionError on the first mismatch; prints 'Tests passed'
        when everything matches.
        """
        for (_, expected), shared in zip(values, shared_values):
            assert expected == shared.value

        for (_, expected), shared in zip(arrays, shared_arrays):
            # list() on both sides so a lazy sequence such as a Python 3
            # `range` still compares equal to the shared array's contents.
            assert list(expected) == list(shared[:])

        print('Tests passed')
    
    def test_sharedvalues():
        """Create shared Values and Arrays and verify them from a child process.

        Raises AssertionError if the child exits with a non-zero exit code.
        """
        values = [
            ('i', 10),
            ('h', -2),
            ('d', 1.25),
            ]
        # Explicit lists so the initialisers are concrete sequences on any
        # Python version.
        arrays = [
            ('i', list(range(100))),
            ('d', [0.25 * i for i in range(100)]),
            ('H', list(range(1000))),
            ]

        # `typecode` instead of `id`, which shadowed the builtin.
        shared_values = [multiprocessing.Value(typecode, v)
                         for typecode, v in values]
        shared_arrays = [multiprocessing.Array(typecode, a)
                         for typecode, a in arrays]

        p = multiprocessing.Process(
            target=sharedvalues_func,
            args=(values, arrays, shared_values, shared_arrays)
            )
        p.start()
        p.join()

        # A failed assertion in the child surfaces here as a non-zero exit code.
        assert p.exitcode == 0
    
    
    ####
    
    def test(namespace=multiprocessing):
        """Run every test_* function of this file against `namespace`.

        namespace -- object providing the multiprocessing API used by the
                     tests (the module itself by default).  It is bound to the
                     module-level name `multiprocessing`, so the rebinding
                     stays in effect after this function returns.

        Raises ValueError if `namespace` has a `_debug_info()` that returns a
        non-empty result once all tests have run.
        """
        global multiprocessing

        multiprocessing = namespace

        for func in [ test_value, test_queue, test_condition,
                      test_semaphore, test_join_timeout, test_event,
                      test_sharedvalues ]:

            print('\n\t######## %s\n' % func.__name__)
            func()

        multiprocessing.active_children()      # cleanup any old processes
        if hasattr(multiprocessing, '_debug_info'):
            info = multiprocessing._debug_info()
            if info:
                print(info)
                raise ValueError('there should be no positive refcounts left')
    
    
    # Script entry point: choose the namespace to test from the optional
    # command-line argument (processes | manager | threads).
    if __name__ == '__main__':
        multiprocessing.freeze_support()

        usage = 'Usage:\n\t%s [processes | manager | threads]' % sys.argv[0]

        # Reject extra arguments with the usage message; an `assert` here
        # would be stripped under `python -O`.
        if len(sys.argv) > 2:
            print(usage)
            raise SystemExit(2)

        if len(sys.argv) == 1 or sys.argv[1] == 'processes':
            print(' Using processes '.center(79, '-'))
            namespace = multiprocessing
        elif sys.argv[1] == 'manager':
            print(' Using processes and a manager '.center(79, '-'))
            namespace = multiprocessing.Manager()
            # Attach the process helpers that the tests look up on the namespace.
            namespace.Process = multiprocessing.Process
            namespace.current_process = multiprocessing.current_process
            namespace.active_children = multiprocessing.active_children
        elif sys.argv[1] == 'threads':
            print(' Using threads '.center(79, '-'))
            import multiprocessing.dummy as namespace
        else:
            print(usage)
            raise SystemExit(2)

        test(namespace)
    

     下面是一个示例,显示了如何使用队列将任务提供给一组工作进程并收集结果:

    # Simple example which uses a pool of workers to carry out some tasks.
    #
    # Notice that the results will probably not come out of the output
    # queue in the same order as the corresponding tasks were
    # put on the input queue.  If it is important to get the results back
    # in the original order then consider using `Pool.map()` or
    # `Pool.imap()` (which will save on the amount of code needed anyway).
    #
    # Copyright (c) 2006-2008, R Oudkerk
    # All rights reserved.
    #
    
    import time
    import random
    
    from multiprocessing import Process, Queue, current_process, freeze_support
    
    #
    # Function run by worker processes
    #
    
    def worker(input, output):
        """Process (func, args) tasks from `input` until 'STOP' is received.

        Each task's `calculate` result is put on `output`.
        """
        while True:
            task = input.get()
            if task == 'STOP':
                break
            func, args = task
            output.put(calculate(func, args))
    
    #
    # Function used to calculate result
    #
    
    def calculate(func, args):
        """Call ``func(*args)`` and describe the outcome.

        Returns a string of the form
        '<process name> says that <func name><args> = <result>'.
        """
        result = func(*args)
        # Parenthesised so the expression can span lines without a backslash.
        return '%s says that %s%s = %s' % (
            current_process().name, func.__name__, args, result)
    
    #
    # Functions referenced by tasks
    #
    
    def mul(a, b):
        """Return ``a * b`` after a random pause of up to half a second."""
        pause = 0.5*random.random()
        time.sleep(pause)
        product = a * b
        return product
    
    def plus(a, b):
        """Return ``a + b`` after a random pause of up to half a second."""
        pause = 0.5*random.random()
        time.sleep(pause)
        total = a + b
        return total
    
    #
    #
    #
    
    def test():
        NUMBER_OF_PROCESSES = 4
        TASKS1 = [(mul, (i, 7)) for i in range(20)]
        TASKS2 = [(plus, (i, 8)) for i in range(10)]
    
        # Create queues
        task_queue = Queue()
        done_queue = Queue()
    
        # Submit tasks
        for task in TASKS1:
            task_queue.put(task)
    
        # Start worker processes
        for i in range(NUMBER_OF_PROCESSES):
            Process(target=worker, args=(task_queue, done_queue)).start()
    
        # Get and print results
        print 'Unordered results:'
        for i in range(len(TASKS1)):
            print '	', done_queue.get()
    
        # Add more tasks using `put()`
        for task in TASKS2:
            task_queue.put(task)
    
        # Get and print some more results
        for i in range(len(TASKS2)):
            print '	', done_queue.get()
    
        # Tell child processes to stop
        for i in range(NUMBER_OF_PROCESSES):
            task_queue.put('STOP')
    
    
    # Script entry point: run the demo only when this file is executed directly.
    if __name__ == '__main__':
        # NOTE(review): freeze_support() is called before anything else,
        # presumably for frozen Windows executables -- confirm against the
        # multiprocessing documentation.
        freeze_support()
        test()
    

      

  • 相关阅读:
    子线程导致 Windows 服务停止的情况(Topshelf 结合 Quartz.NET)
    ASP.NET Web API 2 使用 DelegatingHandler(委托处理程序)实现签名认证
    ASP.NET Web API 2 使用 AuthorizationFilter(授权过滤器)实现 Basic 认证
    聚合函数查询语句
    SQL SERVER数据库常用命令
    Easyui-datebox日期控件增加清空按钮
    用sql语句查出来字段里包含某个字符串的所有记录
    String 转化成java.sql.Date和java.sql.Time(转载)
    常见的 HTML 事件
    JavaScript 变量中给数值加引号的问题
  • 原文地址:https://www.cnblogs.com/SunshineKimi/p/12054433.html
Copyright © 2011-2022 走看看