zoukankan      html  css  js  c++  java
  • XMPP协议学习笔记五(Openfire消息处理流程)

    XMPP协议学习笔记五(Openfire消息处理流程) - nomousewch的专栏 - 博客频道 - CSDN.NET

    XMPP协议作为一个IM协议,其核心在于消息的传递。在Openfire服务器对XMPP的实现中,消息被封装为Packet对象,因此Openfire服务器的核心代码是对客户端Packet对象的监听和处理流程,我们今天就来研究一下Openfire的消息包接收处理流程。

        

    • 首先,Openfire服务器需要启动一个基于TCP/IP的监听服务,用以接收客户端传过来的XML流文件。这个过程在XMPPServer类的start()方法中进行,这个监听服务是以loadModule(ConnectionManagerImpl.class.getName())来加载,调用ConnectionManagerImpl类的createClientListeners()方法
    1. private void createClientListeners() {  
    2.     // Start clients plain socket unless it's been disabled.  
    3.     if (isClientListenerEnabled()) {  
    4.         // Create SocketAcceptor with correct number of processors  
    5.         socketAcceptor = buildSocketAcceptor();  
    6.         // Customize Executor that will be used by processors to process incoming stanzas  
    7.         ExecutorThreadModel threadModel = ExecutorThreadModel.getInstance("client");  
    8.         int eventThreads = JiveGlobals.getIntProperty("xmpp.client.processing.threads", 16);  
    9.         ThreadPoolExecutor eventExecutor = (ThreadPoolExecutor)threadModel.getExecutor();  
    10.         eventExecutor.setCorePoolSize(eventThreads + 1);  
    11.         eventExecutor.setMaximumPoolSize(eventThreads + 1);  
    12.         eventExecutor.setKeepAliveTime(60, TimeUnit.SECONDS);  
    13.   
    14.         socketAcceptor.getDefaultConfig().setThreadModel(threadModel);  
    15.         // Add the XMPP codec filter  
    16.         socketAcceptor.getFilterChain().addFirst("xmpp", new ProtocolCodecFilter(new XMPPCodecFactory()));  
    17.         // Kill sessions whose outgoing queues keep growing and fail to send traffic  
    18.         socketAcceptor.getFilterChain().addAfter("xmpp", "outCap", new StalledSessionsFilter());  
    19.     }  
    20. }  
        /**
         * Prepares (but does not bind) the socket acceptor used for plain client connections.
         *
         * <p>When the client listener is enabled this builds the acceptor, sizes the
         * "client" thread model's executor to the configured number of processing
         * threads plus one, and installs the XMPP codec filter followed by the
         * stalled-sessions filter. When the listener is disabled nothing is done.
         */
        private void createClientListeners() {
            // Start clients plain socket unless it's been disabled.
            if (!isClientListenerEnabled()) {
                return;
            }
            // Create SocketAcceptor with correct number of processors
            socketAcceptor = buildSocketAcceptor();
            // Customize Executor that will be used by processors to process incoming stanzas
            ExecutorThreadModel threadModel = ExecutorThreadModel.getInstance("client");
            int poolSize = JiveGlobals.getIntProperty("xmpp.client.processing.threads", 16) + 1;
            ThreadPoolExecutor eventExecutor = (ThreadPoolExecutor) threadModel.getExecutor();
            // ThreadPoolExecutor rejects a core size above the current maximum (Java 9+)
            // and a maximum below the current core size, so the safe order of the two
            // setters depends on whether the pool is growing or shrinking.
            if (poolSize > eventExecutor.getMaximumPoolSize()) {
                eventExecutor.setMaximumPoolSize(poolSize);
                eventExecutor.setCorePoolSize(poolSize);
            } else {
                eventExecutor.setCorePoolSize(poolSize);
                eventExecutor.setMaximumPoolSize(poolSize);
            }
            eventExecutor.setKeepAliveTime(60, TimeUnit.SECONDS);

            socketAcceptor.getDefaultConfig().setThreadModel(threadModel);
            // Add the XMPP codec filter
            socketAcceptor.getFilterChain().addFirst("xmpp", new ProtocolCodecFilter(new XMPPCodecFactory()));
            // Kill sessions whose outgoing queues keep growing and fail to send traffic
            socketAcceptor.getFilterChain().addAfter("xmpp", "outCap", new StalledSessionsFilter());
        }
    • 其中的socketAcceptor是在buildSocketAcceptor()方法中定义的,它是作为一个服务端的接收器,是mina框架为我们封装好的一个socket server。在上面这个方法中,我们为socketAcceptor添加了一个过滤器(由XMPPCodecFactory构造的ProtocolCodecFilter),这个过滤器将处理xmpp相关请求的编解码,我们再看同一个类的另外一个方法startClientListeners()
    1. private void startClientListeners(String localIPAddress) {  
    2.    // Start clients plain socket unless it's been disabled.  
    3.    if (isClientListenerEnabled()) {  
    4.        int port = getClientListenerPort();  
    5.        try {  
    6.            // Listen on a specific network interface if it has been set.  
    7.            String interfaceName = JiveGlobals.getXMLProperty("network.interface");  
    8.            InetAddress bindInterface = null;  
    9.            if (interfaceName != null) {  
    10.                if (interfaceName.trim().length() > 0) {  
    11.                    bindInterface = InetAddress.getByName(interfaceName);  
    12.                }  
    13.            }  
    14.            // Start accepting connections  
    15.            socketAcceptor  
    16.                    .bind(new InetSocketAddress(bindInterface, port), new ClientConnectionHandler(serverName));  
    17.   
    18.            ports.add(new ServerPort(port, serverName, localIPAddress, false, null, ServerPort.Type.client));  
    19.   
    20.            List<String> params = new ArrayList<String>();  
    21.            params.add(Integer.toString(port));  
    22.            Log.info(LocaleUtils.getLocalizedString("startup.plain", params));  
    23.        }  
    24.        catch (Exception e) {  
    25.            System.err.println("Error starting XMPP listener on port " + port + ": " +  
    26.                    e.getMessage());  
    27.            Log.error(LocaleUtils.getLocalizedString("admin.error.socket-setup"), e);  
    28.        }  
    29.    }  
    30. }  
         /**
          * Binds the plain client socket acceptor and records the opened port.
          *
          * <p>Does nothing when the client listener is disabled. If the
          * "network.interface" XML property is set to a non-blank value the acceptor
          * is bound to that interface; otherwise a null address is passed to
          * {@code InetSocketAddress}. Any failure is reported on stderr and in the log
          * rather than propagated to the caller.
          *
          * @param localIPAddress address stored in the {@code ServerPort} entry added to {@code ports}
          */
         private void startClientListeners(String localIPAddress) {
             // Start clients plain socket unless it's been disabled.
             if (!isClientListenerEnabled()) {
                 return;
             }
             int port = getClientListenerPort();
             try {
                 // Listen on a specific network interface if it has been set.
                 String configuredInterface = JiveGlobals.getXMLProperty("network.interface");
                 boolean interfaceGiven =
                         configuredInterface != null && configuredInterface.trim().length() > 0;
                 InetAddress bindAddress =
                         interfaceGiven ? InetAddress.getByName(configuredInterface) : null;

                 // Start accepting connections
                 InetSocketAddress endpoint = new InetSocketAddress(bindAddress, port);
                 socketAcceptor.bind(endpoint, new ClientConnectionHandler(serverName));

                 ports.add(new ServerPort(port, serverName, localIPAddress, false, null, ServerPort.Type.client));

                 // The localized startup message takes the port number as its only parameter.
                 List<String> messageArgs = new ArrayList<String>();
                 messageArgs.add(Integer.toString(port));
                 Log.info(LocaleUtils.getLocalizedString("startup.plain", messageArgs));
             }
             catch (Exception e) {
                 System.err.println("Error starting XMPP listener on port " + port + ": " +
                         e.getMessage());
                 Log.error(LocaleUtils.getLocalizedString("admin.error.socket-setup"), e);
             }
         }
    • 其中的socketAcceptor.bind()方法启动了监听服务器,来监听所有发送到服务器5222端口的数据,并用ClientConnectionHandler类来处理,ClientConnectionHandler继承于ConnectionHandler类,后者继承了mina的IoHandlerAdapter类,其中的messageReceived()方法是关键。
    1. public void messageReceived(IoSession session, Object message) throws Exception {  
    2. // Get the stanza handler for this session  
    3. StanzaHandler handler = (StanzaHandler) session.getAttribute(HANDLER);  
    4. // Get the parser to use to process stanza. For optimization there is going  
    5. // to be a parser for each running thread. Each Filter will be executed  
    6. // by the Executor placed as the first Filter. So we can have a parser associated  
    7. // to each Thread  
    8. int hashCode = Thread.currentThread().hashCode();  
    9. XMPPPacketReader parser = parsers.get(hashCode);  
    10. if (parser == null) {  
    11.     parser = new XMPPPacketReader();  
    12.     parser.setXPPFactory(factory);  
    13.     parsers.put(hashCode, parser);  
    14. }  
    15. // Update counter of read btyes  
    16. updateReadBytesCounter(session);  
    17. //System.out.println("RCVD: " + message);  
    18. // Let the stanza handler process the received stanza  
    19. try {  
    20.     handler.process((String) message, parser);  
    21. } catch (Exception e) {  
    22.     Log.error("Closing connection due to error while processing message: " + message, e);  
    23.     Connection connection = (Connection) session.getAttribute(CONNECTION);  
    24.     connection.close();  
    25. }  
    26. }  
            /**
             * Hands a received message to the session's stanza handler.
             *
             * <p>The handler is read from the session's {@code HANDLER} attribute and the
             * message is cast to {@code String}. A parser is looked up (and created on
             * first use) under the current thread's hash code. If processing throws, the
             * error is logged and the connection stored in the session's
             * {@code CONNECTION} attribute is closed.
             *
             * @param session the session the message arrived on
             * @param message the received message
             * @throws Exception declared by the signature; failures inside
             *         {@code handler.process} are caught and handled here
             */
            public void messageReceived(IoSession session, Object message) throws Exception {
                // Get the stanza handler for this session
                StanzaHandler stanzaHandler = (StanzaHandler) session.getAttribute(HANDLER);

                // One parser is cached per running thread, keyed by the thread's hash code.
                // NOTE(review): hash codes are not guaranteed unique, so two threads could
                // end up sharing a parser - confirm this is acceptable.
                int threadKey = Thread.currentThread().hashCode();
                XMPPPacketReader reader = parsers.get(threadKey);
                if (reader == null) {
                    reader = new XMPPPacketReader();
                    reader.setXPPFactory(factory);
                    parsers.put(threadKey, reader);
                }

                // Update counter of read bytes
                updateReadBytesCounter(session);

                // Let the stanza handler process the received stanza
                try {
                    String stanza = (String) message;
                    stanzaHandler.process(stanza, reader);
                } catch (Exception e) {
                    Log.error("Closing connection due to error while processing message: " + message, e);
                    Connection failedConnection = (Connection) session.getAttribute(CONNECTION);
                    failedConnection.close();
                }
            }
    • 可以看到收到的信息交由StanzaHandler的process方法中进行XML解析并封装为packet对象,然后再进行下一步的处理,至此,从客户端到服务器端的packet传递结束。
  • 相关阅读:
    技术总监7年经验——论程序员的职业发展路线
    2.MySQL入门基本操作初体验
    1.MySQL的安装(linux Ubuntu环境下)
    Boot Petalinux Project Using a remote system
    字符设备驱动、平台设备驱动、设备驱动模型、sysfs的比较和关联
    linux采用模块方法,添加一个新的设备
    在远程服务器上完成本地设备的程序烧写和调试(基于vivado ,SDK软件)
    Linux Master/Baremetal Remote 配置下的裸机调试
    利用Xlinix SDK 建立Linux程序以及对该程序进行调试
    Vivado Launching SDK "Importing Hardware Specification" error的解决方法
  • 原文地址:https://www.cnblogs.com/seven1979/p/4221939.html
Copyright © 2011-2022 走看看