zoukankan      html  css  js  c++  java
  • RabbitMQ入门-Topic模式

    发送到topic的消息不能有任意的绑定键,绑定键的规则:必须由(.)分割的单词列表。比如apple.banana.orange

    绑定键也有两个特殊字符:

     * 可以代替一个单词。
    # 可以替代零个或多个单词。

    比如:apple.#  *.banana.*

    生产者:

    package com.example.demo;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * 有选择的接受消息
     */
    public class TopicSend {
    
        private static final String EXCHANGE_NAME = "topic_logs";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();    // 连接工厂
            factory.setHost("localhost");
            Connection connection = factory.newConnection();        // 获取连接
            Channel channel = connection.createChannel();
    
            // topic类型
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    
            String[] msg = {"666的兔子","懒惰的大象"};
            // 第二个参数为绑定键
            channel.basicPublish(EXCHANGE_NAME, "quick.orange.rabbit", null, msg[0].getBytes());
            channel.basicPublish(EXCHANGE_NAME, "lazy.write.elephant", null, msg[1].getBytes());
            System.out.println("PS-Send:" + msg.toString());
    
            channel.close();
            connection.close();
    
        }
    }

    消费者:

    package com.example.demo;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class TopicReceive {
    
        private static final String EXCHANGE_NAME = "topic_logs";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();    // 连接工厂
            factory.setHost("localhost");
            Connection connection = factory.newConnection();        // 获取连接
            Channel channel = connection.createChannel();
    
            // 声明一个topic交换类型
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    
            // 当声明队列,不加任何参数,产生的将是一个临时队列,getQueue返回的是队列名称
            String queueA = channel.queueDeclare().getQueue();
            String queueB = channel.queueDeclare().getQueue();
            System.out.println("临时队列:" + queueA);
            System.out.println("临时队列:" + queueB);
    
            // 第三个参数为“绑定建”
            // * 可以代替一个单词。
            // # 可以替代零个或多个单词。
            channel.queueBind(queueA, EXCHANGE_NAME, "*.orange.*");
            channel.queueBind(queueB, EXCHANGE_NAME, "*.*.rabbit");
            channel.queueBind(queueB, EXCHANGE_NAME, "lazy.#");
    
            Consumer consumerA = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String recv = new String(body, "UTF-8");
                    System.out.println("Direct-Receive-A:" + recv);
                }
            };
            Consumer consumerB = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String recv = new String(body, "UTF-8");
                    System.out.println("Direct-Receive-B:" + recv);
                }
            };
            channel.basicConsume(queueA, true, consumerA);
            channel.basicConsume(queueB, true, consumerB);
        }
    }

    先启动消费者:再启动生产者:控制台

    ..

     第一条消息,匹配A和B

    第二条消息,只匹配B

  • 相关阅读:
    (转)【web前端培训之前后端的配合(中)】继续昨日的故事
    ural(Timus) 1136. Parliament
    scau Josephus Problem
    ACMICPC Live Archive 6204 Poker End Games
    uva 10391 Compound Words
    ACMICPC Live Archive 3222 Joke with Turtles
    uva 10132 File Fragmentation
    uva 270 Lining Up
    【转】各种字符串哈希函数比较
    uva 10905 Children's Game
  • 原文地址:https://www.cnblogs.com/LUA123/p/8477387.html
Copyright © 2011-2022 走看看