zoukankan      html  css  js  c++  java
  • Rabbit的直连交换机direct

    直连交换机类型为:direct。加入了路由键routingKey的概念。

    就是说 生产者投递消息给指定交换机的指定路由键。

    只有绑定了此交换机指定路由键的消息队列才可以收到消息。

    生产者:

    package com.kf.queueDemo.exchange.direct;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.kf.utils.RabbitConnectionUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    /**
     * 路由模式的生产者(带路由键)
     * @author kf
     *
     */
    public class DirectProducer {
        //交换机
        private static String DIRECTEXCHANGE = "DIRECTEXCHANGE";
        //路由键
        private static String ROUTINGKEY = "SMS";
        
        public static void main(String[] args) throws IOException, TimeoutException {
            
            Connection connection = RabbitConnectionUtils.getConnection();
            Channel channel = connection.createChannel();
            //声明交换机类型为路由模式
            channel.exchangeDeclare(DIRECTEXCHANGE, "direct");
            
            String msg = "direct_mes";
            
            //发送消息给 指定交换机EXCHANGENAME的指定路由键ROUTINGKEY上
            channel.basicPublish(DIRECTEXCHANGE, ROUTINGKEY, null, msg.getBytes());
            
            channel.close();
            connection.close();
            
            
            
            
        }
    
    }

    消费者:

    package com.kf.queueDemo.exchange.direct;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.kf.utils.RabbitConnectionUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    /**
     * 路由模式的邮件消费者
     * @author kf
     *
     */
    public class DirectEMAILConsumer {
        //队列名
        private static String EMAILQUEUENAME = "EMAILQUEUENAME";
        //路由键名
        private static String SMSROUTINGKEY = "SMS";
        private static String EMAILROUTINGKEY = "EMAIL";
        //交换机
        private static String DIRECTEXCHANGE = "DIRECTEXCHANGE";
        
        public static void main(String[] args) throws IOException, TimeoutException {
            System.out.println("邮件消费者启动=====");
            Connection connection = RabbitConnectionUtils.getConnection();
            Channel channel = connection.createChannel();
            //创建队列:第一个参数是队列名,后面的参数还没搞清楚干嘛的
            channel.queueDeclare(EMAILQUEUENAME, false, false, false, null);
            //绑定队列到交换机的指定路由键
            channel.queueBind(EMAILQUEUENAME, DIRECTEXCHANGE, EMAILROUTINGKEY);
            
            DefaultConsumer consumer = new DefaultConsumer(channel){
                public void handleDelivery(String consumerTag, com.rabbitmq.client.Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("进入邮件接收消息的监听");
                    String s = new String(body, "utf-8");
                    System.out.println("邮件消费者接收到消息:"+s);
                };
            };
            
            //参数分别是:队列名,是否自动应答,监听的回调类
            channel.basicConsume(EMAILQUEUENAME, true, consumer);
            
        }
    
    }
    package com.kf.queueDemo.exchange.direct;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.kf.utils.RabbitConnectionUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    
    /**
     * 路由模式的短信消费者
     * @author kf
     *
     */
    public class DirectSMSConsumer {
        //队列名
        private static String SMSQUEUENAME = "SMSQUEUENAME";
        //路由键名
        private static String SMSROUTINGKEY = "SMS";
        private static String EMAILROUTINGKEY = "EMAIL";
        //交换机
        private static String DIRECTEXCHANGE = "DIRECTEXCHANGE";
        
        public static void main(String[] args) throws IOException, TimeoutException {
            System.out.println("短信消费者启动=====");
            Connection connection = RabbitConnectionUtils.getConnection();
            Channel channel = connection.createChannel();
            //创建队列:第一个参数是队列名,后面的参数还没搞清楚干嘛的
            channel.queueDeclare(SMSQUEUENAME, false, false, false, null);
            //绑定队列到交换机的指定路由键
            channel.queueBind(SMSQUEUENAME, DIRECTEXCHANGE, SMSROUTINGKEY);
            //绑定多个交换机的路由键
            channel.queueBind(SMSQUEUENAME, DIRECTEXCHANGE, EMAILROUTINGKEY);
            
            DefaultConsumer consumer = new DefaultConsumer(channel){
                public void handleDelivery(String consumerTag, com.rabbitmq.client.Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("进入短信接收消息的监听");
                    String s = new String(body, "utf-8");
                    System.out.println("短信消费者接收到消息:"+s);
                };
            };
            
            //参数分别是:队列名,是否自动应答,监听的回调类
            channel.basicConsume(SMSQUEUENAME, true, consumer);
            
        }
    
    }
  • 相关阅读:
    HDU1163 Eddy's digital Roots【九剩余定理】
    【ThinkingInC++】8、说明,浅谈数据类型的大小
    教你如何使用U盘装系统
    图像不显示该问题的解决方案
    HTTP相关概念
    AndroidUI的组成部分GridView
    uploadify 3.2 后台动态传参数
    Oracle11g创建表空间语句
    Uncaught RangeError: Maximum call stack size exceeded解决思路
    panel,dialog,window组件越界问题汇总
  • 原文地址:https://www.cnblogs.com/fuguang/p/10660575.html
Copyright © 2011-2022 走看看