zoukankan      html  css  js  c++  java
  • SpringBoot整合ActiveMQ和开启持久化

    一、点对点

    1、提供者目录展示

      

    2、导入依赖

      

    <!-- Core Spring Boot starter -->
    <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <!-- Spring Boot web support: MVC, AOP, ... -->
        <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Spring Boot test starter, restricted to the test scope -->
        <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-test</artifactId>
          <scope>test</scope>
        </dependency>
        <!-- Spring Boot starter for ActiveMQ -->
        <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>

    3、生产者application.yml配置文件

      

    4、生产者MyProvider,通过JMSTemplate模板发送消息

    package com.zn.p2p;
    
    import org.apache.activemq.command.ActiveMQQueue;
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.Resource;
    
    /**
     * Point-to-point producer: sends a fixed text payload to the
     * {@code SpringBoot_Queue} queue through the injected {@link JmsTemplate}.
     */
    @Component
    public class MyProvider {

        /** Name of the queue every message is sent to. */
        private static final String QUEUE_NAME = "SpringBoot_Queue";

        /** Payload sent on each call to {@link #sendMessage()}. */
        private static final String PAYLOAD = "生产者产生的消息!";

        /** JMS template injected by the container. */
        @Resource
        private JmsTemplate jmsTemplate;

        /**
         * Creates the queue destination and sends the fixed payload to it.
         */
        public void sendMessage() {
            // Point-to-point: the destination is a queue.
            final ActiveMQQueue destination = new ActiveMQQueue(QUEUE_NAME);
            jmsTemplate.convertAndSend(destination, PAYLOAD);
        }
    }

    5、客户端访问ProvideController

    package com.zn.p2p;
    
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import javax.annotation.Resource;
    
    /**
     * REST endpoint that lets a client trigger the producer over HTTP.
     */
    @RestController
    public class ProvideController {

        /** Producer that performs the actual send. */
        @Resource
        private MyProvider provider;

        /**
         * Handles requests to {@code /sendMessage} by delegating to the producer.
         *
         * @return a fixed acknowledgement text once the producer call has returned
         */
        @RequestMapping("/sendMessage")
        public String sendMessage() {
            final String reply = "sucess!!";
            provider.sendMessage();
            return reply;
        }
    }

    6、provider启动类StartProvider

    package com.zn;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    /**
     * Boot entry point of the producer application.
     */
    @SpringBootApplication
    public class StartProvider {

        /**
         * Starts the Spring application with this class as its primary source.
         *
         * @param args command-line arguments, passed through to Spring Boot unchanged
         */
        public static void main(String[] args) {
            final Class<?> primarySource = StartProvider.class;
            SpringApplication.run(primarySource, args);
        }
    }

    7、消费者目录展示

      

    8、导入依赖

      

    <!-- Core Spring Boot starter -->
    <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <!-- Spring Boot web support: MVC, AOP, ... -->
        <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Spring Boot test starter, restricted to the test scope -->
        <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-test</artifactId>
          <scope>test</scope>
        </dependency>
        <!-- Spring Boot starter for ActiveMQ -->
        <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>

    9、消费者application.yml配置文件

      

    10、consumer启动类StartP2PConsumer

    package com.zn;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.jms.annotation.JmsListener;
    import org.springframework.jms.annotation.JmsListeners;
    
    import javax.jms.JMSException;
    import javax.jms.TextMessage;
    
    /**
     * Boot entry point of the point-to-point consumer; also hosts the queue listener.
     */
    @SpringBootApplication
    public class StartP2PConsumer {

        /**
         * Listener for the {@code SpringBoot_Queue} destination: prints the text of
         * each received message to standard output.
         *
         * @param message the received text message
         * @throws JMSException if the text of the message cannot be read
         */
        @JmsListener(destination = "SpringBoot_Queue")
        public void getMessage(TextMessage message) throws JMSException {
            final String text = message.getText();
            System.out.println("消费者获取到消息:" + text);
        }

        /**
         * Starts the Spring application with this class as its primary source.
         *
         * @param args command-line arguments, passed through to Spring Boot unchanged
         */
        public static void main(String[] args) {
            final Class<?> primarySource = StartP2PConsumer.class;
            SpringApplication.run(primarySource, args);
        }
    }

    11、启动提供者并访问

      

      

    12、启动消费者

      

      

    二、发布订阅

    1、消费者目录展示

      

    2、导入依赖

      

    <!-- Core Spring Boot starter -->
    <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <!-- Spring Boot web support: MVC, AOP, ... -->
        <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Spring Boot test starter, restricted to the test scope -->
        <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-test</artifactId>
          <scope>test</scope>
        </dependency>
        <!-- Spring Boot starter for ActiveMQ -->
        <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>

    3、消费者application.yml配置文件

      

    4、consumer启动类StartTopicConsumer

    package com.zn;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.jms.annotation.JmsListener;
    import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
    import org.springframework.jms.config.JmsListenerContainerFactory;
    
    import javax.jms.ConnectionFactory;
    import javax.jms.JMSException;
    import javax.jms.TextMessage;
    
    /**
     * Boot entry point of the topic (publish/subscribe) consumer; also declares the
     * topic listener container factory and the topic listener itself.
     */
    @SpringBootApplication
    public class StartTopicConsumer {

        /**
         * Starts the Spring application with this class as its primary source.
         *
         * @param args command-line arguments, passed through to Spring Boot unchanged
         */
        public static void main(String[] args) {
            SpringApplication.run(StartTopicConsumer.class, args);
        }

        /**
         * Listener container factory for topic destinations.
         *
         * <p>Spring Boot only configures queue-type messaging by default, so this
         * bean is required in order to listen on a topic.
         *
         * @param connectionFactory the JMS connection factory supplied by the container
         * @return a listener container factory switched to the pub/sub domain
         */
        @Bean
        public JmsListenerContainerFactory<?> jmsTopicListenerContainerFactory(ConnectionFactory connectionFactory) {
            DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            // Must be true for topics; false selects the queue (point-to-point) domain.
            factory.setPubSubDomain(true);
            return factory;
        }

        /**
         * Listener for the {@code SpringBoot_Topic} destination: prints the text of
         * each received message to standard output.
         *
         * @param message the received text message
         * @throws JMSException if the text of the message cannot be read
         */
        @JmsListener(destination = "SpringBoot_Topic", containerFactory = "jmsTopicListenerContainerFactory")
        public void getMessage(TextMessage message) throws JMSException {
            System.out.println("消费者获取到消息:" + message.getText());
        }
    }

    5、提供者目录展示

      

    6、导入依赖 

      

    <!-- Core Spring Boot starter -->
    <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <!-- Spring Boot web support: MVC, AOP, ... -->
        <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Spring Boot test starter, restricted to the test scope -->
        <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-test</artifactId>
          <scope>test</scope>
        </dependency>
        <!-- Spring Boot starter for ActiveMQ -->
        <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>

    7、TopicProvider

    package com.zn.topic;
    
    import org.apache.activemq.command.ActiveMQQueue;
    import org.apache.activemq.command.ActiveMQTopic;
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.Resource;
    
    /**
     * Publish/subscribe producer: sends a fixed text payload to the
     * {@code SpringBoot_Topic} topic through the injected {@link JmsTemplate}.
     */
    @Component
    public class TopicProvider {

        /** JMS template injected by the container. */
        @Resource
        private JmsTemplate jmsTemplate;

        /**
         * Creates the topic destination and sends the fixed payload to it.
         *
         * <p>The template's pub/sub flag is deliberately left untouched: the message
         * is sent to an explicit {@link ActiveMQTopic} destination object, so the flag
         * (which governs how destination <em>names</em> are resolved) is not needed
         * here, and flipping it on every send would reconfigure the injected template
         * for every other component that uses it.
         */
        public void sendMessage() {
            // Publish/subscribe: the destination is a topic.
            ActiveMQTopic topic = new ActiveMQTopic("SpringBoot_Topic");
            jmsTemplate.convertAndSend(topic, "生产者产生topic的消息");
        }
    }

    8、ProvideController

    package com.zn.controller;
    
    import com.zn.p2p.MyProvider;
    import com.zn.topic.TopicProvider;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import javax.annotation.Resource;
    
    /**
     * REST endpoint that lets a client trigger the topic producer over HTTP.
     */
    @RestController
    public class ProvideController {

        /** Topic producer that performs the actual send. */
        @Resource
        private TopicProvider topicProvider;

        /**
         * Handles requests to {@code /sendMessage} by delegating to the topic producer.
         *
         * @return a fixed acknowledgement text once the producer call has returned
         */
        @RequestMapping("/sendMessage")
        public String sendMessage() {
            final String reply = "success";
            topicProvider.sendMessage();
            return reply;
        }
    }

    9、启动消费者订阅消息

      

    10、启动生产者

      

       

     11、消费者控制台效果

      

    三、SpringBoot整合ActiveMQ开启持久化---队列持久化

    1、步骤1、2、3同上p2p

    4、生产者MyProvider,通过JMSTemplate模板发送消息

    package com.zn.p2p;
    
    import org.apache.activemq.command.ActiveMQQueue;
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.Resource;
    
    /**
     * Point-to-point producer with persistent delivery: sends a fixed text payload
     * to the {@code SpringBoot_Queue} queue through the injected {@link JmsTemplate}.
     */
    @Component
    public class MyProvider {

        /** JMS template injected by the container. */
        @Resource
        private JmsTemplate jmsTemplate;

        /**
         * Enables persistent delivery on the template, then sends the fixed payload
         * to the queue.
         */
        public void sendMessage() {
            // Enable persistence. Explicit QoS has to be switched on for the
            // template's delivery mode to be applied when sending.
            jmsTemplate.setExplicitQosEnabled(true);
            // Selects the persistent delivery mode; this replaces the former
            // setDeliveryMode(2) call, which set the same mode via a magic number.
            jmsTemplate.setDeliveryPersistent(true);

            // Point-to-point: the destination is a queue.
            ActiveMQQueue queue = new ActiveMQQueue("SpringBoot_Queue");
            jmsTemplate.convertAndSend(queue, "生产者产生的消息");
        }
    }

    5、步骤5、6、7、8、9、10同上

    11、开启生产者

      

      

      

    12、开启消费者

      

         

      这里可以试一下让ActiveMQ服务器宕机,然后让服务器重启,看看数据有没有做持久化的操作

    13、结论:当服务器宕机,重启服务器之后,没有被消费的消息依然在数据库中,这样就做到了持久化操作。

    四、SpringBoot整合ActiveMQ开启持久化---主题持久化

      不会进行数据消费的,但是数据可以持久化

    1、步骤1、2、3同上

    4、consumer启动类StartTopicConsumer

    package com.zn;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.jms.annotation.JmsListener;
    import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
    import org.springframework.jms.config.JmsListenerContainerFactory;
    import org.springframework.jms.listener.DefaultMessageListenerContainer;
    
    import javax.jms.ConnectionFactory;
    import javax.jms.JMSException;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    @SpringBootApplication
    public class StartTopicConsumer {
        public static void main(String[] args) {
            SpringApplication.run(StartTopicConsumer.class,args);
        }
    @Bean(name
    = "topicListenerFactory") public JmsListenerContainerFactory<DefaultMessageListenerContainer> topicListenerFactory(ConnectionFactory connectionFactory){ DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setSubscriptionDurable(true);// Set this to "true" to register a durable subscription, factory.setClientId("A"); factory.setConnectionFactory(connectionFactory); return factory; } //消费者消费 destination队列或者主题的名字 @JmsListener(destination = "boot_topic",containerFactory = "topicListenerFactory") public void getMessage(TextMessage message, Session session) throws JMSException { System.out.println("消费者获取到消息:"+message.getText()); } }

    9、步骤5、6、7、8同上

    10、启动消费者

      

    11、启动生产者

      

      

       

    12、消费者控制台

      

  • 相关阅读:
    Spread Studio中文支持图解
    C#反射实例No.1
    关于XML序列化的简单例子
    将数据结构类型序列化和反序列化(BinaryFormatter类)
    获取计算机名称,IP,MAC地址
    原始套接字发送自定义IP包
    IP包首部格式
    struct和byte[]相互转换(用Marshal类实现)
    图片保存到数据库和从数据库读取图片并显示(C#)
    单词分析器源码
  • 原文地址:https://www.cnblogs.com/Zzzzn/p/12309539.html
Copyright © 2011-2022 走看看