zoukankan      html  css  js  c++  java
  • (2)RabbitMQ交换器Exchange介绍

    1.AMQP Messaging中的基本概念


    Broker:接收和分发消息的应用,RabbitMQ Server就是Message Broker。
    Virtual Host:出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的Namespace概念。当多个不同的用户使用同一个RabbitMQ Server提供的服务时,可以划分出多个Vhost,每个用户在自己的Vhost创建Exchange/Queue等。
    Connection:Producer、Consumer和Broker之间的TCP连接。断开连接的操作只会在Client端进行,Broker不会断开连接,除非出现网络故障或Broker服务出现问题。
    Channel:如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也较低。Channel是在Connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个Thread创建单独的channel进行通讯,AMQP Method包含了Channel Id帮助客户端和Message Broker识别Channel,所以Channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立TCP Connection的开销。
    Exchange:Message到达Broker的第一站,根据分发规则,匹配查询表中的Routing Key,分发消息到Queue中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)。
    Queue:消息最终被送到这里等待Consumer取走。一个Message可以被同时拷贝到多个Queue中。
    Binding:Exchange和Queue之间的虚拟连接,Binding中可以包含Routing Key。Binding信息被保存到Exchange中的查询表中,用于Message的分发依据。

    2.Exchange交换机

    很多时候我们都以为生产者是将消息都投递到Queue当中,实际上这在RabbitMQ中永远不会发生。实际的情况是,生产者将消息发送到Exchange(交换机),由Exchange将消息路由到一个或多个Queue中或者丢弃。使用的路由算法取决于交换机类型和其绑定的规则。AMQP 0-9-1 Broker提供四种交换机类型:Direct exchange(直接交换机)、Fanout exchange(扇出交换机)、Topic exchange(话题交换机)、Headers exchange(标头交换机)。由于Headers exchange在实际使用中比较少,所以这里只重点介绍前三种模式。

    2.1Direct exchange




    由上述两个流程图可见,生产者所有发送到Direct exchange的消息会通过指定Routing Key传递到Queue当中。

    2.1 .1Default Exchange(默认交换机)

    如果没有没有定义任何Exchange,默认就是Default Exchange,每个创建的Queue都使用与其名称相同的Routing Key自动绑定。具体示例,大家可以看看上一章节5.1,5.2小节,你会发现定义生产者代码中QueueName与Routing Key都是同一个名称,没有定义任何Exchange绑定(Binding)操作,只是在推送时候把Queue、Routing Key相同消息推送出去。消息传递时,Routing Key必须完全匹配,才会被Queue接收,否则该消息会被抛弃。而Default Exchange也属于Default Exchange模式。
    下面来看看定义Direct exchange生产者的代码:

    class Program
    {
        static void Main(string[] args)
        {
            string exchangeName = "DirectExchange";
            string queueName = "DirectExchangeQueueName";
            string routeKey = "DirectExchangeKey";
    
            //创建连接工厂
            var factory = new ConnectionFactory
            {
                UserName = "dengwu",//用户名
                Password = "123456",//密码
                HostName = "192.168.112.133",//rabbitmq ip
            };
    
            //创建连接
            var connection = factory.CreateConnection();
            //创建通道
            var channel = connection.CreateModel();
    
            //定义一个Direct类型交换机
            channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, false, false, null);
    
            //定义一个队列
            channel.QueueDeclare(queueName, false, false, false, null);
    
            //将队列绑定到交换机
            channel.QueueBind(queueName, exchangeName, routeKey, null);
    
            Console.WriteLine($"
    RabbitMQ连接成功,Exchange:{exchangeName},Queue:{queueName},Route:{routeKey},
    
    请输入消息,输入exit退出!");
    
            string input;
            do
            {
                input = Console.ReadLine();
    
                var sendBytes = Encoding.UTF8.GetBytes(input);
                //发布消息
                channel.BasicPublish(exchangeName, routeKey, null, sendBytes);
    
            } while (input.Trim().ToLower() != "exit");
            channel.Close();
            connection.Close();
        }
    }

    运行:

    2.2Fanout Exchange


    由上述两个流程图可见,生产者所有发送到Fanout Exchange的消息会转发到与其绑定(Binding)的所有Queue上。
    Fanout Exchange模式无需通过指定Routing Key把消息绑定到Queue中,只需要把Exchange与Queue绑定,这样子通过Exchange消息将会转发到与其绑定的所有Queue上。类似子网广播,每台子网内的主机都获得了一份复制的消息。
    下面定义两个队列与其消费者FanoutExchangeQueueName1、FanoutExchangeQueueName2的Fanout Exchange生产者代码:

    class Program
    {
        static void Main(string[] args)
        {
            string exchangeName = "FanoutExchange";
            string queueName1 = "FanoutExchangeQueueName1";
            string queueName2 = "FanoutExchangeQueueName2";
            string routeKey = "";
    
            //创建连接工厂
            var factory = new ConnectionFactory
            {
                UserName = "dengwu",//用户名
                Password = "123456",//密码
                HostName = "192.168.112.133",//rabbitmq ip
            };
    
            //创建连接
            var connection = factory.CreateConnection();
            //创建通道
            var channel = connection.CreateModel();
    
            //定义一个Fanout类型交换机
            channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, false, false, null);
    
            //定义队列1
            channel.QueueDeclare(queueName1, false, false, false, null);
            //定义队列2
            channel.QueueDeclare(queueName2, false, false, false, null);
    
            //将队列绑定到交换机
            channel.QueueBind(queueName1, exchangeName, routeKey, null);
            channel.QueueBind(queueName2, exchangeName, routeKey, null);
    
            //生成两个队列的消费者
            ConsumerGenerator(queueName1);
            ConsumerGenerator(queueName2);
    
    
            Console.WriteLine($"
    RabbitMQ连接成功,
    
    请输入消息,输入exit退出!");
    
            string input;
            do
            {
                input = Console.ReadLine();
    
                var sendBytes = Encoding.UTF8.GetBytes(input);
                //发布消息
                channel.BasicPublish(exchangeName, routeKey, null, sendBytes);
    
            } while (input.Trim().ToLower() != "exit");
            channel.Close();
            connection.Close();
        }
    
        /// <summary>
        /// 根据队列名称生成消费者
        /// </summary>
        /// <param name="queueName"></param>
        static void ConsumerGenerator(string queueName)
        {
            //创建连接工厂
            ConnectionFactory factory = new ConnectionFactory
            {
                UserName = "dengwu",//用户名
                Password = "123456",//密码
                HostName = "192.168.112.133",//rabbitmq ip
            };
    
            //创建连接
            var connection = factory.CreateConnection();
            //创建通道
            var channel = connection.CreateModel();
    
            //事件基本消费者
            var consumer = new EventingBasicConsumer(channel);
    
            //接收到消息事件
            consumer.Received += (ch, ea) =>
            {
                var body = ea.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);
    
                Console.WriteLine($"Queue:{queueName}收到消息: {message}");
                //确认该消息已被消费
                channel.BasicAck(ea.DeliveryTag, false);
            };
            //启动消费者 设置为手动应答消息
            channel.BasicConsume(queueName, false, consumer);
            Console.WriteLine($"Queue:{queueName},消费者已启动");
        }
    }

    运行:

    2.3Topic Exchange


    由上述流程图可见,生产者所有发送到Topic Exchange的消息会通过指定Routing Key被转发到能和其匹配的Queue上。
    Exchange指定Routing Key将路由进行模糊匹配。可以使用通配符进行模糊匹配,符号“#”匹配多个单词(可以是零个),符号“*”匹配一个单词。因此“usa.#”能够匹配到“usa.news.xxx、usa.weather.xxx”,但是“usa.*” 只会匹配到“usa.news、usa.weather”。
    所以Topic Exchange 使用是非常灵活的,Topic Exchange生产者代码如下:

    class Program
    {
        static void Main(string[] args)
        {
            string exchangeName = "TopicExchange";
            string queueName = "DirectExchangeQueueName";
            string routeKey = "TopicExchangeKey.*";
    
            //创建连接工厂
            var factory = new ConnectionFactory
            {
                UserName = "dengwu",//用户名
                Password = "123456",//密码
                HostName = "192.168.112.133",//rabbitmq ip
            };
    
            //创建连接
            var connection = factory.CreateConnection();
            //创建通道
            var channel = connection.CreateModel();
    
            //定义一个Topic类型交换机
            channel.ExchangeDeclare(exchangeName, ExchangeType.Topic, false, false, null);
    
            //定义队列1
            channel.QueueDeclare(queueName, false, false, false, null);
    
            //将队列绑定到交换机
            channel.QueueBind(queueName, exchangeName, routeKey, null);
    
            Console.WriteLine($"
    RabbitMQ连接成功,
    
    请输入消息,输入exit退出!");
    
            string input;
            do
            {
                input = Console.ReadLine();
    
                var sendBytes = Encoding.UTF8.GetBytes(input);
                //发布消息
                channel.BasicPublish(exchangeName, "TopicExchangeKey.one", null, sendBytes);
    
            } while (input.Trim().ToLower() != "exit");
            channel.Close();
            connection.Close();
        }
    }

    运行:

    2.4Headers Exchange

    Headers Exchange不依赖于Routing key 与 Binding key的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。这种模式基本没怎么用过,所以就跳过了。

    参考文献:
    RabbitMQ官网
    .NET Core 使用RabbitMQ

  • 相关阅读:
    2019牛客暑期多校训练营(第八场)A All-one Matrices(单调栈+前缀和)
    2019牛客暑期多校训练营(第三场)A Graph Games(分块)
    2019牛客暑期多校训练营(第二场)E MAZE(线段树维护矩阵+DP)
    2019牛客暑期多校训练营(第二场)D Kth Minimum Clique(bitset+暴力枚举)
    2019牛客暑期多校训练营(第一场)H XOR(线性基)
    2019牛客暑期多校训练营(第六场)D Move(multiset+枚举)
    2019牛客暑期多校训练营(第五场)H subsequence 2(拓扑排序)
    2019牛客暑期多校训练营(第六场)J Upgrading Technology(矩阵前缀和+最小子串和+贪心)
    2019牛客暑期多校训练营(第五场)G subsequence 1(dp+组合数)
    P3857 [TJOI2008]彩灯(线性基)
  • 原文地址:https://www.cnblogs.com/wzk153/p/13219397.html
Copyright © 2011-2022 走看看