zoukankan      html  css  js  c++  java
  • RabbitMQ入门-Topic模式

    上篇《RabbitMQ入门-Routing直连模式》我们介绍了可以定向发送消息,并可以根据自定义规则派发消息。看起来,这个Routing模式已经算灵活的了,但是,这还不够,我们还有更加多样灵活的Topic模式。

    Topic模式

    image.png

    • 模型组成相较前几种没有什么变化,一个生产者P,一个交换机X,多个消息队列Q以及多个消费者C

    • 在Exchange派发消息到消息队列Queue所用的规则不同,我们看到了有符号"*"以及"#",可以认为是通配符

    • "*"用于匹配一个单词,比如"a","abc"等;"#"用于匹配0个或者多个单词,比如"", "abc", "abc.def"等

    发送端

    /**
     * Created by jackie on 17/8/7.
     */
    public class EmitLogDirect {
    
        private static final String EXCHANGE_NAME = "direct_logs";
    
        public static void main(String[] argv) throws Exception {
    
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.3.161");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    
            String severity = getSeverity(argv);
            String message = getMessage(argv);
    
            channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
    
            channel.close();
            connection.close();
        }
    
        private static String getSeverity(String[] strings){
            if (strings.length < 1)
                return "info";
            return strings[0];
        }
    
        private static String getMessage(String[] strings){
            if (strings.length < 2)
                return "Hello World!";
            return joinStrings(strings, " ", 1);
        }
    
        private static String joinStrings(String[] strings, String delimiter, int startIndex) {
            int length = strings.length;
            if (length == 0 ) return "";
            if (length < startIndex ) return "";
            StringBuilder words = new StringBuilder(strings[startIndex]);
            for (int i = startIndex + 1; i < length; i++) {
                words.append(delimiter).append(strings[i]);
            }
            return words.toString();
        }
    }
    
    • channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);这里指定的Exchagne模式为Topic模式

    • 通过String routingKey = getRouting(argv);实现在Program arguments中填写routing key参数

    • 通过String message = getMessage(argv);实现在Program arguments中填写发送的消息

    这时候我们给Program argument赋值如下,并启动发送端程序

    image.png

    程序运行完,可以在RabbitMQ管理应用中看到名为“topic_logs”的Exchange。

    接收端

    /**
     * Created by jackie on 17/8/7.
     */
    public class ReceiveLogsDirect {
    
        private static final String EXCHANGE_NAME = "direct_logs";
    
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.3.161");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            String queueName = channel.queueDeclare().getQueue();
    
            if (argv.length < 1){
                System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
                System.exit(1);
            }
    
            for(String severity : argv){
                channel.queueBind(queueName, EXCHANGE_NAME, severity);
            }
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
                }
            };
            channel.basicConsume(queueName, true, consumer);
        }
    }
    
    • 和Routing模式异曲同工,声明与发送端一样的Exchange名称

    • 通过Program arguments得到的routing key的输入参数,并将其与Exchange绑定,这时候就可以使用灵活的通配符了

    运行情况

    我们将启动两个消费者,并分别制定两套Routing key的规则。
    第一个消费者

    image.png

    第二个消费者

    image.png

    启动两个消费者后,使用发送端发送一条消息,我们可以发现两个消费者都通过Routing key规则派发到了消息
    31ff000395b3dedc07a2

    注意:实际上如果Routing key写成了“#”表示能够接受所有的消息,类似广播模式。
    这就是Topic模式,到此为止,几大主要RabbitMQ模式已经讲完了。你是否对于RabbitMQ有了一个基本的了解了?

    如果您觉得阅读本文对您有帮助,请点一下“推荐”按钮,您的“推荐”将是我最大的写作动力!如果您想持续关注我的文章,请扫描二维码,关注JackieZheng的微信公众号,我会将我的文章推送给您,并和您一起分享我日常阅读过的优质文章。

  • 相关阅读:
    《JAVA设计模式》之模板模式(Template)
    《JAVA设计模式》之策略模式(Strategy)
    《JAVA设计模式》之享元模式(Flyweight)
    《JAVA设计模式》之桥接模式(Bridge)
    《JAVA设计模式》之组合模式(Composite)
    《JAVA设计模式》之外观模式(Facade)
    《JAVA设计模式》之代理模式(Proxy)
    《JAVA设计模式》之装饰模式(Decorator)
    《JAVA设计模式》之适配器模式(Adapter)
    《JAVA设计模式》之原型模式(Prototype)
  • 原文地址:https://www.cnblogs.com/bigdataZJ/p/rabbitmq7.html
Copyright © 2011-2022 走看看