zoukankan      html  css  js  c++  java
  • 多线程(3)

    forkjoin:

      1定义分而治之:适合大问题分解成规模相同无联系的小问题,(如果有联系就是动态规划),比如排序中的分治算法;其实快速,二分也可以分治
    
    这个代码时利用forkjoin实现归并排序
    package cn.enjoyedu.ch2.forkjoin.sort;
    
    import cn.enjoyedu.ch2.forkjoin.sum.MakeArray;
    
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.RecursiveTask;
    
    public class test {
    
        //归并-合并
        static void merge(int[] arr, int left, int mid, int right) {
            int[] temp = new int[right - left + 1];
            int i = 0;
            int p1 = left;
            int p2 = mid + 1;
            while (p1 <= mid && p2 <= right) {
                temp[i++] = arr[p1] <= arr[p2] ? arr[p1++] : arr[p2++];
            }
            while (p1 <= mid) {
                temp[i++] = arr[p1++];
            }
            while (p2 <= right) {
                temp[i++] = arr[p2++];
            }
            for (int j = 0; j < temp.length; j++) {
                arr[left + j] = temp[j];
            }
        }
    
        public static void main(String[] args) {
    
            ForkJoinPool pool = new ForkJoinPool();
            int[] arr = MakeArray.makeArray();
            SumTaskTest s = new SumTaskTest(arr, 0, arr.length - 1);
    
            System.out.println("============================================");
            long start = System.currentTimeMillis();
    
            pool.invoke(s);
            System.out.println(" spend time:" + (System.currentTimeMillis() - start) + "ms");
            //展示排序后的结果
    //        for (int i = 0; i < arr.length; i++) {
    //            System.out.println(arr[i]);
    //        }
        }
    
        private static class SumTaskTest extends RecursiveTask<Void> {
            int arr[];
            int left;
            int right;
    
            public SumTaskTest(int[] arr, int left, int right) {
                this.arr = arr;
                this.left = left;
                this.right = right;
            }
    
            @Override
            protected Void compute() {
                int mid = (left + right) / 2;
                if (left == right) {
                    merge(arr, left, mid, right);
                } else {
    
                    SumTaskTest s1 = new SumTaskTest(arr, left, mid);
                    SumTaskTest s2 = new SumTaskTest(arr, mid + 1, right);
                    invokeAll(s1,s2);
                    s1.join();
                    s2.join();
                    merge(arr, left, mid, right);
                }
                return null;
            }
        }
    
    
    }
    =========================创建数组的工具类==================
    package cn.enjoyedu.ch2.forkjoin.sum;
    
    import java.util.Random;
    
    public class MakeArray {
        //数组长度
        public static final int ARRAY_LENGTH  = 40000000;
        public final static int THRESHOLD = 47;
    
        public static int[] makeArray() {
    
            //new一个随机数发生器
            Random r = new Random();
            int[] result = new int[ARRAY_LENGTH];
            for(int i=0;i<ARRAY_LENGTH;i++){
                //用随机数填充数组
                result[i] =  r.nextInt(ARRAY_LENGTH*3);
            }
            return result;
    
        }
    }
    
    
    
      2 forkjoin的语法
            1. RecursiveAction,用于没有返回结果的任务 2. RecursiveTask,用于有返回值的任务 task 要通过 ForkJoinPool 来执行,使用 submit 或 invoke 提交,两者的区
            别是:invoke 是同步执行,调用之后需要等待任务完成,才能执行后面的代码; submit 是异步执行。 join()和 get 方法当任务完成的时候返回计算结果。
    

    countdownlatch

      1定义:闭锁或者叫发令枪,当计数器为0时,在计数器上等待的线程就可以执行了
      2以用场景:当想测试最大并发数时
      3语法coutdownlatch.countDown() 计数器减一
      4例子
    
    package cn.enjoyedu.ch2.tools;
    
    import jdk.internal.org.objectweb.asm.tree.TryCatchBlockNode;
    import jdk.jfr.events.SocketWriteEvent;
    
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.CyclicBarrier;
    
    //countdownlatch
    public class test {
    
      static CountDownLatch downLatch = new CountDownLatch(1);
    
      static class A implements Runnable{
          public void run() {
              downLatch.countDown();
          }
      }
    
        static class B implements Runnable{
            public void run() {
                try {
                    downLatch.await();
                    System.out.println("downLatch...........");
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
    
            }
        }
        public static void main(String[] args) {
            new Thread(new A()).start();
            new Thread(new B()).start();
        }
    }
    
    

    cyclicbarrier 循环屏障

      1定义:和计数器类似,不同之处是他在所有线程完成后,可以都完成的那个节点设置处理之前线程所完成的结果的任务
      2例子:等待所有线程添加id,都完成后让另一个线程打印
    
    package cn.enjoyedu.ch2.tools;
    
    import jdk.internal.org.objectweb.asm.tree.TryCatchBlockNode;
    
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.CyclicBarrier;
    
    //cyclicbarrier
    public class test {
    
        public static ConcurrentHashMap<String,Long> concurrentHashMap = new ConcurrentHashMap<String, Long>();
        static CyclicBarrier barrier = new CyclicBarrier(4,new collectionAdd());
    
    
        public static void main(String[] args) {
            for (int i = 0; i < 4; i++) {
                new Thread(new subThread()).start();
            }
        }
        public static class collectionAdd implements Runnable{
            public void run() {
                StringBuffer sb = new StringBuffer();
                for (Map.Entry<String,Long> map : concurrentHashMap.entrySet()){
                    sb.append("["+map.getValue()+"]");
                }
                System.out.println(sb);
            }
        }
    
        public static class subThread implements Runnable{
    
            public void run() {
                long id = Thread.currentThread().getId();
                concurrentHashMap.put(id + "",id);
                try {
                    Thread.sleep(1000+id);
                    System.out.println("Thread_"+id+" ....do its business ");
                    barrier.await();
                }catch (Exception e){
    
                }
            }
        }
    }
    
    

    semaphore

      1定义:用来控制特定资源线程访问数量
      2常用方法:availablePermits(),当前信号量中许可证数量;getQueueLength() 获取当前队列长度(等待许可证的线程数量)
      3例子:实现连接池线程数控制
    
    package cn.enjoyedu.ch2.tools.semaphore;
    
    import cn.enjoyedu.tools.SleepTools;
    
    import java.sql.Connection;
    import java.util.LinkedList;
    import java.util.Random;
    import java.util.concurrent.Semaphore;
    
    public class Test {
        static final int POOLSIZE = 10;
        static final Semaphore useful = new Semaphore(10),useless = new Semaphore(0);
        static LinkedList<Connection> pools = new LinkedList<Connection>();
        static{
            for (int i = 0; i < POOLSIZE ; i++) {
                pools.add(SqlConnectImpl.fetchConnection());
            }
        }
    
    
    //归还链接
        static void returnConnection(Connection connection){
            if(connection != null){
                System.out.println(Thread.currentThread().getId()+ "当前可用连接数"+useful.availablePermits());
                System.out.println(Thread.currentThread().getId()+ "当前等待队列长度"+useful.getQueueLength());
                try {
                    useless.acquire();
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
                synchronized (pools){
                    pools.addLast(connection);
                }
                useful.release();
            }
        }
    
        static Connection takeConnection (){
            Connection connection;
            try {
                useful.acquire();
            }catch (InterruptedException e){
                e.printStackTrace();
            }
            synchronized (pools){
               connection =  pools.removeFirst();
            }
            useless.release();
            return connection;
        }
    
    static class test111 implements Runnable{
        public void run() {
            Connection connection = Test.takeConnection();
            Random r = new Random();
            int time = r.nextInt(1000);
            try {
                Thread.sleep(time);
            }catch (InterruptedException e){
                e.printStackTrace();
            }
            System.out.println("持有链接时间" + time );
            Test.returnConnection(connection);
        }
    }
    
        public static void main(String[] args) {
            for (int i = 0; i < 50; i++) {
                new Thread(new test111()).start();
            }
        }
    
    }
    
    

    cas

      1定义:采用cpu cas 机制实现乐观锁,通过自旋的方式保证必定成功,(会影响cpu,但0.6ns一次影响不大)
      2aba问题:举个例子,你有一杯水,被小明喝了一半,但是他加满了,等你看到的时候还是一杯水,但水已经不是原先的水了,但不影响你喝水
      old值 被别的线程修改了成了 oldA ,但是当你使用的时候,又改回old值的,cas的时候没有发现他改变
      解决办法:版本号
      3常用的cas工具类,又数字类,引用类,数组类,文件类很少用。
  • 相关阅读:
    springboot Filter中无法注入Bean对象的解决办法
    springboot 2.x 采用监控模块
    Spring Cloud Alibaba项目构建(一)
    spring boot完成图片上传下载的功能
    scrapy初探
    RESETFUL四种方式提交区别
    qt TCP UDP-多线程笔记
    [‘1‘,‘2‘,‘3‘].map(parseInt)结果讲解
    安装nprogress进度条插件
    vue项目中扫二维码跳转页面---前端实现过程
  • 原文地址:https://www.cnblogs.com/xiaoshahai/p/13083054.html
Copyright © 2011-2022 走看看