zoukankan      html  css  js  c++  java
  • Java:并行编程及同步使用方法

    • 知道java可以使用java.util.concurrent包下的
    CountDownLatch
    ExecutorService
    Future

    Callable
    实现并行编程,并在并行线程同步时,用起来十分简单的一种 。
    实现原理:
    1、CountDownLatch 统计并行线程完成数,并提供了await()方法,实现等待所有并行线程完成,或者指定最大等待时间。
    2、ExecutorService提供了execute(Callable)执行线程方法,还提供了submit(Callable)提交线程。
    3、Future接受实现Callable<V>接口的(可执行线程)返回值,接受Executors.submit(Callable<V>)返回值。而且Future<V>提供get()取回并行子线程返回的参数,还可以给get指定过期时间。

    想到Concurrent,就能想到c#中,命名空间System.Collection,Concurrent,在该命名空间下提供了一些线程安全的集合类。

    • 代码示例:

    MyTaskResult.java

    package com.dx.testparallel;
    
    public class MyTaskResult {
        private String name;
        
        public MyTaskResult(String name){
            this.name=name;
        }
    
        public String getName() {
            return name;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    }

    TaskItem.java

    package com.dx.testparallel;
    
    public class TaskItem {
        private int id;
        private String name;
        
        public TaskItem(int id,String name){
            this.id=id;
            this.name=name;
        }
        
        public int getId() {
            return id;
        }
        public void setId(int id) {
            this.id = id;
        }
        
        public String getName() {
            return name;
        }
        public void setName(String name) {
            this.name = name;
        }
    }

    MyTask.java

    package com.dx.testparallel;
    
    import java.util.concurrent.Callable;
    import java.util.concurrent.CountDownLatch;
    
    public class MyTask implements Callable<MyTaskResult> {
        private final TaskItem taskItem;
        private final CountDownLatch threadsSignal;
        
        public MyTask(CountDownLatch threadsSignal,TaskItem taskItem) {
            this.threadsSignal= threadsSignal;
            this.taskItem=taskItem;
        }
        
        @Override
        public MyTaskResult call() throws Exception {
            MyTaskResult result=new MyTaskResult(this.taskItem.getName());
            
            // 核心处理逻辑处理
            Thread.sleep(2000);
            
            System.out.println("task id:" + taskItem.getId() +" >>>>等待結束");
            System.out.println("task id:" + taskItem.getId() + " >>>>线程名称:" + Thread.currentThread().getName() + "结束. 还有" + threadsSignal.getCount() + " 个线程");
    
            // 必须等核心处理逻辑处理完成后才可以减1
            this.threadsSignal.countDown();
            
            return result;
        }
    
    }

    Main.java

    package com.dx.testparallel;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    
    public class Main {    
        public static void main(String[] args) throws InterruptedException {
            List<TaskItem> taskItems=new ArrayList<TaskItem>();
            for(int i=0;i<20;i++){
                taskItems.add(new TaskItem(i, "task "+i));
            }
            
            CountDownLatch threadsSignal = new CountDownLatch(taskItems.size());
            ExecutorService executor = Executors.newFixedThreadPool(taskItems.size());
            List<Future<MyTaskResult>> resultLazyItems=new ArrayList<Future<MyTaskResult>>();
            System.out.println("主線程開始進入並行任務提交");
            for (TaskItem taskItem : taskItems) {
                // 使用future存储子线程执行后返回结果,必须在所有子线程都完成后才可以使用get();
                // 如果在这里使用get(),会造成等待同步。
                Future<MyTaskResult> future = executor.submit(new MyTask(threadsSignal,taskItem));
                resultLazyItems.add(future);
            }
            System.out.println("主線程開始走出並行任務提交");
            System.out.println("主線程進入等待階段(等待所有并行子线程任务完成)。。。。。");
            // 等待所有并行子线程任务完成。
            threadsSignal.await();
            // 并不是终止线程的运行,而是禁止在这个Executor中添加新的任务
            executor.shutdown(); 
            System.out.println("主線程走出等待階段(等待所有并行子线程任务完成)。。。。。");
            
            for(Future<MyTaskResult> future :resultLazyItems){
                try {
                    MyTaskResult result = future.get();
                    System.out.println(result.getName());
                } catch (ExecutionException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }
    
    }

    运行结果:

    主線程開始進入並行任務提交
    主線程開始走出並行任務提交
    主線程進入等待階段(等待所有并行子线程任务完成)。。。。。
    task id:0 >>>>等待結束
    task id:6 >>>>等待結束
    task id:8 >>>>等待結束
    task id:3 >>>>等待結束
    task id:4 >>>>等待結束
    task id:2 >>>>等待結束
    task id:2 >>>>线程名称:pool-1-thread-3结束. 还有20 个线程
    task id:5 >>>>等待結束
    task id:1 >>>>等待結束
    task id:7 >>>>等待結束
    task id:1 >>>>线程名称:pool-1-thread-2结束. 还有19 个线程
    task id:5 >>>>线程名称:pool-1-thread-6结束. 还有19 个线程
    task id:4 >>>>线程名称:pool-1-thread-5结束. 还有20 个线程
    task id:19 >>>>等待結束
    task id:10 >>>>等待結束
    task id:18 >>>>等待結束
    task id:18 >>>>线程名称:pool-1-thread-19结束. 还有16 个线程
    task id:3 >>>>线程名称:pool-1-thread-4结束. 还有20 个线程
    task id:8 >>>>线程名称:pool-1-thread-9结束. 还有20 个线程
    task id:6 >>>>线程名称:pool-1-thread-7结束. 还有20 个线程
    task id:0 >>>>线程名称:pool-1-thread-1结束. 还有20 个线程
    task id:10 >>>>线程名称:pool-1-thread-11结束. 还有16 个线程
    task id:19 >>>>线程名称:pool-1-thread-20结束. 还有16 个线程
    task id:12 >>>>等待結束
    task id:17 >>>>等待結束
    task id:17 >>>>线程名称:pool-1-thread-18结束. 还有9 个线程
    task id:11 >>>>等待結束
    task id:11 >>>>线程名称:pool-1-thread-12结束. 还有8 个线程
    task id:14 >>>>等待結束
    task id:14 >>>>线程名称:pool-1-thread-15结束. 还有7 个线程
    task id:16 >>>>等待結束
    task id:16 >>>>线程名称:pool-1-thread-17结束. 还有6 个线程
    task id:15 >>>>等待結束
    task id:9 >>>>等待結束
    task id:13 >>>>等待結束
    task id:7 >>>>线程名称:pool-1-thread-8结束. 还有19 个线程
    task id:13 >>>>线程名称:pool-1-thread-14结束. 还有5 个线程
    task id:9 >>>>线程名称:pool-1-thread-10结束. 还有5 个线程
    task id:15 >>>>线程名称:pool-1-thread-16结束. 还有5 个线程
    task id:12 >>>>线程名称:pool-1-thread-13结束. 还有9 个线程
    主線程走出等待階段(等待所有并行子线程任务完成)。。。。。
    task 0
    task 1
    task 2
    task 3
    task 4
    task 5
    task 6
    task 7
    task 8
    task 9
    task 10
    task 11
    task 12
    task 13
    task 14
    task 15
    task 16
    task 17
    task 18
    task 19
    

      注释以下代码:

        // Thread.sleep(2000);        
        // System.out.println("task id:" + taskItem.getId() +" >>>>等待結束");

    之后运行结果:

    主線程開始進入並行任務提交
    task id:1 >>>>线程名称:pool-1-thread-2结束. 还有20 个线程
    task id:2 >>>>线程名称:pool-1-thread-3结束. 还有20 个线程
    task id:3 >>>>线程名称:pool-1-thread-4结束. 还有20 个线程
    task id:4 >>>>线程名称:pool-1-thread-5结束. 还有19 个线程
    task id:5 >>>>线程名称:pool-1-thread-6结束. 还有19 个线程
    task id:8 >>>>线程名称:pool-1-thread-9结束. 还有15 个线程
    task id:0 >>>>线程名称:pool-1-thread-1结束. 还有14 个线程
    task id:9 >>>>线程名称:pool-1-thread-10结束. 还有13 个线程
    task id:7 >>>>线程名称:pool-1-thread-8结束. 还有12 个线程
    task id:6 >>>>线程名称:pool-1-thread-7结束. 还有14 个线程
    task id:10 >>>>线程名称:pool-1-thread-11结束. 还有10 个线程
    task id:11 >>>>线程名称:pool-1-thread-12结束. 还有9 个线程
    task id:12 >>>>线程名称:pool-1-thread-13结束. 还有8 个线程
    task id:13 >>>>线程名称:pool-1-thread-14结束. 还有7 个线程
    task id:14 >>>>线程名称:pool-1-thread-15结束. 还有6 个线程
    task id:15 >>>>线程名称:pool-1-thread-16结束. 还有5 个线程
    task id:16 >>>>线程名称:pool-1-thread-17结束. 还有4 个线程
    task id:17 >>>>线程名称:pool-1-thread-18结束. 还有3 个线程
    task id:18 >>>>线程名称:pool-1-thread-19结束. 还有2 个线程
    主線程開始走出並行任務提交
    主線程進入等待階段(等待所有并行子线程任务完成)。。。。。
    task id:19 >>>>线程名称:pool-1-thread-20结束. 还有1 个线程
    主線程走出等待階段(等待所有并行子线程任务完成)。。。。。
    task 0
    task 1
    task 2
    task 3
    task 4
    task 5
    task 6
    task 7
    task 8
    task 9
    task 10
    task 11
    task 12
    task 13
    task 14
    task 15
    task 16
    task 17
    task 18
    task 19
    • 项目中应用:

    定义可执行线程类:

    public class UploadFileToTask implements Callable<UploadFileToTaskResult> {
        private final Task_UploadFileToTaskItem taskItem;
        private final Log log = LogHelper.getInstance(ImportMain.class);
        private final CountDownLatch threadsSignal;
        private final HDFSUtil hdfsUtil = new HDFSUtil();
        private final static String HADOOP_HDFS_PATH = HdfsConfiguration.getHdfsUrl();
    
        public UploadFileToTask(CountDownLatch threadsSignal ,Task_UploadFileToTaskItem taskItem){
            this.taskItem=taskItem;
            this.threadsSignal=threadsSignal;
        }
    
        @Override
        public UploadFileToTaskResult call() throws Exception {
            String area = taskItem.getArea();
            String fileGenerateDate = taskItem.getFileGenerateDate();
            String manufacturer = taskItem.getManufacturer();
            String eNodeBId = taskItem.geteNodeBId();
            String filePath = taskItem.getFilePath();
            FileType fileType = taskItem.getFileType();
    
            TaskStatus taskStatus= TaskStatus.Success;
    
            // 不确定该FileSystem是否是线程安全的,故在每一个thread初始化一次。
            Configuration conf = new Configuration();
            Path dstPath = new Path(HADOOP_HDFS_PATH);
            FileSystem hdfs = dstPath.getFileSystem(conf);
    
            // 核心代码。。。
            // 上传MR文件
            // 上传Signal文件
    
            // 如果文件路径不为空,就开始上传文件到hdfs
            if(uploadFilePath.length()>0){
                if (!hdfsUtil.uploadFileToHdfs(hdfs, filePath, uploadFilePath)) {
                    taskStatus= TaskStatus.Fail;
                }
            }
    
            TaskGroupInfo taskGroupInfo = new TaskGroupInfo();
            taskGroupInfo.setArea(area);
            taskGroupInfo.setManufacturer(manufacturer);
            taskGroupInfo.setFileGenerateDate(fileGenerateDate);
            taskGroupInfo.setFileType(fileType);
    
            String key = String.format("%s,%s,%s,%s", taskGroupInfo.getArea(), taskGroupInfo.getManufacturer(), taskGroupInfo.getFileGenerateDate(), String.valueOf(taskGroupInfo.getFileType().getValue()));
    
            UploadFileToTaskResult result=new UploadFileToTaskResult();
    
            // 填充返回值
            result.setStatus(taskStatus);
            result.setTaskGroupInfo(taskGroupInfo);
            result.setTaskGroupkey(key);
            result.setTaskOID(taskItem.getOid());
    
            System.out.println("task id:" + taskItem.getOid() + " >>>>线程名称:" + Thread.currentThread().getName() + "结束. 还有" + threadsSignal.getCount() + " 个线程");
    
            // 必须等核心处理逻辑处理完成后才可以减1
            this.threadsSignal.countDown();
    
            return result;
        }
    }

    实现并行线程同步核心代码:

                // 获取当前节点带执行任务
                ArrayList<Task_UploadFileToTaskItem> taskItems = uploadFileToTaskItemDao.getTopNTodoTaskItems(this.computeNode.getId(),Configuration.getTaskCount());
                // 批量修改任务状态为正在处理状态(doing)。
                log.info("Start:>>>>>>batch modify task status(doing)>>>>>>");
                log.info("Over:>>>>>>batch modify task status(doing)>>>>>>");
    
                // 批量处理上传任务(上传文件到)
                log.info("Start:>>>>>>each process task(upload to)>>>>>>");
    CountDownLatch threadsSignal
    = new CountDownLatch(taskItems.size()); ExecutorService executor = Executors.newFixedThreadPool(taskItems.size()); List<Future<UploadFileToTaskResult>> resultLazyItems=new ArrayList<Future<UploadFileToTaskResult>>(); for (Task_UploadFileToTaskItem taskItem : taskItems) { // 使用future存储子线程执行后返回结果,必须在所有子线程都完成后才可以使用get(); // 如果在这里使用get(),会造成等待同步。 Future<UploadFileToTaskResult> future = executor.submit(new UploadFileToTask(threadsSignal,taskItem)); resultLazyItems.add(future); } // 等待所有并行子线程任务完成。 threadsSignal.await();
    executor.shutdown();//并不是终止线程的运行,而是禁止在这个Executor中添加新的任务  log.info(
    "Over:>>>>>>each process task(upload to)>>>>>>"); // 批量修改任务处理状态 Map<String, TaskGroupInfo> taskGroupItems=new HashMap<String, TaskGroupInfo>(); Map<Integer, TaskStatus> successTaskItems = new HashMap<Integer, TaskStatus>(); Map<Integer, TaskStatus> failTaskItems = new HashMap<Integer, TaskStatus>(); for(Future<UploadFileToTaskResult> future :resultLazyItems){ UploadFileToTaskResult result= future.get(); if(!taskGroupItems.containsKey(result.getTaskGroupkey())){ taskGroupItems.put(result.getTaskGroupkey(),result.getTaskGroupInfo()); } if(result.getStatus()== TaskStatus.Success){ successTaskItems.put(result.getTaskOID(),result.getStatus()); }else{ failTaskItems.put(result.getTaskOID(),result.getStatus()); } }
    •  参考资料:

    http://blog.csdn.net/wangmuming/article/details/19832865

    http://www.importnew.com/21312.html

  • 相关阅读:
    依赖反转Ioc和unity,autofac,castle框架教程及比较
    webform非表单提交时防xss攻击
    tfs分支操作
    防火墙入站出站规则配置
    前端流程图jsplumb学习笔记
    Js闭包学习笔记
    word中加入endnote
    Rest概念学习
    DRF的版本、认证、权限
    博客园自动生成目录
  • 原文地址:https://www.cnblogs.com/yy3b2007com/p/5827662.html
Copyright © 2011-2022 走看看