zoukankan      html  css  js  c++  java
  • Rabbit主题交换机

    主题交换机类型为:topic。

    是直连交换机的一种。只是比直连交换机更灵活,在路由键上引入了通配符的概念

    topic交换机支持通配符的路由键. *表示匹配一个词。 #匹配所有

    生产者 :

    package com.kf.queueDemo.exchange.topic;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.kf.utils.RabbitConnectionUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    /**
     * 路由模式的生产者(带路由键)
     * @author kf
     *
     */
    public class TopicProducer {
        //交换机
        private static String TOPICEXCHANGE = "TOPICEXCHANGE";
        //路由键
        private static String ROUTINGKEY = "log.sms";
        
        public static void main(String[] args) throws IOException, TimeoutException {
            
            Connection connection = RabbitConnectionUtils.getConnection();
            Channel channel = connection.createChannel();
            //声明交换机类型为主题交换机模式
            channel.exchangeDeclare(TOPICEXCHANGE, "topic");
            
            String msg = "topic_mes";
            
            //发送消息给 指定交换机EXCHANGENAME的指定路由键ROUTINGKEY上
            channel.basicPublish(TOPICEXCHANGE, ROUTINGKEY, null, msg.getBytes());
            
            channel.close();
            connection.close();
            
            
            
            
        }
    
    }

    消费者:

    package com.kf.queueDemo.exchange.topic;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.kf.utils.RabbitConnectionUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    /**
     * 路由模式的邮件消费者
     * @author kf
     *
     */
    public class TopicEMAILConsumer {
        //队列名
        private static String EMAILQUEUENAME = "EMAILQUEUENAME";
        //路由键名  topic交换机支持通配符的路由键. *表示匹配一个词。 #匹配所有
        private static String SMSROUTINGKEY = "sms";
        private static String EMAILROUTINGKEY = "log.*";
        //交换机
        private static String TOPICEXCHANGE = "TOPICEXCHANGE";
        
        public static void main(String[] args) throws IOException, TimeoutException {
            System.out.println("邮件消费者启动=====");
            Connection connection = RabbitConnectionUtils.getConnection();
            Channel channel = connection.createChannel();
            //创建队列:第一个参数是队列名,后面的参数还没搞清楚干嘛的
            channel.queueDeclare(EMAILQUEUENAME, false, false, false, null);
            //绑定队列到交换机的指定路由键
            channel.queueBind(EMAILQUEUENAME, TOPICEXCHANGE, EMAILROUTINGKEY);
            
            DefaultConsumer consumer = new DefaultConsumer(channel){
                public void handleDelivery(String consumerTag, com.rabbitmq.client.Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("进入邮件接收消息的监听");
                    String s = new String(body, "utf-8");
                    System.out.println("邮件消费者接收到消息:"+s);
                };
            };
            
            //参数分别是:队列名,是否自动应答,监听的回调类
            channel.basicConsume(EMAILQUEUENAME, true, consumer);
            
        }
    
    }
    package com.kf.queueDemo.exchange.topic;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.kf.utils.RabbitConnectionUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    
    /**
     * 路由模式的短信消费者
     * @author kf
     *
     */
    public class TopicSMSConsumer {
        //队列名
        private static String SMSQUEUENAME = "SMSQUEUENAME";
        //路由键名  topic交换机支持通配符的路由键. *表示匹配一个词。 #匹配所有
        private static String SMSROUTINGKEY = "log.#";
        private static String EMAILROUTINGKEY = "EMAIL";
        //交换机
        private static String TOPICEXCHANGE = "TOPICEXCHANGE";
        
        public static void main(String[] args) throws IOException, TimeoutException {
            System.out.println("短信消费者启动=====");
            Connection connection = RabbitConnectionUtils.getConnection();
            Channel channel = connection.createChannel();
            //创建队列:第一个参数是队列名,后面的参数还没搞清楚干嘛的
            channel.queueDeclare(SMSQUEUENAME, false, false, false, null);
            //绑定队列到交换机的指定路由键
            channel.queueBind(SMSQUEUENAME, TOPICEXCHANGE, SMSROUTINGKEY);
            //绑定多个交换机的路由键
            channel.queueBind(SMSQUEUENAME, TOPICEXCHANGE, EMAILROUTINGKEY);
            
            DefaultConsumer consumer = new DefaultConsumer(channel){
                public void handleDelivery(String consumerTag, com.rabbitmq.client.Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("进入短信接收消息的监听");
                    String s = new String(body, "utf-8");
                    System.out.println("短信消费者接收到消息:"+s);
                };
            };
            
            //参数分别是:队列名,是否自动应答,监听的回调类
            channel.basicConsume(SMSQUEUENAME, true, consumer);
            
        }
    
    }
  • 相关阅读:
    模板引擎使用详解:包含公共模板
    ThinkPHP3.2 常量参考
    ThinkPHP的全部配置选项
    报错compile_str() flow.php on line 375的解决方法
    mysql如何更新一个表中的某个字段值等于另一个表的某个字段值
    Mac下新安装的MySQL无法登陆root用户解决方法
    IOS-第三方开源库
    IOS-每个程序员的编程之路上都应该看这11本书
    IOS-程序员和设计师必备的20个CSS工具
    IOS-2016年最好的15个Web设计和开发工具
  • 原文地址:https://www.cnblogs.com/fuguang/p/10660579.html
Copyright © 2011-2022 走看看