zoukankan      html  css  js  c++  java
  • RabbitMQ简单应用の简单队列

    (1)首先创建一个maven项目:

    pom.xml,重点是配置RabbitMQ

      1     <dependencies>
      2         <dependency>
      3             <groupId>junit</groupId>
      4             <artifactId>junit</artifactId>
      5             <version>4.11</version>
      6             <!-- 表示开发的时候引入,发布的时候不会加载此包 -->
      7             <scope>test</scope>
      8         </dependency>
      9         <!-- spring核心包 -->
     10         <dependency>
     11             <groupId>org.springframework</groupId>
     12             <artifactId>spring-core</artifactId>
     13             <version>${spring.version}</version>
     14         </dependency>
     15 
     16         <dependency>
     17             <groupId>org.springframework</groupId>
     18             <artifactId>spring-web</artifactId>
     19             <version>${spring.version}</version>
     20         </dependency>
     21         <dependency>
     22             <groupId>org.springframework</groupId>
     23             <artifactId>spring-oxm</artifactId>
     24             <version>${spring.version}</version>
     25         </dependency>
     26         <dependency>
     27             <groupId>org.springframework</groupId>
     28             <artifactId>spring-tx</artifactId>
     29             <version>${spring.version}</version>
     30         </dependency>
     31 
     32         <dependency>
     33             <groupId>org.springframework</groupId>
     34             <artifactId>spring-jdbc</artifactId>
     35             <version>${spring.version}</version>
     36         </dependency>
     37 
     38         <dependency>
     39             <groupId>org.springframework</groupId>
     40             <artifactId>spring-webmvc</artifactId>
     41             <version>${spring.version}</version>
     42         </dependency>
     43         <dependency>
     44             <groupId>org.springframework</groupId>
     45             <artifactId>spring-aop</artifactId>
     46             <version>${spring.version}</version>
     47         </dependency>
     48 
     49         <dependency>
     50             <groupId>org.springframework</groupId>
     51             <artifactId>spring-context-support</artifactId>
     52             <version>${spring.version}</version>
     53         </dependency>
     54 
     55         <dependency>
     56             <groupId>org.springframework</groupId>
     57             <artifactId>spring-test</artifactId>
     58             <version>${spring.version}</version>
     59         </dependency>
     60         <!-- mybatis核心包 -->
     61         <dependency>
     62             <groupId>org.mybatis</groupId>
     63             <artifactId>mybatis</artifactId>
     64             <version>${mybatis.version}</version>
     65         </dependency>
     66         <!-- mybatis/spring包 -->
     67         <dependency>
     68             <groupId>org.mybatis</groupId>
     69             <artifactId>mybatis-spring</artifactId>
     70             <version>1.2.2</version>
     71         </dependency>
     72         <!-- 导入java ee jar 包 -->
     73         <dependency>
     74             <groupId>javax</groupId>
     75             <artifactId>javaee-api</artifactId>
     76             <version>7.0</version>
     77         </dependency>
     78         <!-- 导入Mysql数据库链接jar包 -->
     79         <dependency>
     80             <groupId>mysql</groupId>
     81             <artifactId>mysql-connector-java</artifactId>
     82             <version>5.1.30</version>
     83         </dependency>
     84         <!-- 导入dbcp的jar包,用来在applicationContext.xml中配置数据库 -->
     85         <dependency>
     86             <groupId>commons-dbcp</groupId>
     87             <artifactId>commons-dbcp</artifactId>
     88             <version>1.2.2</version>
     89         </dependency>
     90         <!-- JSTL标签类 -->
     91         <dependency>
     92             <groupId>jstl</groupId>
     93             <artifactId>jstl</artifactId>
     94             <version>1.2</version>
     95         </dependency>
     96         <!-- 日志文件管理包 -->
     97         <!-- log start -->
     98         <dependency>
     99             <groupId>log4j</groupId>
    100             <artifactId>log4j</artifactId>
    101             <version>${log4j.version}</version>
    102         </dependency>
    103 
    104 
    105         <!-- 格式化对象,方便输出日志 -->
    106         <dependency>
    107             <groupId>com.alibaba</groupId>
    108             <artifactId>fastjson</artifactId>
    109             <version>1.1.41</version>
    110         </dependency> 
    111         <dependency>
    112             <groupId>org.slf4j</groupId>
    113             <artifactId>slf4j-api</artifactId>
    114             <version>${slf4j.version}</version>
    115         </dependency>
    116         <dependency>
    117             <groupId>net.sf.json-lib</groupId>
    118             <artifactId>json-lib</artifactId>
    119             <version>2.4</version>
    120             <classifier>jdk15</classifier>
    121         </dependency>
    122         <!-- https://mvnrepository.com/artifact/org.apache.ant/ant -->
    123         <dependency>
    124             <groupId>org.apache.ant</groupId>
    125             <artifactId>ant</artifactId>
    126             <version>1.8.2</version>
    127         </dependency>
    128         <dependency>
    129             <groupId>org.slf4j</groupId>
    130             <artifactId>slf4j-log4j12</artifactId>
    131             <version>${slf4j.version}</version>
    132         </dependency>
    133         <!-- log end -->
    134         <!-- 映入JSON -->
    135         <dependency>
    136             <groupId>org.codehaus.jackson</groupId>
    137             <artifactId>jackson-mapper-asl</artifactId>
    138             <version>1.9.13</version>
    139         </dependency>
    140         <!-- 上传组件包 -->
    141         <dependency>
    142             <groupId>commons-fileupload</groupId>
    143             <artifactId>commons-fileupload</artifactId>
    144             <version>1.3.1</version>
    145         </dependency>
    146         <dependency>
    147             <groupId>commons-io</groupId>
    148             <artifactId>commons-io</artifactId>
    149             <version>2.4</version>
    150         </dependency>
    151         <dependency>
    152             <groupId>commons-codec</groupId>
    153             <artifactId>commons-codec</artifactId>
    154             <version>1.9</version>
    155         </dependency>
    156         <dependency>
    157             <groupId>net.sourceforge.jexcelapi</groupId>
    158             <artifactId>jxl</artifactId>
    159             <version>2.6.12</version>
    160         </dependency> 
    161         <!-- https://mvnrepository.com/artifact/org.json/json -->
    162         <dependency>
    163             <groupId>org.json</groupId>
    164             <artifactId>json</artifactId>
    165             <version>20171018</version>
    166         </dependency>
    167         <dependency>
    168             <groupId>com.sun.jna</groupId>
    169             <artifactId>jna</artifactId>
    170             <version>3.0.9</version>
    171         </dependency>
    172         <dependency>
    173             <groupId>com.examples</groupId>
    174             <artifactId>examples</artifactId>
    175             <version>1.1.2</version>
    176         </dependency>
    177         <dependency>
    178             <groupId>com.rabbitmq</groupId>
    179             <artifactId>amqp-client</artifactId>
    180             <version>3.5.1</version>
    181         </dependency>
    182         <dependency>
    183             <groupId>org.springframework.amqp</groupId>
    184             <artifactId>spring-rabbit</artifactId>
    185             <version>1.4.5.RELEASE</version>
    186         </dependency>
    187     </dependencies> 
    View Code

    项目结构:

    (1)首先创建获取MQ的链接工厂ConnectionUtils

     1 package com.mmr.rabbitmq.util;
     2 
     3 import java.io.IOException;
     4 
     5 import com.rabbitmq.client.Connection;
     6 import com.rabbitmq.client.ConnectionFactory;
     7 
     8 public class ConnectionUtils {
     9     /**
    10      * @desc 获取Mq 的链接
    11      * @author zp
    12      * @throws IOException 
    13      * @date 2018-7-19
    14      */
    15     public static  Connection getConnection() throws IOException {
    16         // 1.定义一个链接工厂
    17         ConnectionFactory factroy = new ConnectionFactory();
    18         
    19         // 2.设置服务地址
    20         factroy.setHost("127.0.0.1");
    21         
    22         // 3.设置端口号
    23         factroy.setPort(5672);
    24         
    25         // 4.vhost  设置数据库
    26         factroy.setVirtualHost("vhtest");
    27         
    28         // 5.设置用户名
    29         factroy.setUsername("jerry");
    30         
    31         // 6. 设置密码
    32         factroy.setPassword("123456");
    33         
    34         // 7.返回链接
    35         return factroy.newConnection();
    36     }
    37 }
    View Code

    (2)其次创建消息生产者Send,这里消息生产者每发送一次消息,我们就可以通过rabbitmq(http://localhost:15672)的服务Queues进行查看

     1 package com.mmr.rabbitmq.simple;
     2 
     3 import java.io.IOException;
     4 
     5 import com.mmr.rabbitmq.util.ConnectionUtils;
     6 import com.rabbitmq.client.Channel;
     7 import com.rabbitmq.client.Connection;
     8 
     9 public class Send {
    10     private static final String QUEUE_NAME = "test_simple_queue";
    11     public static void main(String[] args) throws IOException {
    12         // 1.获取一个链接
    13         Connection connection = ConnectionUtils.getConnection();
    14         
    15         // 2.获取一个通道
    16         Channel channel = connection.createChannel();
    17     
    18         // 3.创建队列 创建队列声明
    19         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    20     
    21         // 
    22         String msg = "hello simple";
    23         
    24         channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
    25         
    26         System.out.println("---send msg :"+msg);
    27         
    28         channel.close();
    29         
    30         connection.close();
    31     }
    32 }
    View Code

    (3)最后创建消息消费者Recv,这里我们通过while循环可以获取每次rabbitmq(http://localhost:15672)的服务Queues接收到消息(方法一)

     1 package com.mmr.rabbitmq.simple;
     2 
     3 import java.io.IOException;
     4 
     5 import com.mmr.rabbitmq.util.ConnectionUtils;
     6 import com.rabbitmq.client.Channel;
     7 import com.rabbitmq.client.Connection;
     8 import com.rabbitmq.client.ConsumerCancelledException;
     9 import com.rabbitmq.client.QueueingConsumer;
    10 import com.rabbitmq.client.QueueingConsumer.Delivery;
    11 import com.rabbitmq.client.ShutdownSignalException;
    12 
    13 /**
    14  * @desc 消费者获取消息
    15  * @author zp
    16  * @date 2018-7-19 
    17  */
    18 public class Recv {
    19     private static final String QUEUE_NAME = "test_simple_queue";
    20     public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
    21         // 获取链接
    22         Connection  connection = ConnectionUtils.getConnection();
    23         
    24         // 创建通道
    25         Channel channel = connection.createChannel();
    26         
    27         // 定义队列消费者
    28         QueueingConsumer consumer = new QueueingConsumer(channel);
    29         
    30         // 监听队列
    31         channel.basicConsume(QUEUE_NAME, true, consumer);
    32         
    33         while (true) {
    34             Delivery delivery = consumer.nextDelivery();// 下一个到达的
    35             
    36             String msgString = new String(delivery.getBody()); 
    37             
    38             System.out.println("[recv] msg:"+msgString);
    39         }
    40     }
    41 }
    View Code

    (3)最后创建消息消费者Recv,这里我们通过while循环可以获取每次rabbitmq(http://localhost:15672)的服务Queues接收到消息(方法二)

     1 package com.mmr.rabbitmq.simple;
     2 
     3 import java.io.IOException;
     4 
     5 import com.mmr.rabbitmq.util.ConnectionUtils;
     6 import com.rabbitmq.client.Channel;
     7 import com.rabbitmq.client.Connection;
     8 import com.rabbitmq.client.ConsumerCancelledException;
     9 import com.rabbitmq.client.DefaultConsumer;
    10 import com.rabbitmq.client.Envelope;
    11 import com.rabbitmq.client.QueueingConsumer;
    12 import com.rabbitmq.client.AMQP.BasicProperties;
    13 import com.rabbitmq.client.QueueingConsumer.Delivery;
    14 import com.rabbitmq.client.ShutdownSignalException;
    15 
    16 /**
    17  * @desc 消费者获取消息
    18  * @author zp
    19  * @date 2018-7-19 
    20  */
    21 public class Recv {
    22     private static final String QUEUE_NAME = "test_simple_queue";
    23 
    24     public static void main(String[] args) throws IOException{
    25         // 获取链接
    26         Connection connection = ConnectionUtils.getConnection();
    27         
    28         // 获取通道
    29         Channel channel = connection.createChannel();
    30         
    31         // 队列声明
    32         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    33         
    34         // 定义消费者
    35         DefaultConsumer consumer = new DefaultConsumer(channel){
    36             // 获取到到达的消息
    37             @Override
    38             public void handleDelivery(String consumerTag, Envelope envelope,
    39                     BasicProperties properties, byte[] body) throws IOException {
    40                 // TODO Auto-generated method stub
    41                 //super.handleDelivery(consumerTag, envelope, properties, body);
    42                 String msg = new String(body,"utf-8");
    43                 System.out.println("new api recv:"+msg);
    44             }
    45         };
    46         // 监听队列
    47         channel.basicConsume(QUEUE_NAME, true,consumer);
    48         
    49         
    50         
    51     } 
    52 }
    View Code

    这样一个简单的队列应用就完成了。

  • 相关阅读:
    流程控制引擎组件化
    (七):C++分布式实时应用框架 2.0
    (六):大型项目容器化改造
    (五):C++分布式实时应用框架——微服务架构的演进
    (四):C++分布式实时应用框架——状态中心模块
    (三):C++分布式实时应用框架——系统管理模块
    (二): 基于ZeroMQ的实时通讯平台
    (一):C++分布式实时应用框架----整体介绍
    分布式压测系列之Jmeter4.0第一季
    选择 NoSQL 需要考虑的 10 个问题
  • 原文地址:https://www.cnblogs.com/pengpengzhang/p/9334779.html
Copyright © 2011-2022 走看看