import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @Author: SimonHu
* @Date: 2020/10/24 15:44
* @Description:
*/
public class RunThread {
static private ThreadPoolTaskExecutor taskExecutor;
static private ThreadPoolExecutor taskExecutor2;
public RunThread(Integer corePoolSize, Integer maxPoolSize) {
taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.initialize();
taskExecutor.setCorePoolSize(corePoolSize);
taskExecutor.setMaxPoolSize(maxPoolSize);
taskExecutor.setKeepAliveSeconds(2);
taskExecutor.setQueueCapacity(50);
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
}
/**
* @param list 需要处理的list
* @param sunSum 将集合切分的段数
* @throws InterruptedException
* @throws ExecutionException
*/
@SuppressWarnings("unused")
private void doReadList(List<Object> list, int sunSum) throws InterruptedException, ExecutionException, ExecutionException {
/**初始化集合**/
/**接收集合各段的 执行的返回结果**/
List<Future<Object>> futureList = new ArrayList<Future<Object>>();
/**集合总条数**/
int size = list.size();
/**将集合切分的段数**/
//int sunSum = 10;
int listStart, listEnd;
/***当总条数不足10条时 用总条数 当做线程切分值**/
if (sunSum > size) {
sunSum = size;
}
/**定义子线程**/
SunCallable sunCallable = null;
/**将list 切分10份 多线程执行**/
for (int i = 0; i < sunSum; i++) {
/***计算切割 开始和结束**/
listStart = size / sunSum * i;
listEnd = size / sunSum * (i + 1);
/**最后一段线程会 出现与其他线程不等的情况**/
if (i == sunSum - 1) {
listEnd = size;
}
/**线程切断**/
List<Object> sunList = list.subList(listStart, listEnd);
/**子线程初始化**/
sunCallable = new SunCallable(i, sunList);
/***多线程执行***/
futureList.add(taskExecutor.submit(sunCallable));
}
/**对各个线程段结果进行解析**/
for (Future<Object> future : futureList) {
if (null != future) {
System.err.println("成功" + future.get());
} else {
System.err.println("失败");
}
}
}
/**
* 测试方法
*/
public static void main(String[] args) {
List<Object> list = new ArrayList<Object>();
for (int i = 0; i < 123; i++) {
list.add("test--" + i);
}
long start = System.currentTimeMillis();
RunThread runThread = new RunThread(8, 8);
try {
runThread.doReadList(list, 8);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
//关闭线程池
taskExecutor.shutdown();
// taskExecutor2.shutdown();
// taskExecutor2.shutdownNow();
// taskExecutor2.awaitTermination(60);
System.out.println("执行程序用时:" + (System.currentTimeMillis() - start));
}
}
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
/**
* @Author: SimonHu
* @Date: 2020/10/24 15:43
* @Description:
*/
public class SunCallable implements Callable<Object> {
/**
* 当前是属于第几段线程
**/
private int pageIndex;
private List<Object> list;
public SunCallable(int pageIndex, List<Object> list) {
this.pageIndex = pageIndex;
this.list = list;
}
@Override
public Object call() throws Exception {
System.err.println(String.format("pageIndex:%s size:%s", pageIndex, list.size()));
Boolean result = Boolean.TRUE;
if (null != list && list.size() > 0) {
for (Object str : list) {
try {
System.out.println(pageIndex + "::::" + str);
//调用业务方法
handleStr(str);
} catch (Exception e) {
result = Boolean.FALSE;
}
}
}
Map paramMap = new HashMap(16);
paramMap.put("code", "000");
paramMap.put("msg", "成功");
paramMap.put("result", result);
return paramMap;
}
/**
* 业务处理
*
* @param str
* @throws InterruptedException
*/
public void handleStr(Object str) throws InterruptedException {
// Thread.sleep(1000);
}
}