zoukankan      html  css  js  c++  java
  • 1.springboot+ActiveMQ

    1.项目结构如下

     pom.xml文件如下

     <dependencies>
       <!-- JUnit 4 -->
       <dependency>
         <groupId>junit</groupId>
         <artifactId>junit</artifactId>
         <version>4.12</version>
       </dependency>
       <!-- Spring Boot starter for ActiveMQ -->
       <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-activemq</artifactId>
       </dependency>

       <!-- Message-queue connection pool (currently disabled):
       <dependency>
         <groupId>org.apache.activemq</groupId>
         <artifactId>activemq-pool</artifactId>
         <version>5.15.0</version>
       </dependency>
       -->
     </dependencies>
    pom.xml

    2.创建消息提供者

    pom.xml文件如下

     <dependencies>
       <!-- JUnit 4 -->
       <dependency>
         <groupId>junit</groupId>
         <artifactId>junit</artifactId>
         <version>4.12</version>
       </dependency>
       <!-- Spring MVC web starter: this module's PublishController uses @RestController and
            @RequestMapping, and application.properties sets server.port and
            server.servlet.context-path. The version is left to the same dependency
            management that supplies the ActiveMQ starter's version. -->
       <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-web</artifactId>
       </dependency>
       <!-- Spring Boot starter for ActiveMQ -->
       <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-activemq</artifactId>
       </dependency>

       <!-- Message-queue connection pool (currently disabled):
       <dependency>
         <groupId>org.apache.activemq</groupId>
         <artifactId>activemq-pool</artifactId>
         <version>5.15.0</version>
       </dependency>
       -->
     </dependencies>
    pom.xml

     2.1 创建属性文件如下:

     # HTTP port and servlet context path of the provider application
     server.port=8080
     server.servlet.context-path=/pro

     # Credentials for the ActiveMQ broker
     spring.activemq.user=admin
     spring.activemq.password=admin

     # Cluster configuration (disabled):
     #spring.activemq.broker-url=failover:(tcp://localhost:61616,tcp://localhost:61617)
     # JMS integration test: the broker is installed on another machine, so remember to
     # open the firewall and the broker port there.
     spring.activemq.broker-url=tcp://192.168.117.152:61617

     # Destination names injected into ActiveMQConfig through @Value
     queueName=publish.queue
     topicName=publish.topic

     # ActiveMQ is a message-queue technology created to cope with high concurrency.
     # It follows a producer/consumer model (producers and consumers may run on
     # different platforms and systems).
     # ActiveMQ supports the following two ways of delivering messages:
     #   - point-to-point: a message produced by a producer is consumed by exactly one consumer
     #   - publish/subscribe: a message produced by a producer can be consumed by several consumers
    application.properties

    2.2 定义消息队列类ActiveMQConfig.java

     1 package cn.kgc.config;
     2 
     3 import org.apache.activemq.ActiveMQConnectionFactory;
     4 import org.apache.activemq.command.ActiveMQQueue;
     5 import org.apache.activemq.command.ActiveMQTopic;
     6 import org.springframework.beans.factory.annotation.Value;
     7 import org.springframework.context.annotation.Bean;
     8 import org.springframework.context.annotation.Configuration;
     9 import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
    10 import org.springframework.jms.config.JmsListenerContainerFactory;
    11 
    12 import javax.jms.Queue;
    13 import javax.jms.Topic;
    14 //定义消息队列
    15 @Configuration
    16 public class ActiveMQConfig {
    17 
    18     @Value("${queueName}")
    19     private String queueName;
    20 
    21     @Value("${topicName}")
    22     private String topicName;
    23 
    24     @Value("${spring.activemq.user}")
    25     private String usrName;
    26 
    27     @Value("${spring.activemq.password}")
    28     private  String password;
    29 
    30     @Value("${spring.activemq.broker-url}")
    31     private  String brokerUrl;
    32 
    33     //定义存放消息的队列
    34     @Bean
    35     public Queue queue(){
    36         return new ActiveMQQueue(queueName);
    37     }
    38 
    39     @Bean
    40     public Topic topic(){
    41         return new ActiveMQTopic(topicName);
    42     }
    43 
    44     @Bean
    45     public ActiveMQConnectionFactory connectionFactory() {
    46         return new ActiveMQConnectionFactory(usrName, password, brokerUrl);
    47     }
    48 
    49     @Bean
    50     public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ActiveMQConnectionFactory connectionFactory){
    51         DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
    52         bean.setConnectionFactory(connectionFactory);
    53         return bean;
    54     }
    55 
    56     @Bean
    57     public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ActiveMQConnectionFactory connectionFactory){
    58         DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
    59         //设置为发布订阅方式, 默认情况下使用的生产消费者方式
    60         bean.setPubSubDomain(true);
    61         bean.setConnectionFactory(connectionFactory);
    62         return bean;
    63     }
    64 }
    ActiveMQConfig.java

    2.3 定义控制类PublishController.java

     1 package cn.kgc.controller;
     2 
     3 import org.springframework.beans.factory.annotation.Autowired;
     4 import org.springframework.jms.annotation.JmsListener;
     5 import org.springframework.jms.core.JmsMessagingTemplate;
     6 import org.springframework.web.bind.annotation.RequestMapping;
     7 import org.springframework.web.bind.annotation.RequestParam;
     8 import org.springframework.web.bind.annotation.RestController;
     9 
    10 import javax.jms.Queue;
    11 import javax.jms.Topic;
    12 
    13 @RestController
    14 @RequestMapping("/publish")
    15 public class PublishController {
    16 
    17     //注入springboot封装的工具类
    18     @Autowired
    19     private JmsMessagingTemplate jms;
    20 
    21     //注入存放消息的队列,用于下列方法一
    22     @Autowired
    23     private Queue queue;
    24 
    25     @Autowired
    26     private Topic topic;
    27 
    28     @RequestMapping("/queue")
    29     public String queue(){
    30 
    31         for (int i = 0; i < 10 ; i++){
    32 
    33             //方法一:添加消息到消息队列
    34             jms.convertAndSend(queue, "queue"+i);
    35         }
    36 
    37         return "queue 发送成功";
    38     }
    39 
    40     @JmsListener(destination = "out.queue")
    41     public void consumerMsg(String msg){
    42         System.out.println(msg);
    43     }
    44 
    45     @RequestMapping("/topic")
    46     public String topic(){
    47 
    48         for (int i = 0; i < 10 ; i++){
    49             jms.convertAndSend(topic, "topic"+i);
    50         }
    51 
    52         return "topic 发送成功";
    53     }
    54 }
    PublishController.java

    2.4 启动类

     1 package cn.kgc;
     2 
     3 import org.springframework.boot.SpringApplication;
     4 import org.springframework.boot.autoconfigure.SpringBootApplication;
     5 import org.springframework.jms.annotation.EnableJms;
     6 //启动消息队列
     7 @EnableJms
     8 @SpringBootApplication
     9 public class ProviderApplication {
    10 
    11     public static void main(String[] args) {
    12         SpringApplication.run(ProviderApplication.class, args);
    13     }
    14 
    15 }
    ProviderApplication.java

    3.创建消息消费者项目a

    3.1 编辑属性文件application.properties

    # HTTP port and servlet context path of consumer a
    server.port=8081
    server.servlet.context-path=/cona

    # ActiveMQ broker credentials and address
    spring.activemq.user=admin
    spring.activemq.password=admin
    spring.activemq.broker-url=tcp://192.168.117.152:61617

    # Destination names injected into ActiveMQConfig through @Value
    queueName=publish.queue
    topicName=publish.topic
    application.properties

    3.2 消息队列参数类ActiveMQConfig.java

     1 package cn.kgc.config;
     2 
     3 import org.apache.activemq.ActiveMQConnectionFactory;
     4 import org.apache.activemq.command.ActiveMQQueue;
     5 import org.apache.activemq.command.ActiveMQTopic;
     6 import org.springframework.beans.factory.annotation.Value;
     7 import org.springframework.context.annotation.Bean;
     8 import org.springframework.context.annotation.Configuration;
     9 import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
    10 import org.springframework.jms.config.JmsListenerContainerFactory;
    11 
    12 import javax.jms.Queue;
    13 import javax.jms.Topic;
    14 
    15 @Configuration
    16 public class ActiveMQConfig {
    17 
    18     @Value("${queueName}")
    19     private String queueName;
    20 
    21     @Value("${topicName}")
    22     private String topicName;
    23 
    24     @Value("${spring.activemq.user}")
    25     private String usrName;
    26 
    27     @Value("${spring.activemq.password}")
    28     private  String password;
    29 
    30     @Value("${spring.activemq.broker-url}")
    31     private  String brokerUrl;
    32 
    33     @Bean
    34     public Queue queue(){
    35         return new ActiveMQQueue(queueName);
    36     }
    37 
    38     @Bean
    39     public Topic topic(){
    40         return new ActiveMQTopic(topicName);
    41     }
    42 
    43     @Bean
    44     public ActiveMQConnectionFactory connectionFactory() {
    45         return new ActiveMQConnectionFactory(usrName, password, brokerUrl);
    46     }
    47 
    48     @Bean
    49     public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ActiveMQConnectionFactory connectionFactory){
    50         DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
    51         bean.setConnectionFactory(connectionFactory);
    52         return bean;
    53     }
    54 
    55     @Bean
    56     public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ActiveMQConnectionFactory connectionFactory){
    57         DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
    58         //设置为发布订阅方式, 默认情况下使用的生产消费者方式
    59         bean.setPubSubDomain(true);
    60         bean.setConnectionFactory(connectionFactory);
    61         return bean;
    62     }
    63 }
    ActiveMQConfig.java

    3.3 队列监听

     1 package cn.kgc.listener;
     2 
     3 import org.springframework.jms.annotation.JmsListener;
     4 import org.springframework.messaging.handler.annotation.SendTo;
     5 import org.springframework.stereotype.Component;
     6 
     7 @Component
     8 public class QueueListener {
     9     @JmsListener(destination = "publish.queue", containerFactory = "jmsListenerContainerQueue")
    10     @SendTo("out.queue")
    11     public String receive(String text){
    12         System.out.println("QueueListener: consumer-a 收到一条信息: " + text);
    13         return "consumer-a received : " + text;
    14     }
    15 }
    QueueListener.java

    3.4 消息主题监听

     1 package cn.kgc.listener;
     2 
     3 import org.springframework.jms.annotation.JmsListener;
     4 import org.springframework.stereotype.Component;
     5 
     6 @Component
     7 public class TopocListener {
     8 
     9     @JmsListener(destination = "publish.topic", containerFactory = "jmsListenerContainerTopic")
    10     public void receive(String text){
    11         System.out.println("TopicListener: consumer-a 收到一条信息: " + text);
    12     }
    13 }
    TopocListener.java

    3.5 控制类 PublishController.java

     1 package cn.kgc.controller;
     2 
     3 import org.springframework.beans.factory.annotation.Autowired;
     4 import org.springframework.jms.annotation.JmsListener;
     5 import org.springframework.jms.core.JmsMessagingTemplate;
     6 import org.springframework.web.bind.annotation.RequestMapping;
     7 import org.springframework.web.bind.annotation.RestController;
     8 
     9 import javax.jms.Queue;
    10 import javax.jms.Topic;
    11 
    12 @RestController
    13 @RequestMapping("/publish")
    14 public class PublishController {
    15     @Autowired
    16     private JmsMessagingTemplate jms;
    17 
    18     @Autowired
    19     private Queue queue;
    20 
    21     @Autowired
    22     private Topic topic;
    23 
    24     @RequestMapping("/queue")
    25     public String queue(){
    26 
    27         for (int i = 0; i < 10 ; i++){
    28             jms.convertAndSend(queue, "queue"+i);
    29         }
    30 
    31         return "queue 发送成功";
    32     }
    33 
    34     @JmsListener(destination = "out.queue")
    35     public void consumerMsg(String msg){
    36         System.out.println(msg);
    37     }
    38 
    39     @RequestMapping("/topic")
    40     public String topic(){
    41 
    42         for (int i = 0; i < 10 ; i++){
    43             jms.convertAndSend(topic, "topic"+i);
    44         }
    45 
    46         return "topic 发送成功";
    47     }
    48 }
    PublishController.java

    4.创建消息消费者项目b

    4.1 编辑属性文件application.properties

    # HTTP port and servlet context path of consumer b
    server.port=8082
    server.servlet.context-path=/conb

    # ActiveMQ broker credentials and address
    spring.activemq.user=admin
    spring.activemq.password=admin
    spring.activemq.broker-url=tcp://192.168.117.152:61617

    # Destination names injected into ActiveMQConfig through @Value
    queueName=publish.queue
    topicName=publish.topic
    application.properties

    4.2 消息队列参数类ActiveMQConfig.java

    ActiveMQConfig.java(代码未展开;应与 3.2 中的 ActiveMQConfig.java 相同)

    4.3 队列监听

     1 package cn.kgc.listener;
     2 
     3 import org.springframework.jms.annotation.JmsListener;
     4 import org.springframework.messaging.handler.annotation.SendTo;
     5 import org.springframework.stereotype.Component;
     6 
     7 @Component
     8 public class QueueListener {
     9     @JmsListener(destination = "publish.queue", containerFactory = "jmsListenerContainerQueue")
    10     @SendTo("out.queue")
    11     public String receive(String text){
    12         System.out.println("QueueListener: consumer-b 收到一条信息: " + text);
    13         return "consumer-a received : " + text;
    14     }
    15 }
    QueueListener.java

    4.4 消息主题监听

     1 package cn.kgc.listener;
     2 
     3 import org.springframework.jms.annotation.JmsListener;
     4 import org.springframework.stereotype.Component;
     5 
     6 @Component
     7 public class TopocListener {
     8 
     9     @JmsListener(destination = "publish.topic", containerFactory = "jmsListenerContainerTopic")
    10     public void receive(String text){
    11         System.out.println("TopicListener: consumer-b 收到一条信息: " + text);
    12     }
    13 }
    TopocListener.java

    4.5 控制类 PublishController.java

     1 package cn.kgc.controller;
     2 
     3 import org.springframework.beans.factory.annotation.Autowired;
     4 import org.springframework.jms.annotation.JmsListener;
     5 import org.springframework.jms.core.JmsMessagingTemplate;
     6 import org.springframework.web.bind.annotation.RequestMapping;
     7 import org.springframework.web.bind.annotation.RestController;
     8 
     9 import javax.jms.Queue;
    10 import javax.jms.Topic;
    11 
    12 @RestController
    13 @RequestMapping("/publish")
    14 public class PublishController {
    15     @Autowired
    16     private JmsMessagingTemplate jms;
    17 
    18     @Autowired
    19     private Queue queue;
    20 
    21     @Autowired
    22     private Topic topic;
    23 
    24     @RequestMapping("/queue")
    25     public String queue(){
    26 
    27         for (int i = 0; i < 10 ; i++){
    28             jms.convertAndSend(queue, "queue"+i);
    29         }
    30 
    31         return "queue 发送成功";
    32     }
    33 
    34     @JmsListener(destination = "out.queue")
    35     public void consumerMsg(String msg){
    36         System.out.println(msg);
    37     }
    38 
    39     @RequestMapping("/topic")
    40     public String topic(){
    41 
    42         for (int i = 0; i < 10 ; i++){
    43             jms.convertAndSend(topic, "topic"+i);
    44         }
    45 
    46         return "topic 发送成功";
    47     }
    48 }
    PublishController.java

    请求:http://localhost:8080/pro/publish/queue

    Number Of Pending Messages:消息队列中待处理的消息
    Number Of Consumers:消费者的数量
    Messages Enqueued:累计进入过消息队列的总量
    Messages Dequeued:累计消费过的消息总量

     

     请求:http://localhost:8080/pro/publish/topic

     

     

  • 相关阅读:
    01(b)无约束优化(准备知识)
    01(a)一元函数_多元函数_无约束极值问题的求解
    谱聚类
    分类算法
    Implementing EM for Gaussian mixtures
    0-1背包问题1
    ML_推荐系统与降维
    Machine Learning: Clustering & Retrieval机器学习之聚类和信息检索(框架)
    Linux命令
    Udacity_机器学习
  • 原文地址:https://www.cnblogs.com/holly8/p/11576571.html
Copyright © 2011-2022 走看看