RabbitMQ系列
RabbitMQ(六)——路由模式
前言
本章讲解路由模式,路由模式跟发布订阅模式类似,然后在发布订阅模式的基础上改变了类型(fanout => direct),订阅模式是分发到所有绑定到交换机的队列,路由模式只分发到绑定在交换机上面指定路由的队列,我们可以看一下下面这张图:
以上的图是info,error,warning为路由,表示日志的等级通过不同的路由发送到不同的队列中,下面开始路由模式start~~~~
发布订阅模式 VS 路由模式
发布:
发布订阅模式的类型为fanout,而路由模式类型为direct;
//发布订阅模式 channel.ExchangeDeclare(ExchangeName, "fanout"); //路由模式 channel.ExchangeDeclare(ExchangeName, "direct");
订阅发布模式发布时roukey参数为空字符串,路由模式指定了路由;
//发布订阅模式 channel.BasicPublish(ExchangeName, "", null, body); //路由模式 channel.BasicPublish(ExchangeName, RouteName, null, body);
接收:
定义交换器时,与发布时一致,发布订阅模式为fanout,路由模式为direct;
//发布订阅模式 channel.ExchangeDeclare(ExchangeName, "fanout"); //路由模式 channel.ExchangeDeclare(ExchangeName, "direct");
绑定路由时,发布订阅模式的routekey参数为空字符串,表示接受所有消息,而路由模式的routekey参数必须指定某一路由。
//发布订阅模式 channel.QueueBind(queueName, ExchangeName, ""); //路由模式 channel.QueueBind(queueName, ExchangeName, routkey);
通过以上对比可以知道,路由模式与发布订阅模式基本一致,唯一差距就是两个参数,exchange类型和 routingKey
代码
生产者:
static void Main(string[] args) { Console.WriteLine("DirectServer发布服务器启动..."); //1.创建连接工厂 var factory = new ConnectionFactory() { HostName = "127.0.0.1", UserName = "guest", Password = "guest" }; //2.创建连接 using (var conn = factory.CreateConnection()) { //3.创建通道 using (var channel = conn.CreateModel()) { //4.声明交换器 channel.ExchangeDeclare("directExchange", "direct"); string msg = ""; for (int i = 0; i < 20; i++) { msg = $"发布消息{i}"; string ROUTE_KEY = ""; var body = Encoding.UTF8.GetBytes(msg); //模拟向不同路由发送消息 if (i % 2 == 0) { ROUTE_KEY = "route1"; } else { ROUTE_KEY = "route2"; } //5.发布消息 channel.BasicPublish("directExchange", ROUTE_KEY, null, body); Console.WriteLine($"向{ROUTE_KEY}发布消息成功:{msg}"); Thread.Sleep(1000); } Console.ReadKey(); } } }
消费者1:
static void Main(string[] args) { Console.WriteLine("DirectClient接收客户端启动..."); //1.创建连接工厂 var factory = new ConnectionFactory() { HostName = "127.0.0.1", UserName = "guest", Password = "guest" }; //2.创建连接 using (var conn = factory.CreateConnection()) { //3.创建通道 using (var channel = conn.CreateModel()) { //3.声明队列 var queue = channel.QueueDeclare().QueueName; //4.绑定交换器 channel.QueueBind(queue, "directExchange", "route1"); //5.声明消费者 消费消息 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { //接收消息 var body = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine($"接收route1消息:{body.ToString()}"); }; channel.BasicConsume(queue, true, consumer); Console.ReadKey(); } } }
消费者2:
static void Main(string[] args) { Console.WriteLine("DirectClient接收客户端启动..."); //1.创建连接工厂 var factory = new ConnectionFactory() { HostName = "127.0.0.1", UserName = "guest", Password = "guest" }; //2.创建连接 using (var conn = factory.CreateConnection()) { //3.创建通道 using (var channel = conn.CreateModel()) { //3.声明队列 var queue = channel.QueueDeclare().QueueName; //4.绑定交换器 channel.QueueBind(queue, "directExchange", "route2"); //5.声明消费者 消费消息 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, e) => { byte[] message = e.Body.ToArray(); Console.WriteLine("接收消息:" + Encoding.UTF8.GetString(message)); //返回消息确认 channel.BasicAck(e.DeliveryTag, false); }; //消费者开始监听 channel.BasicConsume(queue, true, consumer); Console.ReadKey(); } } }
可以看到生产者发布消息时指定了路由rout1/rout2,随后消息会转发到route1/route2路由上。接收时通过将一个队列与交换器绑定,指定路由route1/route2,这样就能接收到route1/route2路由上的消息。
效果
消费者定义的随机队列
向route1、route2发布消息,两个消费者分别接收route1和route2的消息
效果
路由模式中生产者发布消息时指定路由,向指定路由发送,消费者绑定交换器与路由即可接收到生产者向此路由发布的消息;
只有将消费者发送消息的交换器、路由 与生产者指定的交换器、路由一致,消费者才能接收到生产者向指定路由的消费者发送的消息。
注意:声明路由时类型必须为direct