一、生产者、消费者协作机制:
生产者线程和消费者线程通过共享队列进行协作,生产者将数据或任务放到队列上,而消费者从列队上取数据或任务,如果队列长度有限,在队列满的时候,生产者等待,而在队列为空的时候,消费者等待。
/** * 使用两个栈 实现队列 * * 生产者,消费者协作模式:
共享变量是一个阻塞队列,当队列满了生产者wait(),当队列为空消费者wait(); * * 阻塞队列有: * 接口:BlockingQueue、BlockingDeque 双端队列 * 基数数组的实现类:ArrayBlockingQueue * 基于链表的实现类:LinkedBlockingQueue、LinkedBlockingDeque * 基于堆的实现类:PriorityBlockingQueue * */ public class QueueTest<E> { private ArrayDeque<E> producerStack = null; private ArrayDeque<E> consumerStack = null; private static int num = 0; public static void main(String[] args) throws InterruptedException { QueueTest<String> eQueueTest = new QueueTest<>(); new Thread(new Runnable() { @Override public void run() { try { eQueueTest.producer(); Thread.sleep( (int)(Math.random() * 100)); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); // Thread.sleep( (int)(Math.random() * 100)); new Thread(new Runnable() { @Override public void run() { try { eQueueTest.consumer(); Thread.sleep( (int)(Math.random() * 100)); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } public QueueTest() { producerStack = new ArrayDeque<>(10); consumerStack = new ArrayDeque<>(10); } public synchronized void producer() throws InterruptedException { while (true){ while(producerStack.size() == 10){ this.wait(); } producerStack.addLast((E) String.valueOf(++num));
System.out.println("producerStack size :" + producerStack.size() ); for (E e : producerStack) { System.out.println("producerStack..." + e.toString()); } this.notifyAll(); } } public synchronized void consumer() throws InterruptedException { while(true){ while(producerStack.isEmpty()){ this.wait(); }
consumerStack.addLast(producerStack.pollLast());
System.out.println("consumerStack size :" + consumerStack.size() ); for (E c : consumerStack) { System.out.println("consumerStack..." + c.toString()); } System.out.println("------------------------------------------------------------"); if(consumerStack.size() == 10){ while(consumerStack.size() > 0){ System.out.println("consumerStack,pollLast :" + consumerStack.pollLast().toString()); } } this.notifyAll(); } } }
二、同步协作机制:
在一些程序,尤其是模拟仿真程序中,要求多个线程同时开始。
/** * 同步协作机制 * */ public class SynchronizationTest { private volatile boolean fired = false; public static void main(String[] args){ SynchronizationTest synchronizationTest = new SynchronizationTest(); new Thread(new Runnable() { @Override public void run() { try { //等待住线程唤醒 synchronizationTest.await(); System.out.println("thread 1: " + Thread.currentThread().getName()); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); new Thread(new Runnable() { @Override public void run() { try { //等待住线程唤醒 synchronizationTest.await(); System.out.println("thread 2: " + Thread.currentThread().getName()); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); // 唤醒所有线程 synchronizationTest.fire(); } public synchronized void await() throws InterruptedException { while (!fired){ wait(); } } public synchronized void fire(){ this.fired = true; notifyAll(); } }
三、主从协作机制(等待结束、异步结果):
主线程将任务分解为若干个子任务,为每个子任务创建一个线程,主线程在继续执行其他任务之前需要等待每个子任务执行完毕。
/** * 等待所有子线程执行完毕,主线程再执行 * * 协作对象 * * 同步类: * CountDownLatch * */ public class MyLatch { public static void main(String[] args) throws InterruptedException { int countNum = 10; CountDownLatch countDownLatch = new CountDownLatch(countNum); for (int j = 0; j < countNum; j++){ new Thread(new Runnable() { @Override public void run() { try { System.out.println("countDownLatch : " + Thread.currentThread().getName()); Thread.sleep((int)(Math.random() * 100)); countDownLatch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "thread" + j).start(); } countDownLatch.await(); System.out.println("collect worker result"); } }
/** * 主从模式,一种常见的模式是异步调用,异步调用返回一个Future对象,通过它获得最终的结果; * * 1.表示异步结果的接口 Future、FutureTask * * 2.用于执行异步任务的接口Executor,以及子接口ExecutorService * * 3.用于创建Executor和ExecutorService的工厂方法类Executors * * 4.Callable 创建子任务的接口 * */ public class AsynchronizationTest { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executor = Executors.newCachedThreadPool();
List<MyFuture> myFutureList = new ArrayList<>(); for (int i = 1; i <= 10 ; i++) { myFutureList.add(new MyFuture("callable " + i)); } // 实现 callable 的子类实例 List<Future<String>> futures = executor.invokeAll(myFutureList); System.out.println("futures size : " + futures.size()); futures.forEach(p -> { try { if(!p.isDone()){ System.out.println("future is not done "); } System.out.println("futures" + p.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } }); executor.shutdown(); } /** * 实现 Callable 接口 */ static class MyFuture implements Callable<String> { private String name;
public MyFuture(String name){ this.name = name; } @Override public String call() throws Exception { return "callable " + name; } } }
四、集合点协作机制:
给所有线程到指定一个集合点,当都到达时才继续执行下面的程序。
/** * 多线程集合点协调模式 */ public class CyclicBarrierTest { public static void main(String[] args){ int num = 10; CyclicBarrier cyclicBarrier = new CyclicBarrier(num, new Runnable() { @Override public void run() { System.out.println("所有子线程已集合,继续执行任务..."); //所有线程集合后执行 } }); for (int i = 1; i <= num; i++) { new Thread(new Runnable() { @Override public void run() { try { Thread.sleep((int)(Math.random() * 100)); System.out.println(Thread.currentThread().getName() + " 等待集合..."); cyclicBarrier.await(); System.out.println(Thread.currentThread().getName() + " 已集合,继续执行..."); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } }, "cyclicBarrier " + i).start(); } } }