一.从任务中产生返回值,Callable接口的使用
Callable是一种具有泛型类型参数的泛型,它的类型参数表示的是从方法call返回的值,而且必须使Executor.submit来去调用它.submit方法将会返回Future对象,它用Callable返回结果的特定类型进行了参数化,可以通过isDone方法来检测Future是否已经完成.当任务完成的时候,它具有一个结果,可以调用get方法来获取结果,get方法将阻塞,直到结果准备就绪.下面是示例:
import java.util.concurrent.*; import java.util.*; class TaskWithResult implements Callable<String> { private int id; public TaskWithResult(int id) { this.id = id; } public String call() { return "result of TaskWithResult " + id; } } public class CallableDemo { public static void main(String[] args) { ExecutorService exec = Executors.newCachedThreadPool(); ArrayList<Future<String>> results = new ArrayList<Future<String>>();//Future接口记录结果 for(int i = 0; i < 10; i++) results.add(exec.submit(new TaskWithResult(i))); for(Future<String> fs : results) try { // get方法将阻塞,直到结果准备就绪 System.out.println(fs.get()); } catch(InterruptedException e) { System.out.println(e); return; } catch(ExecutionException e) { System.out.println(e); } finally { exec.shutdown();//关闭线程池 } } } /* 输出: result of TaskWithResult 0 result of TaskWithResult 1 result of TaskWithResult 2 result of TaskWithResult 3 result of TaskWithResult 4 result of TaskWithResult 5 result of TaskWithResult 6 result of TaskWithResult 7 result of TaskWithResult 8 result of TaskWithResult 9 */
二.后台线程
后台线程是在后台提供通用服务的一组线程,在所有非后台线程结束的时候,程序也就终止了,并且也就杀死进程中的所有后台线程.对于后台线程的使用,注意以下几点:
1.必须在线程启动前,调用setDaemon方法.才能将线程设为后台线程.可以通过调用isDaemon的方法判断一个线程是否是一个后台线程.
2.不使用线程池开启线程的时候,后台进程在不执行finally子句的情况下,就会终止其run方法.
class ADaemon implements Runnable { public void run() { try { print("Starting ADaemon"); TimeUnit.SECONDS.sleep(1); } catch(InterruptedException e) { print("Exiting via InterruptedException"); } finally { print("This should always run?"); } } } public class DaemonsDontRunFinally { public static void main(String[] args) throws Exception { Thread t = new Thread(new ADaemon()); t.setDaemon(true); t.start(); } } /* 输出: Starting Adaemon*/
因此使用线程池来完成对于守护线程的执行是一种更好的方式:
import java.util.concurrent.*; import static net.mindview.util.Print.*; class ADaemon implements Runnable { public void run() { try { print("Starting ADaemon"); TimeUnit.SECONDS.sleep(1); } catch(InterruptedException e) { print("Exiting via InterruptedException"); } finally { print("This should always run?"); } } } public class DaemonsDontRunFinally { public static void main(String[] args) throws Exception { Thread t = new Thread(new ADaemon()); ExecutorService s=Executors.newCachedThreadPool(); t.setDaemon(true); s.execute(t); Thread.sleep(10); s.shutdown(); } } /* 输出: Starting ADaemon This should always run? */
三.yield方法(礼让线程)
yield方法,给了具有相同优先级的其他线程执行的机会,但是依然是公平竞争,因此不保证其他线程能够执行.另外要注意:在调用yield方法的时候,锁并没有释放!
四.原子性和volatile
原子操作是指不能被线程调度中断的操作,一旦操作开始,那么它一定在可能发生的上下文切换之前执行完毕.原子性可以应用于除long和double之外的基本类型变量这样的操作.如果将一个域声明为volatile,那么只要对这个域产生了写的操作,那么所有的读操作都可以看到这个修改,即便使用了本地缓存,volatile域会立即被写入到主存中,而读取操作就发生在主存之中.
基本上,如果一个域可能被多个任务同时访问,或者这些任务中至少有一个是写入任务,那么你就应该将这个域设置为volatile的.
利用AtomicInteger,AtomicLong,AutomicReference等特殊的原子变量类提供了一系列原子性的操作.例如下面的示例,在不加锁的情况下实现线程安全:
import java.util.concurrent.*; import java.util.concurrent.atomic.*; import java.util.*; public class AtomicIntegerTest implements Runnable { private AtomicInteger i = new AtomicInteger(0); public int getValue() { return i.get(); } private void evenIncrement() { i.addAndGet(2); } public void run() { while(true) evenIncrement(); } public static void main(String[] args) { new Timer().schedule(new TimerTask() { public void run() { System.err.println("Aborting"); System.exit(0); } }, 5000); // 5秒后结束 ExecutorService exec = Executors.newCachedThreadPool(); AtomicIntegerTest ait = new AtomicIntegerTest(); exec.execute(ait); while(true) { int val = ait.getValue(); if(val % 2 != 0) { System.out.println(val); System.exit(0); } } } }
五.生产者消费者与队列
使用同步队列可以解决任务的协作问题,同步队列在任何时刻只允许一个任务插入或者移除元素,可以使用LinkedBlockingQueue,它是一个无界的队列,也可以使用ArrayBlockingQueue,它具有固定的尺寸,因此可以在它被阻塞之前,放入有限数量的元素.如果消费者试图从队列中取出对象,而该队列此时为空,那么这些队列还可以被挂起消费者,并且当有更多的元素可用的时候恢复消费者任务,而当生产者想要生产对象的时候,对于有界的队列,当队列已经充满了资源的时候,生产者也将被挂起,直到队列有可用的生产空间.下面是一个示例,它将多个LiftOff对象的执行串行化了,消费者是LiftOffRunner,它将每个LiftOff对象,从BlockingQueue中推出并且执行.
import java.util.concurrent.*; import java.io.*; import static net.mindview.util.Print.*; class LiftOffRunner implements Runnable { private BlockingQueue<LiftOff> rockets; public LiftOffRunner(BlockingQueue<LiftOff> queue) { rockets = queue; } public void add(LiftOff lo) { try { rockets.put(lo); } catch(InterruptedException e) { print("Interrupted during put()"); } } public void run() { try { while(!Thread.interrupted()) { LiftOff rocket = rockets.take(); rocket.run(); } } catch(InterruptedException e) { print("Waking from take()"); } print("Exiting LiftOffRunner"); } } public class TestBlockingQueues { static void getkey() { try { new BufferedReader( new InputStreamReader(System.in)).readLine(); } catch(java.io.IOException e) { throw new RuntimeException(e); } } static void getkey(String message) { print(message); getkey(); } static void test(String msg, BlockingQueue<LiftOff> queue) { print(msg); LiftOffRunner runner = new LiftOffRunner(queue); Thread t = new Thread(runner); t.start(); for(int i = 0; i < 5; i++) runner.add(new LiftOff(5)); getkey("Press 'Enter' (" + msg + ")"); t.interrupt(); print("Finished " + msg + " test"); } public static void main(String[] args) { test("LinkedBlockingQueue", // 没有界限的队列 new LinkedBlockingQueue<LiftOff>()); test("ArrayBlockingQueue", //有限队列 new ArrayBlockingQueue<LiftOff>(3)); test("SynchronousQueue", // 容量为1的队列 new SynchronousQueue<LiftOff>()); } }
六.死锁
当某个任务在等待另外一个任务,而后者又在等待别的任务,这样一直循环下去,直到这个链条上的任务又在等待第一个任务释放锁,这得到一个任务之间相互等待的连续循环,没有哪个线程能够继续,这被称之为死锁.
当四个条件满足的时候,将会发生死锁:
1.互斥条件.各个任务使用的资源至少有一个不是资源共享的.
2.至少有一个任务它必须持有一个资源且正在等待获取另一个当前被别的任务持有的资源
3.资源不能被任务抢占,任务必须把资源当作普通事件.
4.必须有循环等待,这样,一个任务等待其他任务所持有的资源,后者又在等待另外一个任务所持有的资源,这样一直下去,直到有一个任务在等待第一个任务所持有的资源,使得大家都被锁住.
因此要破坏死锁的话,只要破坏死锁中的任意一个条件即可.防止死锁最容易的方法就是破坏第4个条件.
七.乐观加锁&ReadWriteLock
某些Atomic类还允许执行所谓的"乐观加锁",这意味着当你执行某些计算的时候,实际上没有使用互斥,但是在这项计算完成,并且准备更新这个Atomic对象的时候,需要调用CompareAndSet方法,将旧值和新值一起交给这个方法,当旧值与它在Atomic对象中发现的值不一致的时候,那么这个操作将失败,这意味着某个任务已经于操作执行期间修改了一个对象.通常情况下,应该使用互斥(synchronized或Lock)来防止多个任务同时修改一个对象,但是这里我们是"乐观"的,因为我们保持数据出于无锁定的状态,并且希望没有任务来修改它,通过Atomic来替代synchronized或Lock可以获得性能的好处.传统的锁机制又成为"悲观锁",即每次拿数据的时候都认为会被别人修改,因此提供了加锁机制.下面是乐观锁的示例:
import java.util.concurrent.*; import java.util.concurrent.atomic.*; import java.util.*; import static net.mindview.util.Print.*; public class FastSimulation { static final int N_ELEMENTS = 100000; static final int N_GENES = 30; static final int N_EVOLVERS = 50; static final AtomicInteger[][] GRID = new AtomicInteger[N_ELEMENTS][N_GENES]; static Random rand = new Random(47); static class Evolver implements Runnable { public void run() { while(!Thread.interrupted()) { int element = rand.nextInt(N_ELEMENTS); for(int i = 0; i < N_GENES; i++) { int previous = element - 1; if(previous < 0) previous = N_ELEMENTS - 1; int next = element + 1; if(next >= N_ELEMENTS) next = 0; int oldvalue = GRID[element][i].get(); // Perform some kind of modeling calculation: int newvalue = oldvalue + GRID[previous][i].get() + GRID[next][i].get(); newvalue /= 3; // Average the three values if(!GRID[element][i] .compareAndSet(oldvalue, newvalue)) { // 在这里处理当旧值已经被别的线程更新的情况. 这里,我们只需要报告它并且忽略它 // 我们的模型会根据实际情况处理. print("Old value changed from " + oldvalue); } } } } } public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); for(int i = 0; i < N_ELEMENTS; i++) for(int j = 0; j < N_GENES; j++) GRID[i][j] = new AtomicInteger(rand.nextInt(1000)); for(int i = 0; i < N_EVOLVERS; i++) exec.execute(new Evolver()); TimeUnit.SECONDS.sleep(5); exec.shutdownNow(); } }
ReadWriteLock对向数据结构相对不频繁的写入,但是有多个任务要经常读取这个数据结构的情况,进行了优化,ReadWriteLock使得你可以同时有多个读取者,只要它们都不试图写入即可,如果写锁已经被其他任务持有,那么任何读锁都不能被访问,直到这个写锁被释放为止.下面是示例,set方法获得一个写锁,以调用底层的ArrayList.set方法,而get方法要获取一个读锁,以调用底层的ArrayList.get()方法,另外get将检查是否有多个对象获取了读锁,如果是,则显示这种读取者的数量,以证明可以有多个读取者获得该锁.
import java.util.concurrent.*; import java.util.concurrent.locks.*; import java.util.*; import static net.mindview.util.Print.*; public class ReaderWriterList<T> { private ArrayList<T> lockedList; //使用给定的公平策略创建一个新的 ReentrantReadWriteLock。 private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); public ReaderWriterList(int size, T initialValue) { lockedList = new ArrayList<T>( Collections.nCopies(size, initialValue)); } public T set(int index, T element) { Lock wlock = lock.writeLock(); wlock.lock(); try { return lockedList.set(index, element); } finally { wlock.unlock(); } } public T get(int index) { Lock rlock = lock.readLock(); rlock.lock(); try { if(lock.getReadLockCount() > 1) print(lock.getReadLockCount()); return lockedList.get(index); } finally { rlock.unlock(); } } public static void main(String[] args) throws Exception { new ReaderWriterListTest(30, 1); } } class ReaderWriterListTest { ExecutorService exec = Executors.newCachedThreadPool(); private final static int SIZE = 100; private static Random rand = new Random(47); private ReaderWriterList<Integer> list = new ReaderWriterList<Integer>(SIZE, 0); private class Writer implements Runnable { public void run() { try { for(int i = 0; i < 20; i++) { list.set(i, rand.nextInt()); TimeUnit.MILLISECONDS.sleep(100); } } catch(InterruptedException e) { } print("Writer finished, shutting down"); exec.shutdownNow(); } } private class Reader implements Runnable { public void run() { try { while(!Thread.interrupted()) { for(int i = 0; i < SIZE; i++) { list.get(i); TimeUnit.MILLISECONDS.sleep(1); } } } catch(InterruptedException e) { } } } public ReaderWriterListTest(int readers, int writers) { for(int i = 0; i < readers; i++) exec.execute(new Reader()); for(int i = 0; i < writers; i++) exec.execute(new Writer()); } }
八.仿真
饭店仿真:任务并没有直接的互相干涉而是通过队列互相发送对象,接受任务将处理对象,将其当作一个信息来对待,而不是向他发送信息.
package concurrency.restaurant2; import enumerated.menu.*;//工具类 import java.util.concurrent.*; import java.util.*; import static net.mindview.util.Print.*; // 这个交给Writer,再交给chef: class Order { private static int counter = 0; private final int id = counter++; private final Customer customer; private final WaitPerson waitPerson; private final Food food; public Order(Customer cust, WaitPerson wp, Food f) { customer = cust; waitPerson = wp; food = f; } public Food item() { return food; } public Customer getCustomer() { return customer; } public WaitPerson getWaitPerson() { return waitPerson; } public String toString() { return "Order: " + id + " item: " + food + " for: " + customer + " served by: " + waitPerson; } } // 这是从chef返回的: class Plate { private final Order order; private final Food food; public Plate(Order ord, Food f) { order = ord; food = f; } public Order getOrder() { return order; } public Food getFood() { return food; } public String toString() { return food.toString(); } } class Customer implements Runnable { private static int counter = 0; private final int id = counter++; private final WaitPerson waitPerson; // 只有一个盘子可以从厨师那里得到: private SynchronousQueue<Plate> placeSetting = new SynchronousQueue<Plate>(); public Customer(WaitPerson w) { waitPerson = w; } public void deliver(Plate p) throws InterruptedException { // 再没有获得新的餐点的时候,placeSetting将一直阻塞 placeSetting.put(p); } public void run() { for(Course course : Course.values()) { Food food = course.randomSelection();//调用工具类产生食物 try { waitPerson.placeOrder(this, food); print(this + "eating " + placeSetting.take()); } catch(InterruptedException e) { print(this + "waiting for " + course + " interrupted"); break; } } print(this + "finished meal, leaving"); } public String toString() { return "Customer " + id + " "; } } class WaitPerson implements Runnable { private static int counter = 0; private final int id = counter++; private final Restaurant restaurant; BlockingQueue<Plate> filledOrders = new LinkedBlockingQueue<Plate>(); public WaitPerson(Restaurant rest) { restaurant = rest; } public void placeOrder(Customer cust, Food food) { try { restaurant.orders.put(new Order(cust, this, food)); } catch(InterruptedException e) { print(this + " placeOrder interrupted"); } } public void run() { try { while(!Thread.interrupted()) { //阻塞直到餐点已经准备好 Plate plate = filledOrders.take(); print(this + "received " + plate + " delivering to " + plate.getOrder().getCustomer()); plate.getOrder().getCustomer().deliver(plate); } } catch(InterruptedException e) { print(this + " interrupted"); } print(this + " off duty"); } public String toString() { return "WaitPerson " + id + " "; } } class Chef implements Runnable { private static int counter = 0; private final int id = counter++; private final Restaurant restaurant; private static Random rand = new Random(47); public Chef(Restaurant rest) { restaurant = rest; } public void run() { try { while(!Thread.interrupted()) { //阻塞直到有订单出现 Order order = restaurant.orders.take(); Food requestedItem = order.item(); //花时间准备订单: TimeUnit.MILLISECONDS.sleep(rand.nextInt(500)); Plate plate = new Plate(order, requestedItem); order.getWaitPerson().filledOrders.put(plate); } } catch(InterruptedException e) { print(this + " interrupted"); } print(this + " off duty"); } public String toString() { return "Chef " + id + " "; } } class Restaurant implements Runnable { private List<WaitPerson> waitPersons = new ArrayList<WaitPerson>(); private List<Chef> chefs = new ArrayList<Chef>(); private ExecutorService exec; private static Random rand = new Random(47); BlockingQueue<Order> orders = new LinkedBlockingQueue<Order>(); public Restaurant(ExecutorService e, int nWaitPersons, int nChefs) { exec = e; for(int i = 0; i < nWaitPersons; i++) { WaitPerson waitPerson = new WaitPerson(this); waitPersons.add(waitPerson); exec.execute(waitPerson); } for(int i = 0; i < nChefs; i++) { Chef chef = new Chef(this); chefs.add(chef); exec.execute(chef); } } public void run() { try { while(!Thread.interrupted()) { // 一个新的顾客出现,配上一个服务员: WaitPerson wp = waitPersons.get( rand.nextInt(waitPersons.size())); Customer c = new Customer(wp); exec.execute(c); TimeUnit.MILLISECONDS.sleep(100); } } catch(InterruptedException e) { print("Restaurant interrupted"); } print("Restaurant closing"); } } public class RestaurantWithQueues { public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); Restaurant restaurant = new Restaurant(exec, 5, 2); exec.execute(restaurant); if(args.length > 0) TimeUnit.SECONDS.sleep(new Integer(args[0])); else { print("Press 'Enter' to quit"); System.in.read(); } exec.shutdownNow(); } } /* 输出: WaitPerson 0 received SPRING_ROLLS delivering to Customer 1 Customer 1 eating SPRING_ROLLS WaitPerson 3 received SPRING_ROLLS delivering to Customer 0 Customer 0 eating SPRING_ROLLS WaitPerson 0 received BURRITO delivering to Customer 1 Customer 1 eating BURRITO WaitPerson 3 received SPRING_ROLLS delivering to Customer 2 Customer 2 eating SPRING_ROLLS WaitPerson 1 received SOUP delivering to Customer 3 Customer 3 eating SOUP WaitPerson 3 received VINDALOO delivering to Customer 0 Customer 0 eating VINDALOO WaitPerson 0 received FRUIT delivering to Customer 1 ... */