zoukankan      html  css  js  c++  java
  • ss源码学习--工作流程

    ss的local端和server端的工作流程相似,因此复用了TCPRelay类和TCPRelayHandler类。
    两端均是使用TCPRelay类监听连接,并使用TCPRelayHandler类处理请求。

    以server端为例:

    # server.py
        ...
        tcp_servers = []
        tcp_servers.append(tcprelay.TCPRelay(a_config, dns_resolver, False))
        ...
    class 
        def run_server():
            ...
            try:
                loop = eventloop.EventLoop()
                dns_resolver.add_to_loop(loop)
                list(map(lambda s: s.add_to_loop(loop), tcp_servers + udp_servers))
    
                daemon.set_user(config.get('user', None))
                loop.run()
            except Exception as e: 
                shell.print_exception(e)
                sys.exit(1)
    
    

    这里创建了一个TCPRelay对象以及EventLoop,并将所有tcp_server加入eventloop中。

    在TCPRelay的构造函数中,打开了一个监听套接字,监听1024端口。

    # tcpreply.py
    
    class TCPRelay:
        def __init__(self, config, dns_resolver, is_local, stat_callback=None):
            self._config = config
            # 该标志用来区分该对象是客户端还是服务端
            self._is_local = is_local
            self._dns_resolver = dns_resolver
            self._closed = False
            self._eventloop = None
    
            # 字典存放的为(fd,handle) 键值对,用来保存每个fd对应的handle函数
            self._fd_to_handlers = {}
    
            self._timeout = config['timeout']
    
            self._timeouts = []  # a list for all the handlers
            # we trim the timeouts once a while
            self._timeout_offset = 0   # last checked position for timeout
            self._handler_to_timeouts = {}  # key: handler value: index in timeouts
    
            if is_local:
                listen_addr = config['local_address']
                listen_port = config['local_port']
            else:
                listen_addr = config['server']
                listen_port = config['server_port']
            self._listen_port = listen_port
    
            addrs = socket.getaddrinfo(listen_addr, listen_port, 0,
                                       socket.SOCK_STREAM, socket.SOL_TCP)
            if len(addrs) == 0:
                raise Exception("can't get addrinfo for %s:%d" %
                                (listen_addr, listen_port))
            af, socktype, proto, canonname, sa = addrs[0]
    
            server_socket = socket.socket(af, socktype, proto)
            server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            server_socket.bind(sa)
    
            server_socket.setblocking(False)
    
            if config['fast_open']:
                try:
                    server_socket.setsockopt(socket.SOL_TCP, 23, 5)
                except socket.error:
                    logging.error('warning: fast open is not available')
                    self._config['fast_open'] = False
    
            server_socket.listen(1024)
            self._server_socket = server_socket
            self._stat_callback = stat_callback
    
    

    然后执行add_to_loop()将自己加入到eventloop中。

    # tcprelay.py
    
    class TCPRelay:
        def add_to_loop(self, loop):
            if self._eventloop:
                raise Exception('already add to loop')
            if self._closed:
                raise Exception('already closed')
            self._eventloop = loop
    
            # 将事件加入到loop,监听IN和ERR
            self._eventloop.add(self._server_socket,
                                eventloop.POLL_IN | eventloop.POLL_ERR, self)
            self._eventloop.add_periodic(self.handle_periodic)
    
    

    这里实质上是调用了eventloop的add

    # eventloop.py
    
    class EventLoop:
        def add(self, f, mode, handler):
            fd = f.fileno()
            self._fdmap[fd] = (f, handler)
            self._impl.register(fd, mode)
    

    注意这里是将TCPRelay这个类自身作为handler放入_fdmap字典中。这个字典的作用主要是存储每个描述符对应的handler,当事件发生时,从中取出对应的handler对事件进行处理。

    接着执行loop.run()开始事件循环。

    # eventloop.py
    class EventLoop:
        def run(self):
            # 事件集合
            events = []
            while not self._stopping:
                asap = False
                try:
                    # 返回所有事件
                    events = self.poll(TIMEOUT_PRECISION)
                except (OSError, IOError) as e:
                    if errno_from_exception(e) in (errno.EPIPE, errno.EINTR):
                        # EPIPE: Happens when the client closes the connection
                        # EINTR: Happens when received a signal
                        # handles them as soon as possible
                        asap = True
                        logging.debug('poll:%s', e)
                    else:
                        logging.error('poll:%s', e)
                        import traceback
                        traceback.print_exc()
                        continue
    
                for sock, fd, event in events:
                    # 对于监听到的事件,获取其对应的handle
                    handler = self._fdmap.get(fd, None)
                    if handler is not None:
                        # handler[0]为socket
                        handler = handler[1]
                        try:
                            # 使用对应的handle处理这个事件
                            handler.handle_event(sock, fd, event)
                        except (OSError, IOError) as e:
                            shell.print_exception(e)
                now = time.time()
                if asap or now - self._last_time >= TIMEOUT_PRECISION:
                    for callback in self._periodic_callbacks:
                        callback()
                    self._last_time = now
    
    

    事件发生时,首先根据fd获取其对应的handler,并调用handler.handle_event()

    # tcprelay.py
    class TCPRelay:
        def handle_event(self, sock, fd, event):
            # handle events and dispatch to handlers
            if sock:
                logging.log(shell.VERBOSE_LEVEL, 'fd %d %s', fd,
                            eventloop.EVENT_NAMES.get(event, event))
            # 如果为监听套接字
            if sock == self._server_socket:
                if event & eventloop.POLL_ERR:
                    # TODO
                    raise Exception('server_socket error')
                try:
                    logging.debug('accept')
                    # 尝试接受客户端的连接
                    conn = self._server_socket.accept()
                    TCPRelayHandler(self, self._fd_to_handlers,
                                    self._eventloop, conn[0], self._config,
                                    self._dns_resolver, self._is_local)
                except (OSError, IOError) as e:
                    error_no = eventloop.errno_from_exception(e)
                    if error_no in (errno.EAGAIN, errno.EINPROGRESS,
                                    errno.EWOULDBLOCK):
                        return
                    else:
                        shell.print_exception(e)
                        if self._config['verbose']:
                            traceback.print_exc()
            # 不是监听套接字,说明是已连接的客户端有可读操作
            else:
                if sock:
                    handler = self._fd_to_handlers.get(fd, None)
                    if handler:
                        handler.handle_event(sock, event)
                else:
                    logging.warn('poll removed fd')
    
    

    如果是监听套接字的事件,则说明有新的客户端连接。先通过_fdmap.get()来获取handler(这里为TCPRelay对象) 接着accept,然后创建了一个TCPRelayHandler对象。看一下构造函数:

    # tcprelay.py
    
    class TCPRelayHandler(object):
        def __init__(self, server, fd_to_handlers, loop, local_sock, config,
                     dns_resolver, is_local):
            self._server = server
            self._fd_to_handlers = fd_to_handlers
            self._loop = loop
            self._local_sock = local_sock
            self._remote_sock = None
            self._config = config
            self._dns_resolver = dns_resolver
    
            # TCP Relay works as either sslocal or ssserver
            # if is_local, this is sslocal
            self._is_local = is_local
            self._stage = STAGE_INIT
            self._encryptor = encrypt.Encryptor(config['password'],
                                                config['method'])
            self._fastopen_connected = False
            self._data_to_write_to_local = []
            self._data_to_write_to_remote = []
            self._upstream_status = WAIT_STATUS_READING
            self._downstream_status = WAIT_STATUS_INIT
            self._client_address = local_sock.getpeername()[:2]
            self._remote_address = None
            if 'forbidden_ip' in config:
                self._forbidden_iplist = config['forbidden_ip']
            else:
                self._forbidden_iplist = None
            if is_local:
                self._chosen_server = self._get_a_server()
    
            # 将自身作为handler加入fd_to_handlers
            fd_to_handlers[local_sock.fileno()] = self
            local_sock.setblocking(False)
            local_sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
    
            # 加入loop的handle仍然是TCPRelay
            loop.add(local_sock, eventloop.POLL_IN | eventloop.POLL_ERR,
                     self._server)
            self.last_activity = 0
            self._update_activity()
    

    这里将与客户端连接的套接字加入了loop,handler为self._server即那个监听到该连接的TCPRelay对象。

    接着上面事件循环,如果是已连接的套接字产生事件,则通过_fdmap.get()获取到那个监听到它的TCPRelay对象,然后在handle_event的过程中,通过_fd_to_handlers.get()获取对应的TCPRelayHandler对象,再使用TCPRelayHandler.handle_event()来处理具体的读写事件。

    ps:描述的好乱,抽时间再整理下吧。

    参考:
    ss2.8 源码

  • 相关阅读:
    [javascript] vuejs为输入框增加回车事件
    iview上的兼容性问题
    python+vscode安装与插件配置
    Chrome浏览器获取XPATH的方法----通过开发者工具获取
    使用谷歌浏览器定位xpath是否准确
    [PHP] xpath提取网页数据内容
    PHP中preg_match正则匹配的/u /i /s是什么意思
    Flink connect 算子实践
    DataStreamUtils 连续keyBy 优化
    Heartbeat原理及部署
  • 原文地址:https://www.cnblogs.com/cknightx/p/7574325.html
Copyright © 2011-2022 走看看