zoukankan      html  css  js  c++  java
  • RabbitMQ初体验

    这里官方使用的Pom是4.0.2版本

     1 <dependencies>
     2   <dependency>
     3     <groupId>com.rabbitmq</groupId>
     4     <artifactId>amqp-client</artifactId>
     5     <version>4.0.2</version>
     6   </dependency>
     7    <dependency>
     8     <groupId>org.slf4j</groupId>
     9     <artifactId>slf4j-api</artifactId>
    10     <version>1.7.10</version>
    11   </dependency>
    12   <dependency>
    13     <groupId>org.slf4j</groupId>
    14     <artifactId>slf4j-log4j12</artifactId>
    15     <version>1.7.5</version>
    16   </dependency>
    17   <dependency>
    18     <groupId>log4j</groupId>
    19     <artifactId>log4j</artifactId>
    20     <version>1.2.17</version>
    21   </dependency>
    22   <dependency>
    23     <groupId>junit</groupId>
    24     <artifactId>junit</artifactId>
    25     <version>4.11</version>
    26   </dependency>
    27 </dependencies>

    简单队列 hello word

    P:消息的生产者

    C:消息的消费者

    红色:队列

    生产者将消息发送到队列,消费者从队列中获取消息。

    那么我们根据以上的模型,咱们抽取出 3 个对象 生产者(用户发送消息) 队列(中间件):类似于容器(存储消息) 消费者(获取队列中的消息)

    JAVA 操作 获取 MQ 连接

    类似于我们在操作数据库的时候的要获取到连接然后才对数据进行

     1 package cn.wh.util;
     2 
     3 import com.rabbitmq.client.Connection;
     4 import com.rabbitmq.client.ConnectionFactory;
     5 
     6 import java.io.IOException;
     7 import java.util.concurrent.TimeoutException;
     8 
     9 public class RabbitMqConnectionUtil {
    10     /**
    11      * 获取mq的连接
    12      * @return
    13      */
    14     public static Connection getConnection() throws IOException, TimeoutException {
    15         //定义一个连接工厂
    16         ConnectionFactory factory=new ConnectionFactory();
    17         //设置服务器的地址
    18         factory.setHost("192.168.152.5");
    19         //AMQP 5672
    20         factory.setPort(5672);
    21         //设置哪一个数据库 vhost
    22         factory.setVirtualHost("/vhost_wh");
    23         //设置用户名
    24         factory.setUsername("wh");
    25         factory.setPassword("123");
    26 
    27         return  factory.newConnection();
    28     }
    29 }

    生产者发送数据到消息队列

     1 package cn.wh.simple;
     2 
     3 import cn.wh.util.RabbitMqConnectionUtil;
     4 import com.rabbitmq.client.Channel;
     5 import com.rabbitmq.client.Connection;
     6 
     7 import java.io.IOException;
     8 import java.util.concurrent.TimeoutException;
     9 
    10 public class Send {
    11     private static final String QUEVE_NAME = "test_simple_queue";
    12 
    13     public static void main(String[] args) throws IOException {
    14         //获取一个连接
    15         Connection connection = null;
    16         try {
    17          
    18             connection = RabbitMqConnectionUtil.getConnection();
    19         } catch (IOException e) {
    20             e.printStackTrace();
    21         } catch (TimeoutException e) {
    22             e.printStackTrace();
    23         }
    24 
    25         //创建一个通道
    26         Channel channel = connection.createChannel();
    27         // 创建队列声明
    28         channel.queueDeclare(QUEVE_NAME, false, false, false, null);
    29 
    30         //发送的消息
    31         String msg="hello simple";
    32         channel.basicPublish("",QUEVE_NAME,null,msg.getBytes());
    33         System.out.println("发送成功===============");
    34         try {
    35             channel.close();
    36             connection.close();
    37         } catch (TimeoutException e) {
    38             e.printStackTrace();
    39         }
    40     }
    41 }

    查看消消费者消费消费者消费消费者消费消费者消消费者消费消费者消费

     1 package cn.wh.simple;
     2 
     3 import cn.wh.util.RabbitMqConnectionUtil;
     4 import com.rabbitmq.client.Channel;
     5 import com.rabbitmq.client.Connection;
     6 import com.rabbitmq.client.QueueingConsumer;
     7 
     8 import java.io.IOException;
     9 import java.util.concurrent.TimeoutException;
    10 
    11 public class Accept {
    12     private static final java.lang.String QUEVE_NAME = "test_simple_queue";
    13     public static void main(String[] args) throws IOException, InterruptedException {
    14 
    15         //获取一个连接
    16         Connection connection=null;
    17         {
    18             try {
    19                 connection = RabbitMqConnectionUtil.getConnection();
    20             } catch (IOException e) {
    21                 e.printStackTrace();
    22             } catch (TimeoutException e) {
    23                 e.printStackTrace();
    24             }
    25             //定义管道
    26             Channel channel = connection.createChannel();
    27             //定义队列的消费者
    28             QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
    29             channel.basicConsume(QUEVE_NAME,true,queueingConsumer);
    30             while (true){
    31                 QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
    32                 String msg = new String(delivery.getBody());
    33 
    34 
    35                 System.out.println("msg"+ msg);
    36             }
    37         }
    38     }
    39 }

    简单队列的不足

    耦合性高 生产消费一一对应(如果有多个消费者想都消费这个消息,就不行了) 队列名称变更时需要同时更改

  • 相关阅读:
    Aurora 数据库支持多达五个跨区域只读副本
    Amazon RDS 的 Oracle 只读副本
    Amazon EC2 密钥对
    DynamoDB 读取请求单位和写入请求单位
    使用 EBS 优化的实例或 10 Gb 网络实例
    启动 LAMP 堆栈 Web 应用程序
    AWS 中的错误重试和指数退避 Error Retries and Exponential Backoff in AWS
    使用 Amazon S3 阻止公有访问
    路由表 Router Table
    使用MySQLAdmin工具查看QPS
  • 原文地址:https://www.cnblogs.com/wh1520577322/p/10059298.html
Copyright © 2011-2022 走看看