zoukankan      html  css  js  c++  java
  • rabbitMQ的几种工作模式及代码demo(三)--订阅模式之路由direct交换机

    1. Routing路由模式
      • 需要设置类型为direct的交换机,交换机和队列进行绑定,并且指定routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列
      • 应用场景:在不同策略下,使用不同的消费者去使用该模式。
      • 代码层面:交换机与队列绑定时,必须指定routing key;在发送消息时,也必须给对应的消息指定routing key。
        image
      • 生产者代码:
      public class Producer {
          public static void main(String[] args) throws Exception {
             Connection connection = ConnectionUtil.getConnection();
             Channel channel = connection.createChannel();
             String exchangeName = "test_direct";
             // 创建交换机
             channel.exchangeDeclare(exchangeName,BuiltinExchangeType.DIRECT,true,false,false,null);
             // 创建队列
             String queue1Name = "test_direct_queue1";
             String queue2Name = "test_direct_queue2";
              // 声明(创建)队列
             channel.queueDeclare(queue1Name,true,false,false,null);
             channel.queueDeclare(queue2Name,true,false,false,null);
             // 队列绑定交换机
              // 队列1绑定error
              channel.queueBind(queue1Name,exchangeName,"error");
              // 队列2绑定info error warning
              channel.queueBind(queue2Name,exchangeName,"info");
              channel.queueBind(queue2Name,exchangeName,"error");
              channel.queueBind(queue2Name,exchangeName,"warning");
      
              String message = "日志信息:张三调用了delete方法.错误了,日志级别warning";
              // 发送消息
      	/**
           	* 参数1:交换机名称
           	* 参数2:路由key,简单模式可以传递队列名称
           	* 参数3:配置信息
           	* 参数4:消息内容
           	*/
              channel.basicPublish(exchangeName,"warning",null,message.getBytes());
              System.out.println(message);
      
              channel.close();
              connection.close();
          }
      }
      
      • 消费者1代码:
      public class Consumer1 {
          public static void main(String[] args) throws Exception {
              Connection connection = ConnectionUtil.getConnection();
              Channel channel = connection.createChannel();
              String queue1Name = "test_direct_queue1";
              Consumer consumer = new DefaultConsumer(channel){
                  @Override
                  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                      System.out.println("body:"+new String(body));
                      System.out.println("将日志信息打印到控制台.....");
                  }
              };
              channel.basicConsume(queue1Name,true,consumer);
          }
      }
      
      • 消费者2代码:
      public class Consumer2 {
          public static void main(String[] args) throws Exception {
              Connection connection = ConnectionUtil.getConnection();
              Channel channel = connection.createChannel();
              String queue2Name = "test_direct_queue2";
              Consumer consumer = new DefaultConsumer(channel){
                  @Override
                  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                      System.out.println("body:"+new String(body));
                      System.out.println("将日志信息打印到控制台.....");
                  }
              };
              channel.basicConsume(queue2Name,true,consumer);
          }
      }
      
  • 相关阅读:
    扩展知识
    day61——多表操作(增、删除、改、基于对象的跨表查询)
    day60——单表操作补充(批量插入、查询、表结构)
    day59——orm单表操作
    day58——模板继承、组件、自定义标签和过滤器、inclusion_tag、静态文件配置、url别名和反向解析、url命名空间
    day57——视图、模板渲染
    WARNING: Ignoring invalid distribution -ip
    Python- 【python无法更新pip】提示python.exe: No module named pip
    Anaconda Prompt 切换路径不能进入D盘
    Failed calling sys.__interactivehook__ 错误的解决
  • 原文地址:https://www.cnblogs.com/rbwbear/p/15557678.html
Copyright © 2011-2022 走看看