zoukankan      html  css  js  c++  java
  • 《Java核心技术》---- 多线程

    API:

    java.lang.Object

    • void notifyAll() 解除那些在该对象上调用wait方法的线程的阻塞状态。该方法只能在同步方法或同步块内部调用。如果当前线程不是对象锁的持有者,该方法抛出一个IllegalMonitorStateException异常。
    • void nofity() 随机选择一个在该对象上调用wait方法的线程,解除其阻塞状态。
    • void wait() 导致线程进入等待状态直到它被通知。该方法只能在同步方法或同步块内部调用。如果当前线程不是对象锁的持有者,该方法抛出一个IllegalMonitorStateException异常。
    • void wait(long millis)
    • void wait(long millis, int nanos) 导致线程进入等待状态直到它被通知或经过指定的时间。

    java.lang.Thread

    • static void sleep(long millis) 休眠给定的毫秒数。
    • Thread(Runnable target) 构造一个新线程,用于调用给定target的run()方法。
    • void start() 启动这个线程,将引发调用run()方法。这个方法将立即返回,并且新线程将并行运行。
    • void run()调用关联Runnable的run方法。
    • void interrupt() 向线程发送中断请求。线程的中断状态将被设置为true。如果目前该线程被一个sleep调用阻塞,那么, InterruptedException异常被抛出。
    • static boolean interrupted() 测试当前线程(即正在执行这一命令的线程)是否被中断。这是一个静态方法。这一调用会产生副作用,它将当前的中断状态重置为false.
    • boolean isInterrupted() 测试线程是否被终止。不像静态的中断方法,这一调用不改变线程的中断状态。
    • static Thread currentThread() 返回代表当前执行线程的Thread对象。
    • void setPriority(int newPriority) 设置线程优先级,一般使用Thread.NORM_PRIORITY.
    • static void MIN_PRIORITY 最小优先级,值为1。
    • static int NORM_PRIORITY 值为5。
    • static int MAX_PRIORITY 最大优先级,值为10。
    • static void yield() 导致当前执行线程处于让步状态。如果有其他的可运行线程具有至少与此线程同样高的优先级,那么这些线程接下来会被调度。注意,这是一个静态方法。
    • void setDaemon(boolean isDaemon) 标识该线程为守护线程或用户线程,这一方法必须在线程启动之前调用。
    • static void setDefaultUncaughtExceptionHandler(Thread.UncaughtExceptionHandler handler) 设置未捕获异常的默认处理器。
    • static Thread.UncaughtExceptionHandler getDefaultUncaughtExceptionHandler() 获取未捕获异常的默认处理器。
    • void setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler handler) 设置未捕获异常的处理器
    • Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() 获取未捕获异常的处理器。

    java.lang.Thread.UncaughtExceptionHandler

    • void uncaughtException(Thread t, Throwable e) 当一个线程因为捕获异常而终止,但规定要将客户报告记录到日志中。

    java.lang.ThreadLocal<T>

    • T get() 得到这个线程的当前值。如果是首次调用get,会调用initialize()来得到这个值。
    • protected initialize() 应该覆盖这个方法来提供一个初始值。默认情况下,这个方法返回null。
    • void set<T t> 为这个线程设置一个新值。
    • void remove() 删除对应这个线程的值。

    java.lang.Runnable

    • void run() 必须覆盖这个方法,并在这个方法中提供所要执行的任务指令。

    java.util.concurrent.ArrayBlockingQueue<E>

    • ArrayBlockingQueue(int capacity)
    • ArrayBlockingQueue(int capacity, boolean fair) 构造一个带有指定的容量和公平性设置的阻塞队列。该队列用循环数组实现。

    java.util.concurrent.LinkedBlockingQueue<E>

    • LinkedBlockingQueue() 构造一个无上限的阻塞队列或双向队列,用链表实现。
    • LinkedBlockingQueue(int capacity)
    • LinkedBlockingQueue(int capacity, boolean fair) 构造一个带有指定的容量的阻塞队列或双向队列。该队列用链表实现。

    java.util.concurrent.DelayQueue<E extends Delayed>

    • DelayQueue() 构造一个包含Delayed元素的无界的阻塞时间有限的阻塞队列。只有那些延迟已经超过时间的元素可以从队列中移出。
    • getDelay(TimeUnit unit) 得到该对象的延迟,用给定的时间单位进行度量。

    java.util.concurrent.PriorityBlockingQueue<E>

    • PriorityBlockingQueue()
    • PriorityBlockingQueue(int initialCapacity)
    • PriorityBlockingQueue(int initialCapacity, Comparator<? super E>) 构造一个无边界阻塞优先队列,用堆实现。

    java.util.concurrent.BlockingQueue<E>

    • void put(E element) 添加元素,在必要时阻塞
    • E take() 移出并返回头元素,必要时阻塞
    • boolean offer(E element, long time, TimeUnit unit) 添加给定的元素,如果成功返回true,必要时阻塞,直至元素已经被添加或超时
    • E poll(long time, TimeUnit unit) 移出并返回头元素,必要时阻塞,直至元素可用或超时完成。失败时返回null。

    java.util.concurrent.BlockingDeque<E>

    • void putFirst(E element)
    • void putLast<E element) 添加元素,必要时阻塞
    • E takeFirst()
    • E takeLast() 移出并返回头元素或尾元素,必要时阻塞
    • boolean offerFirst(E element, long time, TimeUnit unit)
    • boolean offerLast(E element, long time, TimeUnit unit) 添加给定元素,成功返回true,必要时阻塞直至元素被添减或超时。
    • E pollFirst(long time, TimeUnit unit)
    • E pollLast(long time, TimeUnit unit) 移动并返回头元素或尾元素,必要时阻塞,直到元素可用或超时,失败时返回null。

    java.util.concurrent.TransferQueue<E>

    • void transfer(E element)
    • boolean tryTransfer(E element, long time, TimeUnit unit) 传输一个值,或者尝试在给定的超时时间内传输这个值,这个调用将阻塞,直到另一个线程将元素删除。第二个方法会在调用成功时返回true。

    java.util.concurrent.locks.Lock

    • void lock() 获取这个锁;如果锁同时被另一个线程拥有则发生阻塞。
    • void unlock() 释放这个锁。
    • Condition newCondition() 返回一个与该锁相关的条件对象。
    • boolean tryLock() 尝试获得锁而没有发生阻塞;如果成功返回真。
    • boolean tryLock(long time, TimeUnit unit) 尝试获得锁,阻塞时间不会超过给定的值,如果成功返回true。
    • void lockInterruptibly() 获得锁,但是会不确定地发生阻塞。如果线程被中断,抛出一个InterruptedException.

    java.util.concurrent.locks.Condition

    • void await() 将该线程放到条件的等待集中。
    • void signalAll() 解除该条件的等待集中的所有线程的阻塞状态。
    • void signal() 从该条件的等待集中随机地选择一个线程,解除其阻塞状态。
    • boolean await(long time, TimeUnit unit) 进入该条件的等待集,直到线程从等待集移出或等待了制定的时间后解除阻塞。如果因为等待时间到了而返回,会返回false,否则返回true。
    • void awaitUninterruptibly() 进入该条件的等待集,直到线程从等待集移出才解除阻塞。如果线程被中断,该方法不会抛出InterruptedException异常。

    java.util.concurrent.locks.ReentrantLock

    • ReentrantLock() 构建一个可以被用来保护临界区的可重入锁。
    • ReentrantLock(boolean fair) 构建一个带有公平策略的锁。一个公平锁偏爱等待时间最长的线程。但是,这一公平的保证将大大降低性能。所以,默认情况下,锁没有被强制为公平的。

    java.util.concurrent.locks.ReentrantReadWriteLock

    • Lock readLock() 得到一个可以被多个读操作共用的读锁,但会排斥所有写操作。
    • Lock writeLock() 得到一个写锁,排斥所有其他的读操作和写操作。

    java.utl.concurrent.ThreadLocalRandom

    • static ThreadLocalRandom current() 返回特定于当前线程的Random类实例。eg. int random = ThreadLocalRandom.current().nextInt(upperBound);

    并发编程中的三个概念:

    1,原子性,即一个操作或者多个操作 要么全部执行并且执行的过程不会被任何因素打断,要么就都不执行。

    2,可见性,指当多个线程访问同一个变量时,一个线程修改了这个变量的值,其他线程能够立即看得到修改的值。

    3,有序性,即程序执行的顺序按照代码的先后顺序执行。

    一,中断线程

      可以调用interrupt()方法发送中断请求。但是,如果线程被阻塞,就无法检测中断状态。这是产生InterruptedException异常的地方。当在一个被阻赛的线程(调用sleep或wait)上调用interrupt方法时,阻塞调用将会被InterruptedException中断。

    发出中断请求并不意味着线程立即会终止,发送请求只是要引起线程注意,要求中断的线程可以决定如何处理这个中断请求。某些线程会处理完异常后,继续执行,而不理会中断。但是更普遍的情况是,线程将简单地将中断作为一个终止的请求。即如下代码:

    public void run() {
        try {
            ...
            while(!Thread.currentThread().isInterrupted() && more work to check) {
                do more work;
            }
        } catch (InterruptedException ex) {
            // Thread was interrupted during sleep or wait;
        } finally {
            //cleanup, if required;
        }
        // exiting the run method terminates the thread
    }

    如果在线程调用sleep方法后,isInterrupted检测没有必要也没用。如果在中断状态调用sleep方法,它也不会休眠。相反,它将清除这一状态并抛出InterruptedException。

    二,线程状态

    • New(新创建)
    • Runnable(可运行)
    • Blocked(被阻塞)
    • Waiting(等待)
    • Timed waiting(计时等待)
    • Terminated(被终止)

    1,新创建线程

      new Thread(r), 当一个线程处于新建状态时,程序还没有开始运行线程中的代码。

    2,可运行线程

      一旦调用start方法,线程处于runnable状态。一个可运行的线程可能正在运行也可能没有运行。

    一旦一个线程开始运行,它不必始终保持运行。抢占式调度系统给每一个可运行线程一个时间片来执行任务,当时间片用完,操作系统剥夺该线程的运行权,并给另一个线程运行机会。

    3,被阻塞线程和等待线程

      当线程处于被阻塞或等待状态时,它暂时不活动。

      进入被阻塞或等待状态的情况:

    •   当一个线程试图获取一个内部的对象锁,而该锁被其他线程持有,则该线程进入阻塞状态。当所有其他线程释放该锁,并且线程调度器允许该线程持有这把锁时,该线程将变成非阻塞状态。
    •       When the thread waits for another thread to notify the scheduler of a condition, it enters the waiting state.
    •   有一些方法有一个超时参数,调用它们导致线程进入计时等待状态。这一状态将一直保持到超时期满或者接收到适当的通知。这些方法有:Thread.sleep(), Object.wait(), Thread.join(), Lock.tryLock以及Condition.await()

    三,线程属性

    1,线程优先级

    在java中,每个线程有一个优先级。默认情况下,一个线程继承它的父线程的优先级。可以使用setPriority方法提高或降低任何一个线程的优先级。可以将优先级设置为MIN_PRIORITY(在Thread类中定义为1)与MAX_PRIORITY(定义为10)之间的任何值。NORM_PRIORITY被定义为5。

    2,守护线程

    在Java中有两类线程:用户线程 (User Thread)、守护线程 (Daemon Thread)。

    所谓守护 线程,是指在程序运行的时候在后台提供一种通用服务的线程,比如垃圾回收线程就是一个很称职的守护者,并且这种线程并不属于程序中不可或缺的部分。因 此,当所有的非守护线程结束时,程序也就终止了,同时会杀死进程中的所有守护线程。反过来说,只要任何非守护线程还在运行,程序就不会终止。

    用户线程和守护线程两者几乎没有区别,唯一的不同之处就在于虚拟机的离开:如果用户线程已经全部退出运行了,只剩下守护线程存在了,虚拟机也就退出了。 因为没有了被守护者,守护线程也就没有工作可做了,也就没有继续运行程序的必要了。

    可以通过调用 t.setDaemon(true) 将线程转换为守护线程。但需要注意以下几点: 

    (1) thread.setDaemon(true)必须在thread.start()之前设置,否则会跑出一个IllegalThreadStateException异常。你不能把正在运行的常规线程设置为守护线程。 

    (2) 在Daemon线程中产生的新线程也是Daemon的。

    (3) 守护线程应该永远不去访问固有资源,如文件、数据库,因为它会在任何时候甚至在一个操作的中间发生中断。

    3,未捕获异常处理器

    线程的run方法不能抛出任何被检测的异常,也就是说各个线程需要自己把自己的checked exception处理掉。这一点是通过java.lang.Runnable.run()方法声明(因为此方法声明上没有throw exception部分)进行了约束。

    但是,线程还是有可能抛出unchecked exception, 当这类异常被抛出时,线程就会终结,而对于朱线程和其他线程完全不受影响,且完全感知不到某个线程抛出的异常。在Java中,线程方法的异常,都应该在先车观念代码run方法内进行try-catch并处理掉,换句话说,我们不能捕获从线程中逃逸的异常。

    不过,在线程死亡之前,异常被传递到一个用于未被捕获异常的处理器。该处理器必须属于一个实现Thread.UncaughtExceptionHandler接口的类。这个接口只有一个方法:

    void uncaughtException(Thread t, Throwable e)

    可以用setUncaughtExceptionHandler方法为任何线程安装一个处理器。也可以用Thread类的静态方法setDefaultUncaughtExceptionHandler为所有线程安装一个默认的处理器。

    package com.ivy.thread;
    
    import java.lang.Thread.UncaughtExceptionHandler;
    
    public class ThreadExceptionTest {
    
        public static void main(String[] args) {
            // TODO Auto-generated method stub
            Thread t = new Thread(new ExceptionThread());
            t.setUncaughtExceptionHandler(new MyUncheckedExceptionhandler());
            t.start();
        }
    
    }
    
    class MyUncheckedExceptionhandler implements UncaughtExceptionHandler {
    
        @Override
        public void uncaughtException(Thread t, Throwable e) {
            // TODO Auto-generated method stub
            System.out.println("caught exception:" + e);
        }
        
        
    }
    
    class ExceptionThread implements Runnable {
    
        @Override
        public void run() {
            // TODO Auto-generated method stub
            throw new RuntimeException("throw runtime exception");
        }
        
    }

    四,同步

    前题:

    package com.ivy.thread.unsynch;
    
    public class Bank {
    
        private final double[] accounts;
        
        public Bank(int n, double initialBalance) {
            accounts = new double[n];
            for (int i=0; i< accounts.length; i++) {
                accounts[i] = initialBalance;
            }
        }
        
        public void transfer(int from, int to, double amount) {
            if (accounts[from] < amount) 
                return;
            System.out.println(Thread.currentThread());
            accounts[from] -= amount;
            System.out.printf("%10.2f from %d to %d", amount, from, to);
            accounts[to] += amount;
            System.out.printf(" Total Balance : %10.2f%n", getTotalBalance());
        }
        
        public double getTotalBalance() {
            double sum = 0;
            
            for(double a : accounts) {
                sum += a;
            }
            return sum;
        }
        
        public int size() {
            return accounts.length;
        }
    }
    package com.ivy.thread.unsynch;
    
    public class TransferRunnable implements Runnable{
    
        private Bank bank;
        private int fromAccount;
        private double maxAmount;
        private int DELAY = 10;
        
        public TransferRunnable(Bank b, int from, double max) {
            bank = b;
            fromAccount = from;
            maxAmount = max;
        }
    
        @Override
        public void run() {
            try {
                while(true) {
                    int toAccount = (int)(bank.size() * Math.random());
                    double amount = maxAmount * Math.random();
                    bank.transfer(fromAccount, toAccount, amount);
                    Thread.sleep((int)(DELAY * Math.random()));
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            
        }
    }
    package com.ivy.thread.unsynch;
    
    public class UnsynchBankTest {
    
        public static final int NACCOUNTS = 100;
        public static final double INITIAL_BALANCE = 1000;
        
        public static void main(String[] args) {
            // TODO Auto-generated method stub
    
            Bank b = new Bank(NACCOUNTS, INITIAL_BALANCE);
            for (int i=0; i<NACCOUNTS; i++) {
                TransferRunnable r = new TransferRunnable(b, i, INITIAL_BALANCE);
                Thread t = new Thread(r);
                t.start();
            }
        }
    
    }

    1,竞争条件

    当两个线程试图同时更新同一个账户的时候,就出现了竞争条件,结果会有误差,原因是操作不是原子性的。

    Java提供了两种防止代码块受并发访问的干扰的方式:

    • synchronized关键字
    • 使用锁和条件对象

    2,锁对象

    myLock.lock();
    try {
       // do something
    }
    finally {
        myLock.unlock();
    }

    这一结构确保任何时刻只有一个线程进入临界区。一旦一个线程持有了锁对象,其他任何线程都无法通过lock语句。当其他线程调用lock时,它们被阻塞,直到第一个线程释放锁对象。

    使用锁来保护Bank类的transfer方法:

    public class Bank {
    
        private Lock bankLock = new ReentrantLock();
        ...
        
        public void transfer(int from, int to, double amount) {
            bankLock.lock();
            try {
                if (accounts[from] < amount) 
                    return;
                System.out.println(Thread.currentThread());
                accounts[from] -= amount;
                System.out.printf("%10.2f from %d to %d", amount, from, to);
                accounts[to] += amount;
                System.out.printf(" Total Balance : %10.2f%n", getTotalBalance());
            }
            finally {
                bankLock.unlock();
            }
        }
        
        ...
    }

    每个Bank对象有自己的ReentrantLock对象,如果两个线程试图访问同一个Bank对象,那么锁以串行方式提供服务。但是如果两个线程访问不同的Bank对象,每个线程得到不同的锁对象,两个线程都不会发生阻塞。

    锁是可重入的,因为线程可以重复地获得已经持有的锁。锁保持一个持有计数来跟踪对lock方法的嵌套调用。线程在每一次调用lock都要调用unlock来释放锁。由于这一特性,被一个锁保护的代码可以调用另一个使用相同的锁的方法。

    3,条件对象

    线程进入临界区,却发现在某一条件满足之后它才能执行。要使用一个条件对象来管理那些已经获得了一个锁但是却不能做有用工作的线程。条件对象经常被称为条件变量。

    当帐户没有足够余额时,需要等待直到另一个线程向帐户中注入资金。但是,这一线程刚刚获得了对bankLock的排他性访问,因此别的线程没有进行存款操作的机会。所以我们需要条件对象来解决这个问题。

    一个锁对象可以有一个或多个相关的条件对象。可以用newCondition方法获得一个条件对象。如下:

    class Bank {
        private Condition sufficientFunds;
        
        public Bank() {
            sufficientFunds = bankLock.newCondition();
        }
    }

    如果A线程在运行transfer()发现余额不足时,会调用sufficientFunds.await(),这时A线程就被阻塞,并且放弃了锁。于是另一个线程B就可以拥有这把锁,并进行增加帐户余额的操作。

    等待获得锁的线程B和调用await()方法的线程A存在本质上的不同。一旦一个线程(例如A线程)调用了await()方法,它进入该条件的等待集。当锁可用时,该线程A不能马上解除阻塞。相反,它处于阻塞状态,直到另一个线程(线程B)调用同一条件上的signalAll方法为止。

    所以当B线程转帐时,应该调用sufficientFunds.signalAll();来重新激活因为这一条件而等待的所有线程(线程A)。当这些线程从等待集当中移出时,他们再次成为可运行的,调度器将再次激活它们。同时,它们会试图重新进入原来的对象。一旦锁可用,它们中的某个将从await调用返回,获得该锁并从被阻塞的地方继续执行。

    至关重要的是最终需要某个其他线程调用signalAll方法。当一个线程调用await方法时,它没办法重新激活其自身,只能依赖于其他线程。如果没有其他线程来重新激活等待的线程,它就永远不再运行了。一般,在对象的状态有可能使等待线程的方向改变时调用signalAll()比较恰当。

    public class Bank {
    
        private Lock bankLock = new ReentrantLock();
        private Condition sufficientFunds = bankLock.newCondition();
        
            ...
        public void transfer(int from, int to, double amount) {
            bankLock.lock();
            try {
                while (accounts[from] < amount) {
                    sufficientFunds.await();
                }
                System.out.println(Thread.currentThread());
                accounts[from] -= amount;
                System.out.printf("%10.2f from %d to %d", amount, from, to);
                accounts[to] += amount;
                System.out.printf(" Total Balance : %10.2f%n", getTotalBalance());
                sufficientFunds.signalAll();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            finally {
                bankLock.unlock();
            }
        }
        
        ...
    }
        

    注意调用signalAll不会立即激活一个等待线程。它仅仅解除等待线程的阻塞,以便这些线程可以在当前线程退出同步方法之后,通过竞争实现对对象的访问。

    另一个方法signal(), 是随机解除等待集中某个线程的阻塞状态。这比解除所有线程的阻塞更加有效,但也存在危险。如果随机选择的线程发现自己仍然不能运行,那么它再次被阻塞。如果没有其他线程再次调用signal,那么系统就死锁了。

    当一个线程拥有某个条件的锁时,它仅仅可以在该条件上调用await/signalAll或signal方法。

    总结一下:

    • 锁用来保护代码片断,任何时刻只能由一个线程执行被保护的代码。
    • 锁可以管理试图进入被保护代码段的线程。
    • 锁可以拥有一个或多个相关的条件对象。
    • 每个条件对象管理那些已经进入被保护的代码段但还不能运行的线程。

    4,synchronized关键字

    Java中的每个对象都有一个内部锁。如果一个方法用synchronized关键字声明,那么对象的锁将保护整个方法。也就是说,要调用该方法,线程必须获得内部的锁对象。

    换句话说,

    public synchronized void method() {
        method body;
    }

    等价于:

    public void method() {
        this.intrinsicLock.lock();
        try {
            method body;
        }
        finally{
            this.intrinsicLock.unlock();
        }
    }

    内部对象锁只有一个相关条件。wait()方法添加一个线程到等待集中,nontifyAll/notify方法解除等待线程的阻塞状态。

    改进后的transfer()方法: 

    public synchronized void transfer2(int from, int to, double amount) {
            bankLock.lock();
            try {
                while (accounts[from] < amount) {
                    wait();
                }
                System.out.println(Thread.currentThread());
                accounts[from] -= amount;
                System.out.printf("%10.2f from %d to %d", amount, from, to);
                accounts[to] += amount;
                System.out.printf(" Total Balance : %10.2f%n", getTotalBalance());
                notifyAll();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            finally {
                bankLock.unlock();
            }
        }

    5,同步阻塞

    每个Java对象有一个锁。线程可以通过调用同步方法获得锁。还可以通过进入一个同步阻塞获得锁。

    当线程进入如下形式的阻塞,就会获得obj的锁。

    synchronized(obj) {
        critical section;
    }

    例如: 

    public class Bank {
        private final double[] accounts;
        private Object lock = new Object();
    
        ...
        
        public void transfer(int from, int to, int amount) {
            synchronized(lock) {
                accounts[from] -= amount;
                accounts[to] += amount;
            }
            System.out.printf(" Total Balance : %10.2f%n", getTotalBalance());
        }
        
        ...
    
    }

    lock对象被创建仅仅是用来使用每个Java对象持有的锁。通过使用一个对象的锁来实现程序控制的原子操作。

    6,监视器

    监视器具有的特性:

    • 监视器是只包含私有域的类。
    • 每个监视器类的对象有一个相关的锁。
    • 使用该锁对所有的方法进行加锁。换句话说,如果调用obj.method(),那么obj对象的锁是在方法调用开始时自动获得,并且当方法返回时自动释放该锁。因为所有的域是私有的,这样的安排可以确保一个线程在对对象操作时,没有其他线程能访问该域。
    • 该锁可以有任意多个相关条件。

    7,Volatile关键字

    i,volatile关键字的两层语义:

    一旦一个共享变量(类的成员变量,类的静态成员变量)被volatile修饰后,就具备了两层语义:

    • 保证了不同线程对这个变量进行操作时的可见性,即一个线程修改了某个变量的值,这个新值对其他线程来说是立即可见的。
    • 禁止进行指令重排序。

    先看一段代码,假如线程1先执行,线程2后执行:

    //线程1
    boolean stop = false;
    while(!stop){
    	doSomething();
    }
    
    //线程2
    stop = true;
    

     这段代码是很典型的一段代码,很多人在中断线程时可能都会采用这种标记办法。但是事实上,这段代码会完全运行正确么?即一定会将线程中断么?不一定,也许在大多数时候,这个代码能够把线程中断,但是也有可能会导致无法中断线程(虽然这个可能性很小,但是只要一旦发生这种情况就会造成死循环了)。

    下面解释一下这段代码为何有可能导致无法中断线程。在前面已经解释过,每个线程在运行过程中都有自己的工作内存,那么线程1在运行的时候,会将stop变量的值拷贝一份放在自己的工作内存当中。

    那么当线程2更改了stop变量的值之后,但是还没来得及写入主存当中,线程2转去做其他事情了,那么线程1由于不知道线程2对stop变量的更改,因此还会一直循环下去。但是用volatile修饰之后就变得不一样了:

    • 使用volatile关键字会强制将修改的值立即写入主存;
    • 使用volatile关键字的话,当线程2进行修改时,会导致线程1的工作内存中环村变量stop的缓存行无效。
    • 由于线程1的工作内存中缓存变量stop的缓存行无效,所以线程1再次读取stop的值回去主存读取。所以拿到的就是最新的值。

    所以volatile保证了多线程的可见性

    ii,volatile并不保证原子性

    再看一个例子:

    public class Test {
    	public volatile int inc = 0;
    	
    	public void increase() {
    		inc++;
    	}
    	
    	public static void main(String[] args) {
    		final Test test = new Test();
    		for(int i=0;i<10;i++){
    			new Thread(){
    				public void run() {
    					for(int j=0;j<1000;j++)
    						test.increase();
    				};
    			}.start();
    		}
    		
    		while(Thread.activeCount()>1)  //保证前面的线程都执行完
    			Thread.yield();
    		System.out.println(test.inc);
    	}
    }
    

    运行它会发现每次运行结果都不一致,都是一个小于10000的数字。

    这里面就有一个误区了,volatile关键字能保证可见性没有错,但是上面的程序错在没能保证原子性。可见性只能保证每次读取的是最新的值,但是volatile没办法保证对变量的操作的原子性。

      在前面已经提到过,自增操作是不具备原子性的,它包括读取变量的原始值、进行加1操作、写入工作内存。那么就是说自增操作的三个子操作可能会分割开执行,就有可能导致下面这种情况出现:

      假如某个时刻变量inc的值为10,

      线程1对变量进行自增操作,线程1先读取了变量inc的原始值,然后线程1被阻塞了;

      然后线程2对变量进行自增操作,线程2也去读取变量inc的原始值,由于线程1只是对变量inc进行读取操作,而没有对变量进行修改操作,所以不会导致线程2的工作内存中缓存变量inc的缓存行无效,所以线程2会直接去主存读取inc的值,发现inc的值时10,然后进行加1操作,并把11写入工作内存,最后写入主存。

      然后线程1接着进行加1操作,由于已经读取了inc的值,注意此时在线程1的工作内存中inc的值仍然为10,所以线程1对inc进行加1操作后inc的值为11,然后将11写入工作内存,最后写入主存。

      那么两个线程分别进行了一次自增操作后,inc只增加了1。

      解释到这里,可能有朋友会有疑问,不对啊,前面不是保证一个变量在修改volatile变量时,会让缓存行无效吗?然后其他线程去读就会读到新的值,对,这个没错。这个就是上面的happens-before规则中的volatile变量规则,但是要注意,线程1对变量进行读取操作之后,被阻塞了的话,并没有对inc值进行修改。然后虽然volatile能保证线程2对变量inc的值读取是从内存中读取的,但是线程1没有进行修改,所以线程2根本就不会看到修改的值。

      根源就在这里,自增操作不是原子性操作,而且volatile也无法保证对变量的任何操作都是原子性的。

      把上面的代码改成以下任何一种都可以达到效果:

      采用synchronized:

    public class Test {
        public  int inc = 0;
        
        public synchronized void increase() {
            inc++;
        }
        
        public static void main(String[] args) {
            final Test test = new Test();
            for(int i=0;i<10;i++){
                new Thread(){
                    public void run() {
                        for(int j=0;j<1000;j++)
                            test.increase();
                    };
                }.start();
            }
            
            while(Thread.activeCount()>1)  //保证前面的线程都执行完
                Thread.yield();
            System.out.println(test.inc);
        }
    }
    复制代码
    public class Test {
        public  int inc = 0;
        
        public synchronized void increase() {
            inc++;
        }
        
        public static void main(String[] args) {
            final Test test = new Test();
            for(int i=0;i<10;i++){
                new Thread(){
                    public void run() {
                        for(int j=0;j<1000;j++)
                            test.increase();
                    };
                }.start();
            }
            
            while(Thread.activeCount()>1)  //保证前面的线程都执行完
                Thread.yield();
            System.out.println(test.inc);
        }
    }
    复制代码

      采用Lock:

    public class Test {
        public  int inc = 0;
        Lock lock = new ReentrantLock();
        
        public  void increase() {
            lock.lock();
            try {
                inc++;
            } finally{
                lock.unlock();
            }
        }
        
        public static void main(String[] args) {
            final Test test = new Test();
            for(int i=0;i<10;i++){
                new Thread(){
                    public void run() {
                        for(int j=0;j<1000;j++)
                            test.increase();
                    };
                }.start();
            }
            
            while(Thread.activeCount()>1)  //保证前面的线程都执行完
                Thread.yield();
            System.out.println(test.inc);
        }
    }

    复制代码
    public class Test {
        public  int inc = 0;
        Lock lock = new ReentrantLock();
        
        public  void increase() {
            lock.lock();
            try {
                inc++;
            } finally{
                lock.unlock();
            }
        }
        
        public static void main(String[] args) {
            final Test test = new Test();
            for(int i=0;i<10;i++){
                new Thread(){
                    public void run() {
                        for(int j=0;j<1000;j++)
                            test.increase();
                    };
                }.start();
            }
            
            while(Thread.activeCount()>1)  //保证前面的线程都执行完
                Thread.yield();
            System.out.println(test.inc);
        }
    }
    复制代码

      采用AtomicInteger:

    public class Test {
        public  AtomicInteger inc = new AtomicInteger();
         
        public  void increase() {
            inc.getAndIncrement();
        }
        
        public static void main(String[] args) {
            final Test test = new Test();
            for(int i=0;i<10;i++){
                new Thread(){
                    public void run() {
                        for(int j=0;j<1000;j++)
                            test.increase();
                    };
                }.start();
            }
            
            while(Thread.activeCount()>1)  //保证前面的线程都执行完
                Thread.yield();
            System.out.println(test.inc);
        }
    }

    复制代码
    public class Test {
        public  AtomicInteger inc = new AtomicInteger();
         
        public  void increase() {
            inc.getAndIncrement();
        }
        
        public static void main(String[] args) {
            final Test test = new Test();
            for(int i=0;i<10;i++){
                new Thread(){
                    public void run() {
                        for(int j=0;j<1000;j++)
                            test.increase();
                    };
                }.start();
            }
            
            while(Thread.activeCount()>1)  //保证前面的线程都执行完
                Thread.yield();
            System.out.println(test.inc);
        }
    }
    复制代码

      在java 1.5的java.util.concurrent.atomic包下提供了一些原子操作类,即对基本数据类型的 自增(加1操作),自减(减1操作)、以及加法操作(加一个数),减法操作(减一个数)进行了封装,保证这些操作是原子性操作。atomic是利用CAS来实现原子性操作的(Compare And Swap),CAS实际上是利用处理器提供的CMPXCHG指令实现的,而处理器执行CMPXCHG指令是一个原子性操作。

    iii,volatile保证有序性

    在前面提到volatile关键字能禁止指令重排序,所以volatile能在一定程度上保证有序性。

    volatile关键字禁止指令重排序有两层意思:

      1)当程序执行到volatile变量的读操作或者写操作时,在其前面的操作的更改肯定全部已经进行,且结果已经对后面的操作可见;在其后面的操作肯定还没有进行;

      2)在进行指令优化时,不能将在对volatile变量访问的语句放在其后面执行,也不能把volatile变量后面的语句放到其前面执行。

    可能上面说的比较绕,举个简单的例子:

    //x、y为非volatile变量
    //flag为volatile变量
    
    x = 2;        //语句1
    y = 0;        //语句2
    flag = true;  //语句3
    x = 4;         //语句4
    y = -1;       //语句5
    

    由于flag变量为volatile变量,那么在进行指令重排序的过程的时候,不会将语句3放到语句1、语句2前面,也不会讲语句3放到语句4、语句5后面。但是要注意语句1和语句2的顺序、语句4和语句5的顺序是不作任何保证的。并且volatile关键字能保证,执行到语句3时,语句1和语句2必定是执行完毕了的,且语句1和语句2的执行结果对语句3、语句4、语句5是可见的。

    iv,使用volatile的场景

    synchronized关键字是防止多个线程同时执行一段代码,那么就会很影响程序执行效率,而volatile关键字在某些情况下性能要优于synchronized,但是要注意volatile关键字是无法替代synchronized关键字的,因为volatile关键字无法保证操作的原子性。通常来说,使用volatile必须具备以下2个条件:

      1)对变量的写操作不依赖于当前值

      2)该变量没有包含在具有其他变量的不变式中

      实际上,这些条件表明,可以被写入 volatile 变量的这些有效值独立于任何程序的状态,包括变量的当前状态。

      事实上,上面的2个条件需要保证操作是原子性操作,才能保证使用volatile关键字的程序在并发时能够正确执行。

    8,死锁

    有可能会因为每个线程要等待锁而导致所有线程都被阻塞。这样的状态称为死锁。遗憾的是,Java编程语言没有任何东西可以避免或打破死锁现象,必须仔细设计程序,以确保不会出现死锁。

    9,线程局部变量

    在线程间共享变量有风险,所以有时候要避免共享变量,使用ThreadLocal辅助类为各个线程提供各自的实例。Java的ThreadLocal不是设计用来解决多线程安全问题的,事实证明也解决不了,共享变量a还是会被随意更改。ThreadLocal无能为力。所以,一般用ThreadLocal都不会将一个共享变量放到线程的ThreadLocal中。一般来讲,存放到ThreadLocal中的变量都是当前线程,本身就独一无二的一个变量。其他线程本身就不能访问,存到ThreadLocal中只是为了方便在程序中同一个线程之间传递这个变量。 

    10,锁测试与超时

    线程在调用lock()方法来获得另一个线程所持有的锁的时候,很可能发生阻塞。应该更加谨慎地申请锁。tryLock()方法试图申请一个锁,在成功获得锁后返回true,否则,立即返回false,而且线程可以立即离开去做其他事情。

    if(myLock.tryLock()) {
        //now the thread owns the lock
        try {...}
        finally {myLock.unlock();}
    } else {
        //do something else
    }

    也可以调用tryLock时,使用超时参数: 

    if (myLock.tryLock(100, TimeUnit.MILLISECONDS))

    如果调用带有超时参数的tryLock,那么线程在等待期间被中断,将抛出InterruptedException异常。这是一个非常有用的特性,因为允许程序打破死锁。也可以调用lockInterruptibly方法,它相当于一个超时设为无限的tryLock方法。

    在等待一个条件时,也可以提供一个超时:

    myCondition.await(100, TimeUnit.MILLISECONDS)

    如果一个线程被另一个线程通过调用signalAll或signal激活,或者超时时限已达到,或者线程被中断,那么await方法将返回。

    11,读/写锁

    如果很多线程从一个数据结构读取数据而很少线程修改其中数据的话,ReentrantReadWriteLock类非常有用。

    下面是使用读/写锁的必要步骤:

      a. 构造一个ReentrantReadWriteLock对象

    private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

      b, 抽取读锁和写锁

    private Lock readLock = rwl.readLock();
    private Lock writeLock = rwl.writeLock();

      c, 对所有的获取方法加读锁:

    public double getTotalBalance() {
        readLock.lock();
        try {...}
        finally { readLock.unlock();}
    }

      d, 对所有的修改方法加写锁:

    public void transfer(...) {
        writeLock.lock();
        try {...}
        finally {writeLock.unlock();}
    }

    五,阻塞队列

    阻塞队列与普通队列的区别在于,当队列是空的时,从队列中获取元素的操作将会被阻塞,或者当队列是满时,往队列里添加元素的操作会被阻塞。试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其他的线程往空的队列插入新的元素。同样,试图往已满的阻塞队列中添加新元素的线程同样也会被阻塞,直到其他的线程使队列重新变得空闲起来,如从队列中移除一个或者多个元素,或者完全清空队列,下图展示了如何通过阻塞队列来合作:



    线程1往阻塞队列中添加元素,而线程2从阻塞队列中移除元素

    从5.0开始,JDK在java.util.concurrent包里提供了阻塞队列的官方实现。尽管JDK中已经包含了阻塞队列的官方实现,但是熟悉其背后的原理还是很有帮助的。

    阻塞队列的实现类似于带上限的Semaphore的实现。下面是阻塞队列的一个简单实现

    public class BlockingQueue {
    
        private List queue = new LinkedList();
    
        private int  limit = 10;
    
        public BlockingQueue(int limit){
    
            this.limit = limit;
    
        }
    
        public synchronized void enqueue(Object item) throws InterruptedException {
    
            while(this.queue.size() == this.limit) {
    
                wait();
    
            }
    
            if(this.queue.size() == 0) {
    
                notifyAll();
    
            }
    
            this.queue.add(item);
    
        }
    
        public synchronized Object dequeue() throws InterruptedException{
    
            while(this.queue.size() == 0){
    
                wait();
    
            }
    
            if(this.queue.size() == this.limit){
    
                notifyAll();
    
            }
    
            return this.queue.remove(0);
    
        }
    
    }                    

    必须注意到,在enqueue和dequeue方法内部,只有队列的大小等于上限(limit)或者下限(0)时,才调用notifyAll方法。如果队列的大小既不等于上限,也不等于下限,任何线程调用enqueue或者dequeue方法时,都不会阻塞,都能够正常的往队列中添加或者移除元素。

    六,线程安全的集合

    java.util.concurrent包提供了映射表/有序集合队列的高效实现:ConcurrentHashMap/ConcurrentSkipListMap/ConcurrentSkipListSet/ConcurrentLinkedQueue.

    七,Callable 与 Future

    在前面的文章中我们讲述了创建线程的2种方式,一种是直接继承Thread,另外一种就是实现Runnable接口。这2种方式都有一个缺陷就是:在执行完任务之后无法获取执行结果。如果需要获取执行结果,就必须通过共享变量或者使用线程通信的方式来达到效果,这样使用起来就比较麻烦。

    而自从Java 1.5开始,就提供了Callable和Future,通过它们可以在任务执行完毕之后得到任务执行结果。

    Callable与Runnable

    Runnable是一个接口,在它里面只声明了一个run()方法,由于run()方法返回值为void类型,所以在执行完任务之后无法返回任何结果:

    public interface Runnable {
        public abstract void run();
    }
    

    Callable位于java.util.concurrent包下,它也是一个接口,在它里面也只声明了一个方法,只不过这个方法叫做call(),可以看到,这是一个泛型接口,call()函数返回的类型就是传递进来的V类型:

    public interface Callable<V> {
        /**
         * Computes a result, or throws an exception if unable to do so.
         *
         * @return computed result
         * @throws Exception if unable to compute a result
         */
        V call() throws Exception;
    }
    

     那么怎么使用Callable呢?一般情况下是配合ExecutorService来使用的,在ExecutorService接口中声明了若干个submit方法的重载版本:

    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);
    

    第一个submit方法里面的参数类型就是Callable。一般情况下我们使用第一个submit方法和第三个submit方法,第二个submit方法很少使用。

    Future

    Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。

      Future类位于java.util.concurrent包下,它是一个接口:

    public interface Future<V> {
        boolean cancel(boolean mayInterruptIfRunning);
        boolean isCancelled();
        boolean isDone();
        V get() throws InterruptedException, ExecutionException;
        V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
    }
    

       在Future接口中声明了5个方法,下面依次解释每个方法的作用:

    • cancel方法用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false。参数mayInterruptIfRunning表示是否允许取消正在执行却没有执行完毕的任务,如果设置true,则表示可以取消正在执行过程中的任务。如果任务已经完成,则无论mayInterruptIfRunning为true还是false,此方法肯定返回false,即如果取消已经完成的任务会返回false;如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false;如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,肯定返回true。
    • isCancelled方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true。
    • isDone方法表示任务是否已经完成,若任务完成,则返回true;
    • get()方法用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回;
    • get(long timeout, TimeUnit unit)用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null。

      也就是说Future提供了三种功能:

      1)判断任务是否完成;

      2)能够中断任务;

      3)能够获取任务执行结果。

      因为Future只是一个接口,所以是无法直接用来创建对象使用的,因此就有了下面的FutureTask。

    FutureTask

    我们先来看一下FutureTask的实现:

    public class FutureTask<V> implements RunnableFuture<V>
    

       FutureTask类实现了RunnableFuture接口,我们看一下RunnableFuture接口的实现:

    public interface RunnableFuture<V> extends Runnable, Future<V> {
        void run();
    }
    

       可以看出RunnableFuture继承了Runnable接口和Future接口,而FutureTask实现了RunnableFuture接口。所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。

      FutureTask提供了2个构造器:

    public FutureTask(Callable<V> callable) {
    }
    public FutureTask(Runnable runnable, V result) {
    }
    

      事实上,FutureTask是Future接口的一个唯一实现类。

    Callable&Future 示例:

    package com.ivy.thread.unsynch;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    
    public class MyCallable implements Callable<String>{
    
        public static void main(String[] args) {
            // TODO Auto-generated method stub
            ExecutorService executor = Executors.newFixedThreadPool(10);
            List<Future<String>> futures = new ArrayList<>();
            MyCallable myCallable = new MyCallable();
            for (int i = 0; i<100; i++) {
                Future<String> future = executor.submit(myCallable);
                futures.add(future);
            }
            
            for (Future<String> f : futures) {
                try {
                    System.out.println(f.get());
                } catch (InterruptedException | ExecutionException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            executor.shutdown();
        }
    
        @Override
        public String call() throws Exception {
            // TODO Auto-generated method stub
            Thread.sleep(1000);
            return Thread.currentThread().getName();
        }
    
    }

    结果:

    pool-1-thread-1
    pool-1-thread-2
    pool-1-thread-3
    pool-1-thread-4
    pool-1-thread-5
    pool-1-thread-6
    pool-1-thread-7
    pool-1-thread-8
    pool-1-thread-9
    pool-1-thread-10
    pool-1-thread-2

    ...

    Callable&FutureTask示例:

    package com.ivy.thread.unsynch;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.FutureTask;
    
    public class MyCallableFutureTask implements Callable<String>{
    
        public static void main(String[] args) {
            // TODO Auto-generated method stub
            ExecutorService executor = Executors.newFixedThreadPool(10);
            MyCallable myCallable = new MyCallable();
            List<FutureTask<String>> futures = new ArrayList<>();
            
            for (int i = 0; i<100; i++) {
                FutureTask<String> task = new FutureTask<>(myCallable);
                executor.submit(task);
                futures.add(task);
            }
            
            for (FutureTask<String> f : futures) {
                try {
                    System.out.println(f.get());
                } catch (InterruptedException | ExecutionException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            executor.shutdown();
        }
    
        @Override
        public String call() throws Exception {
            // TODO Auto-generated method stub
            Thread.sleep(1000);
            return Thread.currentThread().getName();
        }
    
    }

    九,执行器

    线程池中包含许多准备运行的空闲线程。将Runnable对象交给线程池,就会有一个线程调用run方法,当run方法退出时,线程不会死亡,而是在池中准备为下一个请求提供服务。另一个使用线程池的理由是减少并发线程的数目。创建大量线程会大大降低性能甚至使虚拟机崩溃。如果有个会创建许多线程的算法,应该使用一个线程数固定的线程池以限制并发线程的总数。

    执行器类有许多静态工厂方法来构建线程池:

    • newCachedThreadPool 必要时创建新线程;空闲线程会被保留60秒
    • newFixedThreadPool 该池包含固定数量的线程;空闲线程会一直被保留
    • newSingleThreadExecutor 只有一个线程的池,该线程顺序执行每个提交的任务
    • newScheduledThreadPool 用于预定执行而构建的固定线程池,代替Timer
    • newSingleThreadScheduledExecutor 用于预定执行而构建的单线程池

    线程池

    前三个方法返回的是实现了ExecutorService接口的ThreadPoolExecutor类的对象,ExecutorService接口有三个方法:

    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);

    一般情况下我们使用第一个submit方法和第三个submit方法,第二个submit方法很少使用。

    当用完一个线程池的时候,调用shutdown。该方法启动该线程的关闭序列。被关闭的执行器不再接受新的任务。当所有任务都完成以后,线程池中的线程死亡。另一种方法是调用shutdownNow,这时线程池会取消尚未开始的所有任务并试图中断正在运行的线程。

    在使用连接池时应该做的事情,示例见Callable&Future示例:

    1. 调用Executors类中静态方法newCachedThreadPool或newFixedThreadPool.
    2. 调用submit提交Runnable或Callable对象。
    3. 如果想要取消一个任务,或如果提交Callable对象,那就要保存好返回的Future对象。
    4. 当不再提交任何任务时,调用shutdown.

    预定执行

    ScheduledExecutorService接口具有为预定执行或重复执行任务而设计的方法。它是一种允许使用线程池机制的Timer的泛化。Executors类的newScheduledThreadPool和newSingleThreadScheduledExecutor方法返回实现了ScheduledExecutorService接口的对象。可以预定Runnable或Callable在初始的延迟后只运行一次,也可以预定一个Runnable对象周期性地运行。

    Fork-Join框架

    什么是fork/join框架

      fork/join框架是ExecutorService接口的一个实现,可以帮助开发人员充分利用多核处理器的优势,编写出并行执行的程序,提高应用程序的性能;设计的目的是为了处理那些可以被递归拆分的任务。

      fork/join框架与其它ExecutorService的实现类相似,会给线程池中的线程分发任务,不同之处在于它使用了工作窃取算法,所谓工作窃取,指的是对那些处理完自身任务的线程,会从其它线程窃取任务执行。

      fork/join框架的核心是ForkJoinPool类,该类继承了AbstractExecutorService类。ForkJoinPool实现了工作窃取算法并且能够执行 ForkJoinTask任务。

    基本使用方法

      在使用fork/join框架之前,我们需要先对任务进行分割,任务分割代码应该跟下面的伪代码类似:

    if (任务足够小){
      直接执行该任务;
    }else{ 将任务一分为二; 执行这两个任务并等待结果;
    }

      首先,我们会在ForkJoinTask的子类中封装以上代码,不过一般我们会使用更加具体的ForkJoinTask类型,如 RecursiveTask(可以返回一个结果)或RecursiveAction

      当写好ForkJoinTask的子类后,创建该对象,该对象代表了所有需要完成的任务;然后将这个任务对象传给ForkJoinPool实例的invoke()去执行即可。

    示例:

    package com.ivy.thread;
    
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.RecursiveTask;
    
    public class ForkJoinTest {
    
        public static void main(String[] args) {
            // TODO Auto-generated method stub
            final int SIZE = 10000000;
            double[] numbers = new double[SIZE];
            for(int i=0;i<SIZE;i++) {
                numbers[i] = Math.random();
            }
            Counter counter = new Counter(numbers, 0, numbers.length, new Filter() {
    
                @Override
                public boolean accept(double t) {
                    return t > 0.5;
                }
                
            });
            
            ForkJoinPool pool = new ForkJoinPool();
            pool.invoke(counter);
            System.out.println(counter.join());
        }
    
    }
    
    interface Filter {
        boolean accept(double t);
    }
    
    class Counter extends RecursiveTask<Integer> {
    
        public static final int THRESHOLD = 1000;
        private double[] values;
        private int from;
        private int to;
        private Filter filter;
        
        public Counter(double[] values, int from, int to, Filter filter) {
            this.values = values;
            this.from = from;
            this.to = to;
            this.filter = filter;
        }
        @Override
        protected Integer compute() {
            if(to-from < THRESHOLD) {
                int count = 0;
                for(int i=from;i<to; i++) {
                    if (filter.accept(values[i])) {
                        count ++;
                    }
                }
                return count;
            } else {
                int mid =(from+to)/2;
                Counter first = new Counter(values, from, mid, filter);
                Counter second = new Counter(values, mid, to, filter);
                invokeAll(first, second);
                return first.join() + second.join();
            }
        }
        
    }
  • 相关阅读:
    将单向链表按某值划分为左边小、中间相等、右边大的形式
    数组中的数字按某值划分为左边小、中间相等、右边大的形式
    Kendo UI for jQuery管理数据有妙招!轻松将数据存储为JSON
    DevExpress Xamarin.Forms v21.1
    界面控件Telerik UI for WinForm初级教程
    WPF应用程序的主题颜色如何修改?DevExpress调色板工具很好用
    DevExpress WinForm模板库可快速创建Windows样式的应用程序界面
    Kendo UI for jQuery数据管理使用教程:Spreadsheet
    开发框架DevExtreme入门级教程
    跨平台.NET应用程序界面开发新亮点
  • 原文地址:https://www.cnblogs.com/IvySue/p/6803259.html
Copyright © 2011-2022 走看看