zoukankan      html  css  js  c++  java
  • 线程的并发工具类

    Fork-Join(分而治之) 规模为N的问题,N<阈值,直接解决,N>阈值,将N分解为K个小规模子问题,子问题互相对立,与原问题形式相同,将子问题的解合并得到原问题的解 如何使用的流程图

     

    用法

    1.Fork/Join的同步用法同时演示返回结果值:统计整形数组中所有元素的和

    两个main方法,SumSingleThread类里的main是单线程求和,每次休眠一秒;SumByForkJoin类里使用forkandjoin进行求和

    下面是个生成随机整数数组的类

    import java.util.Random;
    
    /**
     * 产生一个整形的数组
     * */
    public class CreateArray {
        public static final int ARRAY_LENTH=1000;
        public static int[] createArray(){
            Random r = new Random();
            int[] result = new int[ARRAY_LENTH];
            for(int i=0;i<ARRAY_LENTH;i++){
                result[i]=r.nextInt(ARRAY_LENTH*3);
            }
            return result;
        }
    }
    import com.thread.demo.SleepTools;
    
    public class SumSingleThread {
        public static void main(String[] args) {
             int count = 0;
                int[] src =CreateArray.createArray();
    
                long start = System.currentTimeMillis();
                for(int i= 0;i<src.length;i++){
                    SleepTools.ms(1);
                    count = count + src[i];
                }
                System.out.println("The count is "+count
                        +" spend time:"+(System.currentTimeMillis()-start)+"ms");    
        }
    
    }
    public class SumByForkJoin {
        private static class SumTask extends RecursiveTask<Integer>{
            private static final int THRESHOLD = CreateArray.ARRAY_LENTH/10;
            private int[] src; //表示我们要实际统计的数组
            private int fromIndex;//开始统计的下标
            private int toIndex;//统计到哪里结束的下标
            public SumTask(int[] src, int fromIndex, int toIndex) {
                this.src = src;
                this.fromIndex = fromIndex;
                this.toIndex = toIndex;
            }
            /**
             * 这个是有返回值的,在compute方法中按照需要的逻辑写forkjoin逻辑
             * */
            @Override
            protected Integer compute() {
                //当满足阈值范围时,进入计算
                if(toIndex-fromIndex<THRESHOLD){
                    int count = 0;
                    for(int i=fromIndex;i<toIndex;i++){
                        count=count+src[i];
                    }
                    return count;
                }else{//不满足阈值时,继续拆分
                    int mid = (fromIndex+toIndex)/2;
                    SumTask left = new SumTask(src, fromIndex, mid);
                    SumTask right = new SumTask(src, mid+1, toIndex);
                    invokeAll(left, right);
                    return left.join()+right.join();
                    }
                }
            }
        
        public static void main(String[] args) {
            
              ForkJoinPool pool = new ForkJoinPool();
              int[] src = CreateArray.createArray();
    
              SumTask innerFind = new SumTask(src,0,src.length-1);
    
              long start = System.currentTimeMillis();
    
              pool.invoke(innerFind);//同步调用,就是这个方法执行完才会继续执行下面的sysout,所以以这个demo是同步的用法,异步调用的方法:execute(object)
              System.out.println("Task is Running.....");
    
              System.out.println("The count is "+innerFind.join()
                      +" spend time:"+(System.currentTimeMillis()-start)+"ms");
        }
       
    
    }

    Q:把循环求和中的sleep注掉,并且增大数组的长度,会发现,在小于一定长度时,单线程直接求和的速度比使用fork/jion快

    A:因为使用forkJoin时cpu会进行上下问切换操作,这个操作相比较于计算型操作其实更费时间

    2.Fork/Join的异步用法同时演示不要求返回值:遍历指定目录(含子目录)寻找指定类型文件

    import java.io.File;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.RecursiveAction;
    
    public class FindFile extends RecursiveAction{
        private File path;
        public  FindFile(File path) {
            this.path=path;
            
        }
        
        @Override
        protected void compute() {
            List<FindFile> subTasks = new ArrayList<FindFile>();
            File[] files = path.listFiles();
            if(files!=null){
                
            
            for(File file:files){//循环文件路径
                if(file.isDirectory()){//判断是不是目录
                    subTasks.add(new FindFile(file));
                }else{
                    if(file.getAbsolutePath().endsWith("avi")){
                        System.out.println("找到对应文件:"+file.getAbsolutePath());
                    }
                }
            }
            if(!subTasks.isEmpty()){
                for(FindFile sub:invokeAll(subTasks)){//invokeAll的返回值和传入的值一样
                    sub.join();
                }
            }
            }
        }
        public static void main(String[] args) {
            try {
                ForkJoinPool pool = new ForkJoinPool();
                FindFile task = new FindFile(new File("D:/"));
    
                pool.execute(task);//异步调用
    
                System.out.println("Task is Running......");
                Thread.sleep(1);
                int otherWork = 0;
                for(int i=0;i<100;i++){
                    otherWork = otherWork+i;
                }
                System.err.println("Main Thread done sth......,otherWork="+otherWork);
                task.join();//阻塞的方法,此处是为了防止出现主线程走完,task被直接中断的情况
                System.out.println("Task end");
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    
    }

    常用的并发工具类(直接放课程里的demo了,很详细了~)

    CountDownLatch

    作用:是一组线程等待其他的线程完成工作以后在执行,加强版join

    await用来等待,countDown负责计数器的减一

    import java.util.concurrent.CountDownLatch;
    
    import com.xiangxue.tools.SleepTools;
    
    /**
     *@author Mark老师   享学课堂 https://enjoy.ke.qq.com 
     *
     *类说明:演示CountDownLatch,有5个初始化的线程,6个扣除点,
     *扣除完毕以后,主线程和业务线程才能继续自己的工作
     */
    public class UseCountDownLatch {
        
        static CountDownLatch latch = new CountDownLatch(6);
    
        //初始化线程(只有一步,有4个)
        private static class InitThread implements Runnable{
    
            @Override
            public void run() {
                System.out.println("Thread_"+Thread.currentThread().getId()
                        +" ready init work......");
                latch.countDown();//初始化线程完成工作了,countDown方法只扣减一次;
                for(int i =0;i<2;i++) {
                    System.out.println("Thread_"+Thread.currentThread().getId()
                            +" ........continue do its work");
                }
            }
        }
        
        //业务线程
        private static class BusiThread implements Runnable{
    
            @Override
            public void run() {
                try {
                    latch.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                for(int i =0;i<3;i++) {
                    System.out.println("BusiThread_"+Thread.currentThread().getId()
                            +" do business-----");
                }
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            //单独的初始化线程,初始化分为2步,需要扣减两次
            new Thread(new Runnable() {
                @Override
                public void run() {
                    SleepTools.ms(1);
                    System.out.println("Thread_"+Thread.currentThread().getId()
                            +" ready init work step 1st......");
                    latch.countDown();//每完成一步初始化工作,扣减一次
                    System.out.println("begin step 2nd.......");
                    SleepTools.ms(1);
                    System.out.println("Thread_"+Thread.currentThread().getId()
                            +" ready init work step 2nd......");
                    latch.countDown();//每完成一步初始化工作,扣减一次
                }
            }).start();
            new Thread(new BusiThread()).start();
            for(int i=0;i<=3;i++){
                Thread thread = new Thread(new InitThread());
                thread.start();
            }
    
            latch.await();
            System.out.println("Main do ites work........");
        }
    }

    CyclicBarrier

    让一组线程达到某个屏障,被阻塞,一直到组内最后一个线程达到屏障时,屏障开放,所有被阻塞的线程会继续运行CyclicBarrier(int parties)

    CyclicBarrier(int parties, Runnable barrierAction),屏障开放,barrierAction定义的任务会执行

    CountDownLatch和CyclicBarrier辨析

    1、countdownlatch放行由第三者控制,CyclicBarrier放行由一组线程本身控制
    2、countdownlatch放行条件》=线程数,CyclicBarrier放行条件=线程数

    import java.util.Map;
    import java.util.Random;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.CyclicBarrier;
    
    /**
     *@author Mark老师   享学课堂 https://enjoy.ke.qq.com 
     *
     *类说明:CyclicBarrier的使用
     */
    public class UseCyclicBarrier {
        
        private static CyclicBarrier barrier 
            = new CyclicBarrier(5,new CollectThread());
        
        private static ConcurrentHashMap<String,Long> resultMap
                = new ConcurrentHashMap<>();//存放子线程工作结果的容器
    
        public static void main(String[] args) {
            for(int i=0;i<=4;i++){
                Thread thread = new Thread(new SubThread());
                thread.start();
            }
    
        }
    
        //负责屏障开放以后的工作
        private static class CollectThread implements Runnable{
    
            @Override
            public void run() {
                StringBuilder result = new StringBuilder();
                for(Map.Entry<String,Long> workResult:resultMap.entrySet()){
                    result.append("["+workResult.getValue()+"]");
                }
                System.out.println(" the result = "+ result);
                System.out.println("do other business........");
            }
        }
    
        //工作线程
        private static class SubThread implements Runnable{
    
            @Override
            public void run() {
                long id = Thread.currentThread().getId();//线程本身的处理结果
                resultMap.put(Thread.currentThread().getId()+"",id);
                Random r = new Random();//随机决定工作线程的是否睡眠
                try {
                    if(r.nextBoolean()) {
                        Thread.sleep(2000+id);
                        System.out.println("Thread_"+id+" ....do something ");
                    }
                    System.out.println(id+"....is await");
                    barrier.await();
                    Thread.sleep(1000+id);
                    System.out.println("Thread_"+id+" ....do its business ");
                } catch (Exception e) {
                    e.printStackTrace();
                }
    
            }
        }
    }

    Semaphore

    控制同时访问某个特定资源的线程数量,用在流量控制

     

    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    import java.util.concurrent.Exchanger;
    
    /**
     *@author Mark老师   享学课堂 https://enjoy.ke.qq.com 
     *
     *类说明:Exchange的使用
     */
    public class UseExchange {
        private static final Exchanger<Set<String>> exchange 
            = new Exchanger<Set<String>>();
    
        public static void main(String[] args) {
    
            //第一个线程
            new Thread(new Runnable() {
                @Override
                public void run() {
                    Set<String> setA = new HashSet<String>();//存放数据的容器
                    try {
                        /*添加数据
                         * set.add(.....)
                         * */
                        setA = exchange.exchange(setA);//交换set
                        /*处理交换后的数据*/
                    } catch (InterruptedException e) {
                    }
                }
            }).start();
    
          //第二个线程
            new Thread(new Runnable() {
                @Override
                public void run() {
                    Set<String> setB = new HashSet<String>();//存放数据的容器
                    try {
                        /*添加数据
                         * set.add(.....)
                         * set.add(.....)
                         * */
                        setB = exchange.exchange(setB);//交换set
                        /*处理交换后的数据*/
                    } catch (InterruptedException e) {
                    }
                }
            }).start();
    
        }
    }

    Exchange

    两个线程间的数据交换

  • 相关阅读:
    RedisUtil
    CSS基础知识点笔记
    fdgfgfgfgf
    PerfMon Metrics Collector插件的Disks I/O使用总结
    Jmeter使用笔记之html报告扩展(一)
    Jmeter使用笔记之意料之外的
    Jmeter使用笔记之函数
    Jmeter使用笔记之组件的作用域
    css 初始化文件 全面
    vue-grid-layout 使用以及各项参数作用
  • 原文地址:https://www.cnblogs.com/keiyoumi520/p/14189884.html
Copyright © 2011-2022 走看看