zoukankan      html  css  js  c++  java
  • RabbitMQ 入门

    官网:https://www.rabbitmq.com/

    参考:https://blog.csdn.net/hellozpc/article/details/81436980#52_204

    一.消息中间件的作用

    异步处理

    应用解耦

    流量削峰

    日志处理

    二.rabbitmq 安装与配置

    下载:https://www.rabbitmq.com/download.html

    在cmd 窗口输入:

    rabbitmq-service start
    rabbitmq-plugins enable rabbitmq_management

    浏览器输入:localhost:15672   guest/guest

    三. Java 操作 rabbitmq

      (1) simple 简单队列

    添加依赖

         <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>3.4.1</version>
            </dependency>

     定义连接工具类:

    public class ConnectionUtil {
    
        public static Connection getConnection() throws Exception {
            //定义连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //设置服务地址
            factory.setHost("localhost");
            //端口
            factory.setPort(5672);
            //设置账号信息,用户名、密码、vhost
            factory.setVirtualHost("testhost");
            factory.setUsername("admin");
            factory.setPassword("admin");
            // 通过工程获取连接
            Connection connection = factory.newConnection();
            return connection;
        }
    }
     
    View Code

    定义生产者:

    public class Send {
    
        private final static String QUEUE_NAME = "q_test_01";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            // 从连接中创建通道
            Channel channel = connection.createChannel();
    
            // 声明(创建)队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            // 消息内容
            String message = "Hello World!";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
            //关闭通道和连接
            channel.close();
            connection.close();
        }
    }
    View Code

    定义消费者:

    public class Recv {
    
        private final static String QUEUE_NAME = "q_test_01";
    
        public static void main(String[] argv) throws Exception {
    
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            // 从连接中创建通道
            Channel channel = connection.createChannel();
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            // 定义队列的消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
    
            // 监听队列
            channel.basicConsume(QUEUE_NAME, true, consumer);
    
            // 获取消息
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(" [x] Received '" + message + "'");
            }
        }
    }
    View Code

    (2)work queue  工作队列

     

    一个生产者,多个消费者

    消费者01:

    public class Consumer01 {
    
        private static final  String queue = "my_queue";
    
         public static void main(String []args) throws Exception{
    
            Connection connection = MqConnection.getConnect();
            Channel channel = connection.createChannel();
    
            channel.queueDeclare(queue,false,false,false,null);
    
           // channel.basicQos(1);
    
    
            QueueingConsumer consumer = new QueueingConsumer(channel);
    
           // channel.basicConsume(queue,false,consumer);
             channel.basicConsume(queue,true,consumer);
    
            while(true){
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String msg = new String(delivery.getBody());
    
                System.out.println("Consum-01-[Receive]:"+msg);
    
                Thread.sleep(10);
    
                //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    
            }
        }
    }
    View Code

    消费者02:

    public class Consumer02 {
    
        private static final  String queue = "my_queue";
    
         public static void main(String []args) throws Exception{
    
            Connection connection = MqConnection.getConnect();
            Channel channel = connection.createChannel();
    
            channel.queueDeclare(queue,false,false,false,null);
    
            //channel.basicQos(1);
    
            QueueingConsumer consumer = new QueueingConsumer(channel);
    
            //channel.basicConsume(queue,false,consumer);
             channel.basicConsume(queue,true,consumer);
    
            while(true){
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String msg = new String(delivery.getBody());
    
                System.out.println("Consum-02-[Receive]:"+msg);
    
                Thread.sleep(500);
    
                //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }
    View Code

    生产者:

    public class Producer {
        private static final  String queue = "my_queue";
    
        public static void main(String []args) throws  Exception{
    
            Connection connection = MqConnection.getConnect();
    
            Channel channel = connection.createChannel();
    
            channel.queueDeclare(queue,false,false,false,null);
    
    
            for(int i=0;i<50;i++){
                String msg = "第"+i+"条消息。。。。。";
                channel.basicPublish("",queue,null,msg.getBytes());
                System.out.println("[发送"+i+"消息]:"+msg);
                Thread.sleep(10);
            }
    
            channel.close();
            connection.close();
    
        }
    }
    View Code

    结果:

    1.消费者01 与消费者02 获取的内容不同,同一个消息只能被一个消费者获取

    2.消费者01 与消费者02 获取的消息的相等的

    不合理: 消费者01 处理的时间更短,可以获取更多的消息

    机质:

    轮询分发(round-robin):使用任务队列可以并行的工作。默认的情况下,rabbitmq 将诸葛发送到序列中的消费者,不考虑每个任务的时间,且是提前一次性分配。每个

    消费者获取相等数量的消息,这种方式分发消息机制成为 Round-Robin 轮询

    虽然上面的分配方法可行,但如果某个任务时间长,别的消费者比较闲

    解决方法: 

    使用basicQos(prefetchCount = 1) ,限制 RabbitMQ 只发送不超过 1条的消息给同一个消费者。当消息处理完毕后才发送第二条。

    能者多劳:

    // 同一时刻服务器只会发一条消息给消费者
    channel.basicQos(1);
    //开启这行 表示使用手动确认模式
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    // 监听队列,false表示手动返回完成状态,true表示自动
    channel.basicConsume(QUEUE_NAME, false, consumer);

    (3)publish/subscribe  发布/订阅

    1.一个生产者,多个消费者

    2.每个消费者都有自己的一个队列

    3.每个队列都要绑定到交换机

    4.生产者将消息发送到交换机

    5.当消息发送到没有队列绑定的交换机时,消息丢失。因为交换机没有存储消息的能力,下拍戏只能存在队列种。

    生产者:向交换机发送消息

    public class Product {
    
        private static final String EXCHANGE_Name = "text_exchange_fanout";
    
        public static void main(String []args) throws Exception{
    
            Connection connection = MqConnection.getConnect();
            Channel channel = connection.createChannel();
    
            channel.exchangeDeclare(EXCHANGE_Name,"fanout");
    
            String msg = "hello,world";
    
            channel.basicPublish(EXCHANGE_Name,"",null,msg.getBytes());
    
            System.out.println("[Send]:"+msg);
    
            channel.close();
    
            connection.close();
    
        }
    
    }
    View Code

    消费者01:

    public class Consumer01 {
        
        private static final String EXCHANGE_Name = "text_exchange_fanout";
        private static final String QUEUE_Name = "my_queue01";
    
        public static void main(String []args) throws Exception{
    
            Connection connection = MqConnection.getConnect();
            Channel channel = connection.createChannel();
    
            channel.queueDeclare(QUEUE_Name,false,false,false,null);
    
            channel.queueBind(QUEUE_Name,EXCHANGE_Name,"");
    
            QueueingConsumer consumer = new QueueingConsumer(channel);
    
            channel.basicConsume(QUEUE_Name,true,consumer);
    
            while(true){
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String msg = new String(delivery.getBody());
                System.out.println("[Recive01]:"+msg);
                Thread.sleep(10);
    
            }
        }
    }

    消费者02:

    public class Consumer02 {
        private static final String EXCHANGE_Name = "text_exchange_fanout";
        private static final String QUEUE_Name = "my_queue02";
    
        public static void main(String []args) throws Exception{
    
            Connection connection = MqConnection.getConnect();
            Channel channel = connection.createChannel();
    
            channel.queueDeclare(QUEUE_Name,false,false,false,null);
    
            channel.queueBind(QUEUE_Name,EXCHANGE_Name,"");
    
    
    
            QueueingConsumer consumer = new QueueingConsumer(channel);
    
            channel.basicConsume(QUEUE_Name,true,consumer);
    
            while(true){
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String msg = new String(delivery.getBody());
                System.out.println("[Recive02]:"+msg);
                Thread.sleep(100);
    
            }
        }
    }

    结果:当生产者发送一条消息时,多个消费者可以获取到消息。一个消费者队列可以有多个消费者实例,只要其中一个消费者实例会消费到消息。  

    (4)routing 路由选择 

     

    生产者:

    public class Producer {
    
        private static final String EXCHANGE_NAME = "test_exchange_direct";
    
         public static void main(String []args) throws Exception{
    
            Connection connection = MqConnection.getConnect();
    
            Channel channel = connection.createChannel();
    
            channel.exchangeDeclare(EXCHANGE_NAME,"direct");
    
            String msg = "hello,你哈";
    
            channel.basicPublish(EXCHANGE_NAME,"update",null,msg.getBytes());
            System.out.println("[send]:"+msg);
    
            channel.close();
            connection.close();
        }
    }
    View Code

    消费者01:

     1 public class Consumer01 {
     2     private static final String EXCHANGE_NAME = "test_exchange_direct";
     3     private static final String Queue_name = "queue_01";
     4 
     5      public static void main(String []args) throws Exception{
     6         Connection connection = MqConnection.getConnect();
     7         Channel channel = connection.createChannel();
     8 
     9         channel.queueDeclare(Queue_name,false,false,false,null);
    10 
    11         channel.queueBind(Queue_name,EXCHANGE_NAME,"select");
    12         channel.queueBind(Queue_name,EXCHANGE_NAME,"delete");
    13 
    14         channel.basicQos(1);
    15 
    16         QueueingConsumer consumer = new QueueingConsumer(channel);
    17 
    18          channel.basicConsume(Queue_name,true,consumer);
    19 
    20          while(true){
    21              QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    22              String msg = new String(delivery.getBody());
    23              System.out.println("[queue_01]:"+msg);
    24              Thread.sleep(10);
    25 
    26          }
    27     }
    28 }
    View Code

    消费者02:

     1 public class Consumer02 {
     2     private static final String EXCHANGE_NAME = "test_exchange_direct";
     3     private static final String Queue_name = "queue_02";
     4 
     5      public static void main(String []args) throws Exception{
     6         Connection connection = MqConnection.getConnect();
     7         Channel channel = connection.createChannel();
     8 
     9         channel.queueDeclare(Queue_name,false,false,false,null);
    10 
    11         channel.queueBind(Queue_name,EXCHANGE_NAME,"select");
    12         channel.queueBind(Queue_name,EXCHANGE_NAME,"update");
    13 
    14         channel.basicQos(1);
    15 
    16         QueueingConsumer consumer = new QueueingConsumer(channel);
    17 
    18          channel.basicConsume(Queue_name,true,consumer);
    19 
    20          while(true){
    21              QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    22              String msg = new String(delivery.getBody());
    23              System.out.println("[queue_02]:"+msg);
    24              Thread.sleep(10);
    25 
    26          }
    27     }
    28 }
    View Code

    结果:生产者产生的消息会附带key, 消费者也会附带key,当生产者的key == 消费者的key 才能让消费者获得消息。

     

    (5)Topics  主题

     

    同一个消息被多个消费者获取。

    消息生产者:

     1 public class Send {
     2 
     3     private final  static String EXCHANGE_NAME = "test_exchange_topic";
     4 
     5     public static void main(String []args) throws Exception{
     6 
     7         Connection connection = MqConnection.getConnect();
     8         Channel channel = connection.createChannel();
     9 
    10         channel.exchangeDeclare(EXCHANGE_NAME,"topic");
    11 
    12         String msg = "Hello,World";
    13 
    14         channel.basicPublish(EXCHANGE_NAME,"emp.update.del",null,msg.getBytes());
    15         System.out.println("[Send]:"+msg);
    16 
    17         channel.close();
    18         connection.close();
    19     }
    20 
    21 }
    View Code

    消费者01:

     1 public class Rec01 {
     2     private final  static String EXCHANGE_NAME = "test_exchange_topic";
     3     private final  static String QUEUE_NAME = "test_queue_topic01";
     4 
     5 
     6     public static void main(String []args) throws Exception{
     7         Connection connection = MqConnection.getConnect();
     8         Channel channel = connection.createChannel();
     9 
    10         channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    11 
    12         channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"emp.*");
    13 
    14         channel.basicQos(1);
    15 
    16         QueueingConsumer consumer = new QueueingConsumer(channel);
    17         channel.basicConsume(QUEUE_NAME,false,consumer);
    18 
    19         while(true){
    20             QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    21             String msg = new String(delivery.getBody());
    22             System.out.println("[rec]01:"+msg);
    23             Thread.sleep(100);
    24             channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    25         }
    26     }
    27 }
    View Code

    消费者02:

     1 public class Rec02 {
     2     private final  static String EXCHANGE_NAME = "test_exchange_topic";
     3     private final  static String QUEUE_NAME = "test_queue_topic02";
     4 
     5 
     6     public static void main(String []args) throws Exception{
     7         Connection connection = MqConnection.getConnect();
     8         Channel channel = connection.createChannel();
     9 
    10         channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    11 
    12         channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"emp.#");
    13 
    14         channel.basicQos(1);
    15 
    16         QueueingConsumer consumer = new QueueingConsumer(channel);
    17         channel.basicConsume(QUEUE_NAME,false,consumer);
    18 
    19         while(true){
    20             QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    21             String msg = new String(delivery.getBody());
    22             System.out.println("[rec]02:"+msg);
    23             Thread.sleep(500);
    24             channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    25         }
    26     }
    27 }
    View Code

    * :一个,#一个的或者多个

  • 相关阅读:
    Oracle修改表Table所属表空间及Clob、Blob字段的处理
    MyBatis返回多表连接结果
    MyBatis查询结果resultType返回值类型详细介绍
    SpringBoot之分页PageHelper
    Postman简单用法以及转cURL等命令的正确姿势
    postman 巧用cURL
    Spring Boot设置跨域访问
    springboot设置cors跨域请求的两种方式
    @Configuration使用
    @GetMapping和@PostMapping接收参数的格式
  • 原文地址:https://www.cnblogs.com/bytecodebuffer/p/11366278.html
Copyright © 2011-2022 走看看