什么是进程process:
运行中的程序就称为进程,我们写好的程序,如果不运行的话就是一堆普通的字符,没有任何意义。
并行与并发:
并发:伪并行,看起来是同时运行,实际是cpu+多道技术实现,cpu在多个任务间快速切换,给我们造成的感觉就好像是在同时运行。
并行:只有具备多个cpu才能实现真正意义上的并行。多个任务同时进行,但是最大运行数为cpu个数。
multiprocessing 模块:
python中的多线程无法利用cpu的多核优势(GIL锁约束一个进程内同一时间内,只能运行一个线程)。如果想要充分利用cpu的多核资源,就要使用多进程。python中的multiprocessing模块为我们提供了开启子进程的方法,并在子进程中执行我们定制的任务。
multiprocessing 模块提供Process,Queue ,Pipe,Lock等组件。
与线程不同,进程的初始化完全copy其父进程内存地址,进程间数据不共享,进程数据的修改仅限于该进程内部。
Process类
class Process(object): def __init__(self, group=None, target=None, name=None, args=(), kwargs={}): '''参数介绍:group(这个应该是不用的)target:要调用的对象,即要执行的任务(函数)name:子进程的名字 args:表示要要调用对象的位置参数,kwargs:表示要调用对象的字典。''' self.name = ''#进程名 self.daemon = False#默认为False设置为True 表示为守护进程,在start()之前设置,随着主进程结束而结束。 self.authkey = None#进程的身份验证键,默认为os.urandom()随机生成的32为字符串 self.exitcode = None#进程运行时为None,如果为-N,表示信号N结束 self.ident = 0 self.pid = 0#进程的pid self.sentinel = None def run(self):#进程启动,自定义进程时必须实现 pass def start(self):#启动进程 pass def terminate(self):#强制终止进程,不推荐使用,容易造成僵尸进程和死锁 pass def join(self, timeout=None):#设置超时时间 pass def is_alive(self):#判断进程是否正在运行,正在运行返回True return False
创建线程的两种方式:
import time import random from multiprocessing import Process def piao(name): print('%s piaoing' %name) time.sleep(random.randrange(1,5)) print('%s piao end' %name) p1=Process(target=piao,args=('egon',)) #必须加,号 p2=Process(target=piao,args=('alex',)) p3=Process(target=piao,args=('wupeqi',)) p4=Process(target=piao,args=('yuanhao',)) p1.start() p2.start() p3.start() p4.start() print('主线程')
import time import random from multiprocessing import Process class MyProcess(Process): def __init__(self,name,money): super().__init__() self.name = name self.money = money def run(self): print("%s is piaoing"%self.name) time.sleep(random.randint(1,3)) print("%s is piao end"%self.name) print("%s pay %s yuan for piao"%(self.name,self.money)) if __name__ == '__main__': p1= MyProcess("egon",200) p2= MyProcess("alex",None) p3= MyProcess("yuanhao",500) p1.start()#start 会自动调用run方法 p2.start() p3.start() print("主进程".center(30,"*"))
进程实现socket并发
###服务端 from socket import * from multiprocessing import Process s = socket(AF_INET,SOCK_STREAM) s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) s.bind(("192.168.20.45",8800)) s.listen(5) def talk(conn,addr): while True: try: msg = conn.recv(1024) if not msg:continue conn.send(msg.upper()) except Exception: break if __name__ == '__main__': while True: conn,addr = s.accept() p = Process(target = talk,args = (conn,addr)) p.start() ####客户端 from socket import * c = socket(AF_INET,SOCK_STREAM) c.connect(("192.168.20.45",8800)) while True: msg = input(">>>>:").strip() if not msg:continue c.send(msg.encode("utf-8")) data = c.recv(1024) print(data.decode("utf-8"))
Process 对象的join方法:
from multiprocessing import Process import time,random def piaochang(name,money): print("%s is piaoing"%name) time.sleep(random.randrange(5)) print("%s is piao end"%name) print("%s pay %s yuan for piaochang"%(name,money)) if __name__ == '__main__': start = time.time() p1 = Process(target=piaochang,args = ('egon',200)) p2 = Process(target=piaochang,args = ('alex',None)) p3 = Process(target=piaochang,args = ('wupeiqi',500)) p_l = [p1,p2,p3] for p in p_l: p.start() # p.join()#如果将join放在这里,就是串行的结果 主线程 9.459348678588867 for p in p_l: p.join()#join的意思是等待所有上面的程序执行完后再往下执行代码 主线程 4.645843029022217 #join是让主进程等待子进程运行结束,卡住的是主进程 print("主线程",time.time()-start)
Process 其它属性或方法:
from multiprocessing import Process,current_process import time,random class Piao(Process): def __init__(self,name,money): super().__init__() self.name = name self.money = money def run(self): print("%s is piaoing"%self.name) time.sleep(random.randrange(5)) print("%s is piao end,pay %s yuan"%(self.name,self.money)) if __name__ == '__main__': p = Piao("egon",200) p.start() print(p.is_alive())#查看进程p是否存活 print(p.pid)#子进程ip print(p.name)#查看进程名,这里为"egon",默认为Process1 p.terminate()#强制关闭子进程 time.sleep(0.1)#这里需要注意,关闭进程不会立马关闭,所以is_alive 有可能还存活,这里让它停小会 print(p.is_alive())#存活状态为False
守护进程:
主进程创建守护进程
其一:守护进程会在主进程代码执行结束后就终止
其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children
注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止
from multiprocessing import Process import time,random def piao(name): print("%s is piaoing"%name) time.sleep(random.randint(1,3)) print("%s is piao done"%name) if __name__ == '__main__': p = Process(target=piao,args = ("egon",)) p.daemon = True p.start() print("主")#执行完这里意味着p这个守护进程已经结束,它后面的代码也将不会运行
同步锁:
进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的,,竞争带来的结果就是错乱,如何控制,就是加锁处理
# #不加锁,并发,效率高,数据错乱 from multiprocessing import Process,Lock import time,os def work(): print("%s is running"%os.getpid()) time.sleep(1) print("%s is done"%os.getpid()) if __name__ == '__main__': for i in range(5): p = Process(target=work) p.start() # 加锁,变并发伪串行,效率低,数据安全 def work(mutex): mutex.acquire() print("%s is running "%os.getpid()) time.sleep(1) print('%s is done'%os.getpid()) mutex.release() if __name__ == '__main__': mutex = Lock() for i in range(5): p = Process(target=work,args = (mutex,)) p.start()
模拟抢票对同一文件操作:
#文件db的内容为:{"count":1} #注意一定要用双引号,不然json无法识别 from multiprocessing import Process,Lock import time,json,random def search(): dic=json.load(open('db.txt')) print('