
How to use ActiveMQ on Android

2016-03-23 14:02
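Our Android project relies heavily on push messaging, and the platform team chose ActiveMQ to implement it on the server side. To use those push messages in the app, I set out to work out how to receive ActiveMQ messages on Android.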

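This problem is neither hard nor trivial. The platform developers picked ActiveMQ and assumed that, because it is written in Java, using it from Android would be easy. I would love to tell them: "you are wrong!" You can add the jars from ActiveMQ's lib directory to an Android project, but they do not work: the project compiles, yet at runtime it fails with class-not-found errors. I was stuck on this for a long time and could not find anything useful online. After nearly a full day of searching, debugging, and looking through related projects, I finally found one that works (written by a developer abroad, unsurprisingly): Android-Paho-Mqtt-Service. It talks to the broker over MQTT instead of using ActiveMQ's own client libraries, and with it ActiveMQ push messaging finally works on Android.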

ActiveMQ download (I use ActiveMQ 5.9.1 rather than the latest release, because that is the version our server runs): http://activemq.apache.org/download-archives.html
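
Because the Android client in this post connects over MQTT, the broker has to accept MQTT connections on the port the service uses (1883 in the listing further down). Before debugging the Android side, it can help to check from any JVM that the port is reachable at all. The following is only a minimal sketch: the class name BrokerProbe and its method are hypothetical helpers made up for illustration, and the default address is simply the one used in this post.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of ActiveMQ or Paho: checks whether a TCP port accepts connections.
final class BrokerProbe {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unknown.
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults mirror the broker address and port used in this post; override via arguments.
        String host = args.length > 0 ? args[0] : "192.168.11.62";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 1883;
        System.out.println(host + ":" + port + " reachable: " + isReachable(host, port, 3000));
    }
}
```

A successful TCP connection only shows that something is listening on that port; it does not prove that the listener speaks MQTT.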

Android-Paho-Mqtt-Service download: https://github.com/JesseFarebro/Android-Mqtt

Files in the test project

1. MainActivity class

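package com.example.activemqforandroid;

import android.app.Activity;
import android.os.Bundle;

import com.ljf.mqservice.MqttService;

public class MainActivity extends Activity {

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        // Start the MQTT service as soon as the activity is created
        MqttService.actionStart(this);
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        // Stop the MQTT service when the activity is destroyed
        MqttService.actionStop(this);
    }
}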

2. MqttService class. I took this file from the Android-Paho-Mqtt-Service project and changed only the broker address.

package com.ljf.mqservice;

import java.util.Locale;

import android.app.AlarmManager;
import android.app.PendingIntent;
import android.app.Service;
import android.content.BroadcastReceiver;
import android.content.Context;
import android.content.Intent;
import android.content.IntentFilter;
import android.net.ConnectivityManager;
import android.net.NetworkInfo;
import android.os.Handler;
import android.os.HandlerThread;
import android.os.IBinder;
import android.provider.Settings.Secure;
import android.util.Log;
import android.widget.Toast;
// Also import the Paho classes used below (MqttCallback, MqttClient, MqttConnectOptions, ...);
// their exact packages depend on the Paho jar bundled with the project.

public class MqttService extends Service implements MqttCallback
{
    public static final String DEBUG_TAG = "MqttService"; // Debug TAG
    private static final String MQTT_THREAD_NAME = "MqttService[" + DEBUG_TAG + "]"; // Handler thread ID

    private static final String MQTT_BROKER = "192.168.11.62"; // Broker URL or IP address
    private static final int MQTT_PORT = 1883; // Broker port

    public static final int MQTT_QOS_0 = 0; // QoS level 0 (delivered at most once, no confirmation)
    public static final int MQTT_QOS_1 = 1; // QoS level 1 (delivered at least once, with confirmation)
    public static final int MQTT_QOS_2 = 2; // QoS level 2 (delivered exactly once, confirmed with a handshake)

    private static final int MQTT_KEEP_ALIVE = 240000; // Keep-alive interval in ms
    private static final String MQTT_KEEP_ALIVE_TOPIC_FORAMT = "/users/%s/keepalive"; // Topic format for keep-alives
    private static final byte[] MQTT_KEEP_ALIVE_MESSAGE = { 0 }; // Keep-alive message to send
    private static final int MQTT_KEEP_ALIVE_QOS = MQTT_QOS_0; // Default keep-alive QoS

    private static final boolean MQTT_CLEAN_SESSION = true; // Start a clean session?

    private static final String MQTT_URL_FORMAT = "tcp://%s:%d"; // URL format, normally left unchanged

    private static final String ACTION_START = DEBUG_TAG + ".START"; // Action to start
    private static final String ACTION_STOP = DEBUG_TAG + ".STOP"; // Action to stop
    private static final String ACTION_KEEPALIVE = DEBUG_TAG + ".KEEPALIVE"; // Action to keep alive, used by the alarm manager
    private static final String ACTION_RECONNECT = DEBUG_TAG + ".RECONNECT"; // Action to reconnect

    // Device ID format, add any prefix you'd like.
    // Note: there is a 23-character limit; you will get an NPE if you go over it.
    private static final String DEVICE_ID_FORMAT = "ljf_%s";

    private boolean mStarted = false; // Is the client started?
    private String mDeviceId; // Device ID, built from Secure.ANDROID_ID
    private Handler mConnHandler; // Separate handler thread for networking

    private MqttDefaultFilePersistence mDataStore; // Defaults to the file store
    private MemoryPersistence mMemStore; // Falls back to the memory store on failure
    private MqttConnectOptions mOpts; // Connection options

    private MqttTopic mKeepAliveTopic; // Keep-alive topic instance

    private MqttClient mClient; // MQTT client

    private AlarmManager mAlarmManager; // Alarm manager to perform repeating tasks
    private ConnectivityManager mConnectivityManager; // To check for connectivity changes

    /**
     * Start the MQTT client
     * @param ctx context to start the service with
     */
    public static void actionStart(Context ctx) {
        Intent i = new Intent(ctx,MqttService.class);
        i.setAction(ACTION_START);
        ctx.startService(i);
    }

    /**
     * Stop the MQTT client
     * @param ctx context used to deliver the stop action to the service
     */
    public static void actionStop(Context ctx) {
        Intent i = new Intent(ctx,MqttService.class);
        i.setAction(ACTION_STOP);
        ctx.startService(i);
    }

    /**
     * Send a keep-alive message
     * @param ctx context used to deliver the keep-alive action to the service
     */
    public static void actionKeepalive(Context ctx) {
        Intent i = new Intent(ctx,MqttService.class);
        i.setAction(ACTION_KEEPALIVE);
        ctx.startService(i);
    }

    /**
     * Initializes the device ID and most instance variables,
     * including the connection handler, data store, AlarmManager
     * and ConnectivityManager.
     */
    @Override
    public void onCreate() {
        super.onCreate();

        mDeviceId = String.format(DEVICE_ID_FORMAT,
                Secure.getString(getContentResolver(), Secure.ANDROID_ID));

        HandlerThread thread = new HandlerThread(MQTT_THREAD_NAME);
        thread.start();

        mConnHandler = new Handler(thread.getLooper());

        try {
            mDataStore = new MqttDefaultFilePersistence(getCacheDir().getAbsolutePath());
        } catch(MqttPersistenceException e) {
            e.printStackTrace();
            mDataStore = null;
            mMemStore = new MemoryPersistence();
        }

        mOpts = new MqttConnectOptions();
        mOpts.setCleanSession(MQTT_CLEAN_SESSION);
        // Do not set the keep-alive interval on mOpts; we keep track of it with alarms

        mAlarmManager = (AlarmManager) getSystemService(ALARM_SERVICE);
        mConnectivityManager = (ConnectivityManager) getSystemService(CONNECTIVITY_SERVICE);
    }

    /**
     * Service onStartCommand
     * Handles the action passed via the Intent
     *
     * @return START_REDELIVER_INTENT
     */
    @Override
    public int onStartCommand(Intent intent, int flags, int startId) {
        super.onStartCommand(intent, flags, startId);

        String action = intent.getAction();

        Log.i(DEBUG_TAG,"Received action of " + action);

        if(action == null) {
            Log.i(DEBUG_TAG,"Starting service with no action\n Probably from a crash");
        } else {
            if(action.equals(ACTION_START)) {
                Log.i(DEBUG_TAG,"Received ACTION_START");
                start();
            } else if(action.equals(ACTION_STOP)) {
                stop();
            } else if(action.equals(ACTION_KEEPALIVE)) {
                keepAlive();
            } else if(action.equals(ACTION_RECONNECT)) {
                if(isNetworkAvailable()) {
                    reconnectIfNecessary();
                }
            }
        }

        return START_REDELIVER_INTENT;
    }

    /**
     * Attempts to connect to the MQTT broker
     * and listens for connectivity changes
     * via a ConnectivityManager.CONNECTIVITY_ACTION BroadcastReceiver
     */
    private synchronized void start() {
        if(mStarted) {
            Log.i(DEBUG_TAG,"Attempt to start while already started");
            return;
        }

        if(hasScheduledKeepAlives()) {
            stopKeepAlives();
        }

        connect();

        registerReceiver(mConnectivityReceiver,new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION));
    }

    /**
     * Attempts to stop the MQTT client
     * and cancels all keep-alive messages queued
     * in the alarm manager
     */
    private synchronized void stop() {
        if(!mStarted) {
            Log.i(DEBUG_TAG,"Attempting to stop a connection that isn't running");
            return;
        }

        if(mClient != null) {
            mConnHandler.post(new Runnable() {
                @Override
                public void run() {
                    try {
                        mClient.disconnect();
                    } catch(MqttException ex) {
                        ex.printStackTrace();
                    }
                    mClient = null;
                    mStarted = false;

                    stopKeepAlives();
                }
            });
        }

        unregisterReceiver(mConnectivityReceiver);
    }

    /**
     * Connects to the broker with the appropriate data store
     */
    private synchronized void connect() {
        String url = String.format(Locale.US, MQTT_URL_FORMAT, MQTT_BROKER, MQTT_PORT);
        Log.i(DEBUG_TAG,"Connecting with URL: " + url);
        try {
            if(mDataStore != null) {
                Log.i(DEBUG_TAG,"Connecting with DataStore");
                mClient = new MqttClient(url,mDeviceId,mDataStore);
            } else {
                Log.i(DEBUG_TAG,"Connecting with MemStore");
                mClient = new MqttClient(url,mDeviceId,mMemStore);
            }
        } catch(MqttException e) {
            e.printStackTrace();
            return; // No client was created, so there is nothing to connect
        }

        mConnHandler.post(new Runnable() {
            @Override
            public void run() {
                try {
                    // Register the callback before connecting so that no message
                    // arrives while no callback is set
                    mClient.setCallback(MqttService.this);

                    mClient.connect(mOpts);

                    mClient.subscribe("hello", 0);

                    mStarted = true; // Service is now connected

                    Log.i(DEBUG_TAG,"Successfully connected and subscribed, starting keep-alives");

                    startKeepAlives();
                } catch(MqttException e) {
                    e.printStackTrace();
                }
            }
        });
    }

    /**
     * Schedules keep-alives via a PendingIntent
     * in the alarm manager
     */
    private void startKeepAlives() {
        Intent i = new Intent();
        i.setClass(this, MqttService.class);
        i.setAction(ACTION_KEEPALIVE);
        PendingIntent pi = PendingIntent.getService(this, 0, i, 0);
        mAlarmManager.setRepeating(AlarmManager.RTC_WAKEUP,
                System.currentTimeMillis() + MQTT_KEEP_ALIVE,
                MQTT_KEEP_ALIVE, pi);
    }

    /**
     * Cancels the PendingIntent
     * in the alarm manager
     */
    private void stopKeepAlives() {
        Intent i = new Intent();
        i.setClass(this, MqttService.class);
        i.setAction(ACTION_KEEPALIVE);
        PendingIntent pi = PendingIntent.getService(this, 0, i , 0);
        mAlarmManager.cancel(pi);
    }

    /**
     * Publishes a keep-alive to the topic
     * on the broker
     */
    private synchronized void keepAlive() {
        if(isConnected()) {
            try {
                sendKeepAlive();
                return;
            } catch(MqttConnectivityException ex) {
                ex.printStackTrace();
                reconnectIfNecessary();
            } catch(MqttPersistenceException ex) {
                ex.printStackTrace();
                stop();
            } catch(MqttException ex) {
                ex.printStackTrace();
                stop();
            }
        }
    }

    /**
     * Checks the current connectivity
     * and reconnects if required.
     */
    private synchronized void reconnectIfNecessary() {
        if(mStarted && mClient == null) {
            connect();
        }
    }

    /**
     * Queries the NetworkInfo via ConnectivityManager
     * to return the current connected state
     * @return true if we are connected, false otherwise
     */
    private boolean isNetworkAvailable() {
        NetworkInfo info = mConnectivityManager.getActiveNetworkInfo();

        return info != null && info.isConnected();
    }

    /**
     * Compares the client state with our local connected state
     * @return true if both agree that we are connected, false otherwise
     */
    private boolean isConnected() {
        if(mStarted && mClient != null && !mClient.isConnected()) {
            Log.i(DEBUG_TAG,"Mismatch between what we think is connected and what is connected");
        }

        if(mClient != null) {
            return mStarted && mClient.isConnected();
        }

        return false;
    }

    /**
     * Receiver that listens for connectivity changes
     * via ConnectivityManager
     */
    private final BroadcastReceiver mConnectivityReceiver = new BroadcastReceiver() {
        @Override
        public void onReceive(Context context, Intent intent) {
            Log.i(DEBUG_TAG,"Connectivity Changed...");
        }
    };

    /**
     * Sends a keep-alive message to the keep-alive topic
     * @see #MQTT_KEEP_ALIVE_MESSAGE
     * @see #MQTT_KEEP_ALIVE_TOPIC_FORAMT
     * @return the MqttDeliveryToken, which you can use to wait for completion
     */
    private synchronized MqttDeliveryToken sendKeepAlive()
            throws MqttConnectivityException, MqttPersistenceException, MqttException {
        if(!isConnected())
            throw new MqttConnectivityException();

        if(mKeepAliveTopic == null) {
            mKeepAliveTopic = mClient.getTopic(
                    String.format(Locale.US, MQTT_KEEP_ALIVE_TOPIC_FORAMT,mDeviceId));
        }

        Log.i(DEBUG_TAG,"Sending Keepalive to " + MQTT_BROKER);

        MqttMessage message = new MqttMessage(MQTT_KEEP_ALIVE_MESSAGE);
        message.setQos(MQTT_KEEP_ALIVE_QOS);

        return mKeepAliveTopic.publish(message);
    }

    /**
     * Queries the AlarmManager to check whether
     * a keep-alive is currently scheduled
     * @return true if one is currently scheduled, false otherwise
     */
    private synchronized boolean hasScheduledKeepAlives() {
        Intent i = new Intent();
        i.setClass(this, MqttService.class);
        i.setAction(ACTION_KEEPALIVE);
        // Must be getService to match the PendingIntent created in startKeepAlives();
        // getBroadcast would never find it
        PendingIntent pi = PendingIntent.getService(this, 0, i, PendingIntent.FLAG_NO_CREATE);

        return pi != null;
    }

    @Override
    public IBinder onBind(Intent arg0) {
        return null;
    }

    /**
     * Connection to the broker lost
     */
    @Override
    public void connectionLost(Throwable arg0) {
        stopKeepAlives();

        mClient = null;

        if(isNetworkAvailable()) {
            reconnectIfNecessary();
        }
    }

    /**
     * Publish message completion
     */
    @Override
    public void deliveryComplete(MqttDeliveryToken arg0) {
    }

    /**
     * Received message from broker
     */
    @Override
    public void messageArrived(MqttTopic topic, MqttMessage message)
            throws Exception {
        Log.i(DEBUG_TAG," Topic:\t" + topic.getName() +
                " Message:\t" + new String(message.getPayload()) +
                " QoS:\t" + message.getQos());

        // This callback runs on an MQTT client thread, so show the Toast from the main thread
        final String topicName = topic.getName();
        new Handler(getMainLooper()).post(new Runnable() {
            @Override
            public void run() {
                Toast.makeText(MqttService.this, topicName, Toast.LENGTH_LONG).show();
            }
        });
    }

    /**
     * MqttConnectivityException Exception class
     */
    private class MqttConnectivityException extends Exception {
        private static final long serialVersionUID = -7385866796799469420L;
    }
}
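
The values you are most likely to change in this service are the broker address, the client ID prefix, and the keep-alive topic. The sketch below reproduces, in plain Java, how the service builds those three strings, and adds a check for the 23-character client ID limit mentioned in the service's comments (that limit comes from MQTT 3.1). The class MqttSettings and the sample ANDROID_ID value are assumptions made up for illustration; they are not part of the original project.

```java
import java.util.Locale;

// Hypothetical helper, not part of the original project: mirrors the string formats used by MqttService.
final class MqttSettings {

    // Client ID limit quoted in the service's comments.
    static final int MAX_CLIENT_ID_LENGTH = 23;

    // Builds the client ID from a prefix format such as "ljf_%s" and rejects IDs that are too long.
    static String clientId(String prefixFormat, String androidId) {
        String id = String.format(Locale.US, prefixFormat, androidId);
        if (id.length() > MAX_CLIENT_ID_LENGTH) {
            throw new IllegalArgumentException(
                    "client ID has " + id.length() + " characters, limit is " + MAX_CLIENT_ID_LENGTH);
        }
        return id;
    }

    // Same format as MQTT_URL_FORMAT; Locale.US keeps the port in plain ASCII digits.
    static String brokerUrl(String host, int port) {
        return String.format(Locale.US, "tcp://%s:%d", host, port);
    }

    // Same format as the service's keep-alive topic.
    static String keepAliveTopic(String clientId) {
        return String.format(Locale.US, "/users/%s/keepalive", clientId);
    }

    public static void main(String[] args) {
        // Sample 16-character ANDROID_ID, used here only for illustration.
        String id = clientId("ljf_%s", "9774d56d682e549c");
        System.out.println(id);
        System.out.println(brokerUrl("192.168.11.62", 1883));
        System.out.println(keepAliveTopic(id));
    }
}
```

With a 16-character ANDROID_ID, the "ljf_" prefix gives a 20-character client ID, so a prefix longer than seven characters would exceed the limit.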

3. Manifest file

<manifest xmlns:android="http://schemas.android.com/apk/res/android"
    package="com.example.activemqforandroid"
    android:versionCode="1"
    android:versionName="1.0" >

    <uses-sdk
        android:minSdkVersion="8"
        android:targetSdkVersion="19" />

    <uses-permission android:name="android.permission.RECEIVE_BOOT_COMPLETED" />
    <!-- Don't forget to add the permissions below -->
    <uses-permission android:name="android.permission.INTERNET" />
    <uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />

    <application
        android:allowBackup="true"
        android:icon="@drawable/ic_launcher"
        android:label="@string/app_name"
        android:theme="@style/AppTheme" >
        <activity
            android:name="com.example.activemqforandroid.MainActivity"
            android:label="@string/app_name" >
            <intent-filter>
                <action android:name="android.intent.action.MAIN" />

                <category android:name="android.intent.category.LAUNCHER" />
            </intent-filter>
        </activity>

        <receiver
            android:name=".MyReceiver"
            android:enabled="true"
            android:exported="false" >
            <intent-filter>
                <action android:name="android.intent.action.BOOT_COMPLETED" />
            </intent-filter>
        </receiver>

        <!-- Don't forget to declare the service -->
        <service
            android:name="com.ljf.mqservice.MqttService"
            android:exported="false" />
    </application>

</manifest>

Test project source download: http://download.csdn.net/detail/junfeng120125/7582209