十年河东,十年河西,莫欺少年穷
学无止境,精益求精
近几天一直都在看阿里云的IOT云服务及消息队列MNS,一头雾水好几天了,直到今天,总算有点收获了,记录下来,方便以后查阅。
首先借用阿里云的一张图来说明:设备是如何通过云服务平台和企业服务器“通话”的
针对此图,作如下说明:
1、物联网平台作为中间组件,主要是通过消息队列MNS来实现设备和企业服务器对话的,具体可描述为:
1.1、设备发送指令至物联网平台的MNS队列,MNS队列将设备指令收录,需要说明的是:设备端发送指令的程序是由嵌入式开发人员开发的,例如使用C语言
1.2、企业通过C#、JAVA、PHP等高级语言开发人员开发监听程序,当监听到MNS队列中的设备指令时,获取指令,做相关业务处理,并发送新的设备指令至MNS队列。【例如发送快递柜关门的指令】
1.3、企业发送的指令被MNS收录,设备同样通过监听程序获取企业服务器发送的关门指令,收到关门指令的设备执行相关指令,完成自动关门操作。
以上便是设备与企业服务器之间的对话过程
下面列出C#的监听MNS代码【需要MNS C# SDK 的支持】注意:消息是经过Base64编码(EncodeBase64)的,接收消息后要解码,发送消息前要编码
异步监听:
using System;
using System.Threading;
using System.Threading.Tasks;
using Aliyun.MNS;
using Aliyun.MNS.Model;
using IotCommon;
using IotDtos.MongodbDtos;
using IotService.Device;
using IotService.MongoDb;

namespace IotListener
{
    /// <summary>
    /// Console host that asynchronously receives device messages from an MNS queue,
    /// dispatches them to <see cref="DeviceResponseService"/> by their "method" value,
    /// logs them through <see cref="MongoLogService"/> and deletes them from the queue.
    /// </summary>
    class Program
    {
        private static MongoLogService _logService;

        /// <summary>
        /// Receipt handle of the most recently handled message. Kept for backward
        /// compatibility only; deletion uses a callback-local handle (see ListenerCallback).
        /// </summary>
        public static string _receiptHandle;

        public static DeviceResponseService service = new DeviceResponseService();

        /// <summary>Queue used both to start receives and to complete them in the callback.</summary>
        public static Queue nativeQueue;

        /// <summary>
        /// Sets up the log service and then loops forever, starting asynchronous receives
        /// whose results are handled by <see cref="ListenerCallback"/>.
        /// </summary>
        /// <param name="args">Command-line arguments (not used).</param>
        static void Main(string[] args)
        {
            LogstoreDatabaseSettings st = new LogstoreDatabaseSettings()
            {
                LogsCollectionName = "LogsForDg_" + DateTime.Now.ToString("yyyyMMdd")
            };
            _logService = new MongoLogService(st);

            while (true)
            {
                try
                {
                    // Create the client and queue once instead of on every pass. Rebuilding them
                    // each iteration is wasted work and also swaps the shared nativeQueue while
                    // earlier receives are still pending. If creation throws, nativeQueue stays
                    // null and the next pass retries.
                    if (nativeQueue == null)
                    {
                        IMNS client = new Aliyun.MNS.MNSClient(IotParm._accessKeyId, IotParm._secretAccessKey, IotParm._endpoint, IotParm._stsToken);
                        nativeQueue = client.GetNativeQueue(IotParm._queueName);
                    }

                    // NOTE(review): each pass starts IotParm._receiveTimes receives 1 ms apart
                    // without waiting for earlier ones to complete - confirm this rate is intended.
                    for (int i = 0; i < IotParm._receiveTimes; i++)
                    {
                        ReceiveMessageRequest request = new ReceiveMessageRequest(1);
                        nativeQueue.BeginReceiveMessage(request, ListenerCallback, null);
                        Thread.Sleep(1);
                    }
                }
                catch (Exception ex)
                {
                    Console.WriteLine("Receive message failed, exception info: " + ex.Message);
                }
            }
        }

        /// <summary>
        /// Completion callback for an asynchronous receive: decodes the Base64 body, prints it,
        /// dispatches it by its "method" value, logs it when a method is present, and deletes
        /// the message from the queue.
        /// </summary>
        /// <param name="ar">Async result handed back by <c>BeginReceiveMessage</c>.</param>
        public static void ListenerCallback(IAsyncResult ar)
        {
            try
            {
                Message message = nativeQueue.EndReceiveMessage(ar).Message;

                // Callbacks can run concurrently. Deleting through the shared static field let one
                // callback overwrite another's handle, so a message could be deleted twice while
                // another was never deleted. Keep the handle local to this invocation.
                string receiptHandle = message.ReceiptHandle;

                string json = Base64Helper.DecodeBase64(message.Body);
                Console.WriteLine("Message: {0}", json);
                Console.WriteLine("---------------------------------------------------- ");

                var methodValue = JsonKeyHelper.GetJsonValue(json, "method");
                DeviceResponse(methodValue, json);

                if (!string.IsNullOrEmpty(methodValue))
                {
                    _logService.Create(new LogsForDgModel { CreateTime = DateTime.Now, data = json, methodNo = methodValue });
                }

                _receiptHandle = receiptHandle;
                nativeQueue.DeleteMessage(receiptHandle);
            }
            catch (Exception ex)
            {
                Console.WriteLine("Receive message failed, exception info: " + ex.Message);
            }
        }

        /// <summary>
        /// Routes a device-uploaded message to the matching handler on <see cref="service"/>.
        /// Unrecognised (or null) method names are ignored.
        /// </summary>
        /// <param name="method">Value of the "method" key extracted from the message JSON.</param>
        /// <param name="message">The decoded message JSON passed to the handler.</param>
        public static void DeviceResponse(string method, string message)
        {
            switch (method)
            {
                case "doorClosedReport":
                    service.doorClosedReportResponse(message);
                    break;
                case "doorOpenReport":
                    service.doorOpenReportResponse(message);
                    break;
                case "deviceStartReportToCloud":
                    service.deviceStartReportToCloudResponse(message);
                    break;
                case "qryDeviceConfig":
                    service.qryDeviceConfigResponse(message);
                    break;
                case "devicePingToCloud":
                    service.devicePingToCloudResponse(message);
                    break;
                case "deviceFatalReport":
                    service.deviceFatalReportResponse(message);
                    break;
                case "deviceVersionReport":
                    service.deviceVersionReportResponse(message);
                    break;
                case "deviceFirmwareData":
                    service.deviceFirmwareDataResponse(message);
                    break;
                case "deviceLocationReport":
                    service.deviceLocationReportResponse(message);
                    break;
            }
        }
    }
}
同步监听:
using Aliyun.MNS;
using Aliyun.MNS.Model;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace MnsListener
{
    /// <summary>
    /// Console program that synchronously polls an MNS queue, prints each received
    /// message (including its Base64-decoded body) and deletes it from the queue.
    /// </summary>
    class Program
    {
        #region Private Properties
        private const string _accessKeyId = "";
        private const string _secretAccessKey = "";
        private const string _endpoint = "http://.mns.cn-shanghai.aliyuncs.com/";
        private const string _stsToken = null;

        private const string _queueName = "Sub";
        private const int _receiveTimes = 1;
        private const int _receiveInterval = 2;

        private static string _receiptHandle;
        #endregion

        /// <summary>
        /// Loops forever: receives a message, prints its details, deletes it, then sleeps
        /// for <c>_receiveInterval</c> milliseconds. Failures are printed and the loop continues.
        /// </summary>
        /// <param name="args">Command-line arguments (not used).</param>
        static void Main(string[] args)
        {
            // Created lazily and reused; the original rebuilt the client on every pass.
            Aliyun.MNS.Queue nativeQueue = null;

            while (true)
            {
                try
                {
                    if (nativeQueue == null)
                    {
                        IMNS client = new Aliyun.MNS.MNSClient(_accessKeyId, _secretAccessKey, _endpoint, _stsToken);
                        nativeQueue = client.GetNativeQueue(_queueName);
                    }

                    for (int i = 0; i < _receiveTimes; i++)
                    {
                        var receiveMessageResponse = nativeQueue.ReceiveMessage(3);
                        Console.WriteLine("Receive message successfully, status code: {0}", receiveMessageResponse.HttpStatusCode);
                        Console.WriteLine("----------------------------------------------------");

                        Message message = receiveMessageResponse.Message;
                        string decodedBody = DecodeBase64(message.Body);

                        Console.WriteLine("MessageId: {0}", message.Id);
                        Console.WriteLine("ReceiptHandle: {0}", message.ReceiptHandle);
                        Console.WriteLine("MessageBody: {0}", message.Body);
                        // The decoded body was previously computed but never shown.
                        Console.WriteLine("DecodedBody: {0}", decodedBody);
                        Console.WriteLine("MessageBodyMD5: {0}", message.BodyMD5);
                        Console.WriteLine("EnqueueTime: {0}", message.EnqueueTime);
                        Console.WriteLine("NextVisibleTime: {0}", message.NextVisibleTime);
                        Console.WriteLine("FirstDequeueTime: {0}", message.FirstDequeueTime);
                        Console.WriteLine("DequeueCount: {0}", message.DequeueCount);
                        Console.WriteLine("Priority: {0}", message.Priority);
                        Console.WriteLine("---------------------------------------------------- ");

                        _receiptHandle = message.ReceiptHandle;
                        nativeQueue.DeleteMessage(_receiptHandle);

                        Thread.Sleep(_receiveInterval);
                    }
                }
                catch (Exception ex)
                {
                    Console.WriteLine("Receive message failed, exception info: " + ex.Message);
                }
            }
        }

        /// <summary>
        /// Encodes text as Base64 using the given character encoding.
        /// </summary>
        /// <param name="code">Text to encode.</param>
        /// <param name="code_type">Name of the character encoding (default "utf-8").</param>
        /// <returns>The Base64 representation of <paramref name="code"/>.</returns>
        public static string EncodeBase64(string code, string code_type = "utf-8")
        {
            // The former try/catch wrapped only Convert.ToBase64String, which cannot fail for a
            // non-null byte array, so it was dead code. Encoding errors still propagate as before.
            byte[] bytes = Encoding.GetEncoding(code_type).GetBytes(code);
            return Convert.ToBase64String(bytes);
        }

        /// <summary>
        /// Decodes Base64 text using the given character encoding.
        /// </summary>
        /// <param name="code">Base64 text to decode.</param>
        /// <param name="code_type">Name of the character encoding (default "utf-8").</param>
        /// <returns>
        /// The decoded text; an empty string for null/empty input; or <paramref name="code"/>
        /// unchanged when it is not valid Base64 or the encoding name is not recognised.
        /// </returns>
        public static string DecodeBase64(string code, string code_type = "utf-8")
        {
            if (string.IsNullOrEmpty(code))
            {
                return string.Empty;
            }

            try
            {
                // Convert.FromBase64String used to sit outside the try block, so the
                // "return the input unchanged" fallback never ran for a non-Base64 body.
                byte[] bytes = Convert.FromBase64String(code);
                return Encoding.GetEncoding(code_type).GetString(bytes);
            }
            catch (FormatException)
            {
                return code;
            }
            catch (ArgumentException)
            {
                return code;
            }
        }
    }
}
发送消息:
using Aliyun.MNS;
using Aliyun.MNS.Model;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace MnsSendMsg
{
    /// <summary>
    /// Console program that sends one Base64-encoded message to an MNS queue.
    /// </summary>
    class Program
    {
        #region Private Properties
        private const string _accessKeyId = "";
        private const string _secretAccessKey = "";
        private const string _endpoint = "http://.mns.cn-shanghai.aliyuncs.com/";
        private const string _stsToken = null;

        private const string _queueName = "Sub";
        #endregion

        /// <summary>
        /// Sends a single message with a 2-second delivery delay and prints the outcome.
        /// </summary>
        /// <param name="args">Command-line arguments (not used).</param>
        static void Main(string[] args)
        {
            try
            {
                IMNS client = new Aliyun.MNS.MNSClient(_accessKeyId, _secretAccessKey, _endpoint, _stsToken);

                // 1. Get the queue instance.
                var nativeQueue = client.GetNativeQueue(_queueName);

                var sendMessageRequest = new SendMessageRequest(EncodeBase64("阿里云<MessageBody>计算"));
                sendMessageRequest.DelaySeconds = 2;

                var sendMessageResponse = nativeQueue.SendMessage(sendMessageRequest);
                Console.WriteLine("Send message successfully,{0}", sendMessageResponse.ToString());
                Thread.Sleep(2000);
            }
            catch (Exception ex)
            {
                Console.WriteLine("Send message failed, exception info: " + ex.Message);
            }
        }

        /// <summary>
        /// Encodes text as Base64 using the given character encoding.
        /// </summary>
        /// <param name="code">Text to encode.</param>
        /// <param name="code_type">Name of the character encoding (default "utf-8").</param>
        /// <returns>The Base64 representation of <paramref name="code"/>.</returns>
        public static string EncodeBase64(string code, string code_type = "utf-8")
        {
            // The former try/catch wrapped only Convert.ToBase64String, which cannot fail for a
            // non-null byte array, so it was dead code. Encoding errors still propagate as before.
            byte[] bytes = Encoding.GetEncoding(code_type).GetBytes(code);
            return Convert.ToBase64String(bytes);
        }

        /// <summary>
        /// Decodes Base64 text using the given character encoding.
        /// </summary>
        /// <param name="code">Base64 text to decode.</param>
        /// <param name="code_type">Name of the character encoding (default "utf-8").</param>
        /// <returns>
        /// The decoded text; an empty string for null/empty input; or <paramref name="code"/>
        /// unchanged when it is not valid Base64 or the encoding name is not recognised.
        /// </returns>
        public static string DecodeBase64(string code, string code_type = "utf-8")
        {
            if (string.IsNullOrEmpty(code))
            {
                return string.Empty;
            }

            try
            {
                // Convert.FromBase64String used to sit outside the try block, so the
                // "return the input unchanged" fallback never ran for a non-Base64 input.
                byte[] bytes = Convert.FromBase64String(code);
                return Encoding.GetEncoding(code_type).GetString(bytes);
            }
            catch (FormatException)
            {
                return code;
            }
            catch (ArgumentException)
            {
                return code;
            }
        }
    }
}
关于MNS C# SDK下载,可以去阿里云:https://help.aliyun.com/document_detail/32447.html?spm=a2c4g.11186623.6.633.61395f64IfHTRo
关于MNS队列,主题,主题订阅相关知识:https://help.aliyun.com/document_detail/34445.html?spm=a2c4g.11186623.6.542.699f38c6RO3nDS
关于阿里云AMQP队列接入,可以查询:https://help.aliyun.com/document_detail/149716.html?spm=a2c4g.11186623.6.621.2cda31b4kS1zXR
关于阿里云物联网平台,请查阅:https://help.aliyun.com/document_detail/125800.html?spm=a2c4g.11186623.6.542.7b0241c8o5r6PT
最后:
@天才卧龙的博客