zoukankan      html  css  js  c++  java
  • rabbitmq-direct(直接交换模式)

    生产者和消费者,具有相同的交换机名称(Exchange)、交换机类型和相同的密匙(routingKey),那么消费者即可成功获取到消息。
    (PS:相对比只要交换机名称即可接收到消息的广播模式(fanout),direct模式在其基础上,多加了一层密码限制(routingKey)。)

    一、什么是direct(直接交换模式)

    RabbitMQ消息模型的核心思想(core idea): 生产者会把消息发送给RabbitMQ的交换中心(Exchange),Exchange的一侧是生产者,另一侧则是一个或多个队列,由Exchange决定一条消息的生命周期–发送给某些队列,或者直接丢弃掉。

    二、代码域

    1. 生产者【DirectBoss】

    package com.iyungu.phantaci.test.rabbitmq;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import org.apache.log4j.Logger;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    //消息生产者
    public class DirectBoss {
    
    private static final Logger logger = Logger.getLogger(DirectBoss.class);
    public static void main(String[] args) {
    
        ConnectionFactory factory = new ConnectionFactory();
        try {
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            //声明交换机(名称和类型)
            channel.exchangeDeclare("directLogs", BuiltinExchangeType.DIRECT);
            String message = "2018年8月8日14:03:48";
            //消息发布(其中"directLogs"为交换机名称,"jay"为routingKey)
            channel.basicPublish("directLogs","jay",null,message.getBytes());
            logger.info("********Message********:发送成功");
            channel.close();
            connection.close();
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }

    2. 消费者【DirectWorker

    package com.iyungu.phantaci.test.rabbitmq;
    
    import com.rabbitmq.client.*;
    import org.apache.log4j.Logger;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    //消息消费者
    public class DirectWorker {
    
        private static final Logger logger = Logger.getLogger(DirectWorker.class);
        public static void main(String[] args) {
    
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            try {
                Connection connection = factory.newConnection();
                Channel channel = connection.createChannel();
                //交换机声明(参数为:交换机名称;交换机类型)
                channel.exchangeDeclare("directLogs", BuiltinExchangeType.DIRECT);
                //获取一个临时队列
                String queueName = channel.queueDeclare().getQueue();
                //队列与交换机绑定(参数为:队列名称;交换机名称;密匙-routingKey)
                channel.queueBind(queueName,"directLogs","jay");
    
                logger.info("********Waiting for messages********");
    
                //这里重写了DefaultConsumer的handleDelivery方法,因为发送的时候对消息进行了getByte(),在这里要重新组装成String
                Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        super.handleDelivery(consumerTag, envelope, properties, body);
                        String message = new String(body,"UTF-8");
                        logger.info("received:" + message);
                    }
                };
    
                //声明队列中被消费掉的消息(参数为:队列名称;消息是否自动确认;consumer主体)
                channel.basicConsume(queueName,true,consumer);
                //这里不能关闭连接,调用了消费方法后,消费者会一直连接着rabbitMQ等待消费
            } catch (IOException |TimeoutException e) {
                e.printStackTrace();
            }
    
    
        }
    
    }

    三、direct模式效果


    1.

     1)先运行一个消费者,即【DirectWorker】


    2)把消费者【DirectWorker】里的routingKey进行修改
    把channel.queueBind(queueName,"directLogs","jay");修改为channel.queueBind(queueName,"directLogs","jjlin");。修改完毕后,再运行该消费者


    RabbitMQ网页控制台如下,可看到两个消费者队列


    2. 再运行一个生产者,即【DirectBoss】
    控制台效果图如下,一条消息发布后,和交换机routingKey一致的消费者收到了消息,不一致的无消息;
    消息是由生产者发送给交换机,所以要以生产者发布消息时的routingKey为准。此时,生产者里的routingKey为:”jay”


    消费者的routingKey为:”jay“,与生产者一致。消息接收成功


    消费者的routingKey为:”jjlin“,与生产者不一致。消息接收失败


    四、多路由(routingKey)接收


    通过给消费者绑定多个路由(routingKey),可以使该消费者同时接收多个路由获取的消息。

    如给消费者代码【DirectWorker】同时绑定两个routingKey,其余不变

    channel.queueBind(queueName, "directLogs", "jjlin"); //routingKey为jjlin
    channel.queueBind(queueName, "directLogs", "jay"); //routingKey为jay

    1.绑定两个routingKey后,运行一个消费者【DirectWorker】


    2.运行一个生产者【DirectBoss】,代码默认其routingKey为”jay”

    3.修改生产者routingKey为”jjlin”,然后再运行生产者【DirectBoss】
    把生产者代码里的routingKey进行修改,即channel.basicPublish("directLogs","jay",null,message.getBytes());改为channel.basicPublish("directLogs","jjlin",null,message.getBytes());,修改完毕后再运行生产者。

    4.局限性
    通过多路由绑定的例子,可以体会到direct模式相对比fanout模式,可以选择性的接收消息;但局限是面对更多、更复杂的路由匹配时,仍旧会力不从心,这时可以使用更全面的topic主题模式。

    原文链接:https://blog.csdn.net/fakerswe/article/details/81508963

  • 相关阅读:
    转:java.sql.SQLException: [Microsoft][ODBC 驱动程序管理器] 未发现数据源名称并且未指定默认驱动程序
    Grid组件 列头居中
    XAML文档基础
    WPF框架之MVVM系列(一)
    WPF 树型控件(TreeView)
    WPF自定义控件开发
    ASP.NET MVC系列一:Global.asax用法分析
    WPF基础系列之 控件与布局
    WPF 自定义控件基类
    DbTool验证码
  • 原文地址:https://www.cnblogs.com/guanbin-529/p/12984784.html
Copyright © 2011-2022 走看看