zoukankan      html  css  js  c++  java
  • rabbitMQ_routing(四)

    路由

    本次我们将通过路由将信息发送到指定的队列中,将消息发送给指定的队列需要在转发器和队列之间建立一个routeKey

    绑定

    在以前的例子中,我们已经创建了绑定。你可能会记得如下代码:

    channel.queueBind(queueName,EXCHANGE_NAME,“”);
    以上代码是针对于fanout转发器,这种类型的转发器会广播消息到所以与它绑定了的队列,会忽略第三个参数
    本例将使用direct类型的转发器,这种转发器需要指定routeKey,告诉这个转发器,消息应当转发给哪个队列
    channel.queueBind(queueName,EXCHANGE_NAME,“black”);

    以上代码中的第三个参数black就是routekey。

      routeKey的含义取决于转发类型。第三个参数是routeKey,如果转发器类型是fanout,那么这个routeKey会被忽略。

    direct转发器

    直接转发器的路由算法很简单 - 消息只会传递给routeKey匹配的队列。如下图:

     

    上图中,我们可以看到直接转发器X与两个绑定的队列。第一个队列与routeKey橙色绑定,第二个队列有两个绑定,一个routeKey为黑色,另一个routeKey绿色

    生产者发送消息给orange这个routeKey,那么消息会被转发到Q1这个队列,如果routeKey是black和green,那么就会转发到Q2这个队列。

    多重绑定

    使用相同的routeKey绑定多个队列是完全合法的。在我们的示例中,我们可以在XQ1之间添加routeKey black在这种情况下,direct转发器将表现得像fanout转发器,并将消息广播到所有匹配的队列。具有routeKey为black的消息将传送到 Q1和Q2。

    提交日志

    本次实例将消息发送到direct转发器,转发器根据routeKey决定消息应当发往哪个队列,我们将以日志的严重程度(error, warning, info)作为routeKey。

    我们需要定义一个直接转发器:

    channel.exchangeDeclare(EXCHANGE_NAME,“direct”);
    

    我们准备发送消息:

    channel.basicPublish(EXCHANGE_NAME,severity,null,message.getBytes());
    

    severity为error,info,warning之一。

    订阅

       创建一个排他,自动删除,随机命名,非持久的队列:

    String queueName = channel.queueDeclare().getQueue();
    
    使用severity绑定队列 for(String severity:argv){ channel.queueBind(queueName,EXCHANGE_NAME,severity); }

    下图为绑定示意图:

    EmitLogDirect.java类的代码:

     1 package com.rabbitMQ;
     2 
     3 import com.rabbitmq.client.Channel;
     4 import com.rabbitmq.client.Connection;
     5 import com.rabbitmq.client.ConnectionFactory;
     6 
     7 public class EmitLogDirect {
     8 
     9     private static final String EXCHANGE_NAME = "direct_logs";
    10 
    11     public static void main(String[] argv) throws java.io.IOException, Exception {
    12 
    13         ConnectionFactory factory = new ConnectionFactory();
    14         factory.setHost("localhost");
    15         Connection connection = factory.newConnection();
    16         Channel channel = connection.createChannel();
    17 
    18         channel.exchangeDeclare(EXCHANGE_NAME, "direct");
    19 
    20         String severity = getSeverity(argv);
    21         String message = getMessage(argv);
    22         // 第二参数为routeKey,指定消息应当发给哪个队列
    23         channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
    24         System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
    25 
    26         channel.close();
    27         connection.close();
    28     }
    29 
    30     private static String getSeverity(String[] argv) {
    31 
    32         if (argv.length == 0) {
    33 
    34             return "info";
    35         }
    36         String str = argv[0];
    37 
    38         String routeKey = str.replaceAll("=.*", "");
    39 
    40         return routeKey;
    41     }
    42 
    43     private static String getMessage(String[] argv) {
    44         if (argv.length == 0) {
    45 
    46             return "mistake happen!!!";
    47         }
    48         String str = argv[0];
    49         String message = str.replaceAll(".*=", "");
    50         return message;
    51     }
    52 
    53 }

    ReceiveLogsDirect.java的代码:

     1 package com.rabbitMQ;
     2 
     3 import com.rabbitmq.client.*;
     4 
     5 import java.io.IOException;
     6 
     7 public class ReceiveLogsDirect {
     8 
     9   private static final String EXCHANGE_NAME = "direct_logs";
    10 
    11   public static void main(String[] argv) throws Exception {
    12     ConnectionFactory factory = new ConnectionFactory();
    13     factory.setHost("localhost");
    14     Connection connection = factory.newConnection();
    15     Channel channel = connection.createChannel();
    16 
    17     channel.exchangeDeclare(EXCHANGE_NAME, "direct");
    18     String queueName = channel.queueDeclare().getQueue();
    19 
    20     if (argv.length < 1){
    21       System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
    22       System.exit(1);
    23     }
    24 
    25     //给转发器和队列之间建立routeKey,传入info,warning,error等级
    26     for(String severity : argv){
    27       channel.queueBind(queueName, EXCHANGE_NAME, severity);
    28     }
    29     System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    30 
    31     Consumer consumer = new DefaultConsumer(channel) {
    32       @Override
    33       public void handleDelivery(String consumerTag, Envelope envelope,
    34                                  AMQP.BasicProperties properties, byte[] body) throws IOException {
    35         String message = new String(body, "UTF-8");
    36         System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
    37       }
    38     };
    39     channel.basicConsume(queueName, true, consumer);
    40   }
    41 }

    启动ReceiveLogsDirect.java,给它传error,warning,info参数

    再次启动ReceiveLogsDirect.java,给它传warning,info参数

    启动EmitLogDirect.java,给它传error=some error happend

    那么只有第一个ReceiveLogsDirect.java才能接收到消息,

    再次启动EmitLogDirect.java,给它传info=Send message

    那么两个ReceiveLogsDirect.java都会接收到消息。

  • 相关阅读:
    HDU-1102 Constructing Roads ( 最小生成树 )
    POJ-1287 Networking ( 最小生成树 )
    HDU-1272 小希的迷宫 ( 并查集 )
    Java基本数据类型、关键字
    观察者模式
    Android系统启动过程分析
    Activity启动过程源码分析(Android 8.0)
    Okhttp解析—Okhttp概览
    Okhttp解析—Interceptor详解
    Okhttp源码分析--基本使用流程分析
  • 原文地址:https://www.cnblogs.com/honger/p/6963766.html
Copyright © 2011-2022 走看看