添加依赖

  • 项目根目录下的build.gradle中添加:
// Repository hosting the Eclipse Paho release artifacts.
repositories {
    maven { url "https://repo.eclipse.org/content/repositories/paho-releases/" }
}
  • app目录下的build.gradle中添加:
// Paho Android service plus the underlying Java MQTT v3 client.
// `implementation` replaces the `compile` configuration, which is deprecated
// and no longer exists in Gradle 7+.
dependencies {
    implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
    implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.1'
}

声明权限以及添加服务

    <!-- Permissions declared by the app for the MQTT setup described in this guide -->
    <uses-permission android:name="android.permission.WAKE_LOCK" />
    <uses-permission android:name="android.permission.WRITE_EXTERNAL_STORAGE" />
    <uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
    <uses-permission android:name="android.permission.READ_PHONE_STATE" />
    <uses-permission android:name="android.permission.READ_EXTERNAL_STORAGE" />
    <uses-permission android:name="android.permission.INTERNET" />
	
	    <!-- Paho's Android MQTT service; must be declared inside the <application> element -->
    <service android:name="org.eclipse.paho.android.service.MqttService" />

XRMqttService

XRMqttService 需要手动注册到 AndroidManifest.xml 中(在 <application> 内添加对应的 <service> 声明,android:name 填写 XRMqttService 的实际类路径);如果没有注册,该 MQTT 服务将不会生效。

public class XRMqttService extends Service {

    public static final String TAG = "XRMqttService";

    private static MqttAndroidClient client;
    private MqttConnectOptions conOpt;

    private static String host = "tcp://rolyrobot.com";
    private static String username = "androidcli";
    private static String password = "password";
    private static String subTopic = "ForTest";      //要订阅的主题
    private static String clientId = "androidId";//客户端标识
    private IGetMessageCallBack IGetMessageCallBack;


    public static void setHost(String host) {
        XRMqttService.host = host;
    }

    public static void setUsername(String username) {
        XRMqttService.username = username;
    }

    public static void setPassword(String password) {
        XRMqttService.password = password;
    }

    public static void setSubTopic(String subTopic) {
        XRMqttService.subTopic = subTopic;
    }

    public static void setClientId(String clientId) {
        XRMqttService.clientId = clientId;
    }

    @Override
    public void onCreate() {
        super.onCreate();
        init();
    }

