zoukankan      html  css  js  c++  java
  • SpringBoot+ActiveMQ(整合)

    1.POM文件

    <?xml version="1.0" encoding="UTF-8"?>
    <!-- Maven build descriptor for the Spring Boot + ActiveMQ demo application. -->
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <!-- The Spring Boot parent supplies dependency management (versions) and plugin defaults. -->
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.1.4.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.atguigu</groupId>
        <artifactId>springboot-activemq</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>springboot-activemq</name>
        <description>Demo project for Spring Boot</description>

        <properties>
            <java.version>1.8</java.version>
        </properties>

        <dependencies>
            <!-- Spring JMS support plus the ActiveMQ client -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-activemq</artifactId>
            </dependency>

            <!-- ActiveMQ connection pool.
                 No explicit version: it is inherited from the Spring Boot parent so that it
                 always matches the ActiveMQ client brought in by the starter above
                 (it was previously pinned to 5.12.0, which did not match). -->
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-pool</artifactId>
            </dependency>

            <!-- Spring MVC / embedded web server for the REST trigger endpoint -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>

            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>

            <!-- Lombok (provides the @Slf4j logger shortcut used by the sources).
                 Version is inherited from the Spring Boot parent; marked optional because it
                 is a compile-time annotation processor and should not be passed on to
                 projects depending on this artifact. -->
            <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <optional>true</optional>
            </dependency>

            <!-- Spring Boot hot-reload support for development -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-devtools</artifactId>
                <optional>true</optional>
            </dependency>

            <!-- fastjson is not managed by the Spring Boot parent, so the version stays explicit.
                 Raised from 1.2.45: 1.2.x releases before 1.2.83 have publicly reported
                 autoType deserialization vulnerabilities. -->
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.83</version>
            </dependency>
        </dependencies>

        <build>
            <plugins>
                <!-- Packages the application as an executable jar -->
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>

    </project>

    2.yml文件

    # Application configuration for the ActiveMQ demo.
    spring:
      jms:
        # NOTE(review): per the Spring Boot documentation, `true` makes the default JMS
        # destination type a topic instead of a queue. QueueProducer sends to explicit
        # ActiveMQQueue destinations and the listener factories in MyJmsListenerConfigurer
        # configure their own destination type, so confirm this setting is really wanted.
        pub-sub-domain: true
      activemq:
        ## URL of the ActiveMQ broker. Auto-generated by default. For instance `tcp://localhost:61616`
        # Other accepted forms:
        #   failover:(tcp://localhost:61616,tcp://localhost:61617)
        #   tcp://localhost:61616
        # This value is also injected via @Value into MyJmsListenerConfigurer.
        broker-url: failover:tcp://192.168.68.137:61616
        # NOTE(review): the Spring Boot documentation states this flag is ignored when an
        # explicit broker URL is specified, so it appears to have no effect here - confirm.
        in-memory: true
        pool:
          # Connection pooling is switched off.
          enabled: false
        packages:
          # NOTE(review): trusting all packages lets ObjectMessage payloads of any class be
          # deserialized; prefer an explicit list of trusted packages outside of a demo.
          trust-all: true
        # Broker credentials; also injected via @Value into MyJmsListenerConfigurer.
        # NOTE(review): plain-text default credentials - externalize for real deployments.
        user: admin
        password: admin
    
    # HTTP port of the embedded web server
    server:
      port: 9011

    3.定义容器与属性文件

    package com.atguigu.springbootactivemq.config;
    
    import org.apache.activemq.command.ActiveMQQueue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import javax.jms.Queue;
    
    /**
     * Spring configuration that declares the JMS destination bean of the demo.
     */
    @Configuration
    public class BeanConfig {

        /** Physical name of the queue exposed as a bean. */
        private static final String QUEUE_NAME = "ActiveMQQueue";

        /**
         * Exposes an ActiveMQ queue destination as a {@link Queue} bean.
         *
         * @return a new queue destination named {@code ActiveMQQueue}
         */
        @Bean
        public Queue queue() {
            final Queue messageQueue = new ActiveMQQueue(QUEUE_NAME);
            return messageQueue;
        }
    }

    自定义配置(获取yml文件的mq属性)

    package com.atguigu.springbootactivemq.config;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.RedeliveryPolicy;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.jms.annotation.EnableJms;
    import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
    import org.springframework.jms.connection.CachingConnectionFactory;
    import org.springframework.jms.support.destination.DestinationResolver;
    
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    
    /**
     * Custom JMS configuration.
     *
     * <p>Declares the ActiveMQ {@link ConnectionFactory} (built from the
     * {@code spring.activemq.*} properties) and two named listener container factories
     * that {@code @JmsListener} methods can select through {@code containerFactory}:
     * {@code MyjmsQueueListener} for queues and {@code MyjmsTopicListener} for topics.
     *
     * @author : caigq
     * @version : 1.0
     * @date : 2018-06-01 9:32
     */
    @Configuration
    @EnableJms
    public class MyJmsListenerConfigurer {

        /** Broker URL, read from {@code spring.activemq.broker-url}. */
        @Value("${spring.activemq.broker-url}")
        private String activeMQURL;
        /** Broker user name, read from {@code spring.activemq.user}. */
        @Value("${spring.activemq.user}")
        private String userName;
        /** Broker password, read from {@code spring.activemq.password}. */
        @Value("${spring.activemq.password}")
        private String password;

        /**
         * Listener container factory for JMS queues.
         *
         * @param jmsConnectionFactory the connection factory the listener containers use
         * @return a transacted factory with concurrency {@code "5"} whose destination
         *         names are always resolved as queues
         */
        @Bean(name = "MyjmsQueueListener")
        public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(ConnectionFactory jmsConnectionFactory) {
            DefaultJmsListenerContainerFactory factory = newTransactedFactory(jmsConnectionFactory, "5");

            // Resolve every destination name as a queue, ignoring the pubSubDomain flag.
            DestinationResolver destinationResolver =
                    (session, destinationName, pubSubDomain) -> session.createQueue(destinationName);
            factory.setDestinationResolver(destinationResolver);
            return factory;
        }

        /**
         * Listener container factory for JMS topics (publish/subscribe).
         *
         * @param jmsConnectionFactory the connection factory the listener containers use
         * @return a transacted pub/sub factory with concurrency {@code "6"}
         */
        @Bean(name = "MyjmsTopicListener")
        public DefaultJmsListenerContainerFactory jmsTopicListenerContainerFactory(ConnectionFactory jmsConnectionFactory) {
            DefaultJmsListenerContainerFactory factory = newTransactedFactory(jmsConnectionFactory, "6");
            factory.setPubSubDomain(true);
            return factory;
        }

        /**
         * Builds the ActiveMQ connection factory from the injected broker settings and
         * attaches the redelivery policy.
         *
         * @return the configured ActiveMQ connection factory
         */
        @Bean
        public ConnectionFactory jmsConnectionFactory(){
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();

            connectionFactory.setBrokerURL(activeMQURL);
            connectionFactory.setUserName(userName);
            connectionFactory.setPassword(password);
            // NOTE(review): trusting all packages allows ObjectMessage payloads of any class
            // to be deserialized; restrict to the application's packages outside of a demo.
            connectionFactory.setTrustAllPackages(true);
            connectionFactory.setMaxThreadPoolSize(ActiveMQConnection.DEFAULT_THREAD_POOL_SIZE);

            connectionFactory.setRedeliveryPolicy(buildRedeliveryPolicy());

            return connectionFactory;
        }

        /**
         * Creates a listener container factory with the settings shared by the queue and
         * topic variants.
         *
         * <p>The connection factory is handed to the container factory directly. It used to
         * be wrapped in a locally created {@link CachingConnectionFactory}; that wrapper was
         * not a Spring-managed bean (so it was never destroyed on shutdown), and Spring's
         * documentation advises against combining it with listener containers, which do
         * their own caching.
         */
        private static DefaultJmsListenerContainerFactory newTransactedFactory(
                ConnectionFactory connectionFactory, String concurrency) {
            DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            // Use locally transacted sessions for message reception.
            factory.setSessionTransacted(true);
            factory.setConcurrency(concurrency);
            return factory;
        }

        /** Creates the redelivery policy applied to the connection factory. */
        private static RedeliveryPolicy buildRedeliveryPolicy() {
            RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();

            // Enable exponential back-off between redelivery attempts.
            redeliveryPolicy.setUseExponentialBackOff(true);
            // Maximum number of redeliveries: 1.
            redeliveryPolicy.setMaximumRedeliveries(1);
            // Delay before the first redelivery: 1000 ms.
            redeliveryPolicy.setInitialRedeliveryDelay(1000);
            // Back-off multiplier applied to the previous delay: 2.
            redeliveryPolicy.setBackOffMultiplier(2);
            // Upper bound for the redelivery delay: 1000 ms.
            // NOTE(review): the cap equals the initial delay and only one redelivery is
            // allowed, so the back-off settings above look ineffective - confirm intent.
            redeliveryPolicy.setMaximumRedeliveryDelay(1000);
            return redeliveryPolicy;
        }

    }

    4.创建消息的监听类(消费者)

    package com.atguigu.springbootactivemq.consumer;
    import com.atguigu.springbootactivemq.pojo.student;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.jms.annotation.JmsListener;
    import org.springframework.stereotype.Component;
    
    
    /**
     * JMS consumer for messages arriving on the {@code QUEUE_RECEIVE_ADD_FIREND} queue.
     */
    @Component
    @Slf4j
    public class QueueAddFriendReceiver {

        /**
         * Handles one received message by logging its payload.
         *
         * <p>The queue name (spelled "FIREND") must stay identical to the name the producer
         * side sends to. The listener uses the {@code MyjmsQueueListener} container factory.
         *
         * @param student the converted message payload
         */
        @JmsListener(destination = "QUEUE_RECEIVE_ADD_FIREND", containerFactory = "MyjmsQueueListener")
        public void receiveAddFriend(student student) {
            // Successful receipt is logged at INFO through the class logger; the previous
            // unconditional ERROR entry with an unfilled "{}" placeholder was misleading.
            log.info("啦啦啦啦{}", student);
        }
    }

    5.创建消息提供者

    package com.atguigu.springbootactivemq.controller;
    
    
    import com.atguigu.springbootactivemq.pojo.student;
    import com.atguigu.springbootactivemq.produce.QueueProducer;
    import org.springframework.beans.factory.annotation.Autowired;
    
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    
    /**
     * REST controller that triggers sending a demo message to the message queue.
     */
    @RestController
    public class ProviderController {

        /** Name of the queue the demo message is sent to. */
        private static final String ADD_FRIEND_QUEUE = "QUEUE_RECEIVE_ADD_FIREND";

        /** Helper used to hand the message over for sending. */
        @Autowired
        private QueueProducer queueProducer;

        /**
         * Builds a sample {@code student} and passes it to the producer.
         *
         * @return a fixed confirmation text
         */
        @GetMapping("/value")
        public String value() {
            student payload = new student();
            payload.setName("小明");
            payload.setAge(11);

            // Hand the payload to the producer, addressed to the queue named above.
            queueProducer.sendObjectMessage(ADD_FRIEND_QUEUE, payload);
            return "消息已经发送";
        }

    }

    6.消息发送的工具类

    package com.atguigu.springbootactivemq.produce;
    
    
    import lombok.extern.slf4j.Slf4j;
    import org.apache.activemq.command.ActiveMQMapMessage;
    import org.apache.activemq.command.ActiveMQObjectMessage;
    import org.apache.activemq.command.ActiveMQQueue;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.jms.core.JmsMessagingTemplate;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    import org.springframework.stereotype.Component;
    
    import javax.jms.Destination;
    import java.io.Serializable;
    import java.util.Date;
    
    /**
     * 队列模式提供者
     */
    @Component
    @Slf4j
    public class QueueProducer {
    
        /**
         * MQ jms实例
         **/
        @Autowired
        private JmsMessagingTemplate jmsMessagingTemplate;
    
        @Autowired
        private ThreadPoolTaskExecutor threadPoolTaskExecutor;
    
        public void sendMapMessage(String queueName, Object message) {
            threadPoolTaskExecutor.submit(() -> {
                try {
                    Destination destination = new ActiveMQQueue(queueName);
                    // 这里定义了Queue的key
                    ActiveMQMapMessage mqMapMessage = new ActiveMQMapMessage();
                    mqMapMessage.setJMSDestination(destination);
                    mqMapMessage.setObject("result", message);
                    this.jmsMessagingTemplate.convertAndSend(destination, mqMapMessage);
                } catch (Throwable e) {
                    log.error("{}", e);
                }
            });
        }
    
        public void sendObjectMessage(String queueName, Object message) {
            threadPoolTaskExecutor.submit(() -> {
                try {
                    log.info("发送添加好友请求:{}",message.toString());
                    Destination destination = new ActiveMQQueue(queueName);
                    // 这里定义了Queue的key
                    ActiveMQObjectMessage mqObjectMessage = new ActiveMQObjectMessage();
                    mqObjectMessage.setJMSDestination(destination);
                    mqObjectMessage.setObject((Serializable) message);
                    this.jmsMessagingTemplate.convertAndSend(destination, mqObjectMessage);
                } catch (Throwable e) {
                    log.error("{}", e);
                }
            });
        }
    
        public void sendObjectMessage(Destination destination, Object message) {
            threadPoolTaskExecutor.submit(() -> {
                Date date = new Date();
                try {
                    // 这里定义了Queue的key
                    log.info("【queue-->send】:activeCount={},queueCount={},completedTaskCount={},taskCount={}", threadPoolTaskExecutor.getThreadPoolExecutor().getActiveCount(), threadPoolTaskExecutor.getThreadPoolExecutor().getQueue().size(), threadPoolTaskExecutor.getThreadPoolExecutor().getCompletedTaskCount(), threadPoolTaskExecutor.getThreadPoolExecutor().getTaskCount());
    
                    ActiveMQObjectMessage mqObjectMessage = new ActiveMQObjectMessage();
                    mqObjectMessage.setJMSDestination(destination);
                    mqObjectMessage.setObject((Serializable) message);
                    this.jmsMessagingTemplate.convertAndSend(destination, mqObjectMessage);
                } catch (Throwable e) {
                    log.error("{}", e);
                }
            });
        }
    
    }

    7.结构图

     

    8.MQ的可视化界面

     

  • 相关阅读:
    canvas裁剪图片
    Dubbo
    SpringBoot请求参数传递与接收
    神经网络量化入门--Add和Concat
    防火墙如何设置特定IP访问指定端口
    如何解释 On-Premises、IaaS、PaaS、SaaS、 Serverless 的区别?
    screw一键生成数据库文档,无需重复CV大法
    获取application.properties中配置的路径
    《《《发布项目引入的jar包,运行不报错打包发布项目时候报错
    IDEA maven mvn install无法引用手动导入的jar包的解决方式
  • 原文地址:https://www.cnblogs.com/telwanggs/p/14982650.html
Copyright © 2011-2022 走看看