zoukankan      html  css  js  c++  java
  • rabbitmq+java入门(四)routing

    参考:http://www.rabbitmq.com/tutorials/tutorial-four-java.html

    源码:https://github.com/zuzhaoyue/JavaDemo

    路由

    (使用Java客户端)

    先决条件

    本教程假定RabbitMQ 在标准端口(5672上的本地主机安装并运行如果您使用不同的主机,端口或证书,则连接设置需要进行调整。

    之前的教程中,我们构建了一个简单的日志系统 我们能够将日志消息广播给所有接收者。

    在本教程中,我们将添加一个功能 - 我们将只能订阅一部分消息。例如,我们只能将重要的错误消息引导到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。

    绑定

    在前面的例子中,我们已经创建了绑定。之前的代码如下:

    channel.queueBind(queueName,EXCHANGE_NAME,“”);
    

    绑定是exchange和队列之间的关系。这可以简单地理解为:队列对来自该交换机的消息感兴趣。

    绑定可以采用额外的routingKey参数。为了避免与basic_publish参数混淆,我们将其称为 绑定键。以下就是我们如何使用绑定键创建一个绑定:

    channel.queueBind(queueName,EXCHANGE_NAME,“black”);
    

    绑定键的含义取决于交换类型。我们之前使用的 fanout忽略了它的值。

    direct exchange

    我们之前教程的日志记录系统将所有消息广播给所有消费者。我们希望将其扩展为允许根据其严重性过滤消息。例如,我们可能需要一个将日志消息写入磁盘的程序,以仅接收严重错误,而不会在警告或信息日志消息中浪费磁盘空间。

    我们正在使用一个fanout交换机,这没有给我们很大的灵活性 - 它只能进行盲目的广播。

    我们将使用direct交换。direct交换背后的路由算法很简单 - 消息进入队列,其 绑定密钥消息路由密钥完全匹配

    为了说明这一点,请考虑以下设置:

    在这个设置中,我们可以看到有两个队列绑定的direct exchange. 第一个队列用绑定键橙色绑定,第二个队列有两个绑定,一个绑定键为黑色,另一个为绿色

    在这种设置中,使用路由键橙色发布到交换机的消息 将被路由到队列Q1带有黑色 或绿色路由键的消息将进入Q2所有其他消息将被丢弃。

    多个绑定

    使用相同的绑定密钥绑定多个队列是完全合法的。在我们的例子中,我们可以使用绑定键黑色添加XQ1之间的绑定在这种情况下,direct交换就像fanout一样,并将消息广播到所有匹配的队列。带有路由键黑色的消息将传送到 Q1Q2

    发送日志

    我们将使用这个模型用于我们的日志系统。取而代之fanout,我们将消息发送到direct exchange我们将提供日志severity作为路由键这样接收程序将能够选择想要接收的严重程度。我们先关注发送日志。

    与往常一样,我们需要先创建一个交换:

    channel.exchangeDeclare(EXCHANGE_NAME,“direct”);
    

    我们准备发送一条消息:

    channel.basicPublish(EXCHANGE_NAME,severity,null,message.getBytes());
    

    为了简化事情,我们将假设“serverity”可以是'info','warn','error'之一。

    订阅

    接收消息的方式与上一个教程中的一样,只有一个例外 - 我们将为每个我们感兴趣的严重程度serverity创建一个新绑定。

    String queueName = channel.queueDeclare().getQueue();
    
    for(String severity:argv){
      channel.queueBind(queueName,EXCHANGE_NAME,severity);
    }
    

    把以上放在一起

    生产者EmitLogDirect.java的代码

    //package rmq.routing;
    
    /**
     * Created by zuzhaoyue on 18/5/17.
     */
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.Channel;
    
    public class EmitLogDirect {
    
        private static final String EXCHANGE_NAME = "direct_logs";
    
        public static void main(String[] argv) throws Exception {
    
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//声明一个direct的exchange
    
            String severity = getSeverity(argv);
            String message = getMessage(argv);
    
            channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
    
            channel.close();
            connection.close();
        }
    
        private static String getSeverity(String[] strings){
            if (strings.length < 1)
                return "info";
            return strings[0];
        }
    
        private static String getMessage(String[] strings){
            if (strings.length < 2)
                return "Hello World!";
            return joinStrings(strings, " ", 1);
        }
    
        private static String joinStrings(String[] strings, String delimiter, int startIndex) {
            int length = strings.length;
            if (length == 0 ) return "";
            if (length < startIndex ) return "";
            StringBuilder words = new StringBuilder(strings[startIndex]);
            for (int i = startIndex + 1; i < length; i++) {
                words.append(delimiter).append(strings[i]);
            }
            return words.toString();
        }
    }

    消费者的代码:

    //package rmq.routing;
    
    /**
     * Created by zuzhaoyue on 18/5/17.
     */
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    public class ReceiveLogsDirect {
    
        private static final String EXCHANGE_NAME = "direct_logs";
    
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            String queueName = channel.queueDeclare().getQueue();
    
            if (argv.length < 1){
                System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
                System.exit(1);
            }
    
            for(String severity : argv){
                channel.queueBind(queueName, EXCHANGE_NAME, severity);
            }
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
                }
            };
            channel.basicConsume(queueName, true, consumer);
        }
    }

    测试

    1.编译

    javac -cp /data/amqp-client-4.2.0.jar EmitLogDirect.java ReceiveLogDirect.java

    2.执行

    1)打开三个窗口作为消费者

    第一个窗口输入命令:java -cp /data/amqp-client-4.2.0.jar:/data/slf4j-api-1.7.21.jar:. ReceiveLogsDirect info

    第二个窗口输入命令:java -cp /data/amqp-client-4.2.0.jar:/data/slf4j-api-1.7.21.jar:. ReceiveLogsDirect error

    第三个窗口输入命令:java -cp /data/amqp-client-4.2.0.jar:/data/slf4j-api-1.7.21.jar:. ReceiveLogsDirect warn error>/data/rmqlogs.log

    2)启动生产者

    依次输入以下命令:

    java -cp /data/amqp-client-4.2.0.jar:/data/slf4j-api-1.7.21.jar:. EmitLogDirect info   

    java -cp /data/amqp-client-4.2.0.jar:/data/slf4j-api-1.7.21.jar:. EmitLogDirect error   

    java -cp /data/amqp-client-4.2.0.jar:/data/slf4j-api-1.7.21.jar:. EmitLogDirect warn

    发现不同的消费者根据参数不同有选择地进行了消费:

       

    调试成功。

  • 相关阅读:
    Console.WriteLine输出字符格式化
    GridView动态生成列问题
    日历控件,可运行在XHTML1.0下
    GridView內容導出Excel時異常:必须置於有 runat=server 的表单标记之中
    圆弧分割
    矩阵变换
    使用group by 来统计的小作业
    group by 后使用 rollup 子句总结
    mysql字符串拼接,存储过程,(来自网上看到)
    【深入理解Linux内核】《第二章 内存寻址》笔记 (2014-06-28 12:38)
  • 原文地址:https://www.cnblogs.com/zuxiaoyuan/p/9052448.html
Copyright © 2011-2022 走看看