How Openfire receives messages
2016-09-01 15:03
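Under the hood, Openfire is built on the MINA framework, which follows an event-listener model: the IoHandler interface defines the event types, and each event is handled by its corresponding callback.
Apache MINA is a network application framework that helps users develop high-performance, highly scalable network applications. Built on Java NIO, it provides an abstract, event-driven, asynchronous API over various transports such as TCP/IP and UDP/IP. Apache MINA is often described as:
an NIO framework library;
a client/server framework library;
or a networking socket library.
MINA wraps Java sockets and NIO cleanly, which makes TCP/UDP programs easier to write. Developers no longer have to deal with the tedious concerns of raw sockets (threading, performance, sessions, and so on) and can focus on the application's business logic.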
Java code
public interface IoHandler {
    // Called when a session is created
    void sessionCreated(IoSession session) throws Exception;

    // Called when a session is opened
    void sessionOpened(IoSession session) throws Exception;

    // Called when a session is closed
    void sessionClosed(IoSession session) throws Exception;

    // Called when a session becomes idle
    void sessionIdle(IoSession session, IdleStatus status) throws Exception;

    // Called when an exception is caught
    void exceptionCaught(IoSession session, Throwable cause) throws Exception;

    // Called when a message is received
    void messageReceived(IoSession session, Object message) throws Exception;

    // Called after a message has been sent
    void messageSent(IoSession session, Object message) throws Exception;
}
The abstract class ConnectionHandler extends IoHandlerAdapter, which in turn implements the IoHandler interface:
public abstract class ConnectionHandler extends IoHandlerAdapter
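The relationship between these three types can be sketched with simplified stand-ins. The names SimpleHandler, SimpleHandlerAdapter, and RecordingHandler below are hypothetical and are not part of MINA or Openfire. The sketch only illustrates why an adapter with empty default implementations lets a subclass override just the events it cares about.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for an event-callback interface such as IoHandler.
interface SimpleHandler {
    void sessionOpened(String sessionId) throws Exception;
    void messageReceived(String sessionId, Object message) throws Exception;
    void sessionClosed(String sessionId) throws Exception;
}

// Adapter with no-op implementations, playing the role IoHandlerAdapter plays for IoHandler.
class SimpleHandlerAdapter implements SimpleHandler {
    @Override public void sessionOpened(String sessionId) throws Exception { }
    @Override public void messageReceived(String sessionId, Object message) throws Exception { }
    @Override public void sessionClosed(String sessionId) throws Exception { }
}

// A concrete handler overrides only the event it is interested in.
class RecordingHandler extends SimpleHandlerAdapter {
    final List<String> received = new ArrayList<>();

    @Override
    public void messageReceived(String sessionId, Object message) {
        received.add(sessionId + ":" + message);
    }
}

class AdapterDemo {
    public static void main(String[] args) throws Exception {
        RecordingHandler handler = new RecordingHandler();
        handler.sessionOpened("s1");                  // inherited no-op
        handler.messageReceived("s1", "<presence/>"); // overridden callback
        handler.sessionClosed("s1");                  // inherited no-op
        System.out.println(handler.received);         // prints [s1:<presence/>]
    }
}
```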
Below is how ConnectionHandler implements the messageReceived event:
Java code
@Override
public void messageReceived(IoSession session, Object message) throws Exception {
    // Get the stanza handler for this session. It was already created and stored
    // in the session by sessionOpened(); see that method's implementation.
    StanzaHandler handler = (StanzaHandler) session.getAttribute(HANDLER);
    // Get the parser to use to process stanza. For optimization there is going
    // to be a parser for each running thread. Each Filter will be executed
    // by the Executor placed as the first Filter. So we can have a parser associated
    // to each Thread
    int hashCode = Thread.currentThread().hashCode();
    XMPPPacketReader parser = parsers.get(hashCode);
    if (parser == null) {
        parser = new XMPPPacketReader();
        parser.setXPPFactory(factory);
        parsers.put(hashCode, parser);
    }
    // Update counter of read bytes
    updateReadBytesCounter(session);
    //System.out.println("RCVD: " + message);
    // Let the stanza handler process the received stanza
    try {
        // Process the received message. The parser is an XMPPPacketReader that parses
        // XML strings, because everything Openfire exchanges is an XML string (the XMPP protocol).
        // For the implementation, see StanzaHandler.process(String stanza, XMPPPacketReader reader).
        handler.process((String) message, parser);
    } catch (Exception e) {
        Log.error("Closing connection due to error while processing message: " + message, e);
        Connection connection = (Connection) session.getAttribute(CONNECTION);
        connection.close();
    }
}
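The per-thread parser lookup above can be sketched in isolation. FakeParser and ParserCache are hypothetical names, and the sketch assumes the `parsers` field is a concurrent map, which the excerpt does not show. It uses `computeIfAbsent` in place of the explicit null check, so it is a variation on the original code rather than a copy of it.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a parser that is costly to create, such as XMPPPacketReader.
class FakeParser { }

class ParserCache {
    // One parser per thread, keyed by the thread's hash code as in messageReceived().
    private final Map<Integer, FakeParser> parsers = new ConcurrentHashMap<>();

    FakeParser parserForCurrentThread() {
        int key = Thread.currentThread().hashCode();
        // Create the parser the first time this thread asks for one, then reuse it.
        return parsers.computeIfAbsent(key, k -> new FakeParser());
    }

    int size() {
        return parsers.size();
    }

    public static void main(String[] args) throws InterruptedException {
        ParserCache cache = new ParserCache();
        FakeParser first = cache.parserForCurrentThread();
        FakeParser second = cache.parserForCurrentThread();
        System.out.println(first == second); // the same thread gets the same parser back

        // A second thread gets its own entry in the cache.
        Thread worker = new Thread(cache::parserForCurrentThread);
        worker.start();
        worker.join();
        System.out.println(cache.size());
    }
}
```

Because the parser is only ever used by the thread that created it, the parser itself does not need to be thread-safe; only the map does.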
@Override
public void sessionOpened(IoSession session) throws Exception {
    // Create a new XML parser for the new connection. The parser will be used by the XMPPDecoder filter.
    final XMLLightweightParser parser = new XMLLightweightParser(CHARSET);
    session.setAttribute(XML_PARSER, parser);
    // Create a new NIOConnection for the new session
    final NIOConnection connection = createNIOConnection(session);
    session.setAttribute(CONNECTION, connection);
    // createStanzaHandler() is an abstract method of this class; the concrete
    // implementation is supplied by the subclasses of ConnectionHandler.
    session.setAttribute(HANDLER, createStanzaHandler(connection));
    // Set the max time a connection can be idle before closing it. This amount of seconds
    // is divided in two, as Openfire will ping idle clients first (at 50% of the max idle time)
    // before disconnecting them (at 100% of the max idle time). This prevents Openfire from
    // removing connections without warning.
    final int idleTime = getMaxIdleTime() / 2;
    if (idleTime > 0) {
        session.setIdleTime(IdleStatus.READER_IDLE, idleTime);
    }
}
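The idle-time arithmetic at the end of sessionOpened() can be worked through with a small sketch. IdlePolicy is a hypothetical class; the value 360 is an arbitrary example rather than a documented default, the -1 return value is a convention of this sketch (the original simply skips the setIdleTime call), and the ping-then-close behaviour in `actionFor` is inferred from the code comment above, not from the sessionIdle() implementation, which is not shown here.

```java
class IdlePolicy {
    // Half of the configured maximum, or -1 when idle detection would be skipped.
    static int readerIdleSeconds(int maxIdleSeconds) {
        int idleTime = maxIdleSeconds / 2; // integer division, as in sessionOpened()
        return idleTime > 0 ? idleTime : -1;
    }

    // Assumed behaviour: the first idle event triggers a ping, the next one a disconnect.
    static String actionFor(int consecutiveIdleEvents) {
        return consecutiveIdleEvents <= 1 ? "ping" : "close";
    }

    public static void main(String[] args) {
        int maxIdle = 360; // hypothetical setting, in seconds
        System.out.println(readerIdleSeconds(maxIdle)); // prints 180
        System.out.println(actionFor(1));               // prints ping
        System.out.println(actionFor(2));               // prints close
    }
}
```

With a maximum of 360 seconds, the reader-idle event fires after 180 seconds of silence, so a client that never answers is dropped after two consecutive idle periods, that is, after the full 360 seconds.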
Below, I use the ClientConnectionHandler class as an example.
Java code
public class ClientConnectionHandler extends ConnectionHandler {
    // Implements the abstract method declared in ConnectionHandler
    @Override
    StanzaHandler createStanzaHandler(NIOConnection connection) {
        return new ClientStanzaHandler(XMPPServer.getInstance().getPacketRouter(), serverName, connection);
    }
}

// Constructor of ClientStanzaHandler, which is what createStanzaHandler() calls above
public ClientStanzaHandler(PacketRouter router, String serverName, Connection connection) {
    super(router, serverName, connection);
}
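The way sessionOpened() and createStanzaHandler() cooperate is a factory-method arrangement: the base class decides when the handler is created and where it is stored, and the subclass decides which handler type to build. The sketch below reproduces that shape with hypothetical Sketch* classes and a plain map in place of the MINA session attributes. It is not Openfire code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for StanzaHandler and ClientStanzaHandler.
abstract class SketchStanzaHandler {
    abstract String describe();
}

class SketchClientStanzaHandler extends SketchStanzaHandler {
    @Override
    String describe() {
        return "client";
    }
}

// Base class: knows when to create the handler, but not which concrete type to create.
abstract class SketchConnectionHandler {
    private final Map<String, Object> sessionAttributes = new HashMap<>();

    abstract SketchStanzaHandler createStanzaHandler();

    void sessionOpened() {
        // Mirrors session.setAttribute(HANDLER, createStanzaHandler(connection))
        sessionAttributes.put("HANDLER", createStanzaHandler());
    }

    SketchStanzaHandler handler() {
        return (SketchStanzaHandler) sessionAttributes.get("HANDLER");
    }
}

// Subclass: supplies the concrete handler type.
class SketchClientConnectionHandler extends SketchConnectionHandler {
    @Override
    SketchStanzaHandler createStanzaHandler() {
        return new SketchClientStanzaHandler();
    }
}

class FactoryMethodDemo {
    public static void main(String[] args) {
        SketchConnectionHandler handler = new SketchClientConnectionHandler();
        handler.sessionOpened();
        System.out.println(handler.handler().describe()); // prints client
    }
}
```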
The StanzaHandler class: a StanzaHandler is the main component responsible for handling incoming stanzas.
Java code
public void process(String stanza, XMPPPacketReader reader) throws Exception {
    boolean initialStream = stanza.startsWith("<stream:stream") || stanza.startsWith("<flash:stream");
    if (!sessionCreated || initialStream) {
        if (!initialStream) {
            // Allow requests for flash socket policy files directly on the client listener port
            if (stanza.startsWith("<policy-file-request/>")) {
                String crossDomainText = FlashCrossDomainServlet.CROSS_DOMAIN_TEXT +
                        XMPPServer.getInstance().getConnectionManager().getClientListenerPort() +
                        FlashCrossDomainServlet.CROSS_DOMAIN_END_TEXT + '\0';
                connection.deliverRawText(crossDomainText);
                return;
            }
            else {
                // Ignore <?xml version="1.0"?>
                return;
            }
        }
        // Found a stream:stream tag...
        if (!sessionCreated) {
            sessionCreated = true;
            MXParser parser = reader.getXPPParser();
            parser.setInput(new StringReader(stanza));
            createSession(parser);
        }
        else if (startedTLS) {
            startedTLS = false;
            tlsNegotiated();
        }
        else if (startedSASL && saslStatus == SASLAuthentication.Status.authenticated) {
            startedSASL = false;
            saslSuccessful();
        }
        else if (waitingCompressionACK) {
            waitingCompressionACK = false;
            compressionSuccessful();
        }
        return;
    }
    // Verify if end of stream was requested
    if (stanza.equals("</stream:stream>")) {
        session.close();
        return;
    }
    // Ignore <?xml version="1.0"?> stanzas sent by clients
    if (stanza.startsWith("<?xml")) {
        return;
    }
    // Create DOM object from received stanza
    Element doc = reader.read(new StringReader(stanza)).getRootElement();
    if (doc == null) {
        // No document found.
        return;
    }
    String tag = doc.getName();
    if ("starttls".equals(tag)) {
        // Negotiate TLS
        if (negotiateTLS()) {
            startedTLS = true;
        }
        else {
            connection.close();
            session = null;
        }
    }
    else if ("auth".equals(tag)) {
        // User is trying to authenticate using SASL
        startedSASL = true;
        // Process authentication stanza
        saslStatus = SASLAuthentication.handle(session, doc);
    }
    else if (startedSASL && "response".equals(tag)) {
        // User is responding to SASL challenge. Process response
        saslStatus = SASLAuthentication.handle(session, doc);
    }
    else if ("compress".equals(tag)) {
        // Client is trying to initiate compression
        if (compressClient(doc)) {
            // Compression was successful so open a new stream and offer
            // resource binding and session establishment (to client sessions only)
            waitingCompressionACK = true;
        }
    }
    else {
        // Final processing step: doc is the received XML string converted into an Element
        process(doc);
    }
}
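Before any XML parsing happens, process(String, XMPPPacketReader) sorts the raw string into a few categories using plain string checks. The sketch below isolates that first step. RawStanzaClassifier and its Kind enum are hypothetical names, and the sketch deliberately ignores the session state (sessionCreated, startedTLS, and so on) that the real method also consults.

```java
class RawStanzaClassifier {
    enum Kind { STREAM_OPEN, POLICY_REQUEST, STREAM_CLOSE, XML_DECLARATION, STANZA }

    // Classifies a raw string using the same prefix and equality checks as process().
    static Kind classify(String stanza) {
        if (stanza.startsWith("<stream:stream") || stanza.startsWith("<flash:stream")) {
            return Kind.STREAM_OPEN;
        }
        if (stanza.startsWith("<policy-file-request/>")) {
            return Kind.POLICY_REQUEST;
        }
        if (stanza.equals("</stream:stream>")) {
            return Kind.STREAM_CLOSE;
        }
        if (stanza.startsWith("<?xml")) {
            return Kind.XML_DECLARATION;
        }
        // Everything else is parsed into a DOM element and dispatched by tag name.
        return Kind.STANZA;
    }

    public static void main(String[] args) {
        System.out.println(classify("<stream:stream to='example.org'>")); // prints STREAM_OPEN
        System.out.println(classify("<?xml version='1.0'?>"));            // prints XML_DECLARATION
        System.out.println(classify("<message to='a@example.org'/>"));    // prints STANZA
        System.out.println(classify("</stream:stream>"));                 // prints STREAM_CLOSE
    }
}
```

Only strings classified as STANZA reach the XML parser, which is why the stream header, which is never closed until the connection ends, does not cause a parse error.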
private void process(Element doc) throws UnauthorizedException {
    if (doc == null) {
        return;
    }
    // Ensure that connection was secured if TLS was required
    if (connection.getTlsPolicy() == Connection.TLSPolicy.required &&
            !connection.isSecure()) {
        closeNeverSecuredConnection();
        return;
    }
    String tag = doc.getName();
    // A <message> root element means the stanza is a message.
    // This dispatch is where the XMPP protocol is interpreted.
    if ("message".equals(tag)) {
        Message packet;
        try {
            packet = new Message(doc, !validateJIDs());
        }
        catch (IllegalArgumentException e) {
            Log.debug("Rejecting packet. JID malformed", e);
            // The original packet contains a malformed JID so answer with an error.
            Message reply = new Message();
            reply.setID(doc.attributeValue("id"));
            reply.setTo(session.getAddress());
            reply.getElement().addAttribute("from", doc.attributeValue("to"));
            reply.setError(PacketError.Condition.jid_malformed);
            session.process(reply);
            return;
        }
        processMessage(packet);
    }
    // A <presence> root element carries presence (availability) information about a user
    else if ("presence".equals(tag)) {
        Presence packet;
        try {
            packet = new Presence(doc, !validateJIDs());
        }
        catch (IllegalArgumentException e) {
            Log.debug("Rejecting packet. JID malformed", e);
            // The original packet contains a malformed JID so answer with an error
            Presence reply = new Presence();
            reply.setID(doc.attributeValue("id"));
            reply.setTo(session.getAddress());
            reply.getElement().addAttribute("from", doc.attributeValue("to"));
            reply.setError(PacketError.Condition.jid_malformed);
            session.process(reply);
            return;
        }
        // Check that the presence type is valid. If not then assume available type
        try {
            packet.getType();
        }
        catch (IllegalArgumentException e) {
            Log.warn("Invalid presence type", e);
            // The presence packet contains an invalid presence type so replace it with
            // an available presence type
            packet.setType(null);
        }
        // Check that the presence show is valid. If not then assume available show value
        try {
            packet.getShow();
        }
        catch (IllegalArgumentException e) {
            Log.warn("Invalid presence show for -" + packet.toXML(), e);
            // The presence packet contains an invalid presence show so replace it with
            // an available presence show
            packet.setShow(null);
        }
        if (session.getStatus() == Session.STATUS_CLOSED && packet.isAvailable()) {
            // Ignore available presence packets sent from a closed session. A closed
            // session may have buffered data pending to be processed so we want to ignore
            // just Presences of type available
            Log.warn("Ignoring available presence packet of closed session: " + packet);
            return;
        }
        processPresence(packet);
    }
    // An <iq> (info/query) root element is a request from the client to the server
    else if ("iq".equals(tag)) {
        IQ packet;
        try {
            packet = getIQ(doc);
        }
        catch (IllegalArgumentException e) {
            Log.debug("Rejecting packet. JID malformed", e);
            // The original packet contains a malformed JID so answer with an error
            IQ reply = new IQ();
            if (!doc.elements().isEmpty()) {
                reply.setChildElement(((Element)doc.elements().get(0)).createCopy());
            }
            reply.setID(doc.attributeValue("id"));
            reply.setTo(session.getAddress());
            if (doc.attributeValue("to") != null) {
                reply.getElement().addAttribute("from", doc.attributeValue("to"));
            }
            reply.setError(PacketError.Condition.jid_malformed);
            session.process(reply);
            return;
        }
        if (packet.getID() == null && JiveGlobals.getBooleanProperty("xmpp.server.validation.enabled", false)) {
            // IQ packets MUST have an 'id' attribute so close the connection
            StreamError error = new StreamError(StreamError.Condition.invalid_xml);
            session.deliverRawText(error.toXML());
            session.close();
            return;
        }
        processIQ(packet);
    }
    // Anything other than IQ, Presence, or Message goes to processUnknowPacket().
    else {
        //abstract boolean processUnknowPacket(Element doc) throws UnauthorizedException;
        // This is an abstract method. Its implementation lives in the StanzaHandler subclass
        // returned by createStanzaHandler(), which each concrete ConnectionHandler overrides.
        // For ClientConnectionHandler, that subclass is ClientStanzaHandler.
        if (!processUnknowPacket(doc)) {
            Log.warn(LocaleUtils.getLocalizedString("admin.error.packet.tag") +
                    doc.asXML());
            session.close();
        }
    }
}
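Stripped of validation and error replies, process(Element) is a dispatch on the name of the root element. The sketch below shows only that dispatch. StanzaDispatcher is a hypothetical class, the returned strings merely name the Openfire method that would be called, and the JDK's built-in DOM parser stands in for the dom4j-based XMPPPacketReader used in the real code.

```java
import java.io.StringReader;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Element;
import org.xml.sax.InputSource;

class StanzaDispatcher {
    // Parses one stanza and returns the name of the method that would handle it.
    static String route(String xml) {
        Element root;
        try {
            root = DocumentBuilderFactory.newInstance()
                    .newDocumentBuilder()
                    .parse(new InputSource(new StringReader(xml)))
                    .getDocumentElement();
        } catch (Exception e) {
            throw new IllegalArgumentException("Not a well-formed stanza: " + xml, e);
        }
        switch (root.getTagName()) {
            case "message":
                return "processMessage";
            case "presence":
                return "processPresence";
            case "iq":
                return "processIQ";
            default:
                // Left to the StanzaHandler subclass in the real code
                return "processUnknowPacket";
        }
    }

    public static void main(String[] args) {
        System.out.println(route("<message to='a@example.org'><body>hi</body></message>")); // prints processMessage
        System.out.println(route("<presence/>"));               // prints processPresence
        System.out.println(route("<iq type='get' id='1'/>"));   // prints processIQ
        System.out.println(route("<unknown/>"));                // prints processUnknowPacket
    }
}
```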
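Openfire can deliver "instant messaging" because the MINA framework adds a layer of abstraction on top of sockets; put plainly, underneath it is still socket communication.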