zoukankan      html  css  js  c++  java
  • RabbitMQ学习第四记:路由模式(direct)

    1、什么是路由模式(direct)

      路由模式是在使用交换机的同时,生产者指定路由发送数据,消费者绑定路由接受数据。与发布/订阅模式不同的是,发布/订阅模式只要是绑定了交换机的队列都会收到生产者向交换机推送过来的数据。而路由模式下加了一个路由设置,生产者向交换机发送数据时,会声明发送给交换机下的那个路由,并且只有当消费者的队列绑定了交换机并且声明了路由,才会收到数据。下图取自于官方网站(RabbitMQ)的路由模式的图例

    P:消息的生产者

    X:交换机

    红色:队列

    C1,C2:消息消费者

    error,info,warning:路由

      举个日志处理例子:系统需要针对日志做分析,首先所有的日志级别的日志都需要保存,其次error日志级别的日志需要单独做处理。这时就可以使用路由模式来处理了,声明交换机使用路由模式,每个日志级别的日志对应一个路由(error,info,warning)。声明一个保存日志队列用于接受所有日志,绑定交换机并绑定所有路由。声明第二个队列用于处理error级别日志,绑定交换机且只绑定error路由。以下是代码讲解。(先运行两个消费者,在运行生产者。如果没有提前将队列绑定到交换机,那么直接运行生产者的话,消息是不会发到任何队列里的

    2、生产者(Send)代码

    public class Send
    {
        //交换机名称
        private final static String EXCHANGE_NAME       = "test_exchange_direct";
        
        //路由名称warning
        private final static String ROUTING_KEY_WARNING = "warning";
        
        //路由名称info
        private final static String ROUTING_KEY_INFO    = "info";
        
        //路由名称error
        private final static String ROUTING_KEY_ERROR   = "error";
        
        public static void main(String[] args)
        {
            try
            {
                //获取连接
                Connection connection = ConnectionUtil.getConnection();
                //从连接中获取一个通道
                Channel channel = connection.createChannel();
                //声明交换机
                channel.exchangeDeclare(EXCHANGE_NAME, "direct");
                String message = "this is warning log";
                //发送消息(warning级别日志)
                channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY_WARNING, null, message.getBytes("utf-8"));
                System.out.println("[send]:" + message);
                //发送消息(info级别日志)
                message = "this is info log";
                channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY_INFO, null, message.getBytes("utf-8"));
                System.out.println("[send]:" + message);
                //发送消息(error级别日志)
                message = "this is error log";
                channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY_ERROR, null, message.getBytes("utf-8"));
                System.out.println("[send]:" + message);
                channel.close();
                connection.close();
            }
            catch (IOException | TimeoutException e)
            {
                e.printStackTrace();
            }
        }
    }

    运行结果:

    [send]:this is warning log
    [send]:this is info log
    [send]:this is error log

    3、消费者1(ReceiveAllLog)

    public class ReceiveAllLog
    {
        //交换机名称
        private final static String EXCHANGE_NAME       = "test_exchange_direct";
        
        //路由名称warning
        private final static String ROUTING_KEY_WARNING = "warning";
        
        //路由名称info
        private final static String ROUTING_KEY_INFO    = "info";
        
        //路由名称error
        private final static String ROUTING_KEY_ERROR   = "error";
        
        //队列名称
        private static final String QUEUE_NAME          = "test_queue_save_all_log";
        
        public static void main(String[] args)
        {
            try
            {
                //获取连接
                Connection connection = ConnectionUtil.getConnection();
                //从连接中获取一个通道
                final Channel channel = connection.createChannel();
                //声明交换机
                channel.exchangeDeclare(EXCHANGE_NAME, "direct");
                //声明队列
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                //将队列绑定到交换机(指定路由info,error,warning)
                channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_INFO);
                channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_ERROR);
                channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_WARNING);
                //保证一次只分发一个  
                int prefetchCount = 1;
                channel.basicQos(prefetchCount);
                //定义消费者
                DefaultConsumer consumer = new DefaultConsumer(channel)
                {
                    //当消息到达时执行回调方法
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                            byte[] body) throws IOException
                    {
                        String message = new String(body, "utf-8");
                        System.out.println("[test_queue_save_all_log] Receive message:" + message);
                        try
                        {
                            //消费者休息2s处理业务
                            Thread.sleep(1000);
                        }
                        catch (InterruptedException e)
                        {
                            e.printStackTrace();
                        }
                        finally
                        {
                            //手动应答
                            channel.basicAck(envelope.getDeliveryTag(), false);
                        }
                    }
                };
                //设置手动应答
                boolean autoAck = false;
                //监听队列
                channel.basicConsume(QUEUE_NAME, autoAck, consumer);
            }
            catch (IOException e)
            {
                e.printStackTrace();
            }
        }
    }

    运行结果:

    [test_queue_save_all_log] Receive message:this is warning log
    [test_queue_save_all_log] Receive message:this is info log
    [test_queue_save_all_log] Receive message:this is error log

    4、消费者2(ReceiveErrorLog)

    public class ReceiveErrorLog
    {
        //交换机名称
        private final static String EXCHANGE_NAME     = "test_exchange_direct";
        
        //路由名称error
        private final static String ROUTING_KEY_ERROR = "error";
        
        //队列名称
        private static final String QUEUE_NAME        = "test_queue_handel_error";
        
        public static void main(String[] args)
        {
            try
            {
                //获取连接
                Connection connection = ConnectionUtil.getConnection();
                //从连接中获取一个通道
                final Channel channel = connection.createChannel();
                //声明队列
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                //将队列绑定到交换机(指定路由error)
                channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_ERROR);
                //保证一次只分发一个  
                int prefetchCount = 1;
                channel.basicQos(prefetchCount);
                //定义消费者
                DefaultConsumer consumer = new DefaultConsumer(channel)
                {
                    //当消息到达时执行回调方法
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                            byte[] body) throws IOException
                    {
                        String message = new String(body, "utf-8");
                        System.out.println("[test_queue_handel_error] Receive message:" + message);
                        try
                        {
                            //消费者休息2s处理业务
                            Thread.sleep(1000);
                        }
                        catch (InterruptedException e)
                        {
                            e.printStackTrace();
                        }
                        finally
                        {
                            //手动应答
                            channel.basicAck(envelope.getDeliveryTag(), false);
                        }
                    }
                };
                //设置手动应答
                boolean autoAck = false;
                //监听队列
                channel.basicConsume(QUEUE_NAME, autoAck, consumer);
            }
            catch (IOException e)
            {
                e.printStackTrace();
            }
        }
    }

    运行结果:
    [test_queue_handel_error] Receive message:this is error log

    总结:

      1.两个队列消费者设置的路由不一样,接收到的消息就不一样。路由模式下,决定消息向队列推送的主要取决于路由,而不是交换机了。

      2.该模式必须设置交换机,且声明路由模式:channel.exchangeDeclare(EXCHANGE_NAME, "direct");

    注意:本文仅代表个人理解和看法哟!和本人所在公司和团体无任何关系!

  • 相关阅读:
    Apache Ant 1.9.1 版发布
    Apache Subversion 1.8.0rc2 发布
    GNU Gatekeeper 3.3 发布,网关守护管理
    Jekyll 1.0 发布,Ruby 的静态网站生成器
    R语言 3.0.1 源码已经提交到 Github
    SymmetricDS 3.4.0 发布,数据同步和复制
    beego 0.6.0 版本发布,Go 应用框架
    Doxygen 1.8.4 发布,文档生成工具
    SunshineCRM 20130518发布,附带更新说明
    Semplice Linux 4 发布,轻量级发行版
  • 原文地址:https://www.cnblogs.com/wy697495/p/9615616.html
Copyright © 2011-2022 走看看