    /**
     * 发送消息
     * @param topic 主题
     * @param msg 待发送的消息
     */
    public static void publish(String topic, String msg) {
        int qos = 0;
        try {
            if (client != null) {
                client.publish(topic, msg.getBytes(), (int) qos, false);
            }
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * 发送消息
     * @param topic 主题
     * @param msg 消息
     * @param qos 服务质量
     */
    public static void publish(String topic, String msg, int qos) {
        try {
            if (client != null) {
                client.publish(topic, msg.getBytes(), qos, false);
            }
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    private void init() {
        // 服务器地址(协议+地址+端口号)
        client = new MqttAndroidClient(this, host, clientId);
        // 设置MQTT监听并且接受消息
        client.setCallback(mqttCallback);

        conOpt = new MqttConnectOptions();
        // 清除缓存
        conOpt.setCleanSession(true);
        // 设置超时时间,单位:秒
        conOpt.setConnectionTimeout(10);
        // 心跳包发送间隔,单位:秒
        conOpt.setKeepAliveInterval(60);
        //设置自动重连
        conOpt.setAutomaticReconnect(true);
//        conOpt.setUserName(username);
//        conOpt.setPassword(password.toCharArray());     //将字符串转换为字符串数组
        doClientConnection();
    }


    @Override
    public void onDestroy() {
        stopSelf();
        try {
            if (client != null) {
                client.disconnect();
                Thread.sleep(50);
                client.unregisterResources();
            }
        } catch (MqttException | InterruptedException e) {
            e.printStackTrace();
        }
        super.onDestroy();
    }

    /**
     * 连接MQTT服务器
     */
    private void doClientConnection() {
        if (!client.isConnected() && isConnectIsNormal()) {
            try {
                client.connect(conOpt, null, iMqttActionListener);
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }

    }

    // MQTT是否连接成功
    private final IMqttActionListener iMqttActionListener = new IMqttActionListener() {

        @Override
        public void onSuccess(IMqttToken arg0) {
            Log.i(TAG, "连接成功 ");
            try {
                // 订阅subTopic话题
                client.subscribe(subTopic, 0);
                if (IGetMessageCallBack != null) {
                    IGetMessageCallBack.onSuccess(arg0);
                }
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void onFailure(IMqttToken arg0, Throwable arg1) {
            arg1.printStackTrace();
            // 连接失败,重连
            if (IGetMessageCallBack != null) {
                IGetMessageCallBack.onFailure(arg0, arg1);
            }
        }
    };

    // MQTT监听并且接受消息
    private final MqttCallback mqttCallback = new MqttCallback() {

        @Override
        public void messageArrived(String topic, MqttMessage message) {

            String str1 = new String(message.getPayload());
            if (IGetMessageCallBack != null) {
                IGetMessageCallBack.setMessage(str1);
            }
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken arg0) {

        }

        @Override
        public void connectionLost(Throwable arg0) {
            // 失去连接,重连
            if (IGetMessageCallBack != null) {
                IGetMessageCallBack.connectionLost(arg0);
            }
        }
    };

    /**
     * 判断网络是否连接
     */
    private boolean isConnectIsNormal() {
        ConnectivityManager connectivityManager = (ConnectivityManager) this.getApplicationContext()
                .getSystemService(Context.CONNECTIVITY_SERVICE);
        NetworkInfo info = connectivityManager != null ? connectivityManager.getActiveNetworkInfo() : null;
        if (info != null && info.isAvailable()) {
            String name = info.getTypeName();
            Log.e(TAG, "MQTT当前网络名称:" + name);
            return true;
        } else {
            Log.e(TAG, "MQTT 没有可用网络");
            return false;
        }
    }


    @Override
    public IBinder onBind(Intent intent) {
        Log.e(getClass().getName(), "onBind");
        return new CustomBinder();
    }

    public void setIGetMessageCallBack(IGetMessageCallBack IGetMessageCallBack) {
        this.IGetMessageCallBack = IGetMessageCallBack;
    }

    public class CustomBinder extends Binder {
        public XRMqttService getService() {
            return XRMqttService.this;
        }
    }
}

XRServiceConnection

/**
 * {@link ServiceConnection} that captures the bound {@link XRMqttService} and
 * hands it the message callback supplied by the caller.
 */
public class XRServiceConnection implements ServiceConnection {

    /** Bound service, or {@code null} while not connected. */
    private XRMqttService mqttService;
    private IGetMessageCallBack messageCallback;

    @Override
    public void onServiceConnected(ComponentName componentName, IBinder iBinder) {
        mqttService = ((XRMqttService.CustomBinder) iBinder).getService();
        mqttService.setIGetMessageCallBack(messageCallback);
    }

    @Override
    public void onServiceDisconnected(ComponentName componentName) {
        // Drop the reference so callers do not keep using a service that is gone.
        mqttService = null;
    }

    /**
     * @return the bound service, or {@code null} if the service is not connected
     */
    public XRMqttService getMqttService() {
        return mqttService;
    }

    /**
     * Sets the callback that receives messages and connection events.
     *
     * <p>If the service is already bound the callback is forwarded immediately;
     * otherwise it is handed over in {@link #onServiceConnected}.
     *
     * @param callback observer to register, may be {@code null}
     */
    public void setIGetMessageCallBack(IGetMessageCallBack callback) {
        this.messageCallback = callback;
        if (mqttService != null) {
            mqttService.setIGetMessageCallBack(callback);
        }
    }
}

IGetMessageCallBack

数据回调,调用mqtt服务的主类(例如Activity)必须实现该接口

/**
 * Observer for events raised by {@code XRMqttService}: incoming message
 * payloads and the outcome/state of the broker connection.
 */
public interface IGetMessageCallBack {

    /**
     * Delivers the payload of a received MQTT message.
     *
     * @param message payload converted to a string
     */
    void setMessage(String message);

    /**
     * Reports that the connect request succeeded.
     *
     * @param token token of the completed connect action
     */
    void onSuccess(IMqttToken token);

    /**
     * Reports that the connect request failed.
     *
     * @param token token of the failed connect action
     * @param cause reason for the failure
     */
    void onFailure(IMqttToken token, Throwable cause);

    /**
     * Reports that an established connection was lost.
     *
     * @param cause reason for the loss as passed on by the MQTT client
     */
    void connectionLost(Throwable cause);
}

Activity中引用

/**
 * Builds a random string of ASCII letters and digits.
 *
 * @param length number of characters to generate; values of zero or less
 *               yield an empty string
 * @return the generated string
 */
private String getRandomString(int length) {
	final String alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
	Random random = new Random();
	StringBuilder sb = new StringBuilder(Math.max(length, 0));
	for (int i = 0; i < length; i++) {
		// Bound derived from the alphabet itself so the two can never drift apart.
		int index = random.nextInt(alphabet.length());
		sb.append(alphabet.charAt(index));
	}
	return sb.toString();
}

/**
 * Parses an incoming JSON payload as {@code SensorData<SubData>} and updates
 * the matching sensor view.
 *
 * <p>"SF6" readings are shown in ppm and turn red at 1000 or above; "O2"
 * readings are shown in percent and turn red at 18 or below. Payloads that
 * cannot be parsed or lack sensor/data are logged and ignored.
 *
 * @param message JSON text received from the MQTT service
 */
@Override
public void setMessage(String message) {
	try {
		// Gson needs the full parameterised type to deserialize a generic class.
		Type type = new TypeToken<SensorData<SubData>>() {}.getType();
		SensorData<SubData> sensorData = gson.fromJson(message, type);
		// Check explicitly instead of relying on the catch block to absorb a NullPointerException.
		if (sensorData == null || sensorData.getSensor() == null || sensorData.getData() == null) {
			Log.e(TAG, "setMessage: incomplete payload: " + message);
			return;
		}
		Log.e(TAG, "setMessage: " + sensorData.toString());
		if ("SF6".equals(sensorData.getSensor())) {
			sf6.setText(getString(R.string.sf6, sensorData.getData().getConcentration()) + "ppm");
			sf6.setTextColor((double) sensorData.getData().getConcentration() >= 1000 ? 0xffff0000 : 0xff0093dd);
		} else if ("O2".equals(sensorData.getSensor())) {
			oxygen.setText(getString(R.string.oxygen, sensorData.getData().getConcentration()) + "%");
			oxygen.setTextColor((double) sensorData.getData().getConcentration() <= 18 ? 0xffff0000 : 0xff0093dd);
		}
	}
	catch (Exception e) {
		// Boundary catch: a malformed message must not crash the UI; keep the stack trace in the log.
		Log.e(TAG, "setMessage: " + e.getMessage(), e);
	}
}

/**
 * Shows a toast once the MQTT connection has been established.
 *
 * @param arg0 token of the completed connect action (unused)
 */
@Override
public void onSuccess(IMqttToken arg0) {
	// Fixed constant name: Toast.LENGTH_LONG (the original "LENGTH_lONG" does not exist).
	Toast.makeText(this, "XRMSC: Connected", Toast.LENGTH_LONG).show();
}

/**
 * Logs the failed connection attempt and shows an error toast.
 *
 * @param arg0 token of the failed connect action (unused)
 * @param arg1 cause of the failure, written to the log
 */
@Override
public void onFailure(IMqttToken arg0, Throwable arg1) {
	// Pass the cause to the logger so the reason for the failure is not lost.
	Log.e(TAG, "onFailure: 连接失败", arg1);
	// Fixed constant name: Toast.LENGTH_LONG (the original "LENGTH_lONG" does not exist).
	Toast.makeText(this, "XRMSC: Error", Toast.LENGTH_LONG).show();
}

/**
 * Shows a toast when the MQTT connection is lost.
 *
 * @param arg0 cause of the loss (unused)
 */
@Override
public void connectionLost(Throwable arg0) {
	// Fixed constant name: Toast.LENGTH_LONG (the original "LENGTH_lONG" does not exist).
	Toast.makeText(this, "XRMSC: Connection lost", Toast.LENGTH_LONG).show();
}
// Connection used to bind this Activity to XRMqttService.
XRServiceConnection serviceConnection;

/**
 * Configures the MQTT service (broker URI, subscription topic, random client
 * id), registers this Activity as the message callback and binds to the
 * service, creating it if necessary.
 */
@Override
public void onCreate(Bundle icicle) {
	// (rest of the Activity set-up omitted)

	// Static configuration has to be in place before the service is created by the bind below.
	XRMqttService.setHost("tcp://" + Constant.CONTROL_URL);
	XRMqttService.setSubTopic("xrrobot/client/#");
	String randomSuffix = getRandomString(6);
	XRMqttService.setClientId("android-iot-" + randomSuffix);
	// Optional, currently disabled: XRMqttService.setUsername("android-client");

	serviceConnection = new XRServiceConnection();
	serviceConnection.setIGetMessageCallBack(WIFIRobot.this);

	Intent mqttIntent = new Intent(this, XRMqttService.class);
	bindService(mqttIntent, serviceConnection, Context.BIND_AUTO_CREATE);
}

/**
 * Unbinds from the MQTT service before the Activity is destroyed.
 */
@Override
protected void onDestroy() {
	if (serviceConnection != null) {
		// The binding must be released manually, otherwise Android reports a leaked ServiceConnection.
		unbindService(serviceConnection);
		serviceConnection = null;
	}
	// Mandatory: Android throws SuperNotCalledException if the super call is missing.
	super.onDestroy();
}

泛型保持

由于在Android中使用了泛型类来解析和生成JSON对象,如果打包时没有为这些类设置混淆规则,混淆后数据将无法正常解析。一帧完整的JSON数据最后可能会变成下面这样的形式:

  • 原始数据
{
    "data": {
        "cali": 1
    },
    "deviceId": "xr_thx_sensor_1a2b3c",
    "sensor": "O2"
}
  • 混淆之后
{
    "a": "O2",
    "b": "xr_thx_sensor_1a2b3c",
    "c": {
        "a": 1
    }
}

因此为了解决上面的问题,需要在混淆文件中添加如下内容

```shell
# Keep every class, with all of its members, in the package that holds the
# generic bean classes, so field names are not renamed and still match the JSON keys.
-keep class com.mqtt.utils.bean.** { *; }

# Keep inner-class metadata (presumably needed for nested bean classes; confirm).
-keepattributes InnerClasses
# Keep generic signatures; Gson's TypeToken reads them to resolve SensorData<SubData>.
-keepattributes Signature
```