zoukankan      html  css  js  c++  java
  • java多线程并发执行demo,主线程阻塞

    其中有四个知识点我单独罗列了出来,属于多线程编程中需要知道的知识:

             知识点1:X,T为泛型,为什么要用泛型,泛型和Object的区别请看:https://www.cnblogs.com/xiaoxiong2015/p/12705815.html
             知识点2:线程池:https://www.cnblogs.com/xiaoxiong2015/p/12706153.html
             知识点3:队列:@author Doug Lea https://www.cnblogs.com/xiaoxiong2015/p/12825636.html
             知识点4:计数器,还是并发包大神 @author Doug Lea 编写。是一个原子安全的计数器,可以利用它实现发令枪
    Doug Lea真是大神,编程不识Doug Lea,写尽Java也枉然,concurrent包点进去,都是他写的。可能是需要什么东西就写了吧,信手拈来的感觉。

    主类:MultiThread,执行并发类
    复制代码
    package java8test;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.Callable;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.LinkedBlockingQueue;
    
    /**
     * @param <H> 为被处理的数据类型
     * @param <T>返回数据类型
     * 知识点1:X,T为泛型,为什么要用泛型,泛型和Object的区别请看:https://www.cnblogs.com/xiaoxiong2015/p/12705815.html
     */
    public abstract class MultiThread<X, T> {
    
        public static int i = 0;
    
        // 知识点2:线程池:https://www.cnblogs.com/xiaoxiong2015/p/12706153.html
        private final ExecutorService exec; // 线程池
    
        // 知识点3:@author Doung Lea 队列:https://www.cnblogs.com/xiaoxiong2015/p/12825636.html
        private final BlockingQueue<Future<T>> queue = new LinkedBlockingQueue<>();
    
        // 知识点4:计数器,还是并发包大神 @author Doug Lea 编写。是一个原子安全的计数器,可以利用它实现发令枪
        private final CountDownLatch startLock = new CountDownLatch(1); // 启动门,当所有线程就绪时调用countDown
    
        private final CountDownLatch endLock; // 结束门
    
        private final List<X> listData;// 被处理的数据
    
        /**
         * @param list list.size()为多少个线程处理,list里面的H为被处理的数据
         */
        public MultiThread(List<X> list) {
            if (list != null && list.size() > 0) {
                this.listData = list;
                exec = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); // 创建线程池,线程池共有nThread个线程
                endLock = new CountDownLatch(list.size()); // 设置结束门计数器,当一个线程结束时调用countDown
            } else {
                listData = null;
                exec = null;
                endLock = null;
            }
        }
    
        /**
         * 
         * @return 获取每个线程处理结速的数组
         * @throws InterruptedException
         * @throws ExecutionException
         */
        public List<T> getResult() throws InterruptedException, ExecutionException {
    
            List<T> resultList = new ArrayList<>();
            if (listData != null && listData.size() > 0) {
                int nThread = listData.size(); // 线程数量
                for (int i = 0; i < nThread; i++) {
                    X data = listData.get(i);
                    Future<T> future = exec.submit(new Task(i, data) {
    
                        @Override
                        public T execute(int currentThread, X data) {
    
                            return outExecute(currentThread, data);
                        }
                    }); // 将任务提交到线程池
                    queue.add(future); // 将Future实例添加至队列
                }
                startLock.countDown(); // 所有任务添加完毕,启动门计数器减1,这时计数器为0,所有添加的任务开始执行
                endLock.await(); // 主线程阻塞,直到所有线程执行完成
                for (Future<T> future : queue) {
                    resultList.add(future.get());
                }
                exec.shutdown(); // 关闭线程池
            }
            return resultList;
        }
    
        /**
         * 每一个线程执行的功能,需要调用者来实现
         * @param currentThread 线程号
         * @param data 每个线程被处理的数据
         * @return T返回对象
         */
        public abstract T outExecute(int currentThread, X data);
    
        /**
         * 线程类
         */
        private abstract class Task implements Callable<T> {
    
            private int currentThread;// 当前线程号
    
            private X data;
    
            public Task(int currentThread, X data) {
                this.currentThread = currentThread;
                this.data = data;
            }
    
            @Override
            public T call() throws Exception {
    
                // startLock.await(); // 线程启动后调用await,当前线程阻塞,只有启动门计数器为0时当前线程才会往下执行
                T t = null;
                try {
                    t = execute(currentThread, data);
                } finally {
                    endLock.countDown(); // 线程执行完毕,结束门计数器减1
                }
                return t;
            }
    
            /**
             * 每一个线程执行的功能
             * @param currentThread 线程号
             * @param data 每个线程被处理的数据
             * @return T返回对象
             */
            public abstract T execute(int currentThread, X data);
        }
    }
    复制代码
    结果类:ResultVO,保存返回结果,根据实际情况替换成自己的
    复制代码
    package java8test;
    
    public class ResultVo {
    
        int i;
    
        public ResultVo(int i) {
            this.i = i;
        }
    
        public ResultVo() {
            // TODO Auto-generated constructor stub
        }
    
    }
    复制代码

    参数类:ParamVO,传入参数类,根据实际情况替换成自己的
    复制代码
    package java8test;
    
    public class ParamVo {
    
        private int i;
    
        ParamVo(int i) {
            this.i = i;
        }
    
        public int getI() {
    
            return i;
        }
    
        @Override
        public String toString() {
    
            return String.valueOf(i) + "  " + hashCode();
        }
    }
    复制代码

    测试类:new两个MultiThread,可以看到MultiThread这个类不存在线程安全问题。
    复制代码
    package java8test;
    
    import java.util.ArrayList;
    import java.util.List;
    
    public class Test {
    
        public static void main(String[] args) {
    
            try {
                List<ParamVo> splitList = new ArrayList<ParamVo>();
                for (int i = 0; i < 100; i++) {
                    splitList.add(new ParamVo(i));
                }
                List<ParamVo> splitList1 = new ArrayList<ParamVo>();
                for (int i = 200; i < 300; i++) {
                    splitList1.add(new ParamVo(i));
                }
                MultiThread<ParamVo, ResultVo> multiThread = new MultiThread<ParamVo, ResultVo>(splitList) {
    
                    @Override
                    public ResultVo outExecute(int currentThread, ParamVo data) {
    
                        System.out.println("当前线程名称:" + Thread.currentThread().getName() + "当前线程号=" + currentThread
                                + " data=" + data);
                        i--;
                        return new ResultVo(data.getI());
                    }
                };
    
                MultiThread<ParamVo, ResultVo> multiThread1 = new MultiThread<ParamVo, ResultVo>(splitList1) {
    
                    @Override
                    public ResultVo outExecute(int currentThread, ParamVo data) {
    
                        System.out.println("当前线程名称:" + Thread.currentThread().getName() + "当前线程号=" + currentThread
                                + " data=" + data);
                        i--;
                        return new ResultVo(data.getI());
                    }
                };
                List<ResultVo> list = multiThread.getResult();
                List<ResultVo> list1 = multiThread1.getResult();
                // 获取每一批次处理结果
                System.out.println("获取处理结果........................");
                for (ResultVo vo : list) {
                    System.out.println(vo.i);
                }
                System.out.println("获取1处理结果........................");
                for (ResultVo vo : list1) {
                    System.out.println(vo.i);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
    
        }
    
    }
    复制代码

     

     

    这个类也用在了生产当中,用来并发插入数据。但是事务不能被管控,需要自己保证最终事务一致。需要注意。

  • 相关阅读:
    如何作需求
    AS400如何将Spooled File 拷贝到源物理文件
    AS400 批量FTP
    Oracle和db2/400的差别
    CL内建函数
    visio如何扩大画布的大小
    如何把C/S架构较为平滑的切换到SOA架构
    关于DataTable里大批量查找的更快速的方法
    c#键值对容器
    什么是委托
  • 原文地址:https://www.cnblogs.com/xiaoxiong2015/p/12837061.html
Copyright © 2011-2022 走看看