1.简介
一个java写的mqtt客户端。项目地址:
https://github.com/fusesource/mqtt-client
2.引入fusesource-mqtt-client库
- File--->Project Structure--->Dependencies
- 点绿色+
- 在弹出的窗口中输入“mqtt-client”,回车搜索
- 在结果中选择org.fusesource.mqtt-client:mqtt-client:1.xxx
3.示例代码
3.1 参考代码
activeMQ服务端软件内提供的示例代码 | apache-activemq-5.15.0/examples/mqtt/java/ |
dzone提供的示例 | https://dzone.com/articles/android-mqtt-activemq |
github上的示例代码 | https://github.com/fusesource/mqtt-client#using-the-callbackcontinuation-passing-based-api |
3.2 效果
3.3 源码
1 package com.example.tt.mqtt; 2 3 import android.app.NotificationManager; 4 import android.app.PendingIntent; 5 import android.content.Context; 6 import android.content.Intent; 7 import android.os.Bundle; 8 import android.support.v4.app.TaskStackBuilder; 9 import android.support.v7.app.AppCompatActivity; 10 import android.support.v7.app.NotificationCompat; 11 import android.util.Log; 12 import android.view.View; 13 import android.widget.Button; 14 import android.widget.CheckBox; 15 import android.widget.CompoundButton; 16 import android.widget.EditText; 17 import android.widget.TextView; 18 import android.widget.ToggleButton; 19 20 import org.fusesource.hawtbuf.Buffer; 21 import org.fusesource.hawtbuf.UTF8Buffer; 22 import org.fusesource.mqtt.client.BlockingConnection; 23 import org.fusesource.mqtt.client.Callback; 24 import org.fusesource.mqtt.client.CallbackConnection; 25 import org.fusesource.mqtt.client.Listener; 26 import org.fusesource.mqtt.client.MQTT; 27 import org.fusesource.mqtt.client.Message; 28 import org.fusesource.mqtt.client.QoS; 29 import org.fusesource.mqtt.client.Topic; 30 31 import java.net.URISyntaxException; 32 33 34 public class MainActivity extends AppCompatActivity implements View.OnClickListener,CompoundButton.OnCheckedChangeListener { 35 36 final static String TAG = "MQTTClient"; 37 38 //UI 39 ToggleButton btnConnect; 40 Button btnPublish, btnSubscribe; 41 EditText edtServer,edtMessage,edtTopic,edtClientID; 42 TextView received; 43 CheckBox cbxPersist; 44 45 //MQTT 46 final static String clientId = "android"; 47 final static short keepAlive = 255; 48 final static String host = "192.168.1.101"; 49 final static String user = "guest"; 50 final static int port = 1883; 51 final static String password = "admin"; 52 53 MQTT mqtt ; 54 Listener listener ; 55 CallbackConnection callbackConnection ; 56 Callback<Void> connectCallback ; 57 Callback<byte[]> subscribeCallback ; 58 Callback<Void> publishCallback ; 59 Callback<Void> disconnectCallback ; 
60 61 { 62 connectCallback = new Callback<Void>(){ 63 64 @Override 65 public void onSuccess(Void value) { 66 Log.d(TAG, "connectCallback : onSuccess"); 67 received.post(new Runnable() { 68 @Override 69 public void run() { 70 received.setText("connectCallback success"); 71 } 72 }); 73 74 } 75 @Override 76 public void onFailure(Throwable value) { 77 value.printStackTrace(); 78 Log.d(TAG, "connectCallback : failure"); 79 received.post(new Runnable() { 80 @Override 81 public void run() { 82 received.setText("connectCallback failure"); 83 } 84 }); 85 System.exit(-2); 86 } 87 }; 88 disconnectCallback = new Callback<Void>(){ 89 90 public void onSuccess(Void value) { 91 received.post(new Runnable() { 92 @Override 93 public void run() { 94 received.setText("disconnect success"); 95 } 96 }); 97 } 98 public void onFailure(Throwable e) { 99 received.post(new Runnable() { 100 @Override 101 public void run() { 102 received.setText("disconnect failure"); 103 } 104 }); 105 } 106 }; 107 108 listener = new Listener() { 109 110 @Override 111 public void onConnected() { 112 Log.d(TAG, "listener onConnected"); 113 received.post(new Runnable() { 114 @Override 115 public void run() { 116 received.setText("listener onConnected"); 117 } 118 }); 119 } 120 121 @Override 122 public void onDisconnected() { 123 Log.d(TAG, "listener onDisconnected"); 124 received.post(new Runnable() { 125 @Override 126 public void run() { 127 received.setText("listener onDisconnected"); 128 } 129 }); 130 } 131 132 @Override 133 public void onPublish(final UTF8Buffer topic, Buffer msg, Runnable ack) { 134 final String body = msg.utf8().toString(); 135 Log.d(TAG, "onPublish: " + body); 136 received.post(new Runnable() { 137 @Override 138 public void run() { 139 makeNotification(topic.toString(),body); 140 received.append(" received : " + body); 141 } 142 }); 143 } 144 145 @Override 146 public void onFailure(Throwable value) { 147 Log.d(TAG, "listener failure"); 148 received.post(new Runnable() { 149 @Override 150 
public void run() { 151 received.setText("listener failure"); 152 } 153 }); 154 } 155 }; 156 157 subscribeCallback = new Callback<byte[]>() { 158 159 public void onSuccess(byte[] qoses) { 160 Log.d(TAG, "subscribe : success"); 161 162 received.post(new Runnable() { 163 @Override 164 public void run() { 165 received.setText("subscribe " + edtTopic.getText().toString() + ": success"); 166 } 167 }); 168 } 169 public void onFailure(Throwable value) { 170 value.printStackTrace(); 171 Log.d(TAG, "subscribe : failure"); 172 received.post(new Runnable() { 173 @Override 174 public void run() { 175 received.setText("subscribe " + edtTopic.getText().toString() + ": failure"); 176 } 177 }); 178 System.exit(-2); 179 } 180 }; 181 publishCallback = new Callback<Void>() { 182 @Override 183 public void onSuccess(Void value) { 184 Log.d(TAG, "onSuccess: "); 185 } 186 187 @Override 188 public void onFailure(Throwable value) { 189 Log.d(TAG, "onFailure: "); 190 } 191 }; 192 } 193 194 void connect(){ 195 callbackConnection.connect(connectCallback); 196 } 197 198 void disconnect(){ 199 callbackConnection.disconnect(disconnectCallback); 200 } 201 202 void subscribe(){ 203 204 String topicName = edtTopic.getText().toString().trim(); 205 206 Topic topics[] = new Topic[]{new Topic(topicName,QoS.AT_LEAST_ONCE)}; 207 208 callbackConnection.subscribe(topics,subscribeCallback); 209 210 } 211 212 void publish(){ 213 214 String data = edtMessage.getText().toString(); 215 216 String topicName = edtTopic.getText().toString().trim(); 217 218 callbackConnection.publish(topicName,data.getBytes(),QoS.AT_LEAST_ONCE,false,publishCallback); 219 220 } 221 222 void initMqtt(){ 223 224 mqtt = new MQTT(); 225 try { 226 mqtt.setHost(host, port); 227 mqtt.setUserName(user); 228 mqtt.setPassword(password); 229 mqtt.setKeepAlive(keepAlive); 230 mqtt.getClientId(); 231 callbackConnection = mqtt.callbackConnection(); 232 callbackConnection.listener(listener); 233 234 } catch (URISyntaxException e) { 235 
e.printStackTrace(); 236 Log.e(TAG,"-=-=-=-=-=-=------------==== initMqtt exception : " + e.getMessage()); 237 } 238 } 239 240 @Override 241 protected void onCreate(Bundle savedInstanceState) { 242 super.onCreate(savedInstanceState); 243 setContentView(R.layout.activity_main); 244 245 received = (TextView) findViewById(R.id.txt_received); 246 btnSubscribe= (Button) findViewById(R.id.btn_subscribe); 247 btnConnect = (ToggleButton)findViewById(R.id.btn_connect); 248 btnPublish = (Button) findViewById(R.id.btn_publish); 249 edtServer = (EditText) findViewById(R.id.edt_server); 250 edtTopic = (EditText) findViewById(R.id.edt_topic); 251 edtMessage = (EditText) findViewById(R.id.edt_message); 252 edtClientID = (EditText) findViewById(R.id.edt_clientID); 253 cbxPersist = (CheckBox) findViewById(R.id.cbx_persist); 254 255 btnConnect .setOnClickListener(this); 256 btnConnect .setOnCheckedChangeListener(this); 257 cbxPersist .setOnCheckedChangeListener(this); 258 btnSubscribe.setOnClickListener(this); 259 btnPublish .setOnClickListener(this); 260 261 initMqtt(); 262 263 } 264 void makeNotification(final String title,final String content){ 265 266 NotificationCompat.Builder mBuilder = new NotificationCompat.Builder(this); 267 mBuilder.setSmallIcon(R.drawable.mail_3_small);//must 268 mBuilder.setContentTitle(title); 269 mBuilder.setContentText(content); 270 // Creates an explicit intent for an Activity in your app 271 Intent resultIntent = new Intent(this, MainActivity.class); 272 273 // The stack builder object will contain an artificial back stack for the 274 // started Activity. 275 // This ensures that navigating backward from the Activity leads out of 276 // your app to the Home screen. 
277 TaskStackBuilder stackBuilder = TaskStackBuilder.create(this); 278 // Adds the back stack for the Intent (but not the Intent itself) 279 stackBuilder.addParentStack(MainActivity.class); 280 // Adds the Intent that starts the Activity to the top of the stack 281 stackBuilder.addNextIntent(resultIntent); 282 PendingIntent resultPendingIntent = stackBuilder.getPendingIntent(0,PendingIntent.FLAG_UPDATE_CURRENT); 283 mBuilder.setContentIntent(resultPendingIntent); 284 NotificationManager mNotificationManager = 285 (NotificationManager) getSystemService(Context.NOTIFICATION_SERVICE); 286 287 // mNotificationId is a unique integer your app uses to identify the 288 // notification. For example, to cancel the notification, you can pass its ID 289 // number to NotificationManager.cancel(). 290 mNotificationManager.notify(R.string.app_name, mBuilder.build()); 291 } 292 293 void blocking(){ 294 BlockingConnection connection = mqtt.blockingConnection(); 295 try { 296 connection.connect(); 297 //publish 298 connection.publish("foo", "Hello".getBytes(), QoS.AT_LEAST_ONCE, false); 299 300 //subscribe 301 Topic[] topics = {new Topic("foo", QoS.AT_LEAST_ONCE)}; 302 byte[] qoses = connection.subscribe(topics); 303 304 //receive message 305 Message message = connection.receive(); 306 System.out.println(message.getTopic()); 307 byte[] payload = message.getPayload(); 308 // process the message then: 309 message.ack(); 310 311 //disconnect 312 connection.disconnect(); 313 } catch (Exception e) { 314 e.printStackTrace(); 315 } 316 } 317 318 @Override 319 public void onClick(View view) { 320 switch (view.getId()){ 321 case R.id.btn_publish : publish(); break; 322 case R.id.btn_subscribe : subscribe(); break; 323 } 324 325 } 326 327 @Override 328 public void onCheckedChanged(CompoundButton compoundButton, boolean b) { 329 switch (compoundButton.getId()){ 330 case R.id.btn_connect: 331 if (!b){ 332 connect(); 333 }else{ 334 disconnect(); 335 } 336 break; 337 case R.id.cbx_persist: 338 if 
(mqtt != null) { 339 mqtt.setClientId(edtClientID.getText().toString().trim()); 340 mqtt.setCleanSession(!b); 341 } 342 break; 343 } 344 } 345 346 }
3.4 完整下载地址
https://git.oschina.net/xi/mqtt-client-demo.git
4.MQTT 常用方法介绍
setClientId |
Use to set the client Id of the session. This is what an MQTT server uses to identify a session where setCleanSession(false); is being used. The id must be 23 characters or less. Defaults to auto generated id (based on your socket address, port and timestamp). 每个客户端id不要相同
指定id后,才可以调用setCleanSession,持久保存订阅的会话,哪个客户端订阅了哪个主题就保存在某个会话中。
|
setCleanSession | Set to false if you want the MQTT server to persist topic subscriptions and ack positions across client sessions. Defaults to true.
设置false时,服务端将不清除会话,这样就可以持久保存订阅关系。
|
setKeepAlive |
Configures the Keep Alive timer in seconds. Defines the maximum time interval between messages received from a client. It enables the server to detect that the network connection to a client has dropped, without having to wait for the long TCP/IP timeout. 设置保活时间,单位是秒,默认为TCP连接时间。
|
setUserName | Sets the user name used to authenticate against the server.
设置服务端验证的用户名
|
setPassword | Sets the password used to authenticate against the server.
设置验证用户的密码
|
setWillTopic |
If set the server will publish the client's Will message to the specified topics if the client has an unexpected disconnection. 当客户端异常断开时,服务器按这里指定的主题发布意愿消息(即遗嘱消息,Will message)。
|
setWillMessage | The Will message to send. Defaults to a zero length message.
意愿消息
|
setWillQos | Sets the quality of service to use for the Will message. Defaults to QoS.AT_MOST_ONCE.
意愿消息的QoS
|
setWillRetain | Set to true if you want the Will to be published with the retain option. |
setVersion | Set to "3.1.1" to use MQTT version 3.1.1. Otherwise defaults to the 3.1 protocol version.
设置MQTT协议版本
|