zoukankan      html  css  js  c++  java
  • RabbitMQ—Routing路由模式


     一、模式说明

    1.路由模式特点:

    • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey (路由key
    • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey
    • Exchange不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的
    • Routingkey 与消息的 Routing key 完全一致,才会接收到消息 

    图解:
    P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key
    XExchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
    C1:消费者,其所在队列指定了需要routing key error 的消息
    C2:消费者,其所在队列指定了需要routing key infoerrorwarning 的消息

    二、示例 

    1.Producer_Routing

     1 /**
     2  * 发送消息
     3  */
     4 public class Producer_Routing {
     5     public static void main(String[] args) throws IOException, TimeoutException {
     6 
     7         //1.创建连接工厂
     8         ConnectionFactory factory = new ConnectionFactory();
     9         //2. 设置参数
    10         factory.setHost("172.16.98.133");//ip  默认值 localhost
    11         factory.setPort(5672); //端口  默认值 5672
    12         factory.setVirtualHost("/itcast");//虚拟机 默认值/
    13         factory.setUsername("jingdong");//用户名 默认 guest
    14         factory.setPassword("jingdong");//密码 默认值 guest
    15         //3. 创建连接 Connection
    16         Connection connection = factory.newConnection();
    17         //4. 创建Channel
    18         Channel channel = connection.createChannel();
    19        /*
    20 
    21        exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
    22        参数:
    23         1. exchange:交换机名称
    24         2. type:交换机类型
    25             DIRECT("direct"),:定向
    26             FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
    27             TOPIC("topic"),通配符的方式
    28             HEADERS("headers");参数匹配
    29 
    30         3. durable:是否持久化
    31         4. autoDelete:自动删除
    32         5. internal:内部使用。 一般false
    33         6. arguments:参数
    34         */
    35 
    36        String exchangeName = "test_direct";
    37         //5. 创建交换机
    38         channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);
    39         //6. 创建队列
    40         String queue1Name = "test_direct_queue1";
    41         String queue2Name = "test_direct_queue2";
    42 
    43         channel.queueDeclare(queue1Name,true,false,false,null);
    44         channel.queueDeclare(queue2Name,true,false,false,null);
    45         //7. 绑定队列和交换机
    46         /*
    47         queueBind(String queue, String exchange, String routingKey)
    48         参数:
    49             1. queue:队列名称
    50             2. exchange:交换机名称
    51             3. routingKey:路由键,绑定规则
    52                 如果交换机的类型为fanout ,routingKey设置为""
    53          */
    54         //队列1绑定 error
    55         channel.queueBind(queue1Name,exchangeName,"error");
    56         //队列2绑定 info  error  warning
    57         channel.queueBind(queue2Name,exchangeName,"info");
    58         channel.queueBind(queue2Name,exchangeName,"error");
    59         channel.queueBind(queue2Name,exchangeName,"warning");
    60 
    61         String body = "日志信息:张三调用了delete方法...出错误了。。。日志级别:error...";
    62         //8. 发送消息
    63         channel.basicPublish(exchangeName,"warning",null,body.getBytes());
    64 
    65         //9. 释放资源
    66         channel.close();
    67         connection.close();
    68 
    69     }
    70 }
    View Code

    2.

    (1)Consumer_Routing1

     1 public class Consumer_PubSub2 {
     2     public static void main(String[] args) throws IOException, TimeoutException {
     3 
     4         //1.创建连接工厂
     5         ConnectionFactory factory = new ConnectionFactory();
     6         //2. 设置参数
     7         factory.setHost("172.16.98.133");//ip  默认值 localhost
     8         factory.setPort(5672); //端口  默认值 5672
     9         factory.setVirtualHost("/itcast");//虚拟机 默认值/
    10         factory.setUsername("jingdong");//用户名 默认 guest
    11         factory.setPassword("jingdong");//密码 默认值 guest
    12         //3. 创建连接 Connection
    13         Connection connection = factory.newConnection();
    14         //4. 创建Channel
    15         Channel channel = connection.createChannel();
    16 
    17 
    18         String queue1Name = "test_fanout_queue1";
    19         String queue2Name = "test_fanout_queue2";
    20 
    21 
    22         /*
    23         basicConsume(String queue, boolean autoAck, Consumer callback)
    24         参数:
    25             1. queue:队列名称
    26             2. autoAck:是否自动确认
    27             3. callback:回调对象
    28 
    29          */
    30         // 接收消息
    31         Consumer consumer = new DefaultConsumer(channel){
    32             /*
    33                 回调方法,当收到消息后,会自动执行该方法
    34 
    35                 1. consumerTag:标识
    36                 2. envelope:获取一些信息,交换机,路由key...
    37                 3. properties:配置信息
    38                 4. body:数据
    39 
    40              */
    41             @Override
    42             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    43               /*  System.out.println("consumerTag:"+consumerTag);
    44                 System.out.println("Exchange:"+envelope.getExchange());
    45                 System.out.println("RoutingKey:"+envelope.getRoutingKey());
    46                 System.out.println("properties:"+properties);*/
    47                 System.out.println("body:"+new String(body));
    48                 System.out.println("将日志信息保存数据库.....");
    49             }
    50         };
    51         channel.basicConsume(queue2Name,true,consumer);
    52 
    53 
    54         //关闭资源?不要
    55 
    56     }
    57 }
    View Code

    (2)Consumer_Routing2

     1 public class Consumer_Routing2 {
     2     public static void main(String[] args) throws IOException, TimeoutException {
     3 
     4         //1.创建连接工厂
     5         ConnectionFactory factory = new ConnectionFactory();
     6         //2. 设置参数
     7         factory.setHost("172.16.98.133");//ip  默认值 localhost
     8         factory.setPort(5672); //端口  默认值 5672
     9         factory.setVirtualHost("/itcast");//虚拟机 默认值/
    10         factory.setUsername("jingdong");//用户名 默认 guest
    11         factory.setPassword("jingdong");//密码 默认值 guest
    12         //3. 创建连接 Connection
    13         Connection connection = factory.newConnection();
    14         //4. 创建Channel
    15         Channel channel = connection.createChannel();
    16 
    17         String queue1Name = "test_direct_queue1";
    18         String queue2Name = "test_direct_queue2";
    19 
    20         // 接收消息
    21         Consumer consumer = new DefaultConsumer(channel){
    22             /*
    23                 回调方法,当收到消息后,会自动执行该方法
    24 
    25                 1. consumerTag:标识
    26                 2. envelope:获取一些信息,交换机,路由key...
    27                 3. properties:配置信息
    28                 4. body:数据
    29 
    30              */
    31             @Override
    32             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    33               /*  System.out.println("consumerTag:"+consumerTag);
    34                 System.out.println("Exchange:"+envelope.getExchange());
    35                 System.out.println("RoutingKey:"+envelope.getRoutingKey());
    36                 System.out.println("properties:"+properties);*/
    37                 System.out.println("body:"+new String(body));
    38                 System.out.println("将日志信息存储到数据库.....");
    39             }
    40         };
    41          /*
    42         basicConsume(String queue, boolean autoAck, Consumer callback)
    43         参数:
    44             1. queue:队列名称
    45             2. autoAck:是否自动确认
    46             3. callback:回调对象
    47 
    48          */
    49         channel.basicConsume(queue1Name,true,consumer);
    50 
    51 
    52         //关闭资源?不要
    53 
    54     }
    55 }
    View Code
  • 相关阅读:
    Kubernetes 内存资源限制实战
    kubernetes 资源管理概述
    终于对探针下手了
    容器和容器云
    理解 Kubernetes volume 和 共享存储
    理解 docker volume
    docker 镜像管理之 overlay2 最佳实践
    docker 资源限制之 cgroup
    Debug Assertion Failed!
    Qt 字符串转md5
  • 原文地址:https://www.cnblogs.com/aaaazzzz/p/12821415.html
Copyright © 2011-2022 走看看