zoukankan      html  css  js  c++  java
  • C#使用RabbitMq队列 sample

    出自:https://www.jb51.net/article/197580.htm

    1:RabbitMQ是个啥?(专业术语参考自网络)

     RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。

      RabbitMQ服务器是用Erlang语言编写的,Erlang是专门为高并发而生的语言,而集群和故障转移是构建在开发电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库

    2:使用RabbitMQ有啥好处?

    RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。
    AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
    AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。
    RabbitMQ的可靠性是非常好的,数据能够保证百分之百的不丢失。可以使用镜像队列,它的稳定性非常好。所以说在我们互联网的金融行业。

    对数据的稳定性和可靠性要求都非常高的情况下,我们都会选择RabbitMQ。当然没有kafka性能好,但是要比AvtiveMQ性能要好很多。也可以自己做一些性能的优化。

    RabbitMQ可以构建异地双活架构,包括每一个节点存储方式可以采用磁盘或者内存的方式,

    网络上有很多关于怎么搭建配置RabbitMq服务环境的详细文章,也比较简单,这里不再说明,本人是Docker上面的pull RabbitMq 镜像来安装的!

    3.1:运行容器的命令如下:

    docker run -d --hostname Log --restart=always --name rabbitmq -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=log_user -e RABBITMQ_DEFAULT_PASS=331QQFEG123 rabbitmq:3-management

    4:RabbitMq的使用场景主要有哪些,啥时候用或者不用?

    4.1什么时候使用MQ?

    1)数据驱动的任务依赖

    2)上游不关心多下游执行结果

    3)异步返回执行时间长

    4.2什么时候不使用MQ?

    需要实时关注执行结果 (eg:同步调用)

    5:具体C#怎么使用RabbitMq?下面直接上code和测试截图了(Demo环境是.NetCore3.1控制台+Docker上的RabbitMQ容器来进行的)

    6:sample模式,就是简单地队列模式,一进一出的效果差不多,测试截图:

    //简单生产端 ui调用者
     
    using System;
    namespace RabbitMqPublishDemo
    {
      using MyRabbitMqService;
      using System.Runtime.CompilerServices;
     
      class Program
      {
        static void Main(string[] args)
        {
            //就是简单的队列,生产者
            Console.WriteLine("====RabbitMqPublishDemo====");
            for (int i = 0; i < 500; i++)
            {
              ZrfRabbitMqHelper.PublishSampleMsg("smapleMsg", $"nihaifengge:{i}");
            }
            Console.WriteLine("生成完毕!");
            Console.ReadLine();
        }
      }
    }
     
    /// <summary>
    /// 简单生产者 逻辑
    /// </summary>
    /// <param name="queueName"></param>
    /// <param name="msg"></param>
    public static void PublishSampleMsg(string queueName, string msg)
    {
     
      using (IConnection conn = connectionFactory.CreateConnection())
      {
        using (IModel channel = conn.CreateModel())
        {
          channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
          var msgBody = Encoding.UTF8.GetBytes(msg);
          channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: msgBody);
        }
      }
    }
     
     
    //简单消费端
    using System;
     
    namespace RabbitMqConsumerDemo
    {
      using MyRabbitMqService;
      using System.Runtime.InteropServices;
     
      class Program
      {
        static void Main(string[] args)
        {
          Console.WriteLine("====RabbitMqConsumerDemo====");
          ZrfRabbitMqHelper.ConsumeSampleMsg("smapleMsg", isBasicNack: true, handleMsgStr: handleMsgStr =>
          {
            Console.WriteLine($"订阅到消息:{DateTime.Now}:{handleMsgStr}");
          });
          Console.ReadLine();
        }
      }
    }
     
       #region 简单生产者后端逻辑
        /// <summary>
        /// 简单消费者
        /// </summary>
        /// <param name="queueName">队列名称</param>
        /// <param name="isBasicNack">失败后是否自动放到队列</param>
        /// <param name="handleMsgStr">有就自己对字符串的处理,如果要存储到数据库请自行扩展</param>
        public static void ConsumeSampleMsg(string queueName, bool isBasicNack = false, Action<string> handleMsgStr = null)// bool ifBasicReject = false,
        {
          Console.WriteLine("ConsumeSampleMsg Waiting for messages....");
          IConnection conn = connectionFactory.CreateConnection();
          IModel channel = conn.CreateModel();
          channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
          var consumer = new EventingBasicConsumer(channel);
          consumer.Received += (sender, ea) =>
          {
            byte[] bymsg = ea.Body.ToArray();
            string msg = Encoding.UTF8.GetString(bymsg);
            if (handleMsgStr != null)
            {
              handleMsgStr.Invoke(msg);
            }
            else
            {
              Console.WriteLine($"{DateTime.Now}->收到消息:{msg}");
            }
          };
          channel.BasicConsume(queueName, autoAck: true, consumer);
        }
        #endregion
     
     

    本文来自博客园,作者:.net&new,转载请注明原文链接:https://www.cnblogs.com/wugh8726254/p/15091704.html

  • 相关阅读:
    spring源码怎么解决循环依赖?
    观察者模式
    单例模式
    Python 列表(List)
    python字符串(str)
    内置函数
    python运算符
    函数名的应用 闭包 迭代器
    生成器,推导式
    python的起源
  • 原文地址:https://www.cnblogs.com/wugh8726254/p/15091704.html
Copyright © 2011-2022 走看看