#创建方法1:将要执行的方法作为参数传给Process from multiprocessing import Process def f(name): print 'hello',name if __name__ == '__main__': #需要注意的是,多进程只能在main中执行 p = Process(target=f,args=('pingy',)) #target=f指执行函数f,args=('pingy',)是指以元组方式传入函数的参数 p.start() #执行进程 p.join() #父进程停止,等待子进程执行完
#创建方法2:从Process继承,并重写run() from multiprocessing import Process class MyProcess(Process): def run(self): print("MyProcess extended from Process") if __name__ == '__main__': #需要注意的是,多进程只能在main中执行 p2=MyProcess() p2.start()
run(): #默认的run()函数调用target的函数,你也可以在子类中覆盖该函数 start() : #启动该进程 daemon(): #停止子进程,只执行父进程 join([timeout]) : #父进程被停止,直到子进程被执行完毕。当timeout为None时没有超时,否则有超时 is_alive(): #返回进程是否在运行。正在运行指启动后、终止前 terminate(): #结束进程
from multiprocessing import Process from threading import Thread import time import os def foo(n): time.sleep(2) print 'Number:',n print '子进程ID:',os.getpid(),'父进程ID:',os.getpid() def main1(): for i in range(2): foo(i) def main2(): for i in range(2): p = Process(target=foo,args=(i,)) print p.name,'准备执行...' #p.name为进程名 p.start() print p.pid,'开始执行...' #在进程start前,进程号p.pid为None p.join(1) #join([timeout]) 父进程被停止,直到子进程被执行完毕。 if __name__ == '__main__': print '主进程ID:',os.getpid() print '++++++++++++++++++++++++++++++++++++++++++' main1() print '------------------------------------------' main2()
主进程ID: 84792 ++++++++++++++++++++++++++++++++++++++++++ Number: 0 子进程ID: 84792 父进程ID: 84792 Number: 1 子进程ID: 84792 父进程ID: 84792 ------------------------------------------ Process-1 准备执行... 123316 开始执行... Process-2 准备执行... 85716 开始执行... Number: 0 子进程ID: 123316 父进程ID: 123316 Number: 1 子进程ID: 85716 父进程ID: 85716
#不加daemon: import multiprocessing import time def worker(interval): print("work start:{0}".format(time.ctime())); time.sleep(interval) print("work end:{0}".format(time.ctime())); if __name__ == "__main__": p = multiprocessing.Process(target = worker, args = (3,)) p.start() print "end!"
end! work start:Thu Oct 20 16:46:12 2016 work end:Thu Oct 20 16:46:15 2016
#加上daemon后: import multiprocessing import time def worker(interval): print("work start:{0}".format(time.ctime())); time.sleep(interval) print("work end:{0}".format(time.ctime())); if __name__ == "__main__": p = multiprocessing.Process(target = worker, args = (3,)) p.daemon = True p.start() print "end!"
import multiprocessing import time def worker(interval): print("work start:{0}".format(time.ctime())); time.sleep(interval) print("work end:{0}".format(time.ctime())); if __name__ == "__main__": p = multiprocessing.Process(target = worker, args = (3,)) p.daemon = True p.start() p.join() # print "end!"
work start:Thu Oct 20 16:49:34 2016
work end:Thu Oct 20 16:49:37 2016
import multiprocessing import time class ClockProcess(multiprocessing.Process): def __init__(self, interval): multiprocessing.Process.__init__(self) self.interval = interval def run(self): n = 5 while n > 0: print("the time is {0}".format(time.ctime())) time.sleep(self.interval) n -= 1 if __name__ == '__main__': p = ClockProcess(3) p.start()
the time is Thu Oct 20 16:42:21 2016 the time is Thu Oct 20 16:42:24 2016 the time is Thu Oct 20 16:42:27 2016 the time is Thu Oct 20 16:42:30 2016 the time is Thu Oct 20 16:42:33 2016
from multiprocessing import Process import threading import time
li = [] def run(li1,n): li1.append(n) print li1 if __name__ == '__main__':for i in range(10): #创建多进程,每个进程占用单独内存 p = Process(target=run,args=[li,i]) p.start() time.sleep(1) print '我是分割线'.center(50,'*') for i in range(10): #创建多线程,所有线程共享内存 t = threading.Thread(target=run,args=[li,i]) t.start()
[0] [1] [2] [3] [4] [5] [6] [7] [8] [9] *****************我是分割线****************** [0] [0, 1] [0, 1, 2] [0, 1, 2, 3] [0, 1, 2[, 03, , 14, , 25, ]3 , 4, 5] [0, 1, 2[, 03, , 14, , 25, , 36, , 47, ][5 0, , 61, , 72, , 83], 4, 5, 6, [70, , 81, , 92], 3, 4, 5, 6, 7, 8, 9]
#锁的使用方法: import multiprocessing lock = multiprocessing.Lock() #Lock对象 lock.acquire(([timeout])) #锁定。timeout为可选项,如果设定了timeout,则在超时后通过返回值可以判断是否得到了锁 lock.release() #解锁 rLock = multiprocessing.RLock() #RLock对象 rLock.acquire(([timeout])) #锁定。timeout为可选项,如果设定了timeout,则在超时后通过返回值可以判断是否得到了锁 rLock.release() #解锁
import multiprocessing import sys def worker_with(lock, f): with lock: fs = open(f, 'a+') n = 10 while n > 1: fs.write("Lockd acquired via with ") n -= 1 fs.close() def worker_no_with(lock, f): lock.acquire() try: fs = open(f, 'a+') n = 10 while n > 1: fs.write("Lock acquired directly ") n -= 1 fs.close() finally: lock.release() if __name__ == "__main__": lock = multiprocessing.Lock() f = "file.txt" w = multiprocessing.Process(target = worker_with, args=(lock, f)) nw = multiprocessing.Process(target = worker_no_with, args=(lock, f)) w.start() nw.start() print "end"
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
- FIFO(先进先出)队列
- LIFO(后进先出)队列
- PriorityQueue(优先级)队列
Queue.qsize() 返回队列的大小
Queue.empty() 如果队列为空,返回True,反之False
Queue.full() 如果队列满了,返回True,反之False
Queue.full 与 maxsize 大小对应
Queue.get([block[, timeout]])获取队列,timeout等待时间
Queue.get_nowait() 相当Queue.get(False)
Queue.put(item) 写入队列,timeout等待时间
Queue.put_nowait(item) 相当Queue.put(item, False)
Queue.task_done() 在完成一项工作之后,Queue.task_done()函数向任务已经完成的队列发送一个信号
Queue.join() 实际上意味着等到队列为空,再执行别的操作
import Queue q = Queue.Queue() for i in range(5): q.put(i) while not q.empty(): print q.get()
输出结果: 0 1 2 3 4
import Queue q = Queue.LifoQueue() for i in range(5): q.put(i) while not q.empty(): print q.get()
输出结果: 4 3 2 1 0
import Queue import threading import time exitFlag = 0 class myThread (threading.Thread): def __init__(self, threadID, name, q): threading.Thread.__init__(self) self.threadID = threadID self.name = name self.q = q def run(self): print "Starting " + self.name process_data(self.name, self.q) print "Exiting " + self.name def process_data(threadName, q): while not exitFlag: queueLock.acquire() if not workQueue.empty(): data = q.get() queueLock.release() print "%s processing %s" % (threadName, data) else: queueLock.release() time.sleep(1) threadList = ["Thread-1", "Thread-2", "Thread-3"] nameList = ["One", "Two", "Three", "Four", "Five"] queueLock = threading.Lock() workQueue = Queue.Queue(10) threads = [] threadID = 1 # 创建新线程 for tName in threadList: thread = myThread(threadID, tName, workQueue) thread.start() threads.append(thread) threadID += 1 # 填充队列 queueLock.acquire() for word in nameList: workQueue.put(word) queueLock.release() # 等待队列清空 while not workQueue.empty(): pass # 通知线程是时候退出 exitFlag = 1 # 等待所有线程完成 for t in threads: t.join() print "Exiting Main Thread"
Starting Thread-1 Starting Thread-2 Starting Thread-3 Thread-1 processing One Thread-2 processing Two Thread-3 processing Three Thread-1 processing Four Thread-2 processing Five Exiting Thread-3 Exiting Thread-1 Exiting Thread-2 Exiting Main Thread
from multiprocessing import Process,Queue def foo(q,n): q.put(n) if __name__ == '__main__': que=Queue() for i in range(5): p=Process(target=foo,args=(que,i)) p.start() p.join() print(que.qsize())
import multiprocessing def writer_proc(q): try: q.put(1, block = False) except: pass def reader_proc(q): try: print q.get(block = False) except: pass if __name__ == "__main__": q = multiprocessing.Queue() writer = multiprocessing.Process(target=writer_proc, args=(q,)) writer.start() reader = multiprocessing.Process(target=reader_proc, args=(q,)) reader.start()
from multiprocessing import Process,Value,Array def foo1(n,a): n.value = 3 for i in range(len(a)): a[i] = -a[i] if __name__ == '__main__': num = Value('d',0.0) #d的意思是小数.创建0.0 arr = Array('i',range(10)) #i的意思是整数.创建一个0-9的整数 p = Process(target=foo1,args=(num,arr)) p.start() p.join() print num.value print arr[:]
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
from multiprocessing import Manager,Process def f(d,l): d[1] = '1' d['2'] = 2 d[0.25] = None l.reverse() if __name__ == '__main__': manage = Manager() d = manage.dict() #创建一个进程间可共享的dict l = manage.list(range(10)) #创建一个进程间可共享的list p = Process(target=f,args=(d,l)) p.start() p.join() print d print l
{0.25: None, 1: '1', '2': 2} [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
#使用进程池(非阻塞) import time import multiprocessing def fun(msg): print 'MSG:',msg time.sleep(3) print 'end' if __name__ == '__main__': pool = multiprocessing.Pool(processes=3) for i in xrange(4): msg = 'hello,%d' %(i) pool.apply_async(fun,(msg,)) #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去 print '-----------------------' pool.close() pool.join() print 'Sub-processes done'
----------------------- MSG: hello,0 MSG: hello,1 MSG: hello,2 end MSG: hello,3 end end end Sub-processes done
import time,multiprocessing def fun(msg): print 'MSG:',msg time.sleep(3) print 'end' if __name__ == '__main__': pool = multiprocessing.Pool(processes=3) for i in xrange(4): msg = 'hello,%d' %i pool.apply(fun,(msg,)) print '-------------------------------' pool.close() pool.join() print 'Sub-processes done'
MSG: hello,0 end MSG: hello,1 end MSG: hello,2 end MSG: hello,3 end ------------------------------- Sub-processes done
import multiprocessing import time def proc1(pipe): while True: for i in range(10000): print 'send:%s' %i pipe.send(i) time.sleep(1) def proc2(pipe): while True: print 'proc2 recv:',pipe.recv() time.sleep(1) def proc3(pipe): while True: print 'proc3 recv:',pipe.recv() time.sleep(1) if __name__ == '__main__': pipe = multiprocessing.Pipe() p1 = multiprocessing.Process(target=proc1,args=(pipe[0],)) p2 = multiprocessing.Process(target=proc2, args=(pipe[1],)) #p3 = multiprocessing.Process(target=proc3, args=(pipe[1],)) p1.start() p2.start() #p3.start() p1.join() p2.join() #p3.join()
send:0 proc2 recv: 0 send:1 proc2 recv: 1 send:2 proc2 recv: 2 send:3 proc2 recv: 3 send:4 proc2 recv: 4 send:5 proc2 recv: 5 send:6 proc2 recv: 6