zoukankan      html  css  js  c++  java
  • C# .net 环境下使用rabbitmq消息队列

    消息队列的地位越来越重要,几乎是面试的必问问题了,不会使用几种消息队列都显得尴尬,正好本文使用C#来带你认识rabbitmq消息队列

      首先,我们要安装rabbitmq,当然,如果有现成的,也可以使用,不知道曾几何时,我喜欢将数据库等等软件安装在linux虚拟机,如果没现成的rabbitmq,按照下面的来吧,嘿嘿

      rabbitmq安装:https://www.cnblogs.com/shanfeng1000/p/11951703.html

      如果要实现rabbitmq集群,参考:https://www.cnblogs.com/shanfeng1000/p/12097054.html

      我这里使用的是rabbitmq集群,但是没有比较,只是已经安装好了,就直接使用算了

      虚拟机集群地址:192.168.209.133,192.168.209.134,192.168.209.135

      端口使用的默认端口,都是5672,也就是AMQP协议端口

      Rabbitmq的工作模式

      先说说几个概念

      生产者(producer):负责生产消息,可以有多个生产者,可以理解为生成消息的那部分逻辑

      消费者(consumer):从队列中获取消息,对消息处理的那部分逻辑

      队列(queue):用于存放消息,可以理解为先进先出的一个对象

      交换机(exchange):顾名思义,就是个中介的角色,将接收到的消息按不同的规则转发到其他交换机或者队列中

      路由(route):就是交换机分发消息的规则,交换机可以指定路由规则,生产者在发布消息时也可以指定消息路由,比如交换机中设置A路由表示将消息转发到队列1,B路由表示将消息转发到队列2,那么当交换机接收到消息时,如果消息的路由满足A路由,则将消息转发到队列1,如果满足B路由则将消息转发到队列2

      虚拟主机(virtual host):虚拟地址,用于进行逻辑隔离,一个虚拟主机里面可以有若干个 exchange 和 queue,但是里面不能有相同名称的 exchange 或 queue

      再看看rabbitmq的几种工作模式,具体可参考rabbitmq官网给出的Demo:https://www.rabbitmq.com/getstarted.html

        

      其中,第6中类似我们常用的请求-响应模式,但是使用的RPC请求响应,用的比较少,这里就不过多解释,感兴趣的可以参考官网文档:https://www.rabbitmq.com/tutorials/tutorial-six-dotnet.html

      总的来说,就是生产者将消息发布到rabbitmq上,然后消费者连接rabbitmq,获取到消息就消费,但是有几点说明一下

      1、rabbitmq中的消息是可被多次消费的,因为rabbitmq提供了ack机制,当消费者在消费消息时,如果将自动ack设置成false,那么需要手动提交ack才能告诉rabbitmq消息已被使用,否则当通道关闭时,消息会继续呆在队列中等待消费

      2、当存在多个消费者时,默认情况下,一个消费者获取一个消息,处理完成后再获取下一个,但是rabbitmq消费一次性获取多个,当然后当这些消息消费完成后,再获取下一批,这也就是rabbitmq的Qos机制

      

      C#使用rabbitmq

      如果感兴趣的人多,到时候再单独开一篇博文,现在就介绍其中的1-5种,也可以分类成两种:不使用交换机和使用交换机,所以下面就分这两种来说明

      首先,我们创建了两个Demo项目:RabbitMQ.PublishConsole和RabbitMQ.ConsumeConsole,分别使用使用nuget安装RabbitMQ.Client:

      

      其中RabbitMQ.PublishConsole是用来生产消息,RabbitMQ.ConsumeConsole用来消费消息  

      这里我们安装的是最新版本,旧版本和新版本在使用上可能会有一些区别


      不使用交换机情形

      不使用交换机有两种模式:简单模式和工作模式

      这里先贴上生产者生成消息的代码,简单模式和工作模式这部分测试代码是一样的:  

       RabbitMQ.PublishConsole

      上述代码执行完成后,队列queue1中就有了10条消息,可以在rabbitmq的后台管理中看到:  

      

      代码中提到,通道在申明队列时,如果队列已经存在,则申明的参数一定要对上,否则会抛出异常:The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=406, text='PRECONDITION_FAILED - inequivalent arg 'x-queue-type' for queue 'queue1' in vhost '/': received none but current is the value 'classic' of type 'longstr'', classId=50, methodId=10

      比如这里,我实现在rabbitmq后台创建了队列,那么他们的对应关系如下图: 

       

      

       简单模式

      这个模式很简单,其实就是只有一个消费者,简单的保证操作的顺序性

      

       接着贴上消费者代码:

       RabbitMQ.ConsumeConsole

      上述代码执行完成后,在后台管理中可以看到消息被消费掉了

      

      工作模式

       工作模式是简单模式的拓展,如果业务简单,对消息的消费是一个耗时的过程,这个模式是一个好的选择。

       

       接着调用生产者代码生产10条消息,下面是消费者的测试代码  

       RabbitMQ.ConsumeConsole

      另外说明一下,代码中提到rabbitmq的QOS机制,这里简单解释一下,当生产者将消息发布到rabbitmq之后,如果在未配置QOS的情况下,rabbitmq尽可能快速地发送队列中的所有消息到消费者端,如果消息比较多,消费者来不及处理,就会缓存这些消息,当消息堆积过多,可能导致服务器内存不足而影响其他进程,rabbitmq的QOS可以很好的解决这类问题,QOS就是限制消费者一次性从rabbitmq中获取消息的个数,而不是获取所有消息。比如设置rabbitmq的QOS为10,也就是prefetch=10,就是说,哪怕rabbitmq中有100条消息,消费者也只是一次性获取10条,然后消费者消费这10条消息,剩下的交给其他消费者,当10条消息中的unacked个数少于prefetch * 消费者数目时,会继续从rabbitmq获取消息,如果在工作模式中,不使用QOS,你会发现,所有的消息都被一个消费者消费了

      


      使用交换机情形

      使用交换机的情形有3种:发布订阅模式,路由模式,主题模式

      上面说了,交换机是一个中介的角色,当一个交换机创建后,可以将其他队列或者交换机与当前交换机绑定,绑定时需要指定绑定路由规则,这个和交换机类型有关。

      当我们不使用交换机时,那么生产者是直接将消息发布到队列中去的,生产者只需要指定消息接收的队列即可,而使用交换机做中转时,生产者只需要将消息发布到交换机,然后交换机根据接收到的消息,按与交换机绑定的路由规则,将消息转发到其他交换机或者队列中,这个处理过程和交换机的类型有关,交换机一般分为4类:

      direct:直连类型,就是将消息的路由和交换机的绑定路由作比较,当两者一致时,则匹配成功,然后消息就会被转发到这个绑定路由后的队列或者交换机

      fanout:这种类型的交换机是不需要指定路由的,当交换机接收到消息时,会将消息广播到所有绑定到它的所有队列或交换机中

      topic:主题类型,类似direct类型,只不过在将消息的路由和绑定路由做比较时,是通过特定表达式去比较的,其中# 匹配一个或多个,* 匹配一个

      headers:头部交换机,允许使用消息头中的信息来做匹配规则,这个用的少,基本上不用,这里也就不过多介绍了

      到这里,你应该发觉,使用交换机的三种情形,无非就是使用交换机的类型不一样,发布订阅模式--fanout,路由模式--direct,主题模式--topic

      现在我们先去rabbitmq的后台中,创建这几种交换机:

      交换机的创建及绑定都可以在代码中实现,如IModel类的QueueBind,ExchangeBind等方法,用多了就自然熟了,这里为了方便截图,就到后台去创建了

      

        然后我们创建两个队列,并按指定类型分别绑定到这3个交换机中:

       队列:

      

        demo.direct绑定队列规则:

       

       demo.fanout绑定队列规则:

      

       demo.topic绑定队列规则:

        

       上面所描述的,无非就是三种模式中发布消息方式的不一样,消费者当然还是从队列获取消息消费的,这里我们就先贴出消费者的代码:

       RabbitMQ.ConsumeConsole

      这里我们使用了两个队列,每个队列我们这里只用了一个消费者,对于下面几种模式,这个消费者代码都能消费到

      发布订阅模式

      发布订阅模式使用的是fanout类型的交换机,这个类型无需指定路由,交换机会将消息广播到每个绑定到交换机的队列或者交换机  

       

       RabbitMQ.PublishConsole

      代码中,我们往交换机发布了10条消息,交换机接收到消息后,会将消息转发到queue1和queue2,因此,queue1和queue2都会收到10条消息:

      

      路由模式

      路由模式使用的是direct类型的交换机,也即在进行路由匹配时,需要匹配的路由一直才算匹配成功,我们把发布订阅模式的代码稍作修改即可,贴出生产者部分代码:  

       

       RabbitMQ.PublishConsole

      代码中,我们往demo.direct交换机发布了10条消息,其中5条消息的路由是apple,另外5条消息的路由是banana,demo.direct交换机绑定的两个队列中,queue1的绑定路由是apple,queue2的绑定路由是banana,那么demo.direct交换机会将路由是apple的消息转发到queue1,将路由是banana的消息转发到queue2,从后台可以看每个队列中已经有5个消息准备好了:

      

        接下来可以使用消费者将它们消费掉

      主题模式

       主题模式使用的topic类型的交换机,在进行匹配时,是根据表达式去匹配,# 匹配一个或多个,* 匹配一个,我们将路由模式的代码稍作修改:    

      

       RabbitMQ.PublishConsole

      代码中,我们往demo.topic交换机中发布了10条消息,其中5条消息的路由是以apple开头的,另外5条消息的路由是以banana开头的,demo.direct交换机绑定的两个队列中,queue1的绑定路由是apple.#,就是匹配以apple开头的路由,queue2的绑定路由是banana.#,就是匹配以banana开头的路由,那么demo.direct交换机会将路由是以apple开头的的消息转发到queue1,将路由是以banana开头的的消息转发到queue2,从后台可以看每个队列中已经有5个消息准备好了:

      

        


      封装

      其实rabbitmq的使用还是比较简单的,只需要多谢谢代码尝试一下就能熟悉

      一般的,像这种第三方插件的调用,我建议自己要做一层封装,最好是根据自己的需求去封装,然后项目中只需要调用自己封装的类就行了,下面贴出我自己封装的类:  

       QueueOptions
       RabbitMQExchangeType
       RabbitBase
       RabbitMQProducer
      
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Threading;
    
    namespace RabbitMQ.ConsoleApp
    {
        public class RabbitMQConsumer : RabbitBase
        {
            public RabbitMQConsumer(params string[] hosts) : base(hosts)
            {
    
            }
            public RabbitMQConsumer(params (string, int)[] hostAndPorts) : base(hostAndPorts)
            {
    
            }
    
            public event Action<RecieveResult> Received;
    
            /// <summary>
            /// 构造消费者
            /// </summary>
            /// <param name="channel"></param>
            /// <param name="options"></param>
            /// <returns></returns>
            private IBasicConsumer ConsumeInternal(IModel channel, ConsumeQueueOptions options)
            {
                EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                consumer.Received += (sender, e) =>
                {
                    try
                    {
                        CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
                        if (!options.AutoAck)
                        {
                            cancellationTokenSource.Token.Register(() =>
                            {
                                channel.BasicAck(e.DeliveryTag, false);
                            }); 
                        }
                        Received?.Invoke(new RecieveResult(e, cancellationTokenSource));
                    }
                    catch { }
                };
                if (options.FetchCount != null)
                {
                    channel.BasicQos(0, options.FetchCount.Value, false);
                }
                return consumer;
            }
    
            #region 普通模式、Work模式
            /// <summary>
            /// 消费消息
            /// </summary>
            /// <param name="queue"></param>
            /// <param name="options"></param>
            public ListenResult Listen(string queue, ConsumeQueueOptions options = null)
            {
                options = options ?? new ConsumeQueueOptions();
                var channel = GetChannel();
                channel.QueueDeclare(queue, options.Durable, false, options.AutoDelete, options.Arguments ?? new Dictionary<string, object>());
                var consumer = ConsumeInternal(channel, options);
                channel.BasicConsume(queue, options.AutoAck, consumer);
                ListenResult result = new ListenResult();
                result.Token.Register(() =>
                {
                    try
                    {
                        channel.Close();
                        channel.Dispose();
                    }
                    catch { }
                });
                return result;
            }
            /// <summary>
            /// 消费消息
            /// </summary>
            /// <param name="queue"></param>
            /// <param name="configure"></param>
            public ListenResult Listen(string queue, Action<ConsumeQueueOptions> configure)
            {
                ConsumeQueueOptions options = new ConsumeQueueOptions();
                configure?.Invoke(options);
                return Listen(queue, options);
            }
            #endregion
            #region 订阅模式、路由模式、Topic模式
            /// <summary>
            /// 消费消息
            /// </summary>
            /// <param name="exchange"></param>
            /// <param name="queue"></param>
            /// <param name="options"></param>
            public ListenResult Listen(string exchange, string queue, ExchangeConsumeQueueOptions options = null)
            {
                options = options ?? new ExchangeConsumeQueueOptions();
                var channel = GetChannel();
                channel.QueueDeclare(queue, options.Durable, false, options.AutoDelete, options.Arguments ?? new Dictionary<string, object>());
                if (options.RoutingKeys != null && !string.IsNullOrEmpty(exchange))
                {
                    foreach (var key in options.RoutingKeys)
                    {
                        channel.QueueBind(queue, exchange, key, options.BindArguments);
                    }
                }
                var consumer = ConsumeInternal(channel, options);
                channel.BasicConsume(queue, options.AutoAck, consumer);
                ListenResult result = new ListenResult();
                result.Token.Register(() =>
                {
                    try
                    {
                        channel.Close();
                        channel.Dispose();
                    }
                    catch { }
                });
                return result;
            }
            /// <summary>
            /// 消费消息
            /// </summary>
            /// <param name="exchange"></param>
            /// <param name="queue"></param>
            /// <param name="configure"></param>
            public ListenResult Listen(string exchange, string queue, Action<ExchangeConsumeQueueOptions> configure)
            {
                ExchangeConsumeQueueOptions options = new ExchangeConsumeQueueOptions();
                configure?.Invoke(options);
                return Listen(exchange, queue, options);
            }
            #endregion
        }
        public class RecieveResult
        {
            CancellationTokenSource cancellationTokenSource;
            public RecieveResult(BasicDeliverEventArgs arg, CancellationTokenSource cancellationTokenSource)
            {
                this.Body = Encoding.UTF8.GetString(arg.Body);
                this.ConsumerTag = arg.ConsumerTag;
                this.DeliveryTag = arg.DeliveryTag;
                this.Exchange = arg.Exchange;
                this.Redelivered = arg.Redelivered;
                this.RoutingKey = arg.RoutingKey;
                this.cancellationTokenSource = cancellationTokenSource;
            }
    
            /// <summary>
            /// 消息体
            /// </summary>
            public string Body { get; private set; }
            /// <summary>
            /// 消费者标签
            /// </summary>
            public string ConsumerTag { get; private set; }
            /// <summary>
            /// Ack标签
            /// </summary>
            public ulong DeliveryTag { get; private set; }
            /// <summary>
            /// 交换机
            /// </summary>
            public string Exchange { get; private set; }
            /// <summary>
            /// 是否Ack
            /// </summary>
            public bool Redelivered { get; private set; }
            /// <summary>
            /// 路由
            /// </summary>
            public string RoutingKey { get; private set; }
    
            public void Commit()
            {
                if (cancellationTokenSource == null || cancellationTokenSource.IsCancellationRequested) return;
    
                cancellationTokenSource.Cancel();
                cancellationTokenSource.Dispose();
                cancellationTokenSource = null;
            }
        }
        public class ListenResult
        {
            CancellationTokenSource cancellationTokenSource;
    
            /// <summary>
            /// CancellationToken
            /// </summary>
            public CancellationToken Token { get { return cancellationTokenSource.Token; } }
            /// <summary>
            /// 是否已停止
            /// </summary>
            public bool Stoped { get { return cancellationTokenSource.IsCancellationRequested; } }
    
            public ListenResult()
            {
                cancellationTokenSource = new CancellationTokenSource();
            }
    
            /// <summary>
            /// 停止监听
            /// </summary>
            public void Stop()
            {
                cancellationTokenSource.Cancel();
            }
        }
    }

      测试Demo  

      
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Threading;
    
    namespace RabbitMQ.ConsoleApp
    {
        class Program
        {
            static void Main(string[] args)
            {
                string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" };
                int port = 5672;
                string userName = "admin";
                string password = "123456";
                string virtualHost = "/";
                string queue = "queue1";
                var arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } };
    
                //消费者
                new Thread(() =>
                {
                    using (RabbitMQConsumer consumer = new RabbitMQConsumer(hosts))
                    {
                        consumer.UserName = userName;
                        consumer.Password = password;
                        consumer.Port = port;
                        consumer.VirtualHost = virtualHost;
    
                        consumer.Received += result =>
                        {
                            Console.WriteLine($"接收到数据:{result.Body}");
                            result.Commit();//提交
                        };
                        consumer.Listen(queue, options =>
                        {
                            options.AutoAck = false;
                            options.Arguments = arguments;
                        });
                    }
                }).Start();
    
                //消息生产
                using (RabbitMQProducer producer = new RabbitMQProducer(hosts))
                {
                    producer.UserName = userName;
                    producer.Password = password;
                    producer.Port = port;
                    producer.VirtualHost = virtualHost;
    
                    string message = "";
                    do
                    {
                        message = Console.ReadLine();
                        if (string.IsNullOrEmpty(message))
                        {
                            break;
                        }
                        producer.Publish(queue, message, options => { options.Arguments = arguments; });
    
                    } while (true);
                }
            }
        }
    }
      
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Threading;
    
    namespace RabbitMQ.ConsoleApp
    {
        class Program
        {
            static void Main(string[] args)
            {
                string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" };
                int port = 5672;
                string userName = "admin";
                string password = "123456";
                string virtualHost = "/";
                string queue = "queue1";
                var arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } };
    
                //消费者1
                new Thread(() =>
                {
                    using (RabbitMQConsumer consumer = new RabbitMQConsumer(hosts))
                    {
                        consumer.UserName = userName;
                        consumer.Password = password;
                        consumer.Port = port;
                        consumer.VirtualHost = virtualHost;
    
                        consumer.Received += result =>
                        {
                            Console.WriteLine($"消费者1接收到数据:{result.Body}");
                            result.Commit();//提交
                        };
                        consumer.Listen(queue, options =>
                        {
                            options.AutoAck = false;
                            options.Arguments = arguments;
                            options.FetchCount = 1;
                        });
                    }
                }).Start();
    
                //消费者2
                new Thread(() =>
                {
                    using (RabbitMQConsumer consumer = new RabbitMQConsumer(hosts))
                    {
                        consumer.UserName = userName;
                        consumer.Password = password;
                        consumer.Port = port;
                        consumer.VirtualHost = virtualHost;
    
                        consumer.Received += result =>
                        {
                            Console.WriteLine($"消费者2接收到数据:{result.Body}");
                            result.Commit();//提交
                        };
                        consumer.Listen(queue, options =>
                        {
                            options.AutoAck = false;
                            options.Arguments = arguments;
                            options.FetchCount = 2;
                        });
                    }
                }).Start();
    
                //消息生产
                using (RabbitMQProducer producer = new RabbitMQProducer(hosts))
                {
                    producer.UserName = userName;
                    producer.Password = password;
                    producer.Port = port;
                    producer.VirtualHost = virtualHost;
    
                    string message = "";
                    do
                    {
                        message = Console.ReadLine();
                        if (string.IsNullOrEmpty(message))
                        {
                            break;
                        }
                        producer.Publish(queue, message, options => { options.Arguments = arguments; });
    
                    } while (true);
                }
            }
        }
    }
       发布订阅模式
       路由模式
       主题模式

      上面是我自己做的封装,因为RabbitMQ.Client功能齐全,但是使用比较麻烦,需要编写的代码多一些,推荐一下第三方对rabbitmq的封装插件:EasyNetQ,它是建立在RabbitMQ.Client上的,多数时候可以直接通过EasyNetQ就可以完成消息发布与消费,感兴趣的可以了解一下

  • 相关阅读:
    Java对象的生命周期与作用域的讨论(转)
    [置顶] Oracle学习路线与方法
    Java实现 蓝桥杯 算法训练 未名湖边的烦恼
    Java实现 蓝桥杯 算法训练 未名湖边的烦恼
    Java实现 蓝桥杯 算法训练 未名湖边的烦恼
    Java实现 蓝桥杯 算法训练 最大的算式
    Java实现 蓝桥杯 算法训练 最大的算式
    Java实现 蓝桥杯 算法训练 最大的算式
    Java实现 蓝桥杯 算法训练 最大的算式
    Java实现 蓝桥杯 算法训练 最大的算式
  • 原文地址:https://www.cnblogs.com/zoujinhua/p/15503485.html
Copyright © 2011-2022 走看看