1.进程的相关的概念
1.进程的简单初始
进程
- 是计算机的最小的资源分配单位,每一个程序在运行起来的时候需要给分配一些内存
- 一个运行的程序
- 在操作系统中用于pid来标识一个进程
线程
- 是计算机能够被CPU调度的最小单元,实际执行具体编译解释之后的代码的是线程,所以CPU执行的是解释之后的线程中的代码
2. 进程的并发与并行
并发
- 一个CPU,多个程序轮流执行
并行
- 多个CPU,各自在自己的CPU上执行多个程序
3.同步异步阻塞非阻塞
同步与异步
- 同步:调用一个操作,要等待结果
- 异步:调用一个操作,不需要等待结果
阻塞与非阻塞
- 阻塞:CPU不工作
- 非阻塞:CP
状态介绍
- 就绪
- 当进程已分配到处CPU以外的所有必要的资源,只要获得处理机便可立即执行 ,这时的进程状态称为就绪状态
- 运行
- 当前进程已或的处理机,其程序正在处理机上执行,此时的进程状态称运行状态
- 阻塞
- 正在执行的进程,由于等待某个时间发生而无法执行时,便放弃处理机而处于阻塞状态
4.进程的调度算法
-
给所有的进程分配资源或者分配CPU使用权的一种方法
-
短作业优先
-
-
多级反馈算法
-
多个任务队列,优先级从高到低
-
新来的任务总是优先级最高的
-
每一个新任务几乎会立即获得一个时间片时间
-
执行完一个时间片之后就会降到下一级队列中
-
总是优先级高的任务都执行完才执行优先级低的队列
-
优先级越高时间片越短
-
5. 进程的开启与关闭
进程的开启
-
系统初始化,
-
一个进程在运行过程中开启了子进程
-
用户的交互式请求,而创建一个新进程
-
进程的关闭
-
正常退出(自愿,如用户点击交互式页面的叉号,或程序执行完毕调用发起系统调用正常退出,在linux中用exit,在windows中用ExitProcess)
-
出错退出(自愿,python a.py中a.py不存在)
-
严重错误(非自愿,执行非法指令,如引用不存在的内存,1/0等,可以捕捉异常,try...except...)
-
被其他进程杀死(非自愿,如kill -9)
2.multiprocess模块
1.process模块介绍
process模块是一个创建进程的模块,借助这个模块,就可以完成进程的创建。
Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,表示一个子进程中的任务(尚未启动) 强调: 1. 需要使用关键字的方式来指定参数 2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号 参数介绍: 1 group参数未使用,值始终为None 2 target表示调用对象,即子进程要执行的任务 3 args表示调用对象的位置参数元组,args=(1,2,'egon',) 4 kwargs表示调用对象的字典,kwargs={'name':'egon','age':18} 5 name为子进程的名称
1 p.start():启动进程,并调用该子进程中的p.run() 2 p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法 3 p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁 4 p.is_alive():如果p仍然运行,返回True 5 p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程
在Windows操作系统中由于没有fork(linux操作系统中创建进程的机制),在创建子进程的时候会自动 import 启动它的这个文件,而在 import 的时候又执行了整个文件。因此如果将process()直接写在文件中就会无限递归创建子进程报错。所以必须把创建子进程的部分使用if __name__ ==‘__main__’ 判断保护起来,import 的时候 ,就不会递归运行了。
2.使用process模块创建进程
在一个python进程中开启子进程,start方法和并发效果。
import os from multiprocessing import Process def func(): print(os.getpid(), os.getppid()) # pid os.getpid() 进行id # pid os.getppid() 父进行id if __name__ == "__main__": print("main:", os.getpid(), os.getppid()) p = Process(target=func) # target表示调用对象,即子进程要执行的任务 p.start() # 启动进程 # 输出结果: main: 20156 7380 8612 20156
3.给子进程传递参数
import os from multiprocessing import Process def func(name): print(os.getpid(), os.getppid()) print(name) # pid os.getpid() 进行id # pid os.getppid() 父进行id if __name__ == "__main__": print("main:", os.getpid(), os.getppid()) p = Process(target=func, args=("小白",)) # target表示调用对象,即子进程要执行的任务 p.start() # 启动进程 # 输出结果: main: 20156 7380 8612 20156 小白
4.同时开启多个子进程
import os import time from multiprocessing import Process def func(name): print("%s+start:"%name) time.sleep(1) print(os.getpid(), os.getppid(),name) if __name__ == '__main__': list_name = [("魏莉",), ("小白",), ("知否",)] for name in list_name: p = Process(target=func, args=name) p.start() # 异步非阻塞 # 输出结果: 小白+start: 知否+start: 魏莉+start: 21360 22116 小白 21932 22116 知否 21400 22116 魏莉
5.join的用法
import time import random from multiprocessing import Process def func(name, age): print('发送一封邮件给%s岁的%s'%(age, name)) time.sleep(random.random()) # 随机暂停0-1秒 print("发送成功")
if __name__ == "__main__": arg_list = [("魏莉", 18), ("小白", 25), ("知否", 12)] p_list = [] for arg in arg_list: p = Process(target = func, args = arg) p.start() p_list.append(p) for p in p_list: p.join() # 阻塞直到p这个子进程执行完毕才能执行 print("所有的邮件已发送完毕") # 输出结果 发送一封邮件给18岁的魏莉 发送一封邮件给12岁的知否 发送一封邮件给25岁的小白 给小白发送成功 给知否发送成功 给魏莉发送成功 所有的邮件已发送完毕
6.多进程之间的数据是隔离
from multiprocessing import Process n = 0 def func(): global n n += 1 if __name__ == '__main__': p_list = [] for i in range(100): p = Process(target=func) p.start() p_list.append(p) for j in p_list: j.join() print(n)
7.使用多进程实现一个并发的socket的server端
import socket from multiprocessing import Process def talk(conn): while True: msg = conn.recv(1024).decode("utf-8") ret = msg.upper().encode('utf-8') conn.send(ret) if __name__ == '__main__': sk = socket.socket() sk.bind(("127.0.0.1", 9001)) sk.listen() while True: conn, addr = sk.accept() Process(target=talk, args=(conn,)).start()
import time import socket sk = socket.socket() sk.connect(('127.0.0.1', 9001)) for i in range(10): sk.send(b"hello") msg = sk.recv(1024).decode("utf-8") print(msg) time.sleep(0.5) sk.close()
3.开启进程的另一种方式
面向对象的方法,通过继承和重写run方法来完成启动子进程
import time import os from multiprocessing import Process class Myprocess(Process): def run(self): time.sleep(0.5) print(os.getpid(), os.getppid()) if __name__ == '__main__': p = Myprocess() p.start()
通过重写init和调用父类的init完成给子进程传递参数
import os import time from multiprocessing import Process class Myprocess(Process): def __init__(self, a, b, c): self.a = a self.b = b self.c = c super().__init__() def run(self): time.sleep(1) print(os.getpid(), os.getppid(), self.a, self.b, self.c) if __name__ == '__main__': print('-->', os.getpid()) for i in range(5): p = Myprocess(1, 2, 3) p.start() # 输出结果 --> 12496 8644 12496 1 2 3 1836 12496 1 2 3 14288 12496 1 2 3 14052 12496 1 2 3 14936 12496 1 2 3
4.守护进程
在start一个进程之前设置daemon = True
import time from multiprocessing import Process def son1(): while True: print("--> in son1") time.sleep(1) def son2(): for i in range(5): print("--> in son2") time.sleep(1) if __name__ == '__main__': p1 = Process(target=son1) p1.daemon = True # 表示设置p1是一个守护进程 p1.start() p2 = Process(target=son2) p2.start() time.sleep(2) print("in main")
守护进程会等待主进程的代码结束就立即结束
主进程会等待所有的子进程结束,是为了回收子进程的资源,守护进程会等待主进程的代码结束之后再结束,而不是等待整个主进程结束
主进程创建守护进程
其一:守护进程会在主进程代码执行结束后就终止
其二:守护进程内无法在开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children
主进程的代码什么时候结束,守护进程就什么时候结束,和其他子进程的执行进度无关
import time from multiprocessing import Process def son1(): while True: print('--> in son1') time.sleep(1) def son2(): for i in range(10): print("in son2") time.sleep(1) if __name__ == "__main__" : p1 = Process(target=son1) p1.daemon = True # 表示设置p1是一个守护进程 p1.start() p2 = Process(target=son2,) p2.start() time.sleep(3) print('in main') p2.join() # 等待p2结束之后才结束 # 等待p2结束 --> 主进程的代码才结束 --> 守护进程结束
5.进程同步
锁 -- multiprocess.Lock
from multiprocessing import Lock lock = Lock() lock.acquire() # 拿钥匙 print("被锁起来的代码") lock.release() # 还钥匙
以模拟抢票为例,来看看数据安全的重要性
# 文件ticket内容{"count": 1} # 注意一定要用双引号,不然json无法识别 import json import time import random from multiprocessing import Process, Lock def search(i): with open('ticket', encoding='utf-8') as f: ticket = json.load(f) print('%s :当前的余票是%s张' % (i, ticket['count'])) def buy_ticket(i): with open('ticket', encoding='utf-8') as f: ticket = json.load(f) if ticket['count'] > 0: ticket['count'] -= 1 print('%s买到票了' % i) time.sleep(random.random()) with open('ticket', mode='w', encoding='utf-8') as f: json.dump(ticket, f) def get_ticket(i, lock): search(i) with lock: # 代替acquire和release 并且在此基础上做一些异常处理,保证即便一个进程的代码出错退出了,也会归还钥匙 buy_ticket(i) if __name__ == '__main__': lock = Lock() # 互斥锁 for i in range(5): Process(target=get_ticket, args=(i, lock)).start() # 输出结果: 3 :当前的余票是1张 3买到票了 2 :当前的余票是1张 4 :当前的余票是1张 0 :当前的余票是1张 1 :当前的余票是0张
# 互斥锁不能再同一个进程中继续acquire多次,一个acquire对应一个release from multiprocessing import Lock lock = Lock lock.acquire() print(1) lock.acquire() print(2) # 输出结果 1
#加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。 虽然可以用文件共享数据实现进程间通信,但问题是: 1.效率低(共享数据基于文件,而文件是硬盘上的数据) 2.需要自己加锁处理 #因此我们最好找寻一种解决方案能够兼顾:1、效率高(多个进程共享一块内存的数据)2、帮我们处理好锁问题。这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。 队列和管道都是将数据存放于内存中 队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来, 我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。
6. 进程间通信 -- 队列
1. 进程间通信
IPC(inter-Process-Communication)
2. 队列
概念介绍
在创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递
Queue([maxsize])
创建共享的进程队列。
参数 :maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。
底层队列使用管道和锁定实现。
方法介绍
Queue([maxsize]) 创建共享的进程队列。maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。底层队列使用管道和锁定实现。
另外,还需要运行支持线程以便队列中的数据传输到底层管道中。 # Queue的实例q具有以下方法: q.get( [ block [ ,timeout ] ] ) 返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为True.
如果设置为False,将引发Queue.Empty异常(定义在Queue模块中)。timeout是可选超时时间,用在阻塞模式中。如果在制定的时间间
隔内没有项目变为可用,将引发Queue.Empty异常。 q.get_nowait( ) 同q.get(False)方法。 q.put(item [, block [,timeout ] ] ) 将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默认为True。如果设置为False,
将引发Queue.Empty异常(定义在Queue库模块中)。timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。 q.qsize() 返回队列中目前项目的正确数量。此函数的结果并不可靠,因为在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。
在某些系统上,此方法可能引发NotImplementedError异常。 q.empty() 如果调用此方法时 q为空,返回True。如果其他进程或线程正在往队列中添加项目,结果是不可靠的。也就是说,
在返回和使用结果之间,队列中可能已经加入新的项目。 q.full() 如果q已满,返回为True. 由于线程的存在,结果也可能是不可靠的(参考q.empty()方法)。。
代码示例
先来看看队列提供的方法,以及方法的使用和现象
''' multiprocessing模块支持进程间通信的两种主要形式:管道和队列 都是基于消息传递实现的,但是队列接口 ''' from multiprocessing import Queue q=Queue(3) #put ,get ,put_nowait,get_nowait,full,empty q.put(3) q.put(3) q.put(3) # q.put(3) # 如果队列已经满了,程序就会停在这里,等待数据被别人取走,再将数据放入队列。 # 如果队列中的数据一直不被取走,程序就会永远停在这里。 try: q.put_nowait(3) # 可以使用put_nowait,如果队列满了不会阻塞,但是会因为队列满了而报错。 except: # 因此我们可以用一个try语句来处理这个错误。这样程序不会一直阻塞下去,但是会丢掉这个消息。 print('队列已经满了') # 因此,我们再放入数据之前,可以先看一下队列的状态,如果已经满了,就不继续put了。 print(q.full()) #满了 print(q.get()) print(q.get()) print(q.get()) # print(q.get()) # 同put方法一样,如果队列已经空了,那么继续取就会出现阻塞。 try: q.get_nowait(3) # 可以使用get_nowait,如果队列满了不会阻塞,但是会因为没取到值而报错。 except: # 因此我们可以用一个try语句来处理这个错误。这样程序不会一直阻塞下去。 print('队列已经空了') print(q.empty()) #空了
一个queue的简单应用,使用队列q对象调用get函数来取得队列中最先进入的数据
import time from multiprocessing import Process, Queue def f(q): q.put([time.asctime(), 'from Eva', 'hello']) #调用主函数中p进程传递过来的进程参数 put函数为向队列中添加一条数据。 if __name__ == '__main__': q = Queue() #创建一个Queue对象 p = Process(target=f, args=(q,)) #创建一个进程 p.start() print(q.get()) p.join()
3.生产者消费者模型
在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。
为什么要使用生产者消费者模型
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。
什么是生产者消费者模型
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
基于队列实现生产者消费者模型
import time import random from multiprocessing import Queue, Process def consumer(q, name): # 消费者:通常取到数据之后还要进行某些操作 while True: food = q.get() if food: print('%s吃了%s' % (name, food)) else: break def producer(q, name, food): # 生产者:通常在放数据之前需要先通过某些代码来获取数据 for i in range(10): foodi = '%s%s' % (food, i) print('%s生产了%s' % (name, foodi)) time.sleep(random.random()) q.put(foodi) if __name__ == '__main__': q = Queue() c1 = Process(target=consumer, args=(q, '小白')) c2 = Process(target=consumer, args=(q, '小白')) p1 = Process(target=producer, args=(q, '小黑', '包子')) p2 = Process(target=producer, args=(q, '云溪', '香蕉')) c1.start() c2.start() p1.start() p2.start() p1.join() p2.join() q.put(None) q.put(None)
待续