Process
1 import socket
2 import time
3 from multiprocessing import Process
4
5 def talk(conn):
6 if type(conn)==socket.socket():
7 conn.send(b'connected')
8 msg = conn.recv(1024)
9 print(msg)
10 conn.close()
11
12
13 if __name__ == '__main__':
14 sk = socket.socket()
15 sk.bind(('127.0.0.1',8080))
16 sk.listen()
17 while True:
18 conn,addr = sk.accept()
19 print(addr,time.ctime())
20 p = Process(target=talk,args=(conn,))
21 p.start()
22
23 sk.close()
Lock
data:image/s3,"s3://crabby-images/6da44/6da44a3c422e49abcf1dae786223d28e774e2de6" alt=""
1 # from multiprocessing import Lock
2 # lock = Lock()
3 # lock.acquire() # 需要锁,拿钥匙
4 # lock.acquire() # 需要锁,阻塞
5 # lock.release() # 释放锁,还钥匙
6
7 # 锁 就是在并发编程中,保证数据安全
8
9 # 多进程 实现并发
10
11 import json
12 import time
13 import random
14 from multiprocessing import Lock
15 from multiprocessing import Process
16
17 # with open('ticket','w') as f:
18 # json.dump({'count':20},f)
19
20
21 def search(i):
22 with open('ticket') as f:
23 print(i,json.load(f)['count'])
24 def get(i):
25 with open('ticket') as f:
26 ticket_num = json.load(f)['count']
27 time.sleep(random.random())
28 if ticket_num>0:
29 with open('ticket','w') as f:
30 json.dump({'count':ticket_num-1},f)
31 print('%s买到票了'%i)
32 else:
33 print('%s没票了'%i)
34 def task(i,lock):
35 search(i)
36 lock.acquire()
37 get(i)
38 lock.release()
39 if __name__ == '__main__':
40 pass
41
42 lock = Lock()
43 for i in range(20):
44 p = Process(target=task,args=(i,lock))
45 p.start()
View Code
Semaphore
data:image/s3,"s3://crabby-images/6da44/6da44a3c422e49abcf1dae786223d28e774e2de6" alt=""
1 # from multiprocessing import Semaphore
2 # sem = Semaphore(4)
3 # sem.acquire()
4 # print(0)
5 # sem.acquire()
6 # print(1)
7 # sem.acquire()
8 # print(2)
9 # sem.acquire()
10 # print(3)
11 # sem.release()
12 # sem.acquire()
13 # print(4)
14
15 import time
16 import random
17 from multiprocessing import Semaphore
18 from multiprocessing import Process
19
20 def sing(i,sem):
21 sem.acquire()
22 print('%s : 进入ktv'%i)
23 time.sleep(random.randint(1,3))
24 print('%s :出ktv'%i)
25 sem.release()
26
27 if __name__ == '__main__':
28 sem = Semaphore(4)
29 for i in range(20):
30 Process(target=sing,args=(i,sem)).start()
View Code
守护进程
data:image/s3,"s3://crabby-images/6da44/6da44a3c422e49abcf1dae786223d28e774e2de6" alt=""
1 # start 开启一个进程
2 # join 用join可以让主进程等待子进程结束
3
4 # 守护进程
5 # 守护进程会随着主进程的代码执行结束而结束
6 # 正常的子进程没有执行完的时候主进程要一直等着
7
8 # import time
9 # from multiprocessing import Process
10 # def func():
11 # print('--'*10)
12 # time.sleep(15)
13 # print('--'*10)
14 #
15 # def cal_time():
16 # while True:
17 # time.sleep(1)
18 # print('过去了1秒')
19 #
20 # if __name__ == '__main__':
21 # p = Process(target=cal_time)
22 # p.daemon = True
23 # p.start()
24 # p2 = Process(target=func)
25 # p2.start()
26 # for i in range(100):
27 # time.sleep(0.1)
28 # print('*'*i)
29 # p2.join()
30 # 守护进程的作用:
31 # 会随着主进程的代码执行结束而结束,不会等待其他子进程
32 # 守护进程 要再start之前设置
33 # 守护进程中,不能再开启子进程
34
35
36 # is_alive # 进程是否还活着,True代表进程还在, False代表进程不在了
37 # terminate #结束一个进程,但是这个进程不会立刻被杀死
38
39 # import time
40 # from multiprocessing import Process
41 # def func():
42 # print('wahaha')
43 # time.sleep(5)
44 # print('qqxing')
45 # if __name__ == '__main__':
46 # p = Process(target=func)
47 # p.start()
48 # print(p.is_alive())
49 # time.sleep(0.1)
50 # p.terminate()
51 # print(p.is_alive())
52 # time.sleep(1)
53 # print(p.is_alive())
54
55 # 进程的其他属性
56 # pid 查看这个进程的进程id
57 # name 查看进程的名字,可以修改
58 # import time
59 # from multiprocessing import Process
60 # def func():
61 # print('wahaha')
62 # time.sleep(5)
63 # print('qqxing')
64 # if __name__ == '__main__':
65 # p = Process(target=func)
66 # p.start()
67 # print(p.name,p.pid)
68 # p.name = 'WAHAHAHA'
69 # print(p.name)
View Code
Event
data:image/s3,"s3://crabby-images/6da44/6da44a3c422e49abcf1dae786223d28e774e2de6" alt=""
1 import time
2 import random
3 from multiprocessing import Process
4 from multiprocessing import Event
5
6 # e = Event() # 实例化一个事件 标志/交通信号灯
7 # e.set() # 将标志变成非阻塞/交通灯变绿
8 # e.wait() # 刚实例化出来的一个事件对象,默认的信号是阻塞信号/默认是红灯
9 # # 执行到wait,要先看灯,绿灯行红灯停,如果在停的过程中灯绿了,
10 # # 就变成非阻塞了
11 # e.clear() # 将标志又变成阻塞/交通灯变红
12 #
13 # e.is_set() # 是否阻塞 True就是绿灯 False就是红灯
14
15 def traffic_light(e):
16 while True:
17 if e.is_set(): # 是否阻塞,True就是绿灯,False就是红灯
18 e.clear()
19 print('红灯亮')
20 time.sleep(3)
21 else:
22 e.set()
23 print('绿灯亮')
24 time.sleep(3)
25
26 def car(name,e):
27 e.wait()
28 print('%s 通过'%name)
29
30 if __name__ == '__main__':
31 e = Event()
32 Process(target=traffic_light,args=(e,)).start()
33 for i in range(50):
34 if i%random.randint(1,5)==0:
35 time.sleep(random.randint(1,3))
36 p = Process(target=car,args=(i,e))
37 p.start()
View Code
Pool
data:image/s3,"s3://crabby-images/6da44/6da44a3c422e49abcf1dae786223d28e774e2de6" alt=""
1 # 进程池
2 # 进程池为什么出现:开过多的进程 1,开启进程浪费时间 2. 操作系统调度太多的进程也会影响效率
3 # 开进程池:有几个现成的进程在池子里,有任务来了,就用这个池子中的进程去处理任务,
4 # 任务处理完之后,再把进程放回池子里,池子中的进程就能去处理其他任务了。
5 # 当所有的任务都处理完了,进程池关闭,回收所有的进程
6
7 # from multiprocessing import Pool
8 # import time
9 # import random
10 # def func(i):
11 # time.sleep(random.random())
12 # return i*'*'
13 #
14 # def call(arg):
15 # print(arg)
16 #
17 # if __name__ == '__main__':
18 # p = Pool(5)
19 # for i in range(10):
20 # p.apply_async(func,args=(i,),callback=call)
21 # p.close() # 不能再往进程池中添加新的任务
22 # p.join() # 阻塞等待 执行进程池中的所有任务直到执行结束
23 # apply_async是一个异步调用
24 # # 和主进程也异步了
View Code
获取进程池的返回值
1 # from multiprocessing import Pool
2 # import time
3 # import random
4 #
5 # def func(i):
6 # time.sleep(random.random())
7 # return i+1
8 #
9 # if __name__ == '__main__':
10 # p = Pool(4)
11 # res_l = []
12 # for i in range(10):
13 # res = p.apply_async(func,args=(i,))
14 # res_l.append(res)
15 # p.close()
16 # p.join()
17 # [print(res.get()) for res in res_l] # 异步调用获取函数的返回值
回调函数
1 # from multiprocessing import Pool
2 # import time,os
3 # import random
4 #
5 # def func(i):
6 # time.sleep(random.random())
7 # print('child_process%s:%s'%(i,os.getpid()))
8 # 回调函数
9 # def call(arg):
10 # print('callback:%s'%os.getpid())
11 #
12 # if __name__ == '__main__':
13 # print('---->',os.getpid())
14 # p = Pool(4)
15 # for i in range(10):
16 # p.apply_async(func,args=(i,),callback=call) # # 回调函数是在主进程中完成的,不能传参数,只能接受多进程中函数的返回值
17 # p.close()
18 # p.join()
map
data:image/s3,"s3://crabby-images/6da44/6da44a3c422e49abcf1dae786223d28e774e2de6" alt=""
1 # from multiprocessing import Pool
2 # import time,os
3 # import random
4 #
5 # def fun(i):
6 # time.sleep(random.random())
7 # print('child:%s '%i)
8 # i += 1
9 # return i
10 # if __name__ == '__main__':
11 # s = time.time()
12 # p = Pool(4)
13 # ret = p.map(fun,range(10))
14 # print(ret)
15 # print(time.time()-s)
16 # 如何使用进程池 —— 都可以拿到返回值
17 # apply 同步调用 直接调用之后就获得返回值
18 # apply_async 异步调用
19 # 在完成所有进程调用之后,close表示不再接受新任务
20 # join 让主进程等待子进程执行结束
21 # 如果需要获得返回值,提交任务之后返回的对象.get()就可以获取返回值
22 # callback 回调函数 :主进程执行 参数是子进程执行的函数的返回值
23 # map(func,iterable)
View Code
Manager
1 # def func(dic,lock):
2 # lock.acquire()
3 # dic['count'] += 1
4 # lock.release()
5 #
6 # if __name__ == '__main__':
7 # m = Manager()
8 # dic = m.dict()
9 # dic['count'] = 0
10 # lock = Lock()
11 # l = []
12 # for i in range(100):
13 # p = Process(target=func,args=(dic,lock))
14 # p.start()
15 # l.append(p)
16 # [p.join() for p in l]
17 # print(dic)