1. 线程队列
1.1 先进先出(FIFO)
import queue
q = queue.Queue(3)
q.put('a')
q.put('b')
q.put('c')
print(q.qsize()) # 队列大小
print(q.get())
print(q.get())
print(q.get())
print(q.get(timeout=2)) # 阻塞2s后报错
print(q.get(block=False)) # 阻塞后报错
1.2 后进先出(LIFO)堆栈
q = queue.LifoQueue(3)
q.put('a')
q.put('b')
q.put('c')
print(q.qsize())
print(q.get())
print(q.get())
print(q.get())
"""
c
b
a
"""
1.3 优先级队列
# 自定义队列,数字越小优先级越高,元组形式
import queue
q = queue.PriorityQueue(3)
q.put((0, 'a'))
q.put((-1, 'b'))
q.put((1, 'c'))
print(q.qsize())
print(q.get())
print(q.get())
print(q.get())
"""
(-1, 'b')
(0, 'a')
(1, 'c')
"""
面试题:用列表实现队列和堆栈
lst = []
def add():
while 1:
msg = input('>>>').strip()
lst.append(msg)
if len(lst) > 3:
return lst
def pop_lst(lst):
for i in range(len(lst)):
print(lst.pop(0)) # 队列
#print(lst.pop()) # 堆栈
lt = add()
pop_lst(lt)
# 第二种加event
from threading import Thread
from threading import Event
event = Event()
lst = []
def add_queue():
while 1:
msg = input('>>>').strip()
lst.append(msg)
if len(lst) > 3:
event.set()
break
def pop_queue(lst):
event.wait()
for i in range(len(lst)):
print(lst.pop()) #
t1 = Thread(target=add_queue)
t2 = Thread(target=pop_queue,args=(lst,))
t1.start()
t2.start()
2. 事件event
开启两个线程,一个线程运行到中间的某个阶段,触发另一个线程执行。两个线程增加了耦合性。
如果程序中的其他线程需要通过判断某个线程的状态来确定自己下一步的操作,就需要用threading中的Event模块。
# 一个线程监测服务器是否开启;另一个线程判断如果i哦开启,则能够连接成功,
# 此线程只能尝试连接三次,每次间隔1秒。
from threading import Thread
from threading import Event
from threading import current_thread
import time
event = Event()
def check():
print(f"{current_thread().name}监测服务器是否开启...")
time.sleep(2)
event.set() # 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
print(f"{current_thread().name}服务器开启")
def connect():
count = 1
while 1:
if count > 3:
print(f"{current_thread().name}超时,连接失败!")
break
event.wait(1) # 阻塞,直到event.set()方法之后,才会执行后面的代码;里面可以设置时间,1s后如果还没有event.set(),不等待,直接执行下一步。
print(f"{current_thread().name}等待连接第{count}次")
if event.is_set(): # 判断是否event.set()
print(f"{current_thread().name}连接成功!")
break
count += 1
t1 = Thread(target=ckeck, name="T1")
t2 = Thread(target=connect, name="T2")
t1.start()
t2.start()
3. 协程
并发的本质:切换 + 保持状态。
协程:是单线程下并发的处理任务,又称微线程、纤程。
协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的。
过程:
开启协程并发执行,自己的程序把控着cpu在多个任务之间来回切换,并且保持状态。
协程切换速度非常快,蒙蔽了操作系统的监测,让操作系统认为cpu一直在运行你这个线程。(协程)
协程处理IO密集型效率高。
优点:
- 开销小;
- 运行速度快;
- 协程会长期霸占cpu,只执行程序里的所有任务,最大限度的利用cpu。
缺点:
- 本质是单线程下,无法利用多核;
- 一旦协程出现阻塞,将会阻塞整个线程,cpu也会被切走。
总结协程的特点:
- 必须在只有一个单线程里实现并发;
- 修改共享数据不需加锁;
- 用户程序里自己保存多个控制流的上下文栈(能够保持状态);
- 一个协程的任务遇到IO操作自动切换到其他任务。
4. Greenlet 模块
from greenlet import greenlet
def eat(name):
print('%s eat 1' %name) #2
g2.switch('taibai') #3
print('%s eat 2' %name) #6
g2.switch() #7
def play(name):
print('%s play 1' %name) #4
g1.switch() #5
print('%s play 2' %name) #8
g1=greenlet(eat)
g2=greenlet(play)
g1.switch('taibai') #可以在第一次switch时传入参数,以后都不需要 1
能够在任务之间直接切换,但是当切到一个任务执行时如果遇到io,那就会原地阻塞,仍然是没有解决遇到IO自动切换来提升效率的问题。
5. Gevent模块
第三方库,可以轻松通过gevent实现并发同步或异步编程,模式是Greenlet。特点就是,遇到IO阻塞就会自动切换任务。
import gevent
import time
from gevent import monkey
monkey.patch_all() # 打补丁: 将下面的所有任务的阻塞都打上标记
def eat(name):
print('%s eat 1' %name)
time.sleep(2) # 阻塞,切换任务
print('%s eat 2' %name)
def play(name):
print('%s play 1' %name)
time.sleep(1) # 阻塞,切换任务
print('%s play 2' %name)
g1 = gevent.spawn(eat, 'meet')
g2 = gevent.spawn(play, 'meet')
gevent.joinall([g1,g2]) # 只有一个线程,因此需join
"""
meet eat 1
meet play 1
meet play 2
meet eat 2
"""
# 如果两个任务同时遇到阻塞,切换一两次后操作系统会将cpu切走,程序挂起,当阻塞完毕后,会抢cpu,执行下面的代码。
一般在工作中我们都是进程+线程+协程的方式来实现并发,以达到最好的并发效果,如果是4核的cpu,一般起5个进程,每个进程中20个线程(5倍cpu数量),每个线程可以起500个协程,大规模爬取页面的时候,等待网络延迟的时间的时候,我们就可以用协程去实现并发。 并发数量 = 5 * 20 * 500 = 50000个并发,这是一般一个4cpu的机器最大的并发数。nginx在负载均衡的时候最大承载量就是5w个.
单线程里的多个任务的代码通常会既有计算操作又有阻塞操作,我们完全可以在执行任务1时遇到阻塞,就利用阻塞的时间去执行任务2。。。。如此,才能提高效率,这就用到了Gevent模块。