zoukankan      html  css  js  c++  java
  • RabbitMQ Fanout交换机代码实现

    一般情况下,生产者发送消息,先到先得,一个消费者消费之后,该条消息便消失不会再被消费,抢完即止。

    那能否生产者发送的消息每个消费者都能接收到,都能消费呢?

    Fanout交换机就可以实现。

    代码实现:

    生产者:

    public class FanoutExchange
        {
            public void FanoutPublish()
            {
                MQHelper mh = new MQHelper();
                using (var conn = mh.GetConnection())
                {
                    using (IModel channel = conn.CreateModel())
                    {
                        //声明队列
                        channel.QueueDeclare(queue: "FanoutAdu001", durable: true, exclusive: false, autoDelete: false, arguments: null);
                        channel.QueueDeclare(queue: "FanoutAdu002", durable: true, exclusive: false, autoDelete: false, arguments: null);
    
                        //声明交换机
                        channel.ExchangeDeclare(exchange: "FanoutExchange", type: ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null);
    
                        //绑定
                        channel.QueueBind(queue: "FanoutAdu001", exchange: "FanoutExchange", routingKey: string.Empty,arguments:null);
                        channel.QueueBind(queue: "FanoutAdu002", exchange: "FanoutExchange", routingKey: string.Empty, arguments: null);
    
                        //发布
                        int i = 0;
                        while (true)
                        {
                            string message = $"通知{i}";
                            var body = Encoding.UTF8.GetBytes(message);
                            channel.BasicPublish(exchange: "FanoutExchange", routingKey: string.Empty, basicProperties: null, body: body);
                            Console.WriteLine($"通知{i}已发送到队列");
                            Thread.Sleep(2000);
                            i++;
                        }
    
                    }
                }
            }
        }

    可以看到,这里和Direct交换机代码相比,类型发生了变化,同时路由键变成了Empty

    消费者:

    public class FanoutExchangeConsumer
        {
            public void FanoutConsume()
            {
                var factory = new ConnectionFactory();
                factory.HostName = "localhost";
                factory.UserName = "guest";
                factory.Password = "guest";
    
                using (var conn = factory.CreateConnection())
                {
                    using (IModel channel = conn.CreateModel())
                    {
                        //声明队列
                        channel.QueueDeclare(queue: "FanoutAdu002", durable: true, exclusive: false, autoDelete: false, arguments: null);
    
                        //声明交换机
                        channel.ExchangeDeclare(exchange: "FanoutExchange", type: ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null);
    
                        //绑定
                        channel.QueueBind(queue: "FanoutAdu002", exchange: "FanoutExchange", routingKey: string.Empty);
    
    
                        //消费消息
                        var consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (model, ea) =>
                        {
                            var body = ea.Body;
                            var message = Encoding.UTF8.GetString(body.ToArray());
                            Console.WriteLine($"接收成功,[{message}]");
                        };
                        //处理消息
                        channel.BasicConsume(queue: "FanoutAdu002", autoAck: true, consumer: consumer);
                    }
                }
            }
        }

    这里每个消费者一条路由,都能够接收生产者发送的所有消息

    记录编程的点滴,体会学习的乐趣
  • 相关阅读:
    拓端tecdat|R语言平滑算法LOESS局部加权回归、三次样条、变化点检测拟合电视节目《白宫风云》在线收视率
    拓端tecdat|R语言结合新冠疫情COVID-19对股票价格预测:ARIMA,KNN和神经网络时间序列分析
    拓端tecdat|Stata广义矩量法GMM面板向量自回归PVAR模型选择、估计、Granger因果检验分析投资、收入和消费数据
    拓端tecdat|Python用T-SNE非线性降维技术拟合和可视化高维数据iris鸢尾花、MNIST 数据
    confirmit中常用 短代码
    DELL服务器硬件信息采集SHELL脚本
    功能、资源权限管理的设计
    maven--插件篇(assembly插件)
    呀哈哈
    virtualbox扩展硬盘容量
  • 原文地址:https://www.cnblogs.com/AduBlog/p/14901594.html
Copyright © 2011-2022 走看看