zoukankan      html  css  js  c++  java
  • java 并发编程

    闭锁

      一种可以延迟线程的进度直到其到达终止状态.可以用来确保某些活动直到其他活动都完成后才继续执行

      例如:

    1. 确保某个计算在其需要的所有资源都被初始化了之后才继续执行.
    2. 确保某个服务在其他依赖的服务都启动了之后才开始执行
    3. 等待某个操作的所有参与者(如LOL) 都就绪了之后再继续执行.

    锁的实现

    1.CountDownLatch

      CountDownLatch 是一种灵活的闭锁实现. 可以在以上的各种类型情况下使用.它可以使一个或多个线程等待一组事件的发生. 

      闭锁状态包括一个计数器,该计数器被初始化为一个正数.表示需要被等待的事件的数量. countDown 方法用于递减计数器,表示有一个事件已经发生,而await 方法等待计数器到达零时就会执行, 否则会一直阻塞直到计数器为零,或者等待中的线程中断, 或者等待超时.

    import java.util.concurrent.*;
    
    public class TestHarness {
        public long timeTasks(int nThreads, final Runnable task)
                throws InterruptedException {
            final CountDownLatch startGate = new CountDownLatch(1);
            final CountDownLatch endGate = new CountDownLatch(nThreads);
    
            for (int i = 0; i < nThreads; i++) {
                Thread t = new Thread() {
                    public void run() {
                        try {
                            startGate.await();
                            try {
                                task.run();
                            } finally {
                                endGate.countDown();
                            }
                        } catch (InterruptedException ignored) {
                        }
                    }
                };
                t.start();
            }
    
            long start = System.nanoTime();
            startGate.countDown();
            endGate.await();
            long end = System.nanoTime();
            return end - start;
        }
    
        public static void main(String[] args) throws InterruptedException {
            new TestHarness().timeTasks(9, new Runnable() {
                public void run() {
                    System.out.println(this);
                }
            });
        }
    }

      以上程序.它使用两个闭锁,分别表示起始门 "startGate" 和 结束门 "endGate" 来确保所有线程都准备就绪后才继续执行,而每个线程做的最后一件事都是让 "endGate" 减一,这能使主线程高效地等待直到所有工作线程都执行完成,因此可以统计所消耗的时间.

    2.FutureTask

      futureTask 也可以用作闭锁. futureTask 是通过Callable 来实现的. 相当于一种可用于生产结果的runnable , 并且可以用于以下3钟等待状态

    1. 等待运行(Waiting to run )
    2. 正在运行(Running)
    3. 运行完成(completed)

    执行完成 ,表示计算的所有可能结束的方式. 包括 正常结束,由于取消而结束和由于异常而结束等.

    Future.get() 的行为取决于任务的状态,如果任务已完成,那么get 会立即返回结果,get 将阻塞直到这个任务进去完成状态.然后返回结果或者抛出异常. FutureTask 将计算结果从执行计算的线程传递到获取这个结果的线程, 而 FutureTask 的规范确保了这种传递的过程能实现结果的安全发布.

    public class Preloader {
        ProductInfo loadProductInfo() throws DataLoadException {
            return null;
        }
    
        private final FutureTask<ProductInfo> future = new FutureTask<ProductInfo>(
                new Callable<ProductInfo>() {
                    public ProductInfo call() throws DataLoadException {
                        return loadProductInfo();
                    }
                });
        private final Thread thread = new Thread(future);
    
        public void start() {
            thread.start();
        }
    
        public ProductInfo get() throws DataLoadException, InterruptedException {
            try {
                return future.get();
            } catch (ExecutionException e) {
                Throwable cause = e.getCause();
                if (cause instanceof DataLoadException)
                    throw (DataLoadException) cause;
                else
                    throw LaunderThrowable.launderThrowable(cause);
            }
        }
    
        interface ProductInfo {
        }
    }
    
    class DataLoadException extends Exception {
    }

      

    3. Semaphore 

      计数信号量(Counting Semaphore) 用来控制同是访问某个特定资源的操作数量,或者同时执行某个指定操作的数量. 计数信号量还可以用来实现某种资源池,或者对容器施加边界(如:blockingQueue)

      Semaphore 中管理着一组虚拟许可(permit), 许可的初始化数量可以通过构造函数来指定,在执行操作前先获取许可(只要还有剩余许可),并在使用后释放,如果没有许可,那么acquire 将阻塞到有许可(或者直到被中断或者操作超时). release 方法将返回一个许可给信号量(许可与线程无关,一个许可可以在一个线程获取,在另一个线程释放,且不具备重入性)

    public class BoundedHashSet <T> {
        private final Set<T> set;
        private final Semaphore sem;
    
        public BoundedHashSet(int bound) {
            this.set = Collections.synchronizedSet(new HashSet<T>());
            sem = new Semaphore(bound);
        }
    
        public boolean add(T o) throws InterruptedException {
            sem.acquire();
            boolean wasAdded = false;
            try {
                wasAdded = set.add(o);
                return wasAdded;
            } finally {
                if (!wasAdded)
                    sem.release();
            }
        }
    
        public boolean remove(Object o) {
            boolean wasRemoved = set.remove(o);
            if (wasRemoved)
                sem.release();
            return wasRemoved;
        }
    }

     4. Barrier

      栅栏 类似于闭锁,它能阻塞一组线程直到某个事件发生,栅栏与闭锁的区别关键在于:所有线程必须同时到达栅栏的位置,才能继续,闭锁用于等待某件事情,而栅栏用于实现一些协议.例如: 几个人决定在某个地方集合:'所有人6:00 在 麦当劳碰头,到了以后要等其他人,之后再讨论下一步要做的事.'

      CyclicBarrier 可以使一定数量的参与方法反复在栅栏位置会聚,它在并行迭代算法中非常有用,这种算法通常将一个问题拆分成一系列相互独立的子问题,当线程到达栅栏位置时将调用await 方法, 这个方法将阻塞直到所有线程都到达栅栏位置,如果所有线程都到达栅栏位置,那么栅栏将打开, 此时所有线程都被释放,而栅栏将被重置以便下次使用, 如果对await的调用超时,或者await 阻塞的线程被中断, 那么栅栏将被认为是打破了, 所有阻塞的await 调用都将终止并抛出 BrokenBarrierException . 如果成功通过栅栏,那么await 将为每个线程返回一个唯一的到达索引号,我们可以利用这些索引来"选举" 产生一个领导线程.并在下一次迭代中由该领导线程执行一些特殊的工作.CyclicBarrier 还可以是你将一个栅栏操作传递给构造函数,这是一个Runnable ,当成功通过栅栏时会(在一个子任务线程中) 执行它.但在阻塞线程被释放钱是不能执行的.

      在模拟程序中经常使用栅栏.

      

    public class CellularAutomata {
        private final Board mainBoard;
        private final CyclicBarrier barrier;
        private final Worker[] workers;
    
        public CellularAutomata(Board board) {
            this.mainBoard = board;
            int count = Runtime.getRuntime().availableProcessors();
            this.barrier = new CyclicBarrier(count,
                    new Runnable() {
                        public void run() {
                            mainBoard.commitNewValues();
                        }});
            this.workers = new Worker[count];
            for (int i = 0; i < count; i++)
                workers[i] = new Worker(mainBoard.getSubBoard(count, i));
        }
    
        private class Worker implements Runnable {
            private final Board board;
    
            public Worker(Board board) { this.board = board; }
            public void run() {
                while (!board.hasConverged()) {
                    for (int x = 0; x < board.getMaxX(); x++)
                        for (int y = 0; y < board.getMaxY(); y++)
                            board.setNewValue(x, y, computeValue(x, y));
                    try {
                        barrier.await();
                    } catch (InterruptedException ex) {
                        return;
                    } catch (BrokenBarrierException ex) {
                        return;
                    }
                }
            }
    
            private int computeValue(int x, int y) {
                // Compute the new value that goes in (x,y)
                return 0;
            }
        }
    
        public void start() {
            for (int i = 0; i < workers.length; i++)
                new Thread(workers[i]).start();
            mainBoard.waitForConvergence();
        }
    
        interface Board {
            int getMaxX();
            int getMaxY();
            int getValue(int x, int y);
            int setNewValue(int x, int y, int value);
            void commitNewValues();
            boolean hasConverged();
            void waitForConvergence();
            Board getSubBoard(int numPartitions, int index);
        }
    }

    张孝祥的案例:

    public class CyclicBarrierTest {
        public static void main(String[] args) {
            ExecutorService service = Executors.newCachedThreadPool();
            final CyclicBarrier cb = new CyclicBarrier(3); // 三个线程同时到达
            for (int i = 0; i < 3; i++) {
                Runnable runnable = new Runnable() {
                    public void run() {
                        try {
                            Thread.sleep((long) (Math.random() * 10000));
                            System.out.println("线程"
                                    + Thread.currentThread().getName()
                                    + "即将到达集合地点1,当前已有"
                                    + (cb.getNumberWaiting() + 1)
                                    + "个已到达"
                                    + (cb.getNumberWaiting() == 2 ? "都到齐了,继续走啊"
                                            : "正在等候"));
                            try {
                                cb.await();
                            } catch (BrokenBarrierException e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                            }
                            Thread.sleep((long) (Math.random() * 10000));
                            System.out.println("线程"
                                    + Thread.currentThread().getName()
                                    + "即将到达集合地点2,当前已有"
                                    + (cb.getNumberWaiting() + 1)
                                    + "个已到达"
                                    + (cb.getNumberWaiting() == 2 ? "都到齐了,继续走啊"
                                            : "正在等候"));
                            try {
                                cb.await();
                            } catch (BrokenBarrierException e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                            }
                            Thread.sleep((long) (Math.random() * 10000));
                            System.out.println("线程"
                                    + Thread.currentThread().getName()
                                    + "即将到达集合地点3,当前已有"
                                    + (cb.getNumberWaiting() + 1)
                                    + "个已到达"
                                    + (cb.getNumberWaiting() == 2 ? "都到齐了,继续走啊"
                                            : "正在等候"));
                            try {
                                cb.await();
                            } catch (BrokenBarrierException e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                            }
                        } catch (InterruptedException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                    }
                };
                service.execute(runnable);
            }
            service.shutdown();
        }
    }

      另一种形式的栅栏是 Exchanger ,它是一种两方(Two-Party)栅栏 , 各方在栅栏位置上交换数据,当两方执行不对称的操作时, Exchanger 会非常有用.

      例如: 当一个线程想缓冲区写入数据, 而另一个线程用缓冲区中读取数据.这些线程可以使用Exchanger来汇聚,并将满的缓冲区与空的缓冲区交换.当两个线程通过Exchanger交换对象时.这种交换就把两个对象安全地发布给另一方.

      数据交换的实际取决于应用程序的相应需求. 最简单的方案是. 当缓冲区被填满时,由填充任务进行交换. 当缓冲区为空时,由清空任务进行交换. 这样会把需要交换的次数降至最低, 但如果新数据的到达不可预测,那么一些数据的处理过程就将延迟.另一个方法是,不仅当缓冲区被填满时进行交换. 并且当缓冲区被充到一定程度,并保持一段时间后.也进行交换.

    /**
     * 
     */
    package mjorcen.nio.test2;
    
    import java.util.LinkedList;
    import java.util.List;
    import java.util.concurrent.Exchanger;
    
    /**
     * 
     * 
     * @author mjorcen
     * @email mjorcen@gmail.com
     * @dateTime Jan 19, 2015 6:57:56 PM
     * @version 1
     */
    public class ExchangerTest {
        final Exchanger<List<String>> exchanger;
    
        public ExchangerTest(Exchanger<List<String>> exchanger) {
            super();
            this.exchanger = exchanger;
        }
    
        public static void main(String[] args) {
            Exchanger<List<String>> exchanger = new Exchanger<List<String>>();
    
            new Thread(new ExchangerThread01(exchanger)).start();
            new Thread(new ExchangerThread02(exchanger)).start();
    
        }
    }
    
    class ExchangerThread01 implements Runnable {
        final Exchanger<List<String>> exchanger;
    
        public ExchangerThread01(Exchanger<List<String>> exchanger) {
            super();
            this.exchanger = exchanger;
        }
    
        /*
         * (non-Javadoc)
         * 
         * @see java.lang.Runnable#run()
         */
        public void run() {
            System.out.println("ExchangerThread01 begin ... ");
            try {
                List<String> list = new LinkedList<String>();
                for (int i = 0; i < 20; i++) {
                    list.add("str_01_" + i);
                }
                list = exchanger.exchange(list);
                for (String string : list) {
                    System.out.println("Thread01 is " + string);
                }
                System.out.println("ExchangerThread01 end ... ");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
    }
    
    class ExchangerThread02 implements Runnable {
        final Exchanger<List<String>> exchanger;
    
        public ExchangerThread02(Exchanger<List<String>> exchanger) {
            super();
            this.exchanger = exchanger;
        }
    
        /*
         * (non-Javadoc)
         * 
         * @see java.lang.Runnable#run()
         */
        public void run() {
            System.out.println("ExchangerThread02 begin... ");
            List<String> list = new LinkedList<String>();
            for (int i = 0; i < 10; i++) {
                list.add("str_02_" + i);
            }
            try {
                Thread.sleep(1000);
                list = exchanger.exchange(list);
                for (String string : list) {
                    System.out.println("Thread02 is " + string);
                }
                System.out.println("ExchangerThread02 end ... ");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
    }

    6. 构建高效且可伸缩的结果缓存

    public class Memoizer <A, V> implements Computable<A, V> {
        private final ConcurrentMap<A, Future<V>> cache
                = new ConcurrentHashMap<A, Future<V>>();
        private final Computable<A, V> c;
    
        public Memoizer(Computable<A, V> c) {
            this.c = c;
        }
    
        public V compute(final A arg) throws InterruptedException {
            while (true) {
                Future<V> f = cache.get(arg);
                if (f == null) {
                    Callable<V> eval = new Callable<V>() {
                        public V call() throws InterruptedException {
                            return c.compute(arg);
                        }
                    };
                    FutureTask<V> ft = new FutureTask<V>(eval);
                    f = cache.putIfAbsent(arg, ft);
                    if (f == null) {
                        f = ft;
                        ft.run();
                    }
                }
                try {
                    return f.get();
                } catch (CancellationException e) {
                    cache.remove(arg, f);
                } catch (ExecutionException e) {
                    throw LaunderThrowable.launderThrowable(e.getCause());
                }
            }
        }
    }

    以上内容出自: <<java并发编程实践>>

  • 相关阅读:
    CODE[VS] 1018 单词接龙
    Linux提示BOOT空间不足问题
    CODE[VS] 1017 乘积最大
    关于printf输出结果的一些问题
    CODE[VS] 1220 数字三角形
    redux
    Promise面试题
    学习Promise笔记
    js 事件委托 事件代理
    前端通信、跨域
  • 原文地址:https://www.cnblogs.com/mjorcen/p/4233897.html
Copyright © 2011-2022 走看看