1、操作系统的作用:
隐藏丑陋复杂的硬件接口,提供良好的抽象接口
管理调度进程,让进程对硬件的竞争变得有序
进程的调度算法:
FCFS先来先服务调度算法
段作业优先调度算法
时间片轮转法
多级反馈队列
进程的并行与并发:
并行:是从微观上,也就是在一个精确的时间片刻,有不同的程序在执行,这就要求必须有多个处理器。(资源够用,比如三个线程,四核的CPU )
并发:是从宏观上,在一个时间段上可以看出是同时执行的,比如一个服务器同时处理多个session。
2、同步和异步
所谓同步就是一个任务的完成需要依赖另外一个任务时,只有等待被依赖的任务完成后,依赖的任务才能算完成,这是一种可靠的任务序列
。要么成功都成功,失败都失败,两个任务的状态可以保持一致。
所谓异步是不需要等待被依赖的任务完成,只是通知被依赖的任务要完成什么工作,依赖的任务也立即执行,只要自己完成了整个任务就算完成了
。至于被依赖的任务最终是否真正完成,依赖它的任务无法确定,所以它是不可靠的任务序列
。
同步:一个进程的执行依赖于另外的进程,只有被依赖的进程执行完,自己的任务才执行。
异步:只通知被以来的进程完成什么工作,不需要等待被以来的进程完成。
3、开一个多进程
from multiprocessing import Process def func(i): print("输出:%s"%i) if __name__ == '__main__': for i in range (10): p=Process(target=func,args=(i,)) p.start() print("你好")
p.join()的使用
from multiprocessing import Process import time def func(i): print("输出:%s"%i) if __name__ == '__main__': p_list=[] for i in range (20): p=Process(target=func,args=(i,)) p_list.append(p) p.start() for p in p_list: #[p.join() for p in p_list] p.join() print('结束')
p.join()感知一个子进程的结束,将异步程序同步
用类的方式开启一个多进程的第二种方式
from multiprocessing import Process
import os
class Myprocess(Process):
def run(self):
print(os.getpid())
if __name__ == '__main__':
for i in range(20):
print('主',os.getpid())
p1=Myprocess()
p1.start()
自定义继承类中必须有一个run方法,run方法是在子进程中自行的程序
打印进度条
import time def run(i): num=str(i) print('*'*i+num+'%',end='\r') for i in range(101): run(i) time.sleep(0.1)
使用多进程实现socket服务端的并发效果
server端
import socket from multiprocessing import Process def server(conn): conn.send('你好'.encode('utf-8')) info = conn.recv(1024).decode('utf-8') print(info) conn.close() if __name__ == '__main__': sk = socket.socket() sk.bind(('127.0.0.1',8080)) sk.listen() while True: conn,addr = sk.accept() q = Process(target=server,args=(conn,)) q.start() sk.close()
client端
import socket sk = socket.socket() sk.connect(('127.0.0.1',8080)) ret = sk.recv(1024).decode('utf-8') print(ret) msg = input("》》》》").encode('utf-8') sk.send(msg) sk.close()
主进程在执行完自己的代码之后,不会立马结束,而是等待子进程结束之后,回收子进程资源
守护进程 由子进程 转化而来 伴随着主进程的结束而结束
from multiprocessing import Process import time def func(): for i in range (10): time.sleep(1) print("输出:%s"%i) if __name__ == '__main__': p=Process(target=func) p.daemon = True p.start() for i in range (10) : print("你好%s"%i) time.sleep(0.5)
p.terminate()结束一个子进程,
if __name__ == '__main__': p=Process(target=func) p.daemon = True p.start() print(p.is_alive()) # 判断一个进程是否活着 p.terminate() # 关闭一个子进程 time.sleep(0.5) print(p.name) # 查看进程的名字 print(p.is_alive())
4、进程锁 相当于将异步的程序变成同步
对程序加锁,同一时间只能有一个进程执行程序
from multiprocessing import Process,Lock import time def func(i,lock): lock.acquire() time.sleep(1) print("输出:%s"%i) lock.release() if __name__ == '__main__': lock = Lock() for i in range (10): p=Process(target=func,args=(i,lock)) p.start()
5、信号量 规定同一时间允许几个进程来同时执行程序
from multiprocessing import Process,Semaphore import time def func(i,sem): sem.acquire() time.sleep(1) print("输出:%s"%i) sem.release() if __name__ == '__main__': sem = Semaphore(3) for i in range (10): p=Process(target=func,args=(i,sem)) p.start()
6、Event() 事件 通过一个信号来控制多个进程同时执行或者阻塞,一个事件被创建后默认是阻塞状态
from multiprocessing import Event e = Event() #实例化 默认阻塞状态
print(1111111)
e.set() # 将事件改为true
print(e.is_set()) # 打印事件的状态
e.wait()
print(22222)
e.clear() #将事件改为False
e.wait()
print(33333)
红绿灯 事件练习 (用到进程之间的通信IPC)
import time from multiprocessing import Event,Process def light(e): while True: if e.is_set() : e.clear() print('\033[31m红灯亮了\033[0m') time.sleep(2) else : e.set() print('\033[32m绿灯亮了\033[0m') #print('\033[31m红灯亮了\033[0m') time.sleep(2) #print('\033[32m绿灯亮了\033[0m') def car(e,i): if not e.is_set(): print('%s车在等待'%i) e.wait() print('%s车通过'%i) if __name__ =='__main__': e = Event() p1 = Process(target=light,args=(e,)) p1.start() for i in range (20): time.sleep(0.5) p2 = Process(target=car,args=(e,i)) p2.start()
7、队列 (生产者消费者模型练习)
from multiprocessing import Queue q = Queue() # 实例化队列 q.put() # 往队列里放数据 q.get() # 从队列里取数据 print(q.full())#判断队列是否满了 print(q.empty())#判断队列是否为空
生产者消费者模型
from multiprocessing import Process,Queue import time import random def producer(p,name,food): for i in range (10): time.sleep(1) f = '%s生产了第%s个%s'%(name,i,food) p.put(f) print(f) def constom(q,name): while True : time.sleep(2) food = q.get() if food ==None:break print('%s吃了一个%s'%(name,food)) if __name__=='__main__': q = Queue() p1 = Process(target=producer,args=(q,'egon','包子')) p2 = Process(target=producer,args=(q,'xiaohong','泔水')) c1 = Process(target=constom,args=(q,'jin')) c2 = Process(target=constom,args=(q,'alex')) p1.start() p2.start() c1.start() c2.start() p1.join() p2.join() q.put(None) q.put(None)
使用Joinable实现生产者消费者模型
from multiprocessing import Process,JoinableQueue import time import random def producer(p,name,food): for i in range (10): time.sleep(1) f = '%s生产了第%s个%s'%(name,i,food) p.put(f) print(f) p.join() #阻塞状态,感知一个队列中的数据全部被处理完,才能结束 def constom(q,name): while True : time.sleep(2) food = q.get() print('%s吃了一个%s'%(name,food)) q.task_done() if __name__=='__main__': q = JoinableQueue() p1 = Process(target=producer,args=(q,'egon','包子')) p2 = Process(target=producer,args=(q,'xiaohong','泔水')) c1 = Process(target=constom,args=(q,'jin')) c2 = Process(target=constom,args=(q,'alex')) p1.start() p2.start() c1.daemon=True c2.daemon=True c1.start() c2.start() p1.join() p2.join()
8、管道 Pipe
def f(conn): try : while True : print(conn.recv()) except EOFError: conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() for i in range (20): ret = parent_conn.send(b'hello') parent_conn.close()
通过管道实现生产者消费者模型
from multiprocessing import Process,Pipe import time import random def producer(pro,conn,name,food): conn.close() for i in range (10): time.sleep(1) f = '%s生产了第%s个%s'%(name,i,food) print(f) pro.send(f) pro.close() def constom(pro,conn,name): pro.close() while True : try: time.sleep(2) food = conn.recv() print('%s吃了一个%s'%(name,food)) except EOFError: conn.close() break if __name__=='__main__': pro,conn = Pipe() p1 = Process(target=producer,args=(pro,conn,'egon','包子')) c1 = Process(target=constom,args=(pro,conn,'jin')) p1.start() c1.start() pro.close() conn.close()
可能出现同一个进程拿到同一个数据,需要加锁来控制管道的行为,来避免进程之间争夺数据造成的数据不安全现象
9、进程池
定义一个池子,在里面放上固定数量的进程,有需求来了,就拿一个池中的进程来处理任务,等到处理完毕,进程并不关闭,而是将进程再放回进程池中继续等待任务。
如果有很多任务需要执行,池中的进程数量不够,任务就要等待之前的进程执行任务完毕归来,拿到空闲进程才能继续执行。
也就是说,池中进程的数量是固定的,那么同一时间最多有固定数量的进程在运行。
这样不会增加操作系统的调度难度,还节省了开闭进程的时间,也一定程度上能够实现并发效果。
进程池的同步调用和异步调用

