zoukankan      html  css  js  c++  java
  • OpenFire源码学习之十一:连接管理(下)

    下面是下部分

    C2S

    1、当有客户端进行连接时根据Mina框架的模式首先调用的是sessionOpened方法。

    sessionOpened首先为此新连接构造了一个parser(XMLLightWeightParser),这个parser  是专门给XMPPDecoder(是XMPPCodecFactory的解码器类)使用的,再创建一个 Openfire的Connection类实例connection和一个StanzaHandler的实例。最后将以上的       parser,connection和StanzaHandler的实例存放在Mina的session中,以便以后使用。


    2、当有数据发送过来时,Mina框架会调用messageReceived方法

    messageReceived首先从Mina的session中得到在sessionOpened方法中创建的   StanzaHandler实例handler,然后从parsers中得到一个parser(如果parsers中没有可以 创建一个新的实例)(注意这个parser和在sessionOpened方法中创建的parser不同,这     个parser是用来处理Stanza的,而在sessionOpened方法中创建的parser是在filter中用  来解码的,一句话说就是在sessionOpened方法中创建的parser是更低一层的parser)。      最后将xml数据包交给StanzaHander的实例hander进行处理。

     

    3、StanzaHander的实例hander处理xml数据包的过程

    StanzaHander首先判断xml数据包的类型,.如果数据包以“”打头那么说明客户端刚刚连       接,需要初始化通信(符合XMPP协议)Openfire首先为此客户端建立一个与客户端       JID相关的ClientSession,而后与客户端交互协商例如是否使用SSL,是否使用压缩等       问题。当协商完成之后进入正常通信阶段,则可以将xml数据包交给PacketRouteImpl   模块进行处理。

    4、PacketRouteImpl中包将进一步被细化处理

           PacketRouteImp1将packet分成Message、Iq和Presence,分别交由MessageRouter、          IqRouter和PresenceRouter进行下一步路由。

    5、MessageRoute处理

           调用routetableImp1进行处理,然后交由通过getRoute方法获取session,最后调用           NIOConnection的deliver方法。

    6、IQRoute 处理

           根据IQ的不同的命名空间通过getHandler方法找到相应的iq处理方法进行处理。

           注意:在IQRouter的initialize方法中,iqHandlers.addAll方法会将iq的命名空间与     其对应的处理方法存储到一个map中。

    7、PresenceRoute处理

        调用PresenceUpdateHandler的process方法(处理数据库的更新和缓存的更新),然后    调用Roster的boradcastPresence方法(检查privacylist(隐身及黑名单用户)然后路由给  所有在线好友),再调用routeTable的getRoutes()获取session,最后调用NIOConnection     的deliver方法。

    客户端连接

    管理控制台:会话->有效会话->客户端会话


    会话列表在session-summary.jsp中,

    SessionResultFilter filter = SessionResultFilter.createDefaultSessionFilter();
    filter.setSortOrder(order);
    filter.setStartIndex(start);
    filter.setNumResults(range);
    Collection<ClientSession> sessions = sessionManager.getSessions(filter);
    

    获取在线用户有如下步骤:

    1.会得到SessionResultFilter的会话过滤器

    2.然后在获取所在在线客户会话(ClientSession)。

      客户端的session存在会话路由表中,通过会话路由得到所有的在线用户。

    3.排序列表。当拿到客户端的在线的连接,重新迭代列表集合,通过 sort的方法排序

    4.然后封装过滤后的列表返回给前端

    注:在SessionManager.getSession(SessionResultFilterfiler)的方法应该还可以优化,以后在考虑

     

    关闭客户端连接

    点击会话列表中的可断开连接。

    通过JID(admin@192.169.1.104/Spark2.6.3)从会话路由中查找会话连接。调用close方法关闭

    JID address = new JID(jid);
            try {
                Session sess = sessionManager.getSession(address);
                sess.close();
                // Log the event
                webManager.logEvent("closed session for address "+address, null);
                // wait one second
                Thread.sleep(1000L);
            }
            catch (Exception ignored) {
                // Session might have disappeared on its own
            }
    

    S2S

    控制台的配置

    Openfire中Serverto Server 连接默认使用5269 端口,在管理控制台:服务器->服务器设置->服务器到服务器

    当你选择正常使用的时候,后台会调用ConnectionManager中的enableServerListener()和setServerListenerPort(port);

    方法流程

    流程图:

    在创建服务监听方法中相应的localIPAddress和端口信息被包装成一个SocketAcceptThread,做为一个独立的线程运行。在SocketAcceptThread 里面有一个BlockingAcceptingMode 成员,专门用来监听Socket连接。

    代码清单:

    public void run() {
            while (notTerminated) {
                try {
                    Socket sock = serverSocket.accept();
                    if (sock != null) {
                        Log.debug("Connect " + sock.toString());
                        SocketReader reader =
                                connManager.createSocketReader(sock, false, serverPort, true);
                        Thread thread = new Thread(reader, reader.getName());
                        thread.setDaemon(true);
                        thread.setPriority(Thread.NORM_PRIORITY);
                        thread.start();
                    }
                }
                catch (IOException ie) {
                   。。。。。。。
                }
                catch (Throwable e) {
                   。。。。。。。
                }
            }
        }
    

    每监听到一个连接都会交由一个线程来接管。 这个线程运行SocketReader的run()方法,接收消息,具体的消息的接收和初步处理由SocketReader里面的SocketReadingMode对象(这个对象指定是否使用阻塞和非阻塞套接字连接。)进行。SocketReadingMode反过来调用ServerSocketReader的createSession()方法根据namespace创建Session。所有其它域连接到本地服务器的session都由LocalIncomingServerSession保持,session的创建也是由它来进行。如代码清单:

    boolean createSession(String namespace) throws UnauthorizedException, XmlPullParserException,
                IOException {
            if ("jabber:server".equals(namespace)) {
                // The connected client is a server so create an IncomingServerSession
                session = LocalIncomingServerSession.createSession(serverName, reader, connection);
                return true;
            }
            return false;
        }
    

    session创建时会发送相应的握手信息给对方。初始建立的session是未验证的,此时如果对端发送iq、message、presence等消息由ServerSocketReader接收,其调 用packetReceived() 方法验证域名并抛出PacketRejectedException,所以S2S开始发送的应该为验证信息。本地ServerSocketReader收到验证信息后由processUnknowPacket() 方法处理,调用LocalIncomingServerSession 的validateSubsequentDomain()验证。具体的由一个ServerDialback对象进行处理。刚建立连接时发送了握手信息,这里的ServerDialback对象则验证对方返回的key是否正确,OK的情况下则发送成功通知。验证通过的session会加入到LocalIncomingServerSession的已鉴权列表中,后面发送的iq、message、presence消息就不会再被拦截。

    代码清单:

    public static LocalIncomingServerSession createSession(String serverName,     
                           XMPPPacketReader reader, SocketConnection connection) throws 
                                                    XmlPullParserException, IOException {
            XmlPullParser xpp = reader.getXPPParser();
            String version = xpp.getAttributeValue("", "version");
            int[] serverVersion = version != null ? decodeVersion(version) : new int[]  
                                                                                         {0,0};
            try {
                // 在新的会话中获取stream ID
                StreamID streamID = SessionManager.getInstance().nextStreamID();
                // 与远程服务器创建会话
                LocalIncomingServerSession session = SessionManager.getInstance().
                                     createIncomingServerSession(connection,streamID);
    
                // 发送消息流头部
                StringBuilder openingStream = new StringBuilder();
                openingStream.append("<stream:stream");
                openingStream.append(" xmlns:db="jabber:server:dialback"");
                openingStream.append("xmlns:stream="http:
                                          //etherx.jabber.org/streams"");
                openingStream.append(" xmlns="jabber:server"");
                openingStream.append(" from="").append(serverName).append(""");
                openingStream.append(" id="").append(streamID).append(""");
                
                // OF-443: 无响应,减少对这个域的连接
                if (serverVersion[0] >= 1) {
                    openingStream.append(" version="1.0">");
                } else {
                    openingStream.append(">");
                }
                
                connection.deliverRawText(openingStream.toString());
                if (serverVersion[0] >= 1) {        	
                    // 远程服务器XMPP 1.0兼容所以提供TLS和SASL建立连接(服务器回拨)
    	            // 之时TLS策略用于此连接
    	            Connection.TLSPolicy tlsPolicy =  ServerDialback.isEnabled() ?
    	                                                  Connection.TLSPolicy.optional :
    	                                                  Connection.TLSPolicy.required;
    	            boolean hasCertificates = false;
    	            try {
    	                hasCertificates = SSLConfig.getKeyStore().size() > 0;
    	            }
    	            catch (Exception e) {
    	                Log.error(e.getMessage(), e);
    	            }
    	     if (Connection.TLSPolicy.required == tlsPolicy && !hasCertificates)    {
    	                Log.error("Server session rejected. TLS is required but no  
                                                          certificates " + "were created.");
    	                return null;
    	            }
    	  connection.setTlsPolicy(hasCertificates ? tlsPolicy :         
                                                  Connection.TLSPolicy.disabled);
                }
    
                // 显示压缩策略用于此连接
                String policyName =                                                           
    JiveGlobals.getProperty("xmpp.server.compression.policy",
                        Connection.CompressionPolicy.disabled.toString());
                Connection.CompressionPolicy compressionPolicy =
                        Connection.CompressionPolicy.valueOf(policyName);
                connection.setCompressionPolicy(compressionPolicy);
    
                StringBuilder sb = new StringBuilder();
                
                if (serverVersion[0] >= 1) {        	
                    //远程服务器XMPP 1.0兼容所以提供TLS和SASL建立连接(服务器回拨)            	   
    <span style="white-space:pre">		</span>// 不提供流特性对pre - 1.0服务器,因为它混淆他们。发送功能Openfire < 3.7.1混淆太-的- 443)        
    <span style="white-space: pre;">			</span>sb.append("<stream:features>");
    	            if (JiveGlobals.getBooleanProperty("xmpp.server.tls.enabled", true)) {
    	                sb.append("<starttls xmlns="urn:ietf:params:xml:ns:xmpp-tls">");
    	                if (!ServerDialback.isEnabled()) {
    	                    // 服务器回拨要求TLS关闭	                    sb.append("<required/>");
    	                }
    	                sb.append("</starttls>");
    	            }
    	            
    	            // 引用可用SASL机制
    	            sb.append(SASLAuthentication.getSASLMechanisms(session));
    	            
    	            if (ServerDialback.isEnabled()) {
    	                // 当TLS不是必需的,可以提供服务器回拨	                
    // after TLS has been negotiated and a self-signed certificate is being used
    	                sb.append("<dialback xmlns="urn:xmpp:features:dialback"/>");
    	            }
    
    	            sb.append("</stream:features>");
                }
                
                connection.deliverRawText(sb.toString());
    
                // Set the domain or subdomain of the local server targeted by the remote server
                session.setLocalDomain(serverName);
                return session;
            }
            catch (Exception e) {
                Log.error("Error establishing connection from remote server:" + connection, e);
                connection.close();
                return null;
            }
        }
    

    S2S建立连接

    发送给其它服务器的消息由@domain 部分区分,在进入到服务器路由后在

    RoutingTableImpl.routePacket(Packetpacket) 中与发送给本地服务器的消息分离。

    public void routePacket(JID jid, Packet packet, boolean fromServer) throws PacketException {
            boolean routed = false;
            if (serverName.equals(jid.getDomain())) {
            	   // Packet sent to our domain.
                routed = routeToLocalDomain(jid, packet, fromServer);
            }
            else if (jid.getDomain().contains(serverName)) {
                // Packet sent to component hosted in this server 数据包发送到组件驻留在这个服务器
                routed = routeToComponent(jid, packet, routed);
            }
            else {
                // Packet sent to remote server
                routed = routeToRemoteDomain(jid, packet, routed);
            }
    
            if (!routed) {
                if (Log.isDebugEnabled()) {
                    Log.debug("RoutingTableImpl: Failed to route packet to JID: {} packet: {}", jid, packet.toXML());
                }
                if (packet instanceof IQ) {
                    iqRouter.routingFailed(jid, packet);
                }
                else if (packet instanceof Message) {
                    messageRouter.routingFailed(jid, packet);
                }
                else if (packet instanceof Presence) {
                    presenceRouter.routingFailed(jid, packet);
                }
            }
    }
    

    在初次发送消息给外部服务器时两台服务器的连接还没有建立,这种情况下会将包交由一个OutgoingSessionPromise 对象来处理,将消息加入它的队列。在OutgoingSessionPromise 中保有一个线程池和一个独立线程。独立线程不断从消息队列中读取要处理的packet,并针对每个domain建立一个PacketsProcessor线程,将消息交给这个线程,然后把此线程放入线程池中运行。PacketsProcessor在发送消息包时会判断到外部服务器的连接是否已经建立。未建立的情况下会调用LocalOutgoingDServerSession.authenticateDomain()方法建立连接。具体的Socket连接建立是在authenticateDomain() 方法中经过一系列的验证和鉴权后调用createOutgoingSession(domain,hostname,port)来完成。建立好连接后则重新调用routingTable.routePacket()再进行一次路由。









  • 相关阅读:
    hdu2328 Corporate Identity
    hdu1238 Substrings
    hdu4300 Clairewd’s message
    hdu3336 Count the string
    hdu2597 Simpsons’ Hidden Talents
    poj3080 Blue Jeans
    poj2752 Seek the Name, Seek the Fame
    poj2406 Power Strings
    hust1010 The Minimum Length
    hdu1358 Period
  • 原文地址:https://www.cnblogs.com/huwf/p/4273357.html
Copyright © 2011-2022 走看看