您的位置:首页 > 运维架构

openfire 接受消息流程

2016-09-01 15:03 483 查看

博客分类:
openfire & xmpp 

openfire底层采用了MINA框架,它是采用事件监听的方式,其中IoHandler接口定义了不同的事件类型,因此根据不同的事件类型做相关的处理

 

Apache MINA 是一个网络应用框架,有助于用户非常方便地开发高性能、高伸缩性的网络应用。它通过Java NIO提供了一个抽象的、事件驱动的、异步的位于各种传输协议(如TCP/IP和UDP/IP)之上的API,Apache MINA 通常可被称之为:

NIO 框架库;

客户端/服务器框架库;

或者一个网络socket库。

 

Apache MINA 是一个网络应用程序框架,它对Java中的socket和NIO进行了有效和清晰的封装,方便开发人员开发TCP/UDP程序,从而抛开在使用原始的socket时需要考虑的各种繁杂而又烦人问题(线程、性能、会话等),把更多精力专著在应用中的业务逻辑的开发上

 

Java代码  


public interface IoHandler  
{  
//创建session  
    public abstract void sessionCreated(IoSession iosession)  
        throws Exception;  
//开启session  
    public abstract void sessionOpened(IoSession iosession)  
        throws Exception;  
//关闭session  
    public abstract void sessionClosed(IoSession iosession)  
        throws Exception;  
//session空闲  
    public abstract void sessionIdle(IoSession iosession, IdleStatus idlestatus)  
        throws Exception;  
//异常处理  
    public abstract void exceptionCaught(IoSession iosession, Throwable throwable)  
        throws Exception;  
//接收消息  
    public abstract void messageReceived(IoSession iosession, Object obj)  
        throws Exception;  
//发送消息  
    public abstract void messageSent(IoSession iosession, Object obj)  
        throws Exception;  
}  

 

抽象类ConnectionHandler继承了IoHandlerAdapter类,而IoHandlerAdapter实现了IoHandler接口:

public abstract class ConnectionHandler extends IoHandlerAdapter 

 

下面是ConnectionHandler类实现关于messageReceived事件的实现方法

Java代码  


@Override  
public void messageReceived(IoSession session, Object message) throws Exception {  
    // Get the stanza handler for this session  
    //得到当前会话的StanzaHandler,这个对象在sessionOpened事件对应的方法中已经创建了,可以参考sessionOpened()的实现  
    StanzaHandler handler = (StanzaHandler) session.getAttribute(HANDLER);  
    // Get the parser to use to process stanza. For optimization there is going  
    // to be a parser for each running thread. Each Filter will be executed  
    // by the Executor placed as the first Filter. So we can have a parser associated  
    // to each Thread  
    int hashCode = Thread.currentThread().hashCode();  
    XMPPPacketReader parser = parsers.get(hashCode);  
    if (parser == null) {  
        parser = new XMPPPacketReader();  
        parser.setXPPFactory(factory);  
        parsers.put(hashCode, parser);  
    }  
    // Update counter of read btyes  
    updateReadBytesCounter(session);  
    //System.out.println("RCVD: " + message);  
    // Let the stanza handler process the received stanza  
    try {  
    //处理接收的message,parser为XMPPPacketReader对象,用来解析XML字符串,因为openfire信息之间的传递全部都是XML格式的字符串(XMPP协议)  
    //下面方法的实现参考StanzaHandler.process(String stanza, XMPPPacketReader reader)方法  
        handler.process((String) message, parser);  
    } catch (Exception e) {  
        Log.error("Closing connection due to error while processing message: " + message, e);  
        Connection connection = (Connection) session.getAttribute(CONNECTION);  
        connection.close();  
    }  
}  
  
  
 @Override  
public void sessionOpened(IoSession session) throws Exception {  
    // Create a new XML parser for the new connection. The parser will be used by the XMPPDecoder filter.  
    final XMLLightweightParser parser = new XMLLightweightParser(CHARSET);  
    session.setAttribute(XML_PARSER, parser);  
    // Create a new NIOConnection for the new session  
    final NIOConnection connection = createNIOConnection(session);  
    session.setAttribute(CONNECTION, connection);  
    //createStanzaHandler方法是当前类的一个抽象类,具体实现需要查看ConnectionHandler的子类是如何实现的,  
    session.setAttribute(HANDLER, createStanzaHandler(connection));  
    // Set the max time a connection can be idle before closing it. This amount of seconds  
    // is divided in two, as Openfire will ping idle clients first (at 50% of the max idle time)  
    // before disconnecting them (at 100% of the max idle time). This prevents Openfire from  
    // removing connections without warning.  
    final int idleTime = getMaxIdleTime() / 2;  
    if (idleTime > 0) {  
        session.setIdleTime(IdleStatus.READER_IDLE, idleTime);  
    }  
}  

 

