zoukankan      html  css  js  c++  java
  • 并发编程(二)concurrent 工具类

    并发编程(二)concurrent 工具类

    一、CountDownLatch

    经常用于监听某些初始化操作,等初始化执行完毕后,通知主线程继续工作。

    import java.util.concurrent.CountDownLatch;
    
    public class CountDownLatchTest extends Thread {
        private final static CountDownLatch countDown = new CountDownLatch(2); // (1)
    
        @Override
        public void run() {
            // 唤醒线程线程
            countDown.countDown(); // (2)
            System.out.println(Thread.currentThread().getName() + "执行完毕...");
        }
    
        public static void main(String[] args) {
    
            new Thread(new CountDownLatchTest()).start();
            new Thread(new CountDownLatchTest()).start();
            try {
                Thread.sleep(1000);
                countDown.await();  // (3)
                System.out.println(Thread.currentThread().getName() + "继续执行...");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    1. 声明一个 CountDownLatch 对象,参数 2 表示被阻塞的线程需要被唤醒再次才能执行。

      final CountDownLatch countDown = new CountDownLatch(2);
      
    2. countDown() 调用两次后,主线程才会继续执行

      countDown.countDown();
      
    3. 阻塞当前线程-main

      countDown.await();
      
    4. 执行结果如下:

      Thread-1执行完毕...
      Thread-0执行完毕...
      main继续执行...  // Thread-0, Thread-1 执行完成才会继续执行主线程
      

    二、CyclicBarrier

    假设有只有的一个场景:每个线程代表一个跑步运动员,当运动员都准备好后,才一起出发,只要有一个没有准备了,大家都等待。

    import java.io.IOException;  
    import java.util.Random;  
    import java.util.concurrent.BrokenBarrierException;  
    import java.util.concurrent.CyclicBarrier;  
    import java.util.concurrent.ExecutorService;  
    import java.util.concurrent.Executors; 
    public class UseCyclicBarrier {
    
    	static class Runner implements Runnable {  
    	    private CyclicBarrier barrier;  
    	    private String name;  
    	    
    	    public Runner(CyclicBarrier barrier, String name) {  
    	        this.barrier = barrier;  
    	        this.name = name;  
    	    }  
    	    @Override  
    	    public void run() {  
    	        try {  
    	            Thread.sleep(1000 * (new Random()).nextInt(5));  
    	            System.out.println(name + " 准备OK.");  
    	            barrier.await(); //(1) 
    	        } catch (InterruptedException e) {  
    	            e.printStackTrace();  
    	        } catch (BrokenBarrierException e) {  
    	            e.printStackTrace();  
    	        }  
    	        System.out.println(name + " Go!!");  
    	    }  
    	} 
    	
        public static void main(String[] args) throws IOException, InterruptedException {  
            CyclicBarrier barrier = new CyclicBarrier(2);  // (2) 
            ExecutorService executor = Executors.newFixedThreadPool(2);  
            
            executor.submit(new Thread(new Runner(barrier, "Thread-1")));
            executor.submit(new Thread(new Runner(barrier, "Thread-2")));
      
            executor.shutdown();  
        }  
    }
    
    1. await() 阻塞当前的线程。

      barrier.await();
      
    2. 声明一个 CyclicBarrier 对象,参数 2 表示 barrier 必须有两个线程都准备好了才能执行。

      CyclicBarrier barrier = new CyclicBarrier(2);
      
    3. 执行结果如下:

      Thread-1 准备OK.
      Thread-2 准备OK.
      Thread-1 Go!!
      Thread-2 Go!!
      
    4. 修改 CyclicBarrier barrier = new CyclicBarrier(3) 后这两个线程都会被阻塞, 执行结果如下:

      Thread-1 准备OK.
      Thread-2 准备OK.
      

    三、Future

    future模式请参考这里

    四、Semaphore

    Semaphore 信号量非常适合高并发访问。

    public class UseSemaphore {  
        public static void main(String[] args) {  
            // 线程池  
            ExecutorService exec = Executors.newCachedThreadPool();  
            // 只能5个线程同时访问  
            final Semaphore semp = new Semaphore(5); // (1)
            // 模拟20个客户端访问  
            for (int index = 0; index < 20; index++) {  
                final int NO = index;  
                Runnable run = new Runnable() {  
                    public void run() {  
                        try {  
                            // 获取许可  
                            semp.acquire(); // (2)
                            System.out.println("Accessing: " + NO);  
                            //模拟实际业务逻辑
                            Thread.sleep((long) (Math.random() * 10000));  
                            // 访问完后,释放  
                            semp.release(); // (3)
                        } catch (InterruptedException e) {
                        	;
                        }  
                    }  
                };  
                exec.execute(run);  
            } 
            
            try {
    			Thread.sleep(10);
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
            
            System.out.println(semp.getQueueLength());
    
            // 退出线程池  
            exec.shutdown();  
        }  
    } 
    
    1. 声明一个 Semaphore 对象,参数 5 表示最多有5个线程同时访问。

      final Semaphore semp = new Semaphore(5);
      
    2. semp.acquire() 获取 semp 对象,如果超过5个线程,那么其余的线程就会阻塞,直到有线程执行完毕。

    3. semp.release() 释放 semp 对象,这样其余的线程就可以执行了。

    补充:

    • PV(page view) 网站的总访问量,页面浏览量或点击量,用户每刷新一次就会记录一次。

    • UV(unique vistor) 访问网站的一台电脑客户端为一个访客。一般来讲,时间上以00:00~24:00之内相同的客户端记录一次。

    • QPS(query per second) 即每秒查询数,QPS 很大程度代表了系统业务的繁忙程度。一旦当前 QPS 超过所设定的预警阀值,可以考虑对集群扩容,以免压力过大导致宕机。

    • RT(response time) 即请求的响应时间,这个指标非常关键,直接说明客户端的体验,因此任何系统设计师都想降低 RT 时间。

    对系统进行峰值评估,采用所谓的80/20原则,即80%的请求20%的时间到达:

    QRS = (PV * 80%) / (24 * 60 * 60 * 20%)
    

    五、ReentrantLock

    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class ReentrantLockTest implements Runnable {
    	
    	private Lock lock = new ReentrantLock(); // (1)
    	
    	public void run(){
    		try {
    			lock.lock(); // (2)
    			System.out.println(Thread.currentThread().getName() + "进入..");
    			Thread.sleep(1000);
    			System.out.println(Thread.currentThread().getName() + "退出..");
    			Thread.sleep(1000);
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		} finally {
    			lock.unlock(); // (3)
    		}
    	}
    	
    	public static void main(String[] args) throws InterruptedException {
    		final ReentrantLockTest ur = new ReentrantLockTest();
    
            for (int i = 0; i < 10; i++) {
                new Thread(ur).start();
            }
    	}
    }
    
    1. ReentrantLock 一般用法:

      private Lock lock = new ReentrantLock();
      try {
          lock.lock();
          //do something
      } finally {
          lock.unlock();
      }
      
      
    2. condition 使用方法,注意 condition 可以实例化多个:

      Lock lock = new ReentrantLock();
      Condition condition = lock.newCondition();
      condition.await(); //阻塞线程,释放锁
      condition.signal();//唤醒线程,不释放锁
      
    3. 公平锁(true)和非公平锁(false),非公平锁执行效率比公平锁高

      Lock lock = new ReentrantLock(boolean isFair);
      
    4. 读写锁,实现读写分离的锁,适用于读多写少的情况下(读读共享,读写互斥)

      private ReentrantReadWriteLock rwlock = new ReentrantReadWriteLock(); // (1)
      private ReadLock readLock = rwlock.readLock();    // (2)
      private WriteLock writeLock = rwlock.writeLock(); // (3)
      
      public void read(){
          try {
              readLock.lock();
              // do something
          } finally {
              readLock.unlock();
          }
      }
      
      public void write(){
          try {
              writeLock.lock();
              // do something
          } finally {
              writeLock.unlock();
          }
      }
      

    每天用心记录一点点。内容也许不重要,但习惯很重要!

  • 相关阅读:
    Kendo UI for ASP.NET MVC 的一些使用经验
    AI智能技术监控学生上课行为,智慧管理加强校园教学质量
    如何通过ffmpeg 实现实时推流和拉流保存的功能
    如何测试流媒体服务器的并发能力?
    TSINGSEE青犀视频H265播放器FLV.js播放一段时间后报内存不足怎么处理?
    互动电视的未来应该是什么样的?
    牛客练习赛89 题解
    【洛谷P3934】炸脖龙 I
    【CF1463F】Max Correct Set
    【CF1496F】Power Sockets
  • 原文地址:https://www.cnblogs.com/binarylei/p/10024263.html
Copyright © 2011-2022 走看看