zoukankan      html  css  js  c++  java
  • ThreadUtil 多线程处理List,回调处理具体的任务

    每次想多线程处理一个大的结果集的时候 都需要写一大堆代码,自己写了个工具类 方便使用

    package com.guige.fss.common.util;
    
    
    import com.guige.fss.common.exception.BusinessException;
    import io.swagger.models.auth.In;
    import lombok.extern.slf4j.Slf4j;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.util.CollectionUtils;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    
    /**
     * Created by admin on 2018/6/5.
     * @author 宋安伟
     */
    public class ThreadUtil {
        //创建定长线程池,初始化线程
        private static Logger log = LoggerFactory.getLogger(ThreadUtil.class);
    
        /**
         * 对List进行多线程处理(限制 对List只读 如果想修改List 可以处理完毕后把要修改或删除的List返回 多线程执行完后再修改或删除)
         * @param list 要处理的List
         * @param threadSize 用几个线程处理
         * @param threadLoadback 处理的回调(具体业务员)
         * @param <T> 每个回调的返回结果
         * @param <V> List<V>的泛型
         * @return
         */
        public static <T,V>List<T> executorsTasks(final List<V> list,final  int threadSize,final  ThreadLoadback<T,V> threadLoadback){
            // 开始时间
            long start = System.currentTimeMillis();
            // 总数据条数
            int dataSize = list.size();
            // 线程数
            int threadNum = dataSize / threadSize + 1;
            // 定义标记,过滤threadNum为整数
            boolean special = dataSize % threadSize == 0;
            // 创建一个线程池
            ExecutorService exec = Executors.newFixedThreadPool(threadNum);
            // 定义一个任务集合
            List<Callable<T>> tasks = new ArrayList<Callable<T>>();
            Callable<T> task = null;
            List cutList = null;
    
            for (int i = 0; i < threadNum; i++) {
                if (i == threadNum - 1) {
                    if (special) {
                        break;
                    }
                    cutList = list.subList(threadSize * i, dataSize);
                } else {
                    cutList = list.subList(threadSize * i, threadSize * (i + 1));
                }
                // System.out.println("第" + (i + 1) + "组:" + cutList.toString());
                final List listStr = cutList;
                task = new Callable<T>() {
                    @Override
                    public T  call() throws Exception {
                        // System.out.println(Thread.currentThread().getName() + "线程:" + listStr);
                    return (T) threadLoadback.load(listStr);
                          //  return
    
    
                    }
                };
                // 这里提交的任务容器列表和返回的Future列表存在顺序对应的关系
                tasks.add(task);
            }
            List<Future<T>> resultsFuture = null;
            try {
                log.debug("线程任务执行开始:任务数"+tasks.size());
                resultsFuture = exec.invokeAll(tasks);
                List<T> results = new ArrayList<>();
                for (Future<T> future : resultsFuture) {
                    T result=future.get();
                    if(result!=null) {
                        results.add(result);
                    }
                }
                return results;
    
            } catch (Exception e) {
                e.printStackTrace();
                throw new BusinessException(e.getMessage());
            }finally {
                // 关闭线程池
                exec.shutdown();
                log.debug("线程任务执行结束");
                log.debug("执行任务消耗了 :" + (System.currentTimeMillis() - start) + "毫秒");
            }
    
        }
    
        interface ThreadLoadback<T,V> {
            T load(List<V> list) throws Exception;
        }
    
    
        public static void main(String[] args) {
            List<String> list = new ArrayList<>();
            for(int i=0;i<1000;i++){
                list.add("i="+i);
            }
         List<List<Integer>> resultList=   ThreadUtil.executorsTasks(list, 10, new ThreadLoadback<List<Integer>, String>() {
             @Override
             public List<Integer> load(List<String> list) throws Exception {
                    List<Integer> result= new ArrayList<>();
                    for(String str:list){
                        str= str.replaceAll("i=","");
                        result.add(Integer.parseInt(str));
                        System.out.println(Thread.currentThread().getName()+"休息1秒");
                        Thread.sleep(1000L);
                    }
                 return result;
             }
         });
          if(!CollectionUtils.isEmpty(resultList)){
              List<Integer> integers = new ArrayList<>();
              resultList.stream().forEach(items -> {
                          if (!CollectionUtils.isEmpty(resultList)) {
                              items.stream().forEach(item -> {
                                  integers.add(item);
    
                              });
                          }
                      }
              );
              integers.stream().forEach(item->System.out.println(item));
    
          }
        }
    
    
    }
  • 相关阅读:
    毕业设计进度3
    毕业设计进度2
    毕业设计进度1
    hadoop环境搭建
    大数据之kettle安装
    服务化管理和治理框架的技术选型
    云时代架构读后感五
    云时代架构读后感四
    毕业设计2019/10/24至2019/10/31进度计划
    IT架构的本质
  • 原文地址:https://www.cnblogs.com/songanwei/p/10131936.html
Copyright © 2011-2022 走看看