  • Implementing an Alibaba Cloud Message Service (MNS) message listener in C#

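    Fortune shifts with the decades; never look down on a young man for being poor.

    There is no end to learning; always strive to do better.

    For the past few days I have been reading up on Alibaba Cloud's IoT cloud service and the MNS message queue. After several days of confusion, today I finally made some progress, so I am writing it down for future reference.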

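    First, a diagram borrowed from Alibaba Cloud illustrates how devices "talk" to the enterprise server through the cloud platform.

    A few notes on the diagram:

    1. The IoT Platform acts as the middle layer and relies mainly on the MNS message queue to carry the conversation between devices and the enterprise server. In detail:

    1.1. The device sends a command to the IoT Platform's MNS queue, which stores it. Note that the device-side sending logic is written by embedded developers, for example in C.

    1.2. On the enterprise side, developers write a listener in a high-level language such as C#, Java, or PHP. When the listener sees a device command in the MNS queue, it fetches the command, runs the relevant business logic, and sends a new device command to the MNS queue (for example, a command telling a parcel locker to close its door).

    1.3. MNS stores the command sent by the enterprise. The device, running its own listener, receives the close-door command, executes it, and the door closes automatically.

    That is the whole conversation between a device and the enterprise server.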
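    The three steps above can be sketched with two in-memory queues standing in for the two directions of traffic. This is only an illustration of the flow, not MNS itself: the class name FlowSketch and the message names "parcelStored", "closeDoor", and "noop" are made up for the example.

```csharp
using System;
using System.Collections.Generic;

// In-memory stand-in for the device <-> server conversation.
// Nothing here touches the real MNS SDK; all names are hypothetical.
public static class FlowSketch
{
    // Simulates one round trip and returns the command the device ends up executing.
    public static string RoundTrip(string deviceReport)
    {
        var uplink = new Queue<string>();   // device -> platform -> enterprise server
        var downlink = new Queue<string>(); // enterprise server -> platform -> device

        // 1.1 The device publishes a report to the queue.
        uplink.Enqueue(deviceReport);

        // 1.2 The server-side listener picks it up, runs business logic,
        //     and queues a new command for the device.
        string received = uplink.Dequeue();
        string command = received == "parcelStored" ? "closeDoor" : "noop";
        downlink.Enqueue(command);

        // 1.3 The device-side listener receives the command and executes it.
        return downlink.Dequeue();
    }
}
```

    In a real deployment each queue operation is a network call to MNS, and the two listeners run in separate processes on the device and on the server.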

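    The C# code for listening to MNS is listed below (it requires the MNS C# SDK). Note: message bodies are Base64-encoded, so decode them when receiving and encode them (EncodeBase64) before sending.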
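    As a quick illustration of the Base64 rule, here is a minimal round trip that assumes UTF-8 text. The class name Base64Sketch is only for this example; the listings in this post use their own EncodeBase64 and DecodeBase64 helpers.

```csharp
using System;
using System.Text;

// Minimal Base64 round trip, assuming message bodies are UTF-8 text.
public static class Base64Sketch
{
    // Text -> UTF-8 bytes -> Base64 string (do this before sending).
    public static string Encode(string plain) =>
        Convert.ToBase64String(Encoding.UTF8.GetBytes(plain));

    // Base64 string -> bytes -> UTF-8 text (do this after receiving).
    // Throws FormatException if the input is not valid Base64.
    public static string Decode(string encoded) =>
        Encoding.UTF8.GetString(Convert.FromBase64String(encoded));
}
```

    For example, Base64Sketch.Encode("hello") yields "aGVsbG8=", and decoding that string gives back "hello".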

    Asynchronous listening:

    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using Aliyun.MNS;
    using Aliyun.MNS.Model;
    using IotCommon;
    using IotDtos.MongodbDtos;
    using IotService.Device;
    using IotService.MongoDb;
    
    namespace IotListener
    {
        class Program
        {
            private static MongoLogService _logService;
            public static string _receiptHandle;
            public static DeviceResponseService service = new DeviceResponseService();
            public static Queue nativeQueue;
            static void Main(string[] args)
            {
                LogstoreDatabaseSettings st = new LogstoreDatabaseSettings() { LogsCollectionName = "LogsForDg_" + DateTime.Now.ToString("yyyyMMdd") };
                _logService = new MongoLogService(st);
                while (true)
                {
                    try
                    {
                        IMNS client = new Aliyun.MNS.MNSClient(IotParm._accessKeyId, IotParm._secretAccessKey, IotParm._endpoint, IotParm._stsToken);
                        nativeQueue = client.GetNativeQueue(IotParm._queueName);
                        for (int i = 0; i < IotParm._receiveTimes; i++)
                        {
                            ReceiveMessageRequest request = new ReceiveMessageRequest(1);
                            nativeQueue.BeginReceiveMessage(request, ListenerCallback, null);
                            Thread.Sleep(1);
                        }
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine("Receive message failed, exception info: " + ex.Message);
                    }
                }
            }
    
            /// <summary>
            /// Callback invoked when an asynchronous receive completes
            /// </summary>
            /// <param name="ar"></param>
            public static void ListenerCallback(IAsyncResult ar)
            {
                try
                {
                    Message message = nativeQueue.EndReceiveMessage(ar).Message;
                    string Json = Base64Helper.DecodeBase64(message.Body);
                    Console.WriteLine("Message: {0}", Json);
                    Console.WriteLine("----------------------------------------------------\n");
                    var methodValue = JsonKeyHelper.GetJsonValue(Json, "method");
                    DeviceResponse(methodValue, Json);
                    if (!string.IsNullOrEmpty(methodValue))
                    {
                        _logService.Create(new LogsForDgModel { CreateTime = DateTime.Now, data = Json, methodNo = methodValue });
                    }
                    _receiptHandle = message.ReceiptHandle;
                    nativeQueue.DeleteMessage(_receiptHandle);
                }
                catch (Exception ex)
                {
                    Console.WriteLine("Receive message failed, exception info: " + ex.Message);
                }
    
            }
    
            /// <summary>
            /// Dispatches a device upload to the matching response handler
            /// </summary>
            /// <param name="method"></param>
            /// <param name="message"></param>
            public static void DeviceResponse(string method, string message)
            {
                switch (method)
                {
                    case "doorClosedReport": service.doorClosedReportResponse(message); break;
                    case "doorOpenReport": service.doorOpenReportResponse(message); break;
                    case "deviceStartReportToCloud": service.deviceStartReportToCloudResponse(message); break;
                    case "qryDeviceConfig": service.qryDeviceConfigResponse(message); break;
                    case "devicePingToCloud": service.devicePingToCloudResponse(message); break;
                    case "deviceFatalReport": service.deviceFatalReportResponse(message); break;
                    case "deviceVersionReport": service.deviceVersionReportResponse(message); break;
                    case "deviceFirmwareData": service.deviceFirmwareDataResponse(message); break;
                    case "deviceLocationReport": service.deviceLocationReportResponse(message); break;
                }
            }
        }
    }
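    The callback above relies on JsonKeyHelper.GetJsonValue, which is not shown in this post. As a rough stand-in, the sketch below reads the top-level "method" field with System.Text.Json. It assumes .NET Core 3.0 or later (or the System.Text.Json package), and the names MethodReader and GetMethod are hypothetical.

```csharp
using System;
using System.Text.Json;

// Hypothetical stand-in for the article's JsonKeyHelper.GetJsonValue(json, "method").
public static class MethodReader
{
    // Returns the top-level "method" string, or null when it is missing,
    // not a string, or the input is not valid JSON.
    public static string GetMethod(string json)
    {
        try
        {
            using (JsonDocument doc = JsonDocument.Parse(json))
            {
                JsonElement root = doc.RootElement;
                if (root.ValueKind == JsonValueKind.Object &&
                    root.TryGetProperty("method", out JsonElement value) &&
                    value.ValueKind == JsonValueKind.String)
                {
                    return value.GetString();
                }
            }
        }
        catch (JsonException)
        {
            // Malformed JSON: treat it as "no method" instead of failing the callback.
        }
        return null;
    }
}
```

    A null result falls through the switch in DeviceResponse without matching any case, which matches how the listing treats unknown methods.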

    Synchronous listening:

    using Aliyun.MNS;
    using Aliyun.MNS.Model;
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    
    namespace MnsListener
    {
        class Program
        {
            #region Private Properties
            private const string _accessKeyId = "";
            private const string _secretAccessKey = "";
            private const string _endpoint = "http://.mns.cn-shanghai.aliyuncs.com/";
            private const string _stsToken = null;
    
            private const string _queueName = "Sub";
            private const string _queueNamePrefix = "my";
            private const int _receiveTimes = 1;
            private const int _receiveInterval = 2;
            private const int batchSize = 6;
            private static string _receiptHandle;
    
            #endregion
    
    
            static void Main(string[] args)
            {
                while (true)
                {
                    try
                    {
                        IMNS client = new Aliyun.MNS.MNSClient(_accessKeyId, _secretAccessKey, _endpoint, _stsToken);
                        var nativeQueue = client.GetNativeQueue(_queueName);
                        for (int i = 0; i < _receiveTimes; i++)
                        {
                            var receiveMessageResponse = nativeQueue.ReceiveMessage(3);
                            Console.WriteLine("Receive message successfully, status code: {0}", receiveMessageResponse.HttpStatusCode);
                            Console.WriteLine("----------------------------------------------------");
                            Message message = receiveMessageResponse.Message;
                            string s = DecodeBase64(message.Body);
                            Console.WriteLine("MessageId: {0}", message.Id);
                            Console.WriteLine("ReceiptHandle: {0}", message.ReceiptHandle);
                            Console.WriteLine("MessageBody: {0}", s);
                            Console.WriteLine("MessageBodyMD5: {0}", message.BodyMD5);
                            Console.WriteLine("EnqueueTime: {0}", message.EnqueueTime);
                            Console.WriteLine("NextVisibleTime: {0}", message.NextVisibleTime);
                            Console.WriteLine("FirstDequeueTime: {0}", message.FirstDequeueTime);
                            Console.WriteLine("DequeueCount: {0}", message.DequeueCount);
                            Console.WriteLine("Priority: {0}", message.Priority);
                            Console.WriteLine("----------------------------------------------------\n");
    
                            _receiptHandle = message.ReceiptHandle;
                            nativeQueue.DeleteMessage(_receiptHandle);
    
                            Thread.Sleep(_receiveInterval);
                        }
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine("Receive message failed, exception info: " + ex.Message);
                    }
                }
    
            }
    
            /// Encode
            public static string EncodeBase64(string code, string code_type= "utf-8")
            {
                string encode = "";
                byte[] bytes = Encoding.GetEncoding(code_type).GetBytes(code);
                try
                {
                    encode = Convert.ToBase64String(bytes);
                }
                catch
                {
                    encode = code;
                }
                return encode;
            }
            /// Decode
            public static string DecodeBase64(string code, string code_type = "utf-8")
            {
                try
                {
                    // Keep the Base64 conversion inside the try so malformed input falls back to the raw text.
                    byte[] bytes = Convert.FromBase64String(code);
                    return Encoding.GetEncoding(code_type).GetString(bytes);
                }
                catch
                {
                    return code;
                }
            }
        }
    }

    Sending a message:

    using Aliyun.MNS;
    using Aliyun.MNS.Model;
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    
    namespace MnsSendMsg
    {
        class Program
        {
            #region Private Properties
            private const string _accessKeyId = "";
            private const string _secretAccessKey = "";
            private const string _endpoint = "http://.mns.cn-shanghai.aliyuncs.com/";
            private const string _stsToken = null;
    
            private const string _queueName = "Sub";
            private const string _queueNamePrefix = "my";
            private const int _receiveTimes = 1;
            private const int _receiveInterval = 2;
            private const int batchSize = 6;
            private static string _receiptHandle;
            #endregion
            static void Main(string[] args)
            {
                try
                {
                    IMNS client = new Aliyun.MNS.MNSClient(_accessKeyId, _secretAccessKey, _endpoint, _stsToken);
                    // 1. Get the Queue instance
                    var nativeQueue = client.GetNativeQueue(_queueName);
                    var sendMessageRequest = new SendMessageRequest(EncodeBase64("阿里云<MessageBody>计算"));
                    sendMessageRequest.DelaySeconds = 2;
                    var sendMessageResponse = nativeQueue.SendMessage(sendMessageRequest);
                    Console.WriteLine("Send message successfully,{0}",
                        sendMessageResponse.ToString());
                    Thread.Sleep(2000);
                }
                catch (Exception ex)
                {
                    Console.WriteLine("Send message failed, exception info: " + ex.Message);
                }
            }
    
            /// Encode
            public static string EncodeBase64(string code, string code_type = "utf-8")
            {
                string encode = "";
                byte[] bytes = Encoding.GetEncoding(code_type).GetBytes(code);
                try
                {
                    encode = Convert.ToBase64String(bytes);
                }
                catch
                {
                    encode = code;
                }
                return encode;
            }
            /// Decode
            public static string DecodeBase64(string code, string code_type = "utf-8")
            {
                try
                {
                    // Keep the Base64 conversion inside the try so malformed input falls back to the raw text.
                    byte[] bytes = Convert.FromBase64String(code);
                    return Encoding.GetEncoding(code_type).GetString(bytes);
                }
                catch
                {
                    return code;
                }
            }
        }
    }
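    The listing above sends a fixed test string. For a device command such as the close-door example mentioned earlier, the body would more likely be a small JSON document. The sketch below builds one and Base64-encodes it. The "method" key mirrors the field the listener reads; the "deviceId" key, the values used, and the names CommandSketch and BuildBody are assumptions for illustration. It also assumes System.Text.Json is available.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;

// Hypothetical helper that prepares a Base64-encoded JSON command body.
public static class CommandSketch
{
    public static string BuildBody(string method, string deviceId)
    {
        // "deviceId" is an assumed field; use whatever your device protocol defines.
        var payload = new Dictionary<string, string>
        {
            ["method"] = method,
            ["deviceId"] = deviceId
        };

        // Serialize to JSON, then Base64-encode the UTF-8 bytes.
        string json = JsonSerializer.Serialize(payload);
        return Convert.ToBase64String(Encoding.UTF8.GetBytes(json));
    }
}
```

    The returned string can be passed to new SendMessageRequest(...) in place of the EncodeBase64(...) call in the listing.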

    The MNS C# SDK can be downloaded from Alibaba Cloud: https://help.aliyun.com/document_detail/32447.html?spm=a2c4g.11186623.6.633.61395f64IfHTRo

    For MNS queues, topics, and topic subscriptions, see: https://help.aliyun.com/document_detail/34445.html?spm=a2c4g.11186623.6.542.699f38c6RO3nDS

    For Alibaba Cloud AMQP queue access, see: https://help.aliyun.com/document_detail/149716.html?spm=a2c4g.11186623.6.621.2cda31b4kS1zXR

    For the Alibaba Cloud IoT Platform, see: https://help.aliyun.com/document_detail/125800.html?spm=a2c4g.11186623.6.542.7b0241c8o5r6PT

    Finally: Alibaba Cloud IoT Platform

    @天才卧龙's blog

  • Original article: https://www.cnblogs.com/chenwolong/p/13497615.html