zoukankan      html  css  js  c++  java
  • C# .net 环境下使用rabbitmq消息队列

    消息队列的地位越来越重要,几乎是面试的必问问题了,不会使用几种消息队列都显得尴尬,正好本文使用C#来带你认识rabbitmq消息队列

      首先,我们要安装rabbitmq,当然,如果有现成的,也可以使用,不知道曾几何时,我喜欢将数据库等等软件安装在linux虚拟机,如果没现成的rabbitmq,按照下面的来吧,嘿嘿

      rabbitmq安装:https://www.cnblogs.com/shanfeng1000/p/11951703.html

      如果要实现rabbitmq集群,参考:https://www.cnblogs.com/shanfeng1000/p/12097054.html

      我这里使用的是rabbitmq集群,但是没有比较,只是已经安装好了,就直接使用算了

      虚拟机集群地址:192.168.209.133,192.168.209.134,192.168.209.135

      端口使用的默认端口,都是5672,也就是AMQP协议端口

      Rabbitmq的工作模式

      先说说几个概念

      生产者(producer):负责生产消息,可以有多个生产者,可以理解为生成消息的那部分逻辑

      消费者(consumer):从队列中获取消息,对消息处理的那部分逻辑

      队列(queue):用于存放消息,可以理解为先进先出的一个对象

      交换机(exchange):顾名思义,就是个中介的角色,将接收到的消息按不同的规则转发到其他交换机或者队列中

      路由(route):就是交换机分发消息的规则,交换机可以指定路由规则,生产者在发布消息时也可以指定消息路由,比如交换机中设置A路由表示将消息转发到队列1,B路由表示将消息转发到队列2,那么当交换机接收到消息时,如果消息的路由满足A路由,则将消息转发到队列1,如果满足B路由则将消息转发到队列2

      虚拟主机(virtual host):虚拟地址,用于进行逻辑隔离,一个虚拟主机里面可以有若干个 exchange 和 queue,但是里面不能有相同名称的 exchange 或 queue

      再看看rabbitmq的几种工作模式,具体可参考rabbitmq官网给出的Demo:https://www.rabbitmq.com/getstarted.html

        

      其中,第6中类似我们常用的请求-响应模式,但是使用的RPC请求响应,用的比较少,这里就不过多解释,感兴趣的可以参考官网文档:https://www.rabbitmq.com/tutorials/tutorial-six-dotnet.html

      总的来说,就是生产者将消息发布到rabbitmq上,然后消费者连接rabbitmq,获取到消息就消费,但是有几点说明一下

      1、rabbitmq中的消息是可被多次消费的,因为rabbitmq提供了ack机制,当消费者在消费消息时,如果将自动ack设置成false,那么需要手动提交ack才能告诉rabbitmq消息已被使用,否则当通道关闭时,消息会继续呆在队列中等待消费

      2、当存在多个消费者时,默认情况下,一个消费者获取一个消息,处理完成后再获取下一个,但是rabbitmq消费一次性获取多个,当然后当这些消息消费完成后,再获取下一批,这也就是rabbitmq的Qos机制

      

      C#使用rabbitmq

      如果感兴趣的人多,到时候再单独开一篇博文,现在就介绍其中的1-5种,也可以分类成两种:不使用交换机和使用交换机,所以下面就分这两种来说明

      首先,我们创建了两个Demo项目:RabbitMQ.PublishConsole和RabbitMQ.ConsumeConsole,分别使用使用nuget安装RabbitMQ.Client:

      

      其中RabbitMQ.PublishConsole是用来生产消息,RabbitMQ.ConsumeConsole用来消费消息  

      这里我们安装的是最新版本,旧版本和新版本在使用上可能会有一些区别


      不使用交换机情形

      不使用交换机有两种模式:简单模式和工作模式

      这里先贴上生产者生成消息的代码,简单模式和工作模式这部分测试代码是一样的:  

       RabbitMQ.PublishConsole

      上述代码执行完成后,队列queue1中就有了10条消息,可以在rabbitmq的后台管理中看到:  

      

      代码中提到,通道在申明队列时,如果队列已经存在,则申明的参数一定要对上,否则会抛出异常:The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=406, text='PRECONDITION_FAILED - inequivalent arg 'x-queue-type' for queue 'queue1' in vhost '/': received none but current is the value 'classic' of type 'longstr'', classId=50, methodId=10

      比如这里,我实现在rabbitmq后台创建了队列,那么他们的对应关系如下图: 

       

      

       简单模式

      这个模式很简单,其实就是只有一个消费者,简单的保证操作的顺序性

      

       接着贴上消费者代码:

       RabbitMQ.ConsumeConsole

      上述代码执行完成后,在后台管理中可以看到消息被消费掉了

      

      工作模式

       工作模式是简单模式的拓展,如果业务简单,对消息的消费是一个耗时的过程,这个模式是一个好的选择。

       

       接着调用生产者代码生产10条消息,下面是消费者的测试代码  

       RabbitMQ.ConsumeConsole

      另外说明一下,代码中提到rabbitmq的QOS机制,这里简单解释一下,当生产者将消息发布到rabbitmq之后,如果在未配置QOS的情况下,rabbitmq尽可能快速地发送队列中的所有消息到消费者端,如果消息比较多,消费者来不及处理,就会缓存这些消息,当消息堆积过多,可能导致服务器内存不足而影响其他进程,rabbitmq的QOS可以很好的解决这类问题,QOS就是限制消费者一次性从rabbitmq中获取消息的个数,而不是获取所有消息。比如设置rabbitmq的QOS为10,也就是prefetch=10,就是说,哪怕rabbitmq中有100条消息,消费者也只是一次性获取10条,然后消费者消费这10条消息,剩下的交给其他消费者,当10条消息中的unacked个数少于prefetch * 消费者数目时,会继续从rabbitmq获取消息,如果在工作模式中,不使用QOS,你会发现,所有的消息都被一个消费者消费了

      


      使用交换机情形

      使用交换机的情形有3种:发布订阅模式,路由模式,主题模式

      上面说了,交换机是一个中介的角色,当一个交换机创建后,可以将其他队列或者交换机与当前交换机绑定,绑定时需要指定绑定路由规则,这个和交换机类型有关。

      当我们不使用交换机时,那么生产者是直接将消息发布到队列中去的,生产者只需要指定消息接收的队列即可,而使用交换机做中转时,生产者只需要将消息发布到交换机,然后交换机根据接收到的消息,按与交换机绑定的路由规则,将消息转发到其他交换机或者队列中,这个处理过程和交换机的类型有关,交换机一般分为4类:

      direct:直连类型,就是将消息的路由和交换机的绑定路由作比较,当两者一致时,则匹配成功,然后消息就会被转发到这个绑定路由后的队列或者交换机

      fanout:这种类型的交换机是不需要指定路由的,当交换机接收到消息时,会将消息广播到所有绑定到它的所有队列或交换机中

      topic:主题类型,类似direct类型,只不过在将消息的路由和绑定路由做比较时,是通过特定表达式去比较的,其中# 匹配一个或多个,* 匹配一个

      headers:头部交换机,允许使用消息头中的信息来做匹配规则,这个用的少,基本上不用,这里也就不过多介绍了

      到这里,你应该发觉,使用交换机的三种情形,无非就是使用交换机的类型不一样,发布订阅模式--fanout,路由模式--direct,主题模式--topic

      现在我们先去rabbitmq的后台中,创建这几种交换机:

      交换机的创建及绑定都可以在代码中实现,如IModel类的QueueBind,ExchangeBind等方法,用多了就自然熟了,这里为了方便截图,就到后台去创建了

      

        然后我们创建两个队列,并按指定类型分别绑定到这3个交换机中:

       队列:

      

        demo.direct绑定队列规则:

       

       demo.fanout绑定队列规则:

      

       demo.topic绑定队列规则:

        

       上面所描述的,无非就是三种模式中发布消息方式的不一样,消费者当然还是从队列获取消息消费的,这里我们就先贴出消费者的代码:

       RabbitMQ.ConsumeConsole

      这里我们使用了两个队列,每个队列我们这里只用了一个消费者,对于下面几种模式,这个消费者代码都能消费到

      发布订阅模式

      发布订阅模式使用的是fanout类型的交换机,这个类型无需指定路由,交换机会将消息广播到每个绑定到交换机的队列或者交换机  

       

       RabbitMQ.PublishConsole

      代码中,我们往交换机发布了10条消息,交换机接收到消息后,会将消息转发到queue1和queue2,因此,queue1和queue2都会收到10条消息:

      

      路由模式

      路由模式使用的是direct类型的交换机,也即在进行路由匹配时,需要匹配的路由一直才算匹配成功,我们把发布订阅模式的代码稍作修改即可,贴出生产者部分代码:  

       

       RabbitMQ.PublishConsole

      代码中,我们往demo.direct交换机发布了10条消息,其中5条消息的路由是apple,另外5条消息的路由是banana,demo.direct交换机绑定的两个队列中,queue1的绑定路由是apple,queue2的绑定路由是banana,那么demo.direct交换机会将路由是apple的消息转发到queue1,将路由是banana的消息转发到queue2,从后台可以看每个队列中已经有5个消息准备好了:

      

        接下来可以使用消费者将它们消费掉

      主题模式

       主题模式使用的topic类型的交换机,在进行匹配时,是根据表达式去匹配,# 匹配一个或多个,* 匹配一个,我们将路由模式的代码稍作修改:    

      

       RabbitMQ.PublishConsole

      代码中,我们往demo.topic交换机中发布了10条消息,其中5条消息的路由是以apple开头的,另外5条消息的路由是以banana开头的,demo.direct交换机绑定的两个队列中,queue1的绑定路由是apple.#,就是匹配以apple开头的路由,queue2的绑定路由是banana.#,就是匹配以banana开头的路由,那么demo.direct交换机会将路由是以apple开头的的消息转发到queue1,将路由是以banana开头的的消息转发到queue2,从后台可以看每个队列中已经有5个消息准备好了:

      

        


      封装

      其实rabbitmq的使用还是比较简单的,只需要多谢谢代码尝试一下就能熟悉

      一般的,像这种第三方插件的调用,我建议自己要做一层封装,最好是根据自己的需求去封装,然后项目中只需要调用自己封装的类就行了,下面贴出我自己封装的类:  

       QueueOptions
       RabbitMQExchangeType
       RabbitBase
       RabbitMQProducer
      
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Threading;
    
    namespace RabbitMQ.ConsoleApp
    {
        public class RabbitMQConsumer : RabbitBase
        {
            public RabbitMQConsumer(params string[] hosts) : base(hosts)
            {
    
            }
            public RabbitMQConsumer(params (string, int)[] hostAndPorts) : base(hostAndPorts)
            {
    
            }
    
            public event Action<RecieveResult> Received;
    
            /// <summary>
            /// 构造消费者
            /// </summary>
            /// <param name="channel"></param>
            /// <param name="options"></param>
            /// <returns></returns>
            private IBasicConsumer ConsumeInternal(IModel channel, ConsumeQueueOptions options)
            {
                EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                consumer.Received += (sender, e) =>
                {
                    try
                    {
                        CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
                        if (!options.AutoAck)
                        {
                            cancellationTokenSource.Token.Register(() =>
                            {
                                channel.BasicAck(e.DeliveryTag, false);
                            }); 
                        }
                        Received?.Invoke(new RecieveResult(e, cancellationTokenSource));
                    }
                    catch { }
                };
                if (options.FetchCount != null)
                {
                    channel.BasicQos(0, options.FetchCount.Value, false);
                }
                return consumer;
            }
    
            #region 普通模式、Work模式
            /// <summary>
            /// 消费消息
            /// </summary>
            /// <param name="queue"></param>
            /// <param name="options"></param>
            public ListenResult Listen(string queue, ConsumeQueueOptions options = null)
            {
                options = options ?? new ConsumeQueueOptions();
                var channel = GetChannel();
                channel.QueueDeclare(queue, options.Durable, false, options.AutoDelete, options.Arguments ?? new Dictionary<string, object>());
                var consumer = ConsumeInternal(channel, options);
                channel.BasicConsume(queue, options.AutoAck, consumer);
                ListenResult result = new ListenResult();
                result.Token.Register(() =>
                {
                    try
                    {
                        channel.Close();
                        channel.Dispose();
                    }
                    catch { }
                });
                return result;
            }
            /// <summary>
            /// 消费消息
            /// </summary>
            /// <param name="queue"></param>
            /// <param name="configure"></param>
            public ListenResult Listen(string queue, Action<ConsumeQueueOptions> configure)
            {
                ConsumeQueueOptions options = new ConsumeQueueOptions();
                configure?.Invoke(options);
                return Listen(queue, options);
            }
            #endregion
            #region 订阅模式、路由模式、Topic模式
            /// <summary>
            /// 消费消息
            /// </summary>
            /// <param name="exchange"></param>
            /// <param name="queue"></param>
            /// <param name="options"></param>
            public ListenResult Listen(string exchange, string queue, ExchangeConsumeQueueOptions options = null)
            {
                options = options ?? new ExchangeConsumeQueueOptions();
                var channel = GetChannel();
                channel.QueueDeclare(queue, options.Durable, false, options.AutoDelete, options.Arguments ?? new Dictionary<string, object>());
                if (options.RoutingKeys != null && !string.IsNullOrEmpty(exchange))
                {
                    foreach (var key in options.RoutingKeys)
                    {
                        channel.QueueBind(queue, exchange, key, options.BindArguments);
                    }
                }
                var consumer = ConsumeInternal(channel, options);
                channel.BasicConsume(queue, options.AutoAck, consumer);
                ListenResult result = new ListenResult();
                result.Token.Register(() =>
                {
                    try
                    {
                        channel.Close();
                        channel.Dispose();
                    }
                    catch { }
                });
                return result;
            }
            /// <summary>
            /// 消费消息
            /// </summary>
            /// <param name="exchange"></param>
            /// <param name="queue"></param>
            /// <param name="configure"></param>
            public ListenResult Listen(string exchange, string queue, Action<ExchangeConsumeQueueOptions> configure)
            {
                ExchangeConsumeQueueOptions options = new ExchangeConsumeQueueOptions();
                configure?.Invoke(options);
                return Listen(exchange, queue, options);
            }
            #endregion
        }
        public class RecieveResult
        {
            CancellationTokenSource cancellationTokenSource;
            public RecieveResult(BasicDeliverEventArgs arg, CancellationTokenSource cancellationTokenSource)
            {
                this.Body = Encoding.UTF8.GetString(arg.Body);
                this.ConsumerTag = arg.ConsumerTag;
                this.DeliveryTag = arg.DeliveryTag;
                this.Exchange = arg.Exchange;
                this.Redelivered = arg.Redelivered;
                this.RoutingKey = arg.RoutingKey;
                this.cancellationTokenSource = cancellationTokenSource;
            }
    
            /// <summary>
            /// 消息体
            /// </summary>
            public string Body { get; private set; }
            /// <summary>
            /// 消费者标签
            /// </summary>
            public string ConsumerTag { get; private set; }
            /// <summary>
            /// Ack标签
            /// </summary>
            public ulong DeliveryTag { get; private set; }
            /// <summary>
            /// 交换机
            /// </summary>
            public string Exchange { get; private set; }
            /// <summary>
            /// 是否Ack
            /// </summary>
            public bool Redelivered { get; private set; }
            /// <summary>
            /// 路由
            /// </summary>
            public string RoutingKey { get; private set; }
    
            public void Commit()
            {
                if (cancellationTokenSource == null || cancellationTokenSource.IsCancellationRequested) return;
    
                cancellationTokenSource.Cancel();
                cancellationTokenSource.Dispose();
                cancellationTokenSource = null;
            }
        }
        public class ListenResult
        {
            CancellationTokenSource cancellationTokenSource;
    
            /// <summary>
            /// CancellationToken
            /// </summary>
            public CancellationToken Token { get { return cancellationTokenSource.Token; } }
            /// <summary>
            /// 是否已停止
            /// </summary>
            public bool Stoped { get { return cancellationTokenSource.IsCancellationRequested; } }
    
            public ListenResult()
            {
                cancellationTokenSource = new CancellationTokenSource();
            }
    
            /// <summary>
            /// 停止监听
            /// </summary>
            public void Stop()
            {
                cancellationTokenSource.Cancel();
            }
        }
    }

      测试Demo  

      
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Threading;
    
    namespace RabbitMQ.ConsoleApp
    {
        class Program
        {
            static void Main(string[] args)
            {
                string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" };
                int port = 5672;
                string userName = "admin";
                string password = "123456";
                string virtualHost = "/";
                string queue = "queue1";
                var arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } };
    
                //消费者
                new Thread(() =>
                {
                    using (RabbitMQConsumer consumer = new RabbitMQConsumer(hosts))
                    {
                        consumer.UserName = userName;
                        consumer.Password = password;
                        consumer.Port = port;
                        consumer.VirtualHost = virtualHost;
    
                        consumer.Received += result =>
                        {
                            Console.WriteLine($"接收到数据:{result.Body}");
                            result.Commit();//提交
                        };
                        consumer.Listen(queue, options =>
                        {
                            options.AutoAck = false;
                            options.Arguments = arguments;
                        });
                    }
                }).Start();
    
                //消息生产
                using (RabbitMQProducer producer = new RabbitMQProducer(hosts))
                {
                    producer.UserName = userName;
                    producer.Password = password;
                    producer.Port = port;
                    producer.VirtualHost = virtualHost;
    
                    string message = "";
                    do
                    {
                        message = Console.ReadLine();
                        if (string.IsNullOrEmpty(message))
                        {
                            break;
                        }
                        producer.Publish(queue, message, options => { options.Arguments = arguments; });
    
                    } while (true);
                }
            }
        }
    }
      
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Threading;
    
    namespace RabbitMQ.ConsoleApp
    {
        class Program
        {
            static void Main(string[] args)
            {
                string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" };
                int port = 5672;
                string userName = "admin";
                string password = "123456";
                string virtualHost = "/";
                string queue = "queue1";
                var arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } };
    
                //消费者1
                new Thread(() =>
                {
                    using (RabbitMQConsumer consumer = new RabbitMQConsumer(hosts))
                    {
                        consumer.UserName = userName;
                        consumer.Password = password;
                        consumer.Port = port;
                        consumer.VirtualHost = virtualHost;
    
                        consumer.Received += result =>
                        {
                            Console.WriteLine($"消费者1接收到数据:{result.Body}");
                            result.Commit();//提交
                        };
                        consumer.Listen(queue, options =>
                        {
                            options.AutoAck = false;
                            options.Arguments = arguments;
                            options.FetchCount = 1;
                        });
                    }
                }).Start();
    
                //消费者2
                new Thread(() =>
                {
                    using (RabbitMQConsumer consumer = new RabbitMQConsumer(hosts))
                    {
                        consumer.UserName = userName;
                        consumer.Password = password;
                        consumer.Port = port;
                        consumer.VirtualHost = virtualHost;
    
                        consumer.Received += result =>
                        {
                            Console.WriteLine($"消费者2接收到数据:{result.Body}");
                            result.Commit();//提交
                        };
                        consumer.Listen(queue, options =>
                        {
                            options.AutoAck = false;
                            options.Arguments = arguments;
                            options.FetchCount = 2;
                        });
                    }
                }).Start();
    
                //消息生产
                using (RabbitMQProducer producer = new RabbitMQProducer(hosts))
                {
                    producer.UserName = userName;
                    producer.Password = password;
                    producer.Port = port;
                    producer.VirtualHost = virtualHost;
    
                    string message = "";
                    do
                    {
                        message = Console.ReadLine();
                        if (string.IsNullOrEmpty(message))
                        {
                            break;
                        }
                        producer.Publish(queue, message, options => { options.Arguments = arguments; });
    
                    } while (true);
                }
            }
        }
    }
       发布订阅模式
       路由模式
       主题模式

      上面是我自己做的封装,因为RabbitMQ.Client功能齐全,但是使用比较麻烦,需要编写的代码多一些,推荐一下第三方对rabbitmq的封装插件:EasyNetQ,它是建立在RabbitMQ.Client上的,多数时候可以直接通过EasyNetQ就可以完成消息发布与消费,感兴趣的可以了解一下

  • 相关阅读:
    iOS常用框架总结
    【Java】使用@Value @Reource或@Autowire依赖 (值) 注入时出现NPE的排查方法
    【Java】事件驱动模型和观察者模式
    新人训练营心得 - 道路阻且长
    【Java】Spring Web MVC注意事项
    【Linux】OpenWRT的无线设置注意事项——从2.4G到5G,hwmode不简单
    【Java】 Spring依赖注入小试牛刀:编写第一个Spring ApplicationContext Demo
    【Linux】 awk应用
    【C/C++】高亮C++中函数的重写——函数名相同?参数列表相同?返回值相同?
    【设计模式】C++单例模式的几种写法——Java自动加载内部类对象,C++怎么破?
  • 原文地址:https://www.cnblogs.com/zoujinhua/p/15503485.html
Copyright © 2011-2022 走看看