一、进程
我们之前有讲到,程序就是存放代码的文件
程序的静态的,进程的动态的,即进程表示一次活动过程。同一个程序每执行一次就是一个进程
进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。
1.进程是一个抽象的概念
2.进程指的是程序的运行过程
3.进程是操作系统最核心的概念
1.1操作系统
在上述进程中我们提到了操作系统,在之前的笔记中我们也提到过操作系统,是一个协调,管理,控制计算机硬件资源与应用软件资源的控制程序。
1.1.1操作系统作用
操作系统的作用有很多,从我们开发应用程序角度来看,有以下的作用。
1.将复杂的硬件接口封装成简单的接口,让程序员不需要再直接操作硬件。
2.管理,调度进程,并且将多个进程对硬件的竞争变得有序。
1.1.2并发,并行,串行
并发:多个任务看起来是同时进行,其实是每个程序运行一段时间换另一个程序运行。利用多道技术
并行:同时运行,必须多核才能实现
串行:一个进程完完整整的运行完,再运行下一个进程
单核下,可以利用多道技术,多个核,每个核也都可以利用多道技术(多道技术是针对单核而言的)
有四个核,六个任务,这样同一时间有四个任务被执行,假设分别被分配给了cpu1,cpu2,cpu3,cpu4,
一旦任务1遇到I/O就被迫中断执行,此时任务5就拿到cpu1的时间片去执行,这就是单核下的多道技术
而一旦任务1的I/O结束了,操作系统会重新调用它(需知进程的调度、分配给哪个cpu运行,由操作系统说了算),可能被分配给四个cpu中的任意一个去执行
1.1.3多道技术
1.产生背景:针对单核,实现并发。
ps:现在的主机一般是多核,那么每个核都会利用多道技术.假设有4个cpu,运行于cpu1的某个程序遇到io阻塞,会等到io结束再重新调度,会被调度到4个cpu中的任意一个,具体由操作系统调度算法决定。
2.空间上的复用:如内存中同时有多道程序
3.时间上的复用:复用一个cpu的时间片强调:遇到io切,占用cpu时间过长也切,核心在于切之前将进程的状态保存下来,这样才能保证下次切换回来时,能基于上次切走的位置继续运行.
二、创建进程的两种方法
# 方法一,大部分人都喜欢用这种方式创建进程
import time
import os
from multiprocessing import Process
def task(n, tag):
print("%s is running" % tag)
time.sleep(5)
print("%s is done" % tag)
if __name__ == '__main__':
p1 = Process(target=task, args=(5, "子进程1"))
p2 = Process(target=task, args=(3, "子进程2"))
p3 = Process(target=task, args=(2, "子进程3"))
p1.start()
p2.start()
p3.start()
print("主:%s。。。" % os.getpid())
# 方法二
# 若需要创建一些初始化属性,需要重写__init__(),在其中要调用Process中的初始化方法,然后在写自己的代码,即需要用super
# 启动需要用start,而不是直接调用run方法
import time
import os
from multiprocessing import Process
class Task(Process):
def run(self) -> None:
print('进程开始')
time.sleep(2)
print('进程结束')
if __name__ == '__main__':
p = Process()
p.start()
print('主')
三、进程调度
1 先来先服务
2 短作业优先
3 时间片轮转
4 多级反馈队列
前面三个看名字就很好理解,需要多解释一步的是多级反馈队列(了解即可,都是吹逼用的)
多级反馈队列调度算法既能使高优先级的作业得到响应又能使短作业(进程)迅速完成。(对比一下FCFS与高响应比优先调度算法的缺陷)。
1、进程在进入待调度的队列等待时,首先进入优先级最高的Q1等待。
2、首先调度优先级高的队列中的进程。若高优先级中队列中已没有调度的进程,则调度次优先级队列中的进程。例如:Q1,Q2,Q3三个队列,当且仅当在Q1中没有进程等待时才去调度Q2,同理,只有Q1,Q2都为空时才会去调度Q3。
3、对于同一个队列中的各个进程,按照FCFS分配时间片调度。比如Q1队列的时间片为N,那么Q1中的作业在经历了N个时间片后若还没有完成,则进入Q2队列等待,若Q2的时间片用完后作业还不能完成,一直进入下一级队列,直至完成。
4、在最后一个队列QN中的各个进程,按照时间片轮转分配时间片调度。
5、在低优先级的队列中的进程在运行时,又有新到达的作业,此时须立即把正在运行的进程放回当前队列的队尾,然后把处理机分给高优先级进程。换而言之,任何时刻,只有当第1~i-1队列全部为空时,才会去执行第i队列的进程(抢占式)。特别说明,当再度运行到当前队列的该进程时,仅分配上次还未完成的时间片,不再分配该队列对应的完整时间片。
四、僵尸进程与孤儿进程
1 僵尸进程:进程结束了,资源还没来得及回收,此时该进程叫僵尸进程,还没死透
2 孤儿进程:主进程挂了,子进程还没结束,它就会被专门的进程接管
五、进程对象及其他方法
1 windows:tasklist |findstr 进程id号
2 mac,Linux:ps aux | grep 进程id号
3 t.pid或者current_process().pid 获取进程id号
4 os.getpid() 同上,获取进程id号
5 os.getppid() 获取父进程id号,子进程中获取父进程id,等于父进程的id号
6 t.is_alive()或者current_process().is_alive() 查看进程是否存活
7 t.terminate() 关闭进程,在主进程关闭
六、守护进程
守护进程:主进程一旦结束,子进程也结束。这样可以保证不会出现孤儿进程,同时,在之后的情景中,如果我们在某个主进程中创建了几千甚至几万的子进程(假设),如果不使用一种方法使得他们一起结束,就会导致后期结束这些进程非常棘手。
使用守护进程十分简单,进程对象下的daemon属性设置为True即可。
p.daemon=True # 一定要加在启动之前,t为我们创建的进程
from multiprocessing import Process,current_process
import time
import os
def task():
print(os.getpid())
print('子进程')
time.sleep(200)
print('子进程结束')
if __name__ == '__main__':
p = Process(target=task, )
# 守护进程:主进程一旦结束,子进程也结束
p.daemon=True # 一定要加在启动之前
p.start()
time.sleep(1)
print('主进程结束')
七、进程互斥锁
进程互斥锁,这种锁,只能锁住自己机器上的进程,不能锁住其他的机器,所以后期使用较少,但是原理都是一样的,所以要记住这个模式。在后期利用分布式锁较多,即公共锁。也会使用悲观锁,乐观锁。这些高级方法的基础原理都是一样的。
原理:所有人先抢锁,启动锁,修改数据,释放锁,其他人再抢锁
1.为了防止多进程操作同一个数据或者文件时,数据写乱了的问题
2.在进程写数据的时候加锁,并行变成串行,牺牲了效率,保证了安全
在生活中,我们肯定都买过车票,我们可能有遇到过这种情况,查询余票的时候,还剩下一张,但是购买的时候发现票已经被人买了。对于购票这个操作,我们就可以使用互斥锁。互斥锁是一种简单的加锁的方法来控制对共享资源的访问。
# 同时只有一个人能拿到,必须释放,其他人才能再次获取到
from multiprocessing import Process, Lock
import json
import time
import random
def search():
# 查票的函数
# 打开文件,读出ticket_count
with open('ticket', 'r', encoding='utf-8') as f:
dic = json.load(f)
print('余票还有:', dic.get('ticket_count'))
def buy():
with open('ticket', 'r', encoding='utf-8') as f:
dic = json.load(f)
time.sleep(random.randint(1, 3)) # 模拟一下网络延迟
if dic.get('ticket_count') > 0:
# 能够买票
dic['ticket_count'] -= 1
# 保存到文件中去
with open('ticket', 'w', encoding='utf-8') as f:
json.dump(dic, f)
print('买票成功')
else:
# 买票失败
print('买票失败')
# 写一个函数,先查票,再买票
def task(mutex):
search()
# 买票过程要加锁
# 买前加锁
# mutex.acquire()
# buy() # 10个进程变成了串行执行
# 买后释放锁
# mutex.release()
with mutex: # 利用with做上下文管理,在完成buy函数之后帮我们释放锁
buy()
if __name__ == '__main__':
# 锁的创建应该在主进程创建
mutex = Lock() # 创建一把锁
# 模拟十个人买票(开10个进程)
for i in range(10):
t = Process(target=task, args=(mutex,))
t.start()
八、队列介绍
队列,即是一种先进先出的思想,就像我们排队买票一样,先来的可以先买到票。
在此处,主要是为了创建管道,让不同进程中的数据可以相互传输,而传输的方式就是队列。
当使用管道时,若传入的数据个数超过了上限或者提取的个数超过了传入的个数,那么程序就会阻塞住。我们可以用timeout或者nowait来解决。
from multiprocessing import Queue
# 实例化得到要给对象
q=Queue(5) # 默认很大,可以放很多,写了个5,那么只能放5个
# 往管道中放值
q.put(1)
q.put('yang')
q.put(18)
q.put(19)
# q.put(20)
# q.put(21)
# q.put_nowait(100)
# 从管道中取值
# print(q.get())
# print(q.get())
# print(q.get())
# print(q.get(timeout=100)) # 等0.1s还没有值,就结束
# print(q.get_nowait()) # 不等了,有就是有,没有就没有
print(q.empty()) # 看一下队列是不是空的
print(q.full()) # 看一下队列是不是满的
方法总结:
q=Queue(队列大小)
# 放值
q.put(asdf)
q.put_nowait(asdf) # 队列满了,放不进去就不放了,报错
# 取值
q.get() # 从队列头部取出一个值
q.get_nowait() # 从队列头部取值,没有就抛错
q.get(timeout=100) # 等待指定时间,如果还没有值传过来就不等了
# 队列是否为空,是否满
print(q.empty()) # 看一下队列是不是空的
print(q.full()) # 看一下队列是不是满的
九、IPC机制(进程间通信)
利用上面学习的管道,我们就可以在不同的进程中传递信息了。
# Inter-Process Communication,进程间通信
from multiprocessing import Process, current_process, Queue
import time
import os
def task1(q):
s1 = 'yang'
print('我是task1进程,我的id号是:%s'%os.getpid())
q.put('yang')
print('我传了%s'%s1)
def task2(q):
print('我是task2进程,我的id号是:%s'%os.getpid())
print('我拿到了%s'%q.get())
if __name__ == '__main__':
q = Queue(5)
p1 = Process(target=task1, args=(q,))
p1.start()
p2 = Process(target=task2, args=(q,))
p2.start()
像双十一活动时,处理高并发也会使用队列思想,将所有的请求都放到队列里,然后一个一个的处理。当然程序没有我们这么简单。
十、生产者消费者模型
在并发编程中使用生产者和消费者模型能够解决绝大多数的高并发问题。
应用场景:高并发场景,如淘宝双11活动,我们每个下订单的人,就是生产者,每个人的订单都放到队列里,淘宝的机器就是消费者,在处理我们的订单。当淘宝发现队列里积攒的数据过多,就会再起成千上万的机器来处理数据,当发现现在数据变少了,就会关闭一些机器,这个行为也叫弹性伸缩,可以充分利用资源。
1.生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。是队列微服务的基础
2.生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取
3.阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力
# 多对多的生产者消费者模型
'''
这个是比较完整的版本,利用了multiprocessing下的JoinableQueue类帮我们对队列中是否为空做了标志的效果,如果不使用这个类也可以实现的,只利用Queue类,当生产完毕后在其中put一个空,那么消费者拿到空的时候就知道应该停止了
'''
import time
import random
from multiprocessing import Process,JoinableQueue
def producer(name, food, q):
for i in range(10):
data = '%s 制造了%s' % (name, food)
# 模拟制造食物延迟
time.sleep(random.randint(1, 3))
print(data)
q.put(food)
def consumer(name, q):
while True:
food = q.get()
# 模拟吃食物延迟
time.sleep(random.randint(1, 3))
print('%s消费了%s' % (name, food))
q.task_done() # 把队列中维护的数字减一
if __name__ == '__main__':
q = JoinableQueue() # 放一个数字会加一,消费一个数字减一
# 创造生产者
p = Process(target=producer, args=('egon', '包子', q))
p.start()
p1 = Process(target=producer, args=('alex', '泔水', q))
p1.start()
# 创造消费者
c = Process(target=consumer, args=('鸡哥', q))
c.start()
c1 = Process(target=consumer, args=('王铁蛋', q))
c1.start()
c2 = Process(target=consumer, args=('李铁柱', q))
c2.start()
# 等待所有生产者生产结束,主进程再结束
p.join()
p1.join()
q.join() # 一直等待q队列中数据没有了,才继续往下走
print('生产者结束了,主进程结束')
# JoinableQueue(),每放一个值,数字加一
# 取值不会减一,q.task_done()
# q.join() 一直阻塞,当q没有值了,才继续走