zoukankan      html  css  js  c++  java
  • SpringBoot 整合 ActiveMq

      消息队列,用来处理开发中的高并发问题,通过线程池、多线程高效的处理并发任务。

      首先,需要下载一个ActiveMQ的管理端:我本地的版本是 activemq5.15.8,打开activemq5.15.8inwin64wrapper.exe客户端,可以根据localhost:端口号,访问ActiveMQ的管理界面。默认的用户名、密码都是admin。

      (一)pom 文件中添加 ActiveMq 依赖

    <dependency>
         <groupId>org.apache.activemq</groupId>
         <artifactId>activemq-core</artifactId>
         <version>5.7.0</version>
    </dependency>
    <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.7.0</version>
    </dependency>

      (二)创建线程池,通过线程池创建、管理线程,这样有利于线程的使用,避免了每次都要创建、关闭线程,浪费资源。

      需要注意这里需要把当前类、函数添加到 IOC 容器中,后面将会用到。

        1.@Configuration 在项目启动的时候,会加载当前类,构造bean。也可以使用@Component

        2.@Bean("taskExecutor"),定义了当前 bean 的名称,默认不添加名称的话,后面如果调用,则会根据对象的类型进行类型匹配。

    package com.common.utils.threadPool;
    
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    
    import java.util.concurrent.RejectedExecutionHandler;
    import java.util.concurrent.ThreadPoolExecutor;
    
    /**
     * 线程:在执行并发任务的时候,为提高效率,所以启用线程,但是需要创建、销毁
     * java 线程池,使用线程池,避免了每次使用都要创建一个线程从而影响了效率。而是在完成任务后,并不被销毁,还可以继续执行其他任务
     * @since 21:28 2019/4/9
     * @author hanyf
     */
    
    @Configuration
    public class ThreadPoolConfig {
    
        @Bean("taskExecutor")
        public ThreadPoolTaskExecutor taskExecutor(){
            ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
            //线程池核心池的大小,默认线程池为0,需要等待任务去创建线程,如线程数量大于了核心池大小,就会将到达的线程放到【缓存队列中】
            taskExecutor.setCorePoolSize(50);
            //设置线程池能创建的最大线程数量
            taskExecutor.setMaxPoolSize(60);
            //线程没有执行任务时保存多长时间后终止
            //默认情况,只有线程数量>coreThreadNum 才会起作用,直到线程数量<coreThreadNum时结束
            taskExecutor.setKeepAliveSeconds(6*60);
            //缓存队列
            taskExecutor.setQueueCapacity(20);
            taskExecutor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                    System.err.println("线程池拒绝策略,正在执行...");
                }
            });
            return taskExecutor;
        }
    }
    ThreadPoolConfig

      (三)写一个公用的线程池调用工具类,方便以后调用;

        1.构造方法,在具体调用的时候,我们需要传入一个已定义好的线程池对象(引用)。

    package com.common.utils.threadPool;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    /**
     * 线程的具体调用方法
     * @since 9:03 2019/4/10
     * @author hanyf
     */
    public class ThreadPoolUtils {
        public static final Logger log = LoggerFactory.getLogger(ThreadPoolUtils.class);
        public ThreadPoolTaskExecutor executor;
    
        public ThreadPoolUtils(ThreadPoolTaskExecutor threadPoolTaskExecutor){
            this.executor = threadPoolTaskExecutor;
        }
    
        public void start(){
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        log.info("ThreadPoolUtils====>currentThreadPoolSize【"+ executor.getPoolSize()+"】,ActiveCount【"+executor.getActiveCount()+"】");
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }
    ThreadPoolUtils

      (四)配置ActiveMQ:先创建好Listener,用来监听队列的变化。在接下来的ActiveMQ对象构建中会指定队列的关联监听器,所以现在先创建好。

    package com.common.utils.activeMQ.listener;
    
    import com.common.utils.activeMQ.producer.DefaultProducerTemplate;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.TextMessage;
    
    /**
     * 监听器,用来监听队列,并执行
     * @since 16:31 2019/4/10
     * @author hanyf
     */
    public class DefaultMessageListener implements MessageListener {
        private static final Logger log = LoggerFactory.getLogger(DefaultMessageListener.class);
        @Override
        public void onMessage(Message message) {
            TextMessage m = (TextMessage)message;
            try {
                System.out.println(m.getText());
                log.info("接收到消息啦...==========>>>"+m.getText());
            } catch (JMSException e) {
                e.printStackTrace();
                log.error("监听器错误...==========>>>"+e.getMessage());
            }
        }
    }

      (五)配置ActiveMQ :队列定义好队列名称、配置信息、绑定监听器

    package com.common.utils.activeMQ;
    
    import com.common.utils.activeMQ.listener.DefaultMessageListener;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.command.ActiveMQQueue;
    import org.apache.activemq.pool.PooledConnectionFactory;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.jms.listener.DefaultMessageListenerContainer;
    import org.springframework.stereotype.Component;
    
    /**
     *  activeMQ 的相关配置
     * @since 9:36 2019/4/10
     * @author hanyf
     */
    @Component
    public class ActiveMQConfig {
        public static final String QUEUETEST = "hyfTestQueue";
    
        /**
         *  创建 activemq 的连接工厂对象
         * @since 15:24 2019/4/10
         * @author hanyf
         * @params  * @param null
         * @return
         */
        @Bean
        public ActiveMQConnectionFactory activeMQConnectionFactory(){
            return new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,ActiveMQConnectionFactory.DEFAULT_PASSWORD
                    ,ActiveMQConnectionFactory.DEFAULT_BROKER_URL);
        }
    
        /**
         *      PooledConnectionFactory 对象是用来管理 jms template在发送消息过程中,每次都要创建connection、session、producer 对象,将会耗费性能,
         *  使用 PooledConnectionFactory 来缓存 connection、session、producer 对象
         * @since 15:38 2019/4/10
         * @author hanyf
         * @params  * @param null
         * @return
         */
        @Bean
        public PooledConnectionFactory pooledConnectionFactory(){
            PooledConnectionFactory pooledConnectionFactory= new PooledConnectionFactory();
            pooledConnectionFactory.setConnectionFactory(this.activeMQConnectionFactory());
            pooledConnectionFactory.setMaximumActiveSessionPerConnection(500);
            pooledConnectionFactory.setMaxConnections(300);
            pooledConnectionFactory.setBlockIfSessionPoolIsFull(true);
            return pooledConnectionFactory;
        }
    
        /**
         * Spring 基于JMS监听器(有三种)
         * @since 10:45 2019/4/10
         * @author hanyf
         * @params  * @param null
         * @return
         */
        @Bean
        public DefaultMessageListenerContainer hyfTestQueueListener(){
            DefaultMessageListenerContainer listenerContainer = new DefaultMessageListenerContainer();
            //监听的队列名称
            listenerContainer.setDestination(new ActiveMQQueue(QUEUETEST));
            listenerContainer.setConnectionFactory(this.activeMQConnectionFactory());
            //具体监听操作类
            listenerContainer.setMessageListener(new DefaultMessageListener());
            return listenerContainer;
        }
    
    }
    ActiveMQConfig

      (六)配置ActiveMQ:创建 Producer,具体的消息发送对象,需要使用线程池来发送信息

    package com.common.utils.activeMQ.producer;
    
    import org.apache.activemq.pool.PooledConnectionFactory;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.Resource;
    import javax.jms.*;
    import java.util.concurrent.Executor;
    
    /**
     * 自己创建的默认生产者模板
     *  生产者 :顾名思义就是用来生产消息的,
     * 在这里使用并发的方式来进行发送、生产消息
     * @since 11:04 2019/4/10
     * @author hanyf
     */
    @Component
    public class DefaultProducerTemplate {
    
        private static final Logger log = LoggerFactory.getLogger(DefaultProducerTemplate.class);
    
        @Resource
        private PooledConnectionFactory pooledConnectionFactory;
    
        @Resource(name = "taskExecutor")
        public Executor threadExecutor;
    
        public boolean send(String destinationName,int priority,String message){
            try{
                threadExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        Connection connection = null;
                        Session session = null;
    
                        long start = System.currentTimeMillis();
                        try {
                            //从连接池工厂获取一个连接
                            connection = pooledConnectionFactory.createConnection();
                            //第一个参数:非事务类型   第二个参数:表示消息的确认类型
                            session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
                            //创建发送的 mq 目标队列
                            Destination destination = session.createQueue(destinationName);
                            //创建 producer
                            MessageProducer producer = session.createProducer(destination);
                            //是否持久化
                            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
                            //优先级
                            producer.setPriority(priority);
    
                            Message _m = session.createTextMessage(message);
                            producer.send(_m);
    
                            log.info("producer send =============>>>【"+message+"】,duration:"+ (System.currentTimeMillis()-start)+"ms" );
                        } catch (JMSException e) {
                            e.printStackTrace();
                            log.error("producer send =============>>>" + e.getMessage());
                        }
                        finally {
                            try{
                                if(connection != null)connection.close();
                                if (session != null) session.close();
                            }
                            catch (Exception e){
                                e.printStackTrace();
                                log.error("producer send =============>>>" + e.getMessage());
                            }
                        }
                    }
                });
            }
            catch (Exception e){
                e.printStackTrace();
                return false;
            }
            return true;
        }
    }
    DefaultProducerTemplate

      (七)配置ActiveMQ:调用呐,自己编写一个调用类:引入线程池、producer 

    @Resource(name = "taskExecutor")
        private ThreadPoolTaskExecutor executor;
    
    @Resource
        private DefaultProducerTemplate template;

    //具体的调用类
    @RequestMapping("doThread.json")
    public MessageOutput doThread(@ModelAttribute QuartzJobManage manage){
    int n = 40;
    while(n>0){
    new ThreadPoolUtils(this.executor).start();
    n--;
    }
    return new MessageOutput("200");
    }

     结果:

      之前

      发送:

      ActiveMQ管理端:

    Over。。。

  • 相关阅读:
    JavaScript异步编程1——Promise的初步使用
    Pailler
    ElGamal
    RSA
    密码基础
    博客园中:为文章添加版权保护
    DCT实现水印嵌入与提取(带攻击)
    量子:基于EPR块对的两步量子直接通信
    量子:拜占庭协议和测谎问题的量子协议的实验证明
    liunx:网络命令
  • 原文地址:https://www.cnblogs.com/mysouler/p/10795204.html
Copyright © 2011-2022 走看看