zoukankan      html  css  js  c++  java
  • rabbitmq Exchange四种模式

    一、什么是Exchange

    RabbitMQ 是 AMQP(高级消息队列协议)的标准实现:

    从 AMQP 协议可以看出,Queue、Exchange 和 Binding 构成了 AMQP 协议的核心

    • Producer:消息生产者,即投递消息的程序。

    • Broker:消息队列服务器实体。

      • Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。

      • Binding:绑定,它的作用就是把 Exchange 和 Queue 按照路由规则绑定起来。

      • Queue:消息队列载体,每个消息都会被投入到一个或多个队列。

    • Consumer:消息消费者,即接受消息的程序。

    二、Exchange的类型

    RabbitMQ常用的Exchange Type有fanout、direct、topic、headers这四种

    fanout 

    fanout类型的Exchange路由规则非常简单,它会把所有发送到fanout Exchange的消息都会被转发到与该Exchange 绑定(Binding)的所有Queue上。

    Fanout Exchange 不需要处理RouteKey 。只需要简单的将队列绑定到exchange 上。这样发送到exchange的消息都会被转发到与该交换机绑定的所有队列上。类似子网广播,每台子网内的主机都获得了一份复制的消息。所以,Fanout Exchange 转发消息是最快的。

    direct

    direct类型的Exchange路由规则也很简单,它会把消息路由到那些binding key与routing key完全匹配的Queue中。

    direct Exchange是RabbitMQ Broker的默认Exchange,它有一个特别的属性对一些简单的应用来说是非常有用的,在使用这个类型的Exchange时,可以不必指定routing key的名字,在此类型下创建的Queue有一个默认的routing key,这个routing key一般同Queue同名。

     
     

    direct模式,可以使用rabbitMQ自带的Exchange:default Exchange 。所以不需要将Exchange进行任何绑定(binding)操作 。消息传递时,RouteKey必须完全匹配,才会被队列接收,否则该消息会被抛弃。

    topic

    前面讲到direct类型的Exchange路由规则是完全匹配binding key与routing key,但这种严格的匹配方式在很多情况下不能满足实际业务需求。topic类型的Exchange在匹配规则上进行了扩展,它与direct类型的Exchage相似,也是将消息路由到binding key与routing key相匹配的Queue中,但这里的匹配规则有些不同,它约定:

    a)、routing key为一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
    b)、binding key与routing key一样也是句点号“. ”分隔的字符串
    c)、binding key中可以存在两种特殊字符"*"与“#”,用于做模糊匹配,其中" * "用于匹配一个单词,“#”用于匹配多个单词(可以是零个)
    所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定Topic的Queue上,Exchange 将RouteKey 和某Topic 进行模糊匹配。此时队列需要绑定一个Topic。可以使用通配符进行模糊匹配,符号“#”匹配一个或多个词,符号"*"匹配不多不少一个词。因此“log.#”能够匹配到“log.info.oa”,但是“log.*” 只会匹配到“log.error”

    headers

    headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。
    在绑定Queue与Exchange时指定一组键值对;当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。

     

    示例代码
    生产者
    const amqp = require('amqplib');
    
    async function producer() {
      try {
        // 1. 创建链接对象
        const connection = await amqp.connect('amqp://localhost:5672');
    
        // 2. 获取通道
        const channel = await connection.createChannel();
    
        // 3. 声明参数
        const exchangeName = 'qosEx';
        const routingKey = 'qos.test001.cool';
        const msg = 'Producer';
    
        // 4. 声明交换机
        await channel.assertExchange(exchangeName, 'topic', { durable: true });
    
        for (let i = 0; i < 5; i++) {
          // 5. 发送消息
          console.log(i);
          await channel.publish(
            exchangeName,
            routingKey,
            Buffer.from(`${msg} 第${i}条消息`)
          );
        }
    
        await channel.close();
      } catch (error) {
        console.log(error);
      }
    }
    
    producer();

    消费者

    const amqp = require('amqplib');
    
    async function consumer() {
      // 1. 创建链接对象
      const connection = await amqp.connect('amqp://122.51.9.11:5672');
    
      // 2. 获取通道
      const channel = await connection.createChannel();
    
      // 3. 声明参数
      const exchangeName = 'qosEx';
      const queueName = 'qosQueue';
      const bindingKey = 'qos.#';
      // const bindingKey = 'qos.*';  无法匹配
    
      // 4. 声明交换机、对列进行绑定
      await channel.assertExchange(exchangeName, 'topic', { durable: true });
      await channel.assertQueue(queueName);
      await channel.bindQueue(queueName, exchangeName, bindingKey);
    
      // 5. 限流参数设置
      await channel.prefetch(1, false);// count:每次推送给消费端 N 条消息数目,如果这 N 条消息没有被ack,生产端将不会再次推送直到这 N 条消息被消费。global:在哪个级别上做限制,ture 为 channel 上做限制,false 为消费端上做限制,默认为 false。
      // 6. 限流,noAck参数必须设置为false
      await channel.consume(
        queueName,
        (msg) => {
          console.log('Consumer:', msg.content.toString());
          channel.ack(msg);
        },
        { noAck: false }
      );
    }
    
    consumer();
    参考链接:https://www.jianshu.com/p/19af0f40bbde



  • 相关阅读:
    第三百九十九节,Django+Xadmin打造上线标准的在线教育平台—生产环境部署CentOS6.5安装mysql5.6
    第三百九十七节,Django+Xadmin打造上线标准的在线教育平台—其他插件使用说,主题本地化设置
    第三百九十八节,Django+Xadmin打造上线标准的在线教育平台—生产环境部署CentOS6.5系统环境设置
    第三百九十六节,Django+Xadmin打造上线标准的在线教育平台—其他插件使用说,自定义列表页上传插件
    第三百九十五节,Django+Xadmin打造上线标准的在线教育平台—Xadmin集成富文本框
    第三百九十四节,Django+Xadmin打造上线标准的在线教育平台—Xadmin后台进阶开发配置2,以及目录结构说明
    第三百九十三节,Django+Xadmin打造上线标准的在线教育平台—Xadmin后台进阶开发配置
    第三百九十二节,Django+Xadmin打造上线标准的在线教育平台—sql注入攻击,xss攻击,csrf攻击
    第三百九十一节,Django+Xadmin打造上线标准的在线教育平台—404,403,500页面配置
    第三百九十节,Django+Xadmin打造上线标准的在线教育平台—Django+cropper插件头像裁剪上传
  • 原文地址:https://www.cnblogs.com/xiaosongJiang/p/13038062.html
Copyright © 2011-2022 走看看