zoukankan      html  css  js  c++  java
  • Java多线程整理

    目录:

    1.volatile变量

    2.Java并发编程学习

    3.CountDownLatch用法

    4.CyclicBarrier使用

    5.BlockingQueue使用

    6.任务执行器Executor
    7.CompletionService使用
    8.ConcurrentHashMap使用
    9.Lock使用

     

    一、 volatile变量

      1.volatile原理:volatile的原理实际上是告诉处理器,不要把变量缓存在寄存器或者相对于其他处理器不可见的地方,而是把变量放在主存,每次读写操作都在主存上进行操作。另外,被申明为volatile的变量也不会与其它内存中的变量进行重排序。

      2.volatile同步:volatile是同步的一个子集,只保证了变量的可见性,但是不具备原子特性。这就是说线程能够自动发现 volatile 变量的最新值。相对于同步而言,volatile的优势:a.简易性,可以像使用其他变量一样使用volatile变量;b.volatile变量不会造成线程阻塞;c.如果读操作远远大于写操作,volatile 变量还可以提供优于锁的性能优势。

      3.正确使用volatile条件:对变量的写操作不依赖于当前值;该变量没有包含在具有其他变量的不变式中;

     
    /*
     * 对于第一条原则:对变量的写操作不依赖于当前值;
     * 虽然i++只有一条语句,实际上这条语句是分三步执行的,读入i,i加1,写入i;
     * 若在第三步执行过程前,其他线程对i进行了改动,此时的结果将是错的。因此即使使用了volatile进行控制,并不能保证这个操作是线程安全的。
    */
    
    private volatile int i=1;
    ... 
    i++;
    
    /*
     * 这类问题的解决方案有两种: 
     * 1.一种是采用synchronized进行同步控制,这显然违背了volatile的初衷
     * 2.一种是采用CPU原语进行控制。在jdk1.5之后,java.util.concurrent.atomic包下的很多类就是采用这种方式进行控制,这样可以在保持性能的情况下,保证数据的线程安全。
     */
     

    二、Java并发编程学习

      在java 并发编程实践中对线程安全的定义如下:当多个线程访问一个类时,如果不用考虑这些线程在运行时环境下的调度和交替运行,并且不需要额外的同步及在调用方代码不必做其他的协调,这个类的行为仍然是正确的,那么这个类就是线程安全的。完全由线程安全的类构成的程序不一定就是线程安全的。

     
    /*
     * 如下代码所示: 
     * 虽然Vector是一个线程安全的类,但是对于由Vector线程安全的方法组成上面的逻辑,显然不是线程安全的。 
     * 当然,由线程不安全的类构成的程序,也不一定不是线程安全的。
     */
    
    Vector v;
     
    if(!v.contains(o)){
    v.add(o);
    }
    
    /*
     * 微妙的可见性:
     * 当变量的读入写出被不同的线程共享时,必须使用同步。若不使用同步将有可能发生与直觉大相径庭的错误。
     */
    public class TestVisiable {
    
        static int x = 0, y = 0;
        static int b = 0, a = 0;
    
        public static void main(String[] args) throws InterruptedException {
            Thread t1 = new Thread(new Runnable() {
    
                @Override
                public void run() {
                    a = 1;
                    x = b;
                }
            });
            Thread t2 = new Thread(new Runnable() {
    
                @Override
                public void run() {
                    b = 1;
                    y = a;
                }
            });
            t1.start();
            t2.start();
            t1.join();
            t2.join();
            System.out.println(x + " " + y);
        }
    }
    
    /*
     * 由于没有同步,程序运行的可能结果为(1,0) (0,1),(1,1),然而,还有可能为(0,0),不可思议吧。 
     * 1.这是由于为了加快并发执行,JVM内部采用了重排序的机制导致。
     * 2.最低限的安全性在java中,java存储模型要求java变量的获取和存储都是原子操作,但是有两种类型的变量是比较特殊的,没有申明为volitale的long和double变量。 
     * 原因在于:
     * 1.在java中long和double变量为64位,jvm允许将64位的读写操作划分为两个32位的操作。
     * 2.当多个线程同时读写非volatile的long或者double类型数据时,将有可能得到数据是一个数的高32位和另一个数的低32位组成的数。
     * 3.这样的结果显然是完全不对的。因此对应long或double类型的数据用于多线程共享时,必须申明加上volatile或者进行同步。
     */
     

    三、CountDownLatch用法

      在jdk API中如下描述:一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。用给定的计数初始化 CountDownLatch。由于调用了 countDown() 方法,所以在当前计数到达零之前,await 方法会一直受阻塞。之后,会释放所有等待的线程,await 的所有后续调用都将立即返回。

      CountDownLatch 是一个通用同步工具,它有很多用途。将计数 1 初始化的 CountDownLatch 用作一个简单的开/关锁存器,或入口:在通过调用 countDown() 的线程打开入口前,所有调用 await 的线程都一直在入口处等待。

     
    /*
     * 下面给出一个详细的应用场景 百米赛跑:
     * 比赛共有10名运动员参与,10名运动在起跑线上统一等待号令员发起起跑的号令,通过终点时号令员统计该运动员的时间,当所有运动员都跑到终点时,报告每个人的成绩。
     * 这里实际上就可以采用CountDownLatch来解决起跑线上设置一个CountDownLatch,当号令员没有发起起跑号令时,所有运动员都在起跑线上等待。
     * 在终点设置一个CountDownLatch,起跑后号令员一直等待,当所有运动员都通过终点后,它会报告成绩。 用代码实现如下:
     */
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.CountDownLatch;
    
    public class Commander {
    
        private final CountDownLatch     startSignal;                            // 起跑信号
        private final CountDownLatch     endSignal;                              // 终点信号
        private Long                     startTime;
        private final Map<Integer, Long> scoreMap = new HashMap<Integer, Long>();
    
        public void waitStart() {
            try {
                startSignal.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        public synchronized void start() {
            startTime = System.currentTimeMillis();
            startSignal.countDown();
            try {
                endSignal.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        public void reach(int id) {
            endSignal.countDown();
            scoreMap.put(id, System.currentTimeMillis() - startTime);// 统计时间
        }
    
        public Commander(int num){
            startSignal = new CountDownLatch(1);
            endSignal = new CountDownLatch(num);
        }
    
        public static void main(String[] args) {
            int runnerNum = 10;
            Commander c = new Commander(runnerNum);
            for (int i = 0; i < runnerNum; i++) {
                new Thread(new Runner(i + 1, c)).start();
            }
            c.start(); // 发起号令
            for (Integer i : c.scoreMap.keySet()) {
                System.out.println(i + "号运动员,耗时" + c.scoreMap.get(i));
            }
        }
    }
    
    class Runner implements Runnable {
    
        Commander commander;
        int       id;
    
        public Runner(int id, Commander commander){
            this.id = id;
            this.commander = commander;
    
        }
    
        @Override
        public void run() {
            commander.waitStart();
            try {
                Thread.sleep((long) (Math.random() * 1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            commander.reach(id);
        }
    }
     

    四、CyclicBarrier使用

      CyclicBarrier在jdk中描述:一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该 barrier 在释放等待线程后可以重用,所以称它为循环的 barrier。

      实际上CyclicBarrier类似于CountDownLatch也是个计数器。不同的是,当线程调用await方法后,必须所有线程都到达了,才能分别进入后面的执行过程。一旦有一个线程未到达,所有线程都会等待,有点像一部电影的名字《一个都不能少》。

     
    /*
     * 考虑一个应用场景: 
     * 老师带领学生去春游,下午回来时,需要整队,然后统一坐车回去。 
     * 在这里,老师和已经到达的学生,必须等待所有学生归队后才能回去。 用代码实现如下:
     */
    
    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;
    
    public class Teacher {
    
        private final CyclicBarrier barrier = new CyclicBarrier(20, new Runnable() {
    
                                                @Override
                                                public void run() {
                                                    System.out.println("所有学生已经归队");
                                                }
                                            });
    
        public void comeBack(int i) {
            System.out.println(i + "已经归队。");
            try {
                barrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    
        public static void main(String[] args) {
            Teacher t = new Teacher();
            for (int i = 0; i < 20; i++) {
                new Thread(new Student(i + 1, t)).start();
            }
    
        }
    }
    
    class Student implements Runnable {
    
        private final int     id;
        private final Teacher teacher;
    
        public Student(int id, Teacher teacher){
            this.id = id;
            this.teacher = teacher;
        }
    
        @Override
        public void run() {
            try {
                Thread.sleep((long) (Math.random() * 1000));
                teacher.comeBack(id);
                System.out.println(id + "上车");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
        }
    }
     

    五、BlockingQueue 学习

      从名字上看,BlockingQueue是阻塞队列的意思。这个队列主要提供下面的功能:

      1.阻塞队列提供了可阻塞的take和put方法,另外可定时的poll和offer实际原理也是一样的。

      2.如果BlockingQueue是空的,从BlockingQueue取东西的操作将会被阻断进入等待状态,直到BlockingQueue进了东西才会被唤醒,同样,如果BlockingQueue是满的,任何试图往里存东西的操作也会被阻断进入等待状态,直到BlockingQueue里有空间时才会被唤醒继续操作。

      阻塞队列有一般又为两类:无限阻塞队列和有限阻塞队列。有限阻塞队列中,当队列满时,调用put方法将会阻塞;而无限阻塞队列中,put方法是不会阻塞的。很显然这个队列的一种最常用的场景就是:生产者-消费者 模式。

     
    /* 它有以下具体实现类:
     * ArrayBlockingQueue:采用数组作为存储队列的阻塞队列,这个队列采用FIFO的方式管理数据
     * LinkedBlockingQueue:采用链式结构作为存储队列,同样它也采用FIFO的方式管理数据
     * PriorityBlockingQueue:采用基于优先级堆的极大优先级队列作为存储队列。
     * SynchronousQueue:特殊的BlockingQueue,其中每个 put 必须等待一个 take,反之亦然。
     * DelayQueue:这是一个无限阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部 是延迟期满后保存时间最长的 Delayed 元素。如果延迟都还没有期满,则队列没有头部。
     * 
     * 通过查看源代码可以看到这个类内部是采用PriorityQueue作为存储队列
     * DelayQueue的一些常用的场景
     * a) 关闭空闲连接。服务器中,有很多客户端的连接,空闲一段时间之后需要关闭之。
     * b) 缓存。缓存中的对象,超过了空闲时间,需要从缓存中移出。
     * c) 任务超时处理。在网络协议滑动窗口请求应答式交互时,处理超时未响应的请求。
     * */
     

    六、任务执行器Executor

      记得在大学的时候,有一次写程序时,需要创建很多的线程来处理各种socket请求,于是写了一个线程类,每出现一个socket请求,就创建一个线程。后来,老师指出,每次创建线程的开销比较大,可以将线程与具体的业务逻辑分离开来,然后用一个队列,保存一定量的线程,每次需要的时候就去取,不用的时候就还回去,这样可以循环使用,避免重复创建线程的开销,于是乎,对代码进行重构,写了大段的代码,发现以后需要用到多线程的地方都可要用它,后来在网上找资料才知道,那叫线程池。

      jdk1.5的升级,给我们带来一个很特殊的包java.util.concurrent,翻阅API,可以看到jdk中已经封装了线程池,简单两行便可实现。这个包中提供了Executor的接口,接口定义如下:

    void execute(Runnable command);

    传入一个runnable接口的实现,它便自动为我们创建线程和执行,任务的提交者再也不用为了并发执行,自己写一大段代码来创建并管理各种线程。扩展了Executor的有ExecutorServiceScheduledExecutorService,AbstractExecutorServiceScheduledThreadPoolExecutorThreadPoolExecutor.由于Runnable接口中只提供了一个不带返回值run方法,因此当任务需要返回值时,Executor就不能满足需求了,于是出现了ExecutorService,这个接口继承了Executor,对提交任务的接口进行了扩展,引入了Callable接口,该接口定义如下:

    public interface Callable<V> {
      V call() throws Exception;
    }

    同时接口将任务执行过程进行管理,分为三个状态,提交,shutdown,terminate。在AbstractExecutorService中可以看到submit(Callable c)的实现,实际上它会先创建一个Future对象,然后再调用execute(Runnable command)方法,执行任务。可以很明显的知道Future肯定是继承了Runnable接口。通过Future接口,我们可以获取由call方法调用的返回值。Executor接口的两个具体实现是ThreadPoolExecutor和ScheduledThreadPoolExecutor,通过名字可以看出,ThreadPoolExecutor是通过采用线程池来执行每个提交的任务,ScheduledThreadPoolExecutor继承自ThreadPoolExecutor,主要用于满足延迟后运行任务,或者定期执行任务的需求。虽然,我们可以通过直接调用ThreadPoolExecutor和ScheduledThreadPoolExecutor的构造函数生成Executor,但是很多情况下,没有这个必要,因为jdk为我们作了简化工作,通过Executors这个工厂类,可以只需传入一些简单的参数,便可以得到我们需要的Executor对象。

    七、CompletionService学习

      若在采用Executor执行任务时,可用通过采用Future来获取单个任务执行的结果,在Future中提供了一个get方法,该方法在任务执行返回之前,将会阻塞。当向Executor提交批处理任务时,并且希望在它们完成后获得结果,如果用FutureTask,你可以循环获取task,并用future.get()去获取结果,若没有完成则阻塞,这对于对任务结果需要分别对待的时候是可行的。但是若所有task产生的结果都可以被同等看待,这时候采用前面这样的方式显然是不可行了,因为若当前的task没有完成,而后面的其它task已经完成,你也得等待,这个实效性不高。显然很多时候,统一组task产生的结果都应该是没有区别的,也就是满足上述第二种情况。这个时候咋办呢?jdk为我们提供了一个很好的接口CompletionService,这个接口的具体实现类是ExecutorCompletionService。该类中定义下面三个属性:

    private final Executor executor;
    private final AbstractExecutorService aes;
    private final BlockingQueue<Future<V>> completionQueue;

    executor由构造函数传入,aes只是用于生成Future对象。特别要注意是completionQueue。它维护了一个保存Future对象的BlockingQueue。当这个Future对象状态是结束的状态的时候,也就是task执行完成之后,会将它加入到这个Queue中。到底是在哪里将完成的Future加入到队列里面的呢?又是怎么知道task是什么时候结束的呢?在ExecutorCompletionService中定义了一个QueueingFuture类,该类的实现:

     
    private class QueueingFuture extends FutureTask<Void> {
            QueueingFuture(RunnableFuture<V> task) {
                super(task, null);
                this.task = task;
            }
            protected void done() { completionQueue.add(task); }
            private final Future<V> task;
        }
     

    可以看到在done方法中,它会把当前的task加入到阻塞队列中。追踪done方法可以看到,该方法定义在FutureTask中,默认实现为空,从注释可以看出,当Future的状态转为isDone的时候,就会调用该方法。调用端在调用CompletionService的take方法时,实际上调用的是BlockingQueue的take方法,由前面的学习中,我们知道,当队列中有内容时,该队列会立即返回队列中的对象,当队列为空时,调用线程将会阻塞。而只要有任务完成,调用线程就会跳出阻塞,获得结果。

    八、ConcurrentHashMap学习

      java中提供对HashMap是我们使用得比较多的一个类,单该类是非线程安全的,若处于多线程环境中,则需要通过synchronized关键字进行同步(通过查看Collections.synchronizedMap方法,可以知道,实际上该方法对实现也是通过synchronized关键字进行同步控制),虽然它能保证线程安全,但是,由于需要对整个map进行加锁,这样做的并发性能往往不是很理想,尤其是map中数据量比较大的时候。jdk1.5之后,java.util.concurrent包中提供了ConcurrentHashMap,一方面,该类自己保证了线程安全性,另一方面,该类也提供了一些复合操作的原子性接口。

      ConcurrentHashMap与HashMap一样,同样是哈希表,但是采用不同对锁机制--分离锁,即采用不同的锁来同步不同的数据块,以减少对锁对竞争。   在ConcurrentHashMap内部采用了一个包含16个锁对象对数组,每个锁负责同步hash Bucket的1/16,bucket中的每个对象通过它的hashCode计算得到它所在锁对象。假设hash算法的实现能够提供合理的扩展性,并且关键字能够以统一的方式访问,这会将对于锁的请求减少到原来的1/16.这种基于分离锁设计的技术实现能够使得ConcurrentHashMap支持16个并发的请求。

    九、Lock使用

      在java5.0以前都是采用synchronized关键字进行同步控制,所有对象都自动含有单一的锁,JVM负责跟踪对象被加锁的次数。如果一个对象被解锁,其计数变为0。在任务(线程)第一次给对象加锁的时候,计数变为1。每当这个相同的任务(线程)在此对象上获得锁时,计数会递增。只有首先获得锁的任务(线程)才能继续获取该对象上的多个锁。每当任务离开一个synchronized方法,计数递减,当计数为0的时候,锁被完全释放,此时别的任务就可以使用此资源。由于这些锁是由JVM来控制,因此也叫隐式锁。

      在jdk5.0以后,提供了显示锁,即Lock,可以说是对隐式锁的功能的扩展,主要有两个。一个是ReentrantLock,另一个是ReentrantReadWriteLock,前者是普通重入锁,后者是可重入的读写锁。这些锁提供了两种锁竞争机制:公平竞争和非公平竞争,公平竞争的实现实际上是采用一个队列保存等待的线程,当当前线程释放锁之后,取出队头的线程唤醒,使之可以获取锁。java中Lock接口的定义:

     
    public interface Lock {
        void lock();
        void lockInterruptibly() throws InterruptedException;
        boolean tryLock();
        boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
        void unlock();
        Condition newCondition();
    }
     

    Lock与synchronized的区别:

    1.synchronized是在JVM层面上实现的,不但可以通过一些监控工具监控synchronized的锁定,而且在代码执行时出现异常,JVM会自动释放锁。而Lock的释放必须由程序自己保证,常用的写法是把是把释放锁的代码写到try{}finally中,在finally块中释放锁。

    2.如果使用 synchronized ,如果A不释放,B将一直等下去,不能被中断,如果使用Lock,如果A不释放,可以使B在等待了足够长的时间以后,中断等待,而干别的事情。

    3.synchronize实际上是ReentrantLock和Condition的组合的简化版

    4.相对于synchonized独占锁,ReentrantReadWriteLock通过分离读锁和写锁,提供可共享的读锁提高读并发性能。

  • 相关阅读:
    今日总结
    今日总结
    今日总结
    今日总结
    今日总结
    java自学
    java自学
    Java自学
    Java自学
    java自学
  • 原文地址:https://www.cnblogs.com/android-blogs/p/5765090.html
Copyright © 2011-2022 走看看