zoukankan      html  css  js  c++  java
  • Rabbit主题交换机

    主题交换机类型为:topic。

    是直连交换机的一种。只是比直连交换机更灵活,在路由键上引入了通配符的概念

    topic交换机支持通配符的路由键. *表示匹配一个词。 #匹配所有

    生产者 :

    package com.kf.queueDemo.exchange.topic;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.kf.utils.RabbitConnectionUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    /**
     * 路由模式的生产者(带路由键)
     * @author kf
     *
     */
    public class TopicProducer {
        //交换机
        private static String TOPICEXCHANGE = "TOPICEXCHANGE";
        //路由键
        private static String ROUTINGKEY = "log.sms";
        
        public static void main(String[] args) throws IOException, TimeoutException {
            
            Connection connection = RabbitConnectionUtils.getConnection();
            Channel channel = connection.createChannel();
            //声明交换机类型为主题交换机模式
            channel.exchangeDeclare(TOPICEXCHANGE, "topic");
            
            String msg = "topic_mes";
            
            //发送消息给 指定交换机EXCHANGENAME的指定路由键ROUTINGKEY上
            channel.basicPublish(TOPICEXCHANGE, ROUTINGKEY, null, msg.getBytes());
            
            channel.close();
            connection.close();
            
            
            
            
        }
    
    }

    消费者:

    package com.kf.queueDemo.exchange.topic;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.kf.utils.RabbitConnectionUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    /**
     * 路由模式的邮件消费者
     * @author kf
     *
     */
    public class TopicEMAILConsumer {
        //队列名
        private static String EMAILQUEUENAME = "EMAILQUEUENAME";
        //路由键名  topic交换机支持通配符的路由键. *表示匹配一个词。 #匹配所有
        private static String SMSROUTINGKEY = "sms";
        private static String EMAILROUTINGKEY = "log.*";
        //交换机
        private static String TOPICEXCHANGE = "TOPICEXCHANGE";
        
        public static void main(String[] args) throws IOException, TimeoutException {
            System.out.println("邮件消费者启动=====");
            Connection connection = RabbitConnectionUtils.getConnection();
            Channel channel = connection.createChannel();
            //创建队列:第一个参数是队列名,后面的参数还没搞清楚干嘛的
            channel.queueDeclare(EMAILQUEUENAME, false, false, false, null);
            //绑定队列到交换机的指定路由键
            channel.queueBind(EMAILQUEUENAME, TOPICEXCHANGE, EMAILROUTINGKEY);
            
            DefaultConsumer consumer = new DefaultConsumer(channel){
                public void handleDelivery(String consumerTag, com.rabbitmq.client.Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("进入邮件接收消息的监听");
                    String s = new String(body, "utf-8");
                    System.out.println("邮件消费者接收到消息:"+s);
                };
            };
            
            //参数分别是:队列名,是否自动应答,监听的回调类
            channel.basicConsume(EMAILQUEUENAME, true, consumer);
            
        }
    
    }
    package com.kf.queueDemo.exchange.topic;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.kf.utils.RabbitConnectionUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    
    /**
     * 路由模式的短信消费者
     * @author kf
     *
     */
    public class TopicSMSConsumer {
        //队列名
        private static String SMSQUEUENAME = "SMSQUEUENAME";
        //路由键名  topic交换机支持通配符的路由键. *表示匹配一个词。 #匹配所有
        private static String SMSROUTINGKEY = "log.#";
        private static String EMAILROUTINGKEY = "EMAIL";
        //交换机
        private static String TOPICEXCHANGE = "TOPICEXCHANGE";
        
        public static void main(String[] args) throws IOException, TimeoutException {
            System.out.println("短信消费者启动=====");
            Connection connection = RabbitConnectionUtils.getConnection();
            Channel channel = connection.createChannel();
            //创建队列:第一个参数是队列名,后面的参数还没搞清楚干嘛的
            channel.queueDeclare(SMSQUEUENAME, false, false, false, null);
            //绑定队列到交换机的指定路由键
            channel.queueBind(SMSQUEUENAME, TOPICEXCHANGE, SMSROUTINGKEY);
            //绑定多个交换机的路由键
            channel.queueBind(SMSQUEUENAME, TOPICEXCHANGE, EMAILROUTINGKEY);
            
            DefaultConsumer consumer = new DefaultConsumer(channel){
                public void handleDelivery(String consumerTag, com.rabbitmq.client.Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("进入短信接收消息的监听");
                    String s = new String(body, "utf-8");
                    System.out.println("短信消费者接收到消息:"+s);
                };
            };
            
            //参数分别是:队列名,是否自动应答,监听的回调类
            channel.basicConsume(SMSQUEUENAME, true, consumer);
            
        }
    
    }
  • 相关阅读:
    MYSQL: 什么是MYSQLD Service
    100 logging模块
    099 hashlib和hmac模块
    098 json和pickle模块
    097 sys模块
    096 os模块
    095 random模块
    094 datetime模块
    093 time模块
    092 模块基础实战之ATM和购物车系统分文件处理
  • 原文地址:https://www.cnblogs.com/fuguang/p/10660579.html
Copyright © 2011-2022 走看看