zoukankan      html  css  js  c++  java
  • 线程同步工具类

    写在前面

    同步工具类主要包括闭锁(如CountDownLatch),栅栏(如CyclicBarrier),信号量(如Semaphore)和阻塞队列(如LinkedBlockingQueue)等;

    使用同步工具类可以协调线程的控制流;

    同步工具类封装了一些状态,这些状态决定线程是继续执行还是等待,此外同步工具类还提供了修改状态的方法;

    下面将简单介绍以上同步工具类;

    闭锁

    可以让一个线程等待一组事件发生后(不一定要线程结束)继续执行;

    以CountDownLatch为例,内部包含一个计数器,一开始初始化为一个整数(事件个数),发生一个事件后,调用countDown方法,计数器减1,await用于等待计数器为0后继续执行当前线程;

    举个例子如下,main线程等待其它子线程的事件发生后继续执行main线程:

    package concurrency;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    class TaskTest implements Runnable {
    
        private CountDownLatch latch;
        private int sleepTime;
    
        /**
         * 
         */
        public TaskTest(int sleepTime, CountDownLatch latch) {
            this.sleepTime = sleepTime;
            this.latch = latch;
        }
    
        /**
         * @see java.lang.Runnable#run()
         */
        @Override
        public void run() {
            try {
                CountDownLatchTest.print(" is running。");
                TimeUnit.MILLISECONDS.sleep(sleepTime);
                CountDownLatchTest.print(" finished。");
                //计数器减减
                latch.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
    }
    
    public class CountDownLatchTest {
        public static void main(String[] args) {
            int count = 10;
            final CountDownLatch latch = new CountDownLatch(count);
            ExecutorService es = Executors.newFixedThreadPool(count);
            for (int i = 0; i < count; i++) {
                es.execute(new TaskTest((i + 1) * 1000, latch));
            }
    
            try {
                CountDownLatchTest.print(" waiting...");
                //主线程等待其它事件发生
                latch.await();
                //其它事件已发生,继续执行主线程
                CountDownLatchTest.print(" continue。。。");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                es.shutdown();
            }
        }
        
        public static void print(String str){
            SimpleDateFormat dfdate = new SimpleDateFormat("HH:mm:ss");
            System.out.println("[" + dfdate.format(new Date()) + "]" + Thread.currentThread().getName() + str);
        }
    }

    结果打印如下:

    [09:41:43]pool-1-thread-1 is running。
    [09:41:43]pool-1-thread-6 is running。
    [09:41:43]main waiting...
    [09:41:43]pool-1-thread-10 is running。
    [09:41:43]pool-1-thread-4 is running。
    [09:41:43]pool-1-thread-5 is running。
    [09:41:43]pool-1-thread-2 is running。
    [09:41:43]pool-1-thread-3 is running。
    [09:41:43]pool-1-thread-7 is running。
    [09:41:43]pool-1-thread-8 is running。
    [09:41:43]pool-1-thread-9 is running。
    [09:41:44]pool-1-thread-1 finished。
    [09:41:45]pool-1-thread-2 finished。
    [09:41:46]pool-1-thread-3 finished。
    [09:41:47]pool-1-thread-4 finished。
    [09:41:48]pool-1-thread-5 finished。
    [09:41:49]pool-1-thread-6 finished。
    [09:41:50]pool-1-thread-7 finished。
    [09:41:51]pool-1-thread-8 finished。
    [09:41:52]pool-1-thread-9 finished。
    [09:41:53]pool-1-thread-10 finished。
    [09:41:53]main continue。。。
    

     此外,FutureTask也可用作闭锁,其get方法会等待任务完成后返回结果,否则一直阻塞直到任务完成;

    信号量

    控制同时执行某个指定操作的数量,常用于实现资源池,如数据库连接池,线程池...
    以Semaphore为例,其内部维护一组资源,可以通过构造函数指定数目,其它线程在执行的时候,可以通过acquire方法获取资源,有的话,继续执行(使用结束后释放资源),没有资源的话将阻塞直到有其它线程调用release方法释放资源;

    举个例子,如下代码,十个线程竞争三个资源,一开始有三个线程可以直接运行,剩下的七个线程只能阻塞等到其它线程使用资源完毕才能执行;

    package concurrency;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.TimeUnit;
    
    public class SemaphoreTest {
        
        public static void print(String str){
            SimpleDateFormat dfdate = new SimpleDateFormat("HH:mm:ss");
            System.out.println("[" + dfdate.format(new Date()) + "]" + Thread.currentThread().getName() + str);
        }
        
        public static void main(String[] args) {
            // 线程数目
            int threadCount = 10;
            // 资源数目
            Semaphore semaphore = new Semaphore(3);
            
            ExecutorService es = Executors.newFixedThreadPool(threadCount);
    
            // 启动若干线程
            for (int i = 0; i < threadCount; i++)
                es.execute(new ConsumeResourceTask((i + 1) * 1000, semaphore));
        }
    }
    
    class ConsumeResourceTask implements Runnable {
        private Semaphore semaphore;
        private int sleepTime;
    
        /**
             * 
             */
        public ConsumeResourceTask(int sleepTime, Semaphore semaphore) {
            this.sleepTime = sleepTime;
            this.semaphore = semaphore;
        }
    
        public void run() {
            try {
                //获取资源
                semaphore.acquire();
                SemaphoreTest.print(" 占用一个资源...");
                TimeUnit.MILLISECONDS.sleep(sleepTime);
                SemaphoreTest.print(" 资源使用结束,释放资源");
                //释放资源
                semaphore.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    [10:30:11]pool-1-thread-1 占用一个资源...
    [10:30:11]pool-1-thread-2 占用一个资源...
    [10:30:11]pool-1-thread-3 占用一个资源...
    [10:30:12]pool-1-thread-1 资源使用结束,释放资源
    [10:30:12]pool-1-thread-4 占用一个资源...
    [10:30:13]pool-1-thread-2 资源使用结束,释放资源
    [10:30:13]pool-1-thread-5 占用一个资源...
    [10:30:14]pool-1-thread-3 资源使用结束,释放资源
    [10:30:14]pool-1-thread-8 占用一个资源...
    [10:30:16]pool-1-thread-4 资源使用结束,释放资源
    [10:30:16]pool-1-thread-6 占用一个资源...
    [10:30:18]pool-1-thread-5 资源使用结束,释放资源
    [10:30:18]pool-1-thread-9 占用一个资源...
    [10:30:22]pool-1-thread-8 资源使用结束,释放资源
    [10:30:22]pool-1-thread-7 占用一个资源...
    [10:30:22]pool-1-thread-6 资源使用结束,释放资源
    [10:30:22]pool-1-thread-10 占用一个资源...
    [10:30:27]pool-1-thread-9 资源使用结束,释放资源
    [10:30:29]pool-1-thread-7 资源使用结束,释放资源
    [10:30:32]pool-1-thread-10 资源使用结束,释放资源

    栅栏

    栅栏用于等待其它线程,且会阻塞自己当前线程;

    所有线程必须同时到达栅栏位置后,才能继续执行;

    举个例子如下:

    package concurrency;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    class CyclicBarrierTaskTest implements Runnable {
        private CyclicBarrier cyclicBarrier;
    
        private int timeout;
    
        public CyclicBarrierTaskTest(CyclicBarrier cyclicBarrier, int timeout) {
            this.cyclicBarrier = cyclicBarrier;
            this.timeout = timeout;
        }
    
        @Override
        public void run() {
            TestCyclicBarrier.print(" 正在running...");
            try {
                TimeUnit.MILLISECONDS.sleep(timeout);
                TestCyclicBarrier.print(" 到达栅栏处,等待其它线程到达");
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
    
            TestCyclicBarrier.print(" 所有线程到达栅栏处,继续执行各自线程任务...");
        }
    }
    
    public class TestCyclicBarrier {
    
        public static void print(String str) {
            SimpleDateFormat dfdate = new SimpleDateFormat("HH:mm:ss");
            System.out.println("[" + dfdate.format(new Date()) + "]"
                    + Thread.currentThread().getName() + str);
        }
    
        public static void main(String[] args) {
            int count = 5;
            
            ExecutorService es = Executors.newFixedThreadPool(count);
    
            CyclicBarrier barrier = new CyclicBarrier(count, new Runnable() {
    
                @Override
                public void run() {
                    TestCyclicBarrier.print(" 所有线程到达栅栏处,可以在此做一些处理...");
                }
            });
            for (int i = 0; i < count; i++)
                es.execute(new CyclicBarrierTaskTest(barrier, (i + 1) * 1000));
        }
    
    }
    [11:07:00]pool-1-thread-2 正在running...
    [11:07:00]pool-1-thread-1 正在running...
    [11:07:00]pool-1-thread-5 正在running...
    [11:07:00]pool-1-thread-3 正在running...
    [11:07:00]pool-1-thread-4 正在running...
    [11:07:01]pool-1-thread-1 到达栅栏处,等待其它线程到达
    [11:07:02]pool-1-thread-2 到达栅栏处,等待其它线程到达
    [11:07:03]pool-1-thread-3 到达栅栏处,等待其它线程到达
    [11:07:04]pool-1-thread-4 到达栅栏处,等待其它线程到达
    [11:07:05]pool-1-thread-5 到达栅栏处,等待其它线程到达
    [11:07:05]pool-1-thread-5 所有线程到达栅栏处,可以在此做一些处理...
    [11:07:05]pool-1-thread-1 所有线程到达栅栏处,继续执行各自线程任务...
    [11:07:05]pool-1-thread-2 所有线程到达栅栏处,继续执行各自线程任务...
    [11:07:05]pool-1-thread-5 所有线程到达栅栏处,继续执行各自线程任务...
    [11:07:05]pool-1-thread-3 所有线程到达栅栏处,继续执行各自线程任务...
    [11:07:05]pool-1-thread-4 所有线程到达栅栏处,继续执行各自线程任务...

    阻塞队列

    阻塞队列提供了可阻塞的入队和出对操作,如果队列满了,入队操作将阻塞直到有空间可用,如果队列空了,出队操作将阻塞直到有元素可用;

    队列可以为有界和无界队列,无界队列不会满,因此入队操作将不会阻塞;

    下面将使用阻塞队列LinkedBlockingQueue举个生产者-消费者例子,生产者每隔1秒生产1个产品,然后有6个消费者在消费产品,可以发现,每隔1秒,只有一个消费者能够获取到产品消费,其它线程只能等待...

    如下代码:

    package concurrency;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;
    
    //生产者
    public class Producer implements Runnable {
        private final BlockingQueue<String> fileQueue;
    
        public Producer(BlockingQueue<String> queue) {
            this.fileQueue = queue;
    
        }
    
        public void run() {
            try {
                while (true) {
                    TimeUnit.MILLISECONDS.sleep(1000);
                    String produce = this.produce();
                    System.out.println(Thread.currentThread() + "生产:" + produce);
                    fileQueue.put(produce);
                }
    
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    
        public String produce() {
            SimpleDateFormat dfdate = new SimpleDateFormat("HH:mm:ss");
            return dfdate.format(new Date());
        }
    
        public static void main(String[] args) {
            BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);
    
            for (int i = 0; i < 1; i++) {
                new Thread(new Producer(queue)).start();
            }
            for (int i = 0; i < 6; i++) {
                new Thread(new Consumer(queue)).start();
            }
        }
    }
    
    // 消费者
    class Consumer implements Runnable {
        private final BlockingQueue<String> queue;
    
        public Consumer(BlockingQueue<String> queue) {
            this.queue = queue;
        }
    
        public void run() {
            try {
                while (true) {
                    TimeUnit.MILLISECONDS.sleep(1000);
                    System.out.println(Thread.currentThread() + "prepare 消费");
                    System.out.println(Thread.currentThread() + "starting:"
                            + queue.take());
                    System.out.println(Thread.currentThread() + "end 消费");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
    Thread[Thread-1,5,main]prepare 消费
    Thread[Thread-3,5,main]prepare 消费
    Thread[Thread-4,5,main]prepare 消费
    Thread[Thread-2,5,main]prepare 消费
    Thread[Thread-6,5,main]prepare 消费
    Thread[Thread-5,5,main]prepare 消费
    Thread[Thread-0,5,main]生产:11:36:36
    Thread[Thread-1,5,main]starting:11:36:36
    Thread[Thread-1,5,main]end 消费
    Thread[Thread-1,5,main]prepare 消费
    Thread[Thread-0,5,main]生产:11:36:37
    Thread[Thread-4,5,main]starting:11:36:37
    Thread[Thread-4,5,main]end 消费
    Thread[Thread-4,5,main]prepare 消费
    Thread[Thread-0,5,main]生产:11:36:38
    Thread[Thread-3,5,main]starting:11:36:38
    Thread[Thread-3,5,main]end 消费
    ...

     参考资料:java并发编程实战

  • 相关阅读:
    SPSS Clementine 数据挖掘入门 (2)
    Oracle与SQL Server数据库管理对比
    在SharePoint中修改AD用户密码的WebPart
    【html】html 特殊字符大全
    【javascript】csshover 解决 ie6 下 hover 兼容问题
    【css】纯 css 制作带三角的边框
    【javascript】无缝滚动——上下
    【css】利用小数解析差异解决浏览器兼容性问题
    【javascript】checkbox——类似邮箱全选功能(完整版)
    【javascript】无缝滚动——左右
  • 原文地址:https://www.cnblogs.com/chenpi/p/5358579.html
Copyright © 2011-2022 走看看