zoukankan      html  css  js  c++  java
  • Callable+ThreadPoolExecutor实现多线程并发并获得返回值(转)

    出处:https://blog.csdn.net/kity9420/article/details/80740466

    前言
      经常会遇到一些性能问题,比如调用某个接口,可能要循环调用100次,并且需要拿到每一次调用的返回结果,通常我们都是放在for循环中一次次的串行调用,这种方式可想而知道有多慢,那怎么解决这个问题呢?

    多线程
      为了解决以上问题,我使用的方式是多线程。多线程常规的有两种实现方式,即继承Tread类,实现Runnable接口,但是这两种实现方式,有一个共同的问题,就是没有返回值,对于我们来说,获得每个线程的返回值,是个很困难的问题,因此不能用Tread类或Runnable接口,我用的是Callable和ThreadPoolExecutor,Callable的process方法可以允许有返回值,ThreadPoolExecutor的invokeAll或submit方法可以拿到线程的执行结果

    案例
      假设需要给100个用户发送邮件,并需要每个用户的返回结果,先看下代码结构

                   

      CallableTemplate.java 

    package com.gdut.thread.multiThread;
    
    import java.util.concurrent.Callable;
    
    /**
     * 多线程模板类
     * @author yang.han
     *
     * @param <V>
     */
    public abstract class CallableTemplate<V> implements Callable<V>{
        
        /**
         * 前置处理,子类可以Override该方法
         */
        public void beforeProcess() {
            System.out.println("before process");
        }
        
        /**
         * 处理业务逻辑的方法,需要子类去Override
         * @param <V>
         * @return
         */
        public abstract V process();
        
        /**
         * 后置处理,子类可以Override该方法
         */
        public void afterProcess() {
            System.out.println("after process");
        }
    
        @Override
        public V call() throws Exception {
            beforeProcess();
            V result = process();
            afterProcess();
            return result;
        }
    
    }

      CallableTemplate类实现了Callable接口,并实现了process方法,该类是一个抽象类,接收任意返回值的类型,beforeProcess方法为前置处理,afterProcess的后置处理,process为具体的业务逻辑抽象方法,该方法在子类中实现

    IConcurrentThreadPool.java

    package com.gdut.thread.multiThread;
    
    import java.util.List;
    import java.util.concurrent.ExecutionException;
    
    public interface IConcurrentThreadPool {
    
        /**
         * 初始化线程池
         */
        void initConcurrentThreadPool();
        
        /**
         * 提交单个任务
         * @param <V>
         * @param task
         * @return
         * @throws InterruptedException
         * @throws ExecutionException
         */
        <V> V submit(CallableTemplate<V> task) throws InterruptedException, ExecutionException;
        
        /**
         * 提交多个任务
         * @param <V>
         * @param tasks
         * @return
         * @throws InterruptedException 
         * @throws ExecutionException 
         */
        <V> List<V> invokeAll(List<? extends CallableTemplate<V>> tasks) throws InterruptedException, ExecutionException;
    }

      IConcurrentThreadPool是多线程接口类,声名了三个方法,initConcurrentThreadPool:初始化线程池,submit:提交单个任务的线程,并有返回值,invokeAll:提交多个任务的线程,并有返回值

    ConcurrentThreadPool.java

    package com.gdut.thread.multiThread;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    import java.util.concurrent.LinkedBlockingDeque;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class ConcurrentThreadPool implements IConcurrentThreadPool{
        
        private ThreadPoolExecutor threadPoolExecutor;
        // 核心线程数
        private int corePoolSize = 10;
        // 最大线程数
        private int maximumPoolSize = 20;
        // 超时时间30秒
        private long keepAliveTime = 30;
    
        @Override
        public void initConcurrentThreadPool() {
            threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,
                                                        maximumPoolSize, 
                                                        keepAliveTime, 
                                                        TimeUnit.SECONDS,
                                                        new LinkedBlockingDeque<Runnable>()
                                                        );
        }
    
        @Override
        public <V> V submit(CallableTemplate<V> task) throws InterruptedException, ExecutionException {
            Future<V> result = threadPoolExecutor.submit(task);
            return result.get();
        }
    
        @Override
        public <V> List<V> invokeAll(List<? extends CallableTemplate<V>> tasks) throws InterruptedException, ExecutionException {
            List<Future<V>> tasksResult = threadPoolExecutor.invokeAll(tasks);
            List<V> resultList = new ArrayList<V>();
            
            for(Future<V> future : tasksResult) {
                resultList.add(future.get());
            }
            return resultList;
        }
    
    }

      ConcurrentThreadPool是创建线程池的实现类,用到了ThreadPoolExecutor线程池类及这个类的invokeAll方法和submit方法,这两个方法的返回值,都可以通过Future类的get方法获得

    ICallableTaskFrameWork.java

    package com.gdut.thread.multiThread;
    
    import java.util.List;
    import java.util.concurrent.ExecutionException;
    
    public interface ICallableTaskFrameWork {
        <V> List<V> submitsAll(List<? extends CallableTemplate<V>> tasks)
                throws InterruptedException, ExecutionException;
    }

      ICallableTaskFrameWork是定义的线程任务框架接口,所有的多线程调用,都通过该接口发起

    CallableTaskFrameWork.java

    package com.gdut.thread.multiThread;
    
    import java.util.List;
    import java.util.concurrent.ExecutionException;
    
    public class CallableTaskFrameWork implements ICallableTaskFrameWork{
    
        private IConcurrentThreadPool concurrentThreadPool = new ConcurrentThreadPool();
        
        @Override
        public <V> List<V> submitsAll(List<? extends CallableTemplate<V>> tasks)
                throws InterruptedException, ExecutionException {
            concurrentThreadPool.initConcurrentThreadPool();
            return concurrentThreadPool.invokeAll(tasks);
        }
    
    }

      CallableTaskFrameWork是ICallableTaskFrameWork 的实现类,在submitsAll实现方法中,通过调用线程池对象IConcurrentThreadPool接口的invokeAll方法来发起多线程的调用,这里注意一个,在submitAll实现方法中,我手动的调用了初始化线程池的方法concurrentThreadPool.initConcurrentThreadPool(),在真实的项目上,应该在应用启动的时候就调用该方法来初始化线程池

    测试类代码 
    SendMessageService.java,假设这是一个发送邮件信息的服务类

    package com.gdut.thread.multiThread;
    
    public class SendMessageService {
        public void sendMessage(String email,String content){
            System.out.println("发送邮件。。。");
        }
    }

    SendMessageHander.java,多线程发送邮件的处理类

    package com.gdut.thread.multiThread;
    
    import java.util.HashMap;
    import java.util.Map;
    
    public class SendMessageHander extends CallableTemplate<Map<String, String>>{
        
        private String email;
        private String content;
        public SendMessageHander(String email,String content) {
            this.email = email;
            this.content = content;
        }
    
        @Override
        public Map<String, String> process() {
            SendMessageService sendMessageService = new SendMessageService();
            sendMessageService.sendMessage(email, content);
            Map<String, String> map = new HashMap<String, String>();
            map.put(email, content);
            return map;
        }
    
    }

      这个类继承了上面的CallableTemplate,我们要的返回值是Map,因此泛型类型是Map,在类中还重写了process方法,在方法中调用发送邮件的业务逻辑接口SendMessageService.sendMessage,并将返回结果组装成Map返回,这里我就简单处理了,将邮件地址及内容放在Map中直接返回了;另外还要注意这个类有个有参构造器,通过构建器可以接收需要传递进来的参数

    SendMessageTest.java,测试类

    package com.gdut.thread.multiThread;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.Map.Entry;
    import java.util.concurrent.ExecutionException;
    
    public class SendMessageTest {
    
        public static void main(String[] args) throws InterruptedException, ExecutionException {
            ICallableTaskFrameWork callableTaskFrameWork = new CallableTaskFrameWork();
            
            List<CallableTemplate<Map<String, String>>> tasks = new ArrayList<CallableTemplate<Map<String, String>>>();
            
            SendMessageHander sendMessageHander = null;
            
            // 将需要发送邮件的邮件地址及内容组装好,放在一个集合中
            for (int i = 0; i < 1000; i++) {
                sendMessageHander = new SendMessageHander("email" + i, "content" + i);
                tasks.add(sendMessageHander);
            }
            
            //通过多线程一次性发起邮件,并拿到返回结果集
            List<Map<String, String>> results = callableTaskFrameWork.submitsAll(tasks);
            
            // 解析返回结果集
            for (Map<String, String> map : results) {
                for (Entry<String, String> entry : map.entrySet()) {
                    System.out.println(entry.getKey() + "	" + entry.getValue());
                }
            }
        }
    
    }

    运行结果 

                 

    附录:还可以看这边文章: java并发异步编程 原来十个接口的活现在只需要一个接口就搞定!

  • 相关阅读:
    terminal
    变量提升、函数提升
    ssh传输文件
    mocha测试框架
    npm-run 自动化
    webpack
    浅析babel
    构建工具gulp
    C++中TRACE宏及assert()函数的使用
    memcpy函数-C语言
  • 原文地址:https://www.cnblogs.com/myseries/p/11515370.html
Copyright © 2011-2022 走看看