zoukankan      html  css  js  c++  java
  • RabbitMQ学习系列(四): 几种Exchange 模式

      上一篇,讲了RabbitMQ的具体用法,可以看看这篇文章:RabbitMQ学习系列(三): C# 如何使用 RabbitMQ。今天说些理论的东西,Exchange 的几种模式。

      AMQP协议中的核心思想就是生产者和消费者隔离,生产者从不直接将消息发送给队列。生产者通常不知道是否一个消息会被发送到队列中,只是将消息发送到一个交换机。先由Exchange来接收,然后Exchange按照特定的策略转发到Queue进行存储。同理,消费者也是如此。Exchange 就类似于一个交换机,转发各个消息分发到相应的队列中。

      RabbitMQ提供了四种Exchange模式:fanout,direct,topic,header 。 header模式在实际使用中较少,本文只对前三种模式进行比较。

     

      一. Fanout Exchange

       

      所有发送到Fanout Exchange的消息都会被转发到与该Exchange 绑定(Binding)的所有Queue上。

      Fanout Exchange  不需要处理RouteKey 。只需要简单的将队列绑定到exchange 上。这样发送到exchange的消息都会被转发到与该交换机绑定的所有队列上。类似子网广播,每台子网内的主机都获得了一份复制的消息。

      所以,Fanout Exchange 转发消息是最快的。

         /// <summary>
            /// 生产者
            /// </summary>
            /// <param name="change"></param>
            private static void ProducerMessage(MyMessage msg)
            {
                var advancedBus = CreateAdvancedBus();
    
                if (advancedBus.IsConnected)
                {
                    var exchange = advancedBus.ExchangeDeclare("user", ExchangeType.Fanout);
    
                    advancedBus.Publish(exchange, "", false, new Message<MyMessage>(msg));
                }
                else
                {
                    Console.WriteLine("Can't connect");
                }
    
            }
    
            /// <summary>
            /// 消费者
            /// </summary>
            private static void ConsumeMessage()
            {
                var advancedBus = CreateAdvancedBus();
                var exchange = advancedBus.ExchangeDeclare("user", ExchangeType.Fanout);
    
                var queue = advancedBus.QueueDeclare("user.notice.wangwu");
                advancedBus.Bind(exchange, queue, "user.notice.wangwu");
                advancedBus.Consume(queue, registration =>
                {
                    registration.Add<MyMessage>((message, info) => { Console.WriteLine("Body: {0}", message.Body); });
                });
            }

      

    public static IAdvancedBus CreateAdvancedBus()
    {
        // 消息服务器连接字符串
        string connString = "host=dev.corp.wingoht.com:5672;virtualHost=cd;username=ishowfun;password=123456";
        if (connString == null || connString == string.Empty)
        {
            throw new Exception("messageserver connection string is missing or empty");
        }
    
        return RabbitHutch.CreateBus(connString).Advanced;
    }    

      二. Direct Exchange

       

      所有发送到Direct Exchange的消息被转发到RouteKey中指定的Queue。

      Direct模式,可以使用rabbitMQ自带的Exchange:default Exchange 。所以不需要将Exchange进行任何绑定(binding)操作 。消息传递时,RouteKey必须完全匹配,才会被队列接收,否则该消息会被抛弃。

         /// <summary>
            /// 生产者
            /// </summary>
            /// <param name="change"></param>
            private static void ProducerMessage(MyMessage msg)
            {
                var advancedBus = CreateAdvancedBus();
    
                if (advancedBus.IsConnected)
                {
                    var queue = advancedBus.QueueDeclare("user.notice.zhangsan");
    
                    advancedBus.Publish(Exchange.GetDefault(), queue.Name, false, new Message<MyMessage>(msg));
                }
                else
                {
                    Console.WriteLine("Can't connect");
                }
    
            }
    
            /// <summary>
            /// 消费者
            /// </summary>
            private static void ConsumeMessage()
            {
                var advancedBus = CreateAdvancedBus();
    
                var exchange = advancedBus.ExchangeDeclare("user", ExchangeType.Direct);
    
                var queue = advancedBus.QueueDeclare("user.notice.lisi");
    
                advancedBus.Bind(exchange, queue, "user.notice.lisi");
    
                advancedBus.Consume(queue, registration =>
                {
                    registration.Add<MyMessage>((message, info) =>
                    {
                        Console.WriteLine("Body: {0}", message.Body);
                    });
                });
            }

      三. Topic Exchange

       

      所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定Topic的Queue上,

      Exchange 将RouteKey 和某Topic 进行模糊匹配。此时队列需要绑定一个Topic。可以使用通配符进行模糊匹配,符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“log.#”能够匹配到“log.info.oa”,但是“log.*” 只会匹配到“log.error”。

      所以,Topic Exchange 使用非常灵活。

         /// <summary>
            /// 生产者
            /// </summary>
            /// <param name="change"></param>
            private static void ProducerMessage(MyMessage msg)
            {
                //// 创建消息bus
                IBus bus = CreateBus();
    
                try
                {
                    bus.Publish(msg, x => x.WithTopic(msg.MessageRouter));
                }
                catch (EasyNetQException ex)
                {
                    //处理连接消息服务器异常 
                }
    
                bus.Dispose();//与数据库connection类似,使用后记得销毁bus对象
            }
    
            /// <summary>
            /// 消费者
            /// </summary>
            private static void ConsumeMessage(MyMessage msg)
            {
                //// 创建消息bus
                IBus bus = CreateBus();
    
                try
                {
                    bus.Subscribe<MyMessage>(msg.MessageRouter, message => Console.WriteLine(msg.MessageBody), x => x.WithTopic("user.notice.#"));
                }
                catch (EasyNetQException ex)
                {
                    //处理连接消息服务器异常 
                }
            }

      这个是RabbitMQ 的实际使用的几个场景,熟悉了这个,基本上rabbitmq 也就了解了。http://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html

     

      至此,Rabbitmq 几种Exchange 模式已经介绍完了,实际使用过程中,我们会更具不同的场景,来使用不同的exchange 模式。

      查看RabbitMQ 系列其他文章,http://www.cnblogs.com/zhangweizhong/category/855479.html

  • 相关阅读:
    【React Native】某个页面禁用物理返回键
    【React Native】DeviceEventEmitter监听通知及带参数传值
    转载【React Native代码】手写验证码倒计时组件
    【React Native】 中设置 APP 名称、应用图标、为安卓添加启动图
    【React Native错误集】* What went wrong: Execution failed for task ':app:installDebug'.
    【React Native错误集】Import fails with "Failed to execute 'ImportScripts' on 'WorkerGlobalScope'"
    【React Native错误集】Android error “Could not get BatchedBridge, make sure your bundle is packaged properly” on start of app
    「React Native笔记」在React的 setState 中操作数组和对象的多种方法(合集)
    【React Native】Error: Attribute application@allowBackup value=(false) from AndroidManifest.xml
    坚果云如何使用二次验证码/谷歌身份验证器/两步验证/虚拟MFA?
  • 原文地址:https://www.cnblogs.com/zhangweizhong/p/5713874.html
Copyright © 2011-2022 走看看