zoukankan      html  css  js  c++  java
  • RabbitMQ在普通MAVEN项目中的使用

    五、在普通的Maven应用中使用MQ

      rabbitmq的队列结构

    5.1简单模式

    5.1.1 消息生产者
    • 创建Maven项目

    • 添加RabbitMQ连接所需要的依赖

    • <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
      <dependency>
          <groupId>com.rabbitmq</groupId>
          <artifactId>amqp-client</artifactId>
          <version>4.10.0</version>
      </dependency>
      <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
      <dependency>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
          <version>1.7.25</version>
          <scope>test</scope>
      </dependency>
      <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
      <dependency>
          <groupId>org.apache.commons</groupId>
          <artifactId>commons-lang3</artifactId>
          <version>3.9</version>
      </dependency>

      在resources目录下创建log4j.properties

    • log4j.rootLogger=DEBUG,A1
      log4j.logger.com.taotao = DEBUG
      log4j.logger.org.mybatis = DEBUG
      log4j.appender.A1=org.apache.log4j.ConsoleAppender
      log4j.appender.A1.layout=org.apache.log4j.PatternLayout
      log4j.appender.A1.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c]-[%p] %m%n

    • 创建MQ连接帮助类

    • package com.qfedu.mq.utils;
      
      import com.rabbitmq.client.Connection;
      import com.rabbitmq.client.ConnectionFactory;
      
      import java.io.IOException;
      import java.util.concurrent.TimeoutException;
      
      /**
       * Helper that opens connections to the RabbitMQ broker used by the examples.
       *
       * <p>NOTE(review): host, virtual host and credentials are hard-coded here for
       * the tutorial; move them to external configuration before any real use.
       */
      public final class ConnectionUtil {

          private ConnectionUtil() {
              // Static helper only; not meant to be instantiated.
          }

          /**
           * Opens a new connection to the broker.
           *
           * <p>Every call creates a fresh connection; the caller is responsible for
           * closing it.
           *
           * @return a newly opened connection
           * @throws IOException if the connection cannot be established
           * @throws TimeoutException if the connection attempt times out
           */
          public static Connection getConnection() throws IOException, TimeoutException {
              // 1. Create the connection factory.
              ConnectionFactory factory = new ConnectionFactory();
              // 2. Configure the broker address (ip, port, virtual host) and the credentials.
              factory.setHost("47.96.11.185");
              factory.setPort(5672);
              factory.setVirtualHost("host1");
              factory.setUsername("ytao");
              factory.setPassword("admin123");
              // 3. Ask the factory for a connection to the broker.
              return factory.newConnection();
          }

      }

      消息生产者发送消息

    • package com.qfedu.mq.service;
      
      import com.qfedu.mq.utils.ConnectionUtil;
      import com.rabbitmq.client.Channel;
      import com.rabbitmq.client.Connection;
      
      /**
       * Simple-mode producer: publishes one text message to a queue through the
       * default exchange and then exits.
       */
      public class SendMsg {

          /**
           * Publishes a single message to {@code queue7}.
           *
           * @param args unused
           * @throws Exception if connecting to the broker or publishing fails
           */
          public static void main(String[] args) throws Exception{

              String msg = "Hello HuangDaoJun!";
              Connection connection = ConnectionUtil.getConnection();
              try {
                  Channel channel = connection.createChannel();

                  // Declaring a queue from Java code (creates it on the broker if missing):
                  //   arg 1: queue name
                  //   arg 2: durable - whether the queue survives a broker restart
                  //   arg 3: exclusive - whether the queue is private to this connection
                  //   arg 4: autoDelete - drop the queue once its last consumer is gone,
                  //          even if it still holds messages
                  //   arg 5: additional queue arguments
                  // Left disabled: the queue is assumed to exist already. A message
                  // published to a missing queue via the default exchange is dropped silently.
                  //channel.queueDeclare("queue7",false,false,false,null);

                  // arg 1: exchange name; "" sends straight to a queue via the default exchange
                  // arg 2: routing key, here the target queue name
                  // arg 3: message properties (e.g. expiration); none here
                  // arg 4: message body; encoded explicitly as UTF-8 rather than with the
                  //        platform default charset
                  channel.basicPublish("","queue7",null,msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                  System.out.println("发送:" + msg);

                  channel.close();
              } finally {
                  // Always release the connection, even if publishing failed; closing
                  // the connection also closes any channel still open on it.
                  connection.close();
              }
          }

      }
      5.1.2 消息消费者
      • 创建Maven项目

      • 添加依赖

      • log4j.properties

      • ConnectionUtil.java

      • 消费者消费消息

      • package com.qfedu.mq.service;
        
        import com.qfedu.mq.utils.ConnectionUtil;
        import com.rabbitmq.client.*;
        
        import java.io.IOException;
        import java.util.concurrent.TimeoutException;
        
        /**
         * Simple-mode consumer: subscribes to the queue the simple-mode producer
         * publishes to and prints every message it receives.
         */
        public class ReceiveMsg {

            /**
             * Registers a consumer and returns; the connection is intentionally left
             * open so that deliveries keep arriving on the client's own threads.
             *
             * @param args unused
             * @throws IOException if connecting or subscribing fails
             * @throws TimeoutException if the connection attempt times out
             */
            public static void main(String[] args) throws IOException, TimeoutException {
                Connection connection = ConnectionUtil.getConnection();
                Channel channel = connection.createChannel();

                Consumer consumer = new DefaultConsumer(channel){
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope,
                                AMQP.BasicProperties properties, byte[] body) throws IOException {
                        // body is the raw payload taken from the queue; decode it explicitly
                        // as UTF-8 instead of relying on the platform default charset.
                        String msg = new String(body, java.nio.charset.StandardCharsets.UTF_8);
                        System.out.println("接收:"+msg);
                    }
                };

                // The simple-mode producer publishes to "queue7", so consume from that
                // queue (the original "queue1" never received its messages).
                // Second argument true = automatic acknowledgement.
                channel.basicConsume("queue7",true,consumer);
            }
        }

        5.2 工作模式----    一个生产者多个消费者

        • 5.2.1 发送者
        • public class SendMsg {
          
              /**
               * Work-mode producer: reads lines from standard input and publishes each
               * one to {@code queue2} until "quit" is entered or input ends.
               *
               * @param args unused
               * @throws Exception if connecting to the broker or publishing fails
               */
              public static void main(String[] args) throws Exception{
                  System.out.println("请输入消息:");
                  Scanner scanner = new Scanner(System.in);

                  // One connection/channel for the whole session instead of a new
                  // TCP connection per message.
                  Connection connection = ConnectionUtil.getConnection();
                  try {
                      Channel channel = connection.createChannel();

                      // hasNextLine() lets the loop end cleanly at end of input
                      // instead of failing with NoSuchElementException.
                      while (scanner.hasNextLine()) {
                          String msg = scanner.nextLine();
                          if ("quit".equals(msg)) {
                              break;
                          }
                          // Default exchange ("") routes directly to the queue named by the routing key.
                          channel.basicPublish("","queue2",null,msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                          System.out.println("发送:" + msg);
                      }

                      channel.close();
                  } finally {
                      // Release the connection even if publishing failed.
                      connection.close();
                  }
              }
          
          }

          5.2.2 消费者1

        • public class ReceiveMsg {
          
              /**
               * Work-mode consumer 1: prints every message from {@code queue2} and
               * simulates slow processing (10 s) when the message is "wait".
               *
               * @param args unused
               * @throws Exception if connecting or subscribing fails
               */
              public static void main(String[] args) throws Exception {
                  Connection connection = ConnectionUtil.getConnection();
                  Channel channel = connection.createChannel();

                  Consumer consumer = new DefaultConsumer(channel){
                      @Override
                      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                          // body is the raw payload taken from the queue; decode it explicitly
                          // as UTF-8 instead of relying on the platform default charset.
                          String msg = new String(body, java.nio.charset.StandardCharsets.UTF_8);
                          System.out.println("Consumer1接收:"+msg);
                          if("wait".equals(msg)){
                              try {
                                  Thread.sleep(10000);
                              } catch (InterruptedException e) {
                                  // Restore the interrupt flag so the calling thread can
                                  // still observe the interruption.
                                  Thread.currentThread().interrupt();
                              }
                          }
                      }
                  };

                  // Second argument true = automatic acknowledgement.
                  channel.basicConsume("queue2",true,consumer);
              }
          }

          5.2.3 消费者2

        • public class ReceiveMsg {
          
              /**
               * Work-mode consumer 2: prints every message it receives from {@code queue2}.
               *
               * @param args unused
               * @throws IOException if connecting or subscribing fails
               * @throws TimeoutException if the connection attempt times out
               */
              public static void main(String[] args) throws IOException, TimeoutException {
                  Connection connection = ConnectionUtil.getConnection();
                  Channel channel = connection.createChannel();

                  Consumer consumer = new DefaultConsumer(channel){
                      @Override
                      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                          // body is the raw payload taken from the queue; decode it explicitly
                          // as UTF-8 instead of relying on the platform default charset.
                          String msg = new String(body, java.nio.charset.StandardCharsets.UTF_8);
                          System.out.println("Consumer2接收:"+msg);
                      }
                  };

                  // Second argument true = automatic acknowledgement.
                  channel.basicConsume("queue2",true,consumer);
              }
          }

          5.3 订阅模式

          5.3.1 发送者 发送消息到交换机
        • public class SendMsg {
          
              /**
               * Publish/subscribe producer: reads lines from standard input and publishes
               * each one to exchange {@code ex1} until "quit" is entered or input ends.
               *
               * @param args unused
               * @throws Exception if connecting to the broker or publishing fails
               */
              public static void main(String[] args) throws Exception{
                  System.out.println("请输入消息:");
                  Scanner scanner = new Scanner(System.in);

                  // One connection/channel for the whole session instead of a new
                  // TCP connection per message.
                  Connection connection = ConnectionUtil.getConnection();
                  try {
                      Channel channel = connection.createChannel();

                      // hasNextLine() lets the loop end cleanly at end of input
                      // instead of failing with NoSuchElementException.
                      while (scanner.hasNextLine()) {
                          String msg = scanner.nextLine();
                          if ("quit".equals(msg)) {
                              break;
                          }
                          // Published to the exchange with an empty routing key.
                          channel.basicPublish("ex1","",null,msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                          System.out.println("发送:" + msg);
                      }

                      channel.close();
                  } finally {
                      // Release the connection even if publishing failed.
                      connection.close();
                  }
              }
          
          }

          5.3.2 消费者1

        • public class ReceiveMsg1 {
          
              /**
               * Publish/subscribe consumer 1: prints every message from {@code queue3} and
               * simulates slow processing (10 s) when the message is "wait".
               *
               * @param args unused
               * @throws Exception if connecting or subscribing fails
               */
              public static void main(String[] args) throws Exception {
                  Connection connection = ConnectionUtil.getConnection();
                  Channel channel = connection.createChannel();

                  Consumer consumer = new DefaultConsumer(channel){
                      @Override
                      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                          // body is the raw payload taken from the queue; decode it explicitly
                          // as UTF-8 instead of relying on the platform default charset.
                          String msg = new String(body, java.nio.charset.StandardCharsets.UTF_8);
                          System.out.println("Consumer1接收:"+msg);
                          if("wait".equals(msg)){
                              try {
                                  Thread.sleep(10000);
                              } catch (InterruptedException e) {
                                  // Restore the interrupt flag so the calling thread can
                                  // still observe the interruption.
                                  Thread.currentThread().interrupt();
                              }
                          }
                      }
                  };

                  // Second argument true = automatic acknowledgement.
                  channel.basicConsume("queue3",true,consumer);
              }
          }

          5.3.3 消费者2

        • public class ReceiveMsg2 {
          
              /**
               * Publish/subscribe consumer 2: prints every message it receives from {@code queue4}.
               *
               * @param args unused
               * @throws IOException if connecting or subscribing fails
               * @throws TimeoutException if the connection attempt times out
               */
              public static void main(String[] args) throws IOException, TimeoutException {
                  Connection connection = ConnectionUtil.getConnection();
                  Channel channel = connection.createChannel();

                  Consumer consumer = new DefaultConsumer(channel){
                      @Override
                      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                          // body is the raw payload taken from the queue; decode it explicitly
                          // as UTF-8 instead of relying on the platform default charset.
                          String msg = new String(body, java.nio.charset.StandardCharsets.UTF_8);
                          System.out.println("Consumer2接收:"+msg);
                      }
                  };

                  // Second argument true = automatic acknowledgement.
                  channel.basicConsume("queue4",true,consumer);
              }
          }

          5.4 路由模式

          5.4.1 发送者 发送消息到交换机
        • public class SendMsg {
          
              /**
               * Routing-mode producer: reads lines from standard input and publishes each
               * one to exchange {@code ex2}, using routing key "a" or "b" according to the
               * first character of the message. Other messages are not published.
               * Stops when "quit" is entered or input ends.
               *
               * @param args unused
               * @throws Exception if connecting to the broker or publishing fails
               */
              public static void main(String[] args) throws Exception{
                  System.out.println("请输入消息:");
                  Scanner scanner = new Scanner(System.in);

                  // One connection/channel for the whole session instead of a new
                  // TCP connection per message.
                  Connection connection = ConnectionUtil.getConnection();
                  try {
                      Channel channel = connection.createChannel();

                      // hasNextLine() lets the loop end cleanly at end of input
                      // instead of failing with NoSuchElementException.
                      while (scanner.hasNextLine()) {
                          String msg = scanner.nextLine();
                          if ("quit".equals(msg)) {
                              break;
                          }

                          String routingKey = null;
                          if (msg.startsWith("a")) {
                              routingKey = "a";
                          } else if (msg.startsWith("b")) {
                              routingKey = "b";
                          }

                          if (routingKey == null) {
                              // Nothing was published, so do not report the message as sent.
                              System.out.println("未发送(消息需以a或b开头):" + msg);
                              continue;
                          }

                          channel.basicPublish("ex2",routingKey,null,msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                          System.out.println("发送:" + msg);
                      }

                      channel.close();
                  } finally {
                      // Release the connection even if publishing failed.
                      connection.close();
                  }
              }
          
          }

          5.4.2 消费者1

        • public class ReceiveMsg1 {
          
              /**
               * Routing-mode consumer 1: prints every message from {@code queue5} and
               * simulates slow processing (10 s) when the message is "wait".
               *
               * @param args unused
               * @throws Exception if connecting or subscribing fails
               */
              public static void main(String[] args) throws Exception {
                  Connection connection = ConnectionUtil.getConnection();
                  Channel channel = connection.createChannel();

                  Consumer consumer = new DefaultConsumer(channel){
                      @Override
                      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                          // body is the raw payload taken from the queue; decode it explicitly
                          // as UTF-8 instead of relying on the platform default charset.
                          String msg = new String(body, java.nio.charset.StandardCharsets.UTF_8);
                          System.out.println("Consumer1接收:"+msg);
                          if("wait".equals(msg)){
                              try {
                                  Thread.sleep(10000);
                              } catch (InterruptedException e) {
                                  // Restore the interrupt flag so the calling thread can
                                  // still observe the interruption.
                                  Thread.currentThread().interrupt();
                              }
                          }
                      }
                  };

                  // Second argument true = automatic acknowledgement.
                  channel.basicConsume("queue5",true,consumer);
              }
          }

          5.4.3 消费者2

        • public class ReceiveMsg2 {
          
              /**
               * Routing-mode consumer 2: prints every message it receives from {@code queue6}.
               *
               * @param args unused
               * @throws IOException if connecting or subscribing fails
               * @throws TimeoutException if the connection attempt times out
               */
              public static void main(String[] args) throws IOException, TimeoutException {
                  Connection connection = ConnectionUtil.getConnection();
                  Channel channel = connection.createChannel();

                  Consumer consumer = new DefaultConsumer(channel){
                      @Override
                      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                          // body is the raw payload taken from the queue; decode it explicitly
                          // as UTF-8 instead of relying on the platform default charset.
                          String msg = new String(body, java.nio.charset.StandardCharsets.UTF_8);
                          System.out.println("Consumer2接收:"+msg);
                      }
                  };

                  // Second argument true = automatic acknowledgement.
                  channel.basicConsume("queue6",true,consumer);
              }
          }
  • 相关阅读:
    Count and Say
    Roman to Integer LeetCode Java
    白菜刷LeetCode记-121. Best Time to Buy and Sell Stock
    白菜刷LeetCode记-103. Binary Tree Zigzag Level Order Traversal
    白菜刷LeetCode记-102. Binary Tree Level Order Traversal
    白菜刷LeetCode记-350. Intersection of Two Arrays II
    白菜刷LeetCode记-268. Missing Number
    白菜刷LeetCode记-378. Kth Smallest Element in a Sorted Matrix
    白菜刷LeetCode记-328. Odd Even Linked List
    白菜刷LeetCode记-230. Kth Smallest Element in a BST
  • 原文地址:https://www.cnblogs.com/jikeyi/p/13339124.html
Copyright © 2011-2022 走看看