RabbitMQ系列
RabbitMQ(五)——发布订阅模式
前言
上一章的工作队列模式中,生产者发布的一堆消息进入队列,消费者接收队列中的消息,每条消息只能发给一个消费者。
本章要做的是吧一条消息发送给多个消费者,这种模式就是Fanout Exchange(扇形交换机)“发布/订阅模式”,它会将消息路由给绑定到它身上的所有队列。
注意:该模式定义队列时durable:false没有存储消息功能,如果消息发送到没有绑定消费队列的交换器,消息丢失,也就是说,当消息生产者发送消息时,消费者还没有绑定此交换器,则没有接收到消息,且接收不到了;durable:true只要没被消费就一直存在队列中。
交换器:
在RabbitMQ中完整的消息传递并不是生产者直接将消息发送到队列,生产者甚至不不知道消息是否会进入队列。正确的模型是生产者把消息发送给交换器(Exchange),交换器会将接受到的消息转发到队列中,交换器必须确定将消息发送到哪些队列。
交换器类型(详情):
-
-
- Fanout
- Direct
- Topic
- Header
-
fanout类型即发布订阅模式,它会把收到的消息广播到它绑定的队列中。
channel.ExchangeDeclare("exchange", "fanout");
之前我们发布的做法是这样的:
channel.BasicPublish("", "simple", null, Encoding.UTF8.GetBytes(msg));
前面我们并没有声明交换器,之所以还能把消息发送到队列是因为用了“”空的字符串标识了默认或匿名交换器。BasicPublish方法第一个参数就是交换器,第二个参数是routekey,消息通过本交换器进入到routekey队列。
发送消息到指定交换器:
channel.BasicPublish("exchange", "simple", null, Encoding.UTF8.GetBytes(msg));
创建好fanout交换器后,定义多个队列,然后将交换器和队列绑定:
channel.QueueBind("simple", "exchange", "");
代码
生产者:
static void Main(string[] args) { Console.WriteLine("FanoutServer发布服务器启动..."); //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory() { HostName = "127.0.0.1", UserName = "guest", Password = "guest" }; //2.创建连接 using (var connection = factory.CreateConnection()) //3.创建通道 using (var channel = connection.CreateModel()) { //4.创建交换器 channel.ExchangeDeclare("exchange", "fanout"); string msg = ""; for (int i = 0; i < 100; i++) { msg = $"发布消息{i}"; var body = Encoding.UTF8.GetBytes(msg); channel.BasicPublish("exchange", "", null, body); Console.WriteLine($"发布成功:{msg}"); Thread.Sleep(600); } Console.ReadKey(); } }
消费者:
static void Main(string[] args) { Console.WriteLine("FanoutClient接收客户端启动..."); //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory() { HostName = "127.0.0.1", UserName = "guest", Password = "guest" }; //2.创建连接 using (var connection = factory.CreateConnection()) { //3.创建通道 using (var channel = connection.CreateModel()) { //4.定义交换器 channel.ExchangeDeclare("exchange", "fanout"); //5.创建匿名队列,绑定交换器 //var queueName = channel.QueueDeclare("simple"); var queueName = channel.QueueDeclare().QueueName; channel.QueueBind(queueName, "exchange", ""); //6.创建消费者 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { //接收消息 var body = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine($"接收消息:{body.ToString()}"); }; //7.消费消息 channel.BasicConsume(queueName, true, consumer); Console.ReadKey(); } } }
生产者创建完通道后,创建一个交换器,给它命名“exchange”以及设定“fanout”类型,最后发布消息到此交换器,与前面的模式不同的是不用设定队列参数。
消费者创建完通道后,创建一个交换器,与生产者的交换器一致,定义一个队列(可以指定,或者使用匿名),然后将此队列与交换器绑定,消息会在此队列中消费,最后创建消费者进行消费。
效果
先启动2个消费者,再启动生产者,消息完全接受到。
先启动生产者,再分别启动消费者,只能接受到消费者启动之后生产发送的消息
发布订阅模式就先到这,有什么不对的地方望斧正~