进程
正在进行的一个过程或任务。
进程没有任何共享状态,进程修改的数据仅限于该线程内部。
开启进程的两种方式
第一种
from multiprocessing import Process import time
def task(name):
print('%s is running'% name) time.sleep(5)
print('%s is ending'% name)
if __name__ == '__main__': p = Process(target=task,args=('子进程1',)) p.start()
print('主进程')
主进程
子进程1 is running
子进程1 is ending |
第二种
from multiprocessing import Process import time
class MyProcess(Process):
def __init__(self, name):
super(MyProcess, self).__init__()
self.name = name
def run(self):
print('%s is running'% self.name) time.sleep(5)
print('%s is ending'% self.name)
if __name__ == '__main__': p = MyProcess('子进程1') p.start()
print('主进程')
主进程
子进程1 is running
子进程1 is ending |
查看pid
from multiprocessing import Process import time,os
def task():
print('%s is running,parent id is %s'% (os.getpid(),os.getppid())) time.sleep(5)
print('%s is ending,parent id is %s'% (os.getpid(),os.getppid()))
if __name__ == '__main__': p = Process(target=task) p.start()
print('主进程',os.getpid(),'父进程',os.getppid())
主进程 9580 父进程 5348
8252 is running,parent id is 9580
8252 is ending,parent id is 9580 |
Pocess 其他属性和方法
join():父进程等待子进程,并行运行时间以最长时间为准,串行时间等于各个子进程时间和
is_alive():判断进程是否还活着
terminte():干死进程(命令发出需要一点时间,操作系统才会执行)
.name:获取进程名
练习
from multiprocessing import Process
n = 100
def work():
global n n = 0
print('子进程内',n)
if __name__=='__main__': p = Process(target=work) p.start() p.join()
print('主进程',n)
子进程内 0
主进程 100 |
说明子进程和主进程并不是共用一个n,进程之间的内存空间是隔离的。
并发通信
服务器端
from socket import * from multiprocessing import Process
def start(conn,addr):
while True:
try: msg = conn.recv(1024)
if not msg: break
print(addr,':',msg.decode('utf-8')) conn.send(msg.upper())
except ConnectionResetError:
break
def server(ip, port): server = socket(AF_INET, SOCK_STREAM) server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) server.bind((ip, port)) server.listen(5)
while True: conn, addr = server.accept() p = Process(target=start, args=(conn,addr)) p.start() conn.close() server.close()
if __name__ == '__main__': server('127.0.0.1', 8080)
|
客户端
from socket import *
client = socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8080))
while True: inpt = input('>>').strip()
if not inpt:continue
client.send(inpt.encode('utf-8')) data = client.recv(1024)
print(data.decode('utf-8'))
|
守护进程
守护进程随主进程的死亡一块死亡。
守护进程一定要在开启之前设置。
守护进程内不能再开启子进程,不然会抛出异常。
练习
from multiprocessing import Process import time def Foo():
print('Foo') time.sleep(1)
print('end Foo')
def Fun():
print('Fun') time.sleep(1)
print('end Fun')
if __name__ == '__main__': p1 = Process(target=Foo) p2 = Process(target=Fun)
p1.daemon = True
p1.start()
p2.start()
print('--main--')
--main--
Fun
end Fun
|
在主进程结束后,守护进程也死亡,得不到执行。
互斥锁
同一时刻只能有一个进程使用这把锁,获得执行。
不加锁
from multiprocessing import Process import time
def work(id):
print('%s 1' % id) time.sleep(1)
print('%s 2' % id) time.sleep(1)
print('%s 3' % id)
if __name__ == '__main__':
for i in range(3): p = Process(target=work, args=('进程 %s'% i,)) p.start()
进程 0 1
进程 1 1
进程 2 1
进程 0 2
进程 1 2
进程 2 2
进程 0 3
进程 1 3
进程 2 3
|
加锁之后
from multiprocessing import Process,Lock import time
def work(id,lock): lock.acquire() # 加锁
print('%s 1' % id) time.sleep(1)
print('%s 2' % id) time.sleep(1)
print('%s 3' % id) lock.release() # 用完之后释放
if __name__ == '__main__': lock = Lock()
for i in range(3): p = Process(target=work, args=('进程 %s'% i,lock)) p.start()
进程 0 1
进程 0 2
进程 0 3
进程 1 1
进程 1 2
进程 1 3
进程 2 1
进程 2 2
进程 2 3 |
join 与互斥锁的区别:
Join()把整体都变成了串行执行,互斥锁只把修改共享数据的部分变成串行。
队列
生产者消费者模型
from multiprocessing import Process, Queue import time
def produce(q):
for i in range(5): time.sleep(2)
print('生产了包子 %s' % i) q.put(i)
def consume(q):
while True: res = q.get()
if not res: continue
time.sleep(2)
print('消费者消费了 %s' % res)
if __name__ == '__main__': q = Queue() p1 = Process(target=produce, args=(q,)) p2 = Process(target=produce, args=(q,)) p3 = Process(target=produce, args=(q,))
c1 = Process(target=consume, args=(q,)) c2 = Process(target=consume, args=(q,))
p1.start() p2.start() p3.start() c1.start() c2.start()
p1.join() p2.join() p3.join() q.put(None) # 两个消费者,必须有两个空消息
q.put(None)
print('主')
生产了包子 0
生产了包子 0
生产了包子 0
生产了包子 1
生产了包子 1
生产了包子 1
生产了包子 2
消费者消费了 1
生产了包子 2
消费者消费了 1
生产了包子 2
生产了包子 3
消费者消费了 1
生产了包子 3
消费者消费了 2
生产了包子 3
生产了包子 4
消费者消费了 2
生产了包子 4
消费者消费了 2
生产了包子 4
主
消费者消费了 3
消费者消费了 3
消费者消费了 3
消费者消费了 4
消费者消费了 4
消费者消费了 4
|
|
最后卡在那里了
修改消费函数:
def consume(q):
while True: res = q.get()
if res is None: break
time.sleep(2)
print('消费者消费了 %s' % res)
|
Joinablequeue +守护进程
from multiprocessing import Process, JoinableQueue import time
def produce(q):
for i in range(5): time.sleep(2)
print('生产了包子 %s' % i) q.put(i) q.join()
def consume(q):
while True: res = q.get()
if res is None: break
time.sleep(2)
print('消费者消费了 %s' % res) q.task_done() # 向生产者发送消息
if __name__ == '__main__': q = JoinableQueue() p1 = Process(target=produce, args=(q,)) p2 = Process(target=produce, args=(q,)) p3 = Process(target=produce, args=(q,))
c1 = Process(target=consume, args=(q,)) c2 = Process(target=consume, args=(q,)) c1.daemon = True
c2.daemon = True
p1.start() p2.start() p3.start() c1.start() c2.start()
p1.join() p2.join() p3.join()
print('主')
|
如果不给消费者添加守护进程,那么程序会卡在那里,不会正常退出。