(一)创建线程
要想明白线程机制,我们先从一些基本内容的概念下手。
线程和进程是两个完全不同的概念,进程是运行在自己的地址空间内的自包容的程序,而线程是在进程中的一个单一的顺序控制流,因此,单个进程可以拥有多个线程。
还有就是任务和线程的区别。线程似乎是进程内的一个任务,准确点讲,任务是由执行线程来驱动的,而任务是附着在线程上的。
1、现在正式讲讲线程的创建。
正如我们前面讲的,任务是由执行线程驱动的,没有附着任务的线程根本就不能说是线程,所以我们在创建线程的时候,将任务附着到线程上。
所谓的任务,对应的就是Runnable,我们要在这个类中编写相应的run()方法来描述这个任务所要执行的命令,接着就是将任务附着到线程上。像是这样:
Thread thread = new Thread(new Runnable(){ @Override public void run(){ ... } });
接着我们只要通过start()启动该Thread就行。
2、线程的启动
一种方式就是上面使用的:创建一个Thread的子类,然后实现run()方法,接着同样是通过start()来开启它。注意,Thread的子类只能承载一个任务。
另外一种方式就是通过Executor(执行器)来实现。
Executor会在客户端和任务之间提供一个间接层,由这个间接层来执行任务,并且允许管理异步任务的执行,而无需通过显式的管理线程的生命周期。
ExecutorService exec = Executors.newCachedThreadPool();
exec.executor(new RunnableClass);
其中,CachedThreadPool是一种线程池。
3、线程池
线程池在多线程处理技术中是一个非常重要的概念,它会将任务添加到队列中,然后在创建线程后自动启动这些任务。线程池的线程都是后台线程,每个线程都是用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。线程池中的线程数目是有一个最大值,但这并不意味着只能运行这样多的线程,它的真正意思是同时能够运行的最大线程数目,所以可以等待其他线程运行完毕后再启动。
线程池都有一个线程池管理器,用于创建和管理线程池,还有一个工作线程,也就是线程池中的线程。我们必须提供给线程池中的工作线程一个任务,这些任务都是实现了一个任务接口,也就是Runnable。线程池还有一个重要的组成:任务队列,用于存放没有处理的任务,这就是一种缓冲机制。
通过线程池的介绍,我们可以知道,使用到线程池的情况就是这样:需要大量的线程来完成任务,并且完成任务的时间比较短,就像是我们现在的服务器,同时间接受多个请求并且处理这些请求。
java除了上面的CachedThreadPool,还有另一种线程池:FixedThreadPool。CachedThreadPool会在执行过程中创建与所需数量相同的线程,然后在它回收旧线程的时候停止创建新的线程,也就是说,它每次都要保证同时运行的线程的数量不能超过所规定的最大数目。而FixedThreadPool是一次性的预先分配所要执行的线程,像是这样:
ExecutorService exec = Executors.newFixedThreadPool(5);
就是无论要分配的线程的数目是多少,都是运行5个线程。这样的好处是非常明显的,就是用于限制线程的数目。CachedThreadPool是按需分配线程,直到有的线程被回收,也就是出现空闲的时候才会停止创建新的线程,这个过程对于内存来说,代价是非常高昂的,因为我们不知道实际上需要创建的线程数量是多少,只会一直不断创建新线程。
看上去似乎FixedThreadPool比起CachedThreadPool更加好用,但实际上使用更多的是CachedThreadPool,因为一般情况下,无论是什么线程池,现有线程都有可能会被自动复用,而CachedThreadPool在线程结束的时候就会停止创建新的线程,也就是说,它能确保结束掉的线程的确是结束掉了,不会被重新启动,而FixedThreadPool无法保证这点。
接下来我们可以看看使用上面两种线程池的简单例子:
public void main(String[] args){ ExecutorService cachedExec = Executors.newCachedThreadPool(); for(int i = 0; i < 5; i++){ cachedExec.execute(new RunnableClass); } cachedExec.shutdown(); ExecutorService fixedExec = Executors.newFixedThreadPool(3); for(int i = 0; i < 5; i++){ fixedExec.execute(new RunnableClass); } fixedExec.shutdown(); }
CachedThreadPool会不断创建线程直到有线程空闲下来为止,而FixedThreadPool会用3个线程来执行5个任务。
在java中,还有一种执行线程的模式:SingleThreadExecutor。顾名思义,该执行器只有一个线程。它就相当于数量为1的FixedThreadPool,如果我们向它提交多个任务,它们就会按照提交的顺序排队,直到上一个任务执行完毕,因为它们就只有一个线程可以运行。这种方式是为了防止竞争,因为任何时刻都只有一个任务在运行,从而不需要同步共享资源。
(二)Thread的生命周期
之前讲到Thread的创建,那是Thread生命周期的第一步,其后就是通过start()方法来启动Thread,它会执行一些内部的管理工作然后调用Thread的run()方法,此时该Thread就是alive(活跃)的,而且我们还可以通过isAlive()方法来确定该线程是否启动还是终结。
一旦启动Thread后,我们就只能执行一个方法:run(),而run()方法就是负责执行Thread的任务,所以终结Thread的方法很简单,就是终结run()方法。仔细查看文档,我们会发现里面有一个方法:stop(),似乎可以用来停止Thread,但是这个方法已经被废除了,因为它存在着内部的竞争。
我们经常需要一个不断执行的Thread,然后在某个特定的条件下才会终结它,方法有很多,但最常用的有设定标记和中断Thread两种方式。
我们将之前例子中的Thread改写一下:
public class RandomCharacterGenerator extends Thread implements CharacterSource { static char[] chars; static String charArray = "abcdefghijklmnopqrstuvwxyz0123456789"; static { chars = charArray.toCharArray(); } private volatile boolean done = false; Random random; CharacterEventHandler handler; public RandomCharacterGenerator() { random = new Random(); handler = new CharacterEventHandler(); } public int getPauseTime() { return (int) (Math.max(1000, 5000 * random.nextDouble())); } @Override public void addCharacterListener(CharacterListener cl) { handler.addCharacterListener(cl); } @Override public void removeCharacterListener(CharacterListener cl) { handler.removeCharacterListener(cl); } @Override public void nextCharacter() { handler.fireNewCharacter(this, (int) chars[random.nextInt(chars.length)]); } public void run() { while(!done){ nextCharacter(); try { Thread.sleep(getPauseTime()); } catch (InterruptedException ie) { return; } } } public void setDone(){ done = true; } }
现在我们多了一个标记:done,这样我们就可以在代码中通过调用setDone()来决定什么时候停止该Thread。这里使用了volatile关键字,它主要是为了同步。这点会放在同步这里讲。
设定标记的最大问题就是我们必须等待标记的状态,这样就会造成延迟。当然,这种延迟是无法避免的,但必须想办法缩短到最小。于是,中断Thread这种方法就有它的发挥地方了。
我们可以通过interrupt()方法来中断Thread,该方法会造成两个副作用:
1.它会导致任何的阻塞方法都会抛出InterruptedException,我们必须强制性的捕获这个错误哪怕我们根本就不需要处理它,这也是java的异常处理机制让人诟病的一个地方。
2.设定Thread对象内部的标记来指示此Thread已经被中断了,像是这样:
public void run(){
while(!isInterrupted()){
...
}
}
虽然无法避免延迟,但是延迟已经被缩短了。
无论是采用标记还是中断的方法,我们之所以无法消除延迟的原因是我们无法确定是检查标记先还是调用方法先,这就是所谓的race condition,是线程处理中永远无法避免的话题。
Thread不仅可以被终结,还可以暂停,挂起和恢复。
Thread原本有suspend()方法和resume()方法来执行挂起和恢复,但它们和stop()出于同样的原因,都被废除了。
我们可以通过sleep()方法来挂起Thread,当在指定的时间后,它就会自动恢复。严格意义上讲,sleep并不等同于suspend,真正的suspend应该是由一个线程来挂起另一个线程,但是sleep只会影响当前的Thread。要想真正实现挂起和恢复,我们可以使用等待和通知机制,但这个机制最大的问题就是我们的Thread必须使用该技术来编写。
Thread在终结后,如果有可能,我们还需要对它进行善后。即使Thread已经被终结了,但是其他对象只要还持有它的引用,它们就可以调用该Thread的资源,这也会导致该Thread无法被回收。
但我们有时候还是希望继续保持该Thread的引用,因为我们想要判别它是否真的已经完成了工作,可以使用join()方法。
join()方法会被阻塞住直到Thread完成它的run()方法,但是这个存在风险:第一次对join()方法的调用可能会一直被阻塞住很长时间直到Thread真正完成,所以,一般情况下我们还是使用isAlive()方法来判断。
由于我们可以通过实现一个Runnable接口来定义我们的任务,所以在判断所在线程是否已经中断的时候,就有一个问题:该任务还没有绑定到任何线程上。我们可以通过currentThread()方法来获得当前Thread的引用,接着调用isInterrupted()来判断线程是否中断。
现在开始进入线程编程中最重要的话题---数据同步,它是线程编程的核心,也是难点,就算我们理解了数据同步的基本原理,但是我们也无法保证能够写出正确的同步代码,但基本原理是必须掌握的。
要想理解数据同步的基本原理,首先就要明白,为什么我们要数据同步?
public class CharacterDisplayCanvas extends JComponent implements CharacterListener { protected FontMetrics fm; protected char[] tmpChar = new char[1]; protected int fontHeight; public CharacterDisplayCanvas() { setFont(new Font("Monospaced", Font.BOLD, 18)); fm = Toolkit.getDefaultToolkit().getFontMetrics(getFont()); fontHeight = fm.getHeight(); } public CharacterDisplayCanvas(CharacterSource cs) { this(); setCharacterSource(cs); } public void setCharacterSource(CharacterSource cs) { cs.addCharacterListener(this); } public synchronized void newCharacter(CharacterEvent ce) { tmpChar[0] = (char) ce.character; repaint(); } public Dimension preferredSize() { return new Dimension(fm.getMaxAscent() + 10, fm.getMaxAdvance() + 10); } protected synchronized void paintComponent(Graphics gc) { Dimension d = getSize(); gc.clearRect(0, 0, d.width, d.height); if (tmpChar[0] == 0) { return; } int charWidth = fm.charWidth((int) tmpChar[0]); gc.drawChars(tmpChar, 0, 1, (d.width - charWidth) / 2, fontHeight); } }
仔细查看上面的代码,我们就会发现,有两个方法的前面多了一个新的关键字:synchronized。让我们看看这两个方法为什么要添加这个关键字。
newCharacter()用于显示新字母,而paintComponent()负责调整和重画canvas。这两个方法存在着race condition,也就是竞争,因为它们访问的是同一份数据,最重要的是它们是由不同的线程所调用的,这就导致我们无法保证它们的调用是按照正确的顺序来进行,可能在newCharacter()方法未被调用前paintComponent()方法就已经重新绘制canvas。
之所以产生竞争,除了这两个方法访问的是同一份数据之外,还和它们是非automic有关。一个程序如果被认为是automic,那么就表示它是无法被中断的,不会有中间状态。使用synchronized,就能保证该方法无法被中断,那么其他线程就无法在该方法没有完成前调用它。
结合对象锁的知识,我们可以简单的讲解一下synchronized的原理:一个线程如果想要调用另一个线程的synchronized方法,而且该方法正在被其他线程调用,那么这个线程就必须等待,等待其他线程释放该方法所在的对象的锁,然后获得该锁执行该方法。锁机制能够确保同一时间只有一个线程能够调用该方法,也就能保证只有一个线程能够访问数据。
还记得我们之前通过使用标记来结束线程的时候,将该标记用volatile修饰?如果我们不用volatile,又能使用什么方法呢?
如果单单只是上面的知识,我们可能会想到利用synchronized来同步run()和setDone(),因为就是这两个方法在竞争done这个数据。但是这样存在很大的问题:run()会在done没有被设置true前永远不会结束,但是done标记却要等到run()方法结束后才能由setDone()方法进行设置。
这就是一个死锁,永远解不开的锁。
产生死锁的原因有很多,像是上面这种情况就是一个典型的代表,主要原因就是run()方法的scope(范围)太大。所谓的scope,指的是获取锁到释放锁的时间,而run()方法的scope是一个循环,除非done设置为true。这种需要依赖其他线程的方法来结束执行的方法,如果将整个方法设置为同步,就会出现死锁。
所以,最好的方法就是将scope缩小。
我们可以不用对整个方法进行同步,而是对需要访问的数据进行同步,也就是对done使用volatile。
要想理解volatile的工作原理,我们必须清楚变量的加载机制。java的内存模型允许线程能够在local memory中持有变量的值,所以这也就导致某个线程改变该变量的值时,其他线程可能不会察觉到该变量的变化。这种情况只是一种可能,并不代表一定会出现,但像是循环执行这种操作,就增加了这种可能。
所以,我们要做的事情其实很简单,就是让线程从同一个地方取出变量而不是自己维护一份。使用volatile,每次使用该变量都要从主存储器中读取,每次改变该变量时,也要存入主存储器,而且加载和存储都是automic,无论是否是long或者double变量(这两种类型的存储是非automic的)。
值得注意的,run()方法和setDone()方法本身就是automic,因为setDone()方法仅有一个存储操作,而run()方法也只有一个读取操作,其余部分根本就需要该值保持不变,也就是说,这两个方法其实本身就不存在竞争。
但让人更加困惑的是,volatile本身的存在现在也引起人们的关注:它到底有没有必要?
volatile是以moot point(未决点)来实现的:变量永远都从主存储器中读取,但这也只是JDK 1.2之前的情况,现在的虚拟机实现使得内存模式越来越复杂,而且也得到了极大的优化,并且这种趋势只会一直持续下去。也就是说,基于内存模式的volatile可能会因为内存模式的不断优化而逐渐变得没有意义。
volatile的使用是有局限的,它仅仅解决因内存模式而引发的问题,而且只能用在对变量的automic操作上,也就是访问该变量的方法只可以有单一的加载或者存储。但很多方法都是非automic,像是递增或者递减操作,就允许存在中间状态,因为它们本身就是载入,变更和存储的简化而已,也就是所谓的syntactic sugar(语法糖)。
我们大概可以这样理解volatile的使用条件:强迫虚拟机不要临时复制变量,哪怕我们在许多情况下都不会使用它们。
volatile是否可以运用在数组上,让整个数组中的所有元素都被同步呢?凡是使用java的人都会对这样的幻想嗤之以鼻,因为实际情况是只有数组的引用才会被同步,数组中的元素不会是volatile的,虚拟机还是可以将个别元素存储于local的寄存器中,没有任何方法可以指定数组的元素应该以volatile的方式来处理。
我们上面的同步问题是发生在展示随机数字与字母的显示组件,现在我们继续将功能完善:玩家可以输入所显示的字母,并且正确就会得分。
(四)同步方法和同步块
在之前例子的基础上,我们增加新的功能:根据正确与不正确的响应来显示玩家的分数。
public class ScoreLabel extends JLabel implements CharacterListener { private volatile int score = 0; private int char2type = -1; private CharacterSource generator = null, typist = null; public ScoreLabel(CharacterSource generator, CharacterSource typist) { this.generator = generator; this.typist = typist; if (generator != null) { generator.addCharacterListener(this); } if (typist != null) { typist.addCharacterListener(this); } } public ScoreLabel() { this(null, null); } public synchronized void resetGenerator(CharacterSource newCharactor) { if (generator != null) { generator.removeCharacterListener(this); } generator = newCharactor; if (generator != null) { generator.addCharacterListener(this); } } public synchronized void resetTypist(CharacterSource newTypist) { if (typist != null) { typist.removeCharacterListener(this); typist = newTypist; } if (typist != null) { typist.addCharacterListener(this); } } public synchronized void resetScore() { score = 0; char2type = -1; setScore(); } private synchronized void setScore() { SwingUtilities.invokeLater(new Runnable() { @Override public void run() { setText(Integer.toString(score)); } }); } @Override public synchronized void newCharacter(CharacterEvent ce) { if (ce.source == generator) { if (char2type != -1) { score--; setScore(); } char2type = ce.character; } else { if (char2type != ce.character) { score--; } else { score++; char2type = -1; } setScore(); } } }
这里我们将newCharacter()方法用synchronized进行同步,是因为这个方法会被多个线程调用,而我们根本就不知道哪个线程会在什么时候调用这个方法。这就是race condition。
变量的volatile无法解决上面的多线程调度问题,因为这里的问题是方法调度的问题,而且更加可怕的是,需要共享的变量不少,其中有些变量是作为条件判断,这就会导致在这些条件变量没有正确的设置前,有些线程已经开始启动了。
这并不是简单的将这些变量设置为volatile就能解决的问题,因为就算这些变量的状态不对,其他线程依然能够启动。
这里有几个方法的同步是需要引起我们注意的:resetScore(),resetGenerator()和resetTypist()这几个方法是在重新启动时才会被调用,似乎我们不需要为此同步它们:其他线程这时根本就没有开始启动!!
但是我们还是需要同步这些方法,这是一种防卫性的设计,保证整个Class所有相关的方法都是线程安全的。遗憾的是,我们必须这样考虑,因为多线程编程的最大问题就是我们永远也不知道我们的程序会出现什么问题,所以,任何可能会引起线程不安全的因素我们都要尽量避免。
这也就引出我们的问题:如何能够对两个不同的方法同步化以防止多个线程在调用这些方法的时候影响对方呢?
对方法做同步化,能够控制方法执行的顺序,因为某个线程上已经运行的方法无法被其他线程调用。这个机制的实现是由指定对象本身的lock来完成的,因为方法需要访问的对象的lock被一个线程占有,但值得注意的是,所谓的对象锁其实并不是绑定在对象上,而是对象实例上,如果两个线程拥有对象的两个实例,它们都可以同时访问该对象,
同步的方法如何和没有同步的方法共同执行呢?
所有的同步方法都会执行获取对象锁的步骤,但是没有同步的方法,也就是异步方法并不会这样,所以它们能够在任意的时间点被任意的线程执行,而不管到底是否有同步方法在执行。
关于对象锁的话题自然就会引出一个疑问:静态的同步方法呢?静态的同步方法是无法获取对象锁的,因为它没有this引用,对于它的调用是不存在对象的。但静态的同步方法的确是存在的,那么它又是怎样运作的呢?
这需要另一个锁:类锁。
我们可以从对象实例上获得锁,也能从class(因为class对象的存在)上获得锁,即使这东西实际上是不存在的,因为它无法实现,只是帮助我们理解的概念。值得注意的是,因为一个class只有一个class对象,所以一个class只有一个线程可以执行同步的静态方法,而且与对象的锁毫无相关,类锁可以再对象锁外被独立的获得和释放,一个非静态的同步方法如果调用同步的静态方法,那么它可以同时获得这两个锁。
提供synchronized关键字的目的是为了让对象中的方法能够循序的进入,大部分数据保护的需求都可以由这个关键字实现,但在更加复杂的同步化情况中还是太简单了。
在java这个对象王国里,难道真的是没有Lock这个对象的容身之处吗?答案当然是不可能的,J2SE 5.0开始提供Lock这个接口:
private Lock scoreLock = new ReentrantLock(); public void newCharacter(CharacterEvent ce){ if(ce.source == generator){ try{ scoreLock.lock(); if(char2type != -1){ score--; setScore(); } char2type = ce.character; }finally{ scoreLock.unlock(); } } else{ try{ scoreLock.lock(); if(char2type != ce.character){ score--; } else{ score++; char2type = -1; } setScore(); }finally{ scoreLock.unlock(); } }
Lock这个接口有两个方法:lock()和unlock(),我们可以在开始的时候调用lock(),然后在结束的时候调用unlock(),这样就能有效的同步化这个方法。
我们可以看到,其实使用Lock接口只是为了让Lock更加容易被管理:我们可以存储,传递,甚至是抛弃,其余和使用synchronized是一样的,但更加灵活:我们可以在有需要的时候才获取和释放锁,因为lock不再依附于任何调用方法的对象,我们甚至可以让两个对象共享同一个lock!也可以让一个对象占有多个lock!!
使用Lock接口,是一种明确的加锁机制,之前我们的加锁是我们无法掌握的,我们无法知道是哪个线程的哪个方法获得锁,但能确保同一时间只有一个线程的一个方法获得锁,现在我们可以明确得的把握这个过程,灵活的设置lock scope,将一些耗时和具有线程安全性的代码移出lock scope,这样我们就可以写出高效而且线程安全的程序代码,不用像之前一样,为了防止未知错误必须对所有相关方法进行同步。
使用lock接口,可以方便的利用它里面提供的一些便利的方法,像是tryLock(),它可以尝试取得锁,如果无法获取,我们就可以执行其他操作,而不是浪费时间在等待锁的释放。tryLock()还可以指定等待锁的时间。
synchronized不仅可以同步方法,它还可以同步一个程序块:
public void newCharacter(CharacterEvent ce){ if(ce.source == generator){ synchronized(this)[ if(char2type != -1){ score--; setScore(); } char2type = ce.character; } } else{ synchronized(this){ if(char2type != ce.character){ score--; } else{ score--; char2type = -1; } setScore(); } } }
如果是为了缩小lock的范围,我们依然还是可以使用synchronized而不是使用lock接口,而且这种方式才是更加常见的,因为使用lock接口时我们需要创建新的对象,需要异常管理。我们可以lock住其他对象,如被共享的数据对象。
选择synchronized整个方法还是代码块,都没有什么问题,但lock scope还是尽可能的越小越好。
考虑到newCharacter()这个方法里面出现了策略选择,我们可以对它进行重构:
private synchronized void newGeneratorCharacter(int c){ if(char2type != -1){ score--; setScore(); } char2type = c; } private synchronized void newTpistCharacter(int c){ if(char2type != c){ score--; } else{ score++; char2type = -1; } setScore(); } public synchronized void newCharacter(CharacterEvent ce){ if(ce.source == generator){ newGeneratorCharacter(ce.character); } else{ newTypistCharacter(ce.character); } }
我们会注意到,两种策略方法都要用synchronized锁住,但真的有必要吗?因为它们是private,只会在该对象中使用,没有理由要让这些方法获取锁,因为它们也只会被对象内的synchronized方法调用,而这时已经获得锁了。但是我们还是要这样做,考虑到以后的开发者可能不知道调用这些方法之前需要获取锁的情况。
由此可见,java的锁机制远比我们想象中要聪明:它并不是盲目的在进入synchronized程序代码块时就开始获取锁,如果当前的线程已经获得锁,根本就没有必要等到锁被释放还是去获取,只要让synchronized程序段运行就可以。如果没有获取锁,也就不会将它释放掉。这种机制之所以能够运行是因为系统会保持追踪递归取得lock的数目,最后会在第一个取得lock的方法或者代码块退出的时候释放锁。
这就是所谓的nested lock。
之前我们使用的ReentrantLock同样支持nested lock:如果lock的请求是由当前占有lock的线程发出,内部的nested lock就会要求计数递增,调用unlock()就会递减,直到计数为0就会释放该锁。但这个是ReentrantLock才具有的特性,其他实现了Lock这个接口的类并不具有。
nested lock是非常重要的,因为它有利于避免死锁的发生。死锁的发生远比我们想象中要更常见,像是方法间的相互调用,更加常见的情况就是回调,像是Swing编程中依赖事件处理程序与监听者的窗口系统,考虑一下监听者经常变动的情况,同步简直就是一个恶梦!!
Synchronized无法知道lock被递归调用的次数,但是使用ReentrantLock可以做到这点。我们可以通过getHoldCount()方法来获得当前线程对lock所要求的数量,如果数量为0,代表当前线程并未持有锁,但是还不能知道锁是自由的,我们必须通过isLocked()来判断。我们还可以通过isHeldByCurrentThread()来判断lock是否由当前的线程所持有,getQueueLength()可以用来取得有多少个线程在等待取得该锁,但这个只是预估值。
在多线程编程中经常讲到死锁,但是即使没有涉及到同步也有可能会产生死锁。死锁之所以是个问题,是因为它会让程序无法正确的执行,更加可怕的是,死锁是很难被检测的,特别是多线程编程往往都会是一个复杂的程序,它可能永远也不会被发现!!
更加悲哀的是,系统无法解决死锁这种情况!
最后一个问题是关于公平的授予锁。
我们知道,锁是要被授予线程的,但是应该按照什么依据来授予呢?是按照先到先得吗?还是服务请求最多?或者是对系统最有利的形式来授予?java的同步行为最接近第三种,因为同步并不是用来对特殊情况授予锁,它是通用的,所以没有理由让锁按照到达的顺序来授予,应该是由各实现所定义在底层线程系统的行为所决定,但ReentrantLock提供了一种选项可以按照先进先出的顺序获取锁:new ReentrantLock(true),这是为了防止发生锁饥饿的现象。
我们可以根据自己的具体实现来决定这种公平。
最后,我们来总结一下:
1.对于同时涉及到静态和非静态方法的同步情况,使用lock对象更加容易,因为lock对象无关于使用它的对象。
2.将整个方法同步化是最简单的,但是这样范围会变大,让确实没有必要的程序段无效率的持有锁。
3.如果涉及到太多的对象,使用同步块机制也是有问题的,同步块无法解决跨方法的锁范围。
(五)等待与通知机制
在之前我们关于停止Thread的讨论中,曾经使用过设定标记done的做法,一旦done设置为true,线程就会结束,一旦为false,线程就会永远运行下去。这样做法会消耗掉许多CPU循环,是一种对内存不友好的行为。
java中的对象不仅拥有锁,而且它们本身就可以通过调用相关方法使自己成为等待者和通知者。
Object对象本身有两个方法:wait()和notify()。wait()会等待条件的发生,而notify()会通知正在等待的线程此条件已经发生,它们都必须从synchronized方法或块中调用。
这种等待-通知机制的目的究竟是为何?
等待-通知机制是一种同步机制,但它更像是一个通信机制,能够让一个线程与另一个线程在某个特定条件下进行通信。但是,该机制却没有指定特定条件是什么。
等待-通知机制能否取代synchronized机制吗?当然不行,等待-通知机制并不会解决synchronized机制能够解决的竞争问题,实际上,这两者是相互配合使用的,而且它本身也存在竞争问题,这是需要通过synchronzied来解决的。
private boolean done = true; public synchronized void run(){ while(true){ try{ if(done){ wait(); }else{ repaint(); wait(100); } }catch(InterruptedException e){ return; } } } public synchronized void setDone(boolean b){ done = b; if(timer == null){ timer = new Thread(this); timer.start(); } if(!done){ notify(); } }
这里的done已经不是volatile,因为我们不只是设定个标记值,我们还需要在设定标记的同时自动发送一个通知。所以,我们现在是通过synchronized来保护对done的访问。
run()方法不会在done为false时自动退出,它会通过调用wait()方法让线程在这个方法中等待,直到其他线程调用notify()方法。
这里有几个地方值得我们注意。
首先,我们这里通过使用wait()方法而不是sleep()方法来使线程休眠,因为wait()方法需要线程持有该对象的同步锁,当wait()方法执行的时候,该锁就会被释放,而当收到通知的时候,线程需要在wait()方法返回前重新获得该锁,就好像一直都持有锁一样。这个技巧是因为在设定与发送通知以及测试与取得通知之间是存在竞争的,如果wait()和notify()在持有同步锁的同时没有被调用,是完全没有办法保证此通知会被接收到的,并且如果wait()方法在等待前没有释放掉锁,是不可能让notify()方法被调用到,因为它无法取得锁,这也是我们之所以使用wait()而不是sleep()的另一个原因。如果使用sleep()方法,此锁就永远不会被释放,setDone()方法也永远不会执行,通知也永远不会送出。
接着就是这里我们对run()进行同步化。我们之前讨论过,对run()进行同步是非常危险的,因为run()方法是绝对不可能会完成的,也就是锁永远不会被释放,但是因为wait()本身就会释放掉锁,所以这个问题也被避免了。
我们会有一个疑问:如果在notify()方法被调用的时候,没有线程在等待呢?
等待-通知机制并不知道所送出通知的条件,它会假设通知在没有线程等待的时候是没有被收到的,因为这时它也只是返回且通知也被遗失掉,稍后执行wait()方法的线程就必须等待另一个通知。
上面我们讲过,等待-通知机制本身也存在竞争问题,这真是一个讽刺:原本用来解决同步问题的机制本身竟然也存在同步问题!其实,竞争并不一定是个问题,只要它不引发问题就行。我们现在就来分析一下这里的竞争问题:
使用wait()的线程会确认条件不存在,这通常是通过检查变量实现的,然后我们才调用wait()方法。当其他线程设立了该条件,通常也是通过设定同一个变量,才会调用notify()方法。竞争是发生在下列几种情况:
1.第一个线程测试条件并确认它需要等待;
2.第二个线程设定此条件;
3.第二个线程调用notify()方法,这并不会被收到,因为第一个线程还没有进入等待;
4.第一个线程调用wait()方法。
这种竞争就需要同步锁来实现。我们必须取得锁以确保条件的检查和设定都是automic,也就是说检查和设定都必须处于锁的范围内。
既然我们上面讲到,wait()方法会释放锁然后重新获取锁,那么是否会有竞争是发生在这段期间呢?理论上是会有,但系统会阻止这种情况。wait()方法与锁机制是紧密结合的,在等待的线程还没有进入准备好可以接收通知的状态前,对象的锁实际上是不会被释放的。
我们的疑问还在继续:线程收到通知,是否就能保证条件被正确的设定呢?抱歉,答案不是。在调用wait()方法前,线程永远应该在持有同步锁时测试条件,在从wait()方法返回时,该线程永远应该重新测试条件以判断是否还需要等待,这是因为其他的线程同样也能够测试条件并判断出无需等待,然后处理由发出通知的线程所设定的有效数据。但这是在只有一个线程在等待通知,如果是多个线程在等待通知,就会发生竞争,而且这是等待-通知机制所无法解决的,因为它能解决的只是内部的竞争以防止通知的遗失。多线程等待最大的问题就是,当一个线程在其他线程收到通知后再收到通知,它无法保证这个通知是有效的,所以等待的线程必须提供选项以供检查状态,并在通知已经被处理的情形下返回到等待的状态,这也是我们为什么总是要将wait()放在循环里面的原因。
wait()也会在它的线程被中断时提前返回,我们的程序也必须要处理该中断。
在多线程通知中,我们如何确保正确的线程收到通知呢?答案是不行的,因为我们根本就无法保证哪一个线程能够收到通知,能够做到的方法就是所有等待的线程都会收到通知,这是通过notifyAll()实现的,但也不是真正的唤醒所有等待的线程,因为锁的问题,实质上所有的线程都会被唤醒,但是真正在执行的线程只有一个。
之所以要这样做,可能是因为有一个以上的条件要等待,既然我们无法确保哪一个线程会被唤醒,那就干脆唤醒所有线程,然后由它们自己根据条件判断是否要执行。
等待-通知机制可以和synchronized结合使用:
private Object doneLock = new Object(); public void run(){ synchronized(doneLock){ while(true){ if(done){ doneLock.wait(); }else{ repaint(); doneLock.wait(100); } }catch(InterruptedException e){ return; } } } public void setDone(boolean b){ synchronized(doneLock){ done = b; if(timer == null){ timer = new Thread(this); timer.start(); } if(!done){ doneLock.notify(); } } }
这个技巧是非常有用的,尤其是在具有许多对对象锁的竞争中,因为它能够在同一时间内让更多的线程去访问不同的方法。
最后我们要介绍的是条件变量。
J2SE5.0提供了Condition接口。Condition接口是绑定在Lock接口上的,就像等待-通知机制是绑定在同步锁上一样。
private Lock lock = new ReentrantLock(); private Condition cv = lockvar.newCondition(); public void run(){ try{ lock.lock(); while(true){ try{ if(done){ cv.await(); }else{ nextCharacter(); cv.await(getPauseTime(), TimeUnit.MILLISECONDS); } }catch(InterruptedException e){ return; } } }finally{ lock.unlock(); } } public void setDone(boolean b){ try{ lock.lock(); done = b; if(!done){ cv.signal(); }finally{ lock.unlock(); } } }
上面的例子好像是在使用另一种方式来完成我们之前的等待-通知机制,实际上使用条件变量是有几个理由的:
1.条件变量在使用Lock对象时是必须的,因为Lock对象的wait()和notify()是无法运作的,因为这些方法已经在内部被用来实现Lock对象,更重要的是,持有Lock对象并不表示持有该对象的同步锁,因为Lock对象和对象所关联的同步锁是不同的。
2.Condition对象不像java的等待-通知机制,它是被创建成不同的对象,对每个Lock对象都可以创建一个以上的Condition对象,于是我们可以针对个别的线程或者一群线程进行独立的设定,也就是说,对同一个对象上所有被同步化的在等待的线程都得等待相同的条件。
基本上,Condition接口的方法都是复制等待-通知机制,但是提供了避免被中断或者能以相对或绝对时间来指定时限的便利。
前面我们已经讲过如何让对象具有Thread安全性,让它们能够在同一时间在两个或以上的Thread中使用。Thread的安全性在多线程设计中非常重要,因为race condition是非常难以重现和修正的,我们很难发现,更加难以改正,除非将这个代码的设计推翻来过。
同步最大的问题不是我们在需要同步的地方没有使用同步,而是在不需要同步的地方使用了同步,导致效率极度低下。所以,我们要想办法限制同步,因为无谓的同步比起无谓的运算还更加让人无语。
但是否有办法完全避免同步呢?
在有些情况下是可以的。我们可以使用之前的volatile关键字来解决这个问题,因为volatile修饰的变量是被完整的存储的,在读取它们的时候,能够确保它们是有效的,也就是最近一次存入的值。但这也是可以避免同步的唯一情况,如果有多个线程同时访问同一份数据,就必须明确的同步化所有对该数据的访问以防止各种race condition。
为什么无法完全避免呢?
每组线程都有自己的一组寄存器,但系统将某个线程分配给CPU时,它会把该线程持有的信息加载到CPU的寄存器中,在分配不同的线程给CPU前,它会将寄存器的信息保存下来,所以线程之间绝不会共享保存在寄存器中的数值,但是通过使用volatile,我们可以确保变量不会保持在寄存器中,这点我们在之前的文章中已经说过了,这就能够确保变量是真正的共享于线程之间。但是同步为什么能够解决这个问题呢?因为当虚拟机进入synchronized方法或者synchronized块的时候,它必须重新加载原本已经缓冲到自有寄存器上的数据,也就是存入到主存储器中。
也就是说,除了使用volatile和同步,我们就没有方法保证被线程共享的数据在访问上的安全性,但事实证明,volatile并不是值得推荐的解决方法,所以也只剩下同步了。
既然这样,我们唯一能够做到的就是学会恰当的使用同步。
同步的目的就是防止race condition导致数据在不一致或者变动中间状态被使用到,这段期间会禁止线程间的竞争。但这个保证会因为一个微妙的问题而变得不可信:线程间可能在同步的程序代码运行前就开始竞争。
并不是所有的race condition都应该避免,只有在无Thread安全性的程序段中的race condition才会被认为是问题。我们可以使用两种方法来解决这个问题:使用synchronized程序代码来防止race condition的发生或者将程序设计成无需同步(或仅使用最少的同步)就具有Thread安全性。
对于第二种方法,我们应该尽可能缩小同步的范围并重新组织程序代码以便让具有Thread安全性的段落能够被移出synchronized块之外,这点非常重要,如果有足够的程序代码能够被移出synchronized块之外,我们甚至根本就不需要进行同步。
我们可以使用volatile来减少同步,但是volatile只能针对单一的载入或者存储操作,但很多情况下都不是这样子,所以它的使用是比较不常见的。
J2SE 5.0提供了一组atomic class来处理更加复杂的情况。相对于只能处理单一的atomic操作,这些class能够让多个操作被atomic地对待,这样我们也就有可能不需要同步就能实现同步机制所做到的一切。
我们可以用AtomicInteger,AtomicLong,AtomicBoolean和AtomicReference这四个class实现的四个基本的atomic类型来处理integer,long,boolean和对象。这些class都提供了两个构造器:默认的构造器的值为0,false,false或者null,另一个构造器是以程序设计者所指定的值来初始化和创建变量。set()和get()这两个方法就提供了volatile变量所具有的的功能:能够atomic的设定与取得值,因为它们能够确保数据的读写是从主存储器上运行的。但是这些class还提供了更多的操作来适应volatile无法解决的情况。
getAndSet()能够在返回初始值的时候atomic的设定变量成新值,完全不需要任何的同步lock。compareAndSet()与weakCompareAndSet()是有条件的修改程序的方法,这两个方法都要取用两个参数:在方法启动时预期数据所具有的的值,以及要把数据所设定成的值。它们都只会在变量具有预期值的时候才会设定成新值,如果当前值不等于预期值,该变量就不会被重新赋值并且返回false。这两个方法之间有什么区别吗?第二个方法少了一项保证:如果方法返回的值false,该变量不会被变动,但是这并不表示现有值不是预期值,也就是说,这个方法不管初始值是否是预期值都可能会无法更新改值。
incrementAndGet(),decrementAndGet(),getAndIncrement()和getAndDecrement()提供了前置递增,前置递减,后递增和后递减,之所以有这些方法,是因为这些操作都不是atomic的。
addAndGet()和getAndAdd()提供了"前置"和"后置"的运算符给指定值的加法运算,它们能够让程序对变量增或者减一个指定值,包括了负值,所以我们就不需要一个相对的减法运算。
atomic package目前没有实现atomic字符或者浮点变量,但是我们可以使用AtomicInteger来处理字符,就像是字符常量一样,但是使用atomic的浮点数需要atomic带有只读浮点数值的受管理对象。我们也没有实现atomic数组,并没有功能能够对整个数组做atomic化的变动,最多就是通过使用AtomicInteger,AtomicLong和AtomicReference的数组来模型化,但是数组的大小必须在构造时就指定好,并且在操作过程中必须提供索引。至于Boolean的atomic数组,同样也可以通过AtomicInteger来实现。
atomic package还有两个类:AtomicMarkableReference和AtomicStampedReterence。这两个类能够让mark或stamp跟在任何对象的引用上,更精确点讲,AtomicMarkableReference提供了一种包括对象引用结合boolean的数据结构,而AtomicStampedReference提供了一种包括对象引用结合integer的数据结构。
其实,atomic class的这些方法在本质上是一样的。在使用的时候,get()方法需要我们传入一个数组作为参数,stamp或者mark被存储在数组的第一个元素而引用被正常返回。其他的get()方法就只是返回引用,mark或者stamp。
set()与compareAndSet()方法需要额外的参数来代表mark或者stamp。最后,这些class都带有attemptMark()或attemptStamp()方法,用来依据期待的引用设定mark或者stamp。
到了这里,我们也许会欣喜的将每个程序或者class改成只用atomic变量,事实上,这种尝试并不只是替换变量那么简单。atomic class并不是同步工具的直接替代品,它们的使用会让我们的程序设计更加复杂,就算只是一些简单的class也是这样。
我们来举一个例子:
public class ScoreLabel extends JLabel implements CharacterListener { private volatile int score = 0; private int char2type = -1; private CharacterSource generator = null, typist = null; private Lock scoreLock = new ReentrantLock(); public ScoreLabel(CharacterSource generator, CharacterSource typist) { this.generator = generator; this.typist = typist; if (generator != null) { generator.addCharacterListener(this); } if (typist != null) { typist.addCharacterListener(this); } } public ScoreLabel() { this(null, null); } public void resetGenerator(CharacterSource newCharactor) { try { scoreLock.lock(); if (generator != null) { generator.removeCharacterListener(this); } generator = newCharactor; if (generator != null) { generator.addCharacterListener(this); } } finally { scoreLock.unlock(); } } public void resetTypist(CharacterSource newTypist) { if (typist != null) { typist.removeCharacterListener(this); typist = newTypist; } if (typist != null) { typist.addCharacterListener(this); } } public synchronized void resetScore() { score = 0; char2type = -1; setScore(); } private synchronized void setScore() { SwingUtilities.invokeLater(new Runnable() { @Override public void run() { setText(Integer.toString(score)); } }); } @Override public synchronized void newCharacter(CharacterEvent ce) { if (ce.source == generator) { if (char2type != -1) { score--; setScore(); } char2type = ce.character; } else { if (char2type != ce.character) { score--; } else { score++; char2type = -1; } setScore(); } } }
为了修改这个类,我们需要三个修改:简单的变量代换,算法的变更和重新尝试操作,每一个修改都要保持class的synchronized版本语义的完整,而这些都是依赖于程序代码所有的效果,所以我们必须确保程序代码的最终效果和synchronized版本是一致的,这个目的也是重构的基本原则:在不影响代码外在表现下对代码进行内在的修改,也是面向对象的核心思想。
变量代换是最简单的操作,我们只要将之前所使用的变量替换成atomic变量。像是我们这里就可以将resetScore()中的score和char2type这两个变量修改成atomic变量。但有意思的是,将这两个变量一起变更的动作并不是atomic地完成,还是有可能会让char2type变量的变更在完成前就变更了score。这似乎是个问题,但实际上并不是这样,因为我们还保持住这个类在synchronized版本上的语义。要记住,同步的目的是为了消除有问题的race condition,有一些race condition根本就不是问题。我们再来举个例子:resetScore()和newCharacter()方法都是synchronized,但这也只是意味着两者不会同时运行,被拖延住的newCharacter()方法的调用还是可能会因为到达的顺序或者取得lock的顺序而延迟运行,所以打字输入的事件可能会等到resetScore()方法完成后才会被传递,但这时传递到的只是个已经过时的事件,这些也是出于同样的原因:在resetScore()方法中同时变更两个变量这个动作并没有被atomic地处理。
第二个修改是变更算法。
我们来看看resetGenerator()和resetTypist()这两个方法的新的实现。之前我们对这两个方法所做的,就是尝试将两者的同步lock分离。这的确是个不错的主意,这两个方法都没有变动score或者char2type变量,事实上,它们甚至也没有变动到相互共享的变量,因为resetGenerator()方法和resetTypist()的同步lock只是用来保护此方法不受多个Thread同时地调用。但是,如果只是简单的将generator变量变成AtomicReference,那么之前我们解决过的问题都会重新引发。
之所以会是这样,是因为resetGenerator()这个方法封装的状态并不只是generator变量的值,让generator变量变成AtomicReference表示我们知道对该变量的操作是atomic地发生,但当我们要从resetGenerator()方法中完全的删除掉同步时,我们还必须确保整个由此方法所封装住的状态还是一致的,而这些所谓的状态,包括了在字母源产生器上ScoreLabel对象的登记(this对象),在这个方法完成后,我们要确保this对象只有被登记过一次到唯一一个产生器上,也就是被分配到generator的instance变量上的那一个。
想一下这个具体的情景:现有的产生器是generatorA,某个线程以generatorB产生器调用resetGenerator(),而另一个线程以称为generatorC的产生器来调用此方法。
我们之前的代码是这样的:
if (generator != null) { generator.removeCharacterListener(this); } generator = newCharactor; if (generator != null) { generator.addCharacterListener(this); }
这段代码最大的问题就是:两个线程同时要求generatorA删除this对象,实际上它会被删除两次,ScoreLabel对象同样也会加入generatorB和generatorC。这两个结果都是错的。
但是我们前面使用了synchronized来防止这样的错误,但如果是没有使用synchronized的前提下:
if (newGenerator != null) { newGenerator.addCharacterListener(this); } oldGenerator = generator.getAndSet(newGenerator); if (oldGenerator != null) { oldGenerator.removeCharacterListener(this); }
当它被两个线程同时调用时,ScoreLabel对象会被generatorB和generatorC登记,各个线程随后会atomic地设定当前的产生器,因为它们是同时运行,可能会有不同的结果:假设第一个线程先运行,它会从getAndSet()中取回generatorA,然后将ScoreLabel对象从generatorA的监听器中删除,而第二个线程从getAndSet()中取回generatorB并从generatorB的监听器删除ScoreLabel。如果第二个线程先运行,变量会稍有不同,但结果永远会是一样的:不管哪一个对象被分配给genrator的instance变量,它就是ScoreLabel对象所监听的那一个,并且是唯一的一个。
但是这里会有一个副作用:交换之后监听器会从旧的数据来源中被删除掉,且监听器会在交换前被加入到新的数据来源,它现在有可能接收到既不是现有的产生器也不是打字输入来源所产出的字符。之前newCharacter()方法会检查来源是否为产生器的来源,并在不是的时候会假设来源是打字输入来源。现在就不再是这样,newCharacter()方法必须在处理它之前确认字母的来源,它也必须忽略掉来自不正确的监听器的字母。
所以我们的最后一步就是重新尝试操作:
@Override public synchronized void newCharacter(CharacterEvent ce) { int oldChar2type; if (ce.source == generator.get()) { oldChar2type = char2type.getAndSet(ce.character); if (oldChar2type != -1) { score.decrementAndGet(); setScore(); } } else if (ce.source == typist.get()) { while (true) { oldChar2type = char2type.get(); if (oldChar2type != ce.character) { score.decrementAndGet(); break; } else if (char2type.compareAndSet(oldChar2type, -1)) { score.incrementAndGet(); break; } } setScore(); }
newCharacter()这个方法的修改是最大的,因为它现在必须要丢弃任何不是来自于所属来源的事件。
重点是打字输入事件的处理。我们需要检查输入的字母是否是正确的,如果不是,玩家就会被惩罚,这是通过atomic地递减分数来完成的。如果字母被正确的输入,玩家无法被立即的给予奖赏,相对的,char2type变量要先被更新,分数只有在char2type被正确更新时才会更新。如果更新的操作失败了,这代表着在我们处理此事件时,有其他事件已经被其他线程处理过了,并且是成功处理完。
什么叫成功的处理完?它表示我们必须重新处理事件,因为我们是基于这样的假设:假设正在使用的该变量值不会被变更并且程序代码完成时也是这样,所有已经被我们设定为具有特定值的变量就确实应该是那个值,但因为这与其他线程冲突,这些假设也就被破坏了。通过重新尝试处理事件,就好像从未遇到冲突一样。
所以我们才需要将这段代码封装在一个无穷循环里:程序不会离开循环直到事件被成功处理掉。显然在多个事件间存在race condition,循环会确保没有一个事件会被漏掉或者处理了超过一次。只要我们确实只处理有效的事件一次,事件被处理的顺序就不重要了,因为在处理完每个事件后,数据就会保持在完好的状态。实际上,当我们使用同步的时候,也是一样的情形:多个事件并没有以指定的顺序进行,它们只是根据授予lock的顺序来进行。
整个代码如:
public class ScoreLabel extends JLabel implements CharacterListener { private AtomicInteger score = new AtomicInteger(0); private AtomicInteger char2type = new AtomicInteger(-1); private AtomicReference<CharacterSource> generator = null; private AtomicReference<CharacterSource> typist = null; public ScoreLabel(CharacterSource generator, CharacterSource typist) { this.generator = new AtomicReference<CharacterSource>(); this.typist = new AtomicReference<CharacterSource>(); if (generator != null) { generator.addCharacterListener(this); } if (typist != null) { typist.addCharacterListener(this); } } public ScoreLabel() { this(null, null); } public void resetGenerator(CharacterSource newGenerator) { CharacterSource oldGenerator; if (newGenerator != null) { newGenerator.addCharacterListener(this); } oldGenerator = generator.getAndSet(newGenerator); if (oldGenerator != null) { oldGenerator.removeCharacterListener(this); } } public void resetTypist(CharacterSource newTypist) { CharacterSource oldTypist; if (newTypist != null) { newTypist.addCharacterListener(this); } oldTypist = typist.getAndSet(newTypist); if (oldTypist != null) { oldTypist.removeCharacterListener(this); } } public synchronized void resetScore() { score.set(0); char2type.set(-1); setScore(); } private synchronized void setScore() { SwingUtilities.invokeLater(new Runnable() { @Override public void run() { setText(Integer.toString(score.get())); } }); } @Override public synchronized void newCharacter(CharacterEvent ce) { int oldChar2type; if (ce.source == generator.get()) { oldChar2type = char2type.getAndSet(ce.character); if (oldChar2type != -1) { score.decrementAndGet(); setScore(); } } else if (ce.source == typist.get()) { while (true) { oldChar2type = char2type.get(); if (oldChar2type != ce.character) { score.decrementAndGet(); break; } else if (char2type.compareAndSet(oldChar2type, -1)) { score.incrementAndGet(); break; } } setScore(); } } }
atomic变量的目的只是避免同步的性能因素,但是为什么它在无穷循环中的时候还比较快呢?从技术上来讲,答案肯定不是那个无穷循环,额外的循环只会发生在atomic操作失败的时候,这是因为与其他线程发生冲突。要发生一个真正的无穷循环,就需要无穷的冲突。如果使用同步,这也会是一个问题:无数的线程要访问lock同样也会让程序无法正常操作。另一个方面,实际上,atomic class与同步间的性能差异通常不是很大。
所以,我们有必要平衡同步和atomic变量的使用。在使用同步的时候,线程在取得lock前 会被block住而不能执行。这能够让程序因为其他的线程被阻挡不能执行而atomic地来运行。当使用atomic变量的时候,线程是能够并行的运行相同的代码,atomic变量的目的不是消除不具有线程安全性的race condition,它们的目的是要让程序代码具有线程安全性,所以就不用特地去防止race condition。
使用atomic变量正是应了这样的一句老话:天下没有白吃的午餐!我们避开了同步,但是在所运行的工作量上却付出了代价。这就是所谓的乐观同步:我们的程序代码抓住保护变量的值并作出在这瞬间没有其他修改的假设,然后程序代码就计算出该变量的新值并尝试更新该变量。如果有其他线程同时修改了这个变量,这个更新就失败并且程序必须重新执行这些步骤,并且使用变量的最新修改过的值。
上面例子中出现了数据交换,也就是在取得旧值的同时atomic地设定新值的能力。这是通过使用getAndSet()方法来完成的,使用这个方法就能确保只有一个线程能够取得并使用该值。
如果有更复杂的数据交换时该如何处理?如果值的设定要依据该旧值又该如何处理?
我们可以通过把get()和compareAndSet()方法放在循环中来处理。get()方法用来取得旧值,以计算新值,然后再通过使用compareAndSet()方法来设定新值----只有在旧值没有被更动的时候才会设定为新值,如果这个方法失败,整个操作可以重新进行,因为当前的线程在失败时都没有动到任何数据。虽然调用过get()方法,计算过新值,数据的交换并不是被个别地atomic,如果它成功,这个顺序可以被认为是atomic的,因为它只在没有其他线程变动该值时才会成功。
compareAndSet()这个方法处理的其实就是所谓比较与设定这种情况。它是只有在当前值是预期值的时候才atomic地设定值的能力,这个方法是在atmoic层提供条件支持能力的重要方法。,它甚至能够用来实现出由mutex所提供的同步能力。
如果是更加复杂的比较该如何处理?如果比较是要依据旧值或者外部值该如何处理?
我们依然可以通过把get()和compareAndSet()方法放在循环中来处理。因为数据交换和比较其实是差不多的,唯一的区别就是get()方法取得旧值的目的是为了用来比较或者只用来完成atomic地交换,而复杂的比较是可以用来观察是否要继续操作。
虽然atomic class可用的数据类型列表数量是相当大的,但它依然不是完整的。它不能支持字符和浮点数据类型,虽然支持一般对象类型,但是没有对更复杂的对象类型提供支持,像是String这类对象就具有很多方便的操作。但是,我们是在JAVA中编程,也就是面向对象编程,我们完全可以将数据类型封装进只读的数据对象中来对任何新类型实现atomic的支持,然后此数据对象通过改变atomic引用到新的数据对象,就可以被atomic地变动。但这也仅仅在嵌入数据对象的值不会被任何方式变动的时候才有效。任何对数据对象的变动必须只能通过改变引用到不同的对象来完成,旧对象的值是不变的。所有由数据对象所封装的值,不管是直接还是非直接,必须是只读的才能使这个技巧有效的运作。
所以,我们是不可能atomic地改变浮点数值,但是我们可以atomic地改变对象引用到不同的浮点数值,只要浮点数值是只读的,它就具有线程安全性。
这就是我们实现的浮点数值的atomic class:
public class AtomicDouble extends Number{ private AtomicReference<Double> value; public AtomicDouble(){ this(0.0); } public AtomicDouble(double initVal){ value = new AtomicReference<Double>(new Double(initVal)); } public double get(){ return value.get().doubleValue(); } public void set(double newVal){ value.set(new Double(newVal)); } public boolean compareAndSet(double expect, double update){ Double origVal, newVal; newVal = new Double(update); while(true){ origVal = value.get(); if(Double.compare(origVal.doubleValue(), expect) == 0){ if(value.compareAndSet(origVal, newVal)){ return true; }else{ return false; } } } } public boolean weakCompareAndSet(double expect, double update){ return compareAndSet(expect, update); } public double getAndSet(double setVal){ Double origVal, newVal; newVal = new Double(setVal); while(true){ origVal = value.get(); if(value.compareAndSet(origVal, newVal)){ return origVal.doubleValue(); } } } public double getAndAdd(double delta){ Double origVal, newVal; while(true){ origVal = value.get(); newVal = new Double(origVal.doubleValue() + delta); if(value.compareAndSet(origVal, newVal)){ return origVal.doubleValue(); } } } public double addAndGet(double delta){ Double origVal, newVal; while(true){ origVal = value.get(); newVal = new Double(origVal.doubleValue() + delta); if(value.compareAndSet(origVal, newVal)){ return newVal.doubleValue(); } } } public double getAndIncrement(){ return getAndAdd((double)1.0); } public double getAndDecrement(){ return addAndGet((double)-1.0); } public double incrementAndGet(){ return addAndGet((double)1.0); } public double decrementAndGet(){ return addAndGet((double)-1.0); } public double getAndMultiply(double multiple){ Double origVal, newVal; while(true){ origVal = value.get(); newVal = new Double(origVal.doubleValue() * multiple); if(value.compareAndSet(origVal, newVal)){ return origVal.doubleValue(); } } } public double multiplyAndGet(double multiple){ Double origVal, newVal; while(true){ origVal = value.get(); newVal = new Double(origVal.doubleValue() * multiple); if(value.compareAndSet(origVal, newVal)){ return newVal.doubleValue(); } } } }
到现在为止,我们还只是对个别的变量做atomic地设定,还没有做到对一群数据atomic地设定。如果是这样,我们就必须通过创建封装这些要被变动值的对象来完成,之后这些值就可以通过atomic地变动对这些值的atomic引用来做到同时地改变。这样的运行方式其实和上面实现的AtomicDouble是一样的。
这同样也是需要在值没有以任何方式直接改变的情况下才会有效。任何对数据对象的改变是通过改变引用到不同的对象上来完成的,也就是所谓的额数据交换,旧的对象值必须没有被变动过,不管是直接还是间接封装的值都必须是只读的才能让这个技巧有效运作。
明白了这点,我们也清楚的知道了以高级atomic数据类型来执行大量的数据变更,我们会用到大量的临时对象,因为为了确保所有的操作是atomic的,我们必须在临时变量上来完成所有的计算,并且所有的值都是使用数据交换来做atomic地变更。实际上,对象的创建远比我们想象中要多得多:对每个事务动作都需要创建一个新的对象,每个atomic地比对和设定操作在失败而必须重新尝试的时候也需要创建新的对象。
所以,我们必须明白一件事:使用atomic变量会让我们的代码非常复杂,我们必须在它与使用同步间取得平衡:是否可以接受所有临时对象的创建?此技巧是否比同步好?
我们最后来讲一下Thread的局部变量。
任何线程都可以在任意的时间定义该线程私有的局部变量,而其他线程可以定义相同的变量以创建该变量自有的拷贝。这就意味着线程的局部变量无法用在线程间共享状态,对某个线程私有的变量所做的改变并不会反映在其他线程所持有的拷贝上,但这意味着对该变量的访问觉不需要同步化,因为它不可能让多个线程同时访问。
我们可以利用java.lang.ThreadLocal这个类来模型化:
public class ThreadLocal<T>{ protected T initialValue(); public T get(); public void set(T value); public void remove(); }
一般情况下,我们都是subclass这个ThreadLocal并覆写initialValue()这个方法来返回应该在线程第一次访问此变量时返回的值。我们还可以通过继承自这个类来让子线程继承父线程的局部变量。
事实上,ThreadLocal因为性能非常差的原因我们很少使用到,但是它在实现一些线程问题的时候还是非常有用的,像是Android的消息队列,就是使用到了这点。