import java.util.concurrent.*;
import java.util.*;

/** A task that returns a String identifying itself by the id it was built with. */
class TaskWithResult implements Callable<String> {
  private final int id;

  /**
   * @param id number embedded in the result string
   */
  public TaskWithResult(int id) {
    this.id = id;
  }

  /**
   * @return the text {@code "result of TaskWithResult "} followed by this task's id
   */
  @Override
  public String call() {
    return "result of TaskWithResult " + id;
  }
}

/** Submits ten {@link TaskWithResult} tasks to a cached thread pool and prints each result. */
public class CallableDemo {
  public static void main(String[] args) {
    ExecutorService exec = Executors.newCachedThreadPool();
    // Each Future records the (eventual) result of one submitted task.
    List<Future<String>> results = new ArrayList<Future<String>>();
    try {
      for (int i = 0; i < 10; i++) {
        results.add(exec.submit(new TaskWithResult(i)));
      }
      for (Future<String> fs : results) {
        try {
          // get() blocks until the result is ready.
          System.out.println(fs.get());
        } catch (InterruptedException e) {
          System.out.println(e);
          // Restore the interrupt status so callers further up can still observe it.
          Thread.currentThread().interrupt();
          return;
        } catch (ExecutionException e) {
          System.out.println(e);
        }
      }
    } finally {
      // Shut the pool down exactly once, on every exit path (including the early return).
      exec.shutdown();
    }
  }
}
/* Output:
result of TaskWithResult 0
result of TaskWithResult 1
result of TaskWithResult 2
result of TaskWithResult 3
result of TaskWithResult 4
result of TaskWithResult 5
result of TaskWithResult 6
result of TaskWithResult 7
result of TaskWithResult 8
result of TaskWithResult 9
*/
/**
 * Runnable that announces itself, sleeps for one second, and prints from a
 * {@code finally} clause on the way out.
 */
class ADaemon implements Runnable {
  public void run() {
    try {
      print("Starting ADaemon");
      TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
      print("Exiting via InterruptedException");
    } finally {
      print("This should always run?");
    }
  }
}

/**
 * Starts {@link ADaemon} on a daemon thread and lets {@code main} return immediately.
 * The recorded output below contains only the first message, i.e. the {@code finally}
 * clause had not run by the time the program ended.
 */
public class DaemonsDontRunFinally {
  public static void main(String[] args) throws Exception {
    Thread daemonThread = new Thread(new ADaemon());
    daemonThread.setDaemon(true);
    daemonThread.start();
  }
}
/* Output:
Starting ADaemon
*/
import java.util.concurrent.*;
import static net.mindview.util.Print.*;

/**
 * Runnable that announces itself, sleeps for one second, and prints from a
 * {@code finally} clause on the way out.
 */
class ADaemon implements Runnable {
  public void run() {
    try {
      print("Starting ADaemon");
      TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
      print("Exiting via InterruptedException");
    } finally {
      print("This should always run?");
    }
  }
}

/**
 * Variant of the daemon demo that hands the work to a cached thread pool.
 * The recorded output below shows both messages, i.e. here the {@code finally}
 * clause does run.
 */
public class DaemonsDontRunFinally {
  public static void main(String[] args) throws Exception {
    Thread wrapped = new Thread(new ADaemon());
    // The daemon flag is set on a Thread object that is never start()ed: it is
    // passed to the executor as a plain Runnable and executed by a pool thread.
    wrapped.setDaemon(true);

    ExecutorService pool = Executors.newCachedThreadPool();
    pool.execute(wrapped);

    Thread.sleep(10);
    // shutdown() stops accepting new work but lets the submitted task finish.
    pool.shutdown();
  }
}
/* Output:
Starting ADaemon
This should always run?
*/
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.*;

/**
 * Keeps adding 2 to an {@link AtomicInteger} on a pool thread while the main
 * thread polls the value and exits if it ever observes an odd number. A timer
 * ends the program after five seconds.
 */
public class AtomicIntegerTest implements Runnable {
  private final AtomicInteger counter = new AtomicInteger(0);

  /** @return the current counter value */
  public int getValue() {
    return counter.get();
  }

  /** Atomically advances the counter by two. */
  private void evenIncrement() {
    counter.addAndGet(2);
  }

  /** Increments forever; only {@code System.exit} stops it. */
  public void run() {
    for (;;) {
      evenIncrement();
    }
  }

