1. course
1.进程创建的两种方式
-
开启进程的第一种方式:
from multiprocessing import Process import random import time def task(name): print(f'{name} is running') time.sleep(random.randint(1, 3)) print(f'{name} is gone') if __name__ == '__main__': # 在windows环境下, 开启进程必须在 __name__ == '__main__' 下面 p = Process(target=task, args=('常鑫')) # 创建一个进程对象 p.start() ''' 只是想操作系统发出一个开辟子进程的信号,然后就执行下一行 这个信号操作系统收到后,会从内存中开辟一个子进程空间,然后将主进程所有数据copy加载到子进程,然后再调用cpu去执行开辟子进程的开销很大 ''' print('开始') time.sleep(2) # 所以永远先执行主进程的代码
-
开启进程的第二种方式:
from multiprocessing import Process import random import time class MyProcess(Process): def __init__(self, name): super().__init__() self.name = name def run1(self): print(f'{self.name} is running') time.sleep(random.randint(1, 3)) print(f'{self.name} is gone') if __name__ == '__main__': p = MyProcess('常鑫') p.start() print('==主')
-
简单应用
# 简单应用/ from multiprocessing import Process import time def task(name): print(f'{name} is running') time.sleep(1) print(f'{name} is gone') def task1(name): print(f'{name} is running') time.sleep(2) print(f'{name} is gone') def task2(name): print(f'{name} is running') time.sleep(3) print(f'{name} is gone') if __name__ == '__main__': p1 = Process(target=task, args=('常鑫一号英雄',)) p2 = Process(target=task, args=('常鑫二号英雄',)) start_time = time.time() task(1) task1(2) task2(3) print(f'结束时间{time.time() - start_time}') # 三个进程并发或者并行的执行三个任务 # 创建进程是并行,不创建是串行
2.获取进程pid
import os
print(f'子进程:{os.getpid()}')
print(f'主进程:{os.getppid()}')
cmd命令查看pid
tasklist 查看所有进程的pid
tasklist|findstr pycharm 查看pycharm的pid
from multiprocessing import Process
import os
print(f'子进程:{os.getpid()}')
print(f'主进程:{os.getppid()}')
def task(name):
print(f'子进程:{os.getpid()}')
print(f'主进程:{os.getppid()}')
if __name__ == '__main__':
p = Process(target=task, args=('常鑫',))
p.start()
print('==主开始')
print(f'==主{os.getpid()}')
print(f'===主{os.getppid()}')
3.验证进程之间的空间隔离
初始子进程的时候copy主进程,之后主进程和子进程没有任何联系,不共同享用任何内容
from multiprocessing import Process
import time
name = '常鑫'
def task():
global name
name = '郭记'
print(f'子进程的名字是: {name}')
if __name__ == '__main__':
p = Process(target=task)
p.start()
time.sleep(1)
print(f'主进程的名字是: {name}')
----------------------------分割线-----------------------------
lst = ['郭苏慧', ]
def task1():
lst.append('郭记')
print(f'子进程的名字是: {lst}')
if __name__ == '__main__':
p = Process(target=task1)
p.start()
time.sleep(2)
print(f'主进程的名字是: {lst}')
4. join
join 让主进程等待子进程结束之后再执行主进程
join 只针对主进程,如果join下面多次join 他是不阻塞的
join 就是阻塞,主进程有join,主进程下面的代码一律不执行,直到进程执行完毕之后,在执行.
# 正确 重点
from multiprocessing import Process
import time
def task(name):
print(f'{name} is running')
time.sleep(2)
print(f'{name} is gone')
if __name__ == '__main__':
start_time = time.time()
l1 = []
for i in range(1, 4):
p = Process(target=task, args=(i,))
l1.append(p)
p.start()
for i in l1:
i.join()
print(f'==主{time.time() - start_time}')
错误示范:
for i in range(1,4):
p = Process(target=task,args=(i,))
p.start()
p.join()
'''
p1 = Process(target=task,args=(1,))
p1.start()
p1.join()
p2 = Process(target=task,args=(2,))
p2.start()
p2.join()
p3 = Process(target=task,args=(3,))
p3.start()
p3.join()
'''
# join让主进程等待子进程结束之后再执行主进程
from multiprocessing import Process
import time
def task(name):
print(f'{name} is running')
time.sleep(2)
print(f'{name} is gone')
if __name__ == '__main__':
p = Process(target=task, args=('常鑫',))
p.start()
p.join()
print('==主进程开始')
# 多个进程使用join
def task(name, sec):
print(f'{name} is running')
time.sleep(sec)
print(f'{name} is gone')
if __name__ == '__main__':
star_time = time.time()
start_time = time.time()
p1 = Process(target=task, args=('常鑫', 1))
p2 = Process(target=task, args=('李业', 2))
p3 = Process(target=task, args=('海狗', 3))
p1.start()
p2.start()
p3.start()
# join 只针对主进程,如果join下面多次join 他是不阻塞的.
p1.join()
p2.join()
p3.join()
print(f'==主{time.time()-start_time}')
# ----------------------------------------------------------
def task(name, sec):
print(f'{name}is running')
time.sleep(sec)
print(f'{name} is gone')
if __name__ == '__main__':
start_time = time.time()
p1 = Process(target=task, args=('常鑫', 3))
p2 = Process(target=task, args=('李业', 2))
p3 = Process(target=task, args=('海狗', 1))
p1.start()
p2.start()
p3.start()
# join就是阻塞
p1.join() # 等2s
print(f'==主1:{time.time() - start_time}')
p2.join()
print(f'===主2:{time.time() - start_time}')
p3.join()
print(f'==主3:{time.time() - start_time}')
5.进程的其他参数
p.terminate() # 杀死子进程 ***
print(p.is_alive()) # *** 判断子进程 False True
p.join() # ***
from multiprocessing import Process
import time
def task(name):
print(f'{name} is running')
time.sleep(2)
print(f'{name} is gone')
if __name__ == '__main__':
p = Process(target=task, args=('常鑫', ), name='Alex')
p.start()
time.sleep(1)
p.terminate() # 杀死子进程 ***
p.join() # ***
time.sleep(1)
print(p.is_alive()) # *** 判断子进程 False
print(p.name)
p.name = 'sb'
print(p.name)
print('主程序开始')
6.守护进程
p.daemon = True
将p子进程设置成守护进程,只要主进程结束,守护进程马上结束 一定要在子进程开启之前设置
from multiprocessing import Process
import time
def task(name):
print(f'{name} is running')
time.sleep(2)
print(f'{name} is gone')
if __name__ == '__main__':
p = Process(target=task, args=('常鑫',)) # 创建一个进程对象
p.daemon = True # 将p子进程设置成守护进程,只要主进程结束,守护进程马上结束.
p.start()
# p.daemon = True # 一定要在子进程开启之前设置
time.sleep(1)
print('===主')
7.僵尸进程孤儿进程
基于unix
环境(linux, macOS
)
-
主进程需要等待子进程结束之后,主进程才结束
主进程时刻监测子进程的运行状态,当子进程结束之后,一段时间之内,将子进程进行回收.
-
为什么主进程不在子进程结束后马上对其回收呢?
- 主进程与子进程是异步关系.主进程无法马上捕获子进程什么时候结束.
- 如果子进程结束之后马上再内存中释放资源,主进程就没有办法监测子进程的状态了.
-
unix
针对于上面的问题,提供了一个机制.所有的子进程结束之后,立马会释放掉文件的操作链接,内存的大部分数据,但是会保留一些内容: 进程号,结束时间,运行状态,等待主进程监测,回收.
-
僵尸进程:所有的子进程结束之后,在被主进程回收之前,都会进入僵尸进程状态.
-
僵尸进程有无危害???
如果父进程不对僵尸进程进行回收(
wait/waitpid
),产生大量的僵尸进程,这样就会占用内存,占用进程pid
号. -
孤儿进程:
父进程由于某种原因结束了,但是你的子进程还在运行中,这样你的这些子进程就成了孤儿进程.你的父进程如果结束了,你的所有的孤儿进程就会被
init
进程的回收,init
就变成了你的父进程,对你进行回收. -
僵尸进程如何解决???
父进程产生了大量子进程,但是不回收,这样就会形成大量的僵尸进程,解决方式就是直接杀死父进程,将所有的僵尸进程变成孤儿进程进程,由
init
进行回收.
8.互斥锁
互斥锁:
指散布在不同进程之间的若干程序片断,当某个进程运行其中一个程序片段时,其它进程就不能运行它们之中的任一程序片段,只能等到该进程运行完这个程序片段后才可以运行的一种类似于"锁"的机制
版本一:
现在是所有的进程都并发的抢占打印机.
并发是以效率优先的,但是目前我们的需求: 顺序优先.
多个进程共强一个资源时, 要保证顺序优先: 串行,一个一个来.
版本二:
我们利用join 解决串行的问题,保证了顺序优先,但是这个谁先谁后是固定的.
这样不合理. 你在争抢同一个资源的时候,应该是先到先得,保证公平.
lock与join的区别.
共同点: 都可以把并发变成串行, 保证了顺序.
不同点: join人为设定顺序,lock让其争抢顺序,保证了公平性.
版本三:(for i in 循环)
from multiprocessing import Process
from multiprocessing import Lock
import time
import random
import sys
def task1(p, lock):
lock.acquire()
print(f'{p}开始打印了')
time.sleep(random.randint(1, 3))
print(f'{p}开始打印了')
lock.release()
def task2(p, lock):
lock.acquire()
print(f'{p}开始打印了')
time.sleep(random.randint(1, 3))
print(f'{p}开始打印了')
lock.release()
def task3(p, lock):
lock.acquire()
print(f'{p}开始打印了')
time.sleep(random.randint(1, 3))
print(f'{p}开始打印了')
lock.release()
if __name__ == '__main__':
mutex = Lock()
for i in range(1, 4):
p = Process(target=getattr(sys.modules[__name__], f'task{i}'), args=(i, mutex))
p.start()
9.进程之间的通信
进程在内存级别是隔离的,但是文件在磁盘上
1.基于文件通信
抢票系统.
1. 先可以查票.查询余票数. 并发
2. 进行购买,向服务端发送请求,服务端接收请求,在后端将票数-1,返回到前端. [串行].
当很多进程共强一个资源(数据)时, 你要保证顺序(数据的安全),一定要串行.
互斥锁: 可以公平性的保证顺序以及数据的安全.
基于文件的进程之间的通信:
1.效率低.
2.自己加锁麻烦而且很容易出现死锁.
from multiprocessing import Process
from multiprocessing import Lock
import random
import time
import json
import os
def search():
time.sleep(random.randint(1, 3)) # 模拟网络延迟(查询环节)
with open('db.json', encoding='utf-8') as f1:
dic = json.load(f1)
print(f'{os.getpid()}查看了票的剩余量,还剩余{dic["count"]}')
def paid():
with open('db.json', encoding='utf-8') as f1:
dic = json.load(f1)
if dic['count'] > 0:
dic['count'] -= 1
time.sleep(random.randint(1, 3))
with open('db.json', encoding='utf-8', mode='w') as f1:
json.dump(dic, f1)
print(f'{os.getpid()}购票成功, 还剩余{dic["count"]}票')
else:
time.sleep(1)
print(f'{os.getpid()}购票未成功')
def task(lock):
search()
lock.acquire()
paid()
lock.release()
if __name__ == '__main__':
mutex = Lock()
for i in range(5):
p = Process(target=task, args=(mutex,))
p.start()
2.基于队列通信
队列: 把队列理解成一个容器,这个容器可以承载一些数据,
队列的特性: 先进先出永远保持这个数据. FIFO 羽毛球筒.
q.put(5555) 当队列满了时,在进程put数据就会阻塞.
print(q.get()) 当数据取完时,在进程get数据也会出现阻塞,直到某一个进程put数据.
print(q.get(timeout=3)) 阻塞3秒,3秒之后还阻塞直接报错.
print(q.get(block=False)) 只要遇到阻塞就会报错.
利用队列Queue改进选票系统作业:
- 票数放入队列中存储
- 开启多个进程进行选票,查票为并发效果,买票为串行效果
- 购买成功、失败都需要提示
from multiprocessing import Process
from multiprocessing import Queue
import random
import time
import os
def search(q):
get = q.get() # 取出
print(f'{os.getpid()}查看了票的剩余量,还剩余{get["count"]}')
q.put(get) # 输入
def paid(q):
time.sleep(random.randint(1, 3))
q_dic = q.get() # 取出
q.put(q_dic) # 输入
if q_dic["count"] > 0:
q_dic["count"] -= 1
print(f"{os.getpid()}购买成功!{q_dic['count']} ")
try:
q.put(q_dic, block=False)
except Exception:
pass
else:
print(f"{os.getpid()}购买失败")
def task(q):
search(q)
paid(q)
if __name__ == '__main__':
q = Queue(1)
q.put({"count": 3})
for i in range(5):
p = Process(target=task, args=(q,))
p.start()
模拟双十一排队抢小米手机,多用户抢购,只能选取前10个用户:
开启多个用户抢购买手机。
只能限定10人购买。
最终将10个用户的排名展示出来。
import os
from multiprocessing import Queue
from multiprocessing import Process
def task(q):
try:
q.put(f'{os.getpid()}', block=False)
except Exception:
return
if __name__ == '__main__':
q = Queue(10)
for i in range(100):
p = Process(target=task, args=(q,))
p.start()
for i in range(1, 10):
print(f'排名第{i}的用户是{q.get()}')
10.生产者消费者模型
编程思想,模型,设计模式,理论等等,都是交给你一种编程的方法,以后你遇到类似的情况,套用即可.
生产者消费者模型三要素:
生产者: 产生数据的
消费者: 接收数据做进一步处理的
容器: 盆(队列)
那么队列容器起到什么作用? 起到缓冲的作用,平衡生产力与消费力,解耦.
from multiprocessing import Process
from multiprocessing import Queue
import random
import time
def producer(q, name):
for i in range(1, 6):
time.sleep(random.randint(1, 2))
res = f'{i}号包子'
q.put(res)
print(f'生产者{name}生产了{res}')
def consumer(q, name):
while 1:
try:
food = q.get(timeout=3)
time.sleep(random.randint(1, 3))
print(f'消费者{name}吃了吃了吃了{food}')
except Exception:
pass
if __name__ == '__main__':
q = Queue()
p1 = Process(target=producer, args=(q, '常鑫'))
p2 = Process(target=consumer, args=(q, '常鑫鑫'))
p1.start()
p2.start()
2. thread
1.线程的理论知识
1. 什么是线程:进程是资源单位, 线程是执行单位.
进程:进程会在内存中开辟一个进程空间,将主进程的资料数据全部复制一份,线程会执行里面的代码.
2. 线程vs进程:
1. 开启进程的开销非常大,比开启线程的开销大很多.
2. 开启线程的速度非常快,要快几十倍到上百倍.
3. 线程与线程之间可以共享数据,进程与进程之间需借助队列等方法实现通信.
3. 线程的应用: 数据共享, 开销小,速度快,
并发: 一个cpu 看起来像是同时执行多个任务,单个进程开启三个线程,并发的执行任务
主线程子线程没有地位之分,但是,一个进程谁在干活? 一个主线程在干活,当干完活了,你得等待其他线程干完活之后,才能结束本进程
-
什么是线程
一条流水线的工作流程.
进程: 在内存中开启一个进程空间,然后将主进程的所有的资源数据复制一份,然后调用cpu去执行这些代码.
之前的描述不够具体:
开启一个进程:
在内存中开启一个进程空间,然后将主进程的所有的资源数据复制一份,然后调用线程去执行代码
进程是资源单位, 线程是执行单位.
以后你描述开启一个进程:
开启一个进程:进程会在内存中开辟一个进程空间,将主进程的资料数据全部复制一份,线程会执行里面的代码.
-
线程vs进程
- 开启进程的开销非常大,比开启线程的开销大很多.
- 开启线程的速度非常快.要快几十倍到上百倍.
- 线程线程之间可以共享数据,进程与进程之间需借助队列等方法实现通信.
-
线程的应用
-
并发: 一个cpu 看起来像是同时执行多个任务.
单个进程开启三个线程.并发的执行任务.
开启三个进程并发的执行任务.
文本编辑器:
- 输入文字.
- 在屏幕上显示.
- 保存在磁盘中.
开启多线程就非常好了:
数据共享, 开销小,速度快.
主线程子线程没有地位之分,但是,一个进程谁在干活? 一个主线程在干活,当干完活了,你得等待其他线程干完活之后,才能结束本进程
-
2.开启线程的两种方式
第一种方式:
from threading import Thread
import time
def task(name):
print(f'{name} is running')
time.sleep(1)
print(f'{name} in gone')
if __name__ == '__main__':
p1 = Thread(target=task, args=('常鑫',))
p1.start()
print('===主线程')
第二种方式:
from threading import Thread
import time
class MyThread(Thread):
def __init__(self, name, l1, s1):
super().__init__()
self.name = name
self.l1 = l1
self.s1 = s1
def run(self):
print(f'{self.name} is running')
print(f'{self.l1} is running')
print(f'{self.s1} is running')
time.sleep(1)
print(f'{self.name} is gone')
print(f'{self.l1} is gone')
print(f'{self.s1} is gone')
if __name__ == '__main__':
p1 = MyThread('常鑫', [1, 2, 3], '100')
p1.start()
print('===主线程')
3.线程vs进程的代码对比
-
开启速度对比,线程比进程
from multiprocessing import Process def work(): print('hello') if __name__ == '__main__': # 在主进程下开启线程 t = Process(target=work) t.start() print('主线程/主进程')
from threading import Thread import time def task(name): print(f'{name} is running') time.sleep(1) print(f'{name} is gone') if __name__ == '__main__': t1 = Thread(target=task, args=('海狗',)) t1.start() print('===主线程') # 线程是没有主次之分的.
-
对比
pid
同一个pid
from threading import Thread import os def task(): print(os.getpid()) if __name__ == '__main__': t1 = Thread(target=task) t2 = Thread(target=task) t1.start() t2.start() print(f'===主线程{os.getpid()}')
-
同一个进程内线程共享内部数据
同一进程内的资源数据对于这个进程的多个线程来说是共享的. from threading import Thread x = 3 def task(): global x x = 100 if __name__ == '__main__': t1 = Thread(target=task) t1.start() t1.join() print(f'===主线程{x}')
4.线程的相关其他方法(了解)
# Thread实例对象的方法
p1.setName('子线程1') # 设置线程名
p1.getName() # 返回线程名
---print(p1.name) # 获取线程名 ***
print(p1.isAlive()) # 返回线程是否活动的。
# threading模块提供的一些方法:
print(current_thread()) # 获取当前线程的对象
print(currentThread()) # 获取当前线程的对象
print(enumerate()) # 返回一个列表,包含所有的线程对象
---print(activeCount()) # *** 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
from threading import Thread
from threading import currentThread
from threading import enumerate
from threading import activeCount
import os
import time
x = 9
def task():
print(currentThread())
time.sleep(1)
print('666')
if __name__ == '__main__':
p1 = Thread(target=task, name='p1') # name 设置线程名
p2 = Thread(target=task, name='p2') # name 设置线程名
p1.start()
p2.start()
# Thread实例对象的方法
p1.setName('子线程1') # 设置线程名
p2.setName('子线程1') # 设置线程名
p1.getName() # 返回线程名
p2.getName() # 返回线程名
print(p1.name) # 获取线程名 ***
print(p2.name) # 获取线程名 ***
print(p1.isAlive()) # 返回线程是否活动的。
print(p2.isAlive()) # 返回线程是否活动的。
# threading模块提供的一些方法:
print(currentThread()) # 获取当前线程的对象
print(enumerate()) # 返回一个列表,包含所有的线程对象
print(activeCount()) # *** 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
print(f'主线程{os.getpid()}')
5.守护线程(考点)
join: 阻塞 告知主线程要等待我子线程执行完毕之后再执行主线程
主线程什么时候结束???
守护线程 等待非守护子线程以及主线程结束之后,结束.
from threading import Thread
import time
def foo():
print(123) # 1
time.sleep(1)
print("end123") # 4
def bar():
print(456) # 2
time.sleep(2)
print("end456") # 5
t1 = Thread(target=foo)
t2 = Thread(target=bar)
t1.daemon = True
t1.start()
t2.start()
print("main-------") # 3
# 结果:
# 123
# 456
# main-------
# end123
# end456
6.互斥锁(考点)
正常情况加锁之后编程串行
锁之后加上延迟就不一定,有的可能就会出现插队现象
from threading import Thread
from threading import Lock
import time
import random
x = 10
def task(lock):
lock.acquire()
time.sleep(random.randint(1, 3)) # 卡点
global x
temp = x
time.sleep(0.1)
temp = temp - 1
x = temp
lock.release()
if __name__ == '__main__':
mutex = Lock()
l1 = []
for i in range(10):
t = Thread(target=task, args=(mutex,))
l1.append(t)
t.start()
time.sleep(1)
print(f'主线程{x}')
7.死锁现象与递归锁
- 死锁现象就是: A进程那着A钥匙去找B钥匙,B进程拿着B钥匙去找A钥匙
- 递归锁: 可以解决死锁现象,业务需要多个锁时,优先考虑递归锁
- 锁必须写成
lock_A = lock_B = RLock()
格式,原理是pid
都一样,每锁一次,锁的数量加一,解开的时候减一.锁的数量如果不为零其他线程不能抢锁from threading import RLock
导入模块
死锁现象:
from threading import Thread
from threading import Lock
import time
lock_A = Lock()
lock_B = Lock()
class MyThread(Thread):
def run(self):
self.f1()
self.f2()
def f1(self):
lock_A.acquire()
print(f'{self.name}拿到的A')
lock_B.acquire()
print(f'{self.name}拿到的B')
lock_B.release()
lock_A.release()
def f2(self):
lock_B.acquire()
print(f'{self.name}拿到的B')
time.sleep(0.1)
lock_A.acquire()
print(f'{self.name}拿到的A')
lock_A.release()
lock_B.release()
if __name__ == '__main__':
for i in range(3):
t = MyThread()
t.start()
递归锁:
from threading import Thread
from threading import RLock
import time
lock_A = lock_B = RLock()
class MyThread(Thread):
def run(self):
self.f1()
self.f2()
def f1(self):
lock_A.acquire()
print(f'{self.name}拿到的A')
lock_B.acquire()
print(f'{self.name}拿到的B')
lock_B.release()
lock_A.release()
def f2(self):
lock_B.acquire()
print(f'{self.name}拿到的B')
lock_A.acquire()
print(f'{self.name}拿到的A')
time.sleep(1)
lock_A.release()
lock_B.release()
if __name__ == '__main__':
for i in range(3):
t = MyThread()
t.start()
8.信号量
也是一种锁, 控制并发数量
from threading import current_thread
获取当前线程的对象模块
from threading import Semaphore
导入信号量模块
sem = Semaphore(5)
实例化信号量 不写时无穷大
sem.acquire()
函数里面获取信号量
from threading import Thread
from threading import Semaphore
from threading import current_thread
import random
import time
sem = Semaphore(5)
def task():
sem.acquire()
print(f'{current_thread().name} 房间')
time.sleep(random.randint(1, 3))
sem.release()
if __name__ == '__main__':
for i in range(30):
t = Thread(target=task, )
t.start()
9. GIL
全局解释器锁
-
好多自称大神的说,GIL锁就是python的致命缺陷,Python不能多核,并发不行等等 ..... -
理论上来说:单个进程的多线程可以利用多核.
但是,开发
Cpython
解释器的程序员,给进入解释器的线程加了锁. -
为什么加锁?
- 当时都是单核时代,而且
cpu
价格非常贵. - 如果不加全局解释器锁, 开发
Cpython
解释器的程序员就会在源码内部各种主动加锁,解锁,非常麻烦,各种死锁现象等等.他为了省事儿,直接进入解释器时给线程加一个锁. - 优点: 保证了
Cpython
解释器的数据资源的安全. - 缺点: 单个进程的多线程不能利用多核.
- 当时都是单核时代,而且
-
Jpython
没有GIL锁.pypy
也没有GIL锁. -
现在多核时代, 我将
Cpython
的GIL锁去掉行么?因为
Cpython
解释器所有的业务逻辑都是围绕着单个线程实现的,去掉这个GIL锁,几乎不可能. -
单个进程的多线程可以并发,但是不能利用多核,不能并行. 多个进程可以并发,并行.
io
密集型: 单个进程的多线程合适,并发执行
计算密集型:多进程的并行
10.GIL
锁与lock
锁的区别
- 相同点: 都是同种锁,互斥锁.
- 不同点:
- GIL锁全局解释器锁,保护解释器内部的资源数据的安全.
- GIL锁 上锁,释放无需手动操作.
- 自己代码中定义的互斥锁保护进程中的资源数据的安全.
- 自己定义的互斥锁必须自己手动上锁,释放锁.
11.验证计算密集型IO密集型的效率
-
io
密集型: 单个进程的多线程的并发效率高合适.并发执行 -
计算密集型:多进程的并发并行效率高.并行
-
代码验证:
计算密集型: 单个进程的多线程并发 vs 多个进程的并发并行 from multiprocessing import Process from threading import Thread import time def task(): count = 0 for i in range(30000000): # (三千万) count += 1 if __name__ == '__main__': # 多进程的并发,并行 2.3737263679504395秒 start_time = time.time() l1 = [] for i in range(4): p = Process(target=task,) l1.append(p) p.start() for i in l1: i.join() print(f'执行时间:{time.time()-start_time}') # 多线程的并发 6.290118932723999秒 start_time = time.time() l1 = [] for i in range(4): p = Thread(target=task,) l1.append(p) p.start() for i in l1: i.join() print(f'执行时间:{time.time()-start_time}') 计算密集型: 多进程的并发并行效率高.
# IO密集型: 单个进程的多线程并发 vs 多个进程的并发并行 from multiprocessing import Process from threading import Thread import time def task(): count = 0 time.sleep(1) count += 1 if __name__ == '__main__': # 多进程的并发,并行 3.0123958587646484秒 start_time = time.time() l1 = [] for i in range(50): p = Process(target=task, ) l1.append(p) p.start() for p in l1: p.join() print(f'执行效率:{time.time() - start_time}') # 多线程的并发 1.0087950229644775秒 start_time = time.time() l1 = [] for i in range(50): p = Thread(target=task,) l1.append(p) p.start() for p in l1: p.join() print(f'执行效率:{time.time()- start_time}') 对于IO密集型: 单个进程的多线程的并发效率高.
12.多线程实现socket
通信
无论是多线程还是多进程,如果按照之前的写法,来一个客户端请求,我就开一个线程,来一个请求开一个线程,应该是这样: 你的计算机允许范围内,开启的线程进程数量越多越好.
服务端:
from threading import Thread
import socket
def communicate(conn, addr):
while 1:
try:
from_client_data = conn.recv(1024)
print(f'来{addr[1]}的信息{from_client_data.decode("utf-8")}')
to_client_data = input('>>>').strip()
conn.send(to_client_data.encode('utf-8'))
except Exception:
break
conn.close()
def _accket():
server = socket.socket()
server.bind(('127.0.0.1', 8080))
server.listen(5)
while 1:
conn, addr = server.accept()
t = Thread(target=communicate, args=(conn, addr))
t.start()
if __name__ == '__main__':
_accket()
客户端:
import socket
client = socket.socket()
client.connect(('127.0.0.1', 8080))
while 1:
try:
to_server_data = input('>>>').strip()
client.send(to_server_data.encode('utf-8'))
from_server_data = client.recv(1024)
print(f'来自服务器的消息: {from_server_data.decode("utf-8")}')
except Exception:
break
client.close()
13.进程池线程池
from concurrent.futures import ProcessPoolExecutor # 线程池模块
from concurrent.futures import ThreadPoolExecutor # 进程池模块
p = ProcessPoolExecutor() # 默认不写,进程池里面的进程数与cpu核个数相等(并行(并行+并发))
t = ThreadPoolExecutor() # 默认不写, cpu核个数*5 线程数 (并发)
print(os.cpu_count()) # 查看电脑几核
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor
import random
import time
import os
print(os.cpu_count()) # 查看电脑几核
def task():
print(f'pid号: {os.getpid()} 来了')
time.sleep(random.randint(1, 3))
if __name__ == '__main__':
# 开启进程池 (并行(并行+并发))
p = ProcessPoolExecutor() # 默认不写,进程池里面的进程数与cpu个数相等
for i in range(20):
p.submit(task, )
# 开启线程池 (并发)
t = ThreadPoolExecutor() # 默认不写, cpu个数*5 线程数
for i in range(40):
t.submit(task, )
14.阻塞 非阻塞 异步 同步
进程运行的三个状态: 运行,就绪,阻塞.
- 执行的角度:
- 阻塞: 程序运行时,遇到了IO,程序挂起,cpu被切走.
- 非阻塞: 程序没有遇到IO,程序遇到IO但是我通过某种手段,让cpu强行运行我的程序.
- 提交任务的角度:
- 同步: 提交一个任务,自任务开始运行直到此任务结束(可能有IO),返回一个返回值之后,我在提交下一个任务.
- 异步: 一次提交多个任务,然后我就直接执行下一行代码.
- 返回结果如何回收?
给三个老师发布任务:
同步:
先告知第一个老师完成写书的任务,我从原地等待,等他两天之后完成了,告诉完事了,我在发布下一个任务......
异步:
直接将三个任务告知三个老师,我就忙我的我,直到三个老师完成之后,告知我.
15.同步调用,异步调用
1.异步调用:
shutdown: 让我的主进程等待进程池中所有的子进程都结束任务之后再执行,有点类似与join.在上一个进程池没有完成所有的任务之前,不允许添加新的任务.
一个任务是通过一个函数实现的,任务完成了他的返回值就是函数的返回值.
2.同步调用:
obj是一个动态对象,返回的当前的对象的状态,有可能运行中,可能(就绪阻塞),还可能是结束了.
obj.result() 必须等到这个任务完成后,返回了结果之后,在执行下一个任务.
3 异步如何取结果???
1.异步调用 统一回收结果: 我不能马上收到任何一个已经完成的任务的返回值,我只能等到所有的任务全部结束统一回收.
2.异步加回调回收结果
1.异步调用:
from concurrent.futures import ProcessPoolExecutor
import random
import time
import os
def task(i):
print(f'{os.getpid()}开始')
time.sleep(random.randint(1, 3))
print(f'{os.getpid()}结束')
return i
if __name__ == '__main__':
'''异步调用'''
pool = ProcessPoolExecutor()
for i in range(10):
obj = pool.submit(task, i)
pool.shutdown(wait=True)
print('==主')
2.同步调用:
from concurrent.futures import ProcessPoolExecutor
import random
import time
import os
def task(i):
print(f'{os.getpid()}开始')
time.sleep(random.randint(1, 3))
print(f'{os.getpid()}结束')
return i
if __name__ == '__main__':
pool = ProcessPoolExecutor()
for i in range(10):
obj = pool.submit(task, i)
# obj是一个动态对象, 返回的当前的对象的状态, 有可能运行中, 可能(就绪阻塞), 还可能是结束了.
# obj.result() 必须等到这个任务完成后,返回了结果之后,在执行下一个任务.
print(f'任务结束:{obj.result()}')
pool.shutdown(wait=True)
print('==主')
3 异步如何取结果???
方式一: 异步调用,统一回收结果.
from concurrent.futures import ProcessPoolExecutor
import random
import time
import os
def task(i):
print(f'{os.getpid()}开始')
time.sleep(random.randint(1, 3))
print(f'{os.getpid()}结束')
return i
if __name__ == '__main__':
pool = ProcessPoolExecutor()
l1 = []
for i in range(10):
obj = pool.submit(task, i)
l1.append(obj)
pool.shutdown(wait=True)
print(l1)
for i in l1:
print(i.result())
print('==主')
统一回收结果: 我不能马上收到任何一个已经完成的任务的返回值,我只能等到所有的任务全部结束统一回收.
16.异步调用+回调函数
import requests 导入函数
线程池设置4个线程, 异步发起10个任务,每个任务是通过网页获取源码, 并发执行,当一个任务完成之后,将parse这个分析代码的任务交由剩余的空闲的线程去执行,你这个线程继续去处理其他任务.
如果进程池+回调: 回调函数由主进程去执行.
如果线程池+回调: 回到函数由空闲的线程去执行.
异步 回调是一回事儿?
异步站在发布任务的角度,
站在接收结果的角度: 回调函数 按顺序接收每个任务的结果,进行下一步处理.
异步 + 回调:
异步处理的IO类型.
回调处理非IO
浏览器工作原理, 向服务端发送一个请求,服务端验证你的请求,如果正确,给你的浏览器返回一个文件
浏览器接收到文件,将文件里面的代码渲染成你看到的漂亮美丽的模样.
什么叫爬虫?
1. 利用代码模拟一个浏览器,进行浏览器的工作流程得到一堆源代码.
2. 对源代码进行数据清洗得到我想要数据.
版本一:
1. 异步发出10个任务,并发的执行,但是统一的接收所有的任务的返回值.(效率低,不能实时的获取结果)
2. 分析结果流程是串行,影响效率.
3. 线程池设置4个线程, 异步发起10个任务,每个任务是通过网页获取源码, 并发执行,最后统一用列表回收10个任务, 串行着分析源码
版本二:
针对版本一的缺点2,改进,让串行变成并发或者并行.
1 在开一个线程进程池,并发并行的处理,开销大.
2 将原来的任务扩大,以空间换效率.
3 线程池设置4个线程, 异步发起10个任务,每个任务是通过网页获取源码+数据分析, 并发执行,最后将所有的结果展示出来.耦合性增强了,并发执行任务,此任务最好是IO阻塞,才能发挥最大的效果.
版本三:
异步调用 + 回调函数
基于异步调用回收所有任务的结果要做到实时回收结果,并发执行任务每个任务只是处理IO阻塞的,不能增加新的功能.
import requests
ret = requests.get('http://www.baidu.com')
if ret.status_code == 200:
print(ret.text)
版本一:
1. 异步发出10个任务,并发的执行,但是统一的接收所有的任务的返回值.(效率低,不能实时的获取结果)
2. 分析结果流程是串行,影响效率.
from concurrent.futures import ThreadPoolExecutor
import requests
def task(url): # 模拟的就是爬取多个源代码 一定有IO操作
ret = requests.get(url)
if ret.status_code == 200:
return ret.text
def parse(content): # 模拟对数据进行分析 一般没有IO
return len(content)
if __name__ == '__main__':
# 开启线程池,并发并行的执行 串行 耗费时间长,不可取
url_list = [
'http://www.baidu.com',
'http://www.JD.com',
'http://www.JD.com',
'http://www.JD.com',
'http://www.taobao.com',
'https://www.cnblogs.com/jin-xin/articles/7459977.html',
'https://www.luffycity.com/',
'https://www.cnblogs.com/jin-xin/articles/9811379.html',
'https://www.cnblogs.com/jin-xin/articles/11245654.html',
'https://www.sina.com.cn/',
]
pool = ThreadPoolExecutor(4)
obj_list = []
for url in url_list:
obj = pool.submit(task, url)
obj_list.append(obj)
pool.shutdown(wait=True)
for res in obj_list:
print(parse(res.result()))
print('===主')
版本二: 针对版本一的缺点2,改进,让串行编程并发或者并行.
from concurrent.futures import ThreadPoolExecutor
import requests
def task(url): # 模拟的就是爬取多个源代码 一定有IO操作
ret = requests.get(url)
if ret.status_code == 200:
return parse(ret.text)
def parse(content): # 模拟对数据进行分析 一般没有IO
return len(content)
if __name__ == '__main__':
# 开启线程池,并发并行的执行
url_list = [
'http://www.baidu.com',
'http://www.JD.com',
'http://www.JD.com',
'http://www.JD.com',
'http://www.taobao.com',
'https://www.cnblogs.com/jin-xin/articles/7459977.html',
'https://www.luffycity.com/',
'https://www.cnblogs.com/jin-xin/articles/9811379.html',
'https://www.cnblogs.com/jin-xin/articles/11245654.html',
'https://www.sina.com.cn/',
]
pool = ThreadPoolExecutor(4)
obj_list = []
for url in url_list:
obj = pool.submit(task, url)
obj_list.append(obj)
版本三:
基于 异步调用回收所有任务的结果我要做到实时回收结果,
并发执行任务每个任务只是处理IO阻塞的,不能增加新得功能.
异步调用 + 回调函数
from concurrent.futures import ThreadPoolExecutor
import requests
def task(url): # 模拟的就是爬取多个源代码 一定有IO操作
ret = requests.get(url)
if ret.status_code == 200:
return ret.text
def parse(obj): # 模拟对数据进行分析 一般没有IO
print(len(obj.result()))
if __name__ == '__main__':
# 开启线程池,并发并行的执行
url_list = [
'http://www.baidu.com',
'http://www.JD.com',
'http://www.JD.com',
'http://www.JD.com',
'http://www.taobao.com',
'https://www.cnblogs.com/jin-xin/articles/7459977.html',
'https://www.luffycity.com/',
'https://www.cnblogs.com/jin-xin/articles/9811379.html',
'https://www.cnblogs.com/jin-xin/articles/11245654.html',
'https://www.sina.com.cn/',
]
pool = ThreadPoolExecutor(4)
for url in url_list:
obj = pool.submit(task, url)
obj.add_done_callback(parse)
17.线程Queue
栈 : 就是先进后出 容器
多线程抢占资源: 只能让其串行.
1.互斥锁 2. 队列
import queue
q = queue.Queue(3) 先进先出
q = queue.LifoQueue(3) 先进后出 后进先出 LiFo 堆栈
q = queue.PriorityQueue(3) 自定义优先级,必须是元组 小的比大的优先级高
import queue
先进先出
q = queue.Queue(3)
q.put('常鑫')
q.put('徐怡')
q.put('郭记')
print(q.get())
print(q.get())
print(q.get())
先进后出 后进先出 LiFo 堆栈
q = queue.LifoQueue(3)
q.put('常鑫')
q.put('徐怡')
q.put('郭记')
print(q.get())
print(q.get())
print(q.get())
自定义优先级
q = queue.PriorityQueue(3)
q.put((0, '常鑫'))
q.put((1, '徐怡'))
q.put((-1, '郭记'))
print(q.get())
print(q.get())
print(q.get())
18.事件Event
开启两个线程,一个线程运行到中间的某个阶段,触发另个线程执行.两个线程增加了耦合性
from threading import Event 模块
event = Event() 默认是False
event.wait() 自动等待 监测是否为True
event.wait(1) 阻塞一秒 设置超过时间 直接下一步
线程进程都一样,定一个全局变量,监测成功之后进入服务器,辅助监测然后修改变量
版本一:(自定)
如果程序中的其他线程需要通过判断某个线程的状态来确定自己下一步的操作
版本二:
一个线程监测服务器是否开始
另个一线程判断如果开始了,则显示连接成功,此线程只尝试连接3次,1s 一次,如果超过3次,还没有连接成功,则显示连接失败
版本一:
如果程序中的其他线程需要通过判断某个线程的状态来确定自己下一步的操作
from threading import Thread
from threading import current_thread
import time
flag = False
def check():
print(f'{current_thread().name} 监测服务器是否开启...')
time.sleep(3)
global flag
flag = True
print(f'{current_thread().name}服务器已经开启...')
def connect():
while 1:
print(f'{current_thread().name} 等待连接...')
time.sleep(0.5)
if flag:
print(f'{current_thread().name} 连接成功...')
break
t1 = Thread(target=check, )
t2 = Thread(target=connect, )
t1.start()
t2.start()
版本二:
一个线程监测服务器是否开始
另个一线程判断如果开始了,则显示连接成功,此线程只尝试连接3次,1s 一次,如果超过3次,还没有连接成功,则显示连接失败
from threading import Thread
from threading import current_thread
from threading import Event
import time
event = Event()
def check():
print(f'{current_thread().name} 监测服务器是否开始...')
time.sleep(3)
event.set()
print('服务器已经开始...')
def connect():
count = 1
while not event.is_set():
if count == 4:
print('连接次数过多,断开连接')
break
print(f'{current_thread().name} 等待连接...')
# event.wait() # 阻塞 直到 event.set() 方法之后
event.wait(1) # 只阻塞1秒,1秒之后如果还没有进行set 直接进行下一步操作.
print(f'{current_thread().name} 尝试连接{count}次连接')
count += 1
else:
print(f'{current_thread().name} 连接成功')
t1 = Thread(target=check, )
t2 = Thread(target=connect, )
t1.start()
t2.start()
18.协程的初识
- 一个线程并发的处理任务.
- 串行: 一个线程执行一个任务,执行完毕之后,执行下一个任务.
- 并行: 多个cpu执行多个任务, 4个cpu 执行4个任务.
- 并发: 一个cpu执行多个任务,看起来像是同时运行.
- 并发真正的核心: 切换并且保持状态.
- 多线程的并发: 3个线程处理10个任务,如果线程1处理的这个任务,遇到阻塞,cpu被操作系统切换到另一个线程,
- 一个线程能否并发的处理任务??? 一个线程处理三个任务.
- 单个cpu: 10个任务,让你给我并发的执行这个10个任务:
- 方式一:开启多进程并发执行, 操作系统切换+保持状态.
- 方式二:开启多线程并发执行,操作系统切换+保持状态.
- 方式三:开启协程并发的执行, 自己的程序 把控着cpu 在3个任务之间来回切换+保持状态
- 对方式三详细解释: 协程他切换速度非常快,蒙蔽操作系统的眼睛,让操作系统认为cpu一直在运行你这一个线程(协程.)
- 协程方式最好
- 为什么?
- 开销小.
- 运行速度快.
- 协程会长期霸占cpu只执行我程序里面的所有任务.
- 并发的本质:就是切换+保持状态.
- 协程处理IO密集型好, 计算密集型串行好.
- 什么是协程? 单个线程并发的处理多个任务. 程序控制协程的切换+保持状态.
- 协程的特点:
- 必须在只有一个单线程里实现并发
- 修改共享数据不需加锁
- 用户程序里自己保存多个控制流的上下文栈(保持状态)
- 附加:一个协程遇到IO操作自动切换到其它协程
- 为什么?
- 工作中:
- 一般在工作中我们都是进程+线程+协程的方式来实现并发,以达到最好的并发效果,如果是4核的cpu,一般起5个进程,每个进程中20个线程(5倍cpu数量),每个线程可以起500个协程,大规模爬取页面的时候,等待网络延迟的时间的时候,我们就可以用协程去实现并发。 并发数量 = 5 * 20 * 500 = 50000个并发,这是一般一个4cpu的机器最大的并发数。nginx在负载均衡的时候最大承载量就是5w个
- 单线程里的这20个任务的代码通常会既有计算操作又有阻塞操作,我们完全可以在执行任务1时遇到阻塞,就利用阻塞的时间去执行任务2。。。。如此,才能提高效率,这就用到了Gevent模块。
19.协程(代码)
-
导入第三方库 gevent greenlet
-
from greenlet import greenlet 导入模块
-
gevent.sleep(1) 模拟的阻塞,不是真正的阻塞
-
g1 = greenlet(函数名) 使用
-
monkey.patch_all() # 打补丁: 将下面的所有的任务的阻塞都打上标记
-
两种写法: g1.join() g2.join()===gevent.joinall([g1, g2]) 第二种写法
-
切换 +保持状态(遇到IO不会主动切换)
from greenlet import greenlet import time def eat(name): print('%s eat 1' % name) # 2 g2.switch('常鑫') # 3 time.sleep(3) print('%s eat 2' % name) # 6 g2.switch() # 7 def play(name): print('%s play 1' % name) # 4 g1.switch() # 5 print('%s play 2' % name) # 8 g1 = greenlet(eat) g2 = greenlet(play) g1.switch('太白') # 1 切换到eat任务
-
协程 模拟的阻塞,不是真正的阻塞
import time import gevent from threading import current_thread def eat(name): print('%s eat 1' % name) print(current_thread().name) gevent.sleep(2) 模拟的阻塞,不是真正的阻塞 # time.sleep(2) print('%s eat 2' % name) def play(name): print('%s play 1' % name) print(current_thread().name) gevent.sleep(1) 模拟的阻塞,不是真正的阻塞 # time.sleep(1) print('%s play 2' % name) g1 = gevent.spawn(eat, 'egon') g2 = gevent.spawn(play, name='egon') print(f'主{current_thread().name}') g1.join() g2.join()
-
高级版 实用版
import time import gevent from gevent import monkey monkey.patch_all() # 打补丁: 将下面的所有的任务的阻塞都打上标记 def eat(name): print('%s eat 1' % name) time.sleep(2) print('%s eat 2' % name) def play(name): print('%s play 1' % name) time.sleep(1) print('%s play 2' % name) g1 = gevent.spawn(eat, 'egon') g2 = gevent.spawn(play, name='egon') g1.join() g2.join() gevent.joinall([g1, g2]) 第二种写法
-