zoukankan      html  css  js  c++  java
  • 使用ThreadPoolExecutor 创建线程池,完成并行操作

    日常工作中很多地方很多效率极低的操作,往往可以改串行为并行,执行效率往往提高数倍,废话不多说先上代码

    1、用到的guava坐标

            <dependency>
                <groupId>com.google.guava</groupId>
                <artifactId>guava</artifactId>
                <version>18.0</version>
            </dependency>
    View Code

    2、创建一个枚举保证线程池是单例

    package com.hao.service;
    
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import com.google.common.util.concurrent.ThreadFactoryBuilder;
    
    public enum ExecutorManager {
    
        INSTANCE;
    
        private ExecutorManager() {
    
        }
    
        private static int AVAILABLEPROCESSORS = Runtime.getRuntime().availableProcessors();
    
        public static final ThreadPoolExecutor threadPoolExecutor =
            new ThreadPoolExecutor(AVAILABLEPROCESSORS * 50, AVAILABLEPROCESSORS * 80, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(AVAILABLEPROCESSORS * 2000),
                new ThreadFactoryBuilder().setNameFormat("ExecutorManager-pool-Thread-%d").build());
        
        
    
    }
    View Code

    3、创建一个方法类

    package com.hao.service;
    
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import javax.annotation.PostConstruct;
    
    import org.springframework.stereotype.Service;
    
    import com.google.common.base.Preconditions;
    
    @Service
    public class ExecutorContext {
    
        public ExecutorService executorService;
        private int DEFAULT_WAIT_SECONDS = 2;
    
        @PostConstruct
        public void init() {
            executorService = ExecutorManager.threadPoolExecutor;
        }
    
        public <T> List<T> waitAllFutures(List<Callable<T>> calls, int milliseconds) throws Exception {
            Preconditions.checkArgument(null != calls && !calls.isEmpty(), "callable empty.");
            LatchedCallables<T> latchAndCallables = wrapCallables(calls);
            List<Future<T>> futurres = new LinkedList<>();
            for (CountdownedCallable<T> callable : latchAndCallables.wrappedCallables) {
                if (null != callable) {
                    futurres.add(executorService.submit(callable));
                }
            }
            List<T> rets = new ArrayList<>();
            if (latchAndCallables.latch.await(milliseconds, TimeUnit.MILLISECONDS)) {
                for (CountdownedCallable<T> call : latchAndCallables.wrappedCallables) {
                    rets.add(call.getResult());
                }
            } else {
                for (Future<T> future : futurres) {
                    if (!future.isDone()) {
                        future.cancel(true);
                    }
                }
            }
            return rets;
        }
    
        public <T> List<T> waitAllCallables(List<Callable<T>> calls, int seconds) throws Exception {
            Preconditions.checkArgument(null != calls && !calls.isEmpty(), "callable empty.");
            LatchedCallables<T> latchAndCallables = wrapCallables(calls);
            for (CountdownedCallable<T> callable : latchAndCallables.wrappedCallables) {
                executorService.submit(callable);
            }
            List<T> rets = new ArrayList<>();
            if (latchAndCallables.latch.await(seconds, TimeUnit.SECONDS)) {
                for (CountdownedCallable<T> call : latchAndCallables.wrappedCallables) {
                    rets.add(call.getResult());
                }
            }
            return rets;
        }
    
        public <T> List<T> waitAllCallables(@SuppressWarnings("unchecked") Callable<T>... calls) throws Exception {
            Preconditions.checkNotNull(calls, "callable empty.");
            return waitAllCallables(Arrays.asList(calls), DEFAULT_WAIT_SECONDS);
        }
    
        private static <T> LatchedCallables<T> wrapCallables(List<Callable<T>> callables) {
            CountDownLatch latch = new CountDownLatch(callables.size());
            List<CountdownedCallable<T>> wrapped = new ArrayList<>(callables.size());
            for (Callable<T> callable : callables) {
                wrapped.add(new CountdownedCallable<>(callable, latch));
            }
    
            LatchedCallables<T> returnVal = new LatchedCallables<>();
            returnVal.latch = latch;
            returnVal.wrappedCallables = wrapped;
            return returnVal;
        }
    
        public static class LatchedCallables<T> {
            public CountDownLatch latch;
            public List<CountdownedCallable<T>> wrappedCallables;
        }
    
        public static class CountdownedCallable<T> implements Callable<T> {
            private final Callable<T> wrapped;
            private final CountDownLatch latch;
            private T result;
    
            public CountdownedCallable(Callable<T> wrapped, CountDownLatch latch) {
                this.wrapped = wrapped;
                this.latch = latch;
            }
    
            @Override
            public T call() throws Exception {
                try {
                    result = wrapped.call();
                    return result;
                } finally {
                    latch.countDown();
                }
            }
    
            public T getResult() {
                return result;
            }
        }
    
    }
    View Code

    4、创建一个测试类

    package com.hao;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import org.junit.Test;
    import org.springframework.beans.factory.annotation.Autowired;
    
    import com.hao.bean.Employee;
    import com.hao.service.EmployeeService;
    import com.hao.service.ExecutorContext;
    
    public class ExecutorTest extends BaseTest {
    
        @Autowired
        ExecutorContext executorContext;
        
        @Autowired
        EmployeeService employeeService;
    
        @Test
        public void test01() {
            long t0 = System.currentTimeMillis();
            List<Employee> employees = new ArrayList<Employee>();
            try {
                List<Callable<Integer>> calls = new ArrayList<Callable<Integer>>();
                Callable<Integer> able1 = new Callable<Integer>() {
                    @Override
                    public Integer call() throws Exception {
                        Thread.sleep(5000);
                        Employee employee = employeeService.getById(1L);
                        employees.add(employee);
                        return 1;
                    }
    
                };
                calls.add(able1);
                Callable<Integer> able2 = new Callable<Integer>() {
                    @Override
                    public Integer call() throws Exception {
                        Thread.sleep(5000);
                        Employee employee = employeeService.getById(2L);
                        employees.add(employee);
                        return 2;
                    }
    
                };
                calls.add(able2);
                Callable<Integer> able3 = new Callable<Integer>() {
                    @Override
                    public Integer call() throws Exception {
                        Thread.sleep(5000);
                        Employee employee = employeeService.getById(3L);
                        employees.add(employee);
                        return 3;
                    }
    
                };
                calls.add(able3);
    
                executorContext.waitAllCallables(calls, 5000);
            } catch (Exception e) {
                e.printStackTrace();
            }
            for (Employee employee : employees) {
                System.out.println(employee);
            }
            System.out.println(System.currentTimeMillis() - t0);
        }
    
    }
    View Code

    5、执行结果如下

     次工具类的好处在于能够像使用普通 service一样使用线程池完成并行操作,当然不要忘记将 ExecutorContext 置于能被sping扫描到的地方,

    否则不能直接使用@Autowired 依赖注入

  • 相关阅读:
    html常用标签及示例
    判断一个数是否是素数的讨论
    图像的空间域变化
    图像增强的点运算(一)
    字符串匹配——KMP
    AcWing1134最短路计数(spfa)
    AcWing1137拯救大兵瑞恩(双端队列搜索,状态压缩,分层图最短路)
    AcWing1175电路维修(双端队列+搜索)
    AcWing1137选择最佳线路(最短路)
    AcWing342道路与航线(dijkstra+拓扑排序)
  • 原文地址:https://www.cnblogs.com/zhanh247/p/12576491.html
Copyright © 2011-2022 走看看