zoukankan      html  css  js  c++  java
  • 初识RabbitMq(三) 交换机

    1fanout(广播)

    这种模式只需要将队列绑定到交换机上即可,不需要设置路由键。如图:生产者先将消息发给交换机,然后交换机将消息转发给所有绑定的队列,每个消费者接收到的消息一模一样(不会像之前一样将消息平均分配给消费者)

    生产者代码

    package item.com.fanout;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    /**
     * Producer for the fanout example: publishes ten messages ("0" through "9")
     * to the fanout exchange {@code logs} and then exits.
     *
     * <p>The routing key is left empty because this example binds queues to the
     * exchange without one.
     */
    public class EmitLog {
        // Name of the exchange the messages are published to.
        private static final String EXCHANGE_NAME = "logs";

        /**
         * Connects to the broker on 127.0.0.1 as admin/admin, declares the exchange
         * and publishes the messages.
         *
         * @param args unused
         * @throws Exception if connecting, declaring or publishing fails
         */
        public static void main(String[] args) throws Exception {
            // Configure how to reach the broker.
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPassword("admin");
            connectionFactory.setUsername("admin");

            // try-with-resources closes the channel first and then the connection,
            // even when declaring the exchange or publishing throws.
            try (Connection connection = connectionFactory.newConnection();
                 Channel channel = connection.createChannel()) {
                // exchangeDeclare(exchange name, exchange type)
                channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

                for (int i = 0; i < 10; i++) {
                    String message = String.valueOf(i);
                    // basicPublish(exchange name, routing key, properties, body)
                    channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
                }
            }
        }
    }

    消费者代码(可写多个)

    package item.com.fanout;
    import com.rabbitmq.client.*;
    import java.io.IOException;
    
    /**
     * Consumer for the fanout example: binds a fresh server-named queue to the
     * {@code logs} exchange and prints every message it receives.
     *
     * <p>Several copies of this class can run at once; each one gets its own queue.
     */
    public class ReceiveLogs1 {
        // Name of the exchange the queue is bound to.
        private static final String EXCHANGE_NAME = "logs";

        /**
         * Connects to the broker on 127.0.0.1 as admin/admin and starts consuming.
         * The connection is deliberately left open so the consumer keeps receiving.
         *
         * @param args unused
         * @throws Exception if connecting, declaring, binding or subscribing fails
         */
        public static void main(String[] args) throws  Exception{
            // Configure how to reach the broker.
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setUsername("admin");
            connectionFactory.setPassword("admin");
            connectionFactory.setHost("127.0.0.1");
            // Open a connection and a channel on it.
            Connection connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel();

            // Declare the exchange here as well: consumers are started before the
            // producer, and binding to an exchange that does not exist yet fails.
            // The type matches the one the producer declares.
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

            // Ask the broker for a non-durable queue with a generated name.
            String queueName = channel.queueDeclare().getQueue();
            // queueBind(queue name, exchange name, routing key); the key is empty
            // because this example binds without one.
            channel.queueBind(queueName,EXCHANGE_NAME,"");

            Consumer consumer =  new DefaultConsumer(channel) {
                // Called once for every message delivered to the queue.
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("接收消息:"+message);
                }
            };

            // basicConsume(queue name, autoAck, consumer); autoAck=true means
            // deliveries are acknowledged automatically.
            channel.basicConsume(queueName, true, consumer);
        }
    }

    注意,先运行接收者,然后再运行消息发送者,可以看到两个消费者接收到的消息一样

     2direct (不同消费者接收到不同消息)

    根据RoutingKey匹配消息路由到指定的队列

    首先是生产者代码

    package item.com.direct;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    /**
     * Producer for the direct example: publishes the same text once for each of
     * the routing keys {@code info}, {@code warning} and {@code error} to the
     * direct exchange {@code direct_logs}.
     */
    public class EmitLogDirect {
        // Name of the exchange the messages are published to.
        private static final String EXCHANGE_NAME = "direct_logs";

        /**
         * Connects to the broker on 127.0.0.1 as admin/admin, declares the exchange
         * and publishes one message per severity.
         *
         * @param args unused
         * @throws Exception if connecting, declaring or publishing fails
         */
        public static void main(String[] args) throws  Exception{
            // Configure how to reach the broker.
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPassword("admin");
            connectionFactory.setUsername("admin");

            // try-with-resources closes the channel first and then the connection,
            // even when declaring the exchange or publishing throws.
            try (Connection connection = connectionFactory.newConnection();
                 Channel channel = connection.createChannel()) {
                // exchangeDeclare(exchange name, exchange type)
                channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

                // The body is identical for every severity, so encode it once.
                byte[] body = "Hello World!".getBytes("UTF-8");
                // Each severity is used as the routing key of one message.
                String[] severities = {"info", "warning", "error"};
                for (String severity : severities) {
                    channel.basicPublish(EXCHANGE_NAME, severity, null, body);
                }
            }
        }
    }

    消费者代码(此处代码绑定了三个routingKey)

    package item.com.direct;
    
    import com.rabbitmq.client.*;
    import java.io.IOException;
    
    /**
     * Consumer for the direct example: binds one server-named queue to the
     * {@code direct_logs} exchange under three routing keys and prints every
     * delivery together with the routing key it arrived with.
     */
    public class ReceiveLogsDirect1 {
        // Name of the exchange the queue is bound to.
        private static final String EXCHANGE_NAME = "direct_logs";

        /**
         * Connects to the broker on 127.0.0.1 as admin/admin and starts consuming.
         * The connection is left open so deliveries keep arriving.
         *
         * @param args unused
         * @throws Exception if connecting, declaring, binding or subscribing fails
         */
        public static void main(String[] args) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setUsername("admin");
            factory.setPassword("admin");
            factory.setHost("127.0.0.1");

            // One channel on one connection is all this consumer needs.
            Channel channel = factory.newConnection().createChannel();

            // exchangeDeclare(exchange name, exchange type)
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

            // Non-durable queue with a broker-generated name.
            String queueName = channel.queueDeclare().getQueue();

            // The same queue is bound once per routing key: one queue, three bindings.
            String[] routingKeys = {"info", "warning", "error"};
            for (String routingKey : routingKeys) {
                channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
            }

            Consumer printer = new DefaultConsumer(channel) {
                // Called once for every message delivered to the queue.
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String text = new String(body, "UTF-8");
                    String routingKey = envelope.getRoutingKey();
                    System.out.println("  Received '" + routingKey + "':'" + text + "'");
                }
            };

            // basicConsume(queue name, autoAck, consumer)
            channel.basicConsume(queueName, true, printer);
        }
    }

     3topic (direct升级)

    和direct相比,多了对routingKey的模糊匹配(通配符)进行筛选,功能比direct更加强大

    *可以代替一个单词
    #可以替代零个或多个单词

     生产者代码

    package item.com.topic;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    /**
     * Producer for the topic example: publishes the same text under nine
     * different routing keys to the topic exchange {@code topic_logs}.
     */
    public class EmitLogTopic {
        // Name of the exchange the messages are published to.
        private static final String EXCHANGE_NAME="topic_logs";

        /**
         * Connects to the broker on 127.0.0.1 as admin/admin, declares the exchange
         * and publishes one message per routing key.
         *
         * @param args unused
         * @throws Exception if connecting, declaring or publishing fails
         */
        public static void main(String[] args) throws  Exception {
            // Configure how to reach the broker.
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setUsername("admin");
            connectionFactory.setPassword("admin");

            // Routing keys to publish under, in publishing order.
            String[] routingKeys = {
                "quick.orange.rabbit",
                "lazy.orange.elephant",
                "quick.orange.fox",
                "lazy.brown.fox",
                "lazy.pink.rabbit",
                "quick.brown.fox",
                "orange",
                "quick.orange.male.rabbit",
                "lazy.orange.male.rabbit",
            };

            // try-with-resources closes the channel first and then the connection,
            // even when declaring the exchange or publishing throws.
            try (Connection connection = connectionFactory.newConnection();
                 Channel channel = connection.createChannel()) {
                // exchangeDeclare(exchange name, exchange type)
                channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

                // The body is identical for every key, so encode it once.
                byte[] body = "Animal World".getBytes("UTF-8");
                for (String routingKey : routingKeys) {
                    channel.basicPublish(EXCHANGE_NAME, routingKey, null, body);
                }
            }
        }

    }

    消费者代码1

    package item.com.topic;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    /**
     * First consumer for the topic example: binds a server-named queue to
     * {@code topic_logs} with the binding keys {@code *.*.rabbit} and
     * {@code lazy.#}, then prints each delivery with its routing key.
     */
    public class ReceiveLogsTopic1 {
        private static final String EXCHANGE_NAME = "topic_logs";

        /**
         * Connects to the broker on 127.0.0.1 as admin/admin and starts consuming.
         * The connection is left open so deliveries keep arriving.
         *
         * @param args unused
         * @throws Exception if connecting, declaring, binding or subscribing fails
         */
        public static void main(String[] args) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("127.0.0.1");
            factory.setUsername("admin");
            factory.setPassword("admin");

            Connection conn = factory.newConnection();
            Channel ch = conn.createChannel();

            ch.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
            String queue = ch.queueDeclare().getQueue();

            // Bind the queue once per pattern, in this order.
            String[] bindingKeys = {"*.*.rabbit", "lazy.#"};
            for (String bindingKey : bindingKeys) {
                ch.queueBind(queue, EXCHANGE_NAME, bindingKey);
            }

            // basicConsume(queue name, autoAck, consumer)
            ch.basicConsume(queue, true, printingConsumer(ch));
        }

        /** Builds a consumer on {@code channel} that prints the routing key and UTF-8 text of each delivery. */
        private static Consumer printingConsumer(Channel channel) {
            return new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String text = new String(body, "UTF-8");
                    String line = " [x] Received '" + envelope.getRoutingKey() + "':'" + text + "'";
                    System.out.println(line);
                }
            };
        }
    }

    消费者2代码

    package item.com.topic;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    /**
     * Second consumer for the topic example: binds a server-named queue to
     * {@code topic_logs} with the binding key {@code *.orange.*} and prints each
     * delivery with its routing key.
     */
    public class ReceiveLogsTopic2 {
        private static final String EXCHANGE_NAME = "topic_logs";

        /**
         * Connects to the broker on 127.0.0.1 as admin/admin and starts consuming.
         * The connection is left open so deliveries keep arriving.
         *
         * @param args unused
         * @throws Exception if connecting, declaring, binding or subscribing fails
         */
        public static void main(String[] args) throws Exception {
            ConnectionFactory brokerSettings = new ConnectionFactory();
            brokerSettings.setPassword("admin");
            brokerSettings.setUsername("admin");
            brokerSettings.setHost("127.0.0.1");

            Channel channel = brokerSettings.newConnection().createChannel();
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

            // Non-durable, broker-named queue bound under a single pattern.
            String queueName = channel.queueDeclare().getQueue();
            channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");

            // basicConsume(queue name, autoAck, consumer); the consumer is defined inline.
            channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String payload = new String(body, "UTF-8");
                    String routingKey = envelope.getRoutingKey();
                    System.out.println(" [x] Received '" + routingKey + "':'" + payload + "'");
                }
            });
        }
    }

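    输出结果【根据routingKey 进行相应的模糊匹配】:消费者1(*.*.rabbit、lazy.#)收到 quick.orange.rabbit、lazy.orange.elephant、lazy.brown.fox、lazy.pink.rabbit、lazy.orange.male.rabbit;消费者2(*.orange.*)收到 quick.orange.rabbit、lazy.orange.elephant、quick.orange.fox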

  • 相关阅读:
    [转]java 常用弹出框
    [转]ImageIcon icon 相对路径设置
    [转]『基本ASCII表和c语言运算表查询』
    [转]sqlmap技术手册
    [转]linux下怎么查看ssh的用户登录日志
    [转]Kali-linux安装之后的简单设置
    查看任意程序所连接的ip地址
    JS 闭包
    JS 中的 继承
    JS 原型的 理解
  • 原文地址:https://www.cnblogs.com/1439107348s/p/14455845.html
Copyright © 2011-2022 走看看