源码执行流程
自己模仿一个(提取代码)
服务器类
import socket import threading import selectors class TCPServer: def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True): self.server_address = server_address # IP端口 self.RequestHandlerClass = RequestHandlerClass # 处理handelr self.__is_shut_down = threading.Event() self.__shutdown_request = False self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.socket.bind(self.server_address) # 绑定IP, 端口 self.socket.listen(5) def serve_forever(self, poll_interval=0.5): self.__is_shut_down.clear() # 清除事件标志 print("serve_forever") try: with selectors.SelectSelector() as selector: # 类似select的监听 selector.register(self, selectors.EVENT_READ) # while not self.__shutdown_request: # 循环去监听 print(111111111) ready = selector.select(poll_interval) if ready: self._handle_request_noblock() except: raise def _handle_request_noblock(self): request, client_address = self.socket.accept() try: self.process_request(request, client_address) except: self.shutdown_request(request) def process_request(self, request, client_address): t = threading.Thread(target=self.process_request_thread, args=(request, client_address)) t.start() def process_request_thread(self, request, client_address): self.finish_request(request, client_address) def finish_request(self, request, client_address): self.RequestHandlerClass(request, client_address, self) def shutdown_request(self, request): request.colse() def fileno(self): return self.socket.fileno()
handle
class Handle: def __init__(self, request, client_address, server): self.request = request # 请求 self.client_address = client_address # self.server = server # 服务器 try: self.handle() # 初始化时就会去执行handle()方法 finally: self.finish() def handle(self): pass def finish(self): pass
ps:
fileno()方法, select实际上是监听的套接字该方法, 因为我们传入的不是套接字, 所以我们实现了该方法, 并且返回套接字的状态
threading.Event()
创建一个事件, 其中存在一个标记, 如果该标记位False时, 执行event.wait()就会阻塞
相关方法:
set() : 将标记设置为True, 并通知所有处于阻塞状态的线程恢复至运行状态
clear() : 将标记设为False
wait(timeout) : 通过标记位来判断是否阻塞
isSet() : 获取内部标志状态
示例:
# coding:utf-8 import threading import time event = threading.Event() def chihuoguo(name): # 等待事件,进入等待阻塞状态 print('%s %s 就位' % (threading.currentThread().getName(),name)) event.wait() # 收到事件后进入运行状态 print('%s %s 到达终点' % (threading.currentThread().getName(), name)) # 设置线程组 threads = [] # 创建新线程 thread1 = threading.Thread(target=chihuoguo, args=("a", )) thread2 = threading.Thread(target=chihuoguo, args=("b", )) # 添加到线程组 threads.append(thread1) threads.append(thread2) print("预备") # 开启线程 for thread in threads: thread.start() time.sleep(1) # 发送事件通知 print('BOMB') event.set()
selectors
selectors模块封装了select, 大概600来行, 也是一个比较简单的模块. 这里没有仔细去看.
不过这篇文章写得很详细 : https://www.cnblogs.com/zzzlw/p/9384308.html