您的位置:首页 > 运维架构

OpenFire源码学习之二十五:消息回执与离线消息(下)

2018-02-06 11:20 429 查看
这一篇紧接着上面继续了。


方案二

基于redis的消息回执。主要流程分为下面几个步骤:

1)将消息暂存储与redis中,设置好消息的过期时间

2)客户端回执消息id来消灭暂存的消息

3)开通单独线程论坛在第1)步中的消息。根据消息的时间重新发送消息。如果消息第一次存放的时间大雨有效期(自定义10秒),解析消息中的to查找用户是否还在线。如果在则T掉(因为它长时间不理会服务的重要命令),如果不在线则将消息放置离线表。

 

OK,先来看看消息的存储格式吧。

1.MESSAGE消息 用户集合

 SADD  SOGU:[username]  [VALUE(messageID)] [VALUE(messageID)] ...

2.已读消息设备集合

 SADD  RT:[terminalid]  [VALUE(messageID)] [VALUE(messageID)] ...

3.消息内容

 HMSET  OGM:[messageID]  CREATIONDATE [VALUE]  UPDATEDATE [VALUE] STANZA [VALUE]

4.用户、设备关联

 SADD URT:[USERNAME]  [VALUE(terminalid)] .......

(先根据消息id查找时间,在java中排序后 查找stanza)

MESSAGE

--离线表

ZADD OFOFFLINE:[username]  [INDEX(时间戳)] [VALUE(messageID)] 、[VALUE]、[VALUE]......              [VALUE]

HMSET OFOFFLINE:[messageID] STANZA[VALUE]

        CREATIONDATE [VALUE]  MESSAGESIZ[VALUE]

 

将消息暂时消息存储:

