zoukankan      html  css  js  c++  java
  • Tigase 发送消息的流程源码分析

    XMPP 的<message/>节是使用基本的”push”方法来从一个地方到另一个地方得到消息。因为消息通常是不告知的,它们是一种”fire-and-forget”(发射后自寻目的)的机制来从一个地方到另一个地方快速获取信息
    消息节有五种不同的类型,通过 type 属性来进行区分:例如 chat 类型为 chat 的消息在两个实体间的实时对话中交换,例如两个朋友之间的即时通讯聊天。除了 type 属性外,消息节还包括一个 to 和 from 地址,并且也可以包含一个用于跟踪目的的 id  属性(我们在使用更为广泛的 IQ  节中详细的讨论 IDs)。to  地址是预期接收人的
    JabberID,from 地址是发送者的JabberID。from 地址不由发送客户端提供,而是由发送者的服务器添加邮戳,以避免地址欺骗。
    在Tigase中,有两个重要的组成,一个组件,二是插件,可以去官方网去看下他的架构介绍 https://docs.tigase.net/tigase-server/7.1.4/Development_Guide/html/#writePluginCode
    例如最著名的组件的一个例子是MUC或PubSub。在Tigase中,几乎所有东西实际上都是一个组件:会话管理器、s2s连接管理器、消息路由器等等,组件是根据服务器配置加载的,新的组件可以在运行时加载和激活。您可以轻松地替换组件实现,唯一要做的更改是配置条目中的类名。

    Tigase 中定义一个最简单的消息组件,需要实现MessageReceiver或继承 extends AbstractMessageReceiver 类, MessageReceiver 的抽象类: AbstractMessageReceiver 子类 :
    一、ClientConnectionManager
    二、SessionManager
    三、 MessageRouter
    public void setProperties(Map<String, Object> props){
        for (String name : msgrcv_names) {
            mr = conf.getMsgRcvInstance(name);
            if (mr instanceof MessageReceiver) {
                ((MessageReceiver) mr).setParent(this);
                ((MessageReceiver) mr).start();
            }
        }
    }

    1、当客户端发送的message消息到tigase服务端,每个一SOCKET连接都会被包装成IOService对象,IOService包含一系列操作socket的方法(接收发送数据等),processSocketData()接收网络数据,由tigase.net处理解析成xml对象,并将packet放到接收队列receivedPackets中再调用serviceListener.packetsReady(this)。由于ConnectionManager实现IOServiceListener接口,实现上调用的的是ConnectionManager中的packetsReady()来开始处理数据

    此时的packet :packetFrom=null,packetTo=null。
     
    ClientConnectionManager.processSocketData方法中设置packet的一些属性:
    此时: packetFrom=c2s@llooper/192.168.0.33_5222_192.168.0.33_38624, packetTo=sess-man@llooper
    ClientConnectionManager.processSocketData(XMPPIOService<Object>serv)
        JID id = serv.getConnectionId(); //c2s@llooper/192.168.0.33_5222_192.168.0.33_38624
        p.setPacketFrom(id); //packetFrom 设置为onnectionId
        p.setPacketTo(serv.getDataReceiver()); //packetTo 设置为sess-man --> SessionManager 
        addOutPacket(p);//将会委托给父 MessageRouter 路由
        
    }
    //packet 被设置上一些源信息,和目的地信息,接下来,这个数据包将会委托给父 MessageRouter 帮忙路由到 SessionManager组件中进行处理
    packet = (tigase.server.Message) from=c2s@llooper/192.168.0.33_5222_192.168.0.33_38624, to=sess-man@llooper, DATA=<message xmlns="jabber:client" id="44grM-176" type="chat" to="llooper@llooper"><thread>SWjZv5</thread><composing xmlns="http://jabber.org/protocol/chatstates"/></message>, SIZE=170, XMLNS=jabber:client, PRIORITY=NORMAL, PERMISSION=NONE, TYPE=chat
     
    packet = from=c2s@llooper/192.168.0.33_5222_192.168.0.33_38624, to=sess-man@llooper, DATA=<message to="admin@llooper" type="chat" id="2jePE-253" xmlns="jabber:client"><thread>7VKMRq</thread><composing xmlns="http://jabber.org/protocol/chatstates"/></message>, SIZE=168, XMLNS=jabber:client, PRIORITY=NORMAL, PERMISSION=NONE, TYPE=chat
     
    2、MessageRouter.processPacket(Packet packet)部分代码如下:
     
    //我们不会处理没有目标地址的数据包,只是丢弃它们并写一个日志消息
    if (packet.getTo() == null) {
        log.log(Level.WARNING, "Packet with TO attribute set to NULL: {0}", packet);
        return;
    }   
    
    
    //它不是一个服务发现包,我们必须找到一个处理组件
    //下面的代码块是“快速”找到一个组件if
    
    //这个包TO 组件ID,格式在以下一项:
    // 1。组件名+“@”+默认域名
    // 2。组件名+“@”+任何虚拟主机名
    // 3。组件名+ "."+默认域名
    // 4。组件名+ "."+任何虚拟主机名
    
    ServerComponent comp = getLocalComponent(packet.getTo()); //SessionManager
    comp.processPacket(packet, results);

     3、SessionManager.processPacket(final Packet packet)处理,有要代码如下。 例如A->B,这样做的目的是为了首先确定用户A有权限发送packet,然后是确定用户B有权限接收数据。如果用户B不在线,那么离线消息处理器会把packet保存到数据库当中。

    //XMPPResourceConnection session——用户会话保存所有用户会话数据,并提供对用户数据存储库的访问。它只允许在会话的生命周期内将信息存储在永久存储或内存中。如果在分组处理时没有联机用户会话,则此参数可以为空。
    XMPPResourceConnection conn = getXMPPResourceConnection(packet);
    //现在要走SessionManager的处理函数,主要是走插件流程,插件在Tigase中也是一个重要的组成,入口就是在这里,SM plugin
    processPacket(packet, conn);

       插入下SM plugin 流程说明 :

    这个设计有一个惊人的结果。如果你看下面的图片,显示了两个用户之间的通信,你可以看到数据包被复制了两次才送到最终目的地: 

    会话管理器(SessionManager)必须对数据包进行两次处理。第一次以用户A的名义将其作为传出包进行处理,第二次以用户B的名义将其作为传入包进行处理。
    这是为了确保用户A有权限发送一个包,所有的processor都应用到packet上,也为了确保用户B有权限接收packet,所有的processor都应用到packet了。例如,如果用户B是脱机的,那么有一个脱机消息processor应该将包发送到数据库,而不是用户B。
     
    protected XMPPResourceConnection getXMPPResourceConnection(Packet p) {
            XMPPResourceConnection conn = null;
            
            //首先根据这个包的发起者,来查找他的连接资源类,找不到则找接收者的资源类
            JID    from = p.getPacketFrom();
            if (from != null) {
                conn = connectionsByFrom.get(from);
                if (conn != null) {
                    return conn;
                }
            }
    
            //这个接收者它可能是这个服务器上某个用户的消息,让我们为这个用户查找已建立的会话
            JID to = p.getStanzaTo();
    
            if (to != null) {
                if (log.isLoggable(Level.FINEST)) {
                    log.finest("Searching for resource connection for: " + to);
                }
                conn = getResourceConnection(to);
            } else {
    
                // Hm, not sure what should I do now....
                // Maybe I should treat it as message to admin....
                log.log(Level.INFO,
                        "Message without TO attribute set, don''t know what to do wih this: {0}", p);
            }    // end of else
    
            return conn;
        }
        
        
    protected void processPacket(Packet packet, XMPPResourceConnection conn) {
    
        ...
        packet.setPacketTo(getComponentId()); //sess-man@llooper
        ...
    
        if (!stop) {
            //授权匹配的processor处理packet
            walk(packet, conn);
            try {
                if ((conn != null) && conn.getConnectionId().equals(packet.getPacketFrom())) {
                    handleLocalPacket(packet, conn);
                }
            } catch (NoConnectionIdException ex) {
                ...
            }
        }
        
        ...
    }

    packetTo被设置为组件ID(sess-man@llooper),其值原先也是这个。
    其中walk(packet, conn)方法,匹配处理器(授权)。对于message,此处匹配到的processor是amp和message-carbons,message-carbons没有怎么处理,主要是amp在处理,packet被塞amp的队列中等待处理。

    private void walk(final Packet packet, final XMPPResourceConnection connection) {
    
            for (XMPPProcessorIfc proc_t : processors.values()) {
                XMPPProcessorIfc processor = proc_t;
                //根据element和xmlns,授权匹配成功的processor
                Authorization    result    = processor.canHandle(packet, connection);
    
                if (result == Authorization.AUTHORIZED) {
                    ....
                
                    ProcessingThreads pt = workerThreads.get(processor.id());
    
                    if (pt == null) {
                        pt = workerThreads.get(defPluginsThreadsPool);
                    }
                    //packet 放到(addItem)授权了的processor的队列
                    if (pt.addItem(processor, packet, connection)) {
                        packet.processedBy(processor.id());
                    } else {
    
                        ...
                    }
                } else {
                    ...
                }
            }   
        }
    WorkerThread.run() 从队列中取出packet,由SessionManager.process(QueueItem item)给amp处理。
    SessionManager.pocess(QueueItem item) 如下:
    @Override
    public void process(QueueItem item) {
        
        XMPPProcessorIfc processor = item.getProcessor();
    
        try {
            //由授权的 processor 处理 packet
            processor.process(item.getPacket(), item.getConn(), naUserRepository,local_results, plugin_config.get(processor.id()));
            if (item.getConn() != null) {
                setPermissions(item.getConn(), local_results);
            }
            addOutPackets(item.getPacket(), item.getConn(), local_results);
        } catch (PacketErrorTypeException e) {
            ...
        } catch (XMPPException e) {
            ...
        }
    }
    
    
    //其中processor.process()------> MessageAmp.process(),如下:
    
    @Override
    public void process(Packet packet, XMPPResourceConnection session,
            NonAuthUserRepository repo, Queue results, Map settings) throws XMPPException {
        if (packet.getElemName() == "presence") {
            ...
            
        } else {
            Element amp = packet.getElement().getChild("amp", XMLNS);
    
            if ((amp == null) || (amp.getAttributeStaticStr("status") != null)) {
                messageProcessor.process(packet, session, repo, results, settings);
            } else {
                ...
        }
    }
    
    // 其中messageProcessor.process() --------> Message.process(),如下
    
    
    @Override
    public void process(Packet packet, XMPPResourceConnection session,
            NonAuthUserRepository repo, Queue results, Map settings) throws XMPPException {
    
        ...
        try {
            ...
            // 在比较JIDs之前,记住要去除资源部分
            id = (packet.getStanzaFrom() != null)
                    ? packet.getStanzaFrom().getBareJID()
                    : null;
    
            // 检查这是否是来自客户端的数据包
            if (session.isUserId(id)) {
                // 这是来自这个客户端的数据包,最简单的操作是转发到它的目的地:
                // Simple clone the XML element and....
                // ... putting it to results queue is enough
                results.offer(packet.copyElementOnly());
    
                return;
            }
    
            
        } catch (NotAuthorizedException e) {
            ...
        }    // end of try-catch
    }
    检查stanzaFfrom与session匹配通过后,将packet.copyElementOnly()放到results中,作后续投递,原来的packet 就丢弃了。
    此时投递的packet :packetFrom=null,packetTo=null。
    packet在SessionManager.addOutPacket(Packet packet)中判断packetFrom是否为空,为空则将其设置为ComponentId(此处为sess-man@llooper),然后调用父类(AbstractMessageReceiver.java) 的addOutPacket(packet)方法塞到out_queue 队列中。
    此时packet::packetFrom=sess-man@llooper,packetTo=null。
     

    4、上层组件MessageRouter处理,把packet塞到in_queues. 又回到了MessageRouter.processPacket(Packet packet)处理:

     
    不同的是 PacketTo为空,packet.getTo()的返回值是stanzaTo。
    getLocalComponent(packet.getTo());方法根据stanzaTo与compId、comp name、Component都匹配不到。
    此时packet会给组件SessionManager处理,Packet will be processed by: sess-man@llooper,由AbstractMessageReceiver的非阻塞性方法addPacketNB(Packet packet)加入到in_queues。
     
     5、第二次来到SessionManager.processPacket(final Packet packet)处理。不同的是在getXMPPResourceConnection(packet)方法中,
    conn = connectionsByFrom.get(from)返回值是null,所以是根据stanzaTo取获取接收方的session,返回接收方连接的Connection。
    protected XMPPResourceConnection getXMPPResourceConnection(Packet p) {
        XMPPResourceConnection conn = null;
        JID                    from = p.getPacketFrom();
    
        if (from != null) {
            conn = connectionsByFrom.get(from);
            if (conn != null) {
                return conn;
            }
        }
    
        // It might be a message _to_ some user on this server
        // so let's look for established session for this user...
        JID to = p.getStanzaTo();
    
        if (to != null) {
            ...
            conn = getResourceConnection(to);
        } else {
    
            ...
        }    // end of else
    
        return conn;
    }

     6、如同步骤3,此时packet作为一个以用户B的名义将其作为传入包进行处理。

    然后packetTo被设置为组件ID(sess-man@llooper)

    此时packet: packetFrom = sess-man@llooper,packetTo =sess-man@llooper。

    之后packet又经walk(packet, conn)方法,匹配处理器(授权),扔给amp处理。

     如同前面: 直到Message.process(),如下:
    @Override
    public void process(Packet packet, XMPPResourceConnection session,
            NonAuthUserRepository repo, Queue<Packet> results, Map<String, Object> settings) throws XMPPException {
    
        // For performance reasons it is better to do the check
        // before calling logging method.
        if (log.isLoggable(Level.FINEST)) {
            log.log(Level.FINEST, "Processing packet: {0}, for session: {1}", new Object[] {
                    packet,
                    session });
        }
    
        // You may want to skip processing completely if the user is offline.
        if (session == null) {
            processOfflineUser( packet, results );
            return;
        }    // end of if (session == null)
        try {
    
            // Remember to cut the resource part off before comparing JIDs
            BareJID id = (packet.getStanzaTo() != null)
                    ? packet.getStanzaTo().getBareJID()
                    : null;
    
            // Checking if this is a packet TO the owner of the session
            if (session.isUserId(id)) {
                if (log.isLoggable(Level.FINEST)) {
                    log.log(Level.FINEST, "Message 'to' this user, packet: {0}, for session: {1}",
                            new Object[] { packet,
                            session });
                }
    
                if (packet.getStanzaFrom() != null && session.isUserId(packet.getStanzaFrom().getBareJID())) {
                    JID connectionId = session.getConnectionId();
                    if (connectionId.equals(packet.getPacketFrom())) {
                        results.offer(packet.copyElementOnly());
                        // this would cause message packet to be stored in offline storage and will not
                        // send recipient-unavailable error but it will behave the same as a message to
                        // unavailable resources from other sessions or servers
                        return;
                    }
                }
    
                // Yes this is message to 'this' client
                List<XMPPResourceConnection> conns = new ArrayList<XMPPResourceConnection>(5);
    
                // This is where and how we set the address of the component
                // which should rceive the result packet for the final delivery
                // to the end-user. In most cases this is a c2s or Bosh component
                // which keep the user connection.
                String resource = packet.getStanzaTo().getResource();
    
                if (resource == null) {
    
                    // If the message is sent to BareJID then the message is delivered to
                    // all resources
                    conns.addAll(getConnectionsForMessageDelivery(session));
                } else {
    
                    // Otherwise only to the given resource or sent back as error.
                    XMPPResourceConnection con = session.getParentSession().getResourceForResource(
                            resource);
    
                    if (con != null) {
                        conns.add(con);
                    }
                }
    
                // MessageCarbons: message cloned to all resources? why? it should be copied only
                // to resources with non negative priority!!
    
                if (conns.size() > 0) {
                    for (XMPPResourceConnection con : conns) {
                        Packet result = packet.copyElementOnly();
    
                        result.setPacketTo(con.getConnectionId());
    
                        // In most cases this might be skept, however if there is a
                        // problem during packet delivery an error might be sent back
                        result.setPacketFrom(packet.getTo());
    
                        // Don't forget to add the packet to the results queue or it
                        // will be lost.
                        results.offer(result);
                        if (log.isLoggable(Level.FINEST)) {
                            log.log(Level.FINEST, "Delivering message, packet: {0}, to session: {1}",
                                    new Object[] { packet,
                                    con });
                        }
                    }
                } else {
                    // if there are no user connections we should process packet
                    // the same as with missing session (i.e. should be stored if
                    // has type 'chat'
                    processOfflineUser( packet, results );
                }
    
                return;
            }    // end of else
    
            // Remember to cut the resource part off before comparing JIDs
            id = (packet.getStanzaFrom() != null)
                    ? packet.getStanzaFrom().getBareJID()
                    : null;
    
            // Checking if this is maybe packet FROM the client
            if (session.isUserId(id)) {
    
                // This is a packet FROM this client, the simplest action is
                // to forward it to is't destination:
                // Simple clone the XML element and....
                // ... putting it to results queue is enough
                results.offer(packet.copyElementOnly());
    
                return;
            }
    
            // Can we really reach this place here?
            // Yes, some packets don't even have from or to address.
            // The best example is IQ packet which is usually a request to
            // the server for some data. Such packets may not have any addresses
            // And they usually require more complex processing
            // This is how you check whether this is a packet FROM the user
            // who is owner of the session:
            JID jid = packet.getFrom();
    
            // This test is in most cases equal to checking getElemFrom()
            if (session.getConnectionId().equals(jid)) {
    
                // Do some packet specific processing here, but we are dealing
                // with messages here which normally need just forwarding
                Element el_result = packet.getElement().clone();
    
                // If we are here it means FROM address was missing from the
                // packet, it is a place to set it here:
                el_result.setAttribute("from", session.getJID().toString());
    
                Packet result = Packet.packetInstance(el_result, session.getJID(), packet
                        .getStanzaTo());
    
                // ... putting it to results queue is enough
                results.offer(result);
            }
        } catch (NotAuthorizedException e) {
            log.log(Level.FINE, "NotAuthorizedException for packet: " + packet + " for session: " + session, e);
            results.offer(Authorization.NOT_AUTHORIZED.getResponseMessage(packet,
                    "You must authorize session first.", true));
        }    // end of try-catch
    }

    检查stanzaTo与session匹配通过后,根据session拿到接收方所有的连接(可能多端登陆),然后Packet result = packet.copyElementOnly()生成新的packet(原packet丢弃了),并将packetTo设置为接收方连接的ConnectionId(例如:c2s@llooper/192.168.0.33_5222_192.168.0.33_38624),通过addOutPacket()方法塞到out_queue队列。
    此时packet:packetFrom = sess-man@llooper,packetTo =c2s@llooper/192.168.0.33_5222_192.168.0.33_38624。

    7、 如同前面步骤2,不同的是根据packetTo匹配到组件 c2s@llooper

    8、 组件 c2s@llooper 从queue中取出packet,分发到目的地

    public void processPacket(final Packet packet) {
        ...
        if (packet.isCommand() && (packet.getCommand() != Command.OTHER)) {
            ...
        } else {
            // 把packet 发送给客户端
            if (!writePacketToSocket(packet)) {
    
                ...
                
            }
        }    // end of else
    }

    后续有时间会不断更新,欢迎加入QQ群 310790965 更多的交流

     
     
  • 相关阅读:
    Spring.NET学习笔记
    开源项目地址
    委托的实现匿名函数和朗姆达表达式
    c#事件与委托
    c# 时间戳转换
    List 排序
    DDD的好文章
    【转】理解JMeter聚合报告(Aggregate Report)
    【转】JMeter 通过 JDBC 访问 Oracle 和 MySQL
    【转】使用JMeter测试你的EJB
  • 原文地址:https://www.cnblogs.com/eyecool/p/9885062.html
Copyright © 2011-2022 走看看