1.理解相关概念
#浅显理解下 对比cpu与io的差距如:io从硬盘读取一条数据9ms ,cpu在9ms可以做450万次指令 cpu切换上下文的方式:1.遇到io操作切换cpu 2.cpu时间片分配 操作系统的调度算法:多级反馈队列的调度算法(队列一的任务在时间片内为执行完会被下放到队列二,队列二中在时间片内未执行完毕下放到队列三) 进程概念 操作系统资源分配调度的最小单位,进程间数据资源都是隔离的!!,由(程序+数据集+进程控制块三部分组成),那么就是说进程就是分配系统资源,标识任务. 进程间可以切换合理利用cpu ,进程的状态--记录--切换称上下文切换(资源开销大) 一个程序启动就会开启一个进程 每条线程可以利用多核 线程概念 cpu执行的最小单位!! 线程为了降低上下文切换,提高系统并发,突破进程仅能做一件任务的傻缺操作,进程作为容器,给线程提供资源,多线程资源共享上下文切换减少 线程健壮性差,如果一个线程挂了,整个进程就被牵连了
遇到IO操作实现并发效果
协程概念(就是一条线程)
协程就是一条线程基础上实现切换多任务
不能利用多核,切换频率高,在多io操作时非常优势
进程的三种队列
就绪队列:这里面的任务等待cpu分配时间片
运行队列:这里面的任务正在使用cpu
阻塞队列:这里面的任务遇到阻塞操作(如:input)
同步(针对函数任务调用方式)
当a事件中调用了b事件,a事件一定会等待b事件有了结果再继续执行
异步(针对函数任务调度方式)
当a事件调用了b事件,a事件不用等待b事件结果直接继续执行
阻塞(针对进程线程)
线程在执行过程遇到input sleep conn.recv等会阻塞
非阻塞(针对进程线程)
线程在执行过程遇到input sleep conn.recv也不会阻塞
并行
需要多核或者多cpu,计算机在同一时间节点执行处理多个计算任务
并发
计算机在同一节点,只能计算一个任务,但是cpu快速切换在多个线程间貌似实现同时计算
2.非阻塞的socket ,可以同时处理多个client的连接 ,避免阻塞在accpet的位置(大量的循环recv ,cpu消耗很大)
import socket sk = socket.socket() sk.setblocking(False) # 关闭阻塞,但是会循环不停的要求connect连接 sk.bind(('127.0.0.1', 8000)) sk.listen(5) conn_l = [] # 将所有的管道对象放入列表 while True: try: conn, addr = sk.accept() conn_l.append(conn) except BlockingIOError as e: # 捕获10035错误直接处理 ,可以使用select来完美处理 for conn in conn_l: # 列表的内容在不断的循环 try: msg = conn.recv(1024) # 不间断的从各个管道拿数据 print(msg) conn.send(b'recieved') except BlockingIOError as e1: # 再次捕获10035错误直接处理 pass
##################################
import socket
client = socket.socket()
client.connect(('127.0.0.1', 8000))
while 1:
cRequest = input("send: ")
client.send(cRequest.encode('utf-8'))
if cRequest == "quit":
client.close()
break
3.创建进程
默认情况下主进程执行完毕会关闭 ,子进程继续执行不受影响 ,程序继续运行, 子进程不能有交互终端命令
僵尸进程-孤儿进程:父进程创建子进程来不及处理 ,就会产生出大量等待父进程处理的子进程 ,当父进程被杀死或退出时 ,他的所有子进程成为孤儿进程由int接管进程回收
Procee对象的一些方法
子进程对象.start()方法启动子进程
子进程对象.join()方法可以让主进程阻塞等待子进程执行完毕
子进程对象.daemon =True 可以设置守护进程(守护主进程的子进程) ,当主进程代码执行完毕子进程立即停止 ,守护进程不能有子进程
子进程对象.p.is_alive() 返回bool值判断子进程是否还在执行
import time import os from multiprocessing import Process def func(): time.sleep(2) print("子进程ID", os.getpid()) print("子进程的父进程ID", os.getppid()) if __name__ == '__main__': #windows下必须使用该判断 ,由于Windows没有fork的原因 ,启动新的py进程导入模块在Process()的时候会无线执行!!! p = Process(target=func, ) # p.daemon=True p.start() print(p.is_alive()) # p.join() print("父进程ID", os.getpid()) print("父进程的父进程ID", os.getppid())
4.创建线程
一个进程必有一个主线程 ,主线程执行完代码会等待所有子线程执行完毕后退出
多线程之间可以完成一个目标 ,遇io切换cpu给其他线程 ,易于在多IO使用
threading常用方法
子线程.start()启动子线程
子线程.join()阻塞主线程 ,必须等到该子线程结束才能继续走主线程代码
子线程.setDaemon(), 设置子线程是守护进程当主线程停止后子线程也会终止 ,正常情况主线程会等子线程
import threading # 引用线程模块 import time def countnum(n): print('running on number{}'.format(n)) time.sleep(n) start_time1 = time.time() countnum(2) # 程序io阻塞cpu挂住 ,如果此时cpu还能继续工作最好 ,不跟io一起阻塞 countnum(1) # 程序io阻塞cpu挂住 print('花费了{}'.format(time.time() - start_time1)) # cpu花费0.01s完成 ,io阻塞了很久导致后面代码也执行较慢! t1 = threading.Thread(target=countnum, args=(3,)) # 实例化线程对象,绑定执行的任务 t2 = threading.Thread(target=countnum, args=(2,)) t1.start() # t1线程遇io切换t2 print('1') t1.join() # 阻塞主线程,必须等t1执行完成后在执行后面代码 t2.start() # t2线程遇io切换下面代码 print('2') t3 = threading.Thread(target=countnum, args=(6,)) t3.setDaemon(True) # 设置t3为守护线程,当主线程终止的时候,t3这个子线程立即终止 t3.start() print('end') # 打印end但实际程序还在运行线程中的代码
######################################join()的操作######################################
import time
import threading
def countnum(n):
print("running is {}".format(n))
time.sleep(n)
print("{} is ending".format(n))
start = time.time()
thread_list = [] # 线程对象列表
for i in range(1, 6): # 5个线程 并发执行
t = threading.Thread(target=countnum, args=(i,))
thread_list.append(t)
t.start()
# 上面开始执行t1.start()t2.start()t3.start()t4.start()t5.start()t6.start()
# 下面开始执行t1.join() ,t1执行完执行t2.join()...
for x in thread_list:
x.join()
# thread_list[-1].join()
print("程序执行时间{}".format(time.time() - start))
5.进程池与线程池
concurrent.futures加载模块
ProcessPoolExecutor 进程池(计算密集操作)
ThreadPoolExecutor 线程池(io多操作)
父进程定义好task函数 ,直接submit提交到池中 ,池中很多的任务 ,同一时间只有指定个数的进程或线程
from concurrent.futures import ProcessPoolExecutor import time def task(name): print('name', name) time.sleep(1) if __name__ == '__main__': start = time.time() p1 = ProcessPoolExecutor(2) for i in range(5): p1.submit(task, i) p1.shutdown(wait=True) # 主进程是否对池中子进程join() ,否的就注释 print('主线程') end = time.time() print(end - start) ########################################################################### from concurrent.futures import ThreadPoolExecutor import time def task(name): print('name', name) time.sleep(1) if __name__ == '__main__': start = time.time() p1 = ThreadPoolExecutor(2) for i in range(5): p1.submit(task, i) p1.shutdown(wait=True) # 主线程是否对池中子线程join() ,否的就注释 print('主线程') end = time.time() print(end - start)
6.锁
GIL全局解释器锁
Cpython的解释器给python的每个进程一把锁 ,一个进程下的同一时间仅有一个线程可以获得GIL锁 ,获得锁的线程才能使用cpu ,所以python多线程不能利用多核优势 ,GIL锁出现保证了进程内共享数据安全 ,因为线程数据共共享,当各个线程访问数据资源时会出现竞争状态,造成数据混乱 ,理解为给解释器加了把互斥锁 ,当该线程遇到io会自动解锁
####python的GIL锁情况下,多线程执行计算密集型任务比串行还慢
import threading import time start_time = time.time() def task_js(): return 2 ** 70000000 def Test2(): for i in range(5): threading.Thread(target=task_js, args=(1,)).start() # Test2() # 多线程执行并发执行计算密集任务6秒多 task_js() # 串行计算计算密集任务仅5.7秒 task_js() task_js() task_js() task_js() end_time = time.time() print('执行了', end_time - start_time)
同步锁-互斥锁
同步锁就是协同操作的的锁 ,当A线程与B线程与C线程协同操作 ,规定先后顺序 ,因为可能会依赖产物 .如A线程开始操作前B线程需要先执行 ,最后执行C线程
互斥锁当多个线程访问同一个共享资源 ,拥有互斥锁的线程才能使用 ,当释放锁之后其他的线程才能获得锁继续访问共享数据
锁方法
R = threading.Lock() #创建锁对象
R.acquire() #锁定改时间仅允许一个线程操作
R.relases() #解锁
##模拟不加互斥锁 ,全局变量被混乱调用
import threading import time totle = 100 def task(): global totle time.sleep(0.00001) # 模拟程序可能会出现的io,cpu会切换大家得到的totle可能就不对劲了本来应该是0!! a = totle - 1 totle = a for i in range(1, 101): threading.Thread(target=task, args=()).start() print(totle)
##加入互斥锁############################################################
import threading
import time
totle = 100
R = threading.Lock() # 创建一个互斥锁
def task():
R.acquire() # 锁住操作全局变量
global totle
time.sleep(0.00001) # 模拟程序可能会出现的io,cpu会切换大家得到的totle可能就不对劲了本来应该是0!!
a = totle
totle = a - 1
R.release() # 解锁,其他进程可以操作全局变量
thread_list = [] # 用于join
for i in range(1, 101):
t = threading.Thread(target=task, args=())
t.start()
thread_list.append(t)
for i in thread_list: # 等待所有的子进程执行完毕 ,要不然print(totle)没有意义
i.join()
print(totle)
7.队列
可以用于生产者消费者模型 ,生产者仅产生数据 ,消费者处理数据 ,二者通过缓冲区解耦 ,缓冲区使用队列 ,生产消费二者解耦
队列常用方法
q = queue.Queue() #创建队列对象
q.put() #放入数据
q.get() #拿出一条数据 ,queue队列就是先进先出
q.empty() #判断队列是否为空
import time, random import queue, threading q = queue.Queue() # 创建一个队列 def Producer(name): # 定义生产者操作动作 while 1: a = random.randrange(1, 5) q.put(a) print('本次生产数据{}'.format(a)) time.sleep(random.randrange(3)) def Consumer(name): # 定义消费者操作动作 while 1: if not q.empty(): # 如果队列不为空 print('{}开始消费数据'.format(name)) data = q.get() # 取出队列中数据 time.sleep(random.randrange(2)) # 模拟消费数据时间 print('{}本次消费了数据{}'.format(name, data)) else: print('队列暂时无数据') time.sleep(random.randrange(3)) # 队列没数据就等等生产数据 p1 = threading.Thread(target=Producer, args=('zookerper',)) # 开启一个线程定义一个zk开始生产数据 c1 = threading.Thread(target=Consumer, args=('kafka1',)) # 在开一个定义一个kafka开始消费数据 c2 = threading.Thread(target=Consumer, args=('kafka2',)) # 再开一个定义一个kafka开始消费数据 p1.start() # 生产者产生的数据与消费者消费的数据的时间完全随机 c1.start() c2.start()
8.io模型(https://www.cnblogs.com/Eva-J/articles/8324837.html) (https://blog.csdn.net/sehanlingfeng/article/details/78920423)
梳理linux下的network的IO模型 ,当IO出现会涉及两个对象 ,1.调用IO的进程或线程 2.内核 .当一个read磁盘数据开始会进行两个阶段 1.等待数据准备 2.将数据从内核拷贝到进程中
阻塞IO(使用多线程多进程池来减少IO的影响)
实际上除非特别指定,几乎所有的IO接口都是阻塞 ,在执行recv(1024)线程被阻塞 ,在这个期间线程不计算和响应请求,被挂起来
非阻塞IO
进程需要不断的轮询询问kernel是否准备好数据 ,这个过程循环耗费大量cpu
IO多路复用
也称为异步阻塞IO ,建立在内核提供的多路分离函数select基础上
使用select可以避免非阻塞IO中不断轮询询问内核的过程 ,使用select函数对线程创建的所有socket的IO添加监视, 如果数据准备好select就会通知用户线程去拷贝数据
实际上每个IO请求在select函数上是阻塞的 ,相对于阻塞IO(recvfrom阻塞)+多线程而言 ,IO多路复用优势在于处理超高并发, 最大的优势是可以在一个用户线程内同时处理多个socket的IO请求
9.socketserver模块
https://www.cnblogs.com/eric_yi/p/7701381.html
import socketserver """ 根据继承顺序查看 实例化socket对象ThreadingTCPServer类 ,RequestHandlerClass = Handler socket对象执行了server_forever()方法 最后跳来跳去执行了finish_request()方法 ,self.RequestHandlerClass(request, client_address, self)实例化自己写的类 执行了BaseRequestHandler的init方法 ,self.handle()又执行了自定义的方法开始处理数据 """ class Handler(socketserver.BaseRequestHandler): # 定义类,必须继承BaseRquestHandler方法! def handle(self): # 必须有handler方法 print('New connection:', self.client_address) while 1: data = self.request.recv(1024) if not data: break print('client data', data) self.request.send(data) if __name__ == '__main__': server = socketserver.ThreadingTCPServer(('127.0.0.1', 8009), Handler) # 实例化socket对象绑定bind ,自定义的类 server.serve_forever() # 事件监听调用handler方法
10.socketserver模块实现ftp ,先简单实现一下
ftp的功能:1.上传文件 2.下载文件
import socketserver import os import json class FtpServer(socketserver.BaseRequestHandler): def handle(self): head_info = json.loads(self.request.recv(1024).decode('utf-8')) print(head_info) if hasattr(self, head_info['type']): func = getattr(self, head_info['type']) func(head_info) else: print('type?') def get(self, head_info): filename = head_info['filename'] if os.path.isfile(filename): filezize = str(os.path.getsize(filename)) print(filezize) self.request.send(filezize.encode('utf-8')) with open(filename, mode='rb') as f1: for i in f1: self.request.send(i) def put(self, head_info): filename = head_info['filename'] file_size = head_info['filesize'] recv_size = 0 with open(filename, mode='wb') as f1: while recv_size < file_size: print(file_size, recv_size) data = self.request.recv(1024) f1.write(data) recv_size += len(data) if __name__ == '__main__': server = socketserver.ThreadingTCPServer(('127.0.0.1', 8000), FtpServer) server.serve_forever() ######################################################################################################################################################## import socket import os import json class FtpClient(): def __init__(self): self.Client = socket.socket() def connect(self, Ip_Port): self.connect = self.Client.connect(Ip_Port) def put(self, cmd_list): if os.path.isfile(cmd_list[1]): file_dict = { 'type': 'put', 'filename': cmd_list[1], 'filesize': os.path.getsize(cmd_list[1]) } self.Client.send(json.dumps(file_dict).encode('utf-8')) with open(cmd_list[1], mode='rb') as f1: for i in f1: self.Client.send(i) print('上传完成') else: print('上传的不是文件') def get(self, cmd_list): filename = cmd_list[1] file_dict = { 'type': 'get', 'filename': filename, } self.Client.send(json.dumps(file_dict).encode('utf-8')) filesize = int(self.Client.recv(1024).decode('utf-8')) recv_size = 0 print(filesize) with open(filename, mode='wb') as f1: while 1: data = self.Client.recv(1024) f1.write(data) recv_size += len(data) if recv_size == filesize: break def Interface_cmd(self): cmd = input('>>>') cmd_list = cmd.split() if hasattr(self, cmd_list[0]): func = getattr(self, cmd_list[0]) func(cmd_list) c1 = FtpClient() c1.connect(('127.0.0.1', 8000)) c1.Interface_cmd()