  public static void main(String[] args) {
    TimerTask abort = new TimerTask() {
      public void run() {
        System.err.println("Aborting");
        System.exit(0);
      }
    };
    // Terminate after 5 seconds.
    new Timer().schedule(abort, 5000);

    ExecutorService exec = Executors.newCachedThreadPool();
    AtomicIntegerTest ait = new AtomicIntegerTest();
    exec.execute(ait);

    for (;;) {
      int observed = ait.getValue();
      // Lowest bit set means the value is odd.
      if ((observed & 1) != 0) {
        System.out.println(observed);
        System.exit(0);
      }
    }
  }
}
import java.util.concurrent.*; import java.io.*; import static net.mindview.util.Print.*; class LiftOffRunner implements Runnable { private BlockingQueue<LiftOff> rockets; public LiftOffRunner(BlockingQueue<LiftOff> queue) { rockets = queue; } public void add(LiftOff lo) { try { rockets.put(lo); } catch(InterruptedException e) { print("Interrupted during put()"); } } public void run() { try { while(!Thread.interrupted()) { LiftOff rocket = rockets.take(); rocket.run(); } } catch(InterruptedException e) { print("Waking from take()"); } print("Exiting LiftOffRunner"); } } public class TestBlockingQueues { static void getkey() { try { new BufferedReader( new InputStreamReader(System.in)).readLine(); } catch(java.io.IOException e) { throw new RuntimeException(e); } } static void getkey(String message) { print(message); getkey(); } static void test(String msg, BlockingQueue<LiftOff> queue) { print(msg); LiftOffRunner runner = new LiftOffRunner(queue); Thread t = new Thread(runner); t.start(); for(int i = 0; i < 5; i++) runner.add(new LiftOff(5)); getkey("Press 'Enter' (" + msg + ")"); t.interrupt(); print("Finished " + msg + " test"); } public static void main(String[] args) { test("LinkedBlockingQueue", // 没有界限的队列 new LinkedBlockingQueue<LiftOff>()); test("ArrayBlockingQueue", //有限队列 new ArrayBlockingQueue<LiftOff>(3)); test("SynchronousQueue", // 容量为1的队列 new SynchronousQueue<LiftOff>()); } }
import java.util.concurrent.*; import java.util.concurrent.atomic.*; import java.util.*; import static net.mindview.util.Print.*; public class FastSimulation { static final int N_ELEMENTS = 100000; static final int N_GENES = 30; static final int N_EVOLVERS = 50; static final AtomicInteger[][] GRID = new AtomicInteger[N_ELEMENTS][N_GENES]; static Random rand = new Random(47); static class Evolver implements Runnable { public void run() { while(!Thread.interrupted()) { int element = rand.nextInt(N_ELEMENTS); for(int i = 0; i < N_GENES; i++) { int previous = element - 1; if(previous < 0) previous = N_ELEMENTS - 1; int next = element + 1; if(next >= N_ELEMENTS) next = 0; int oldvalue = GRID[element][i].get(); // Perform some kind of modeling calculation: int newvalue = oldvalue + GRID[previous][i].get() + GRID[next][i].get(); newvalue /= 3; // Average the three values if(!GRID[element][i] .compareAndSet(oldvalue, newvalue)) { // 在这里处理当旧值已经被别的线程更新的情况. 这里,我们只需要报告它并且忽略它 // 我们的模型会根据实际情况处理. print("Old value changed from " + oldvalue); } } } } } public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); for(int i = 0; i < N_ELEMENTS; i++) for(int j = 0; j < N_GENES; j++) GRID[i][j] = new AtomicInteger(rand.nextInt(1000)); for(int i = 0; i < N_EVOLVERS; i++) exec.execute(new Evolver()); TimeUnit.SECONDS.sleep(5); exec.shutdownNow(); } }
import java.util.concurrent.*; import java.util.concurrent.locks.*; import java.util.*; import static net.mindview.util.Print.*; public class ReaderWriterList<T> { private ArrayList<T> lockedList; //使用给定的公平策略创建一个新的 ReentrantReadWriteLock。 private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); public ReaderWriterList(int size, T initialValue) { lockedList = new ArrayList<T>( Collections.nCopies(size, initialValue)); } public T set(int index, T element) { Lock wlock = lock.writeLock(); wlock.lock(); try { return lockedList.set(index, element); } finally { wlock.unlock(); } } public T get(int index) { Lock rlock = lock.readLock(); rlock.lock(); try { if(lock.getReadLockCount() > 1) print(lock.getReadLockCount()); return lockedList.get(index); } finally { rlock.unlock(); } } public static void main(String[] args) throws Exception { new ReaderWriterListTest(30, 1); } } class ReaderWriterListTest { ExecutorService exec = Executors.newCachedThreadPool(); private final static int SIZE = 100; private static Random rand = new Random(47); private ReaderWriterList<Integer> list = new ReaderWriterList<Integer>(SIZE, 0); private class Writer implements Runnable { public void run() { try { for(int i = 0; i < 20; i++) { list.set(i, rand.nextInt()); TimeUnit.MILLISECONDS.sleep(100); } } catch(InterruptedException e) { } print("Writer finished, shutting down"); exec.shutdownNow(); } } private class Reader implements Runnable { public void run() { try { while(!Thread.interrupted()) { for(int i = 0; i < SIZE; i++) { list.get(i); TimeUnit.MILLISECONDS.sleep(1); } } } catch(InterruptedException e) { } } } public ReaderWriterListTest(int readers, int writers) { for(int i = 0; i < readers; i++) exec.execute(new Reader()); for(int i = 0; i < writers; i++) exec.execute(new Writer()); } }
package concurrency.restaurant2;

import enumerated.menu.*; // utility classes
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.*;
import static net.mindview.util.Print.*;

/** An order: handed to the WaitPerson, who passes it on to a Chef. */
class Order {
  // Orders are created concurrently from many Customer threads (through
  // WaitPerson.placeOrder), so a plain "counter++" could hand out duplicate ids.
  private static final AtomicInteger counter = new AtomicInteger();
  private final int id = counter.getAndIncrement();
  private final Customer customer;
  private final WaitPerson waitPerson;
  private final Food food;

  public Order(Customer cust, WaitPerson wp, Food f) {
    customer = cust;
    waitPerson = wp;
    food = f;
  }

  public Food item() {
    return food;
  }

  public Customer getCustomer() {
    return customer;
  }

  public WaitPerson getWaitPerson() {
    return waitPerson;
  }

  @Override
  public String toString() {
    return "Order: " + id + " item: " + food
        + " for: " + customer
        + " served by: " + waitPerson;
  }
}

/** What comes back from the Chef: the prepared food together with its order. */
class Plate {
  private final Order order;
  private final Food food;

  public Plate(Order ord, Food f) {
    order = ord;
    food = f;
  }

  public Order getOrder() {
    return order;
  }

  public Food getFood() {
    return food;
  }

  @Override
  public String toString() {
    return food.toString();
  }
}

/** Orders one item per course through its WaitPerson and eats each plate as it arrives. */
class Customer implements Runnable {
  private static int counter = 0;
  private final int id = counter++;
  private final WaitPerson waitPerson;
  // Only one plate at a time can be handed over to this customer.
  private final SynchronousQueue<Plate> placeSetting = new SynchronousQueue<Plate>();

  public Customer(WaitPerson w) {
    waitPerson = w;
  }

  /**
   * Hands a plate to this customer; blocks until the customer takes it.
   *
   * @throws InterruptedException if interrupted while waiting for the hand-over
   */
  public void deliver(Plate p) throws InterruptedException {
    placeSetting.put(p);
  }

  @Override
  public void run() {
    for (Course course : Course.values()) {
      Food food = course.randomSelection(); // utility call that picks the food
      try {
        waitPerson.placeOrder(this, food);
        // Blocks until the plate for this course is delivered.
        print(this + "eating " + placeSetting.take());
      } catch (InterruptedException e) {
        print(this + "waiting for " + course + " interrupted");
        break;
      }
    }
    print(this + "finished meal, leaving");
  }

  @Override
  public String toString() {
    return "Customer " + id + " ";
  }
}

/** Queues customer orders with the restaurant and delivers finished plates. */
class WaitPerson implements Runnable {
  private static int counter = 0;
  private final int id = counter++;
  private final Restaurant restaurant;
  BlockingQueue<Plate> filledOrders = new LinkedBlockingQueue<Plate>();

  public WaitPerson(Restaurant rest) {
    restaurant = rest;
  }

  /**
   * Places an order on the restaurant's order queue. Runs on the calling
   * customer's thread.
   */
  public void placeOrder(Customer cust, Food food) {
    try {
      restaurant.orders.put(new Order(cust, this, food));
    } catch (InterruptedException e) {
      print(this + " placeOrder interrupted");
      // Re-assert the interrupt: otherwise the flag is cleared here and the
      // calling Customer would go on to block forever in placeSetting.take()
      // after the executor has been shut down.
      Thread.currentThread().interrupt();
    }
  }

  @Override
  public void run() {
    try {
      while (!Thread.interrupted()) {
        // Blocks until a meal is ready.
        Plate plate = filledOrders.take();
        print(this + "received " + plate
            + " delivering to " + plate.getOrder().getCustomer());
        plate.getOrder().getCustomer().deliver(plate);
      }
    } catch (InterruptedException e) {
      print(this + " interrupted");
    }
    print(this + " off duty");
  }

  @Override
  public String toString() {
    return "WaitPerson " + id + " ";
  }
}

/** Takes orders from the restaurant queue, prepares them, and returns plates to the WaitPerson. */
class Chef implements Runnable {
  private static int counter = 0;
  private final int id = counter++;
  private final Restaurant restaurant;
  private static Random rand = new Random(47);

  public Chef(Restaurant rest) {
    restaurant = rest;
  }

  @Override
  public void run() {
    try {
      while (!Thread.interrupted()) {
        // Blocks until an order appears.
        Order order = restaurant.orders.take();
        Food requestedItem = order.item();
        // Time to prepare the order:
        TimeUnit.MILLISECONDS.sleep(rand.nextInt(500));
        Plate plate = new Plate(order, requestedItem);
        order.getWaitPerson().filledOrders.put(plate);
      }
    } catch (InterruptedException e) {
      print(this + " interrupted");
    }
    print(this + " off duty");
  }

  @Override
  public String toString() {
    return "Chef " + id + " ";
  }
}

/** Owns the staff and the order queue; produces a new customer every 100 ms while running. */
class Restaurant implements Runnable {
  private List<WaitPerson> waitPersons = new ArrayList<WaitPerson>();
  private List<Chef> chefs = new ArrayList<Chef>();
  private ExecutorService exec;
  private static Random rand = new Random(47);
  BlockingQueue<Order> orders = new LinkedBlockingQueue<Order>();

  /**
   * Creates the staff and starts each of them on the given executor.
   *
   * @param e executor that runs staff and customers
   * @param nWaitPersons number of WaitPerson tasks to start
   * @param nChefs number of Chef tasks to start
   */
  public Restaurant(ExecutorService e, int nWaitPersons, int nChefs) {
    exec = e;
    for (int i = 0; i < nWaitPersons; i++) {
      WaitPerson waitPerson = new WaitPerson(this);
      waitPersons.add(waitPerson);
      exec.execute(waitPerson);
    }
    for (int i = 0; i < nChefs; i++) {
      Chef chef = new Chef(this);
      chefs.add(chef);
      exec.execute(chef);
    }
  }

  @Override
  public void run() {
    try {
      while (!Thread.interrupted()) {
        // A new customer arrives and is assigned a randomly chosen WaitPerson.
        WaitPerson wp = waitPersons.get(rand.nextInt(waitPersons.size()));
        Customer c = new Customer(wp);
        exec.execute(c);
        TimeUnit.MILLISECONDS.sleep(100);
      }
    } catch (InterruptedException e) {
      print("Restaurant interrupted");
    }
    print("Restaurant closing");
  }
}

/**
 * Entry point: runs a restaurant with 5 wait persons and 2 chefs, either for the
 * number of seconds given as the first argument or until Enter is pressed.
 */
public class RestaurantWithQueues {
  public static void main(String[] args) throws Exception {
    ExecutorService exec = Executors.newCachedThreadPool();
    Restaurant restaurant = new Restaurant(exec, 5, 2);
    exec.execute(restaurant);
    if (args.length > 0) {
      // Integer.parseInt replaces the deprecated Integer(String) constructor.
      TimeUnit.SECONDS.sleep(Integer.parseInt(args[0]));
    } else {
      print("Press 'Enter' to quit");
      System.in.read();
    }
    exec.shutdownNow();
  }
}
/* Output:
WaitPerson 0 received SPRING_ROLLS delivering to Customer 1
Customer 1 eating SPRING_ROLLS
WaitPerson 3 received SPRING_ROLLS delivering to Customer 0
Customer 0 eating SPRING_ROLLS
WaitPerson 0 received BURRITO delivering to Customer 1
Customer 1 eating BURRITO
WaitPerson 3 received SPRING_ROLLS delivering to Customer 2
Customer 2 eating SPRING_ROLLS
WaitPerson 1 received SOUP delivering to Customer 3
Customer 3 eating SOUP
WaitPerson 3 received VINDALOO delivering to Customer 0
Customer 0 eating VINDALOO
WaitPerson 0 received FRUIT delivering to Customer 1
...
*/