[java] view
plain copy

 public void storeMessage(String username, Packet packet) {  

      

edis jedis = XMPPServer.getInstance().getGroupRedisManager().getJedis();  

    String packetID = "";  

    if (packet instanceof Message)   

        packetID = ((Message)packet).getID();  

        else if (packet instanceof IQ)   

             packetID = ((IQ)packet).getID();  

        else   

            return;  

      

    try {  

        jedis.sadd("SOGU:" + username, packetID);  

        Map<String, String> hash = new HashMap<String, String>();  

        hash.put("STANZA", packet.toXML());  

        hash.put("CREATIONDATE", StringUtils.dateToMillis(new Date()));  

        jedis.hmset("OGM:" + packetID, hash);  

 finally {  

XMPPServer.getInstance().getGroupRedisManager().returnRes(jedis);  

  

      

    htp.execute(addMessagesToDB(packet));  

 }  

   

 private Runnable addMessagesToDB(final Packet packet) {  

    return new Runnable() {  

@Override  

public void run() {  

    MyDBopt.insertMessage(packet);  

}  

客户端收到消息来回执服务端的操作

[html] view
plain copy

 private void handle(IQ packet) {  

    JID recipientJID = packet.getTo();  

    if (IQ.Type.crs != packet.getType()) {  

         // Check if the packet was sent to the server hostname  

         if (recipientJID != null && recipientJID.getNode() == null &&  

                 recipientJID.getResource() == null && serverName.equals(recipientJID.getDomain())) {  

             Element childElement = packet.getChildElement();  

             if (childElement != null && childElement.element("addresses") != null) {  

                 // to route this packet  

                 multicastRouter.route(packet);  

                 return;  

             }  

         }  

    }  

       

     if (IQ.Type.crs == packet.getType()) {  

        String username = packet.getFrom().getNode();  

        String terminal = packet.getFrom().getTerminal();  

        String msgId = packet.getID();  

        if (username == null || msgId == null || "".equals(msgId)) {  

            return ;  

        }  

        if (terminal == null) {terminal = username + "_" + System.currentTimeMillis()%1000000; }  

        Jedis jedis = XMPPServer.getInstance().getGroupRedisManager().getJedis();  

          

        try {  

            jedis.sadd("URT:" + username, terminal);  

            jedis.sadd("RT:" + terminal, packet.getID());  

} finally {  

    XMPPServer.getInstance().getGroupRedisManager().returnRes(jedis);  

}  

          

        threadPool.execute(createTask(msgId, username, terminal));  

        return;  

     }  

     if (packet.getID() != null && (IQ.Type.result == packet.getType() || IQ.Type.error == packet.getType())) {  

         // The server got an answer to an IQ packet that was sent from the server  

         IQResultListener iqResultListener = resultListeners.remove(packet.getID());  

         if (iqResultListener != null) {  

             resultTimeout.remove(packet.getID());  

             if (iqResultListener != null) {  

                 try {  

                     iqResultListener.receivedAnswer(packet);  

                 }  

                 catch (Exception e) {  

                     Log.error(  

                             "Error processing answer of remote entity. Answer: "  

                                     + packet.toXML(), e);  

                 }  

                 return;  

             }  

         }  

     }  

     try {  

         // Check for registered components, services or remote servers  

         if (recipientJID != null &&  

                 (routingTable.hasComponentRoute(recipientJID) || routingTable.hasServerRoute(recipientJID))) {  

             // A component/service/remote server was found that can handle the Packet  

             routingTable.routePacket(recipientJID, packet, false);  

             return;  

         }  

         if (isLocalServer(recipientJID)) {  

             // Let the server handle the Packet  

             Element childElement = packet.getChildElement();  

             String namespace = null;  

             if (childElement != null) {  

                 namespace = childElement.getNamespaceURI();  

             }  

             if (namespace == null) {  

                 if (packet.getType() != IQ.Type.result && packet.getType() != IQ.Type.error) {  

                     // Do nothing. We can't handle queries outside of a valid namespace  

                     Log.warn("Unknown packet " + packet.toXML());  

                 }  

             }  

             else {  

                 // Check if communication to local users is allowed  

                 if (recipientJID != null && userManager.isRegisteredUser(recipientJID.getNode())) {  

                     PrivacyList list =  

                             PrivacyListManager.getInstance().getDefaultPrivacyList(recipientJID.getNode());  

                     if (list != null && list.shouldBlockPacket(packet)) {  

                         // Communication is blocked  

                         if (IQ.Type.set == packet.getType() || IQ.Type.get == packet.getType()) {  

                             // Answer that the service is unavailable  

                             sendErrorPacket(packet, PacketError.Condition.service_unavailable);  

                         }  

                         return;  

                     }  

                 }  

                 IQHandler handler = getHandler(namespace);  

                 if (handler == null) {  

                     if (recipientJID == null) {  

                         // Answer an error since the server can't handle the requested namespace  

                         sendErrorPacket(packet, PacketError.Condition.service_unavailable);  

                     }  

                     else if (recipientJID.getNode() == null ||  

                             "".equals(recipientJID.getNode())) {  

                         // Answer an error if JID is of the form <domain>  

                         sendErrorPacket(packet, PacketError.Condition.feature_not_implemented);  

                     }  

                     else {  

                         // JID is of the form <node@domain>  

                         // Answer an error since the server can't handle packets sent to a node  

                         sendErrorPacket(packet, PacketError.Condition.service_unavailable);  

                     }  

                 }  

                 else {  

                     handler.process(packet);  

                 }  

             }  

         }  

         else {  

             // JID is of the form <node@domain/resource> or belongs to a remote server  

             // or to an uninstalled component  

             routingTable.routePacket(recipientJID, packet, false);  

         }  

     }  

     catch (Exception e) {  

     ......  

     }  

 }  


离线消息

离线消息的优化。

同样可以拓展XMPP。比如

客户端获取离线消息,可以这么通讯。

1)先向服务器询问,我总的离线消息的基本状况(有多大,有多少条)

[html] view
plain copy

<iq id="BfI3V-47" to="8ntmorv1ep4wgcy" type="get" from="test@8ntmorv1ep4wgcy">  

  <query xmlns="http://jabber.org/protocol/offmsg#bif"/>  

</iq>  

2)服务端返回

[html] view
plain copy

<iq type="result" id="BfI3V-47" from="8ntmorv1ep4wgcy" to="test@8ntmorv1ep4wgcy">  

  <query xmlns="http://jabber.org/protocol/offmsg#bifs">  

     <size>1024b</>  

     <count>128</>  

     <idset>1001,1002...</>  

  </query>  

</iq>  

3)客户端发送分批获取命令,一次给我发10条发完为止。

[html] view
plain copy

<iq id="BfI3V-47" to="8ntmorv1ep4wgcy" type="get" from="test@8ntmorv1ep4wgcy">  

  <query xmlns="http://jabber.org/protocol/offmsg#start"/>  

   <pagesize>10</>  

</iq>  

4)服务端开始发送消息