import os,time from multiprocessing import Pool def work(n): print('%s run' %os.getpid()) time.sleep(1) return n**2 if __name__ == '__main__': start_time = time.time() p=Pool(3) #进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务 res_l=[] for i in range(10): res=p.apply(work,args=(i,)) # 同步调用,直到本次任务执行完毕拿到res,等待任务work执行的过程中可能有阻塞也可能没有阻塞 print(res) # 但不管该任务是否存在阻塞,同步调用都会在原地等着 p.close() end_time = time.time() print(end_time-start_time)

import os import time import random from multiprocessing import Pool def work(n): print('%s run' %os.getpid()) time.sleep(3) return n**2 if __name__ == '__main__': start_time = time.time() p=Pool(5) #进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务 res_l=[] for i in range(10): res=p.apply_async(work,args=(i,)) # 异步运行,根据进程池中有的进程数,每次最多3个子进程在异步执行 print(res) # 返回结果之后,将结果放入列表,归还进程,之后再执行新的任务 # 需要注意的是,进程池中的三个进程不会同时开启或者同时结束 # 而是执行完一个就释放一个进程,这个进程就去接收新的任务。 res_l.append(res) # 异步apply_async用法:如果使用异步提交的任务,主进程需要使用jion,等待进程池内任务都处理完,然后可以用get收集结果 # 否则,主进程结束,进程池可能还没来得及执行,也就跟着一起结束了 p.close() p.join() # end_time = time.time() # print(end_time-start_time) for res in res_l: print(res.get()) #使用get来获取apply_aync的结果,如果是apply,则没有get方法,因为apply是同步执行,立刻获取结果,也根本无需get

