zoukankan      html  css  js  c++  java
  • rabbitMQ的几种工作模式及代码demo(二)-----订阅模式之广播fanout交换机

    1. 订阅模式
      • 订阅模式相比于点对点模式,多了一个交换机exchange角色。
      • 生产者会将消息发送给exchange
      • exchange一端接收消息。一端处理消息。例如,交给特定的队列、发送给所有的队列、或者将消息丢弃。
      • exchange有三种类型:Fanout广播、Direct定向、Topic通配符。
        1. Fanout:广播,将消息交给所有绑定到交换机的队列
        2. Direct:定向,把消息交给符合指定routing key 的队列
        3. Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
      • exchange只负责转发消息,不负责存储消息。所以如果没有任何队列和交换机绑定,或者没有符合路由规则的队列,那么消息就会被丢弃。
    2. Publish/Subscribe发布与订阅模式
      • 需要设置类型为fanout的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列
      • 与点对点模式对比,一条消息如果被转发到n个队列,并且每个队列都会有消费者消费。该条消息就会被消费n次。
      • 代码层面:多了声明交换机、绑定交换机和队列的两个步骤。消费者并没有多大变化。
        image
      • 广播交换机-生产者代码
      public class Producer {
          public static void main(String[] args) throws Exception {
      
              Connection connection = ConnectionUtil.getConnection();
              Channel channel = connection.createChannel();
              /*
             exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
             参数:
              1. exchange:交换机名称
              2. type:交换机类型
                  DIRECT("direct"),:定向
                  FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
                  TOPIC("topic"),通配符的方式
                  HEADERS("headers");参数匹配
              3. durable:是否持久化
              4. autoDelete:自动删除
              5. internal:内部使用。 一般false
              6. arguments:参数
              */
              String exchangeName = "test_fanout";
              //5. 创建交换机
              channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
              //6. 创建队列
              String queue1Name = "test_fanout_queue1";
              String queue2Name = "test_fanout_queue2";
              channel.queueDeclare(queue1Name,true,false,false,null);
              channel.queueDeclare(queue2Name,true,false,false,null);
              //7. 绑定队列和交换机
              /*
              queueBind(String queue, String exchange, String routingKey)
              参数:
                  1. queue:队列名称
                  2. exchange:交换机名称
                  3. routingKey:路由键,绑定规则
                      如果交换机的类型为fanout ,routingKey设置为""
               */
              channel.queueBind(queue1Name,exchangeName,"");
              channel.queueBind(queue2Name,exchangeName,"");
      
              String body = "日志信息:张三调用了findAll方法...日志级别:info...";
              //8. 发送消息
              channel.basicPublish(exchangeName,"",null,body.getBytes());
      
              //9. 释放资源
              channel.close();
              connection.close();
          }
      }
      
      • 广播交换机-对应队列1的消费者代码
      public class Consumer1 {
          public static void main(String[] args) throws Exception {
              Connection connection = ConnectionUtil.getConnection();
              Channel channel = connection.createChannel();
              String queue1Name = "test_fanout_queue1";
              Consumer consumer = new DefaultConsumer(channel){
                  @Override
                  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                      System.out.println("body:"+new String(body));
                      System.out.println("将日志信息打印到控制台.....");
                  }
              };
              channel.basicConsume(queue1Name,true,consumer);
          }
      }
      
      • 广播交换机-对应队列2的消费者代码
          public class Consumer2 {
          public static void main(String[] args) throws Exception {
              Connection connection = ConnectionUtil.getConnection();
              Channel channel = connection.createChannel();
              String queue2Name = "test_fanout_queue2";
              Consumer consumer = new DefaultConsumer(channel){
                  @Override
                  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                      System.out.println("body:"+new String(body));
                      System.out.println("将日志信息打印到控制台.....");
                  }
              };
              channel.basicConsume(queue2Name,true,consumer);
          }
      }
      
  • 相关阅读:
    20145228 《信息安全系统设计基础》第九周学习总结 (1)
    20145228《信息安全系统设计基础》期中总结
    20145228《信息安全系统设计基础》第一次实验实验报告
    20145228 《信息安全系统设计基础》第八周学习总结 (1)
    20145228 《信息安全系统设计基础》第七周学习总结 (2)
    20145228 《信息安全系统设计基础》第七周学习总结 (1)
    20145228 《信息安全系统设计基础》第六周学习总结 (2)
    20145228 《信息安全系统设计基础》第六周学习总结 (1)
    20145203《信息安全系统设计》 实验二 固件开发
    20145203 《信息安全系统设计基础》第九周学习总结
  • 原文地址:https://www.cnblogs.com/rbwbear/p/15557669.html
Copyright © 2011-2022 走看看