十三、TCP 服务端实现并发
13.1、服务端要求
1、有固定 IP 和 port
2、24 小时不间断提供服务
3、支持并发
13.2、代码
import socket from threading import Thread server = socket.socket() server.bind(('127.0.0.1', 8080)) server.listen(5) def talk(conn): while True: try: data = conn.recv(1024) if not data: break print(data.decode('utf-8')) conn.send(data.upper()) except ConnectionResetError as e: print(e) break conn.close() while True: conn, addr = server.accept() t = Thread(target=talk, args=(conn,)) t.start()
十四、GIL(全局解释器锁)
14.1、Cpython 解释器
1、GIL 是 Cpython 解释器特有的概念
2、在 Cpython 中,同一进程下开启的多线程,同一时刻只能有一个线程执行,无法利用多核优势
14.2、GIL 存在的原因
在一个 python 进程内,所有数据共享,包括解释器的代码。所有进程都要访问解释器的代码去执行,包括垃圾回收进程。因此只有通过为解释器代码加锁
来避免数据在一开始就被垃圾回收机制执行。
14.3、进程与多线程的选择
14.3.1、进程与多线程的特点
1、进程可以利用多核,但开销大
2、线程开销小,无法利用多核优势
14.3.2、结论
1、多线程应用于 I / O 密集型任务,因为再多的 CPU 面对 I / O 都是阻塞,开多进程反而耗费更多内存。
2、多进程应用于计算密集型任务,多核能有效提升计算速度。
十五、死锁现象与递归锁
15.1、死锁现象
两个或两个以上的进程或线程在执行过程中,因争夺资源而造成互相等待的现象。若无外力作用,它们都无法推进下去。
from threading import Thread, Lock
import time
mutexA = Lock()
mutexB = Lock()
class MyThread(Thread):
def run(self):
self.func1()
self.func2()
def func1(self):
mutexA.acquire()
print(1, self.name)
mutexB.acquire()
print(2, self.name)
mutexB.release()
print(3, self.name)
mutexA.release()
print(4, self.name)
def func2(self):
mutexB.acquire()
print(5, self.name)
time.sleep(1)
mutexA.acquire()
print(6, self.name)
mutexA.release()
print(7, self.name)
mutexB.release()
print(8, self.name)
for i in range(2):
t = MyThread()
t.start()
1 Thread-1
2 Thread-1
3 Thread-1
4 Thread-1
5 Thread-1
1 Thread-2
15.2、递归锁 RLock
可以被第一个抢到该锁的人多次的 acquire 和 release
每 acquire 一次锁的计数加 1
每 release 一次锁的计数减 1
只要锁的计数不为 0,其他线程都不能抢锁
避免了死锁现象
from threading import Thread, RLock
import time
mutexA = mutexB = RLock()
class MyThread(Thread):
def run(self):
self.func1()
self.func2()
def func1(self):
mutexA.acquire()
print(1, self.name)
mutexB.acquire()
print(2, self.name)
mutexB.release()
print(3, self.name)
mutexA.release()
print(4, self.name)
def func2(self):
mutexB.acquire()
print(5, self.name)
time.sleep(1)
mutexA.acquire()
print(6, self.name)
mutexA.release()
print(7, self.name)
mutexB.release()
print(8, self.name)
for i in range(2):
t = MyThread()
t.start()
1 Thread-1
2 Thread-1
3 Thread-1
4 Thread-1
5 Thread-1
6 Thread-1
7 Thread-1
8 Thread-1
1 Thread-2
2 Thread-2
3 Thread-2
4 Thread-2
5 Thread-2
6 Thread-2
7 Thread-2
8 Thread-2
十六、信号量
1、信号量也是锁,如果指定信号量为5,则同一时间有5个任务拿到锁去执行。
from threading import Thread, Semaphore import threading import time def func(): s.acquire() print(threading.current_thread().getName()) time.sleep(3) s.release() if __name__ == '__main__': s = Semaphore(5) for i in range(10): t = Thread(target=func) t.start()
Thread-1 Thread-2 Thread-3 Thread-4 Thread-5 Thread-7 Thread-8 Thread-6 Thread-9 Thread-10
十七、event 事件
17.1、作用
让一个子进程等待另一个子进程的运行
17.2、方法
1、event.isSet() 返回 event 的状态值
2、event.wait() 如果 event.isSet() ==False 将阻塞线程
3、event.set() 设置 event 的状态值为 True,将所有阻塞池的线程激活进入就绪状态,等待操作系统调度
4、event.clear() 恢复 event 的状态值为 False
17.3、代码
from threading import Event, Thread import time e = Event() def light(): print('红灯亮着') time.sleep(3) e.set() print('绿灯亮了') def car(name): print('%s正在等红灯' % name) e.wait() print('%s加油门飙车了' % name) t = Thread(target=light) t.start()
红灯亮着
老司机0正在等红灯
老司机1正在等红灯
绿灯亮了
老司机0加油门飙车了
老司机1加油门飙车了
十八、线程 queue
18.1、作用:
队列是管道 + 锁,使用队列就不用考虑锁的问题
18.2、三种队列
1、q = queue.Queue()
队列,先进先出
2、q = queue.LifoQueue()
堆栈,先进后厨
3、q = queue.PriorityQueue()
优先级队列,存储数据时可设置优先级的队列
put 进入一个元祖,第一个元素是优先级,数字越小优先级越高。第二个元素是数据
import queue q = queue.PriorityQueue()
q.put((1, 'aaa')) q.put((10, 'bbb')) q.put((100, 'ccc')) print(q.get()) print(q.get()) print(q.get())
(1, 'aaa') (10, 'bbb') (100, 'ccc')
十九、进程池与线程池
19.1、作用:服务器开启的进程数或线程数都会随着并发的客户端数目的增多而增多,这会对服务端主机带来巨大的压力。
因此必须对服务端开启的进程数或线程数加以控制,让机器在一个自己可以承受的范围内运行。
19.2、模块介绍
1、concurrent.futures 模块提供了高度封装的异步调用接口