多线程实现TCP服务端并发
服务端---封装接口思想
import threading
import socket
def server_interface():
server = socket.socket()
server.bind(('127.0.0.1', 8888))
server.listen()
def multi_handle():
new_server_link, address = server.accept()
print(address)
while True:
try:
res = new_server_link.recv(1024).decode('utf-8')
print(res)
new_server_link.send(res.upper().encode('utf-8'))
if res == 'q':
return
except Exception as e:
print(e)
return
for i in range(10):
t = threading.Thread(target=multi_handle)
t.start()
if __name__ == '__main__':
server_interface()
GIL全局解释器锁
什么是GIL?---global interpreter lock
- 在Cpython解释器中有一把GIL全局解释器锁,本质上是一把互斥锁
- 可以使同一进程下,同一时刻只能运行一个线程
- 优点:执行I/O密集型任务效率和多进程区别不大,反而更节省资源
- 缺点:执行计算密集型任务无法利用多核优势
- 同一进程下多个线程只能实现并发不能实现并行
- 如果一个线程抢占了GIL,当遇到I/O或者执行时间过长时,会强行释放掉GIL锁,使其他线程可以抢占GIL执行任务,从GIL切换出来的线程会有GIL计数,不会被垃圾回收线程回收
为什么要有GIL?
- 因为Cpython自带的垃圾回收机制不是线程安全的,所以要有GIL锁
- 如果没有GIL,则python解释器的垃圾回收线程和任务线程可以并行,在任务线程I/O时,某些变量值引用计数为0,很可能会被回收, 导致数据不安全
死锁问题及解决
死锁问题
- 两个线程
- 线程1拿到了锁头2,想要往下执行释放锁头2需要锁头1
- 线程2拿到了锁头1,想要往下执行释放锁头1需要锁头2
- 它们互相都拿到彼此想要往下执行释放锁的必需条件,所以造成了死锁问题
解决:递归锁
- 递归锁在同一个线程内可以被多次acquire
- 如何释放: 内部相当于维护了一个计数器,也就是说同一个线程acquire了几次就要release几次
import threading
import time
class MyThread(threading.Thread):
def run(self):
self.task1()
self.task2()
def task1(self):
mutex1.acquire()
print(f'{self.name}拿到了 锁1')
mutex2.acquire()
print(f'{self.name}拿到了 锁2')
mutex1.release()
print(f'{self.name}释放了 锁1')
mutex2.release()
print(f'{self.name}释放了 锁2')
def task2(self):
mutex2.acquire()
print(f'{self.name}拿到了 锁2')
time.sleep(0.01) # 阻塞线程使下一个线程拿到锁头1
mutex1.acquire()
print(f'{self.name}拿到了 锁1')
mutex2.release()
print(f'{self.name}释放了 锁2')
mutex1.release()
print(f'{self.name}释放了 锁1')
mutex1 = threading.Lock()
mutex2 = threading.Lock()
# 解决死锁问题:递归锁
mutex1 = mutex2 = threading.RLock() # 两个变量引用同一个递归锁对象的地址,该对象引用计数为2
for i in range(3):
t = MyThread()
t.start()
'''
# 死锁:
Thread-1拿到了 锁1
Thread-1拿到了 锁2
Thread-1释放了 锁1
Thread-1释放了 锁2
Thread-1拿到了 锁2
Thread-2拿到了 锁1
'''
信号量
import threading
import time
def task():
sm.acquire() # 加信号量,信号量本质上是锁
print(f'线程{threading.current_thread().name}开始...')
print('*' * 50)
time.sleep(3)
print(f'线程{threading.current_thread().name}结束...')
print('*' * 50)
sm.release() # 释放信号量
if __name__ == '__main__':
sm = threading.Semaphore(5) # 实例化得到信号量对象,参数控制能够同时并发的线程数量
for i in range(25):
t = threading.Thread(target=task)
t.start()