zoukankan      html  css  js  c++  java
  • RabbitMQ 交换器、持久化

     一、 交换器

      RabbitMQ交换器(Exchange)分为四种

    1.   direct       
    2.   fanout
    3.   topic
    4.   headers
    •  direct

       默认的交换器类型,消息的RoutingKey与队列的bindingKey匹配,消息就投递到相应的队列

    •  fanout

      一种发布/订阅模式的交换器,发布一条消息时,fanout把消息广播附加到fanout交换器的队列上  

      接收类(订阅): 

    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    public class ReceiveLogs {
      private static final String EXCHANGE_NAME = "logs";
    
      public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
    
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");//一旦创建exchange,RabbitMQ不允许对其改变,否则报错
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "");//绑定是交换器与队列之间的关系,可以理解为,队列对此交换器的消息感兴趣
    
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    
        Consumer consumer = new DefaultConsumer(channel) {
          @Override
          public void handleDelivery(String consumerTag, Envelope envelope,
                                     AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
          }
        };
        channel.basicConsume(queueName, true, consumer);
      }
    }

      发布类: 

    import java.io.IOException;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.Channel;
    
    public class ReceiveLog {
    
        private static final String EXCHANGE_NAME = "log";
    
        public static void main(String[] argv)
                      throws java.io.IOException {
    
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
    channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String message = "hi"; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } }
    • topic

      topic类似于fanout交换器,但更加具体化,用routingKey进行规则匹配,更灵活的匹配出用户想要接收的消息

      routingKey形如:com.company.module.demo,具体匹配规则:

        "*"与"#"可以匹配任意字符,区别是"*"只能匹配由"."分割的一段字符,而"#"可以匹配所有字符   

       发布一条"com.abc.test.push"的消息,能匹配的routingKey:

    com.abc.test.*
    #.test.push
    #

      不能匹配的:

    com.abc.*
    *.test.push
    *

     发布类:

      声明队列时,需要注意队列的属性,虽然队列的声明由消费者或生产者完成都可以,但如果由消费者声明,由于生产者生产消息时,可能队列还没有声明,会造成消息丢失,所以推荐由生产者声明队列

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
     
    import com.rabbitmq.client.ConnectionFactory;
     
    import java.io.IOException;
     
    public class RabbitMqSendTest {
        private static String queue = "test_queue";
        private static String exchange = "TestExchange";
        private static String routingKey = "abc.test";
        public static void main(String[] args) {
            ConnectionFactory factory = new com.rabbitmq.client.ConnectionFactory();
            factory.setHost("172.16.67.60");
            factory.setPort(5672);
            factory.setUsername("admin");
            factory.setPassword("admin");
            Connection mqConnection = null;
            try {
                mqConnection = factory.newConnection();
                Channel mqChannel = mqConnection.createChannel();
                if (null != mqChannel && mqChannel.isOpen()) {
                    mqChannel.exchangeDeclare(exchange, "topic");
    //                String queueName = mqChannel.queueDeclare().getQueue();
    //                mqChannel.queueBind(queueName, exchange, routingKey);
    //声明队列名称与属性
    //durable持久队列,mq重启队列可恢复 exclusive独占队列,仅限于声明它的连接使用操作
    //autoDelete 自动删除 arguments 其他属性
                    mqChannel.queueDeclare(queue, false, false, false, null);
                    mqChannel.queueBind(queue, exchange, routingKey);
     
                    //*******************************************
                        mqChannel.basicPublish(exchange, routingKey, null,
                                ("hello").getBytes());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }finally {
                   try {
                       mqConnection.close();
                   } catch (IOException e) {
                       e.printStackTrace();
                   }
            }
     
        }
    }

    接收类

    import com.rabbitmq.client.*;
    import java.io.IOException;
    
    public class ReceiveTopic {
        private static String queue = "consume_queue";
        private static String exchange = "TestExchange";
        private static String routingKey = "*.test";
    
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("172.16.67.60");
            factory.setPort(5672);
            factory.setUsername("admin");
            factory.setPassword("admin");
    
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
    //        channel声明Exchange,名称与类型
            channel.exchangeDeclare(exchange, "topic");
    //        String queuename = channel.queueDeclare().getQueue();
    
            channel.queueDeclare(queue, false, false, false, null);
            channel.queueBind(queue, exchange, "*.test");      //消费者指定消息队列,并选择特定的RoutingKey
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    
            Consumer client = new DefaultConsumer(channel) {
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body)throws IOException {
                    String msgString = new String(body, "UTF-8");
                    System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + msgString + "'");
                }
            };
            channel.basicConsume(queue, true,client);
            System.out.println();
        }
    }

    二、持久化

      RabbitMQ默认情况下重启消息服务器时,会丢失消息,为了尽量保证消息在服务器宕机时不丢失,就需要把消息持久化,但是也只是尽量不丢失,由于涉及磁盘写入,当消息量巨大时,mq性能也会被严重拉低。

  • 相关阅读:
    句子
    Https
    SSH
    uCMDB
    snapshot与release
    Ansible
    .NET core webApi 使用JWT验证签名(转)
    .NET core 使用Swagger(转)
    微服务的4个设计原则和19个解决方案(转)
    .NET Core 使用RabbitMQ(转)
  • 原文地址:https://www.cnblogs.com/castielangel/p/9952069.html
Copyright © 2011-2022 走看看