// 多线程处理
public class DySchedule {
private static AtomicInteger line = new AtomicInteger(0);
static ExecutorService pool = Executors.newFixedThreadPool(100);
public static int getLine(){
return line.addAndGet(1000);
}
public static void doJob(){
for (int i = 0;i<100;i++){
Thread thread = new MyThread();
pool.execute(thread);
}
pool.shutdown();
}
public static void main(String[] args) {
DySchedule.doJob();
}
}
public class MyThread extends Thread {
@Override
public void run() {
System.out.println("线程:" + Thread.currentThread().getName());
Integer num = DySchedule.getLine();
System.out.println("startline = " +(num-1000)+",endline = " + num);
}
}
@PostConstruct
private void multithreadingGetData() {
// 数据总量
int count = 312;
// 计算机可用cpu 设置线程池个数最好与之匹配
int availableProcessors = Runtime.getRuntime().availableProcessors();
// 每个线程要查多少条
int num = count / availableProcessors;
ExecutorService executorService = Executors.newFixedThreadPool(availableProcessors);
List<CompletableFuture> futures = new ArrayList<>();
long start = System.currentTimeMillis();
for (int i = 0; i < num; i++) {
int id = (i + 1) * num;
if(id <= count){
//futures.add(CompletableFuture.supplyAsync(() -> officeResponsibilityMapper.multithreadingGetData(id), executorService));
CompletableFuture.supplyAsync(() -> officeResponsibilityMapper.multithreadingGetData(id), executorService);
}
}
executorService.shutdown();
System.out.println("多线程查询共用时间" + (System.currentTimeMillis() - start));
long start2 = System.currentTimeMillis();
officeResponsibilityMapper.selectAll();
System.out.println("单线程查询共用时间" + (System.currentTimeMillis() - start2));
// List<Object> collect = futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
}
List<String> stuIds = stuService.getStuIds();
//将集合分成n等份
List<List<String>> lists = xxUtil.averageAssign(departmentIds, n);
final List<CompletableFuture<Map<String, Integer>>> futures = lists.parallelStream().map(subIds-> CompletableFuture.supplyAsync(() -> {
Map<String, Integer> map = new HashMap<>();
try {
for (String id: subIds) {
//根据id去查询数据库
Integer count = xxDao.getData(id);
map.put(id, count);
}
} catch (Exception e) {
log.error("执行sql失败", e.getMessage(), e);
}
return map;
//asyncTaskExecutor是自定义的线程池,这里也可以不传这个参数,默认使用jdk线程池
}, asyncTaskExecutor)).collect(Collectors.toList());
CompletableFuture<Void> completableFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
CompletableFuture<List<Map<String, Integer>>> listCompletableFuture = completableFuture.thenApply(v ->
futures.stream().map(future -> {
try {
return future.get();
} catch (Exception e) {
log.error("线程执行发生异常{}", e);
}
return new HashMap<>();
}).collect(Collectors.toList()));
try {
//从线程中获取结果,拿到结果后,自行处理
//get方法会等到所有的线程执行完毕,才会返回结果;换句话说,多线程的执行效率取决于最慢的那条线程的执行时间
List<Map<String,Integer>> listMap = listCompletableFuture.get();
} catch (Exception e) {
log.error("从线程中获取执行结果失败", e);
}