您的位置:首页 > 其它

Smack PacketReader 启动过程分析

2014-05-09 22:32 288 查看
一、PacketReader实例化

PacketReader 在 XmppConnection中被实例化:

PacketReader packetReader = new PacketReader(this);
1、PacketReader 的构造方法

PacketReader实例化执行的构造方法如下:

protected PacketReader(final XMPPConnection connection) { this.connection = connection; //connect成为类的成员变量 this.init(); }
2、初始化
protected void init() { done = false; //只有在connectError和shutdown的时候,才有done = true; done是 parsePackets方法中条件循环的判断条件之一,只有done = false 才可以读取socket connectionID = null;
readerThread = new Thread() { //创建一个用于读取socket输入流的线程 public void run() { parsePackets(this); // A、数据解析和装换的入口,在readerThread线程启动前,此方法没有被执行。 } }; readerThread.setName("Smack Packet Reader (" + connection.connectionCounterValue + ")"); readerThread.setDaemon(true);
// Create an executor to deliver incoming packets to listeners. We'll use a single // thread with an unbounded queue. listenerExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() { // B、这里创建一个单线程的线程池,这个线程池是 packet 监听机制的起始点。
public Thread newThread(Runnable runnable) { Thread thread = new Thread(runnable, "Smack Listener Processor (" + connection.connectionCounterValue + ")"); thread.setDaemon(true); return thread; } });
resetParser(); //此方法创建了一个解析器MXParser的实例,在下面详细分析 }
3、解析器初始化

此方法将connect类中的封装socket 输入流的reader 作为解析器的输入,既PacketReader中的解析器MXParser输入 指向了socket 的inputStream 。parser 为readerThread 提供了数据源。

private void resetParser() { try { parser = new MXParser(); parser.setFeature(XmlPullParser.FEATURE_PROCESS_NAMESPACES, true); parser.setInput(connection.reader); } catch (XmlPullParserException xppe) { xppe.printStackTrace(); } }
resetParser()方法执行完后,PacketReader实例化完成,然后执行packetReader.startup()。

二、启动readerThread线程

packetReader.startup()启动了readerThread线程:

synchronized public void startup() throws XMPPException { readerThread.start(); //指向 parsePackets(this); 方法(即上面一、2红色的A标记处) try { int waitTime = SmackConfiguration.getPacketReplyTimeout(); wait(3 * waitTime); } catch (InterruptedException ie) { } if (connectionID == null) { throw new XMPPException("Connection failed. No response from server."); } else { connection.connectionID = connectionID; } }
三、解析数据

parsePackets(this)是数据解析、转换和分发的入口。

private void parsePackets(Thread thread) { try { int eventType = parser.getEventType(); //从parser中获取数据,判断事件类型。 do { if (eventType == XmlPullParser.START_TAG) { int parserDepth = parser.getDepth(); ParsingExceptionCallback callback = connection.getParsingExceptionCallback(); if (parser.getName().equals("message")) { //处理 Message类型 Packet packet; try { packet = PacketParserUtils.parseMessage(parser); //从parser中获取数据,生成message,完成xml到java对象的转换 } catch (Exception e) { String content = PacketParserUtils.parseContentDepth(parser, parserDepth); UnparsedMessage message = new UnparsedMessage(content, e); if (callback != null) { callback.messageParsingException(e, message); } continue; } processPacket(packet); //分发packet,进入packetListener 的入口。 } else if (parser.getName().equals("iq")) { IQ iq; try { iq = PacketParserUtils.parseIQ(parser, connection); //从parser中获取数据,生成iq,完成xml到java对象的转换 } catch (Exception e) { String content = PacketParserUtils.parseContentDepth(parser, parserDepth); UnparsedIQ uniq = new UnparsedIQ(content, e); if (callback != null) { callback.iqParsingException(e, uniq); } continue; } processPacket(iq); //分发packet,进入packetListener 的入口。 } else if (parser.getName().equals("presence")) { Presence presence; try { presence = PacketParserUtils.parsePresence(parser); //从parser中获取数据,生成presence,完成xml到java对象的转换 } catch (Exception e) { String content = PacketParserUtils.parseContentDepth(parser, parserDepth); UnparsedPresence unpresence = new UnparsedPresence(content, e); if (callback != null) { callback.presenceParsingException(e, unpresence); } continue; } processPacket(presence); //分发packet,进入packetListener 的入口。 } // We found an opening stream. Record information about it, then notify // the connectionID lock so that the packet reader startup can finish. else if (parser.getName().equals("stream")) { // Ensure the correct jabber:client namespace is being used. if ("jabber:client".equals(parser.getNamespace(null))) { // Get the connection id. for (int i=0; i<parser.getAttributeCount(); i++) { if (parser.getAttributeName(i).equals("id")) { // Save the connectionID connectionID = parser.getAttributeValue(i); if (!"1.0".equals(parser.getAttributeValue("", "version"))) { // Notify that a stream has been opened if the // server is not XMPP 1.0 compliant otherwise make the // notification after TLS has been negotiated or if TLS // is not supported releaseConnectionIDLock(); } } else if (parser.getAttributeName(i).equals("from")) { // Use the server name that the server says that it is. connection.config.setServiceName(parser.getAttributeValue(i)); } } } } else if (parser.getName().equals("error")) { throw new XMPPException(PacketParserUtils.parseStreamError(parser)); } else if (parser.getName().equals("features")) { parseFeatures(parser); } else if (parser.getName().equals("proceed")) { // Secure the connection by negotiating TLS connection.proceedTLSReceived(); // Reset the state of the parser since a new stream element is going // to be sent by the server resetParser(); } else if (parser.getName().equals("failure")) { String namespace = parser.getNamespace(null); if ("urn:ietf:params:xml:ns:xmpp-tls".equals(namespace)) { // TLS negotiation has failed. The server will close the connection throw new Exception("TLS negotiation has failed"); } else if ("http://jabber.org/protocol/compress".equals(namespace)) { // Stream compression has been denied. This is a recoverable // situation. It is still possible to authenticate and // use the connection but using an uncompressed connection connection.streamCompressionDenied(); } else { // SASL authentication has failed. The server may close the connection // depending on the number of retries final Failure failure = PacketParserUtils.parseSASLFailure(parser); processPacket(failure); connection.getSASLAuthentication().authenticationFailed(failure.getCondition()); } } else if (parser.getName().equals("challenge")) { // The server is challenging the SASL authentication made by the client String challengeData = parser.nextText(); processPacket(new Challenge(challengeData)); connection.getSASLAuthentication().challengeReceived(challengeData); } else if (parser.getName().equals("success")) { processPacket(new Success(parser.nextText())); // We now need to bind a resource for the connection // Open a new stream and wait for the response connection.packetWriter.openStream(); // Reset the state of the parser since a new stream element is going // to be sent by the server resetParser(); // The SASL authentication with the server was successful. The next step // will be to bind the resource connection.getSASLAuthentication().authenticated(); } else if (parser.getName().equals("compressed")) { // Server confirmed that it's possible to use stream compression. Start // stream compression connection.startStreamCompression(); // Reset the state of the parser since a new stream element is going // to be sent by the server resetParser(); } } else if (eventType == XmlPullParser.END_TAG) { if (parser.getName().equals("stream")) { // Disconnect the connection connection.disconnect(); } } eventType = parser.next(); } while (!done && eventType != XmlPullParser.END_DOCUMENT && thread == readerThread); //此处判断done 等条件,开始进入循环体 } catch (Exception e) { // The exception can be ignored if the the connection is 'done' // or if the it was caused because the socket got closed if (!(done || connection.isSocketClosed())) { // Close the connection and notify connection listeners of the // error. connection.notifyConnectionError(e); } } }
五、处理数据
上面第四步中,所有的 processPacket() 方法都是同一个,即

private void processPacket(Packet packet) { if (packet == null) { return; }
// Loop through all collectors and notify the appropriate ones. for (PacketCollector collector: connection.getPacketCollectors()) { collector.processPacket(packet); }
// Deliver the incoming packet to listeners. listenerExecutor.submit(new ListenerNotification(packet)); //在这里packet 进入线程池,并由ListenerNotification开启监听模式。 }

总结:readerThread 线程启动后,以 parser 为数据源,从socket inputStream中读取数据并解析,完成从xml数据到java对象的转换过程。
XML数据可以分为很多种类型,目前比较关注的是 Message 、Iq、Presence 。补充:第三步,parsePacket()方法中,会根据 parser.getName() 得到的类型调用不同的解析方法,这些方法针对 Message,IQ和Presence是不同的。

parsePackets(Thread thread) 用到了 PacketParserUtils 类,PacketParserUtils有一点特殊的地方。
在解析Message、IQ 和 Presence的时候,用到了 ProviderManager.getInstance() 方法。在connect.connect()执行成功后,在解析 IQ、Message、Presence之前,要设置

ProviderManager.getInstance().addIQProvider("notification", "androidpn:iq:notification", new NotificationIQProvider());或者
ProviderManager.getInstance().addExtensionProvider("myExtension", "androidpn:iq:myextension", new MyExtensionProvider());



下一步 从ListenerNotification做为入口分析。

<EOF>
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: