
paho.mqtt.android Code Walkthrough, Step by Step (Part 4)

2018-01-16
Summary: starting from connect(), trace how the code executes.

MqttAndroidClient exposes a connect() method for connecting to the broker:

@Override
public IMqttToken connect(MqttConnectOptions options, Object userContext,
IMqttActionListener callback) throws MqttException {

IMqttToken token = new MqttTokenAndroid(this, userContext, callback);
...

if (mqttService == null) { // First time - must start the service and bind to it
Intent serviceStartIntent = new Intent();
serviceStartIntent.setClassName(myContext, SERVICE_NAME);
Object service = myContext.startService(serviceStartIntent);
if (service == null) { // If the service failed to start, report a connection failure through the callback
IMqttActionListener listener = token.getActionCallback();
if (listener != null) {
listener.onFailure(token, new RuntimeException("cannot start service " + SERVICE_NAME));
}
}

// Bind to the service so that we can manage its lifecycle
// We bind with BIND_SERVICE_FLAG (0), leaving us to manage the lifecycle
// until the last time it is stopped by a call to stopService()
myContext.bindService(serviceStartIntent, serviceConnection, Context.BIND_AUTO_CREATE);

if (!receiverRegistered) registerReceiver(this);
} else {
pool.execute(new Runnable() {

@Override
public void run() {
doConnect(); // connect to the broker

//Register receiver to show shoulder tap.
if (!receiverRegistered) registerReceiver(MqttAndroidClient.this);
}

});
}

return token;
}

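Looking at the method, connect() first checks whether MqttService has already been started, and only runs doConnect() once the service is known to be available. doConnect() itself essentially just calls: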

mqttService.connect(clientHandle, connectOptions, null, activityToken);
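The control flow described above ("connect right away if the service is available, otherwise start and bind it first and connect afterwards") can be modelled without any Android dependency. The following is only a minimal sketch under that reading: the class DeferredConnectSketch, the onServiceBound() hook and the log list are hypothetical names for illustration and are not part of the Paho API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, Android-free model of the "bind first, then connect" flow. */
class DeferredConnectSketch {
    private boolean serviceBound = false;   // stands in for mqttService != null
    private boolean connectPending = false; // a connect() arrived before the service was available
    final List<String> log = new ArrayList<>();

    /** Same shape as MqttAndroidClient.connect(): bind on first use, otherwise connect directly. */
    void connect() {
        if (!serviceBound) {
            log.add("startService+bindService");
            connectPending = true; // remember to connect once the service is available
        } else {
            doConnect();
        }
    }

    /** Assumed stand-in for the moment the service binding completes. */
    void onServiceBound() {
        serviceBound = true;
        if (connectPending) {
            connectPending = false;
            doConnect();
        }
    }

    private void doConnect() {
        log.add("doConnect");
    }
}
```

In this model, the first connect() only records the start/bind step, and doConnect() runs once onServiceBound() fires; any later connect() goes straight to doConnect().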

Going further down, we arrive at the connect() method in MqttConnection:

public void connect(MqttConnectOptions options, String invocationContext, String activityToken) {
connectOptions = options;
reconnectActivityToken = activityToken;

// Clear previously stored messages depending on cleanSession
if (options != null) {
cleanSession = options.isCleanSession();
}
if (connectOptions.isCleanSession()) { // if it's a clean session,discard old data
service.messageStore.clearArrivedMessages(clientHandle);
}

service.traceDebug(TAG, "Connecting {" + serverURI + "} as {" + clientId + "}");
final Bundle resultBundle = new Bundle();
resultBundle.putString(MqttServiceConstants.CALLBACK_ACTIVITY_TOKEN, activityToken);
resultBundle.putString(MqttServiceConstants.CALLBACK_INVOCATION_CONTEXT, invocationContext);
resultBundle.putString(MqttServiceConstants.CALLBACK_ACTION, MqttServiceConstants.CONNECT_ACTION);

try {
if (persistence == null) {
// ask Android where we can put files
File myDir = service.getExternalFilesDir(TAG);
if (myDir == null) {
// No external storage, use internal storage instead.
myDir = service.getDir(TAG, Context.MODE_PRIVATE);

if (myDir == null) {
//Shouldn't happen.
resultBundle.putString(MqttServiceConstants.CALLBACK_ERROR_MESSAGE, "Error! No external and internal storage available");
resultBundle.putSerializable(MqttServiceConstants.CALLBACK_EXCEPTION, new MqttPersistenceException());
service.callbackToActivity(clientHandle, Status.ERROR, resultBundle);
return;
}
}

// use that to setup MQTT client persistence storage
persistence = new MqttDefaultFilePersistence(myDir.getAbsolutePath());
}

IMqttActionListener listener = new MqttConnectionListener(resultBundle) {

@Override
public void onSuccess(IMqttToken asyncActionToken) {
doAfterConnectSuccess(resultBundle);
service.traceDebug(TAG, "connect success!");
}

@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
resultBundle.putString(MqttServiceConstants.CALLBACK_ERROR_MESSAGE, exception.getLocalizedMessage());
resultBundle.putSerializable(MqttServiceConstants.CALLBACK_EXCEPTION, exception);
service.traceError(TAG, "connect fail, call connect to reconnect.reason:" + exception.getMessage());

doAfterConnectFail(resultBundle);
}
};

if (myClient != null) { // An MqttAsyncClient already exists, i.e. this connect() method has been called before
if (isConnecting) { // The previous connect() call is still in progress; do nothing and wait for its result
service.traceDebug(TAG, "myClient != null and the client is connecting. Connect return directly.");
service.traceDebug(TAG, "Connect return:isConnecting:" + isConnecting + ".disconnected:" + disconnected);
} else if (!disconnected) { // Already connected; just report success
service.traceDebug(TAG, "myClient != null and the client is connected and notify!");
doAfterConnectSuccess(resultBundle);
} else { // The previous connection failed or has been lost; try to connect again
service.traceDebug(TAG, "myClient != null and the client is not connected");
service.traceDebug(TAG, "Do Real connect!");
setConnectingState(true);
myClient.connect(connectOptions, invocationContext, listener);
}
} else { // if myClient is null (never connected, or the client was destroyed), then create a new connection
alarmPingSender = new AlarmPingSender(service); // used to send keep-alive pings
myClient = new MqttAsyncClient(serverURI, clientId, persistence, alarmPingSender);
myClient.setCallback(this);

service.traceDebug(TAG, "Do Real connect!");
setConnectingState(true);
myClient.connect(connectOptions, invocationContext, listener);
}
} catch (Exception e) {
service.traceError(TAG, "Exception occurred attempting to connect: " + e.getMessage());
setConnectingState(false);
handleException(resultBundle, e);
}
}
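The four-way branch at the end of MqttConnection.connect() can be condensed into a small decision function. This is a sketch for illustration only: ConnectDecisionSketch, the Action enum and decide() are hypothetical names, and the three boolean parameters simply mirror myClient != null, isConnecting and disconnected from the listing.

```java
/** Hypothetical summary of which branch MqttConnection.connect() takes; not Paho code. */
class ConnectDecisionSketch {
    enum Action {
        CREATE_CLIENT_AND_CONNECT, // myClient == null
        WAIT_FOR_PENDING_CONNECT,  // a previous connect() is still running
        NOTIFY_ALREADY_CONNECTED,  // connection is up, just report success
        RECONNECT                  // client exists but is disconnected
    }

    static Action decide(boolean clientExists, boolean isConnecting, boolean disconnected) {
        if (!clientExists) {
            return Action.CREATE_CLIENT_AND_CONNECT;
        }
        if (isConnecting) {
            return Action.WAIT_FOR_PENDING_CONNECT;
        }
        if (!disconnected) {
            return Action.NOTIFY_ALREADY_CONNECTED;
        }
        return Action.RECONNECT;
    }
}
```

Note that the order of the checks matters: isConnecting is tested before disconnected, so a connect attempt that is still in flight is never started a second time.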

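In MqttConnection.connect() above, I have added comments on the key lines. Two objects in it deserve particular attention: resultBundle and persistence. persistence is used to persist the connection's data, while resultBundle, which I will analyze later, is ultimately used to send the broadcast that triggers our callback listeners for connect, publish, subscribe and so on. Next, we go one level deeper into MqttAsyncClient.connect():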

public IMqttToken connect(MqttConnectOptions options, Object userContext, IMqttActionListener callback)
throws MqttException, MqttSecurityException {
...

final boolean automaticReconnect = options.isAutomaticReconnect();

comms.setNetworkModules(createNetworkModules(serverURI, options));
comms.setReconnectCallback(new MqttCallbackExtended() {

public void messageArrived(String topic, MqttMessage message) throws Exception {
}
public void deliveryComplete(IMqttDeliveryToken token) {
}
public void connectComplete(boolean reconnect, String serverURI) {
}

public void connectionLost(Throwable cause) {
if(automaticReconnect){
// Automatic reconnect is set so make sure comms is in resting state
comms.setRestingState(true);
reconnecting = true;
startReconnectCycle();
}
}
});

// Insert our own callback to iterate through the URIs till the connect succeeds
MqttToken userToken = new MqttToken(getClientId());
ConnectActionListener connectActionListener = new ConnectActionListener(this, persistence, comms, options, userToken, userContext, callback, reconnecting);
userToken.setActionCallback(connectActionListener);
userToken.setUserContext(this);

// If we are using the MqttCallbackExtended, set it on the connectActionListener
if(this.mqttCallback instanceof MqttCallbackExtended){
connectActionListener.setMqttCallbackExtended((MqttCallbackExtended)this.mqttCallback);
}

comms.setNetworkModuleIndex(0);
connectActionListener.connect();

return userToken;
}

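There is a lot of code here, so I have omitted part of it. The main thing to look at is the automatic reconnect logic in comms.setReconnectCallback(). The reconnect itself is implemented by attemptReconnect() below: when a reconnect attempt fails, the client keeps retrying until the connection succeeds, but the interval between attempts doubles after each failure, up to a maximum of 128 s: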

private void attemptReconnect(){
final String methodName = "attemptReconnect";
//@Trace 500=Attempting to reconnect client: {0}
try {
connect(this.connOpts, this.userContext,new IMqttActionListener() {

public void onSuccess(IMqttToken asyncActionToken) {
//@Trace 501=Automatic Reconnect Successful: {0}
comms.setRestingState(false);
stopReconnectCycle();
}

public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
//@Trace 502=Automatic Reconnect failed, rescheduling: {0}
if(reconnectDelay < 128000){ // reconnectDelay starts at 1000 ms and is doubled after every failed reconnect
reconnectDelay = reconnectDelay * 2;
}
rescheduleReconnectCycle(reconnectDelay);
}
});
} catch (MqttSecurityException ex) {
//@TRACE 804=exception
} catch (MqttException ex) {
//@TRACE 804=exception
}
}
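To make the backoff concrete, the doubling rule in onFailure() can be replayed in isolation. This is a minimal sketch, assuming the initial delay of 1000 ms mentioned in the comment above; ReconnectBackoffSketch, nextDelay() and delaysAfterFailures() are hypothetical helper names, not Paho methods.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical replay of the reconnect delay rule shown in attemptReconnect(). */
class ReconnectBackoffSketch {
    static final int INITIAL_DELAY_MS = 1000; // assumed starting value, per the comment in the listing
    static final int MAX_DELAY_MS = 128000;

    /** Same rule as onFailure(): double the delay while it is still below the cap. */
    static int nextDelay(int currentDelayMs) {
        return currentDelayMs < MAX_DELAY_MS ? currentDelayMs * 2 : currentDelayMs;
    }

    /** Delays that would be used for rescheduling after each of the given consecutive failures. */
    static List<Integer> delaysAfterFailures(int failures) {
        List<Integer> delays = new ArrayList<>();
        int delay = INITIAL_DELAY_MS;
        for (int i = 0; i < failures; i++) {
            delay = nextDelay(delay);
            delays.add(delay);
        }
        return delays;
    }
}
```

Under these assumptions, delaysAfterFailures(9) yields 2000, 4000, 8000, 16000, 32000, 64000, 128000, 128000, 128000: the cap is reached after the seventh consecutive failure and the delay stays there.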

With the reconnect logic covered, let's return to the connect() method from before. MqttAsyncClient.connect() leads into ClientComms.connect():

public void connect(MqttConnectOptions options, MqttToken token) throws MqttException {
final String methodName = "connect";
synchronized (conLock) {
if (isDisconnected() && !closePending) {
...
tokenStore.open();
ConnectBG conbg = new ConnectBG(this, token, connect);
conbg.start(); // ConnectBG goes on to call ClientComms.internalSend()
}else {
...
}
}
}

ConnectBG then calls ClientComms.internalSend(), which finally ends up in ClientState.send():

public void send(MqttWireMessage message, MqttToken token) throws MqttException {
final String methodName = "send";
...
if (message instanceof MqttPublish) {
synchronized (queueLock) {
if (actualInFlight >= this.maxInflight) {
//@TRACE 613= sending {0} msgs at max inflight window
log.fine(CLASS_NAME, methodName, "613", new Object[]{new Integer(actualInFlight)});

throw new MqttException(MqttException.REASON_CODE_MAX_INFLIGHT);
}

MqttMessage innerMessage = ((MqttPublish) message).getMessage();
//@TRACE 628=pending publish key={0} qos={1} message={2}
log.fine(CLASS_NAME,methodName,"628", new Object[]{new Integer(message.getMessageId()), new Integer(innerMessage.getQos()), message});

switch(innerMessage.getQos()) {
case 2:
outboundQoS2.put(new Integer(message.getMessageId()), message);
persistence.put(getSendPersistenceKey(message), (MqttPublish) message);
break;
case 1:
outboundQoS1.put(new Integer(message.getMessageId()), message);
persistence.put(getSendPersistenceKey(message), (MqttPublish) message);
break;
}
tokenStore.saveToken(token, message);
pendingMessages.addElement(message);
queueLock.notifyAll();
}
} else {
//@TRACE 615=pending send key={0} message {1}
log.fine(CLASS_NAME,methodName,"615", new Object[]{new Integer(message.getMessageId()), message});

if (message instanceof MqttConnect) {
synchronized (queueLock) {
// Add the connect action at the head of the pending queue ensuring it jumps
// ahead of any of other pending actions.
tokenStore.saveToken(token, message);
pendingFlows.insertElementAt(message,0);
queueLock.notifyAll();
}
} else {
if (message instanceof MqttPingReq) {
this.pingCommand = message;
}
else if (message instanceof MqttPubRel) {
outboundQoS2.put(new Integer(message.getMessageId()), message);
persistence.put(getSendConfirmPersistenceKey(message), (MqttPubRel) message);
}
else if (message instanceof MqttPubComp)  {
persistence.remove(getReceivedPersistenceKey(message));
}

synchronized (queueLock) {
if ( !(message instanceof MqttAck )) {
tokenStore.saveToken(token, message);
}
pendingFlows.addElement(message);
queueLock.notifyAll();
}
}
}
}
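Two details of ClientState.send() are easy to miss in the full listing: publishes are rejected once the in-flight window is full, and a CONNECT is inserted at the head of pendingFlows so that it overtakes anything already queued. The sketch below models only those two behaviours with plain strings as messages. SendQueueSketch and its methods are hypothetical, and onPublishWritten() is an assumed hook standing in for wherever the real client increments its in-flight counter.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of the two queues used by ClientState.send(). */
class SendQueueSketch {
    private final int maxInflight;
    private int actualInFlight = 0;
    final List<String> pendingMessages = new ArrayList<>(); // queued publishes
    final List<String> pendingFlows = new ArrayList<>();    // everything else (CONNECT, PINGREQ, acks, ...)

    SendQueueSketch(int maxInflight) {
        this.maxInflight = maxInflight;
    }

    /** Queue a publish unless the in-flight window is already full. */
    synchronized void sendPublish(String message) {
        if (actualInFlight >= maxInflight) {
            throw new IllegalStateException("too many publishes in flight");
        }
        pendingMessages.add(message);
    }

    /** Assumed hook: called when a queued publish has actually been written to the network. */
    synchronized void onPublishWritten() {
        actualInFlight++;
    }

    /** CONNECT jumps ahead of every other pending flow. */
    synchronized void sendConnect(String message) {
        pendingFlows.add(0, message);
    }

    /** Any other non-publish message is appended in arrival order. */
    synchronized void sendFlow(String message) {
        pendingFlows.add(message);
    }
}
```

For example, queuing PINGREQ, then PUBREL, then CONNECT leaves pendingFlows ordered as CONNECT, PINGREQ, PUBREL.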

Now let's come back to the resultBundle mentioned earlier. Here is one of the places where it is used:

private void doAfterConnectSuccess(final Bundle resultBundle) {
//since the device's cpu can go to sleep, acquire a wakelock and drop it later.
acquireWakeLock();
service.callbackToActivity(clientHandle, Status.OK, resultBundle);
deliverBacklog();
setConnectingState(false);
disconnected = false;
releaseWakeLock();
}

After a successful connection, MqttService.callbackToActivity() is called with resultBundle as one of its arguments. Let's look at how this method is implemented:

/**
* All messages are sent out through this method.
* pass data back to the Activity, by building a suitable Intent object and
* broadcasting it
*
* @param clientHandle source of the data
* @param status       OK or Error
* @param dataBundle   the data to be passed
*/
void callbackToActivity(String clientHandle, Status status, Bundle dataBundle) {
// Don't call traceDebug, as it will try to callbackToActivity leading
// to recursion.
Intent callbackIntent = new Intent(MqttServiceConstants.CALLBACK_TO_ACTIVITY);
if (clientHandle != null) {
callbackIntent.putExtra(MqttServiceConstants.CALLBACK_CLIENT_HANDLE, clientHandle);
}
callbackIntent.putExtra(MqttServiceConstants.CALLBACK_STATUS, status);
if (dataBundle != null) {
callbackIntent.putExtras(dataBundle);
}
LocalBroadcastManager.getInstance(this).sendBroadcast(callbackIntent);
}

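callbackToActivity() sends a local broadcast that carries resultBundle. In fact, operations such as publish and subscribe also call this method whether they succeed or fail, so each of them emits a local broadcast indicating the type of operation and its status. So where is that broadcast handled? Read on. The MqttAndroidClient class extends BroadcastReceiver; here is its onReceive() method: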

@Override
public void onReceive(Context context, Intent intent) {
Bundle data = intent.getExtras();
String handleFromIntent = data.getString(MqttServiceConstants.CALLBACK_CLIENT_HANDLE);

if ((handleFromIntent == null) || (!handleFromIntent.equals(clientHandle))) {
return;
}

String action = data.getString(MqttServiceConstants.CALLBACK_ACTION);

if (MqttServiceConstants.CONNECT_ACTION.equals(action)) {
connectAction(data);
} else if (MqttServiceConstants.CONNECT_EXTENDED_ACTION.equals(action)) {
connectExtendedAction(data);
} else if (MqttServiceConstants.MESSAGE_ARRIVED_ACTION.equals(action)) {
messageArrivedAction(data);
} else if (MqttServiceConstants.SUBSCRIBE_ACTION.equals(action)) {
subscribeAction(data);
} else if (MqttServiceConstants.UNSUBSCRIBE_ACTION.equals(action)) {
unSubscribeAction(data);
} else if (MqttServiceConstants.SEND_ACTION.equals(action)) {
sendAction(data);
} else if (MqttServiceConstants.MESSAGE_DELIVERED_ACTION.equals(action)) {
messageDeliveredAction(data);
} else if (MqttServiceConstants.ON_CONNECTION_LOST_ACTION.equals(action)) {
connectionLostAction(data);
} else if (MqttServiceConstants.DISCONNECT_ACTION.equals(action)) {
disconnected(data);
} else if (MqttServiceConstants.TRACE_ACTION.equals(action)) {
traceAction(data);
} else {
mqttService.traceError(MqttService.TAG, "Callback action doesn't exist.");
}
}

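That's right: data.getString(MqttServiceConstants.CALLBACK_ACTION) retrieves the action we stored in resultBundle earlier, and the matching method of the callback is then invoked according to that action. The callback here is the MqttCallback object we supplied to MqttAndroidClient when setting up the connection. If we need to be notified of actions of type MqttServiceConstants.CONNECT_EXTENDED_ACTION, the callback we pass in must implement MqttCallbackExtended, which is a sub-interface of MqttCallback.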
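The round trip described above (put an action and a client handle into a bundle, broadcast it, filter on the handle, then dispatch on the action) can be sketched without Android. In the sketch below a Map<String, String> stands in for the Bundle; ActionDispatchSketch, its key names and the on() registration method are hypothetical and do not correspond to the real MqttServiceConstants values.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical, Android-free model of the callbackToActivity() -> onReceive() round trip. */
class ActionDispatchSketch {
    static final String KEY_HANDLE = "clientHandle"; // illustrative key names, not the real constants
    static final String KEY_ACTION = "action";

    private final String clientHandle;
    private final Map<String, Consumer<Map<String, String>>> handlers = new HashMap<>();

    ActionDispatchSketch(String clientHandle) {
        this.clientHandle = clientHandle;
    }

    /** Register the handler to run for one action name. */
    void on(String action, Consumer<Map<String, String>> handler) {
        handlers.put(action, handler);
    }

    /** Returns true if the data was addressed to this client and a handler ran. */
    boolean onReceive(Map<String, String> data) {
        if (!clientHandle.equals(data.get(KEY_HANDLE))) {
            return false; // broadcast belongs to another client (or carries no handle)
        }
        Consumer<Map<String, String>> handler = handlers.get(data.get(KEY_ACTION));
        if (handler == null) {
            return false; // unknown action
        }
        handler.accept(data);
        return true;
    }
}
```

A table of handlers replaces the if/else chain of the original onReceive(); the handle check comes first in both, which is what lets several clients share one broadcast channel.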