zoukankan      html  css  js  c++  java
  • RabbitMQ消息队列(七)-通过fanout模式将消息推送到多个Queue中(.Net Core版)

    前面第六章我们使用的是direct直连模式来进行消息投递和分发。本章将介绍如何使用fanout模式将消息推送到多个队列。 
    有时我们会遇到这样的情况,多个功能模块都希望得到完整的消息数据。例如一个log的消息,一个我们希望输出在屏幕上实时监控,另外一个用户持久化日志。这时就可以使用fanout模式。fanout模式模式不像direct模式通过routingkey来进行匹配,而是会把消息发送到所以的已经绑定的队列中。

    新建FanoutProduct用来发布消息。FanoutCustomerA和FanoutCustomerB用来订阅不同队列消费消息。

    FanoutProduct代码:

    using System;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    
    namespace FanoutProduct
    {
        class Program
        {
            static void Main(string[] args)
            {
                String exchangeName = "wytExchange";
                String message = "Hello World!";
    
                ConnectionFactory factory = new ConnectionFactory();
                factory.HostName = "192.168.63.129";
                factory.Port = 5672;
                factory.VirtualHost = "/wyt";
                factory.UserName = "wyt";
                factory.Password = "wyt";
    
                using (IConnection connection=factory.CreateConnection())
                {
                    using (IModel channel=connection.CreateModel())
                    {
                        channel.ExchangeDeclare(exchange: exchangeName, type: "fanout", durable: true, autoDelete: false, arguments: null);
    
    
                        IBasicProperties properties = channel.CreateBasicProperties();
                        properties.Persistent = true;
    
                        Task.Run(() =>
                        {
                            while (true)
                            {
                                for (int i = 0; i < 10000; i++)
                                {
                                    Byte[] body = Encoding.UTF8.GetBytes(message + i);
                                    channel.BasicPublish(exchange: exchangeName, routingKey: "", basicProperties: properties, body: body);
                                }
                                Thread.Sleep(100);
                            }
                        }).Wait();
    
                        Console.WriteLine(" [x] Sent {0}", message);
                    }
                }
    
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
    
    
            }
        }
    }
    View Code

    FanoutCustomerA与FanoutCustomerB(代码相同):

    using System;
    using System.Text;
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    
    namespace FanoutCustomerA
    {
        class Program
        {
            static void Main(string[] args)
            {
                String exchangeName = "wytExchange";
    
                ConnectionFactory factory = new ConnectionFactory();
                factory.HostName = "192.168.63.129";
                factory.Port = 5672;
                factory.VirtualHost = "/wyt";
                factory.UserName = "wyt";
                factory.Password = "wyt";
    
                using (IConnection connection=factory.CreateConnection())
                {
                    using (IModel channel=connection.CreateModel())
                    {
                        channel.ExchangeDeclare(exchange: exchangeName, type: "fanout", durable: true, autoDelete: false, arguments: null);
    
                        String queueName = channel.QueueDeclare().QueueName;
    
                        channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: "", arguments: null);
    
                        EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (model, ea) =>
                        {
                            Byte[] body = ea.Body;
                            String message = Encoding.UTF8.GetString(body);
                            Console.WriteLine(" [x] {0}", message);
                        };
    
                        channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
    
                        Console.WriteLine(" Press [enter] to exit.");
                        Console.ReadLine();
    
                    }
                }
            }
        }
    }
    View Code

     

    可以看到FanoutCustomerA和FanoutCustomerB收到的消息完全一致。注意以上代码FanoutProduct中并没有新建队列,所以先运行FanoutCustomerA和FanoutCustomerB,如果先运行FanoutProduct因为找不到绑定的队列数据就会丢失。 
    还有一种情况我们有可能随时增加一项处理机制,如果在声明queue时不指定名字,那么RabbitMQ会随机为我们生成一个名字,如果不指定queue为持久化队列那在消息为空并且订阅者为0时自动删除该队列。这样Queue挥之即来呼之即去。

    String queueName = channel.QueueDeclare().QueueName;
  • 相关阅读:
    JavaScript链式调用
    Javascript设计模式(2)-单体模式
    Javascript设计模式(1)
    stm32结合产品学习01—产品的框架
    【目标检测-模型对比1】R-CNN、SPPnet、Fast R-CNN、Faster R-CNN的对比
    【目标检测-框架测试】mmdetection的安装与使用
    【机器学习-笔记1】吴恩达网课笔记1——机器学习策略
    【算法】P1004 方格取数
    【算法】UVa 11624, Fire! 解题心得
    vector
  • 原文地址:https://www.cnblogs.com/wyt007/p/9077783.html
Copyright © 2011-2022 走看看