每次想用多线程处理一个大的结果集时,都需要写一大堆代码,所以自己写了个工具类,方便使用。
package com.guige.fss.common.util; import com.guige.fss.common.exception.BusinessException; import io.swagger.models.auth.In; import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.CollectionUtils; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; /** * Created by admin on 2018/6/5. * @author 宋安伟 */ public class ThreadUtil { //创建定长线程池,初始化线程 private static Logger log = LoggerFactory.getLogger(ThreadUtil.class); /** * 对List进行多线程处理(限制 对List只读 如果想修改List 可以处理完毕后把要修改或删除的List返回 多线程执行完后再修改或删除) * @param list 要处理的List * @param threadSize 用几个线程处理 * @param threadLoadback 处理的回调(具体业务员) * @param <T> 每个回调的返回结果 * @param <V> List<V>的泛型 * @return */ public static <T,V>List<T> executorsTasks(final List<V> list,final int threadSize,final ThreadLoadback<T,V> threadLoadback){ // 开始时间 long start = System.currentTimeMillis(); // 总数据条数 int dataSize = list.size(); // 线程数 int threadNum = dataSize / threadSize + 1; // 定义标记,过滤threadNum为整数 boolean special = dataSize % threadSize == 0; // 创建一个线程池 ExecutorService exec = Executors.newFixedThreadPool(threadNum); // 定义一个任务集合 List<Callable<T>> tasks = new ArrayList<Callable<T>>(); Callable<T> task = null; List cutList = null; for (int i = 0; i < threadNum; i++) { if (i == threadNum - 1) { if (special) { break; } cutList = list.subList(threadSize * i, dataSize); } else { cutList = list.subList(threadSize * i, threadSize * (i + 1)); } // System.out.println("第" + (i + 1) + "组:" + cutList.toString()); final List listStr = cutList; task = new Callable<T>() { @Override public T call() throws Exception { // System.out.println(Thread.currentThread().getName() + "线程:" + listStr); return (T) threadLoadback.load(listStr); // return } }; // 这里提交的任务容器列表和返回的Future列表存在顺序对应的关系 tasks.add(task); } List<Future<T>> resultsFuture = null; try { 
log.debug("线程任务执行开始:任务数"+tasks.size()); resultsFuture = exec.invokeAll(tasks); List<T> results = new ArrayList<>(); for (Future<T> future : resultsFuture) { T result=future.get(); if(result!=null) { results.add(result); } } return results; } catch (Exception e) { e.printStackTrace(); throw new BusinessException(e.getMessage()); }finally { // 关闭线程池 exec.shutdown(); log.debug("线程任务执行结束"); log.debug("执行任务消耗了 :" + (System.currentTimeMillis() - start) + "毫秒"); } } interface ThreadLoadback<T,V> { T load(List<V> list) throws Exception; } public static void main(String[] args) { List<String> list = new ArrayList<>(); for(int i=0;i<1000;i++){ list.add("i="+i); } List<List<Integer>> resultList= ThreadUtil.executorsTasks(list, 10, new ThreadLoadback<List<Integer>, String>() { @Override public List<Integer> load(List<String> list) throws Exception { List<Integer> result= new ArrayList<>(); for(String str:list){ str= str.replaceAll("i=",""); result.add(Integer.parseInt(str)); System.out.println(Thread.currentThread().getName()+"休息1秒"); Thread.sleep(1000L); } return result; } }); if(!CollectionUtils.isEmpty(resultList)){ List<Integer> integers = new ArrayList<>(); resultList.stream().forEach(items -> { if (!CollectionUtils.isEmpty(resultList)) { items.stream().forEach(item -> { integers.add(item); }); } } ); integers.stream().forEach(item->System.out.println(item)); } } }