zoukankan      html  css  js  c++  java
  • RabbitMQ入门-路由-有选择的接受消息

    比如一个日志系统,之前的处理方式呢,是各种类型(info,error,warning)的消息都发给订阅者,可是实际情况上不一定都需要。可能A需要error,其他的都不需要。那么就引入了今天的处理方式--路由(直接交换)

    (兔子的官网真心良心,图文并茂,通俗易懂)这种处理方式你只需记住一个字:有选择的接受消息

     首先,我们将消息绑定在不同的路由键上,然后消费者根据需要绑定对应的路由键即可收到消息。路由键随便取名字

    生产者代码:

    package com.example.demo;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * 有选择的接受消息
     */
    public class RoutingSend {
    
        private static final String EXCHANGE_NAME = "logs";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();    // 连接工厂
            factory.setHost("localhost");
            Connection connection = factory.newConnection();        // 获取连接
            Channel channel = connection.createChannel();
    
            // 当我们发送时,需要一个路由密钥,这里选择直接交换
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
    
            String[] msg = {"错误","信息","警告"};
            // 第二个参数为路由密钥
            channel.basicPublish(EXCHANGE_NAME, "error", null, msg[0].getBytes());
            channel.basicPublish(EXCHANGE_NAME, "info", null, msg[1].getBytes());
            channel.basicPublish(EXCHANGE_NAME, "warning", null, msg[2].getBytes());
            System.out.println("PS-Send:" + msg.toString());
    
            channel.close();
            connection.close();
    
        }
    }

    消费者代码:

    package com.example.demo;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class RoutingReceive {
    
        private static final String EXCHANGE_NAME = "logs";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();    // 连接工厂
            factory.setHost("localhost");
            Connection connection = factory.newConnection();        // 获取连接
            Channel channel = connection.createChannel();
    
            // 声明一个direct交换类型
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
    
            // 当声明队列,不加任何参数,产生的将是一个临时队列,getQueue返回的是队列名称
            String queueA = channel.queueDeclare().getQueue();
            String queueB = channel.queueDeclare().getQueue();
            System.out.println("临时队列:" + queueA);
            System.out.println("临时队列:" + queueB);
    
            // 第三个参数为“绑定建”
            channel.queueBind(queueA, EXCHANGE_NAME, "error");
            channel.queueBind(queueB, EXCHANGE_NAME, "info");
            channel.queueBind(queueB, EXCHANGE_NAME, "warning");
    
            Consumer consumerA = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String recv = new String(body, "UTF-8");
                    System.out.println("Direct-Receive-A:" + recv);
                }
            };
            Consumer consumerB = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String recv = new String(body, "UTF-8");
                    System.out.println("Direct-Receive-B:" + recv);
                }
            };
            channel.basicConsume(queueA, true, consumerA);
            channel.basicConsume(queueB, true, consumerB);
        }
    }

    先启动消费者:再启动生产者,查看控制台:

    ..

  • 相关阅读:
    政务公开系统专栏首页的跨站攻击漏洞
    Spring+XFire WSSecurity安全认证开发感悟
    Appfuse使用中遇到的问题及解决方案
    How to get the rowid when insert the data to Oracle database
    How to configure CVS in IntelliJ IDEA
    localhost打不开 127.0.0.1可以打开,,,或 hosts 文件不起作用的解决方法
    ASp.net中Froms验证方式
    ASP.NET 页面执行顺序详解
    【转】防止用户通过后退按钮重复提交表单ASP中的response.Buffer,Response.Expires,Response.CacheControl
    页面事件(Init,Load,PreRender)执行顺序__简单总结
  • 原文地址:https://www.cnblogs.com/LUA123/p/8477189.html
Copyright © 2011-2022 走看看