Python6 ---- 线程与进程、网络编程、生成器与迭代器、协程、异步IO
线程与进程、网络编程、生成器与迭代器、协程、异步IO
requests模块的介绍
-
requests的作用
通过python来模拟请求网址 -
一个模拟请求由以下四个部分组成
- url
- method
- body
- headers
-
模拟请求百度
当前python环境下执行以下语句安装第三方库,在Terminal安装requests库在 pip install requests
import requests def request_baidu(): url = "https://www.baidu.com/" # body = "" headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.141 Safari/537.36" } response = requests.get(url=url, headers=headers) print(response.text)
理解多线程和多进程
-
什么是进程?什么是线程?
- 进程: 可以简单地认为是一个程序. 进程是操作系统分配资源的最小单位.
- 线程: 一个进程可以有多个线程, 每个线程可以独立完成一些任务. 线程是操作系统进行运算调度的最小单位.
-
多线程demo
from threading import Thread for i in range(10): # 只是创建了线程对象,target指定目标 t = Thread(target=request_baidu) # 启动线程 t.start()
-
多进程demo
from multiprocessing import Process for i in range(10): # 只是创建了进程对象 p = Process(target=request_baidu) # 启动进程 p.start()
-
多线程
-
等待任务完成后回到主进程
通过调用Thread
对象的join
方法# 保存当前thread对象 thread_array = [] for i in range(10): t = Thread(target=request_baidu, args=(i, )) thread_array.append(t) t.start() # 调用thread对象join接口, 等待任务完成后回到主进程 for t in thread_array: t.join() print("done!")
-
如何拿到返回结果
- 赋值到全局变量当中, 添加到可变对象之中
result = [] def request_baidu(index): ... result.append(response) if __name__ == "__main__": thread_array = [] for i in range(10): t = Thread(target=request_baidu, args=(i, )) thread_array.append(t) t.start() for t in thread_array: t.join() print("done!") print(result)
- 赋值到全局变量当中, 添加到可变对象之中
-
-
多进程
- 等待任务完成后回到主进程
通过调用Process
对象的join
方法 - 如何拿到返回结果
无法通过全局变量存储返回结果,线程与进程返回的结果是不一样的
多进程相当于启动了多个程序, 共同执行了同一份代码, 他们之间的内存地址完全不一样import requests import time from threading import Thread from multiprocessing import Process result = [] print(f"主进程result内存地址: {id(result)}") def request_baidu(index): time.sleep(2) url = "https://www.baidu.com/" # body = "" headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.141 Safari/537.36" } response = requests.get(url=url, headers=headers) print(f"当前请求序号: {index}, 返回结果状态码: {response.status_code}") print(f"子进程result内存地址: {id(result)}") result.append(response) # 如果没有判断入口代码段if __name__ == "__main__", 多进程程序会报错 # 原因是windows和pycharm的进程阻塞带来的问题 if __name__ == "__main__": process_array = [] for i in range(10): p = Process(target=request_baidu, args=(i, )) process_array.append(p) p.start() for p in process_array: p.join() print("done!") print(result)
- 等待任务完成后回到主进程
-
多进程和多线程的异同点
- 相同点
- 都是对
cpu
工作时间段的描述, 只是颗粒度不同.
简单地说就是多进程和多线程都会调用cpu
资源的, 但是进程可以启动多个线程去执行. - 在
linux
内核态不区分进程和线程
- 都是对
- 不同点
- 进程有自己的独立地址空间, 建立数据表来维护代码段, 堆栈段和数据段, 而线程共享进程中的资源, 使用相同的地址空间, 所以线程间的切换快得多.
- 因为线程共享进程的全局变量, 静态变量等对象, 线程间的通信更为方便, 而进程间的通信更加复杂, 需要以
ipc
的方式进行. - 多进程要比多线程要健壮. 进程之间一般不会相互影响, 而多线程有一条线程崩溃, 会导致整个进程跟着发生崩溃或者无法正常退出等.
- 相同点
全局解释器锁(GIL)
- 计算密集型
主要占用cpu
资源 - IO密集型
IO
就是input output
, 需要等待的一些任务- 网络请求会有网络延迟
- 和数据库交互需要等待数据库查询事件
- 读写硬盘
- 多进程在处理计算密集型程序的时候比多线程块
由于全局解释器锁的存在, 一个进程下, 只允许一个线程执行Python
程序的字节码(当前代码文件的二进制表示).
简单地说, 创建的10个线程其实在争夺一个cpu
资源. 但是遇到io
操作会让渡cpu
资源. - 如何绕过GIL?
- 将多线程方法改为多进程
- 将计算密集型任务转移给C扩展
- 分布式计算引擎
spark
,Apache
- 使用
PyPy
解释器, 工业上几乎没人这么用, 因为PyPy
并不成熟.
课后作业
- 利用
Python
实现一个多线程程序 - 将多线程程序改为多进程程序
import requests
import time
from threading import Thread
from multiprocessing import Process
def requests_baidu(index):
time.sleep(1)
url = "https://www.baidu.com"
# body = ""
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.45 Safari/537.36"
}
response = requests.get(url=url, headers=headers)
# print(f"请求序号:{index}, 返回状态码:{response.status_code}")
def process():
start_time = time.time()
thread_array = []
for i in range(10):
p = Thread(target=requests_baidu, args=(i,))
thread_array.append(p)
p.start()
for p in thread_array:
p.join()
end_time = time.time()
print(f"process thread execute time: {end_time - start_time} s")
def thread():
start_time = time.time()
thread_array = []
for i in range(10):
t = Thread(target=requests_baidu, args=(i,))
thread_array.append(t)
t.start()
for t in thread_array:
t.join()
end_time = time.time()
print(f"thread execute time: {end_time - start_time} s")
if __name__ == "__main__":
thread()
process()
进程间通信(IPC)
-
文件
通过读写文件来进行变量, 数据, 信息的传递- 读写冲突
两个进程同时进行写, 或者一个写一个读, 造成了冲突. - 解决读写冲突
- 互斥锁
from multiprocessing import Process, Lock def save_to_file(index, lock): with lock: with open("test.log", "a", encoding="utf-8") as f: f.write(str(index) + "\n") if __name__ == "__main__": process_array = [] lock = Lock() for i in range(10): p = Process(target=save_to_file, args=(i, lock)) process_array.append(p) p.start() for p in process_array: p.join() print("done!")
- 互斥锁
- 读写冲突
-
套接字(socket-插座)
通过一个协议, 连接两个进程. 主要就是网络请求.进程A向百度云上传文件, 进程B向百度云下载文件, 不会有冲突.
-
管道(了解)
用文件的内存缓冲区作为管道, 实现进程间通信- 匿名管道
主进程和子进程进行交互 - 具名管道
和匿名管道原理是一样的, 不是不相关的进程也可以互相访问
- 匿名管道
-
消息队列
就是一个存在内核内存空间中的列表redis就是消息队列+socket
from multiprocessing import Process, Queue def save_to_queue(index, my_queue): my_queue.put(index) if __name__ == "__main__": process_array = [] my_queue = Queue() for i in range(10): p = Process(target=save_to_queue, args=(i, my_queue)) process_array.append(p) p.start() for p in process_array: p.join() while True: print(my_queue.get())
-
共享内存(了解)
进程访问内核态同一块内存from multiprocessing import Queue, Array, Value
-
信号量(了解)
不是用来传递数据的, 是用来传递消息进程B要等到进程A执行到某一步操作后, 才会启动
进程A->发消息->内核->转发信息->进程B
线程间通信
线程间通信强调的是线程之间传递对象引用
- 共享变量
-
线程安全
线程有GIL锁, 但是拿到GIL锁不代表可以一直执行下去
现代计算机多线程也是A执行一会儿, B执行一会儿这样交替执行import requests import time from threading import Thread zero = 0 def foo(): global zero for i in range(10**7): zero += 1 zero -= 1 if __name__ == "__main__": process_array = [] for i in range(2): p = Thread(target=foo) process_array.append(p) p.start() for p in process_array: p.join() print(zero)
-
解决线程安全
将重要指令包装成原子操作(不可分割的).- 加互斥锁
import requests import time from threading import Thread,Lock zero = 0 lock = Lock() def foo(): global zero for i in range(10**6): with lock: zero += 1 zero -= 1 if __name__ == "__main__": process_array = [] for i in range(2): p = Thread(target=foo) process_array.append(p) p.start() for p in process_array: p.join() print(zero)
-
课后作业
- 多进程锁, 多线程锁都要自己实现一遍
- 多进程通过Queue来实现进程通信
- 把上述概念熟记并理解
迭代器和生成器
-
迭代器
概念上: 迭代器可以用来表示一个数据流, 提供了数据的惰性返回功能(只有我们主动去使用next方法调用, 才会返回值)
实现上: 实现了__next__
接口的对象传统声明一个列表, 里面的元素会立即写进内存当中, 占用大量内存.
迭代器可以一次只返回一个元素, 占用内存非常小, 在读取大文件和大的数据集合的时候特别有用
-
通过
iter
方法返回一个迭代器对象# 两者实现的功能是一摸一样的 l = list(range(10**7)) l2 = iter(range(10**7))
-
通过
next
方法主动获取迭代器中的值# 当迭代器中没有值了以后, 会抛出StopIteration的异常, 需要大家自行处理一下 l = iter(range(5)) print(next(l)) print(next(l)) print(next(l)) print(next(l)) print(next(l)) print(next(l))
-
-
生成器
生成器是一种特殊的迭代器, 在迭代器惰性返回数据的基础上, 提供了额外的功能, 实现了程序的暂停.- 声明一个生成器
只要函数体中有yield
关键词, 它就是一个生成器yield翻译为让渡, 我们可以简单理解为暂停并返回右边的值
def my_range_gen(n): for i in range(n): yield i*i print(f"current index: {i}") my_range = my_range_gen(10) print(my_range) print(next(my_range)) print(next(my_range)) print(next(my_range)) print(next(my_range))
- 声明一个生成器
-
生成器和迭代器的区别?
同样提供了惰性返回的功能, 迭代器侧重于提供数据的惰性返回功能, 生成器侧重于指令的惰性返回功能
协程
- 协程的原理
协程的实现原理就是生成器的实现原理, 在生成器的基础上又提供了传递值的功能.-
通过
send
方法向生成器传递值, 以下例子中, b就是通过send方法赋值为2
对生成器进行send
操作一定要调用next
方法预激, 使其停留在第一个yield位置def simple_coro(a): print("初始值 a=", a) b = yield a print("传递值 b=", b) c = yield a + b print("传递值 c=", c) coro = simple_coro(1) print(next(coro)) print(coro.send(2)) print(coro.send(3))
-
用协程实现计算平均数的函数
def coro_avg(): total = 0 length = 0 while True: try: value = yield total/length except ZeroDivisionError: value = yield 0 total += value length += 1 my_avg = coro_avg() print(next(my_avg)) print(my_avg.send(2)) print(my_avg.send(3))
-
yield
和yield from
yield from实现的协程异步程序晦涩难懂, 在python3.4引用asyncio标准库之后被弃用
yield from
用来驱动子程序中的循环并返回最终值def return_triple(): while True: value = yield if value % 3 == 0: return value def triple_recorder(): while True: result = yield from return_triple() triple_array.append(result) triple_array = [] coro = triple_recorder() next(coro) for i in range(100): coro.send(i) print(triple_array)
-
异步I/O
-
asyncio(异步)
Python3.4引入的标准库, 替换yield from实现协程异步IO, 可以更好地实现异步程序
实现原理: 自动维护了一个事件队列, 然后循环访问事件来完成异步的消息维护.import asyncio import time class Response: staus_code = 200 async def sim_request(index): print(f"模拟发送请求 Index: {index}") response = Response() # 模拟网络延迟 # 当前是单线程运行的, 如果调用的是time.sleep(1), 那么这个线程会被阻塞 # 当前线程被阻塞之后, 不会让渡cpu资源, 异步的效率就不会体现 await asyncio.sleep(1) print(f"request index {index}, response status_code: {response.staus_code}") return response.staus_code # 获取消息队列 loop = asyncio.get_event_loop() # 包装任务 task_array = [] for i in range(100): task_array.append(sim_request(i)) # 循环访问事件来完成异步的消息维护 loop.run_until_complete(asyncio.wait(task_array)) # 关闭事件循环 loop.close()
- 当前异步实际上有没有提高效率, 也关乎到你调用的第三方是不是异步的.
这也是当前python异步的一个痛点, 就是丰富的第三方库不是都支持asyncio的.
- 小技巧: 获取异步完成之后的所有返回值
result = loop.run_until_complete(asyncio.gather(*task_array)) print(result)
- 当前异步实际上有没有提高效率, 也关乎到你调用的第三方是不是异步的.
课后作业
- 什么是迭代器?什么是生成器?两者有什么区别?
- 协程的实现原理.
asyncio
实现原理- 用协程实现一个计算平均数的函数
def coro_avg():
total = 0
length = 0
while True:
try:
value = yield total / length
except ZeroDivisionError:
value=yield 0
total += value
length += 1
my_avg = coro_avg()
print(next(my_avg))
print(my_avg.send(2))
-
编写一个
asyncio
异步程序
``` bash
import asyncio
import time# asyncio异步程序 async def do_something(index): print(f"第{index}个任务启动") await asyncio.sleep(1) print(f"第{index}个任务完成") # python 3.10才有问题,使用下面两行代码 # https://blog.csdn.net/u013421629/article/details/100163014 # loop = asyncio.get_event_loop() loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) task_array = [] for i in range(10): task_array.append(do_something(i)) loop.run_until_complete(asyncio.wait(task_array)) loop.close()
-
扩展: 了解
aiohttp
异步请求网址