官方文档:https://docs.python.org/2/library/multiprocessing.html#module-multiprocessing
1。 同步类型,如锁,条件和队列官方案例:
# # A test file for the `multiprocessing` package # # Copyright (c) 2006-2008, R Oudkerk # All rights reserved. # import time, sys, random from Queue import Empty import multiprocessing # may get overwritten #### TEST_VALUE def value_func(running, mutex): random.seed() time.sleep(random.random()*4) mutex.acquire() print ' ' + str(multiprocessing.current_process()) + ' has finished' running.value -= 1 mutex.release() def test_value(): TASKS = 10 running = multiprocessing.Value('i', TASKS) mutex = multiprocessing.Lock() for i in range(TASKS): p = multiprocessing.Process(target=value_func, args=(running, mutex)) p.start() while running.value > 0: time.sleep(0.08) mutex.acquire() print running.value, sys.stdout.flush() mutex.release() print print 'No more running processes' #### TEST_QUEUE def queue_func(queue): for i in range(30): time.sleep(0.5 * random.random()) queue.put(i*i) queue.put('STOP') def test_queue(): q = multiprocessing.Queue() p = multiprocessing.Process(target=queue_func, args=(q,)) p.start() o = None while o != 'STOP': try: o = q.get(timeout=0.3) print o, sys.stdout.flush() except Empty: print 'TIMEOUT' print #### TEST_CONDITION def condition_func(cond): cond.acquire() print ' ' + str(cond) time.sleep(2) print ' child is notifying' print ' ' + str(cond) cond.notify() cond.release() def test_condition(): cond = multiprocessing.Condition() p = multiprocessing.Process(target=condition_func, args=(cond,)) print cond cond.acquire() print cond cond.acquire() print cond p.start() print 'main is waiting' cond.wait() print 'main has woken up' print cond cond.release() print cond cond.release() p.join() print cond #### TEST_SEMAPHORE def semaphore_func(sema, mutex, running): sema.acquire() mutex.acquire() running.value += 1 print running.value, 'tasks are running' mutex.release() random.seed() time.sleep(random.random()*2) mutex.acquire() running.value -= 1 print '%s has finished' % multiprocessing.current_process() mutex.release() sema.release() def test_semaphore(): sema = multiprocessing.Semaphore(3) mutex = multiprocessing.RLock() running = multiprocessing.Value('i', 0) processes = [ multiprocessing.Process(target=semaphore_func, args=(sema, mutex, running)) for i in range(10) ] for p in processes: p.start() for p in processes: p.join() #### TEST_JOIN_TIMEOUT def join_timeout_func(): print ' child sleeping' time.sleep(5.5) print ' child terminating' def test_join_timeout(): p = multiprocessing.Process(target=join_timeout_func) p.start() print 'waiting for process to finish' while 1: p.join(timeout=1) if not p.is_alive(): break print '.', sys.stdout.flush() #### TEST_EVENT def event_func(event): print ' %r is waiting' % multiprocessing.current_process() event.wait() print ' %r has woken up' % multiprocessing.current_process() def test_event(): event = multiprocessing.Event() processes = [multiprocessing.Process(target=event_func, args=(event,)) for i in range(5)] for p in processes: p.start() print 'main is sleeping' time.sleep(2) print 'main is setting event' event.set() for p in processes: p.join() #### TEST_SHAREDVALUES def sharedvalues_func(values, arrays, shared_values, shared_arrays): for i in range(len(values)): v = values[i][1] sv = shared_values[i].value assert v == sv for i in range(len(values)): a = arrays[i][1] sa = list(shared_arrays[i][:]) assert a == sa print 'Tests passed' def test_sharedvalues(): values = [ ('i', 10), ('h', -2), ('d', 1.25) ] arrays = [ ('i', range(100)), ('d', [0.25 * i for i in range(100)]), ('H', range(1000)) ] shared_values = [multiprocessing.Value(id, v) for id, v in values] shared_arrays = [multiprocessing.Array(id, a) for id, a in arrays] p = multiprocessing.Process( target=sharedvalues_func, args=(values, arrays, shared_values, shared_arrays) ) p.start() p.join() assert p.exitcode == 0 #### def test(namespace=multiprocessing): global multiprocessing multiprocessing = namespace for func in [ test_value, test_queue, test_condition, test_semaphore, test_join_timeout, test_event, test_sharedvalues ]: print ' ######## %s ' % func.__name__ func() ignore = multiprocessing.active_children() # cleanup any old processes if hasattr(multiprocessing, '_debug_info'): info = multiprocessing._debug_info() if info: print info raise ValueError('there should be no positive refcounts left') if __name__ == '__main__': multiprocessing.freeze_support() assert len(sys.argv) in (1, 2) if len(sys.argv) == 1 or sys.argv[1] == 'processes': print ' Using processes '.center(79, '-') namespace = multiprocessing elif sys.argv[1] == 'manager': print ' Using processes and a manager '.center(79, '-') namespace = multiprocessing.Manager() namespace.Process = multiprocessing.Process namespace.current_process = multiprocessing.current_process namespace.active_children = multiprocessing.active_children elif sys.argv[1] == 'threads': print ' Using threads '.center(79, '-') import multiprocessing.dummy as namespace else: print 'Usage: %s [processes | manager | threads]' % sys.argv[0] raise SystemExit(2) test(namespace)
下面是一个示例,显示了如何使用队列将任务提供给一组工作进程并收集结果:
# Simple example which uses a pool of workers to carry out some tasks. # # Notice that the results will probably not come out of the output # queue in the same in the same order as the corresponding tasks were # put on the input queue. If it is important to get the results back # in the original order then consider using `Pool.map()` or # `Pool.imap()` (which will save on the amount of code needed anyway). # # Copyright (c) 2006-2008, R Oudkerk # All rights reserved. # import time import random from multiprocessing import Process, Queue, current_process, freeze_support # # Function run by worker processes # def worker(input, output): for func, args in iter(input.get, 'STOP'): result = calculate(func, args) output.put(result) # # Function used to calculate result # def calculate(func, args): result = func(*args) return '%s says that %s%s = %s' % (current_process().name, func.__name__, args, result) # # Functions referenced by tasks # def mul(a, b): time.sleep(0.5*random.random()) return a * b def plus(a, b): time.sleep(0.5*random.random()) return a + b # # # def test(): NUMBER_OF_PROCESSES = 4 TASKS1 = [(mul, (i, 7)) for i in range(20)] TASKS2 = [(plus, (i, 8)) for i in range(10)] # Create queues task_queue = Queue() done_queue = Queue() # Submit tasks for task in TASKS1: task_queue.put(task) # Start worker processes for i in range(NUMBER_OF_PROCESSES): Process(target=worker, args=(task_queue, done_queue)).start() # Get and print results print 'Unordered results:' for i in range(len(TASKS1)): print ' ', done_queue.get() # Add more tasks using `put()` for task in TASKS2: task_queue.put(task) # Get and print some more results for i in range(len(TASKS2)): print ' ', done_queue.get() # Tell child processes to stop for i in range(NUMBER_OF_PROCESSES): task_queue.put('STOP') if __name__ == '__main__': freeze_support() test()