引子:
之前学习过了,线程,进程的概念,知道了在操作系统中进程是资源分配的最小单位,线程是CPU调度的最小单位.按道理来说我们已经算是把CPU的利用率提高很多了.但是我们知道无论是创建多进程还是创建多线程来解决问题,都要消耗一定的时间来创建进程,创建线程,以及管理他们之间的切换.
随着我们对于效率的最求不断提高,基于单线程来实现并发又成为一个新的课题.即只用一个主线程的情况下实现并发.这样就可以节省创建线程进程所消耗的时间.
并发的本质: 切换+保存状态
cpu正在运行一个任务,会在两种情况下切走去执行其他的任务(切换由操作系统强制控制),一种情况是任务发生了阻塞,另外一种情况是该任务计算时间过长
对于单线程下,我们不可避免程序中出现io操作,但如果我们能在自己的程序中(即用户程序级别,而非操作系统级别)控制单线程下的多个任务能在一个任务遇到io阻塞时就切换到另外一个任务去计算,这样就保证了该线程能够最大限度地处于就绪态,即随时都可以被cpu执行的状态,相当于我们在用户程序级别将自己的io操作最大限度地隐藏起来,从而可以迷惑操作系统,让其看到:该线程好像是一直在计算,io比较少,从而更多的将cpu的执行权限分配给我们的线程。
协成的本质就是在单线程下,由用户自己控制一个任务遇到IO阻塞就切换另一个任务去执行,一次来提升效率.
#可以控制对个任务之间的切换,切换之前将任务的状态保存下来,以便重新运行时,可以基于暂停的位置继续执行. #作为1的补充;可以检测IO操作,在遇到IO操作的情况下才发生切换
协程介绍:
协程:是单线程下的并发,又称为为线程. 协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的.
协程特点:
必须在只有一个单线程里实现并发
修改共享数据不需加锁
用户程序里自己保存多个控制流的上下文栈
一个协程遇到IO操作自动切换到其他协程
Greenlet模块
安装:pip3 install greenlet

from greenlet import greenlet def eat(name): print('%s eat 1'%name) g2.switch('egon') print('%s eat 2'%name) g2.switch() def play(name): print('%s play 1' %name) g1.switch print('%s play 2' %name) g1 = greenlet(eat) g2 = greenlet(play) g1.switch('egon')#可以在第一次switch时传入参数,以后都不需要
Gevent模块
安装:pip3 install gevent
Gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet,它是以C扩展模块形式接入Python的轻量级线程.Greenlet全部运行在主程序操作系统进程的内部,但他们被协作式的调度.

import gevent def eat(name): print('%s eat 1' %name) gevent.sleep(2) print('%s eat 2' %name) def play(name): print('%s play 1' %name) gevent.sleep(1) print('%s play 2' %name) g1=gevent.spawn(eat,'egon') g2=gevent.spawn(play,name='egon') g1.join() g2.join() #或者gevent.joinall([g1,g2]) print('主')
IO多路复用
IO多路复用作用:检查多个socket是否已经发生变化(是否连接成功/是否已经获取数据)(可读/可写)
当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。
三种模式:
select:最多1024个socket;循环去检测。
poll:不限制监听socket个数;循环去检测(水平触发)。
epoll:不限制监听socket个数;回调方式(边缘触发)。
基于IO多路复用+socket实现并发请求:
IO多路复用
socket 非阻塞(不等待)
异步:执行完某个任务后自动调用我给它的函数
Python中开源 基于事件循环实现的异步非阻塞框架 Twisted

import socket import requests # 方式一 ret = requests.get('https://www.baidu.com/s?wd=alex') # 方式二 client = socket.socket() # 百度创建连接: 阻塞 client.connect(('www.baidu.com',80)) # 问百度我要什么? client.sendall(b'GET /s?wd=alex HTTP/1.0 host:www.baidu.com ') # 我等着接收百度给我的回复 chunk_list = [] while True: chunk = client.recv(8096) if not chunk: break chunk_list.append(chunk) body = b''.join(chunk_list) print(body.decode('utf-8'))

import socket import requests # 方式一 key_list = ['alex','db','sb'] for item in key_list: ret = requests.get('https://www.baidu.com/s?wd=%s' %item) # 方式二 def get_data(key): # 方式二 client = socket.socket() # 百度创建连接: 阻塞 client.connect(('www.baidu.com',80)) # 问百度我要什么? client.sendall(b'GET /s?wd=alex HTTP/1.0 host:www.baidu.com ') # 我等着接收百度给我的回复 chunk_list = [] while True: chunk = client.recv(8096) if not chunk: break chunk_list.append(chunk) body = b''.join(chunk_list) print(body.decode('utf-8')) key_list = ['alex','db','sb'] for item in key_list: get_data(item)

import threading key_list = ['alex','db','sb'] for item in key_list: t = threading.Thread(target=get_data,args=(item,)) t.start()
提高并发方案:
--多进程
--多线程
--异步非阻塞模块(Twisted) scrapy框架(单线程完成并发)

import socket import select client1 = socket.socket() client1.setblocking(False) # 百度创建连接: 非阻塞 try: client1.connect(('www.baidu.com',80)) except BlockingIOError as e: pass client2 = socket.socket() client2.setblocking(False) # 百度创建连接: 非阻塞 try: client2.connect(('www.sogou.com',80)) except BlockingIOError as e: pass client3 = socket.socket() client3.setblocking(False) # 百度创建连接: 非阻塞 try: client3.connect(('www.oldboyedu.com',80)) except BlockingIOError as e: pass socket_list = [client1,client2,client3] conn_list = [client1,client2,client3] while True: rlist,wlist,elist = select.select(socket_list,conn_list,[],0.005) # wlist中表示已经连接成功的socket对象 for sk in wlist: if sk == client1: sk.sendall(b'GET /s?wd=alex HTTP/1.0 host:www.baidu.com ') elif sk==client2: sk.sendall(b'GET /web?query=fdf HTTP/1.0 host:www.sogou.com ') else: sk.sendall(b'GET /s?wd=alex HTTP/1.0 host:www.oldboyedu.com ') conn_list.remove(sk) for sk in rlist: chunk_list = [] while True: try: chunk = sk.recv(8096) if not chunk: break chunk_list.append(chunk) except BlockingIOError as e: break body = b''.join(chunk_list) # print(body.decode('utf-8')) print('------------>',body) sk.close() socket_list.remove(sk) if not socket_list: break
什么是异步非阻塞?
--非阻塞:不等待
比如创建socket对某个地址进行connect,获取接收数据recv时默认都会等待(连接成功或接收到数据),才执行后续操作.如果设置setblocking(False),以上两个过程就不在等待,到时会报BlockingIOError的错误,只要捕获即可.
--异步,通知,执行完成后自动执行回调函数或自动执行某些操作(通知)
比如做爬虫中向某个地址baidu.com发送请求,当请求执行完成之后自动执行回调函数.
什么是同步阻塞?
--阻塞:等
--同步:按照顺序逐步执行
key_list = ['alex','db','sb'] for item in key_list: ret = requests.get('https://www.baidu.com/s?wd=%s' %item) print(ret.text)