import os import time import random from multiprocessing import Pool def work(n): print('%s run' %os.getpid()) time.sleep(3) return n**2 if __name__ == '__main__': start_time = time.time() p=Pool(5) #进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务 p.map(work,range(30))
用进程池实现socket服务端的并发效果
server端
import socket from multiprocessing import Pool def chat(conn): conn.send(b'hello') print(conn.recv(1024).decode('utf-8')) conn.close() if __name__ =='__main__': p = Pool(5) sk = socket.socket() sk.bind(('127.0.0.1',8080)) sk.listen() while True : conn,addr = sk.accept() p.apply_async(chat,args=(conn,)) sk.close()
client端
import socket sk = socket.socket() sk.connect(('127.0.0.1',8080)) ret = sk.recv(1024).decode('utf-8') print(ret) msg = input('>>>>: ').encode('utf-8') sk.send(msg) sk.close()

import sys,os,io sys.stdout=io.TextIOWrapper(sys.stdout.buffer,encoding='gb18030') from multiprocessing import Pool import requests import json import os def get_page(url): print('<进程%s> get %s' %(os.getpid(),url)) respone=requests.get(url) if respone.status_code == 200: return {'url':url,'text':respone.text} def pasrse_page(res): print('<进程%s> parse %s' %(os.getpid(),res['url'])) parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text'])) with open('db.txt','a+') as f: f.write(parse_res) if __name__ == '__main__': urls=[ 'https://www.baidu.com', 'https://www.python.org', 'https://www.openstack.org', 'https://help.github.com/', 'http://www.sina.com.cn/' ] p=Pool(3) res_l=[] for url in urls: res=p.apply_async(get_page,args=(url,),callback=pasrse_page) res_l.append(res) p.close() p.join() print([res.get() for res in res_l]) #拿到的是get_page的结果,其实完全没必要拿该结果,该结果已经传给回调函数处理了

import sys,os,io sys.stdout=io.TextIOWrapper(sys.stdout.buffer,encoding='gb18030') import re from urllib.request import urlopen from multiprocessing import Pool def get_page(url,pattern): response=urlopen(url).read().decode('utf-8') return pattern,response def parse_page(info): pattern,page_content=info res=re.findall(pattern,page_content) for item in res: dic={ 'index':item[0].strip(), 'title':item[1].strip(), 'actor':item[2].strip(), 'time':item[3].strip(), } print(dic) if __name__ == '__main__': regex = r'<dd>.*?<.*?class="board-index.*?>(\d+)</i>.*?title="(.*?)".*?class="movie-item-info".*?<p class="star">(.*?)</p>.*?<p class="releasetime">(.*?)</p>' pattern1=re.compile(regex,re.S) url_dic={ 'http://maoyan.com/board/':pattern1, } p=Pool() res_l=[] for url,pattern in url_dic.items(): res=p.apply_async(get_page,args=(url,pattern),callback=parse_page) res_l.append(res) for i in res_l: i.get()