zoukankan      html  css  js  c++  java
  • (转)OpenFire源码学习之十一:连接管理(下)

    转:http://blog.csdn.net/huwenfeng_2011/article/details/43416523

    下面是下部分

    C2S

    1、当有客户端进行连接时根据Mina框架的模式首先调用的是sessionOpened方法。

    sessionOpened首先为此新连接构造了一个parser(XMLLightWeightParser),这个parser  是专门给XMPPDecoder(是XMPPCodecFactory的解码器类)使用的,再创建一个 Openfire的Connection类实例connection和一个StanzaHandler的实例。最后将以上的       parser,connection和StanzaHandler的实例存放在Mina的session中,以便以后使用。

     

    2、当有数据发送过来时,Mina框架会调用messageReceived方法

    messageReceived首先从Mina的session中得到在sessionOpened方法中创建的   StanzaHandler实例handler,然后从parsers中得到一个parser(如果parsers中没有可以 创建一个新的实例)(注意这个parser和在sessionOpened方法中创建的parser不同,这     个parser是用来处理Stanza的,而在sessionOpened方法中创建的parser是在filter中用  来解码的,一句话说就是在sessionOpened方法中创建的parser是更低一层的parser)。      最后将xml数据包交给StanzaHander的实例hander进行处理。

     

    3、StanzaHander的实例hander处理xml数据包的过程

    StanzaHander首先判断xml数据包的类型,.如果数据包以“”打头那么说明客户端刚刚连       接,需要初始化通信(符合XMPP协议)Openfire首先为此客户端建立一个与客户端       JID相关的ClientSession,而后与客户端交互协商例如是否使用SSL,是否使用压缩等       问题。当协商完成之后进入正常通信阶段,则可以将xml数据包交给PacketRouteImpl   模块进行处理。

    4、PacketRouteImpl中包将进一步被细化处理

           PacketRouteImp1将packet分成Message、Iq和Presence,分别交由MessageRouter、          IqRouter和PresenceRouter进行下一步路由。

    5、MessageRoute处理

           调用routetableImp1进行处理,然后交由通过getRoute方法获取session,最后调用           NIOConnection的deliver方法。

    6、IQRoute 处理

           根据IQ的不同的命名空间通过getHandler方法找到相应的iq处理方法进行处理。

           注意:在IQRouter的initialize方法中,iqHandlers.addAll方法会将iq的命名空间与     其对应的处理方法存储到一个map中。

    7、PresenceRoute处理

        调用PresenceUpdateHandler的process方法(处理数据库的更新和缓存的更新),然后    调用Roster的boradcastPresence方法(检查privacylist(隐身及黑名单用户)然后路由给  所有在线好友),再调用routeTable的getRoutes()获取session,最后调用NIOConnection     的deliver方法。

    客户端连接

    管理控制台:会话->有效会话->客户端会话

    会话列表在session-summary.jsp中,

    [java] view plain copy
     
    1. SessionResultFilter filter = SessionResultFilter.createDefaultSessionFilter();  
    2. filter.setSortOrder(order);  
    3. filter.setStartIndex(start);  
    4. filter.setNumResults(range);  
    5. Collection<ClientSession> sessions = sessionManager.getSessions(filter);  

    获取在线用户有如下步骤:

    1.会得到SessionResultFilter的会话过滤器

    2.然后在获取所在在线客户会话(ClientSession)。

      客户端的session存在会话路由表中,通过会话路由得到所有的在线用户。

    3.排序列表。当拿到客户端的在线的连接,重新迭代列表集合,通过 sort的方法排序

    4.然后封装过滤后的列表返回给前端

    注:在SessionManager.getSession(SessionResultFilterfiler)的方法应该还可以优化,以后在考虑

     

    关闭客户端连接

    点击会话列表中的可断开连接。

    通过JID(admin@192.169.1.104/Spark2.6.3)从会话路由中查找会话连接。调用close方法关闭

    [java] view plain copy
     
    1. JID address = new JID(jid);  
    2.         try {  
    3.             Session sess = sessionManager.getSession(address);  
    4.             sess.close();  
    5.             // Log the event  
    6.             webManager.logEvent("closed session for address "+address, null);  
    7.             // wait one second  
    8.             Thread.sleep(1000L);  
    9.         }  
    10.         catch (Exception ignored) {  
    11.             // Session might have disappeared on its own  
    12.         }  

    S2S

    控制台的配置

    Openfire中Serverto Server 连接默认使用5269 端口,在管理控制台:服务器->服务器设置->服务器到服务器

    当你选择正常使用的时候,后台会调用ConnectionManager中的enableServerListener()和setServerListenerPort(port);

    方法流程

    流程图:

    在创建服务监听方法中相应的localIPAddress和端口信息被包装成一个SocketAcceptThread,做为一个独立的线程运行。在SocketAcceptThread 里面有一个BlockingAcceptingMode 成员,专门用来监听Socket连接。

    代码清单:

    [java] view plain copy
     
    1. public void run() {  
    2.         while (notTerminated) {  
    3.             try {  
    4.                 Socket sock = serverSocket.accept();  
    5.                 if (sock != null) {  
    6.                     Log.debug("Connect " + sock.toString());  
    7.                     SocketReader reader =  
    8.                             connManager.createSocketReader(sock, false, serverPort, true);  
    9.                     Thread thread = new Thread(reader, reader.getName());  
    10.                     thread.setDaemon(true);  
    11.                     thread.setPriority(Thread.NORM_PRIORITY);  
    12.                     thread.start();  
    13.                 }  
    14.             }  
    15.             catch (IOException ie) {  
    16.                。。。。。。。  
    17.             }  
    18.             catch (Throwable e) {  
    19.                。。。。。。。  
    20.             }  
    21.         }  
    22.     }  

    每监听到一个连接都会交由一个线程来接管。 这个线程运行SocketReader的run()方法,接收消息,具体的消息的接收和初步处理由SocketReader里面的SocketReadingMode对象(这个对象指定是否使用阻塞和非阻塞套接字连接。)进行。SocketReadingMode反过来调用ServerSocketReader的createSession()方法根据namespace创建Session。所有其它域连接到本地服务器的session都由LocalIncomingServerSession保持,session的创建也是由它来进行。如代码清单:

    [java] view plain copy
     
    1. boolean createSession(String namespace) throws UnauthorizedException, XmlPullParserException,  
    2.             IOException {  
    3.         if ("jabber:server".equals(namespace)) {  
    4.             // The connected client is a server so create an IncomingServerSession  
    5.             session = LocalIncomingServerSession.createSession(serverName, reader, connection);  
    6.             return true;  
    7.         }  
    8.         return false;  
    9.     }  

    session创建时会发送相应的握手信息给对方。初始建立的session是未验证的,此时如果对端发送iq、message、presence等消息由ServerSocketReader接收,其调 用packetReceived() 方法验证域名并抛出PacketRejectedException,所以S2S开始发送的应该为验证信息。本地ServerSocketReader收到验证信息后由processUnknowPacket() 方法处理,调用LocalIncomingServerSession 的validateSubsequentDomain()验证。具体的由一个ServerDialback对象进行处理。刚建立连接时发送了握手信息,这里的ServerDialback对象则验证对方返回的key是否正确,OK的情况下则发送成功通知。验证通过的session会加入到LocalIncomingServerSession的已鉴权列表中,后面发送的iq、message、presence消息就不会再被拦截。

    代码清单:

    [java] view plain copy
     
    1. public static LocalIncomingServerSession createSession(String serverName,       
    2.                        XMPPPacketReader reader, SocketConnection connection) throws   
    3.                                                 XmlPullParserException, IOException {  
    4.         XmlPullParser xpp = reader.getXPPParser();  
    5.         String version = xpp.getAttributeValue("", "version");  
    6.         int[] serverVersion = version != null ? decodeVersion(version) : new int[]    
    7.                                                                                      {0,0};  
    8.         try {  
    9.             // 在新的会话中获取stream ID  
    10.             StreamID streamID = SessionManager.getInstance().nextStreamID();  
    11.             // 与远程服务器创建会话  
    12.             LocalIncomingServerSession session = SessionManager.getInstance().  
    13.                                  createIncomingServerSession(connection,streamID);  
    14.   
    15.             // 发送消息流头部  
    16.             StringBuilder openingStream = new StringBuilder();  
    17.             openingStream.append("<stream:stream");  
    18.             openingStream.append(" xmlns:db="jabber:server:dialback"");  
    19.             openingStream.append("xmlns:stream="http:  
    20.                                       //etherx.jabber.org/streams"");  
    21.             openingStream.append(" xmlns="jabber:server"");  
    22.             openingStream.append(" from="").append(serverName).append(""");  
    23.             openingStream.append(" id="").append(streamID).append(""");  
    24.               
    25.             // OF-443: 无响应,减少对这个域的连接  
    26.             if (serverVersion[0] >= 1) {  
    27.                 openingStream.append(" version="1.0">");  
    28.             } else {  
    29.                 openingStream.append(">");  
    30.             }  
    31.               
    32.             connection.deliverRawText(openingStream.toString());  
    33.             if (serverVersion[0] >= 1) {           
    34.                 // 远程服务器XMPP 1.0兼容所以提供TLS和SASL建立连接(服务器回拨)  
    35.                 // 之时TLS策略用于此连接  
    36.                 Connection.TLSPolicy tlsPolicy =  ServerDialback.isEnabled() ?  
    37.                                                       Connection.TLSPolicy.optional :  
    38.                                                       Connection.TLSPolicy.required;  
    39.                 boolean hasCertificates = false;  
    40.                 try {  
    41.                     hasCertificates = SSLConfig.getKeyStore().size() > 0;  
    42.                 }  
    43.                 catch (Exception e) {  
    44.                     Log.error(e.getMessage(), e);  
    45.                 }  
    46.          if (Connection.TLSPolicy.required == tlsPolicy && !hasCertificates)    {  
    47.                     Log.error("Server session rejected. TLS is required but no    
    48.                                                       certificates " + "were created.");  
    49.                     return null;  
    50.                 }  
    51.       connection.setTlsPolicy(hasCertificates ? tlsPolicy :           
    52.                                               Connection.TLSPolicy.disabled);  
    53.             }  
    54.   
    55.             // 显示压缩策略用于此连接  
    56.             String policyName =                                                             
    57. JiveGlobals.getProperty("xmpp.server.compression.policy",  
    58.                     Connection.CompressionPolicy.disabled.toString());  
    59.             Connection.CompressionPolicy compressionPolicy =  
    60.                     Connection.CompressionPolicy.valueOf(policyName);  
    61.             connection.setCompressionPolicy(compressionPolicy);  
    62.   
    63.             StringBuilder sb = new StringBuilder();  
    64.               
    65.             if (serverVersion[0] >= 1) {           
    66.                 //远程服务器XMPP 1.0兼容所以提供TLS和SASL建立连接(服务器回拨)                     
    [java] view plain copy
     
    1. <span style="white-space:pre">        </span>// 不提供流特性对pre - 1.0服务器,因为它混淆他们。发送功能Openfire < 3.7.1混淆太-的- 443)          
    [java] view plain copy
     
    1. <span style="white-space: pre;">          </span>sb.append("<stream:features>");  
    [java] view plain copy
     
    1.          if (JiveGlobals.getBooleanProperty("xmpp.server.tls.enabled", true)) {  
    2.              sb.append("<starttls xmlns="urn:ietf:params:xml:ns:xmpp-tls">");  
    3.              if (!ServerDialback.isEnabled()) {  
    4.                  // 服务器回拨要求TLS关闭                        sb.append("<required/>");  
    5.              }  
    6.              sb.append("</starttls>");  
    7.          }  
    8.            
    9.          // 引用可用SASL机制  
    10.          sb.append(SASLAuthentication.getSASLMechanisms(session));  
    11.            
    12.          if (ServerDialback.isEnabled()) {  
    13.              // 当TLS不是必需的,可以提供服务器回拨                   
    14. fter TLS has been negotiated and a self-signed certificate is being used  
    15.              sb.append("<dialback xmlns="urn:xmpp:features:dialback"/>");  
    16.          }  
    17.   
    18.          sb.append("</stream:features>");  
    19.         }  
    20.           
    21.         connection.deliverRawText(sb.toString());  
    22.   
    23.         // Set the domain or subdomain of the local server targeted by the remote server  
    24.         session.setLocalDomain(serverName);  
    25.         return session;  
    26.     }  
    27.     catch (Exception e) {  
    28.         Log.error("Error establishing connection from remote server:" + connection, e);  
    29.         connection.close();  
    30.         return null;  
    31.     }  
    32. }  

    S2S建立连接

    发送给其它服务器的消息由@domain 部分区分,在进入到服务器路由后在

    RoutingTableImpl.routePacket(Packetpacket) 中与发送给本地服务器的消息分离。

    [java] view plain copy
     
    1. public void routePacket(JID jid, Packet packet, boolean fromServer) throws PacketException {  
    2.         boolean routed = false;  
    3.         if (serverName.equals(jid.getDomain())) {  
    4.                // Packet sent to our domain.  
    5.             routed = routeToLocalDomain(jid, packet, fromServer);  
    6.         }  
    7.         else if (jid.getDomain().contains(serverName)) {  
    8.             // Packet sent to component hosted in this server 数据包发送到组件驻留在这个服务器  
    9.             routed = routeToComponent(jid, packet, routed);  
    10.         }  
    11.         else {  
    12.             // Packet sent to remote server  
    13.             routed = routeToRemoteDomain(jid, packet, routed);  
    14.         }  
    15.   
    16.         if (!routed) {  
    17.             if (Log.isDebugEnabled()) {  
    18.                 Log.debug("RoutingTableImpl: Failed to route packet to JID: {} packet: {}", jid, packet.toXML());  
    19.             }  
    20.             if (packet instanceof IQ) {  
    21.                 iqRouter.routingFailed(jid, packet);  
    22.             }  
    23.             else if (packet instanceof Message) {  
    24.                 messageRouter.routingFailed(jid, packet);  
    25.             }  
    26.             else if (packet instanceof Presence) {  
    27.                 presenceRouter.routingFailed(jid, packet);  
    28.             }  
    29.         }  
    30. }  

    在初次发送消息给外部服务器时两台服务器的连接还没有建立,这种情况下会将包交由一个OutgoingSessionPromise 对象来处理,将消息加入它的队列。在OutgoingSessionPromise 中保有一个线程池和一个独立线程。独立线程不断从消息队列中读取要处理的packet,并针对每个domain建立一个PacketsProcessor线程,将消息交给这个线程,然后把此线程放入线程池中运行。PacketsProcessor在发送消息包时会判断到外部服务器的连接是否已经建立。未建立的情况下会调用LocalOutgoingDServerSession.authenticateDomain()方法建立连接。具体的Socket连接建立是在authenticateDomain() 方法中经过一系列的验证和鉴权后调用createOutgoingSession(domain,hostname,port)来完成。建立好连接后则重新调用routingTable.routePacket()再进行一次路由。

  • 相关阅读:
    R 语言
    Maven
    IntelliJ IDEA
    Windows Terminal
    pip
    批处理编程案例
    Windows DOS命令批处理脚本
    Day15 T1 库特的向量
    Day12 T1 少女觉
    Day10 T2 邦德
  • 原文地址:https://www.cnblogs.com/wangle1001986/p/7217659.html
Copyright © 2011-2022 走看看