zoukankan      html  css  js  c++  java
  • JDK1.5引入的concurrent包

      并发是伴随着多核处理器的诞生而产生的,为了充分利用硬件资源,诞生了多线程技术。但是多线程又存在资源竞争的问题,引发了同步和互斥,并带来线程安全的问题。于是,从jdk1.5开始,引入了concurrent包来解决这些问题。

      java.util.concurrent 包是专为 Java并发编程而设计的包。

    在Java中,当多个线程访问某个类时,不管运行时环境采用何种调度方式或者这些线程将如何交替进行,在主调代码中不需要任何额外的同步或协同,这个类都能表现出正确的行为,那么称这个类是线程安全的。
    

        一般来说,concurrent包基本上由有3个package组成 : 

       java.util.concurrent:提供大部分关于并发的接口和类,如BlockingQueue,Callable,ConcurrentHashMap,ExecutorService, Semaphore等 ;
      java.util.concurrent.atomic:提供所有原子操作的类, 如AtomicInteger, AtomicLong等;
      java.util.concurrent.locks:提供锁相关的类, 如Lock, ReentrantLock, ReadWriteLock, Condition等。 

       concurrent包下的所有类可以分为如下几大类:

        locks部分:显式锁(互斥锁和速写锁)相关,如ReentrantLock,ReentrantReadWriteLock等;
        atomic部分:原子变量类相关,是构建非阻塞算法的基础,如AtomicInteger,AtomicBoolean,AtomicLong,AtomicReference等;
        executor部分:线程池相关,如ExecutorService,Callable,Future等;
        collections部分:并发容器相关,如BlockingQueue,Deque,ConcurrentMap等;
        tools部分:同步工具相关,如CountDownLatch,CyclicBarrier,Semaphore,Executors,Exchanger等。

      JUC的类图结构如下所示:

      concurrent包的优点有:
      ①功能丰富,诸如线程池(ThreadPoolExecutor),CountDownLatch等并发编程中需要的类已经有现成的实现,不需要自己去实现一套; 相比较而言,jdk1.4对多线程编程的主要支持几乎只有Thread, Runnable,synchronized等。synchronized和JDK5之后的Lock均是悲观锁(悲观锁一般是一个人在使用的时候,另一个人不能用,所以性能极低,所能支持的并发量就不高)。
      ②concurrent包里面的一些操作是基于硬件级别的CAS(compare and swap,比较再赋值),就是在cpu级别提供了原子操作,简单的说就是可以提供无阻塞、无锁定的算法; 而现代cpu大部分都是支持这种算法的。JUC(java.util.concurrent)是基于乐观锁的,既能保证数据不混乱,又能保证性能。

      version-(版本管理)就是基于乐观锁机制-->拿着我们期望的结果,和现有结果进行比对,如果是相同的,就赋值,如果不是相同的,就重试。
      CAS:有3个操作数,内存值V,旧的预期值A,要修改的新值B。当且仅当预期值A和内存值V相同时,将内存值V修改为B,否则什么都不做。  
      CAS算法内部是通过JNI--native方法来实现: 由java底层的C语言或者C++实现。 

      一般情况下:同步容器是有使用价值的。有时候,我们的异步容器,比如ArrayList,在并发环境下,会有这些问题:①数据紊乱;②java.util.ConcurrentModificationException。这都是对于集合的读写状态不一致造成的问题。
       我们可以这样构建同步容器:   

         Collections.synchronizedList(new ArrayList<>());
         Collections.synchronizedMap()      
         Collections.synchronizedSet()
    

       上面的方法可以实现同步容器,但是使用了悲观锁,因而效率不高。
      使用JUC体系中提供的容器,如:ConcurrentHashMap,则有这样的优势:①不会出现同步问题,数据是正常的;②速度相对较快  ,就算没有hashmap快,也比hashtable快的多。其实,ConcurrentHashMap内部也是有同步的,在这方面面和hashtable没有区别。那么,ConcurrentHashMap快在哪里?主要是因为,其内部划分了很多segment区域,当不同的线程操作不同的segment的时候,其实还是一个异步操作;只有当不同线程操作同一个segment的时候,才会发生同步操作,所以速度很快。一个ConcurrentHashMap内部最多能有16个segment。

      我们接下来看一个非常有用的类CountDownLatch, 它是一个可以用来在一个进程中等待多个线程完成任务的类。在此给出一个应用场景:某个主线程接到一个任务,起了n个子线程去完成,但是主线程需要等待这n个子线程都完成任务以后才开始执行某个操作。详见代码:

    package com.itszt.test1;
    import java.util.concurrent.CountDownLatch;
    /**
     * 主线程会在启动的子线程完全结束后再继续执行
     */
    public class Test {
        public static void main(String[] args) {
            Test test = new Test();
            test.demoCountDown();
        }
    
        public void demoCountDown() {
            int count = 10;
            final CountDownLatch l = new CountDownLatch(count);
            for (int i = 0; i < count; ++i) {
                final int index = i;
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            Thread.currentThread().sleep(2 * 1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println("thread -" + index + "- has finished...");
                        l.countDown();
                    }
                }).start();
            }
            try {
                l.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("now all threads have finished");
        }
    }
    

       执行结果如下所示:

    thread -9- has finished...
    thread -8- has finished...
    thread -0- has finished...
    thread -4- has finished...
    thread -1- has finished...
    thread -2- has finished...
    thread -5- has finished...
    thread -6- has finished...
    thread -3- has finished...
    thread -7- has finished...
    now all threads have finished
    

       接下来,我们再看下Atomic相关的类, 比如AtomicLong, AtomicInteger等。简单来说,这些类都是线程安全的,支持无阻塞无锁定的。

    package com.itszt.test1;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.atomic.AtomicLong;
    /**
     * 测试AtomicLong与long
     */
    public class AtomicTest {
        public static void main(String[] args) {
            AtomicTest test = new AtomicTest();
            test.testAtomic();
        }
    
        public void testAtomic() {
            final int loopcount = 10000;
            int threadcount = 10;
            final NonSafeSeq seq1 = new NonSafeSeq();
            final SafeSeq seq2 = new SafeSeq();
            final CountDownLatch l = new CountDownLatch(threadcount);
            for (int i = 0; i < threadcount; ++i) {
                final int index = i;
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        for (int j = 0; j < loopcount; ++j) {
                            seq1.inc();
                            seq2.inc();
                        }
                        System.out.println("finished : " + index);
                        l.countDown();
                    }
                }).start();
            }
            try {
                l.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("both have finished....");
            System.out.println("NonSafeSeq:" + seq1.get());
            System.out.println("SafeSeq with atomic: " + seq2.get());
        }
        class NonSafeSeq {
            private long count = 0;
            public void inc() {
                count++;
            }
            public long get() {
                return count;
            }
        }
    
        class SafeSeq {
            private AtomicLong count = new AtomicLong(0);
            public void inc() {
                count.incrementAndGet();
            }
            public long get() {
                return count.longValue();
            }
        }
    }
    

      上述代码执行如下:

    finished : 0
    finished : 3
    finished : 2
    finished : 6
    finished : 9
    finished : 5
    finished : 8
    finished : 1
    finished : 4
    finished : 7
    both have finished....
    NonSafeSeq:98454
    SafeSeq with atomic: 100000
    

       其中,NonSafeSeq是作为对比的类,直接放一个private long count不是线程安全的,而SafeSeq里面放了一个AtomicLong,是线程安全的;可以直接调用incrementAndGet来增加。通过上述执行结果可以看到,10个线程,每个线程运行了10,000次,理论上应该有100,000次增加,使用了普通的long是非线程安全的,而使用了AtomicLong是线程安全的。需要注意的是,这个例子也说明,虽然long本身的单个设置是原子的,要么成功要么不成功,但是诸如count++这样的操作就不是线程安全的,因为这包括了读取和写入两步操作。

       在jdk 1.4时代,线程间的同步主要依赖于synchronized关键字,本质上该关键字是一个对象锁,可以加在不同的instance上或者class上。
      concurrent包提供了一个可以替代synchronized关键字的ReentrantLock,简单的说,你可以new一个ReentrantLock, 然后通过lock.lock和lock.unlock来获取锁和释放锁;需要注意的是,必须将unlock放在finally块里面。reentrantlock的好处有 :
      ①更好的性能;
      ②提供同一个lock对象上不同condition的信号通知;
      ③还提供lockInterruptibly这样支持响应中断的加锁过程,意思是说你试图去加锁,但是当前锁被其他线程hold住,然后你这个线程可以被中断。  

    package com.itszt.test1;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.locks.ReentrantLock;
    /**
     * 测试ReentrantLock
     */
    public class ReentrantLockTest {
        public static void main(String[] args) {
            ReentrantLockTest lockTest = new ReentrantLockTest();
            lockTest.demoLock();
        }
    
        public void demoLock() {
            final int loopcount = 10000;
            int threadcount = 10;
            final SafeSeqWithLock seq = new SafeSeqWithLock();
            final CountDownLatch l = new CountDownLatch(threadcount);
            for (int i = 0; i < threadcount; ++i) {
                final int index = i;
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        for (int j = 0; j < loopcount; ++j) {
                            seq.inc();
                        }
                        System.out.println("finished : " + index);
                        l.countDown();
                    }
                }).start();
            }
            try {
                l.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("both have finished....");
            System.out.println("SafeSeqWithLock:" + seq.get());
        }
    
        class SafeSeqWithLock {
            private long count = 0;
            private ReentrantLock lock = new ReentrantLock();
    
            public void inc() {
                lock.lock();
                try {
                    count++;
                } finally {
                    lock.unlock();
                }
            }
    
            public long get() {
                return count;
            }
        }
    }
    

       上述代码执行如下:

    finished : 5
    finished : 3
    finished : 1
    finished : 8
    finished : 0
    finished : 6
    finished : 4
    finished : 2
    finished : 7
    finished : 9
    both have finished....
    SafeSeqWithLock:100000
    

       上述代码操作中,通过对inc操作加锁,保证了线程安全。

      concurrent包里面还提供了一个非常有用的锁,读写锁ReadWriteLock。  

    A ReadWriteLock maintains a pair of associated locks, one for read-only operations and one for writing. 
    The read lock may be held simultaneously by multiple reader threads, so long as there are no writers.
    The write lock is exclusive.

       上述英文意思是说:读锁可以有很多个锁同时上锁,只要当前没有写锁; 写锁是排他的,上了写锁,其他线程既不能上读锁,也不能上写锁;同样,需要上写锁的前提是既没有读锁,也没有写锁。  

    package com.itszt.test1;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;
    /**
     * 测试读写锁
     */
    public class RWLockTest {
        public static void main(String[] args) {
            RWLockTest lockTest = new RWLockTest();
            lockTest.testRWLock_getw_onr();
        }
    
        public void testRWLock_getw_onr() {
            ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
            final Lock rlock = lock.readLock();
            final Lock wlock = lock.writeLock();
            final CountDownLatch l = new CountDownLatch(2);
            // start r thread,开启读锁
            new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + " now to get rlock--获取读锁");
                    rlock.lock();
                    try {
                        Thread.currentThread().sleep(2 * 1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName() + " now to unlock rlock--释放读锁");
                    rlock.unlock();
                    l.countDown();
                }
            }).start();
            // start w thread,开启写锁
            new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + " now to get wlock--获取写锁");
                    wlock.lock();
                    System.out.println(Thread.currentThread().getName() + " now to unlock wlock--释放写锁");
                    wlock.unlock();
                    l.countDown();
                }
            }).start();
            try {
                l.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " finished");
        }
    }
    

       上述代码执行如下:

    Thread-0 now to get rlock--获取读锁
    Thread-1 now to get wlock--获取写锁
    Thread-0 now to unlock rlock--释放读锁
    Thread-1 now to unlock wlock--释放写锁
    main finished
    

       ReadWriteLock的实现是ReentrantReadWriteLock,有趣的是,在一个线程中,读锁不能直接升级为写锁,但是写锁可以降级为读锁;这意思是说,如果你已经有了读锁,再去试图获得写锁,将会无法获得, 一直堵住了;但是如果你有了写锁,再去试图获得读锁,就没问题。

      下面是一段写锁降级的代码:

    public void testRWLock_downgrade() {
            ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
            Lock rlock = lock.readLock();
            Lock wlock = lock.writeLock();
            System.out.println("now to get wlock");
            wlock.lock();
            System.out.println("now to get rlock");
            rlock.lock();
            System.out.println("now to unlock wlock");
            wlock.unlock();
            System.out.println("now to unlock rlock");
            rlock.unlock();
            System.out.println("finished");
        }
    

       上述代码在main函数中执行后,结果如下:

    now to get wlock
    now to get rlock
    now to unlock wlock
    now to unlock rlock
    finished
    

       我们再看一段读锁升级的代码:

    public void testRWLock_upgrade() {
            ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
            Lock rlock = lock.readLock();
            Lock wlock = lock.writeLock();
            System.out.println("now to get rlock");
            rlock.lock();
            System.out.println("now to get wlock");
            wlock.lock();
            System.out.println("now to unlock wlock");
            wlock.unlock();
            System.out.println("now to unlock rlock");
            rlock.unlock();
            System.out.println("finished");
        }
    

       上述代码执行中,已经有了读锁,再去试图获得写锁,将会无法获得, 程序一直堵塞,进入死锁状态,显示如下:

    now to get rlock
    now to get wlock
    

       另外,CountDownLatch是一个同步的辅助类,允许一个或多个线程,等待其他一组线程完成操作,再继续执行。
      CyclicBarrier也是一个同步的辅助类,允许一组线程相互之间等待,达到一个共同点,再继续执行。
      
    CountDownLatch和CyclicBarrier都是Synchronization  aid,即“同步辅助器”,既然都是辅助工具,在使用中有什么区别,各自的使用场景如何?  

    CountDownLatch场景举例:一年级期末考试要开始了,监考老师发下去试卷,然后坐在讲台旁边玩着手机等待着学生答题,有的学生提前交了试卷,并约起打球了,等到最后一个学生交卷了,老师开始整理试卷,贴封条,下班,陪老婆孩子去了。
    启发:CountDownLatch很像一个倒计时锁,倒计时结束,另一个线程才开始执行。就如监考老师要结束监考工作,必须等待所有学生都交了试卷,监考工作才能进入结束环节。
    
    CyclicBarrier场景举例:公司组织户外拓展活动,帮助团队建设,其中最重要的一个项目就是要求全体员工(包括女同事,BOSS,一个都不能少)都能翻越一个高达四米,而且没有任何抓点的高墙,才能继续进行其他项目。
    启发:CyclicBarrier可以看成是个障碍,所有的线程必须到齐后才能一起通过这个障碍。   
      另外,concurrent包中线程池部分,请参考我的另一篇博文《ScheduledThreadExecutor定时任务线程池》
  • 相关阅读:
    git常用命令
    Mybatis文档收集
    RocketMQ安装及配置
    vs code 插件收集
    idea中RunDashboard显示
    Error running ‘JeecgSystemApplication‘: Command line is too long. Shorten command line for JeecgSys
    shell脚本 for循环实现文件和目录遍历
    linux一次性解压多个.gz或者.tar.gz文件
    CentOS7挂载磁盘,4T磁盘挂载方法
    windows 安装Nginx服务
  • 原文地址:https://www.cnblogs.com/lizhangyong/p/8902886.html
Copyright © 2011-2022 走看看