zoukankan      html  css  js  c++  java
  • RabbitMQ入门-Topic模式

    发送到topic的消息不能有任意的绑定键,绑定键的规则:必须由(.)分割的单词列表。比如apple.banana.orange

    绑定键也有两个特殊字符:

     * 可以代替一个单词。
    # 可以替代零个或多个单词。

    比如:apple.#  *.banana.*

    生产者:

    package com.example.demo;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * 有选择的接受消息
     */
    public class TopicSend {
    
        private static final String EXCHANGE_NAME = "topic_logs";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();    // 连接工厂
            factory.setHost("localhost");
            Connection connection = factory.newConnection();        // 获取连接
            Channel channel = connection.createChannel();
    
            // topic类型
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    
            String[] msg = {"666的兔子","懒惰的大象"};
            // 第二个参数为绑定键
            channel.basicPublish(EXCHANGE_NAME, "quick.orange.rabbit", null, msg[0].getBytes());
            channel.basicPublish(EXCHANGE_NAME, "lazy.write.elephant", null, msg[1].getBytes());
            System.out.println("PS-Send:" + msg.toString());
    
            channel.close();
            connection.close();
    
        }
    }

    消费者:

    package com.example.demo;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class TopicReceive {
    
        private static final String EXCHANGE_NAME = "topic_logs";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();    // 连接工厂
            factory.setHost("localhost");
            Connection connection = factory.newConnection();        // 获取连接
            Channel channel = connection.createChannel();
    
            // 声明一个topic交换类型
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    
            // 当声明队列,不加任何参数,产生的将是一个临时队列,getQueue返回的是队列名称
            String queueA = channel.queueDeclare().getQueue();
            String queueB = channel.queueDeclare().getQueue();
            System.out.println("临时队列:" + queueA);
            System.out.println("临时队列:" + queueB);
    
            // 第三个参数为“绑定建”
            // * 可以代替一个单词。
            // # 可以替代零个或多个单词。
            channel.queueBind(queueA, EXCHANGE_NAME, "*.orange.*");
            channel.queueBind(queueB, EXCHANGE_NAME, "*.*.rabbit");
            channel.queueBind(queueB, EXCHANGE_NAME, "lazy.#");
    
            Consumer consumerA = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String recv = new String(body, "UTF-8");
                    System.out.println("Direct-Receive-A:" + recv);
                }
            };
            Consumer consumerB = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String recv = new String(body, "UTF-8");
                    System.out.println("Direct-Receive-B:" + recv);
                }
            };
            channel.basicConsume(queueA, true, consumerA);
            channel.basicConsume(queueB, true, consumerB);
        }
    }

    先启动消费者:再启动生产者:控制台

    ..

     第一条消息,匹配A和B

    第二条消息,只匹配B

  • 相关阅读:
    《Java并发编程的艺术》 第9章 Java中的线程池
    《Java并发编程的艺术》第6/7/8章 Java并发容器与框架/13个原子操作/并发工具类
    java锁总结
    《Java并发编程的艺术》第5章 Java中的锁 ——学习笔记
    《Java并发编程的艺术》第4章 Java并发编程基础 ——学习笔记
    Java并发编程的艺术(一、二章) ——学习笔记
    redis缓存使用SpringDataRedis
    商城06——solr索引库搭建&solr搜索功能实现&图片显示问题解决
    商城05——首页轮播图显示实现&Redis环境搭建&Redis实现缓存
    商城04——门户网站介绍&商城首页搭建&内容系统创建&CMS实现
  • 原文地址:https://www.cnblogs.com/LUA123/p/8477387.html
Copyright © 2011-2022 走看看