zoukankan      html  css  js  c++  java
  • Topics(主题模式)

    引言

    topic exchange和direct exchange类似,都是通过routing key和binding key进行匹配,不同的是topic exchange的binding key可以包含通配符,因此一个binding可以同时匹配多个以“.”分隔的routing key。

    direct路由器类似于sql语句中的精确查询;topic 路由器有点类似于sql语句中的模糊查询。

    topic 使用通配符“*”和“#”进行routingkey的模糊匹配:

    *:匹配恰好一个单词; #:匹配零个或多个单词(单词之间以“.”分隔)

    1.模型

    2.创建生产者

    package com.dwz.rabbitmq.exchange.topic;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.dwz.rabbitmq.util.ConnectionUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    /**
     * Demo producer for the topic exchange example.
     *
     * <p>Publishes one message for each of the routing keys {@code user.save},
     * {@code user.update} and {@code user.delete.abc} to the exchange
     * {@code test_topic_exchange}, then closes the channel and the connection.
     *
     * <p>NOTE(review): this class does not declare the exchange itself; presumably a
     * consumer that declares it has to be started first -- confirm.
     */
    public class Producer {
        /**
         * Entry point: opens a connection, publishes the three demo messages and releases
         * the channel and connection even when publishing fails.
         *
         * @param args unused
         * @throws IOException      if opening the channel, publishing or closing fails
         * @throws TimeoutException if the connection utility or a close operation times out
         */
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtils.getConnection();
            try {
                Channel channel = connection.createChannel();
                try {
                    String exchangeName = "test_topic_exchange";
                    String[] routingKeys = { "user.save", "user.update", "user.delete.abc" };
                    String msg = "hello rabbitmq topic message successs!--";

                    for (String routingKey : routingKeys) {
                        // Encode explicitly as UTF-8: the consumers decode the body with "utf-8",
                        // so the platform default charset must not be relied on here.
                        byte[] body = (msg + routingKey).getBytes("utf-8");
                        channel.basicPublish(exchangeName, routingKey, null, body);
                    }
                } finally {
                    // Close the channel on both the success and the failure path.
                    channel.close();
                }
            } finally {
                // Always release the connection so a failed publish does not leak it.
                connection.close();
            }
        }
    }

    3.创建消费者1

    package com.dwz.rabbitmq.exchange.topic;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.dwz.rabbitmq.util.ConnectionUtils;
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    /**
     * Topic exchange demo, consumer 1 (wildcard matching).
     *
     * <p>Declares the exchange {@code test_topic_exchange} of type {@code topic}, declares
     * the queue {@code test_topic_queue_1}, binds it with the pattern {@code user.#} and
     * prints every delivered message body to standard output.
     *
     * @author dangwangzhen
     */
    public class Consumer {
        /**
         * Entry point: sets up exchange, queue and binding, then registers the consumer.
         * The channel and connection are intentionally left open so deliveries keep arriving.
         *
         * @param args unused
         * @throws IOException      if a broker operation fails
         * @throws TimeoutException if obtaining the connection times out
         */
        public static void main(String[] args) throws IOException, TimeoutException {
            final String topicExchange = "test_topic_exchange";
            final String boundQueue = "test_topic_queue_1";

            final Channel ch = ConnectionUtils.getConnection().createChannel();

            // Topology: topic exchange -> queue, bound with the "user.#" pattern.
            ch.exchangeDeclare(topicExchange, "topic", true, false, null);
            ch.queueDeclare(boundQueue, false, false, false, null);
            ch.queueBind(boundQueue, topicExchange, "user.#");

            // Second argument "true" requests automatic acknowledgement of deliveries.
            ch.basicConsume(boundQueue, true, new DefaultConsumer(ch) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                        throws IOException {
                    // Bodies are decoded as UTF-8 text and echoed with a consumer-specific prefix.
                    System.out.println("rec topic 1--message:" + new String(body, "utf-8"));
                }
            });
        }
    }

    4.创建消费者2

    package com.dwz.rabbitmq.exchange.topic;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.dwz.rabbitmq.util.ConnectionUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import com.rabbitmq.client.AMQP.BasicProperties;
    /**
     * Topic exchange demo, consumer 2 (wildcard matching).
     *
     * <p>Declares the exchange {@code test_topic_exchange} of type {@code topic}, declares
     * the queue {@code test_topic_queue_2}, binds it with the pattern {@code user.*.*} and
     * prints every delivered message body to standard output.
     *
     * @author dangwangzhen
     */
    public class Consumer2 {
        /**
         * Entry point: sets up exchange, queue and binding, then registers the consumer.
         * The channel and connection are intentionally left open so deliveries keep arriving.
         *
         * @param args unused
         * @throws IOException      if a broker operation fails
         * @throws TimeoutException if obtaining the connection times out
         */
        public static void main(String[] args) throws IOException, TimeoutException {
            final String topicExchange = "test_topic_exchange";
            final String boundQueue = "test_topic_queue_2";

            final Channel ch = ConnectionUtils.getConnection().createChannel();

            // Topology: topic exchange -> queue, bound with the "user.*.*" pattern.
            ch.exchangeDeclare(topicExchange, "topic", true, false, null);
            ch.queueDeclare(boundQueue, false, false, false, null);
            ch.queueBind(boundQueue, topicExchange, "user.*.*");

            // Second argument "true" requests automatic acknowledgement of deliveries.
            ch.basicConsume(boundQueue, true, new DefaultConsumer(ch) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                        throws IOException {
                    // Bodies are decoded as UTF-8 text and echoed with a consumer-specific prefix.
                    System.out.println("rec topic2--message:" + new String(body, "utf-8"));
                }
            });
        }
    }

    5.运行代码

    success!

  • 相关阅读:
    数据仓库 VS 数据库
    准确率,精确率,召回率,F-measure 之间的关系
    OpenCV——查找、绘制轮廓
    OpenCV——仿射变换
    OpenCV函数 重映射
    Hough变换原理
    霍夫变换(直线检测、圆检测)
    边缘检测算子+滤波总结
    图像滤波—opencv函数
    图像滤波
  • 原文地址:https://www.cnblogs.com/zheaven/p/11801811.html
Copyright © 2011-2022 走看看