下面我就以ClientConnectionHandler类作为例子来讲解

Java代码  


public class ClientConnectionHandler extends ConnectionHandler  
  
public ClientStanzaHandler(PacketRouter router, String serverName, Connection connection) {  
    super(router, serverName, connection);  
}  
  
@Override  
StanzaHandler createStanzaHandler(NIOConnection connection) {  
    return new ClientStanzaHandler(XMPPServer.getInstance().getPacketRouter(), serverName, connection);  
}  

 

StanzaHandler类:A StanzaHandler is the main responsible for handling incoming stanzas.

Java代码  


public void process(String stanza, XMPPPacketReader reader) throws Exception {  
  
    boolean initialStream = stanza.startsWith("<stream:stream") || stanza.startsWith("<flash:stream");  
    if (!sessionCreated || initialStream) {  
        if (!initialStream) {  
            // Allow requests for flash socket policy files directly on the client listener port  
            if (stanza.startsWith("<policy-file-request/>")) {  
                String crossDomainText = FlashCrossDomainServlet.CROSS_DOMAIN_TEXT +  
                        XMPPServer.getInstance().getConnectionManager().getClientListenerPort() +  
                        FlashCrossDomainServlet.CROSS_DOMAIN_END_TEXT + '\0';  
                connection.deliverRawText(crossDomainText);  
                return;  
            }  
            else {  
                // Ignore <?xml version="1.0"?>  
                return;  
            }  
        }  
        // Found an stream:stream tag...  
        if (!sessionCreated) {  
            sessionCreated = true;  
            MXParser parser = reader.getXPPParser();  
            parser.setInput(new StringReader(stanza));  
            createSession(parser);  
        }  
        else if (startedTLS) {  
            startedTLS = false;  
            tlsNegotiated();  
        }  
        else if (startedSASL && saslStatus == SASLAuthentication.Status.authenticated) {  
            startedSASL = false;  
            saslSuccessful();  
        }  
        else if (waitingCompressionACK) {  
            waitingCompressionACK = false;  
            compressionSuccessful();  
        }  
        return;  
    }  
  
    // Verify if end of stream was requested  
    if (stanza.equals("</stream:stream>")) {  
        session.close();  
        return;  
    }  
    // Ignore <?xml version="1.0"?> stanzas sent by clients  
    if (stanza.startsWith("<?xml")) {  
        return;  
    }  
    // Create DOM object from received stanza  
    Element doc = reader.read(new StringReader(stanza)).getRootElement();  
    if (doc == null) {  
        // No document found.  
        return;  
    }  
    String tag = doc.getName();  
    if ("starttls".equals(tag)) {  
        // Negotiate TLS  
        if (negotiateTLS()) {  
            startedTLS = true;  
        }  
        else {  
            connection.close();  
            session = null;  
        }  
    }  
    else if ("auth".equals(tag)) {  
        // User is trying to authenticate using SASL  
        startedSASL = true;  
        // Process authentication stanza  
        saslStatus = SASLAuthentication.handle(session, doc);  
    }  
    else if (startedSASL && "response".equals(tag)) {  
        // User is responding to SASL challenge. Process response  
        saslStatus = SASLAuthentication.handle(session, doc);  
    }  
    else if ("compress".equals(tag)) {  
        // Client is trying to initiate compression  
        if (compressClient(doc)) {  
            // Compression was successful so open a new stream and offer  
            // resource binding and session establishment (to client sessions only)  
            waitingCompressionACK = true;  
        }  
    }  
    else {  
    //最终处理消息,doc就是发送过来的XML字符串转化为Element对象  
        process(doc);  
    }  
}  
  
  
private void process(Element doc) throws UnauthorizedException {  
    if (doc == null) {  
        return;  
    }  
  
    // Ensure that connection was secured if TLS was required  
    if (connection.getTlsPolicy() == Connection.TLSPolicy.required &&  
            !connection.isSecure()) {  
        closeNeverSecuredConnection();  
        return;  
    }  
  
    String tag = doc.getName();  
    //消息类型是<message>打头的,表示消息是message类型,这个就是对XMPP协议的解释  
    if ("message".equals(tag)) {  
        Message packet;  
        try {  
            packet = new Message(doc, !validateJIDs());  
        }  
        catch (IllegalArgumentException e) {  
            Log.debug("Rejecting packet. JID malformed", e);  
            // The original packet contains a malformed JID so answer with an error.  
            Message reply = new Message();  
            reply.setID(doc.attributeValue("id"));  
            reply.setTo(session.getAddress());  
            reply.getElement().addAttribute("from", doc.attributeValue("to"));  
            reply.setError(PacketError.Condition.jid_malformed);  
            session.process(reply);  
            return;  
        }  
        processMessage(packet);  
    }  
    //消息类型是<presence>打头的,表示消息请求用户的状态  
    else if ("presence".equals(tag)) {  
        Presence packet;  
        try {  
            packet = new Presence(doc, !validateJIDs());  
        }  
        catch (IllegalArgumentException e) {  
            Log.debug("Rejecting packet. JID malformed", e);  
            // The original packet contains a malformed JID so answer an error  
            Presence reply = new Presence();  
            reply.setID(doc.attributeValue("id"));  
            reply.setTo(session.getAddress());  
            reply.getElement().addAttribute("from", doc.attributeValue("to"));  
            reply.setError(PacketError.Condition.jid_malformed);  
            session.process(reply);  
            return;  
        }  
        // Check that the presence type is valid. If not then assume available type  
        try {  
            packet.getType();  
        }  
        catch (IllegalArgumentException e) {  
            Log.warn("Invalid presence type", e);  
            // The presence packet contains an invalid presence type so replace it with  
            // an available presence type  
            packet.setType(null);  
        }  
        // Check that the presence show is valid. If not then assume available show value  
        try {  
            packet.getShow();  
        }  
        catch (IllegalArgumentException e) {  
            Log.warn("Invalid presence show for -" + packet.toXML(), e);  
            // The presence packet contains an invalid presence show so replace it with  
            // an available presence show  
            packet.setShow(null);  
        }  
        if (session.getStatus() == Session.STATUS_CLOSED && packet.isAvailable()) {  
            // Ignore available presence packets sent from a closed session. A closed  
            // session may have buffered data pending to be processes so we want to ignore  
            // just Presences of type available  
            Log.warn("Ignoring available presence packet of closed session: " + packet);  
            return;  
        }  
        processPresence(packet);  
    }  
    //消息类型是<iq>打头的,表示客户端对server端的一个请求  
    else if ("iq".equals(tag)) {  
        IQ packet;  
        try {  
            packet = getIQ(doc);  
        }  
        catch (IllegalArgumentException e) {  
            Log.debug("Rejecting packet. JID malformed", e);  
            // The original packet contains a malformed JID so answer an error  
            IQ reply = new IQ();  
            if (!doc.elements().isEmpty()) {  
                reply.setChildElement(((Element)doc.elements().get(0)).createCopy());  
            }  
            reply.setID(doc.attributeValue("id"));  
            reply.setTo(session.getAddress());  
            if (doc.attributeValue("to") != null) {  
                reply.getElement().addAttribute("from", doc.attributeValue("to"));  
            }  
            reply.setError(PacketError.Condition.jid_malformed);  
            session.process(reply);  
            return;  
        }  
        if (packet.getID() == null && JiveGlobals.getBooleanProperty("xmpp.server.validation.enabled", false)) {  
            // IQ packets MUST have an 'id' attribute so close the connection  
            StreamError error = new StreamError(StreamError.Condition.invalid_xml);  
            session.deliverRawText(error.toXML());  
            session.close();  
            return;  
        }  
        processIQ(packet);  
    }  
    //如果消息类型不是IQ\Presence\Message三种类型,则执行processUnknowPacket()方法。  
    else {  
    //abstract boolean processUnknowPacket(Element doc) throws UnauthorizedException;  
    //该方法是一个抽象类,具体的实现是要看继承ConnectionHandler类的具体类,它会重写createStanzaHandler方法。  
    //在ClientConnectionHandler类中实现该方法对象是ClientStanzaHandler类  
        if (!processUnknowPacket(doc)) {  
            Log.warn(LocaleUtils.getLocalizedString("admin.error.packet.tag") +  
                    doc.asXML());  
            session.close();  
        }  
    }  
}  

 

openfire之所以能够做到“即时通信”的目的正是因为MINA框架对socket进行了一层封装,说白了还是socket通信。

 



 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: