zoukankan      html  css  js  c++  java
  • C#代码实现阿里云消息服务MNS消息监听

    十年河东,十年河西,莫欺少年穷

    学无止境,精益求精

    近几天一直都在看阿里云的IOT云服务及消息队列MNS,一头雾水好几天了,直到今天,总算有点收获了,记录下来,方便以后查阅。

    首先借用阿里云的一张图来说明:设备是如何通过云服务平台和企业服务器‘通话的’

    针对此图,作如下说明:

    1、物联网平台作为中间组件,主要是通过消息队MNS列来实现设备和企业服务器对话的,具体可描述为:

    1.1、设备发送指令至物联网平台的MNS队列,MNS队列将设备指令收录,需要说明的是:设备发送指令是通过嵌入式开发人员开发的,例如C语言

    1.2、企业通过C#、JAVA、PHP等高级语言开发人员开发监听程序,当监听到MNS队列中的设备指令时,获取指令,做相关业务处理,并发送新的设备指令至MNS队列。【例如发送快递柜关门的指令】

    1.3、企业发送的指令被MNS收录,设备同样通过监听程序获取企业服务器发送的关门指令,收到关门指令的设备执行相关指令,完成自动关门操作。

    以上便是设备与企业服务器之间的对话过程

    下面列出C#的监听MNS代码【需要MNS C# JDK 的支持】注意:消息是经过EncodeBase64编码,接受消息要解码,发送消息要编码

    异步监听:

    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using Aliyun.MNS;
    using Aliyun.MNS.Model;
    using IotCommon;
    using IotDtos.MongodbDtos;
    using IotService.Device;
    using IotService.MongoDb;
    
    namespace IotListener
    {
        class Program
        {
            private static MongoLogService _logService;
            public static string _receiptHandle;
            public static DeviceResponseService service = new DeviceResponseService();
            public static Queue nativeQueue;
            static void Main(string[] args)
            {
                LogstoreDatabaseSettings st = new LogstoreDatabaseSettings() { LogsCollectionName = "LogsForDg_" + DateTime.Now.ToString("yyyyMMdd") };
                _logService = new MongoLogService(st);
                while (true)
                {
                    try
                    {
                        IMNS client = new Aliyun.MNS.MNSClient(IotParm._accessKeyId, IotParm._secretAccessKey, IotParm._endpoint, IotParm._stsToken);
                        nativeQueue = client.GetNativeQueue(IotParm._queueName);
                        for (int i = 0; i < IotParm._receiveTimes; i++)
                        {
                            ReceiveMessageRequest request = new ReceiveMessageRequest(1);
                            nativeQueue.BeginReceiveMessage(request, ListenerCallback, null);
                            Thread.Sleep(1);
                        }
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine("Receive message failed, exception info: " + ex.Message);
                    }
                }
            }
    
            /// <summary>
            /// 回调函数
            /// </summary>
            /// <param name="ar"></param>
            public static void ListenerCallback(IAsyncResult ar)
            {
                try
                {
                    Message message = nativeQueue.EndReceiveMessage(ar).Message;
                    string Json = Base64Helper.DecodeBase64(message.Body);
                    Console.WriteLine("Message: {0}", Json);
                    Console.WriteLine("----------------------------------------------------
    ");
                    var methodValue = JsonKeyHelper.GetJsonValue(Json, "method");
                    DeviceResponse(methodValue, Json);
                    if (!string.IsNullOrEmpty(methodValue))
                    {
                        _logService.Create(new LogsForDgModel { CreateTime = DateTime.Now, data = Json, methodNo = methodValue });
                    }
                    _receiptHandle = message.ReceiptHandle;
                    nativeQueue.DeleteMessage(_receiptHandle);
                }
                catch (Exception ex)
                {
                    Console.WriteLine("Receive message failed, exception info: " + ex.Message);
                }
    
            }
    
            /// <summary>
            /// 响应设备上传接口
            /// </summary>
            /// <param name="method"></param>
            /// <param name="message"></param>
            public static void DeviceResponse(string method, string message)
            {
                switch (method)
                {
                    case "doorClosedReport": service.doorClosedReportResponse(message); break;
                    case "doorOpenReport": service.doorOpenReportResponse(message); break;
                    case "deviceStartReportToCloud": service.deviceStartReportToCloudResponse(message); break;
                    case "qryDeviceConfig": service.qryDeviceConfigResponse(message); break;
                    case "devicePingToCloud": service.devicePingToCloudResponse(message); break;
                    case "deviceFatalReport": service.deviceFatalReportResponse(message); break;
                    case "deviceVersionReport": service.deviceVersionReportResponse(message); break;
                    case "deviceFirmwareData": service.deviceFirmwareDataResponse(message); break;
                    case "deviceLocationReport": service.deviceLocationReportResponse(message); break;
                }
            }
        }
    }
    View Code

    同步监听:

    using Aliyun.MNS;
    using Aliyun.MNS.Model;
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    
    namespace MnsListener
    {
        class Program
        {
            #region Private Properties
            private const string _accessKeyId = "";
            private const string _secretAccessKey = "";
            private const string _endpoint = "http://.mns.cn-shanghai.aliyuncs.com/";
            private const string _stsToken = null;
    
            private const string _queueName = "Sub";
            private const string _queueNamePrefix = "my";
            private const int _receiveTimes = 1;
            private const int _receiveInterval = 2;
            private const int batchSize = 6;
            private static string _receiptHandle;
    
            #endregion
    
    
            static void Main(string[] args)
            {
                while (true)
                {
                    try
                    {
                        IMNS client = new Aliyun.MNS.MNSClient(_accessKeyId, _secretAccessKey, _endpoint, _stsToken);
                        var nativeQueue = client.GetNativeQueue(_queueName);
                        for (int i = 0; i < _receiveTimes; i++)
                        {
                            var receiveMessageResponse = nativeQueue.ReceiveMessage(3);
                            Console.WriteLine("Receive message successfully, status code: {0}", receiveMessageResponse.HttpStatusCode);
                            Console.WriteLine("----------------------------------------------------");
                            Message message = receiveMessageResponse.Message;
                            string s = DecodeBase64(message.Body);
                            Console.WriteLine("MessageId: {0}", message.Id);
                            Console.WriteLine("ReceiptHandle: {0}", message.ReceiptHandle);
                            Console.WriteLine("MessageBody: {0}", message.Body);
                            Console.WriteLine("MessageBodyMD5: {0}", message.BodyMD5);
                            Console.WriteLine("EnqueueTime: {0}", message.EnqueueTime);
                            Console.WriteLine("NextVisibleTime: {0}", message.NextVisibleTime);
                            Console.WriteLine("FirstDequeueTime: {0}", message.FirstDequeueTime);
                            Console.WriteLine("DequeueCount: {0}", message.DequeueCount);
                            Console.WriteLine("Priority: {0}", message.Priority);
                            Console.WriteLine("----------------------------------------------------
    ");
    
                            _receiptHandle = message.ReceiptHandle;
                            nativeQueue.DeleteMessage(_receiptHandle);
    
                            Thread.Sleep(_receiveInterval);
                        }
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine("Receive message failed, exception info: " + ex.Message);
                    }
                }
    
            }
    
            ///编码
            public static string EncodeBase64(string code, string code_type= "utf-8")
            {
                string encode = "";
                byte[] bytes = Encoding.GetEncoding(code_type).GetBytes(code);
                try
                {
                    encode = Convert.ToBase64String(bytes);
                }
                catch
                {
                    encode = code;
                }
                return encode;
            }
            ///解码
            public static string DecodeBase64(string code, string code_type = "utf-8")
            {
                string decode = "";
                byte[] bytes = Convert.FromBase64String(code);
                try
                {
                    decode = Encoding.GetEncoding(code_type).GetString(bytes);
                }
                catch
                {
                    decode = code;
                }
                return decode;
            }
        }
    }
    View Code

    发送消息:

    using Aliyun.MNS;
    using Aliyun.MNS.Model;
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    
    namespace MnsSendMsg
    {
        class Program
        {
            #region Private Properties
            private const string _accessKeyId = "";
            private const string _secretAccessKey = "";
            private const string _endpoint = "http://.mns.cn-shanghai.aliyuncs.com/";
            private const string _stsToken = null;
    
            private const string _queueName = "Sub";
            private const string _queueNamePrefix = "my";
            private const int _receiveTimes = 1;
            private const int _receiveInterval = 2;
            private const int batchSize = 6;
            private static string _receiptHandle;
            #endregion
            static void Main(string[] args)
            {
                try
                {
                    IMNS client = new Aliyun.MNS.MNSClient(_accessKeyId, _secretAccessKey, _endpoint, _stsToken);
                    // 1. 获取Queue的实例
                    var nativeQueue = client.GetNativeQueue(_queueName);
                    var sendMessageRequest = new SendMessageRequest(EncodeBase64("阿里云<MessageBody>计算"));
                    sendMessageRequest.DelaySeconds = 2;
                    var sendMessageResponse = nativeQueue.SendMessage(sendMessageRequest);
                    Console.WriteLine("Send message successfully,{0}",
                        sendMessageResponse.ToString());
                    Thread.Sleep(2000);
                }
                catch (Exception ex)
                {
                    Console.WriteLine("Send message failed, exception info: " + ex.Message);
                }
            }
    
            ///编码
            public static string EncodeBase64(string code, string code_type = "utf-8")
            {
                string encode = "";
                byte[] bytes = Encoding.GetEncoding(code_type).GetBytes(code);
                try
                {
                    encode = Convert.ToBase64String(bytes);
                }
                catch
                {
                    encode = code;
                }
                return encode;
            }
            ///解码
            public static string DecodeBase64(string code, string code_type = "utf-8")
            {
                string decode = "";
                byte[] bytes = Convert.FromBase64String(code);
                try
                {
                    decode = Encoding.GetEncoding(code_type).GetString(bytes);
                }
                catch
                {
                    decode = code;
                }
                return decode;
            }
        }
    }
    View Code

    关于MNS C# JDK下载,可以去阿里云:https://help.aliyun.com/document_detail/32447.html?spm=a2c4g.11186623.6.633.61395f64IfHTRo

    关于MNS队列,主题,主题订阅相关知识:https://help.aliyun.com/document_detail/34445.html?spm=a2c4g.11186623.6.542.699f38c6RO3nDS

    关于阿里云AMQP队列接入,可以查询:https://help.aliyun.com/document_detail/149716.html?spm=a2c4g.11186623.6.621.2cda31b4kS1zXR

    关于阿里云物联网平台,请查阅:https://help.aliyun.com/document_detail/125800.html?spm=a2c4g.11186623.6.542.7b0241c8o5r6PT

    最后:阿里云物联网平台

    @天才卧龙的博客

  • 相关阅读:
    OSI安全体系结构
    PHP 二维数组根据相同的值进行合并
    Java实现 LeetCode 17 电话号码的字母组合
    Java实现 LeetCode 16 最接近的三数之和
    Java实现 LeetCode 16 最接近的三数之和
    Java实现 LeetCode 16 最接近的三数之和
    Java实现 LeetCode 15 三数之和
    Java实现 LeetCode 15 三数之和
    Java实现 LeetCode 15 三数之和
    Java实现 LeetCode 14 最长公共前缀
  • 原文地址:https://www.cnblogs.com/chenwolong/p/13497615.html
Copyright © 2011-2022 走看看