zoukankan      html  css  js  c++  java
  • 阿里云消息队列MQ_HTTP接入 for .NetCore 简单例子

    using System;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.IO;
    using System.Linq;
    using System.Net.Http;
    using System.Net.Http.Headers;
    using System.Security.Cryptography;
    using System.Text;
    using System.Threading.Tasks;
    using Newtonsoft.Json;

    namespace MQWebCore
    {
        public class MQHelper
        {
            string URL = "http://publictest-rest.ons.aliyun.com";

            string topic, secretKey, accessKey;
            public MQHelper(string topic,string secretKey,string accessKey)
            {
                this.topic = topic;
                this.secretKey = secretKey;
                this.accessKey = accessKey;

            }
            /// <summary>
            
    /// URL 中的 Key,Tag以及 POST Content-Type 没有任何的限制,只要确保Key 和 Tag 相同唯一即可
            
    /// </summary>
            
    /// <param name="tag"></param>
            
    /// <param name="key"></param>
            
    /// <param name="body"></param>
            
    /// <returns></returns>
            public async Task<bool> Pub(string tag, string key, string body)
            {
                using (HttpClient httpClient = new HttpClient())
                {
                    httpClient.DefaultRequestHeaders.Connection.Add("keep-alive");
                    HttpContent content = new StringContent(body, Encoding.UTF8);
                    httpClient.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("text/html"));
                    
                    var time = (long)(DateTime.Now.ToUniversalTime() - new DateTime(197011)).TotalMilliseconds;
                    var signString = Sign(string.Format("{0} PID_{0} {1} {2}", topic, MD5Encrypt(body), time), secretKey);

                    httpClient.DefaultRequestHeaders.Add("AccessKey", accessKey);
                    httpClient.DefaultRequestHeaders.Add("Signature", signString);
                    httpClient.DefaultRequestHeaders.Add("ProducerID"string.Format("PID_{0}", topic));

                    var url = URL + "/message/?topic=" + topic + "&time=" + time + "&tag=" + tag + "&key=" + key;
                    var res = await httpClient.PostAsync(url, content);
                    if (res.StatusCode == System.Net.HttpStatusCode.Created)
                    {
                        return true;
                    }
                    return false;
                }
            }

            public async void Subscribe(string tag = "*")
            {
                using (HttpClient httpClient = new HttpClient())
                {
                    httpClient.DefaultRequestHeaders.Connection.Add("keep-alive");
                    httpClient.DefaultRequestHeaders.Add("Accept-Charset""utf-8"); 
                    
                    var time = (long)(DateTime.Now.ToUniversalTime() - new DateTime(197011)).TotalMilliseconds;
                    var signString = Sign(string.Format("{0} CID_{0} {1}", topic, time), secretKey);

                    httpClient.DefaultRequestHeaders.Add("AccessKey", accessKey);
                    httpClient.DefaultRequestHeaders.Add("Signature", signString);
                    httpClient.DefaultRequestHeaders.Add("ConsumerID"string.Format("CID_{0}", topic));

                    var url = URL + "/message/?topic=" + topic + "&time=" + time + "&num=32";
                    var res = httpClient.GetAsync(url).GetAwaiter().GetResult();
                    Console.WriteLine(res.StatusCode);
                    if (res.StatusCode == System.Net.HttpStatusCode.OK)
                    {
                        var msg = await res.Content.ReadAsStringAsync();
                        Console.WriteLine(msg);
                        if (msg != null && msg.Length > 10)
                        {
                            MQMessage[] mqMsgs = JsonConvert.DeserializeObject<MQMessage[]>(msg);
                            foreach (var mqMsg in mqMsgs)
                            {
                                Delete(mqMsg.msgHandle);
                            }
                        }
                    }
                }
            }

            async void Delete(string msgHandle)
            {
                using (HttpClient httpClient = new HttpClient())
                {
                    httpClient.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("text/html"));
                    
                    var time = (long)(DateTime.Now.ToUniversalTime() - new DateTime(197011)).TotalMilliseconds;
                    var signString = Sign(string.Format("{0} CID_{0} {1} {2}", topic, msgHandle, time), secretKey);

                    httpClient.DefaultRequestHeaders.Add("AccessKey", accessKey);
                    httpClient.DefaultRequestHeaders.Add("Signature", signString);
                    httpClient.DefaultRequestHeaders.Add("ConsumerID"string.Format("CID_{0}", topic));

                    var url = URL + "/message/?topic=" + topic + "&time=" + time + "&msgHandle=" + msgHandle;
                    var res = await httpClient.DeleteAsync(url);
                    if (res.StatusCode == System.Net.HttpStatusCode.NoContent)
                    {
                        Console.WriteLine("消息删除成功,无需返回内容");
                    }
                    else
                    {
                        Console.WriteLine(res.StatusCode);
                    }
                }
            }

            string MD5Encrypt(string strText)
            {
                using (var md5 = MD5.Create())
                {
                    var result = md5.ComputeHash(Encoding.UTF8.GetBytes(strText));
                    return BitConverter.ToString(result).Replace("-""").ToLower();
                }
            }

            string Sign(string signatureString, string secretKey, bool isRaw = true)
            {
                var enc = Encoding.UTF8;
                HMACSHA1 hmac = new HMACSHA1(enc.GetBytes(secretKey));
                hmac.Initialize();

                byte[] buffer = enc.GetBytes(signatureString);
                if (isRaw)
                {
                    byte[] ret = hmac.ComputeHash(buffer);
                    return Convert.ToBase64String(ret);
                }
                else
                {
                    string res = BitConverter.ToString(hmac.ComputeHash(buffer)).Replace("-""").ToLower();
                    return Convert.ToBase64String(Encoding.UTF8.GetBytes(res));
                }
            }
        }

        /// <summary>
        /// One element of the JSON array that <c>MQHelper.Subscribe</c> deserializes from the
        /// response body. Field names are matched against the JSON property names.
        /// </summary>
        public class MQMessage
        {
            // Message payload (presumably the text that was published — confirm against the service).
            public string body;
            // Kept as a string; the exact timestamp format is not shown in this file.
            public string bornTime;
            // Handle that Subscribe passes to Delete to remove the message.
            public string msgHandle;
            // Message identifier; not read anywhere in this file.
            public string msgId;
            // Presumably the number of redeliveries so far — TODO confirm; not read in this file.
            public long reconsumeTimes;
            // Message tag; not read anywhere in this file.
            public string tag;
        }
    使用:
    using MQWebCore;
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;

    namespace ConsoleApp1
    {
        public class Program
        {
            public static void Main(string[] args)
            {
                Console.OutputEncoding = System.Text.Encoding.UTF8;
                //Encoding.RegisterProvider(CodePagesEncodingProvider.Instance);
                MQHelper mqHelper = new MQHelper("Test""3412qsd's12""3412341212");
                var res = mqHelper.Pub("testTag", "testKey", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss") + "阿特斯地方").GetAwaiter().GetResult();
                
    Debug.WriteLine(res);
                while (true)
                {
                    mqHelper.Subscribe();
                    Thread.Sleep(1000);
                }
                Console.Read();

            }
        }
    }
  • 相关阅读:
    Jmeter安装与环境部署
    BVT & BAT (版本验证测试和版本验收测试)
    测试计划(Test Plan)
    小米8se从miui12降级刷机手记
    网络传输---HttpURLConnection
    mybatis 中 foreach collection的三种用法
    XStream--java对象与xml形式文件相互转换
    idea问题总结记录
    ssh框架配置过程
    ssm框架配置过程
  • 原文地址:https://www.cnblogs.com/94cool/p/5747971.html
Copyright © 2011-2022 走看看