zoukankan      html  css  js  c++  java
  • SpringBoot整合RabbitMQ消息组件

    1、RabbitMQ是一个在AMQP基础上构建的新一代企业级消息系统,该组件由Pivotal公司提供,使用Erlang语言开发。

    修改pom.xml配置文件,追加spring-boot-starter-amqp依赖包。

      1 <?xml version="1.0" encoding="UTF-8"?>
      2 <project xmlns="http://maven.apache.org/POM/4.0.0"
      3     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      4     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
      5     https://maven.apache.org/xsd/maven-4.0.0.xsd">
      6     <modelVersion>4.0.0</modelVersion>
      7     <parent>
      8         <groupId>org.springframework.boot</groupId>
      9         <artifactId>spring-boot-starter-parent</artifactId>
     10         <version>2.3.5.RELEASE</version>
     11         <relativePath /> <!-- lookup parent from repository -->
     12     </parent>
     13     <groupId>com.example</groupId>
     14     <artifactId>demo</artifactId>
     15     <version>0.0.1-SNAPSHOT</version>
     16     <name>demo</name>
     17     <description>Demo project for Spring Boot</description>
     18 
     19     <properties>
     20         <java.version>1.8</java.version>
     21         <maven-jar-plugin.version>3.1.1</maven-jar-plugin.version>
     22     </properties>
     23 
     24     <dependencies>
     25         <dependency>
     26             <groupId>org.springframework.boot</groupId>
     27             <artifactId>spring-boot-starter-web</artifactId>
     28         </dependency>
     29 
     30         <dependency>
     31             <groupId>org.springframework.boot</groupId>
     32             <artifactId>spring-boot-starter-test</artifactId>
     33             <scope>test</scope>
     34             <exclusions>
     35                 <exclusion>
     36                     <groupId>org.junit.vintage</groupId>
     37                     <artifactId>junit-vintage-engine</artifactId>
     38                 </exclusion>
     39             </exclusions>
     40         </dependency>
     41 
     42         <!-- mysql驱动包 -->
     43         <dependency>
     44             <groupId>mysql</groupId>
     45             <artifactId>mysql-connector-java</artifactId>
     46         </dependency>
     47 
     48         <!-- druid连接池 -->
     49         <dependency>
     50             <groupId>com.alibaba</groupId>
     51             <artifactId>druid</artifactId>
     52             <version>1.1.10</version>
     53         </dependency>
     54 
     55         <dependency>
     56             <groupId>org.springframework.boot</groupId>
     57             <artifactId>spring-boot-starter-data-jpa</artifactId>
     58         </dependency>
     59         <dependency>
     60             <groupId>org.springframework.boot</groupId>
     61             <artifactId>spring-boot-starter-cache</artifactId>
     62         </dependency>
     63         <dependency>
     64             <groupId>org.hibernate</groupId>
     65             <artifactId>hibernate-ehcache</artifactId>
     66         </dependency>
     67 
     68         <!-- activeMQ -->
     69         <dependency>
     70             <groupId>org.springframework.boot</groupId>
     71             <artifactId>spring-boot-starter-activemq</artifactId>
     72         </dependency>
     73         
     74         <!-- rabbitMQ -->
     75         <dependency>
     76             <groupId>org.springframework.boot</groupId>
     77             <artifactId>spring-boot-starter-amqp</artifactId>
     78         </dependency>
     79     </dependencies>
     80 
     81     <build>
     82         <plugins>
     83             <plugin>
     84                 <groupId>org.springframework.boot</groupId>
     85                 <artifactId>spring-boot-maven-plugin</artifactId>
     86             </plugin>
     87         </plugins>
     88         <resources>
     89             <resource>
     90                 <directory>src/main/resources</directory>
     91                 <includes>
     92                     <include>**/*.properties</include>
     93                     <include>**/*.yml</include>
     94                     <include>**/*.xml</include>
     95                     <include>**/*.p12</include>
     96                     <include>**/*.html</include>
     97                     <include>**/*.jpg</include>
     98                     <include>**/*.png</include>
     99                 </includes>
    100             </resource>
    101         </resources>
    102     </build>
    103 
    104 </project>

    修改application.properties配置文件,进行RabbitMQ的相关配置,如下所示:

    1 # RabbitMQ server host address
    2 spring.rabbitmq.addresses=192.168.110.133
    3 # User name
    4 spring.rabbitmq.username=admin
    5 # Password
    6 spring.rabbitmq.password=admin
    7 # Virtual host
    8 spring.rabbitmq.virtual-host=/

    这里定义一个RabbitMQ配置类,用来声明交换机、队列以及二者之间的绑定关系,如下所示:

     1 package com.demo.config;
     2 
     3 import org.springframework.amqp.core.Binding;
     4 import org.springframework.amqp.core.BindingBuilder;
     5 import org.springframework.amqp.core.DirectExchange;
     6 import org.springframework.amqp.core.Queue;
     7 import org.springframework.context.annotation.Bean;
     8 import org.springframework.context.annotation.Configuration;
     9 
    10 /**
    11  * Declares the RabbitMQ topology used by this demo: one direct exchange, one queue,
    12  * and the binding that connects them through {@link #ROUTINGKEY}.
    13  */
    14 @Configuration
    15 public class RabbitMqConfig {
    16 
    17     /** Name of the direct exchange. */
    18     public static final String EXCHANGE = "rabbitmq.exchange";
    19 
    20     /** Routing key used to bind the queue to the exchange. */
    21     public static final String ROUTINGKEY = "rabbitmq.routingkey";
    22 
    23     /** Name of the queue. */
    24     public static final String QUEUE_NAME = "rabbitmq.queue";
    25 
    26     /**
    27      * Declares the queue bean.
    28      *
    29      * @return a queue named {@link #QUEUE_NAME}
    30      */
    31     @Bean
    32     public Queue queue() {
    33         return new Queue(QUEUE_NAME);
    34     }
    35 
    36     /**
    37      * Declares the direct exchange bean.
    38      *
    39      * @return a direct exchange named {@link #EXCHANGE}; both boolean constructor
    40      *         arguments (durable, autoDelete) are {@code true}
    41      */
    42     @Bean
    43     public DirectExchange getDirectExchage() {
    44         return new DirectExchange(EXCHANGE, true, true);
    45     }
    46 
    47     /**
    48      * Binds the queue to the exchange using {@link #ROUTINGKEY}.
    49      *
    50      * @param exchange the exchange to bind to
    51      * @param queue    the queue being bound
    52      * @return the binding of {@code queue} to {@code exchange}
    53      */
    54     @Bean
    55     public Binding bindingExchangeQueue(DirectExchange exchange, Queue queue) {
    56         return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY);
    57     }
    58 
    59 }

    新建消息业务实现类,用于消息生产,如下所示:

     1 package com.demo.producer;
     2 
     3 import org.springframework.amqp.rabbit.core.RabbitTemplate;
     4 import org.springframework.beans.factory.annotation.Autowired;
     5 import org.springframework.stereotype.Service;
     6 
     7 import com.demo.config.RabbitMqConfig;
     8 
     9 /**
    10  * Publishes messages to RabbitMQ through the injected {@link RabbitTemplate}.
    11  */
    12 @Service
    13 public class RabbitMqMessageProducer {
    14 
    15     @Autowired
    16     private RabbitTemplate rabbitTemplate;
    17 
    18     /**
    19      * Sends a message to {@link RabbitMqConfig#EXCHANGE} with routing key
    20      * {@link RabbitMqConfig#ROUTINGKEY}.
    21      *
    22      * @param msg the message payload
    23      */
    24     public void send(String msg) {
    25         final String exchange = RabbitMqConfig.EXCHANGE;
    26         final String routingKey = RabbitMqConfig.ROUTINGKEY;
    27         rabbitTemplate.convertAndSend(exchange, routingKey, msg);
    28     }
    29 
    30 }

    定义监听处理类,用于消息的消费,如下所示:

     1 package com.demo.consumer;
     2 
     3 import org.springframework.amqp.rabbit.annotation.RabbitListener;
     4 import org.springframework.stereotype.Service;
     5 
     6 /**
     7  * Consumes messages from the demo queue.
     8  */
     9 @Service
    10 public class RabbitMqConsumer {
    11 
    12     /**
    13      * Handles a received message by printing it to standard error.
    14      *
    15      * <p>The queue name is taken from {@code RabbitMqConfig.QUEUE_NAME} rather than a
    16      * repeated string literal, so the listener and the declared queue cannot drift apart.
    17      *
    18      * @param text the received message body
    19      */
    20     @RabbitListener(queues = com.demo.config.RabbitMqConfig.QUEUE_NAME)
    21     public void receiveMessage(String text) {
    22         System.err.println("【*** 接受消息 ***】 " + text);
    23     }
    24 
    25 }

    此时就实现了与RabbitMQ消息组件的整合,在整个程序中只需要调用RabbitMqMessageProducer类中的send()方法就可以正常发送消息,消息会按照ROUTINGKEY被路由到绑定的队列,而后由监听该队列的消费者进行消费。下面通过一个控制器来触发消息发送:

     1 package com.demo.controller;
     2 
     3 import org.springframework.beans.factory.annotation.Autowired;
     4 import org.springframework.stereotype.Controller;
     5 import org.springframework.web.bind.annotation.RequestMapping;
     6 import org.springframework.web.bind.annotation.ResponseBody;
     7 
     8 import com.demo.producer.RabbitMqMessageProducer;
     9 
    10 /**
    11  * HTTP entry point that publishes a batch of test messages to RabbitMQ.
    12  */
    13 @Controller
    14 public class RabbitMqController {
    15 
    16     /** Number of messages sent per request. */
    17     private static final int MESSAGE_COUNT = 10000;
    18 
    19     /** Text placed before the sequence number in every message. */
    20     private static final String MESSAGE_PREFIX = "rabbitMq producer message : ";
    21 
    22     @Autowired
    23     private RabbitMqMessageProducer rabbitMqMessageProducer;
    24 
    25     /**
    26      * Handles {@code /messageProducer} by sending {@link #MESSAGE_COUNT} messages,
    27      * numbered from 0, through the producer.
    28      */
    29     @RequestMapping(value = "/messageProducer")
    30     @ResponseBody
    31     public void findAll() {
    32         int sequence = 0;
    33         while (sequence < MESSAGE_COUNT) {
    34             rabbitMqMessageProducer.send(MESSAGE_PREFIX + sequence);
    35             sequence++;
    36         }
    37     }
    38 
    39 }

    可以通过http://192.168.110.133:15672/观察,查看自己的生产消息和消费消息的情况,如下所示:

  • 相关阅读:
    NUTCH Exception in thread "Thread-12751" java.lang.OutOfMemoryError: PermGen space
    未登录词识别
    中文分词索引
    hadoop 存储空间满了
    nutch 生产者队列的大小如何控制 threadcount * 50
    nutch 采集到的数据与实际不符
    nutch 采集效率--设置采集间隔
    异常: http://www.ly.com/news/visa.html: java.io.IOException: unzipBestEffort returned null
    hbase 取多个版本数据
    JavaScript制作时钟特效
  • 原文地址:https://www.cnblogs.com/biehongli/p/13986781.html
Copyright © 2011-2022 走看看