zoukankan      html  css  js  c++  java
  • Topics(主题模式)

    引言

    topic exchange和direct exchange类似,都是通过routing key和binding key进行匹配,不同的是topic exchange可以为routing key设置多重标准。

    direct路由器类似于sql语句中的精确查询;topic 路由器有点类似于sql语句中的模糊查询。

    topic 使用通配符“*”和“#”进行routingkey的模糊匹配:

    *:匹配恰好一个单词(单词之间以“.”分隔); #:匹配零个或多个单词

    1.模型

    2.创建生产者

    package com.dwz.rabbitmq.exchange.topic;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.dwz.rabbitmq.util.ConnectionUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    /**
     * Demo producer for the topic exchange example.
     *
     * <p>Publishes one message per routing key ({@code user.save}, {@code user.update},
     * {@code user.delete.abc}) to the {@code test_topic_exchange} exchange. Each message body
     * is the fixed greeting followed by the routing key it was published with.
     */
    public class Producer {
        /**
         * Opens a connection and a channel, publishes the three sample messages, and closes
         * both resources before returning.
         *
         * @param args unused
         * @throws IOException if declaring the exchange, publishing, or closing fails
         * @throws TimeoutException if closing the channel times out
         */
        public static void main(String[] args) throws IOException, TimeoutException {
            String exchangeName = "test_topic_exchange";
            String[] routingKeys = {"user.save", "user.update", "user.delete.abc"};
            String msg = "hello rabbitmq topic message successs!--";

            // try-with-resources closes the channel first, then the connection, even when a
            // publish throws; the original skipped both close() calls on failure.
            try (Connection connection = ConnectionUtils.getConnection();
                    Channel channel = connection.createChannel()) {
                // Declare the exchange with the same settings the consumers use (topic, durable,
                // not auto-delete) so that publishing does not hit a missing exchange when the
                // producer is started before any consumer. Re-declaring with identical settings
                // is a no-op on the broker.
                channel.exchangeDeclare(exchangeName, "topic", true, false, null);

                for (String routingKey : routingKeys) {
                    // Encode explicitly as UTF-8: the consumers decode the body with "utf-8",
                    // so the platform default charset must not be relied on here.
                    byte[] body = (msg + routingKey).getBytes("utf-8");
                    channel.basicPublish(exchangeName, routingKey, null, body);
                }
            }
        }
    }

    3.创建消费者1

    package com.dwz.rabbitmq.exchange.topic;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.dwz.rabbitmq.util.ConnectionUtils;
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    /**
     * Topic exchange demo consumer 1 (wildcard matching).
     *
     * <p>Binds the queue {@code test_topic_queue_1} to {@code test_topic_exchange} with the
     * binding key {@code user.#} and prints the body of every message delivered to it.
     *
     * @author dangwangzhen
     */
    public class Consumer {
        private static final String EXCHANGE = "test_topic_exchange";
        private static final String QUEUE = "test_topic_queue_1";
        private static final String BINDING_KEY = "user.#";

        /**
         * Declares the exchange and queue, binds them, and starts consuming.
         *
         * <p>The connection and channel are intentionally left open so deliveries keep arriving.
         *
         * @param args unused
         * @throws IOException if a broker operation fails
         * @throws TimeoutException declared for the connection helper; see {@code ConnectionUtils}
         */
        public static void main(String[] args) throws IOException, TimeoutException {
            final Channel ch = ConnectionUtils.getConnection().createChannel();

            // Exchange: type "topic", durable, not auto-delete, no extra arguments.
            ch.exchangeDeclare(EXCHANGE, "topic", true, false, null);
            // Queue: non-durable, non-exclusive, not auto-delete, no extra arguments.
            ch.queueDeclare(QUEUE, false, false, false, null);
            ch.queueBind(QUEUE, EXCHANGE, BINDING_KEY);

            // Second argument true = automatic acknowledgement of deliveries.
            ch.basicConsume(QUEUE, true, new DefaultConsumer(ch) {
                @Override
                public void handleDelivery(String tag, Envelope env, BasicProperties props, byte[] payload)
                        throws IOException {
                    System.out.println("rec topic 1--message:" + new String(payload, "utf-8"));
                }
            });
        }
    }

    4.创建消费者2

    package com.dwz.rabbitmq.exchange.topic;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.dwz.rabbitmq.util.ConnectionUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import com.rabbitmq.client.AMQP.BasicProperties;
    /**
     * Topic exchange demo consumer 2 (wildcard matching).
     *
     * <p>Binds the queue {@code test_topic_queue_2} to {@code test_topic_exchange} with the
     * binding key {@code user.*.*} and prints the body of every message delivered to it.
     *
     * @author dangwangzhen
     */
    public class Consumer2 {
        /**
         * Declares the exchange and queue, binds them, and starts consuming.
         *
         * <p>The connection and channel are intentionally left open so deliveries keep arriving.
         *
         * @param args unused
         * @throws IOException if a broker operation fails
         * @throws TimeoutException declared for the connection helper; see {@code ConnectionUtils}
         */
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection conn = ConnectionUtils.getConnection();
            Channel ch = conn.createChannel();

            final String exchange = "test_topic_exchange";
            final String queue = "test_topic_queue_2";

            // Exchange: type "topic", durable, not auto-delete, no extra arguments.
            ch.exchangeDeclare(exchange, "topic", true, false, null);
            // Queue: non-durable, non-exclusive, not auto-delete, no extra arguments.
            ch.queueDeclare(queue, false, false, false, null);
            ch.queueBind(queue, exchange, "user.*.*");

            DefaultConsumer printer = printingConsumer(ch);
            // Second argument true = automatic acknowledgement of deliveries.
            ch.basicConsume(queue, true, printer);
        }

        /** Builds a consumer that decodes each delivery as UTF-8 text and prints it to stdout. */
        private static DefaultConsumer printingConsumer(Channel ch) {
            return new DefaultConsumer(ch) {
                @Override
                public void handleDelivery(String tag, Envelope env, BasicProperties props, byte[] payload)
                        throws IOException {
                    String text = new String(payload, "utf-8");
                    System.out.println("rec topic2--message:" + text);
                }
            };
        }
    }

    5.运行代码

    success!

  • 相关阅读:
    获取一组radio按钮选中的值Value
    三相异步电动机过载保护及报警PLC控制
    2014年天津市第一批科技计划项目
    USB HID报告及报告描述符简介
    Log Explorer使用说明
    SQL日志文件的作用
    STM32 USB数据接收与数据发送程序流程分析
    多少人没熬过那三厘米!
    构建区域综合交通枢纽 京津冀将形成“一张图”
    Altium Designer下Gerber转PCB的方法(转)
  • 原文地址:https://www.cnblogs.com/zheaven/p/11801811.html
Copyright © 2011-2022 走看看