[html] view
plain copy

<iq type="result" id="BfI3V-47" from="8ntmorv1ep4wgcy" to="test@8ntmorv1ep4wgcy">  

  ......  

</iq>  

<iq type="result" id="BfI3V-47" from="8ntmorv1ep4wgcy" to="test@8ntmorv1ep4wgcy">  

  ......  

</iq>  

.....  

5)告诉客户端我都发完了

[html] view
plain copy

<iq type="result" id="BfI3V-47" from="8ntmorv1ep4wgcy" to="test@8ntmorv1ep4wgcy">  

  <query xmlns="http://jabber.org/protocol/offmsg#end"/>  

</iq>  

6)客户端本地校验,回执已经接收到的消息

[html] view
plain copy

<message id="BfI3V-47" to="8ntmorv1ep4wgcy"   

 from="test1@8ntmorv1ep4wgcy/Spark 2.6.3" type="crs"/>  

这里本人只是做了一个简单的示意想法。如果需要更加精准的不妨在仔细想想消息处理与格式。

 

离线消息存储。

将消息存储到redis中:

[java] view
plain copy

public void addMessageToRedis(Message message) {  

        if (message == null) {  

            return;  

        }  

        JID recipient = message.getTo();  

        String username = recipient.getNode();  

        // If the username is null (such as when an anonymous user), don't store.  

        if (username == null || !UserManager.getInstance().isRegisteredUser(recipient)) {  

            return;  

        }  

        else  

        if (!XMPPServer.getInstance().getServerInfo().getXMPPDomain().equals(recipient.getDomain())) {  

            // Do not store messages sent to users of remote servers  

            return;  

        }  

        String msgXML = message.getElement().asXML();  

          

        Jedis jedis = XMPPServer.getInstance().getChatMessageJedisPoolManager().getJedis();  

          

        try {  

            String newDate = StringUtils.dateToMillis(new java.util.Date());  

            String id = MessageIdTactics.mid(username);  

            jedis.zadd("OFOFFLINE:" + username, Long.valueOf(newDate), id   

            Map<String, String> hash = new HashMap<String, String>();  

            hash.put("STANZA", msgXML);  

            hash.put("MESSAGESIZ", String.valueOf(msgXML.length()));  

            hash.put("CREATIONDATE", newDate);  

            jedis.hmset("OFOFFLINE:" + id, hash);  

        } finally {  

            XMPPServer.getInstance().getChatMessageJedisPoolManager().returnRes(jedis);  

        }  

          

        if (sizeCache.containsKey(username)) {  

            int size = sizeCache.get(username);  

            size += msgXML.length();  

            sizeCache.put(username, size);  

        }  

        htp.execute(addMessageToDB(message));  

    }  

Redis优化这块就到这啦。主要要做的就是:

第一:存储用户或者MUC、Group等这些都需要设置消息存储的生命周期。当用户不处于活跃状态或者长时间不登陆的。要从redis中提出。免得浪费资源。当用户重新加载的时候再将他放置redis中

第二:将需要回执消息和离线消息分开。需要回执的消息需要设置他的生命周期。离线表最好做个定时器。轮询消息。将超时出现范围内的消息(比如周期为一周)的消息同步至关系表中。这里的离线消息需要将用户的设备分开来。



这里要考虑不同的设备终端等很多不同场景,问题会比较绕口。欢迎大家和我邮件交流
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: