zoukankan      html  css  js  c++  java
  • Java多线程中协作机制

    一、生产者、消费者协作机制:

      生产者线程和消费者线程通过共享队列进行协作,生产者将数据或任务放到队列上,而消费者从列队上取数据或任务,如果队列长度有限,在队列满的时候,生产者等待,而在队列为空的时候,消费者等待。

    /**
     * 使用两个栈 实现队列
     *
     * 生产者,消费者协作模式:
       共享变量是一个阻塞队列,当队列满了生产者wait(),当队列为空消费者wait(); * * 阻塞队列有: * 接口:BlockingQueue、BlockingDeque 双端队列 * 基数数组的实现类:ArrayBlockingQueue * 基于链表的实现类:LinkedBlockingQueue、LinkedBlockingDeque * 基于堆的实现类:PriorityBlockingQueue *
    */ public class QueueTest<E> { private ArrayDeque<E> producerStack = null; private ArrayDeque<E> consumerStack = null; private static int num = 0; public static void main(String[] args) throws InterruptedException { QueueTest<String> eQueueTest = new QueueTest<>(); new Thread(new Runnable() { @Override public void run() { try { eQueueTest.producer(); Thread.sleep( (int)(Math.random() * 100)); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); // Thread.sleep( (int)(Math.random() * 100)); new Thread(new Runnable() { @Override public void run() { try { eQueueTest.consumer(); Thread.sleep( (int)(Math.random() * 100)); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } public QueueTest() { producerStack = new ArrayDeque<>(10); consumerStack = new ArrayDeque<>(10); } public synchronized void producer() throws InterruptedException { while (true){ while(producerStack.size() == 10){ this.wait(); } producerStack.addLast((E) String.valueOf(++num));
    System.out.println(
    "producerStack size :" + producerStack.size() ); for (E e : producerStack) { System.out.println("producerStack..." + e.toString()); } this.notifyAll(); } } public synchronized void consumer() throws InterruptedException { while(true){ while(producerStack.isEmpty()){ this.wait(); }
    consumerStack.addLast(producerStack.pollLast());
    System.out.println(
    "consumerStack size :" + consumerStack.size() ); for (E c : consumerStack) { System.out.println("consumerStack..." + c.toString()); } System.out.println("------------------------------------------------------------"); if(consumerStack.size() == 10){ while(consumerStack.size() > 0){ System.out.println("consumerStack,pollLast :" + consumerStack.pollLast().toString()); } } this.notifyAll(); } } }

    二、同步协作机制:

      在一些程序,尤其是模拟仿真程序中,要求多个线程同时开始。

    /**
     * 同步协作机制
     *
     */
    public class SynchronizationTest {
    
        private volatile boolean fired = false;
    
        public static void main(String[] args){
    
            SynchronizationTest synchronizationTest = new SynchronizationTest();
    
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        //等待住线程唤醒
                        synchronizationTest.await();
    
                        System.out.println("thread 1: " + Thread.currentThread().getName());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
    
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        //等待住线程唤醒
                        synchronizationTest.await();
    
                        System.out.println("thread 2: " + Thread.currentThread().getName());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
    
            // 唤醒所有线程
            synchronizationTest.fire();
        }
    
        public synchronized void await() throws InterruptedException {
            while (!fired){
                wait();
            }
        }
    
        public synchronized void fire(){
            this.fired = true;
            notifyAll();
        }
    
    }

    三、主从协作机制(等待结束、异步结果):

      主线程将任务分解为若干个子任务,为每个子任务创建一个线程,主线程在继续执行其他任务之前需要等待每个子任务执行完毕。 

    /**
     *  等待所有子线程执行完毕,主线程再执行
     *
     *  协作对象
     *
     *  同步类:
     *      CountDownLatch
     *
     */
    public class MyLatch {
    
        public static void main(String[] args) throws InterruptedException {
    
            int countNum = 10;
            CountDownLatch countDownLatch = new CountDownLatch(countNum);
    
            for (int j = 0; j < countNum; j++){
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            System.out.println("countDownLatch : " + Thread.currentThread().getName());
    
                            Thread.sleep((int)(Math.random() * 100));
    
                            countDownLatch.countDown();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
    
                    }
                }, "thread" + j).start();
            }
            countDownLatch.await();
            System.out.println("collect worker result");
    
        }
    
    }
    /**
     *  主从模式,一种常见的模式是异步调用,异步调用返回一个Future对象,通过它获得最终的结果;
     *
     *  1.表示异步结果的接口 Future、FutureTask
     *
     *  2.用于执行异步任务的接口Executor,以及子接口ExecutorService
     *
     *  3.用于创建Executor和ExecutorService的工厂方法类Executors
     *
     *  4.Callable 创建子任务的接口
     *
     */
    public class AsynchronizationTest {
    
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            ExecutorService executor = Executors.newCachedThreadPool();
    List
    <MyFuture> myFutureList = new ArrayList<>(); for (int i = 1; i <= 10 ; i++) { myFutureList.add(new MyFuture("callable " + i)); } // 实现 callable 的子类实例 List<Future<String>> futures = executor.invokeAll(myFutureList); System.out.println("futures size : " + futures.size()); futures.forEach(p -> { try { if(!p.isDone()){ System.out.println("future is not done "); } System.out.println("futures" + p.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } }); executor.shutdown(); } /** * 实现 Callable 接口 */ static class MyFuture implements Callable<String> { private String name;
    public MyFuture(String name){ this.name = name; } @Override public String call() throws Exception { return "callable " + name; } } }

    四、集合点协作机制:

      给所有线程到指定一个集合点,当都到达时才继续执行下面的程序。

    /**
     *  多线程集合点协调模式
     */
    public class CyclicBarrierTest {
    
        public static void main(String[] args){
            int num = 10;
            CyclicBarrier cyclicBarrier = new CyclicBarrier(num, new Runnable() {
                @Override
                public void run() {
                    System.out.println("所有子线程已集合,继续执行任务..."); //所有线程集合后执行
                }
            });
    
    
            for (int i = 1; i <= num; i++) {
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            Thread.sleep((int)(Math.random() * 100));
    
                            System.out.println(Thread.currentThread().getName() + " 等待集合...");
    
                            cyclicBarrier.await();
    
                            System.out.println(Thread.currentThread().getName() + " 已集合,继续执行...");
    
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        } catch (BrokenBarrierException e) {
                            e.printStackTrace();
                        }
                    }
                }, "cyclicBarrier " + i).start();
            }
        }
    }
  • 相关阅读:
    如何在Dynamics CRM 2011 的窗体表单上加载报表
    .Net程序员面试所需要的一些技术准备
    javascript常用数组算法总结
    jquery技巧总结
    《将博客搬至CSDN》
    解决SQL Server 阻止了对组件 'Ad Hoc Distributed Queries' 的 STATEMENT'OpenRowset/OpenDatasource' 的访问的方法
    jQuery编程的最佳实践
    SQL 存储过程、触发器
    T--SQL基本编程(变量的定义、变量的赋值/取值,分支语句,循环语句)
    SQL server表连接
  • 原文地址:https://www.cnblogs.com/haiyangwu/p/10432111.html
Copyright © 2011-2022 走看看