OpenFire源码学习之二十五:消息回执与离线消息(下)
2018-02-06 11:20
357 查看
这一篇紧接着上面继续了。
基于redis的消息回执。主要流程分为下面几个步骤:
1)将消息暂存储与redis中,设置好消息的过期时间
2)客户端回执消息id来消灭暂存的消息
3)开通单独线程论坛在第1)步中的消息。根据消息的时间重新发送消息。如果消息第一次存放的时间大雨有效期(自定义10秒),解析消息中的to查找用户是否还在线。如果在则T掉(因为它长时间不理会服务的重要命令),如果不在线则将消息放置离线表。
OK,先来看看消息的存储格式吧。
1.MESSAGE消息 用户集合
SADD SOGU:[username] [VALUE(messageID)] [VALUE(messageID)] ...
2.已读消息设备集合
SADD RT:[terminalid] [VALUE(messageID)] [VALUE(messageID)] ...
3.消息内容
HMSET OGM:[messageID] CREATIONDATE [VALUE] UPDATEDATE [VALUE] STANZA [VALUE]
4.用户、设备关联
SADD URT:[USERNAME] [VALUE(terminalid)] .......
(先根据消息id查找时间,在java中排序后 查找stanza)
MESSAGE
--离线表
ZADD OFOFFLINE:[username] [INDEX(时间戳)] [VALUE(messageID)] 、[VALUE]、[VALUE]...... [VALUE]
HMSET OFOFFLINE:[messageID] STANZA[VALUE]
CREATIONDATE [VALUE] MESSAGESIZ[VALUE]
将消息暂时消息存储:
[java] view
plain copy
public void storeMessage(String username, Packet packet) {
edis jedis = XMPPServer.getInstance().getGroupRedisManager().getJedis();
String packetID = "";
if (packet instanceof Message)
packetID = ((Message)packet).getID();
else if (packet instanceof IQ)
packetID = ((IQ)packet).getID();
else
return;
try {
jedis.sadd("SOGU:" + username, packetID);
Map<String, String> hash = new HashMap<String, String>();
hash.put("STANZA", packet.toXML());
hash.put("CREATIONDATE", StringUtils.dateToMillis(new Date()));
jedis.hmset("OGM:" + packetID, hash);
finally {
XMPPServer.getInstance().getGroupRedisManager().returnRes(jedis);
htp.execute(addMessagesToDB(packet));
}
private Runnable addMessagesToDB(final Packet packet) {
return new Runnable() {
@Override
public void run() {
MyDBopt.insertMessage(packet);
}
客户端收到消息来回执服务端的操作
[html] view
plain copy
private void handle(IQ packet) {
JID recipientJID = packet.getTo();
if (IQ.Type.crs != packet.getType()) {
// Check if the packet was sent to the server hostname
if (recipientJID != null && recipientJID.getNode() == null &&
recipientJID.getResource() == null && serverName.equals(recipientJID.getDomain())) {
Element childElement = packet.getChildElement();
if (childElement != null && childElement.element("addresses") != null) {
// to route this packet
multicastRouter.route(packet);
return;
}
}
}
if (IQ.Type.crs == packet.getType()) {
String username = packet.getFrom().getNode();
String terminal = packet.getFrom().getTerminal();
String msgId = packet.getID();
if (username == null || msgId == null || "".equals(msgId)) {
return ;
}
if (terminal == null) {terminal = username + "_" + System.currentTimeMillis()%1000000; }
Jedis jedis = XMPPServer.getInstance().getGroupRedisManager().getJedis();
try {
jedis.sadd("URT:" + username, terminal);
jedis.sadd("RT:" + terminal, packet.getID());
} finally {
XMPPServer.getInstance().getGroupRedisManager().returnRes(jedis);
}
threadPool.execute(createTask(msgId, username, terminal));
return;
}
if (packet.getID() != null && (IQ.Type.result == packet.getType() || IQ.Type.error == packet.getType())) {
// The server got an answer to an IQ packet that was sent from the server
IQResultListener iqResultListener = resultListeners.remove(packet.getID());
if (iqResultListener != null) {
resultTimeout.remove(packet.getID());
if (iqResultListener != null) {
try {
iqResultListener.receivedAnswer(packet);
}
catch (Exception e) {
Log.error(
"Error processing answer of remote entity. Answer: "
+ packet.toXML(), e);
}
return;
}
}
}
try {
// Check for registered components, services or remote servers
if (recipientJID != null &&
(routingTable.hasComponentRoute(recipientJID) || routingTable.hasServerRoute(recipientJID))) {
// A component/service/remote server was found that can handle the Packet
routingTable.routePacket(recipientJID, packet, false);
return;
}
if (isLocalServer(recipientJID)) {
// Let the server handle the Packet
Element childElement = packet.getChildElement();
String namespace = null;
if (childElement != null) {
namespace = childElement.getNamespaceURI();
}
if (namespace == null) {
if (packet.getType() != IQ.Type.result && packet.getType() != IQ.Type.error) {
// Do nothing. We can't handle queries outside of a valid namespace
Log.warn("Unknown packet " + packet.toXML());
}
}
else {
// Check if communication to local users is allowed
if (recipientJID != null && userManager.isRegisteredUser(recipientJID.getNode())) {
PrivacyList list =
PrivacyListManager.getInstance().getDefaultPrivacyList(recipientJID.getNode());
if (list != null && list.shouldBlockPacket(packet)) {
// Communication is blocked
if (IQ.Type.set == packet.getType() || IQ.Type.get == packet.getType()) {
// Answer that the service is unavailable
sendErrorPacket(packet, PacketError.Condition.service_unavailable);
}
return;
}
}
IQHandler handler = getHandler(namespace);
if (handler == null) {
if (recipientJID == null) {
// Answer an error since the server can't handle the requested namespace
sendErrorPacket(packet, PacketError.Condition.service_unavailable);
}
else if (recipientJID.getNode() == null ||
"".equals(recipientJID.getNode())) {
// Answer an error if JID is of the form <domain>
sendErrorPacket(packet, PacketError.Condition.feature_not_implemented);
}
else {
// JID is of the form <node@domain>
// Answer an error since the server can't handle packets sent to a node
sendErrorPacket(packet, PacketError.Condition.service_unavailable);
}
}
else {
handler.process(packet);
}
}
}
else {
// JID is of the form <node@domain/resource> or belongs to a remote server
// or to an uninstalled component
routingTable.routePacket(recipientJID, packet, false);
}
}
catch (Exception e) {
......
}
}
离线消息的优化。
同样可以拓展XMPP。比如
客户端获取离线消息,可以这么通讯。
1)先向服务器询问,我总的离线消息的基本状况(有多大,有多少条)
[html] view
plain copy
<iq id="BfI3V-47" to="8ntmorv1ep4wgcy" type="get" from="test@8ntmorv1ep4wgcy">
<query xmlns="http://jabber.org/protocol/offmsg#bif"/>
</iq>
2)服务端返回
[html] view
plain copy
<iq type="result" id="BfI3V-47" from="8ntmorv1ep4wgcy" to="test@8ntmorv1ep4wgcy">
<query xmlns="http://jabber.org/protocol/offmsg#bifs">
<size>1024b</>
<count>128</>
<idset>1001,1002...</>
</query>
</iq>
3)客户端发送分批获取命令,一次给我发10条发完为止。
[html] view
plain copy
<iq id="BfI3V-47" to="8ntmorv1ep4wgcy" type="get" from="test@8ntmorv1ep4wgcy">
<query xmlns="http://jabber.org/protocol/offmsg#start"/>
<pagesize>10</>
</iq>
4)服务端开始发送消息
[html] view
plain copy
<iq type="result" id="BfI3V-47" from="8ntmorv1ep4wgcy" to="test@8ntmorv1ep4wgcy">
......
</iq>
<iq type="result" id="BfI3V-47" from="8ntmorv1ep4wgcy" to="test@8ntmorv1ep4wgcy">
......
</iq>
.....
5)告诉客户端我都发完了
[html] view
plain copy
<iq type="result" id="BfI3V-47" from="8ntmorv1ep4wgcy" to="test@8ntmorv1ep4wgcy">
<query xmlns="http://jabber.org/protocol/offmsg#end"/>
</iq>
6)客户端本地校验,回执已经接收到的消息
[html] view
plain copy
<message id="BfI3V-47" to="8ntmorv1ep4wgcy"
from="test1@8ntmorv1ep4wgcy/Spark 2.6.3" type="crs"/>
这里本人只是做了一个简单的示意想法。如果需要更加精准的不妨在仔细想想消息处理与格式。
离线消息存储。
将消息存储到redis中:
[java] view
plain copy
public void addMessageToRedis(Message message) {
if (message == null) {
return;
}
JID recipient = message.getTo();
String username = recipient.getNode();
// If the username is null (such as when an anonymous user), don't store.
if (username == null || !UserManager.getInstance().isRegisteredUser(recipient)) {
return;
}
else
if (!XMPPServer.getInstance().getServerInfo().getXMPPDomain().equals(recipient.getDomain())) {
// Do not store messages sent to users of remote servers
return;
}
String msgXML = message.getElement().asXML();
Jedis jedis = XMPPServer.getInstance().getChatMessageJedisPoolManager().getJedis();
try {
String newDate = StringUtils.dateToMillis(new java.util.Date());
String id = MessageIdTactics.mid(username);
jedis.zadd("OFOFFLINE:" + username, Long.valueOf(newDate), id
Map<String, String> hash = new HashMap<String, String>();
hash.put("STANZA", msgXML);
hash.put("MESSAGESIZ", String.valueOf(msgXML.length()));
hash.put("CREATIONDATE", newDate);
jedis.hmset("OFOFFLINE:" + id, hash);
} finally {
XMPPServer.getInstance().getChatMessageJedisPoolManager().returnRes(jedis);
}
if (sizeCache.containsKey(username)) {
int size = sizeCache.get(username);
size += msgXML.length();
sizeCache.put(username, size);
}
htp.execute(addMessageToDB(message));
}
Redis优化这块就到这啦。主要要做的就是:
第一:存储用户或者MUC、Group等这些都需要设置消息存储的生命周期。当用户不处于活跃状态或者长时间不登陆的。要从redis中提出。免得浪费资源。当用户重新加载的时候再将他放置redis中
第二:将需要回执消息和离线消息分开。需要回执的消息需要设置他的生命周期。离线表最好做个定时器。轮询消息。将超时出现范围内的消息(比如周期为一周)的消息同步至关系表中。这里的离线消息需要将用户的设备分开来。
这里要考虑不同的设备终端等很多不同场景,问题会比较绕口。欢迎大家和我邮件交流
方案二
基于redis的消息回执。主要流程分为下面几个步骤:1)将消息暂存储与redis中,设置好消息的过期时间
2)客户端回执消息id来消灭暂存的消息
3)开通单独线程论坛在第1)步中的消息。根据消息的时间重新发送消息。如果消息第一次存放的时间大雨有效期(自定义10秒),解析消息中的to查找用户是否还在线。如果在则T掉(因为它长时间不理会服务的重要命令),如果不在线则将消息放置离线表。
OK,先来看看消息的存储格式吧。
1.MESSAGE消息 用户集合
SADD SOGU:[username] [VALUE(messageID)] [VALUE(messageID)] ...
2.已读消息设备集合
SADD RT:[terminalid] [VALUE(messageID)] [VALUE(messageID)] ...
3.消息内容
HMSET OGM:[messageID] CREATIONDATE [VALUE] UPDATEDATE [VALUE] STANZA [VALUE]
4.用户、设备关联
SADD URT:[USERNAME] [VALUE(terminalid)] .......
(先根据消息id查找时间,在java中排序后 查找stanza)
MESSAGE
--离线表
ZADD OFOFFLINE:[username] [INDEX(时间戳)] [VALUE(messageID)] 、[VALUE]、[VALUE]...... [VALUE]
HMSET OFOFFLINE:[messageID] STANZA[VALUE]
CREATIONDATE [VALUE] MESSAGESIZ[VALUE]
将消息暂时消息存储:
[java] view
plain copy
public void storeMessage(String username, Packet packet) {
edis jedis = XMPPServer.getInstance().getGroupRedisManager().getJedis();
String packetID = "";
if (packet instanceof Message)
packetID = ((Message)packet).getID();
else if (packet instanceof IQ)
packetID = ((IQ)packet).getID();
else
return;
try {
jedis.sadd("SOGU:" + username, packetID);
Map<String, String> hash = new HashMap<String, String>();
hash.put("STANZA", packet.toXML());
hash.put("CREATIONDATE", StringUtils.dateToMillis(new Date()));
jedis.hmset("OGM:" + packetID, hash);
finally {
XMPPServer.getInstance().getGroupRedisManager().returnRes(jedis);
htp.execute(addMessagesToDB(packet));
}
private Runnable addMessagesToDB(final Packet packet) {
return new Runnable() {
@Override
public void run() {
MyDBopt.insertMessage(packet);
}
客户端收到消息来回执服务端的操作
[html] view
plain copy
private void handle(IQ packet) {
JID recipientJID = packet.getTo();
if (IQ.Type.crs != packet.getType()) {
// Check if the packet was sent to the server hostname
if (recipientJID != null && recipientJID.getNode() == null &&
recipientJID.getResource() == null && serverName.equals(recipientJID.getDomain())) {
Element childElement = packet.getChildElement();
if (childElement != null && childElement.element("addresses") != null) {
// to route this packet
multicastRouter.route(packet);
return;
}
}
}
if (IQ.Type.crs == packet.getType()) {
String username = packet.getFrom().getNode();
String terminal = packet.getFrom().getTerminal();
String msgId = packet.getID();
if (username == null || msgId == null || "".equals(msgId)) {
return ;
}
if (terminal == null) {terminal = username + "_" + System.currentTimeMillis()%1000000; }
Jedis jedis = XMPPServer.getInstance().getGroupRedisManager().getJedis();
try {
jedis.sadd("URT:" + username, terminal);
jedis.sadd("RT:" + terminal, packet.getID());
} finally {
XMPPServer.getInstance().getGroupRedisManager().returnRes(jedis);
}
threadPool.execute(createTask(msgId, username, terminal));
return;
}
if (packet.getID() != null && (IQ.Type.result == packet.getType() || IQ.Type.error == packet.getType())) {
// The server got an answer to an IQ packet that was sent from the server
IQResultListener iqResultListener = resultListeners.remove(packet.getID());
if (iqResultListener != null) {
resultTimeout.remove(packet.getID());
if (iqResultListener != null) {
try {
iqResultListener.receivedAnswer(packet);
}
catch (Exception e) {
Log.error(
"Error processing answer of remote entity. Answer: "
+ packet.toXML(), e);
}
return;
}
}
}
try {
// Check for registered components, services or remote servers
if (recipientJID != null &&
(routingTable.hasComponentRoute(recipientJID) || routingTable.hasServerRoute(recipientJID))) {
// A component/service/remote server was found that can handle the Packet
routingTable.routePacket(recipientJID, packet, false);
return;
}
if (isLocalServer(recipientJID)) {
// Let the server handle the Packet
Element childElement = packet.getChildElement();
String namespace = null;
if (childElement != null) {
namespace = childElement.getNamespaceURI();
}
if (namespace == null) {
if (packet.getType() != IQ.Type.result && packet.getType() != IQ.Type.error) {
// Do nothing. We can't handle queries outside of a valid namespace
Log.warn("Unknown packet " + packet.toXML());
}
}
else {
// Check if communication to local users is allowed
if (recipientJID != null && userManager.isRegisteredUser(recipientJID.getNode())) {
PrivacyList list =
PrivacyListManager.getInstance().getDefaultPrivacyList(recipientJID.getNode());
if (list != null && list.shouldBlockPacket(packet)) {
// Communication is blocked
if (IQ.Type.set == packet.getType() || IQ.Type.get == packet.getType()) {
// Answer that the service is unavailable
sendErrorPacket(packet, PacketError.Condition.service_unavailable);
}
return;
}
}
IQHandler handler = getHandler(namespace);
if (handler == null) {
if (recipientJID == null) {
// Answer an error since the server can't handle the requested namespace
sendErrorPacket(packet, PacketError.Condition.service_unavailable);
}
else if (recipientJID.getNode() == null ||
"".equals(recipientJID.getNode())) {
// Answer an error if JID is of the form <domain>
sendErrorPacket(packet, PacketError.Condition.feature_not_implemented);
}
else {
// JID is of the form <node@domain>
// Answer an error since the server can't handle packets sent to a node
sendErrorPacket(packet, PacketError.Condition.service_unavailable);
}
}
else {
handler.process(packet);
}
}
}
else {
// JID is of the form <node@domain/resource> or belongs to a remote server
// or to an uninstalled component
routingTable.routePacket(recipientJID, packet, false);
}
}
catch (Exception e) {
......
}
}
离线消息
离线消息的优化。同样可以拓展XMPP。比如
客户端获取离线消息,可以这么通讯。
1)先向服务器询问,我总的离线消息的基本状况(有多大,有多少条)
[html] view
plain copy
<iq id="BfI3V-47" to="8ntmorv1ep4wgcy" type="get" from="test@8ntmorv1ep4wgcy">
<query xmlns="http://jabber.org/protocol/offmsg#bif"/>
</iq>
2)服务端返回
[html] view
plain copy
<iq type="result" id="BfI3V-47" from="8ntmorv1ep4wgcy" to="test@8ntmorv1ep4wgcy">
<query xmlns="http://jabber.org/protocol/offmsg#bifs">
<size>1024b</>
<count>128</>
<idset>1001,1002...</>
</query>
</iq>
3)客户端发送分批获取命令,一次给我发10条发完为止。
[html] view
plain copy
<iq id="BfI3V-47" to="8ntmorv1ep4wgcy" type="get" from="test@8ntmorv1ep4wgcy">
<query xmlns="http://jabber.org/protocol/offmsg#start"/>
<pagesize>10</>
</iq>
4)服务端开始发送消息
[html] view
plain copy
<iq type="result" id="BfI3V-47" from="8ntmorv1ep4wgcy" to="test@8ntmorv1ep4wgcy">
......
</iq>
<iq type="result" id="BfI3V-47" from="8ntmorv1ep4wgcy" to="test@8ntmorv1ep4wgcy">
......
</iq>
.....
5)告诉客户端我都发完了
[html] view
plain copy
<iq type="result" id="BfI3V-47" from="8ntmorv1ep4wgcy" to="test@8ntmorv1ep4wgcy">
<query xmlns="http://jabber.org/protocol/offmsg#end"/>
</iq>
6)客户端本地校验,回执已经接收到的消息
[html] view
plain copy
<message id="BfI3V-47" to="8ntmorv1ep4wgcy"
from="test1@8ntmorv1ep4wgcy/Spark 2.6.3" type="crs"/>
这里本人只是做了一个简单的示意想法。如果需要更加精准的不妨在仔细想想消息处理与格式。
离线消息存储。
将消息存储到redis中:
[java] view
plain copy
public void addMessageToRedis(Message message) {
if (message == null) {
return;
}
JID recipient = message.getTo();
String username = recipient.getNode();
// If the username is null (such as when an anonymous user), don't store.
if (username == null || !UserManager.getInstance().isRegisteredUser(recipient)) {
return;
}
else
if (!XMPPServer.getInstance().getServerInfo().getXMPPDomain().equals(recipient.getDomain())) {
// Do not store messages sent to users of remote servers
return;
}
String msgXML = message.getElement().asXML();
Jedis jedis = XMPPServer.getInstance().getChatMessageJedisPoolManager().getJedis();
try {
String newDate = StringUtils.dateToMillis(new java.util.Date());
String id = MessageIdTactics.mid(username);
jedis.zadd("OFOFFLINE:" + username, Long.valueOf(newDate), id
Map<String, String> hash = new HashMap<String, String>();
hash.put("STANZA", msgXML);
hash.put("MESSAGESIZ", String.valueOf(msgXML.length()));
hash.put("CREATIONDATE", newDate);
jedis.hmset("OFOFFLINE:" + id, hash);
} finally {
XMPPServer.getInstance().getChatMessageJedisPoolManager().returnRes(jedis);
}
if (sizeCache.containsKey(username)) {
int size = sizeCache.get(username);
size += msgXML.length();
sizeCache.put(username, size);
}
htp.execute(addMessageToDB(message));
}
Redis优化这块就到这啦。主要要做的就是:
第一:存储用户或者MUC、Group等这些都需要设置消息存储的生命周期。当用户不处于活跃状态或者长时间不登陆的。要从redis中提出。免得浪费资源。当用户重新加载的时候再将他放置redis中
第二:将需要回执消息和离线消息分开。需要回执的消息需要设置他的生命周期。离线表最好做个定时器。轮询消息。将超时出现范围内的消息(比如周期为一周)的消息同步至关系表中。这里的离线消息需要将用户的设备分开来。
这里要考虑不同的设备终端等很多不同场景,问题会比较绕口。欢迎大家和我邮件交流
相关文章推荐
- OpenFire源码学习之二十五:消息回执与离线消息(下)
- OpenFire源码学习之二十五:消息回执与离线消息(下)
- OpenFire源码学习之二十五:消息回执与离线消息(下)
- OpenFire源码学习之二十四:消息回执与离线消息(上)
- OpenFire源码学习之二十四:消息回执与离线消息(上)
- OpenFire源码学习之二十四:消息回执与离线消息(上)
- OpenFire源码学习之二十四:消息回执与离线消息(上)
- openfire服务端消息回执插件(接收方离线时的情况),判断用户的在线状态
- openfire服务端消息回执插件(接收方离线时的情况),判断用户的在线状态
- openfire服务端消息回执插件(接收方离线时的情况),判断用户的在线状态
- (转)OpenFire源码学习之六:用户注册
- (转)OpenFire源码学习之十七:HTTP Service插件
- Openfire 离线消息的处理机制
- 关于XMPP和openfire中的消息回执和聊天状态
- xmpp和OpenFire实例,实现即时聊天室,支持离线消息
- 第67讲:Scala并发编程匿名Actor、消息传递、偏函数实战解析及其在Spark源码中的应用解析学习笔记
- OpenFire源码学习之四:openfire的启动流程
- Android2.1消息应用(Messaging)源码学习笔记
- OpenFire源码学习之三十一:使用Tsung测试openfire(上)
- Android2.1消息应用(Messaging)源码学习笔记