zoukankan      html  css  js  c++  java
  • OpenFire源码学习之二十五:消息回执与离线消息(下)

    这一篇紧接着上面继续了。

    方案二

    基于redis的消息回执。主要流程分为下面几个步骤:

    1)将消息暂存储与redis中,设置好消息的过期时间

    2)客户端回执消息id来消灭暂存的消息

    3)开通单独线程论坛在第1)步中的消息。根据消息的时间重新发送消息。如果消息第一次存放的时间大雨有效期(自定义10秒),解析消息中的to查找用户是否还在线。如果在则T掉(因为它长时间不理会服务的重要命令),如果不在线则将消息放置离线表。

    OK,先来看看消息的存储格式吧。

    1.MESSAGE消息 用户集合

     SADD  SOGU:[username]  [VALUE(messageID)] [VALUE(messageID)] ...

    2.已读消息设备集合

     SADD  RT:[terminalid]  [VALUE(messageID)] [VALUE(messageID)] ...

    3.消息内容

     HMSET  OGM:[messageID]  CREATIONDATE [VALUE]  UPDATEDATE [VALUE] STANZA [VALUE]

    4.用户、设备关联

     SADD URT:[USERNAME]  [VALUE(terminalid)] .......

    (先根据消息id查找时间,在java中排序后 查找stanza)

    MESSAGE

    --离线表

    ZADD OFOFFLINE:[username]  [INDEX(时间戳)] [VALUE(messageID)] 、[VALUE]、[VALUE]......              [VALUE]

    HMSET OFOFFLINE:[messageID] STANZA[VALUE]

            CREATIONDATE [VALUE]  MESSAGESIZ[VALUE]

    将消息暂时消息存储:

        public void storeMessage(String username, Packet packet) {
        	
    		Jedis jedis = XMPPServer.getInstance().getGroupRedisManager().getJedis();
        	String packetID = "";
        	if (packet instanceof Message) 
        		packetID = ((Message)packet).getID();
       		else if (packet instanceof IQ) 
       			 packetID = ((IQ)packet).getID();
       		else 
       			return;
        	
        	try {
        		jedis.sadd("SOGU:" + username, packetID);
            	Map<String, String> hash = new HashMap<String, String>();
            	hash.put("STANZA", packet.toXML());
            	hash.put("CREATIONDATE", StringUtils.dateToMillis(new Date()));
            	jedis.hmset("OGM:" + packetID, hash);
    		} finally {
    			XMPPServer.getInstance().getGroupRedisManager().returnRes(jedis);
    		}
        	
        	htp.execute(addMessagesToDB(packet));
        }
        
        private Runnable addMessagesToDB(final Packet packet) {
        	return new Runnable() {
    			@Override
    			public void run() {
    				MyDBopt.insertMessage(packet);
    			}
    		
    

    客户端收到消息来回执服务端的操作

        private void handle(IQ packet) {
        	JID recipientJID = packet.getTo();
        	if (IQ.Type.crs != packet.getType()) {
                // Check if the packet was sent to the server hostname
                if (recipientJID != null && recipientJID.getNode() == null &&
                        recipientJID.getResource() == null && serverName.equals(recipientJID.getDomain())) {
                    Element childElement = packet.getChildElement();
                    if (childElement != null && childElement.element("addresses") != null) {
                        // to route this packet
                        multicastRouter.route(packet);
                        return;
                    }
                }
        	}
            
            if (IQ.Type.crs == packet.getType()) {
            	String username = packet.getFrom().getNode();
            	String terminal = packet.getFrom().getTerminal();
            	String msgId = packet.getID();
            	if (username == null || msgId == null || "".equals(msgId)) {
            		return ;
            	}
            	if (terminal == null) {terminal = username + "_" + System.currentTimeMillis()%1000000; }
            	Jedis jedis = XMPPServer.getInstance().getGroupRedisManager().getJedis();
            	
            	try {
            		jedis.sadd("URT:" + username, terminal);
                	jedis.sadd("RT:" + terminal, packet.getID());
    			} finally {
    				XMPPServer.getInstance().getGroupRedisManager().returnRes(jedis);
    			}
            	
        		threadPool.execute(createTask(msgId, username, terminal));
            	return;
            }
            if (packet.getID() != null && (IQ.Type.result == packet.getType() || IQ.Type.error == packet.getType())) {
                // The server got an answer to an IQ packet that was sent from the server
                IQResultListener iqResultListener = resultListeners.remove(packet.getID());
                if (iqResultListener != null) {
                    resultTimeout.remove(packet.getID());
                    if (iqResultListener != null) {
                        try {
                            iqResultListener.receivedAnswer(packet);
                        }
                        catch (Exception e) {
                            Log.error(
                                    "Error processing answer of remote entity. Answer: "
                                            + packet.toXML(), e);
                        }
                        return;
                    }
                }
            }
            try {
                // Check for registered components, services or remote servers
                if (recipientJID != null &&
                        (routingTable.hasComponentRoute(recipientJID) || routingTable.hasServerRoute(recipientJID))) {
                    // A component/service/remote server was found that can handle the Packet
                    routingTable.routePacket(recipientJID, packet, false);
                    return;
                }
                if (isLocalServer(recipientJID)) {
                    // Let the server handle the Packet
                    Element childElement = packet.getChildElement();
                    String namespace = null;
                    if (childElement != null) {
                        namespace = childElement.getNamespaceURI();
                    }
                    if (namespace == null) {
                        if (packet.getType() != IQ.Type.result && packet.getType() != IQ.Type.error) {
                            // Do nothing. We can't handle queries outside of a valid namespace
                            Log.warn("Unknown packet " + packet.toXML());
                        }
                    }
                    else {
                        // Check if communication to local users is allowed
                        if (recipientJID != null && userManager.isRegisteredUser(recipientJID.getNode())) {
                            PrivacyList list =
                                    PrivacyListManager.getInstance().getDefaultPrivacyList(recipientJID.getNode());
                            if (list != null && list.shouldBlockPacket(packet)) {
                                // Communication is blocked
                                if (IQ.Type.set == packet.getType() || IQ.Type.get == packet.getType()) {
                                    // Answer that the service is unavailable
                                    sendErrorPacket(packet, PacketError.Condition.service_unavailable);
                                }
                                return;
                            }
                        }
                        IQHandler handler = getHandler(namespace);
                        if (handler == null) {
                            if (recipientJID == null) {
                                // Answer an error since the server can't handle the requested namespace
                                sendErrorPacket(packet, PacketError.Condition.service_unavailable);
                            }
                            else if (recipientJID.getNode() == null ||
                                    "".equals(recipientJID.getNode())) {
                                // Answer an error if JID is of the form <domain>
                                sendErrorPacket(packet, PacketError.Condition.feature_not_implemented);
                            }
                            else {
                                // JID is of the form <node@domain>
                                // Answer an error since the server can't handle packets sent to a node
                                sendErrorPacket(packet, PacketError.Condition.service_unavailable);
                            }
                        }
                        else {
                            handler.process(packet);
                        }
                    }
                }
                else {
                    // JID is of the form <node@domain/resource> or belongs to a remote server
                    // or to an uninstalled component
                    routingTable.routePacket(recipientJID, packet, false);
                }
            }
            catch (Exception e) {
            ......
            }
        }
    

    离线消息

    离线消息的优化。

    同样可以拓展XMPP。比如

    客户端获取离线消息,可以这么通讯。

    1)先向服务器询问,我总的离线消息的基本状况(有多大,有多少条)

    <iq id="BfI3V-47" to="8ntmorv1ep4wgcy" type="get" from="test@8ntmorv1ep4wgcy">
      <query xmlns="http://jabber.org/protocol/offmsg#bif"/>
    </iq>
    

    2)服务端返回

    <iq type="result" id="BfI3V-47" from="8ntmorv1ep4wgcy" to="test@8ntmorv1ep4wgcy">
      <query xmlns="http://jabber.org/protocol/offmsg#bifs">
         <size>1024b</>
         <count>128</>
         <idset>1001,1002...</>
      </query>
    </iq>
    

    3)客户端发送分批获取命令,一次给我发10条发完为止。

    <iq id="BfI3V-47" to="8ntmorv1ep4wgcy" type="get" from="test@8ntmorv1ep4wgcy">
      <query xmlns="http://jabber.org/protocol/offmsg#start"/>
       <pagesize>10</>
    </iq>
    

    4)服务端开始发送消息

    <iq type="result" id="BfI3V-47" from="8ntmorv1ep4wgcy" to="test@8ntmorv1ep4wgcy">
      ......
    </iq>
    <iq type="result" id="BfI3V-47" from="8ntmorv1ep4wgcy" to="test@8ntmorv1ep4wgcy">
      ......
    </iq>
    .....
    

    5)告诉客户端我都发完了

    <iq type="result" id="BfI3V-47" from="8ntmorv1ep4wgcy" to="test@8ntmorv1ep4wgcy">
      <query xmlns="http://jabber.org/protocol/offmsg#end"/>
    </iq>
    

    6)客户端本地校验,回执已经接收到的消息

    <message id="BfI3V-47" to="8ntmorv1ep4wgcy" 
     from="test1@8ntmorv1ep4wgcy/Spark 2.6.3" type="crs"/>
    

    这里本人只是做了一个简单的示意想法。如果需要更加精准的不妨在仔细想想消息处理与格式。

    离线消息存储。

    将消息存储到redis中:

    public void addMessageToRedis(Message message) {
        	if (message == null) {
                return;
            }
            JID recipient = message.getTo();
            String username = recipient.getNode();
            // If the username is null (such as when an anonymous user), don't store.
            if (username == null || !UserManager.getInstance().isRegisteredUser(recipient)) {
                return;
            }
            else
            if (!XMPPServer.getInstance().getServerInfo().getXMPPDomain().equals(recipient.getDomain())) {
                // Do not store messages sent to users of remote servers
                return;
            }
            String msgXML = message.getElement().asXML();
            
        	Jedis jedis = XMPPServer.getInstance().getChatMessageJedisPoolManager().getJedis();
        	
        	try {
        		String newDate = StringUtils.dateToMillis(new java.util.Date());
            	String id = MessageIdTactics.mid(username);
            	jedis.zadd("OFOFFLINE:" + username, Long.valueOf(newDate), id 
            	Map<String, String> hash = new HashMap<String, String>();
            	hash.put("STANZA", msgXML);
            	hash.put("MESSAGESIZ", String.valueOf(msgXML.length()));
            	hash.put("CREATIONDATE", newDate);
            	jedis.hmset("OFOFFLINE:" + id, hash);
    		} finally {
    			XMPPServer.getInstance().getChatMessageJedisPoolManager().returnRes(jedis);
    		}
        	
        	if (sizeCache.containsKey(username)) {
                int size = sizeCache.get(username);
                size += msgXML.length();
                sizeCache.put(username, size);
            }
        	htp.execute(addMessageToDB(message));
        }
    

    Redis优化这块就到这啦。主要要做的就是:

    第一:存储用户或者MUC、Group等这些都需要设置消息存储的生命周期。当用户不处于活跃状态或者长时间不登陆的。要从redis中提出。免得浪费资源。当用户重新加载的时候再将他放置redis中

    第二:将需要回执消息和离线消息分开。需要回执的消息需要设置他的生命周期。离线表最好做个定时器。轮询消息。将超时出现范围内的消息(比如周期为一周)的消息同步至关系表中。这里的离线消息需要将用户的设备分开来。

    这里要考虑不同的设备终端等很多不同场景,问题会比较绕口。欢迎大家和我邮件交流。

  • 相关阅读:
    如果向某网址Post信息,并得到CookieContainer以便以后直接通过验证
    使用WebClient自动填写并提交ASP.NET页面表单的源代码
    mysql中文乱码,phpmyadmin乱码,php乱码 产生原因及其解决方法
    自己修改的一个WebRequest+代理+POST登录
    HttpWebRequest POST 数据时请求头多了一行Expect: 100continue,少了数据行
    [zt]Windows 2000下PHP5+IIS的安装方法(ISAPI方式)
    GridView控件修改、删除示例(修改含有DropDownList控件)
    GridView的常用操作(增删改查)
    ASP.net 验证码(C#)
    IE对input元素onchange事件的支持BUG
  • 原文地址:https://www.cnblogs.com/huwf/p/4273343.html
Copyright © 2011-2022 走看看