1.AMQP Messaging中的基本概念
Broker:接收和分发消息的应用,RabbitMQ Server就是Message Broker。
Virtual Host:出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的Namespace概念。当多个不同的用户使用同一个RabbitMQ Server提供的服务时,可以划分出多个Vhost,每个用户在自己的Vhost创建Exchange/Queue等。
Connection:Producer、Consumer和Broker之间的TCP连接。断开连接的操作只会在Client端进行,Broker不会断开连接,除非出现网络故障或Broker服务出现问题。
Channel:如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也较低。Channel是在Connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个Thread创建单独的channel进行通讯,AMQP Method包含了Channel Id帮助客户端和Message Broker识别Channel,所以Channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立TCP Connection的开销。
Exchange:Message到达Broker的第一站,根据分发规则,匹配查询表中的Routing Key,分发消息到Queue中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)。
Queue:消息最终被送到这里等待Consumer取走。一个Message可以被同时拷贝到多个Queue中。
Binding:Exchange和Queue之间的虚拟连接,Binding中可以包含Routing Key。Binding信息被保存到Exchange中的查询表中,用于Message的分发依据。
2.Exchange交换机
很多时候我们都以为生产者是将消息都投递到Queue当中,实际上这在RabbitMQ中永远不会发生。实际的情况是,生产者将消息发送到Exchange(交换机),由Exchange将消息路由到一个或多个Queue中或者丢弃。使用的路由算法取决于交换机类型和其绑定的规则。AMQP 0-9-1 Broker提供四种交换机类型:Direct exchange(直接交换机)、Fanout exchange(扇出交换机)、Topic exchange(话题交换机)、Headers exchange(标头交换机)。由于Headers exchange在实际使用中比较少,所以这里只重点介绍前三种模式。
2.1Direct exchange
由上述两个流程图可见,生产者所有发送到Direct exchange的消息会通过指定Routing Key传递到Queue当中。
2.1 .1Default Exchange(默认交换机)
如果没有没有定义任何Exchange,默认就是Default Exchange,每个创建的Queue都使用与其名称相同的Routing Key自动绑定。具体示例,大家可以看看上一章节5.1,5.2小节,你会发现定义生产者代码中QueueName与Routing Key都是同一个名称,没有定义任何Exchange绑定(Binding)操作,只是在推送时候把Queue、Routing Key相同消息推送出去。消息传递时,Routing Key必须完全匹配,才会被Queue接收,否则该消息会被抛弃。而Default Exchange也属于Default Exchange模式。
下面来看看定义Direct exchange生产者的代码:
class Program { static void Main(string[] args) { string exchangeName = "DirectExchange"; string queueName = "DirectExchangeQueueName"; string routeKey = "DirectExchangeKey"; //创建连接工厂 var factory = new ConnectionFactory { UserName = "dengwu",//用户名 Password = "123456",//密码 HostName = "192.168.112.133",//rabbitmq ip }; //创建连接 var connection = factory.CreateConnection(); //创建通道 var channel = connection.CreateModel(); //定义一个Direct类型交换机 channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, false, false, null); //定义一个队列 channel.QueueDeclare(queueName, false, false, false, null); //将队列绑定到交换机 channel.QueueBind(queueName, exchangeName, routeKey, null); Console.WriteLine($" RabbitMQ连接成功,Exchange:{exchangeName},Queue:{queueName},Route:{routeKey}, 请输入消息,输入exit退出!"); string input; do { input = Console.ReadLine(); var sendBytes = Encoding.UTF8.GetBytes(input); //发布消息 channel.BasicPublish(exchangeName, routeKey, null, sendBytes); } while (input.Trim().ToLower() != "exit"); channel.Close(); connection.Close(); } }
运行:
2.2Fanout Exchange
由上述两个流程图可见,生产者所有发送到Fanout Exchange的消息会转发到与其绑定(Binding)的所有Queue上。
Fanout Exchange模式无需通过指定Routing Key把消息绑定到Queue中,只需要把Exchange与Queue绑定,这样子通过Exchange消息将会转发到与其绑定的所有Queue上。类似子网广播,每台子网内的主机都获得了一份复制的消息。
下面定义两个队列与其消费者FanoutExchangeQueueName1、FanoutExchangeQueueName2的Fanout Exchange生产者代码:
class Program { static void Main(string[] args) { string exchangeName = "FanoutExchange"; string queueName1 = "FanoutExchangeQueueName1"; string queueName2 = "FanoutExchangeQueueName2"; string routeKey = ""; //创建连接工厂 var factory = new ConnectionFactory { UserName = "dengwu",//用户名 Password = "123456",//密码 HostName = "192.168.112.133",//rabbitmq ip }; //创建连接 var connection = factory.CreateConnection(); //创建通道 var channel = connection.CreateModel(); //定义一个Fanout类型交换机 channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, false, false, null); //定义队列1 channel.QueueDeclare(queueName1, false, false, false, null); //定义队列2 channel.QueueDeclare(queueName2, false, false, false, null); //将队列绑定到交换机 channel.QueueBind(queueName1, exchangeName, routeKey, null); channel.QueueBind(queueName2, exchangeName, routeKey, null); //生成两个队列的消费者 ConsumerGenerator(queueName1); ConsumerGenerator(queueName2); Console.WriteLine($" RabbitMQ连接成功, 请输入消息,输入exit退出!"); string input; do { input = Console.ReadLine(); var sendBytes = Encoding.UTF8.GetBytes(input); //发布消息 channel.BasicPublish(exchangeName, routeKey, null, sendBytes); } while (input.Trim().ToLower() != "exit"); channel.Close(); connection.Close(); } /// <summary> /// 根据队列名称生成消费者 /// </summary> /// <param name="queueName"></param> static void ConsumerGenerator(string queueName) { //创建连接工厂 ConnectionFactory factory = new ConnectionFactory { UserName = "dengwu",//用户名 Password = "123456",//密码 HostName = "192.168.112.133",//rabbitmq ip }; //创建连接 var connection = factory.CreateConnection(); //创建通道 var channel = connection.CreateModel(); //事件基本消费者 var consumer = new EventingBasicConsumer(channel); //接收到消息事件 consumer.Received += (ch, ea) => { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); Console.WriteLine($"Queue:{queueName}收到消息: {message}"); //确认该消息已被消费 channel.BasicAck(ea.DeliveryTag, false); }; //启动消费者 设置为手动应答消息 channel.BasicConsume(queueName, false, consumer); Console.WriteLine($"Queue:{queueName},消费者已启动"); } }
运行:
2.3Topic Exchange
由上述流程图可见,生产者所有发送到Topic Exchange的消息会通过指定Routing Key被转发到能和其匹配的Queue上。
Exchange指定Routing Key将路由进行模糊匹配。可以使用通配符进行模糊匹配,符号“#”匹配多个单词(可以是零个),符号“*”匹配一个单词。因此“usa.#”能够匹配到“usa.news.xxx、usa.weather.xxx”,但是“usa.*” 只会匹配到“usa.news、usa.weather”。
所以Topic Exchange 使用是非常灵活的,Topic Exchange生产者代码如下:
class Program { static void Main(string[] args) { string exchangeName = "TopicExchange"; string queueName = "DirectExchangeQueueName"; string routeKey = "TopicExchangeKey.*"; //创建连接工厂 var factory = new ConnectionFactory { UserName = "dengwu",//用户名 Password = "123456",//密码 HostName = "192.168.112.133",//rabbitmq ip }; //创建连接 var connection = factory.CreateConnection(); //创建通道 var channel = connection.CreateModel(); //定义一个Topic类型交换机 channel.ExchangeDeclare(exchangeName, ExchangeType.Topic, false, false, null); //定义队列1 channel.QueueDeclare(queueName, false, false, false, null); //将队列绑定到交换机 channel.QueueBind(queueName, exchangeName, routeKey, null); Console.WriteLine($" RabbitMQ连接成功, 请输入消息,输入exit退出!"); string input; do { input = Console.ReadLine(); var sendBytes = Encoding.UTF8.GetBytes(input); //发布消息 channel.BasicPublish(exchangeName, "TopicExchangeKey.one", null, sendBytes); } while (input.Trim().ToLower() != "exit"); channel.Close(); connection.Close(); } }
运行:
2.4Headers Exchange
Headers Exchange不依赖于Routing key 与 Binding key的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。这种模式基本没怎么用过,所以就跳过了。
参考文献:
RabbitMQ官网
.NET Core 使用RabbitMQ