zoukankan      html  css  js  c++  java
  • 即时通信系统Openfire分析之六:路由表 RoutingTable

      还是从会话管理说起

      上一章,Session经过预创建、认证之后,才正常可用。认证时,最重要的操作,就是将Session加入到路由表,使之拥用了通信功能。

      添加到至路由表的操作,是在SessionManager中操作的,如下:

      SessionManager.addSession(LocalClientSession session):

    public void addSession(LocalClientSession session) {
        // Add session to the routing table (routing table will know session is not available yet)
        routingTable.addClientRoute(session.getAddress(), session);
        // Remove the pre-Authenticated session but remember to use the temporary ID as the key
        localSessionManager.getPreAuthenticatedSessions().remove(session.getStreamID().toString());
        SessionEventDispatcher.EventType event = session.getAuthToken().isAnonymous() ?
                SessionEventDispatcher.EventType.anonymous_session_created :
                SessionEventDispatcher.EventType.session_created;
        // Fire session created event.
        SessionEventDispatcher.dispatchEvent(session, event);
        if (ClusterManager.isClusteringStarted()) {
            // Track information about the session and share it with other cluster nodes
            sessionInfoCache.put(session.getAddress().toString(), new ClientSessionInfo(session));
        }
    }

      进入路由表模块, RoutingTableImpl.addClientRoute(session.getAddress(), session)方法:

    public boolean addClientRoute(JID route, LocalClientSession destination) {
        boolean added;
        boolean available = destination.getPresence().isAvailable();
        localRoutingTable.addRoute(route.toString(), destination);
        ......
        return added;
    }

      从这里可以看出,路由表的底层,是借助LocalRoutingTable类来实现。

      路由表的底层数据结构

      LocalRoutingTable类的成员构成,非常的简单:

    Map<String, RoutableChannelHandler> routes = new ConcurrentHashMap<>();

      也就是说,路由表的实质,就是一个Map的数据结构,其Key为JID地址,Velue为RoutableChannelHandler类型报文处理器。

      查看路由表RoutingTableImpl模块中的路由添加方法,可以看到表中存储的是以RoutableChannelHandler衍生出来的几个Session类型,总共提供了三种:

      LocalOutgoingServerSession(用于存储连接本机的远程服务端)、LocalClientSession(用于存储连接到本机的客户端)、RoutableChannelHandler(用于存储组件),类结构如下:

    |-- RoutableChannelHandler
        |-- Session
            |-- LocalSession
                |-- LocalClientSession
                |-- LocalServerSession
                    |-- LocalOutgoingServerSession

      而LocalRoutingTable内的所有方法,就是一系列对这个Map结构的操作函数,核心的如下几个:

      添加路由:

    boolean addRoute(String address, RoutableChannelHandler route) {
        return routes.put(address, route) != route;
    }

      获取路由:

    RoutableChannelHandler getRoute(String address) {
        return routes.get(address);
    }

      获取客户端的Session列表:

    Collection<LocalClientSession> getClientRoutes() {
        List<LocalClientSession> sessions = new ArrayList<>();
        for (RoutableChannelHandler route : routes.values()) {
            if (route instanceof LocalClientSession) {
                sessions.add((LocalClientSession) route);
            }
        }
        return sessions;
    }

      移除路由

    void removeRoute(String address) {
        routes.remove(address);
    }

      还有一个每3分钟一次的定时任务,查询并关闭被闲置了的远程服务器Session,在路由表中启动该任务

    public void start() {
        int period = 3 * 60 * 1000;
        TaskEngine.getInstance().scheduleAtFixedRate(new ServerCleanupTask(), period, period);
    }

      路由表模块 RoutingTable

      路由表是Openfire的核心module之一,RoutingTable接口定义了一系列操作标准,主要围绕路由表进行,提供添加,删除,查询,消息路由等操作,而RoutingTableImpl负责具体实现。

      先来看看RoutingTableImpl的成员列表

    /**
     * 缓存外部远程服务器session
     * Key: server domain, Value: nodeID
     */
    private Cache<String, byte[]> serversCache;
    /**
     * 缓存服务器的组件
     * Key: component domain, Value: list of nodeIDs hosting the component
     */
    private Cache<String, Set<NodeID>> componentsCache;
    /**
     * 缓存已认证的客户端session
     * Key: full JID, Value: {nodeID, available/unavailable}
     */
    private Cache<String, ClientRoute> usersCache;
    /**
     * 缓存已认证匿名的客户端session
     * Key: full JID, Value: {nodeID, available/unavailable}
     */
    private Cache<String, ClientRoute> anonymousUsersCache;
    /**
     * 缓存已认证(包括匿名)的客户端Resource,一个用户,在每一端登录,都会有一个resource
     * Key: bare JID, Value: list of full JIDs of the user
     */
    private Cache<String, Collection<String>> usersSessions;
    
    private String serverName;                              // 服务器的域名
    private XMPPServer server;                              // XMPP服务
    private LocalRoutingTable localRoutingTable;            // 路由表底层
    private RemotePacketRouter remotePacketRouter;          // 远程包路由器
    private IQRouter iqRouter;                              // IQ包路由器
    private MessageRouter messageRouter;                    // Message包路由器
    private PresenceRouter presenceRouter;                  // Presence包路由器
    private PresenceUpdateHandler presenceUpdateHandler;    // 在线状态更新处理器

      成员列表中,除了LocalRoutingTable之外,还定义了一堆的缓存。这些缓存干嘛用?

      Openfire支持集群机制,即在多台服务器上分别运行一个Openfire实例,并使各个实例的数据同步。算法一致,数据一致,用户不管连接到任意一台服务器,效果就都一样。

      集群中的数据同步,除了数据库之外,其他的都是用通过缓存来处理,而上面的这些缓存正是集群同步的一部分,用于同步用户路由信息,每个服务器都会有缓存的副本。

      总的来说,LocalRoutingTable用于存储本机的路由数据,而Cache中是存储了整个集群的路由数据。

      但是,需要注意的一点,LocalRoutingTable与Cache,这两者的数据结构并不相同:

      (1)LocalRoutingTable中记录了本机中所有的Session实例,可以用来通信

      (2)Cache中只存储了用户路由节点信息,需要通过集群管理组件来获取Session实例

      路由表的操作

      路由表的操作,实际上就是在会话管理中,对会话实例的操作。为免与上面混淆,这一节的功能说明,以会话代称。

      添加路由(会话)

      代码如下:

    @Override
    public boolean addClientRoute(JID route, LocalClientSession destination) {
        boolean added;
        boolean available = destination.getPresence().isAvailable();
        
        // 加入到路由表
        localRoutingTable.addRoute(route.toString(), destination);
        
        // 若为匿名客户端,添加到anonymousUsersCache、usersSessions缓存队列中
        if (destination.getAuthToken().isAnonymous()) {
            Lock lockAn = CacheFactory.getLock(route.toString(), anonymousUsersCache);
            try {
                lockAn.lock();
                added = anonymousUsersCache.put(route.toString(), new ClientRoute(server.getNodeID(), available)) ==
                        null;
            }
            finally {
                lockAn.unlock();
            }
            // Add the session to the list of user sessions
            if (route.getResource() != null && (!available || added)) {
                Lock lock = CacheFactory.getLock(route.toBareJID(), usersSessions);
                try {
                    lock.lock();
                    usersSessions.put(route.toBareJID(), Arrays.asList(route.toString()));
                }
                finally {
                    lock.unlock();
                }
            }
        }
        
        // 非匿名客户端,添加到usersCache、usersSessions缓存队列中
        else {
            Lock lockU = CacheFactory.getLock(route.toString(), usersCache);
            try {
                lockU.lock();
                added = usersCache.put(route.toString(), new ClientRoute(server.getNodeID(), available)) == null;
            }
            finally {
                lockU.unlock();
            }
            // Add the session to the list of user sessions
            if (route.getResource() != null && (!available || added)) {
                Lock lock = CacheFactory.getLock(route.toBareJID(), usersSessions);
                try {
                    lock.lock();
                    Collection<String> jids = usersSessions.get(route.toBareJID());
                    if (jids == null) {
                        // Optimization - use different class depending on current setup
                        if (ClusterManager.isClusteringStarted()) {
                            jids = new HashSet<>();
                        }
                        else {
                            jids = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
                        }
                    }
                    jids.add(route.toString());
                    usersSessions.put(route.toBareJID(), jids);
                }
                finally {
                    lock.unlock();
                }
            }
        }
        return added;
    }

      主要两步:

      (1)添加到路由表
      (2)添加到对应的缓存中

      移除路由(会话)

      代码如下:

    @Override
    public boolean removeClientRoute(JID route) {
        boolean anonymous = false;
        String address = route.toString();
        ClientRoute clientRoute = null;
        
        // 从缓存中移除客户端的Session信息
        Lock lockU = CacheFactory.getLock(address, usersCache);
        try {
            lockU.lock();
            clientRoute = usersCache.remove(address);
        }
        finally {
            lockU.unlock();
        }
        if (clientRoute == null) {
            Lock lockA = CacheFactory.getLock(address, anonymousUsersCache);
            try {
                lockA.lock();
                clientRoute = anonymousUsersCache.remove(address);
                anonymous = true;
            }
            finally {
                lockA.unlock();
            }
        }
        if (clientRoute != null && route.getResource() != null) {
            Lock lock = CacheFactory.getLock(route.toBareJID(), usersSessions);
            try {
                lock.lock();
                if (anonymous) {
                    usersSessions.remove(route.toBareJID());
                }
                else {
                    Collection<String> jids = usersSessions.get(route.toBareJID());
                    if (jids != null) {
                        jids.remove(route.toString());
                        if (!jids.isEmpty()) {
                            usersSessions.put(route.toBareJID(), jids);
                        }
                        else {
                            usersSessions.remove(route.toBareJID());
                        }
                    }
                }
            }
            finally {
                lock.unlock();
            }
        }
        
        // 将对应客户端的Session信息,移出路由表
        localRoutingTable.removeRoute(address);
        return clientRoute != null;
    }

      操作与添加类似:
      (1)移除缓存里的路由信息
      (2)移除路由表中的信息

      获取路由(会话)

    @Override
    public ClientSession getClientRoute(JID jid) {
        // Check if this session is hosted by this cluster node
        ClientSession session = (ClientSession) localRoutingTable.getRoute(jid.toString());
        if (session == null) {
            // The session is not in this JVM so assume remote
            RemoteSessionLocator locator = server.getRemoteSessionLocator();
            if (locator != null) {
                // Check if the session is hosted by other cluster node
                ClientRoute route = usersCache.get(jid.toString());
                if (route == null) {
                    route = anonymousUsersCache.get(jid.toString());
                }
                if (route != null) {
                    session = locator.getClientSession(route.getNodeID().toByteArray(), jid);
                }
            }
        }
        return session;
    }

      从上面的方法代码中可以看到,获取路由的方法是:先查找本地路由表,若获取不到对应Session时,则通过集群获取。RemoteSessionLocator是用于适配不同的集群组件所抽象的接口,为不同集群组件提供了透明处理。

      至于如何从集群中获取Session,主要就在于sersCache和anonymousUsersCache这两个cache,它们记录了每个客户端的路由节点信息,通过它可以取得对应的Session实例。详见第八章《集群管理》

      消息路由

      根据发送的形式,分为两种:一是广播、二是单点路由

      1、以广播的形式,向所有在线的客户端发送消息

    @Override
    public void broadcastPacket(Message packet, boolean onlyLocal) {
        // Send the message to client sessions connected to this JVM
        for(ClientSession session : localRoutingTable.getClientRoutes()) {
            session.process(packet);
        }
    
        // Check if we need to broadcast the message to client sessions connected to remote cluter nodes
        if (!onlyLocal && remotePacketRouter != null) {
            remotePacketRouter.broadcastPacket(packet);
        }
    }

      2、单点发送的形式,向某个指定的客户端发送消息

    @Override
    public void routePacket(JID jid, Packet packet, boolean fromServer) throws PacketException {
        
        boolean routed = false;
        try {
            if (serverName.equals(jid.getDomain())) {
                // Packet sent to our domain.
                routed = routeToLocalDomain(jid, packet, fromServer);
                Log.info("routeToLocalDomain");
            }
            else if (jid.getDomain().endsWith(serverName) && hasComponentRoute(jid)) {
                // Packet sent to component hosted in this server
                routed = routeToComponent(jid, packet, routed);
                Log.info("routeToComponent");
            }
            else {
                // Packet sent to remote server
                routed = routeToRemoteDomain(jid, packet, routed);
                Log.info("routeToRemoteDomain");
            }
        } catch (Exception ex) {
            // Catch here to ensure that all packets get handled, despite various processing
            // exceptions, rather than letting any fall through the cracks. For example,
            // an IAE could be thrown when running in a cluster if a remote member becomes 
            // unavailable before the routing caches are updated to remove the defunct node.
            // We have also occasionally seen various flavors of NPE and other oddities, 
            // typically due to unexpected environment or logic breakdowns. 
            Log.error("Primary packet routing failed", ex); 
        }
    
        if (!routed) {
            if (Log.isDebugEnabled()) {
                Log.debug("Failed to route packet to JID: {} packet: {}", jid, packet.toXML());
            }
            if (packet instanceof IQ) {
                iqRouter.routingFailed(jid, packet);
            }
            else if (packet instanceof Message) {
                messageRouter.routingFailed(jid, packet);
            }
            else if (packet instanceof Presence) {
                presenceRouter.routingFailed(jid, packet);
            }
        }
    }

      路由表中的功能,最后由SessionManager集中处理,详见上一章的分析,这里不再赘述。

      特点提一点,比较有用:路由表做为一个module已经在Openfire主服务启动时完成实例化,所以,在自定义的插件、或者其他任何需要发送消息的地方,只需选择调用如下两个方法中之一,即可完成消息发送:

    XMPPServer.getInstance().getRoutingTable().routePacket(jid, packet, fromServer);
    
    XMPPServer.getInstance().getRoutingTable().broadcastPacket(packet, onlyLocal);

      而消息发送中,最后消息如何送到网卡实现发送,在第三章《消息路由》中已经详细分析,同样不再赘述。


      本章就到此结束,OVER!

  • 相关阅读:
    在C#代码中应用Log4Net(二)典型的使用方式
    在C#代码中应用Log4Net(一)简单使用Log4Net
    Windows Azure Active Directory (2) Windows Azure AD基础
    Windows Azure Virtual Network (6) 设置Azure Virtual Machine固定公网IP (Virtual IP Address, VIP) (1)
    Windows Azure Active Directory (1) 前言
    Azure China (6) SAP 应用在华登陆 Windows Azure 公有云
    Microsoft Azure News(3) Azure新的基本实例上线 (Basic Virtual Machine)
    Microsoft Azure News(2) 在Microsoft Azure上运行SAP应用程序
    Microsoft Azure News(1) 新的数据中心Japan East, Japan West and Brazil South
    Windows Azure HandBook (2) Azure China提供的服务
  • 原文地址:https://www.cnblogs.com/Fordestiny/p/7663317.html
Copyright © 2011-2022 走看看