zoukankan      html  css  js  c++  java
  • RabbitMQ(三): exchange 的使用

    1. Exchange(交换机)

    生产者只能发送信息到交换机,交换机接收到生产者的信息,然后按照规则把它推送到队列中。

    一方面是接收生产者的消息,另一方面是向队列推送消息。

    匿名转发 ""

    • fanout(不处理路由键)
    // Declare the exchange with type "fanout" (broadcast to every bound queue).
    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    
    // Publish the message. The payload is encoded explicitly as UTF-8 so that it
    // matches consumers that decode the body with new String(body, "utf-8"),
    // instead of depending on the platform default charset.
    String msg = "hello ps.";
    channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes("utf-8"));
    

    MQ_Fanout_exchange.png

    • direct(处理路由键)

    MQ_Direct_exchange.png

    • topic exchange

    将路由键和某模式匹配

    # 匹配零个或者多个单词

    * 匹配恰好一个单词

    MQ_Topic_exchange.png

    2. 发布/订阅模式(publish/subscribe)

    模型

    MQ订阅模式.png

    解读:

    • 一个生产者,多个消费者

    • 每一个消费者都有自己的队列

    • 生产者没有直接把消息发送到队列,而是发到了交换机

    • 每个队列都要绑定到交换机上

    • 生产者发送的消息经过交换机到达队列实现一个消息被多个消费者消费

    生产者

    /**
     * Producer for the publish/subscribe example: declares the fanout exchange
     * {@code test_exchange_fanout} and publishes one message to it with an empty routing key.
     */
    public class Send {
        private static final String EXCHANGE_NAME = "test_exchange_fanout";

        /**
         * Publishes the text {@code "hello ps."} to the fanout exchange and prints it.
         *
         * @param args unused
         * @throws IOException propagated from the connection/channel calls
         * @throws TimeoutException propagated from the connection/channel calls
         */
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtils.getConnection();
            try {
                Channel channel = connection.createChannel();
                // Declare the exchange with type "fanout".
                channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
                // Encode explicitly as UTF-8: the consumers decode the body as "utf-8",
                // so the platform default charset must not leak into the payload.
                String msg = "hello ps.";
                channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes("utf-8"));
                System.out.println("Send: " + msg);

                channel.close();
            } finally {
                // Always release the connection, even when declaring or publishing failed.
                connection.close();
            }
        }
    }
    

    MQ_exchange.png

    消息哪去了??丢失了!因为交换机没有存储的能力,在 rabbitmq 里面只有队列有存储能力。

    因为这时候还没有队列绑定到这个交换机,所以数据丢失了。

    消费者

    • 消费者1
    /**
     * Fanout consumer 1: consumes from queue {@code test_queue_fanout_email}, which it binds to
     * exchange {@code test_exchange_fanout}, and acknowledges each message manually.
     */
    public class Recv1 {
        public static final String QUEUE_NAME = "test_queue_fanout_email";
        private static final String EXCHANGE_NAME = "test_exchange_fanout";

        /**
         * Declares the queue, binds it to the exchange and starts consuming with manual acks.
         *
         * @param args unused
         * @throws IOException propagated from the connection/channel calls
         * @throws TimeoutException propagated from the connection/channel calls
         */
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtils.getConnection();
            final Channel channel = connection.createChannel();
            // Declare the queue.
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // Bind the queue to the exchange with an empty binding key.
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
            // Receive only one message at a time.
            channel.basicQos(1);
            Consumer consumer = new DefaultConsumer(channel) {
                // Invoked when a message arrives.
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String msg = new String(body, "utf-8");
                    System.out.println("[1] Recv msg: " + msg);
                    boolean interrupted = false;
                    try {
                        // Simulated processing time.
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        interrupted = true;
                    } finally {
                        try {
                            System.out.println("[1] done.");
                            // Manual acknowledgement of this single delivery.
                            channel.basicAck(envelope.getDeliveryTag(), false);
                        } finally {
                            // Restore the interrupt status instead of swallowing it; done after the
                            // ack so the acknowledgement itself is not affected by the pending flag.
                            if (interrupted) {
                                Thread.currentThread().interrupt();
                            }
                        }
                    }
                }
            };
            boolean autoAck = false; // automatic acknowledgement is off; handleDelivery acks explicitly
            channel.basicConsume(QUEUE_NAME, autoAck, consumer);
        }
    }
    
    • 消费者2(与消费者1类似)
    /**
     * Fanout consumer 2: consumes from queue {@code test_queue_fanout_sms}, which it binds to
     * exchange {@code test_exchange_fanout}, and acknowledges each message manually.
     */
    public class Recv2 {
        public static final String QUEUE_NAME = "test_queue_fanout_sms";
        private static final String EXCHANGE_NAME = "test_exchange_fanout";

        /**
         * Declares the queue, binds it to the exchange and starts consuming with manual acks.
         *
         * @param args unused
         * @throws IOException propagated from the connection/channel calls
         * @throws TimeoutException propagated from the connection/channel calls
         */
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtils.getConnection();
            final Channel channel = connection.createChannel();
            // Declare the queue.
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // Bind the queue to the exchange with an empty binding key.
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
            // Receive only one message at a time.
            channel.basicQos(1);
            Consumer consumer = new DefaultConsumer(channel) {
                // Invoked when a message arrives.
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String msg = new String(body, "utf-8");
                    System.out.println("[2] Recv msg: " + msg);
                    boolean interrupted = false;
                    try {
                        // Simulated processing time.
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        interrupted = true;
                    } finally {
                        try {
                            System.out.println("[2] done.");
                            // Manual acknowledgement of this single delivery.
                            channel.basicAck(envelope.getDeliveryTag(), false);
                        } finally {
                            // Restore the interrupt status instead of swallowing it; done after the
                            // ack so the acknowledgement itself is not affected by the pending flag.
                            if (interrupted) {
                                Thread.currentThread().interrupt();
                            }
                        }
                    }
                }
            };
            boolean autoAck = false; // automatic acknowledgement is off; handleDelivery acks explicitly
            channel.basicConsume(QUEUE_NAME, autoAck, consumer);
        }
    }
    

    MQ_exchange_bindqueue.png

    3. 路由模式(Routing)

    模型

    MQ路由模式.png

    生产者

    /**
     * Producer for the routing example: declares the direct exchange
     * {@code test_exchange_direct} and publishes one message with routing key {@code "warning"}.
     */
    public class Send {
        private static final String EXCHANGE_NAME = "test_exchange_direct";

        /**
         * Publishes the text {@code "hello direct."} to the direct exchange and prints it.
         *
         * @param args unused
         * @throws IOException propagated from the connection/channel calls
         * @throws TimeoutException propagated from the connection/channel calls
         */
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtils.getConnection();
            try {
                Channel channel = connection.createChannel();

                // Declare the exchange with type "direct".
                channel.exchangeDeclare(EXCHANGE_NAME, "direct");

                String msg = "hello direct.";

                String routingKey = "warning";
                // Encode explicitly as UTF-8: the consumers decode the body as "utf-8",
                // so the platform default charset must not leak into the payload.
                channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes("utf-8"));
                System.out.println("send: " + msg);

                channel.close();
            } finally {
                // Always release the connection, even when declaring or publishing failed.
                connection.close();
            }
        }
    }
    

    消费者

    • 消费者1
    /**
     * Direct-exchange consumer 1: consumes from queue {@code test_queue_direct_1}, bound to
     * exchange {@code test_exchange_direct} with the single binding key {@code "error"}.
     */
    public class Recv1 {
        private static final String EXCHANGE_NAME = "test_exchange_direct";
        private static final String QUEUE_NAME = "test_queue_direct_1";

        /**
         * Declares the queue, binds it with key {@code "error"} and starts consuming with manual acks.
         *
         * @param args unused
         * @throws IOException propagated from the connection/channel calls
         * @throws TimeoutException propagated from the connection/channel calls
         */
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtils.getConnection();
            final Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String routingKey = "error";
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKey);

            // Receive only one message at a time.
            channel.basicQos(1);
            Consumer consumer = new DefaultConsumer(channel) {
                // Invoked when a message arrives.
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String msg = new String(body, "utf-8");
                    System.out.println("[1] Recv msg: " + msg);
                    boolean interrupted = false;
                    try {
                        // Simulated processing time.
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        interrupted = true;
                    } finally {
                        try {
                            System.out.println("[1] done.");
                            // Manual acknowledgement of this single delivery.
                            channel.basicAck(envelope.getDeliveryTag(), false);
                        } finally {
                            // Restore the interrupt status instead of swallowing it; done after the
                            // ack so the acknowledgement itself is not affected by the pending flag.
                            if (interrupted) {
                                Thread.currentThread().interrupt();
                            }
                        }
                    }
                }
            };
            boolean autoAck = false; // automatic acknowledgement is off; handleDelivery acks explicitly
            channel.basicConsume(QUEUE_NAME, autoAck, consumer);
        }
    }
    
    • 消费者2
    /**
     * Direct-exchange consumer 2: consumes from queue {@code test_queue_direct_2}, bound to
     * exchange {@code test_exchange_direct} with the binding keys {@code "error"},
     * {@code "info"} and {@code "warning"}.
     */
    public class Recv2 {
        private static final String EXCHANGE_NAME = "test_exchange_direct";
        private static final String QUEUE_NAME = "test_queue_direct_2";

        /**
         * Declares the queue, adds the three bindings and starts consuming with manual acks.
         *
         * @param args unused
         * @throws IOException propagated from the connection/channel calls
         * @throws TimeoutException propagated from the connection/channel calls
         */
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtils.getConnection();
            final Channel channel = connection.createChannel();

            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            // One binding per routing key this queue should receive.
            String routingKey = "error";
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKey);
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");

            // Receive only one message at a time.
            channel.basicQos(1);
            Consumer consumer = new DefaultConsumer(channel) {
                // Invoked when a message arrives.
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String msg = new String(body, "utf-8");
                    System.out.println("[2] Recv msg: " + msg);
                    boolean interrupted = false;
                    try {
                        // Simulated processing time.
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        interrupted = true;
                    } finally {
                        try {
                            System.out.println("[2] done.");
                            // Manual acknowledgement of this single delivery.
                            channel.basicAck(envelope.getDeliveryTag(), false);
                        } finally {
                            // Restore the interrupt status instead of swallowing it; done after the
                            // ack so the acknowledgement itself is not affected by the pending flag.
                            if (interrupted) {
                                Thread.currentThread().interrupt();
                            }
                        }
                    }
                }
            };
            boolean autoAck = false; // automatic acknowledgement is off; handleDelivery acks explicitly
            channel.basicConsume(QUEUE_NAME, autoAck, consumer);
        }
    }
    

    4. 主题模式(Topics)

    模型

    MQ_Topic模式.png

    商品:发布、删除、修改、查询...

    生产者

    /**
     * Producer for the topic example: declares the topic exchange
     * {@code test_exchange_topic} and publishes one message with routing key {@code "goods.update"}.
     */
    public class Send {
        private static final String EXCHANGE_NAME = "test_exchange_topic";

        /**
         * Publishes the text {@code "商品..."} to the topic exchange and prints it.
         *
         * @param args unused
         * @throws IOException propagated from the connection/channel calls
         * @throws TimeoutException propagated from the connection/channel calls
         */
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtils.getConnection();
            try {
                Channel channel = connection.createChannel();
                // Declare the exchange with type "topic".
                channel.exchangeDeclare(EXCHANGE_NAME, "topic");

                String msg = "商品...";

                // The payload is non-ASCII and the consumers decode it as "utf-8", so it must be
                // encoded as UTF-8 here; the platform default charset could garble it.
                channel.basicPublish(EXCHANGE_NAME, "goods.update", null, msg.getBytes("utf-8"));
                System.out.println("send: " + msg);

                channel.close();
            } finally {
                // Always release the connection, even when declaring or publishing failed.
                connection.close();
            }
        }
    }
    

    消费者

    • 消费者1
    /**
     * Topic consumer 1: consumes from queue {@code test_queue_topic_1}, bound to exchange
     * {@code test_exchange_topic} with the binding keys {@code "goods.add"} and
     * {@code "goods.update"}.
     */
    public class Recv1 {
        private static final String EXCHANGE_NAME = "test_exchange_topic";
        private static final String QUEUE_NAME = "test_queue_topic_1";

        /**
         * Declares the queue, adds the two bindings and starts consuming with manual acks.
         *
         * @param args unused
         * @throws IOException propagated from the connection/channel calls
         * @throws TimeoutException propagated from the connection/channel calls
         */
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtils.getConnection();
            final Channel channel = connection.createChannel();

            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.add");
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.update");

            // Receive only one message at a time.
            channel.basicQos(1);

            Consumer consumer = new DefaultConsumer(channel) {
                // Invoked when a message arrives.
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String msg = new String(body, "utf-8");
                    System.out.println("[1] Recv msg: " + msg);
                    boolean interrupted = false;
                    try {
                        // Simulated processing time.
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        interrupted = true;
                    } finally {
                        try {
                            System.out.println("[1] done.");
                            // Manual acknowledgement of this single delivery.
                            channel.basicAck(envelope.getDeliveryTag(), false);
                        } finally {
                            // Restore the interrupt status instead of swallowing it; done after the
                            // ack so the acknowledgement itself is not affected by the pending flag.
                            if (interrupted) {
                                Thread.currentThread().interrupt();
                            }
                        }
                    }
                }
            };
            boolean autoAck = false; // automatic acknowledgement is off; handleDelivery acks explicitly
            channel.basicConsume(QUEUE_NAME, autoAck, consumer);
        }
    }
    
    • 消费者2
    /**
     * Topic consumer 2: consumes from queue {@code test_queue_topic_2}, bound to exchange
     * {@code test_exchange_topic} with the wildcard binding key {@code "goods.#"}.
     */
    public class Recv2 {
        private static final String EXCHANGE_NAME = "test_exchange_topic";
        private static final String QUEUE_NAME = "test_queue_topic_2";

        /**
         * Declares the queue, binds it with the wildcard key and starts consuming with manual acks.
         *
         * @param args unused
         * @throws IOException propagated from the connection/channel calls
         * @throws TimeoutException propagated from the connection/channel calls
         */
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtils.getConnection();
            final Channel channel = connection.createChannel();

            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            // "#" is the topic wildcard for zero or more words after "goods.".
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.#");

            // Receive only one message at a time.
            channel.basicQos(1);

            Consumer consumer = new DefaultConsumer(channel) {
                // Invoked when a message arrives.
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String msg = new String(body, "utf-8");
                    System.out.println("[2] Recv msg: " + msg);
                    boolean interrupted = false;
                    try {
                        // Simulated processing time.
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        interrupted = true;
                    } finally {
                        try {
                            System.out.println("[2] done.");
                            // Manual acknowledgement of this single delivery.
                            channel.basicAck(envelope.getDeliveryTag(), false);
                        } finally {
                            // Restore the interrupt status instead of swallowing it; done after the
                            // ack so the acknowledgement itself is not affected by the pending flag.
                            if (interrupted) {
                                Thread.currentThread().interrupt();
                            }
                        }
                    }
                }
            };
            boolean autoAck = false; // automatic acknowledgement is off; handleDelivery acks explicitly
            channel.basicConsume(QUEUE_NAME, autoAck, consumer);
        }
    }
    
  • 相关阅读:
    【BZOJ4621】Tc605 DP
    【BZOJ4624】农场种植 FFT
    【BZOJ4627】[BeiJing2016]回转寿司 SBT
    【BZOJ4631】踩气球 链表+线段树+堆
    Excel error 64-bit version of SSIS
    (转) bicabo Visual Studio 2012自动添加注释(如版权信息等)
    Integration Services 变量
    (转)SSIS_数据流转换(Union All&合并联接&合并)
    (转)SSIS处理导入数据时, 存在的更新, 不存在的插入
    (转)WPF学习资源整理
  • 原文地址:https://www.cnblogs.com/dear_diary/p/10569475.html
Copyright © 2011-2022 走看看