zoukankan      html  css  js  c++  java
  • Java多线程开发系列-线程间协作

    wait(),notify()和notifyAll()

    他们都是java.lang.Object的方法:

    • wait(): Causes the current thread to wait until another thread invokes the notify() method or the notifyAll() method for this object.
    • notify(): Wakes up a single thread that is waiting on this object's monitor.
    • notifyAll(): Wakes up all threads that are waiting on this object's monitor.

    wait()的使用方式:

    synchronized( lockObject )
    { 
        while( ! condition )
        { 
            lockObject.wait();
        }
         
        //take the action here;
    }

    notify()的使用方式:

    synchronized(lockObject) 
    {
        //establish_the_condition;
     
        lockObject.notify();
         
        //any additional code if needed
    }

    都需要同步,但notify不用在循环里。

    notifyAll()的使用方式:

    synchronized(lockObject) 
    {
        establish_the_condition;
     
        lockObject.notifyAll();
    }

    一般情况下使用notifyAll(),notify()较难掌控,容易出现想不到的问题。(牺牲性能,满足安全)

    一个简单的例子,2对执行完5遍之后最终将有2个线程一直在waiting状态:

    public class OutputThread1 {
      private static Object lock;
    
      public static void main(String[] args) {
        lock = new Object();
    
        Thread1 thread1 = new Thread1();
        Thread2 thread2 = new Thread2();
        Thread3 thread3 = new Thread3();
        Thread3 thread4 = new Thread3();
    
        thread1.start();
        thread2.start();
        thread3.start();
        thread4.start();
    
      }
    
      static class Thread1 extends Thread {
        @Override
        public void run() {
          synchronized (lock) {
            while (true) {
              try {
                lock.wait();
              } catch (InterruptedException e) {
              }
              System.out
                  .println("线程" + Thread.currentThread().getName() + ":" + "wait");
            }
          }
        }
      }
    
      static class Thread2 extends Thread {
        @Override
        public void run() {
          synchronized (lock) {
            while (true) {
              try {
                lock.wait();
              } catch (InterruptedException e) {
              }
              System.out
                  .println("线程" + Thread.currentThread().getName() + ":" + "wait");
            }
          }
        }
      }
    
      static class Thread3 extends Thread {
        @Override
        public void run() {
          for (int i = 0; i < 6; i++) {
            synchronized (lock) {
              try {
                Thread.sleep(100);
                lock.notifyAll();
              } catch (Exception e) {
              }
              System.out.println(
                  "线程" + Thread.currentThread().getName() + ":" + "notifyAll");
            }
          }
        }
      }
    
    }
    View Code

    一个依次输出121212...的例子

    public class OutputThread implements Runnable {
    
      private int num;
      private Object lock;
      
      public OutputThread(int num, Object lock) {
          super();
          this.num = num;
          this.lock = lock;
      }
    
      public void run() {
          try {
              while(true){
                  synchronized(lock){
                      lock.notify();
                      lock.wait();
                      System.out.println( Thread.currentThread().getName() + ":" +num);
                  }
              }
          } catch (InterruptedException e) {
              // TODO Auto-generated catch block
              e.printStackTrace();
          }
          
      }
      
      public static void main(String[] args){
          final Object lock = new Object();
          
          Thread thread1 = new Thread(new OutputThread(1,lock));
          Thread thread2 = new Thread(new OutputThread(2, lock));
          //Thread thread3 = new Thread(new OutputThread(3, lock));
          
          thread1.start();
          thread2.start();
          //thread3.start();
      }
    
    }
    View Code

    一个最简单的生产者、消费者的例子

    import java.util.ArrayList;
    import java.util.List;
    
    public class ProducerConsumerExampleWithWaitAndNotify {
      public static void main(String[] args)
      {
         List<Integer> taskQueue = new ArrayList<Integer>();
         int MAX_CAPACITY = 5;
         Thread tProducer = new Thread(new Producer(taskQueue, MAX_CAPACITY), "Producer");
         Thread tConsumer = new Thread(new Consumer(taskQueue), "Consumer");
         tProducer.start();
         tConsumer.start();
      } 
    }
    
    class Producer implements Runnable
    {
       private final List<Integer> taskQueue;
       private final int           MAX_CAPACITY;
     
       public Producer(List<Integer> sharedQueue, int size)
       {
          this.taskQueue = sharedQueue;
          this.MAX_CAPACITY = size;
       }
     
       @Override
       public void run()
       {
          int counter = 0;
          while (true)
          {
             try
             {
                produce(counter++);
             } 
             catch (InterruptedException ex)
             {
                ex.printStackTrace();
             }
          }
       }
     
       private void produce(int i) throws InterruptedException
       {
          synchronized (taskQueue)
          {
             while (taskQueue.size() == MAX_CAPACITY)
             {
                System.out.println("Queue is full " + Thread.currentThread().getName() + " is waiting , size: " + taskQueue.size());
                taskQueue.wait();
             }
               
             Thread.sleep(1000);
             taskQueue.add(i);
             System.out.println("Produced: " + i);
             taskQueue.notifyAll();
          }
       }
    }
    class Consumer implements Runnable
    {
       private final List<Integer> taskQueue;
     
       public Consumer(List<Integer> sharedQueue)
       {
          this.taskQueue = sharedQueue;
       }
     
       @Override
       public void run()
       {
          while (true)
          {
             try
             {
                consume();
             } catch (InterruptedException ex)
             {
                ex.printStackTrace();
             }
          }
       }
     
       private void consume() throws InterruptedException
       {
          synchronized (taskQueue)
          {
             while (taskQueue.isEmpty())
             {
                System.out.println("Queue is empty " + Thread.currentThread().getName() + " is waiting , size: " + taskQueue.size());
                taskQueue.wait();
             }
             Thread.sleep(1000);
             int i = (Integer) taskQueue.remove(0);
             System.out.println("Consumed: " + i);
             taskQueue.notifyAll();
          }
       }
    }
    View Code

    为什么要用notifyAll()方法,用notify()行吗。
    很多人在回答第二个问题的时候会想当然的说notify()是唤醒一个线程,notifyAll()是唤醒全部线程,但是唤醒然后呢,不管是notify()还是notifyAll(),最终拿到锁的只会有一个线程,那它们到底有什么区别呢?
    其实这是一个对象内部锁的调度问题,要回答这两个问题,首先我们要明白java中对象锁的模型,JVM会为一个使用内部锁(synchronized)的对象维护两个集合,Entry Set和Wait Set,也有人翻译为锁池和等待池,意思基本一致。
    对于Entry Set:如果线程A已经持有了对象锁,此时如果有其他线程也想获得该对象锁的话,它只能进入Entry Set,并且处于线程的BLOCKED状态。
    对于Wait Set:如果线程A调用了wait()方法,那么线程A会释放该对象的锁,进入到Wait Set,并且处于线程的WAITING状态。
    还有需要注意的是,某个线程B想要获得对象锁,一般情况下有两个先决条件,一是对象锁已经被释放了(如曾经持有锁的前任线程A执行完了synchronized代码块或者调用了wait()方法等等),二是线程B已处于RUNNABLE状态。
    那么这两类集合中的线程都是在什么条件下可以转变为RUNNABLE呢?
    对于Entry Set中的线程,当对象锁被释放的时候,JVM会唤醒处于Entry Set中的某一个线程,这个线程的状态就从BLOCKED转变为RUNNABLE。
    对于Wait Set中的线程,当对象的notify()方法被调用时,JVM会唤醒处于Wait Set中的某一个线程,这个线程的状态就从WAITING转变为RUNNABLE;或者当notifyAll()方法被调用时,Wait Set中的全部线程会转变为RUNNABLE状态。所有Wait Set中被唤醒的线程会被转移到Entry Set中。
    然后,每当对象的锁被释放后,那些所有处于RUNNABLE状态的线程会共同去竞争获取对象的锁,最终会有一个线程(具体哪一个取决于JVM实现,队列里的第一个?随机的一个?)真正获取到对象的锁,而其他竞争失败的线程继续在Entry Set中等待下一次机会。

    Condition

    Condition是在java 1.5中才出现的,它用来替代传统的Object的wait()、notify()实现线程间的协作,相比使用Object的wait()、notify(),使用Condition1的await()、signal()这种方式实现线程间协作更加安全和高效。因此通常来说比较推荐使用Condition,在阻塞队列那一篇博文中就讲述到了,阻塞队列实际上是使用了Condition来模拟线程间协作。

    Condition是个接口,基本的方法就是await()和signal()方法;
    Condition依赖于Lock接口,生成一个Condition的基本代码是lock.newCondition()
    调用Condition的await()和signal()方法,都必须在lock保护之内,就是说必须在lock.lock()和lock.unlock之间才可以使用
      Conditon中的await()对应Object的wait();
      Condition中的signal()对应Object的notify();
      Condition中的signalAll()对应Object的notifyAll()

    例子,使用重入锁,和两个Condition实例

    import java.util.PriorityQueue;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class Test {
      private int queueSize = 10;
      private PriorityQueue<Integer> queue = new PriorityQueue<Integer>(queueSize);
      private Lock lock = new ReentrantLock();
      private Condition notFull = lock.newCondition();
      private Condition notEmpty = lock.newCondition();
       
      public static void main(String[] args)  {
          Test test = new Test();
          Producer producer = test.new Producer();
          Consumer consumer = test.new Consumer();
            
          producer.start();
          consumer.start();
      }
        
      class Consumer extends Thread{
            
          @Override
          public void run() {
              consume();
          }
            
          private void consume() {
              while(true){
                  lock.lock();
                  try {
                      while(queue.size() == 0){
                          try {
                              System.out.println("队列空,等待数据");
                              notEmpty.await();
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          }
                      }
                      queue.poll();                //每次移走队首元素
                      notFull.signal();
                      System.out.println("从队列取走一个元素,队列剩余"+queue.size()+"个元素");
                  } finally{
                      lock.unlock();
                  }
              }
          }
      }
        
      class Producer extends Thread{
            
          @Override
          public void run() {
              produce();
          }
            
          private void produce() {
              while(true){
                  lock.lock();
                  try {
                      while(queue.size() == queueSize){
                          try {
                              System.out.println("队列满,等待有空余空间");
                              notFull.await();
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          }
                      }
                      queue.offer(1);        //每次插入一个元素
                      notEmpty.signal();
                      System.out.println("向队列取中插入一个元素,队列剩余空间:"+(queueSize-queue.size()));
                  } finally{
                      lock.unlock();
                  }
              }
          }
      }
    }
    View Code

    join

    在很多情况下,主线程生成并起动了子线程,如果子线程里要进行大量的耗时的运算,主线程往往将于子线程之前结束,但是如果主线程处理完其他的事务后,需要用到子线程的处理结果,也就是主线程需要等待子线程执行完成之后再结束,这个时候就要用到join()方法了。
    join()的作用是:“等待该线程终止”,这里需要理解的就是该线程是指的主线程等待子线程的终止。也就是在子线程调用了join()方法后面的代码,只有等到子线程结束了才能执行。
    例子

    public class JoinExample
    {
       public static void main(String[] args) throws InterruptedException
       {
          Thread t = new Thread(new Runnable()
             {
                public void run()
                {
                   System.out.println("First task started");
                   System.out.println("Sleeping for 2 seconds");
                   try
                   {
                      Thread.sleep(2000);
                   } catch (InterruptedException e)
                   {
                      e.printStackTrace();
                   }
                   System.out.println("First task completed");
                }
             });
          Thread t1 = new Thread(new Runnable()
             {
                public void run()
                {
                   System.out.println("Second task completed");
                }
             });
          t.start(); // Line 15
          t.join(); // Line 16
          t1.start();
       }
    }
    View Code

    yield

    yield意味着放手。一个线程告诉虚拟机它愿意让其他线程在它当前的位置被调度。这表明它主观想停一停(我不太忙,我做了个标记,等有空的时候再来标记处继续干,你们其他线程来抢吧)。
    不过,请注意,这只是一个提示,并不能保证会有任何效果。Yield是一种静态方法,也是本地方法。Yield告诉当前执行的线程给线程池中具有相同优先级的线程一个机会。
    它只能使线程从运行状态变为可运行状态,而不能处于等待或阻止状态。

    这个例子表明这2个线程都有互相谦让的美德:

    public class YieldExample
    {
       public static void main(String[] args)
       {
          Thread producer = new Producer1();
          Thread consumer = new Consumer1();
           
          producer.setPriority(Thread.MIN_PRIORITY); //Min Priority
          consumer.setPriority(Thread.MAX_PRIORITY); //Max Priority
           
          producer.start();
          consumer.start();
       }
    }
     
    class Producer1 extends Thread
    {
       public void run()
       {
          for (int i = 0; i < 5; i++)
          {
             System.out.println("I am Producer : Produced Item " + i);
             Thread.yield();
          }
       }
    }
     
    class Consumer1 extends Thread
    {
       public void run()
       {
          for (int i = 0; i < 5; i++)
          {
             System.out.println("I am Consumer : Consumed Item " + i);
             Thread.yield();
          }
       }
    }
    
    Output of above program “without” yield() method
    I am Consumer : Consumed Item 0
     I am Consumer : Consumed Item 1
     I am Consumer : Consumed Item 2
     I am Consumer : Consumed Item 3
     I am Consumer : Consumed Item 4
     I am Producer : Produced Item 0
     I am Producer : Produced Item 1
     I am Producer : Produced Item 2
     I am Producer : Produced Item 3
     I am Producer : Produced Item 4
    
    Output of above program “with” yield() method added
    I am Producer : Produced Item 0
     I am Consumer : Consumed Item 0
     I am Producer : Produced Item 1
     I am Consumer : Consumed Item 1
     I am Producer : Produced Item 2
     I am Consumer : Consumed Item 2
     I am Producer : Produced Item 3
     I am Consumer : Consumed Item 3
     I am Producer : Produced Item 4
     I am Consumer : Consumed Item 4
    View Code


    CountDownLatch

    CountDownLatch是一个计数器闭锁,通过它可以完成类似于阻塞当前线程的功能,即:一个线程或多个线程一直等待,直到其他线程执行的操作完成。CountDownLatch用一个给定的计数器来初始化,该计数器的操作是原子操作,即同时只能有一个线程去操作该计数器。调用该类await方法的线程会一直处于阻塞状态,直到其他线程调用countDown方法使当前计数器的值变为零,每次调用countDown计数器的值减1。当计数器值减至零时,所有因调用await()方法而处于等待状态的线程就会继续往下执行。这种现象只会出现一次,因为计数器不能被重置,如果业务上需要一个可以重置计数次数的版本,可以考虑使用CycliBarrier。
    在某些业务场景中,程序执行需要等待某个条件完成后才能继续执行后续的操作;典型的应用如并行计算,当某个处理的运算量很大时,可以将该运算任务拆分成多个子任务,等待所有的子任务都完成之后,父任务再拿到所有子任务的运算结果进行汇总。
    在一些应用场合中,需要等待某个条件达到要求后才能做后面的事情;同时当线程都完成后也会触发事件,以便进行后面的操作。 这个时候就可以使用CountDownLatch。CountDownLatch最重要的方法是countDown()和await(),前者主要是倒数一次,后者是等待倒数到0,如果没有到达0,就只有阻塞等待了。

    • public void countDown()

    递减锁存器的计数,如果计数到达零,则释放所有等待的线程。如果当前计数大于零,则将计数减少。如果新的计数为零,出于线程调度目的,将重新启用所有的等待线程。
    如果当前计数等于零,则不发生任何操作。

    • public boolean await(long timeout,TimeUnit unit) throws InterruptedException

    使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断或超出了指定的等待时间。如果当前计数为零,则此方法立刻返回 true 值。
    如果当前计数大于零,则出于线程调度目的,将禁用当前线程,且在发生以下三种情况之一前,该线程将一直处于休眠状态:

    以下例子在检测完3个子服务后,启动主服务。

    public abstract class BaseHealthChecker implements Runnable {
         
        private CountDownLatch _latch;
        private String _serviceName;
        private boolean _serviceUp;
         
        //Get latch object in constructor so that after completing the task, thread can countDown() the latch
        public BaseHealthChecker(String serviceName, CountDownLatch latch)
        {
            super();
            this._latch = latch;
            this._serviceName = serviceName;
            this._serviceUp = false;
        }
     
        @Override
        public void run() {
            try {
                verifyService();
                _serviceUp = true;
            } catch (Throwable t) {
                t.printStackTrace(System.err);
                _serviceUp = false;
            } finally {
                if(_latch != null) {
                    _latch.countDown();
                }
            }
        }
     
        public String getServiceName() {
            return _serviceName;
        }
     
        public boolean isServiceUp() {
            return _serviceUp;
        }
        //This methos needs to be implemented by all specific service checker
        public abstract void verifyService();
    }
    
    public class NetworkHealthChecker extends BaseHealthChecker
    {
        public NetworkHealthChecker (CountDownLatch latch)  {
            super("Network Service", latch);
        }
         
        @Override
        public void verifyService() 
        {
            System.out.println("Checking " + this.getServiceName());
            try
            {
                Thread.sleep(7000);
            } 
            catch (InterruptedException e)
            {
                e.printStackTrace();
            }
            System.out.println(this.getServiceName() + " is UP");
        }
    }
    
    public class ApplicationStartupUtil 
    {
        //List of service checkers
        private static List<BaseHealthChecker> _services;
         
        //This latch will be used to wait on
        private static CountDownLatch _latch;
         
        private ApplicationStartupUtil()
        {
        }
         
        private final static ApplicationStartupUtil INSTANCE = new ApplicationStartupUtil();
         
        public static ApplicationStartupUtil getInstance()
        {
            return INSTANCE;
        }
         
        public static boolean checkExternalServices() throws Exception
        {
            //Initialize the latch with number of service checkers
            _latch = new CountDownLatch(3);
             
            //All add checker in lists
            _services = new ArrayList<BaseHealthChecker>();
            _services.add(new NetworkHealthChecker(_latch));
            _services.add(new CacheHealthChecker(_latch));
            _services.add(new DatabaseHealthChecker(_latch));
             
            //Start service checkers using executor framework
            Executor executor = Executors.newFixedThreadPool(_services.size());
             
            for(final BaseHealthChecker v : _services) 
            {
                executor.execute(v);
            }
             
            //Now wait till all services are checked
            _latch.await();
             
            //Services are file and now proceed startup
            for(final BaseHealthChecker v : _services) 
            {
                if( ! v.isServiceUp())
                {
                    return false;
                }
            }
            return true;
        }
    }
    
    public class Main {
        public static void main(String[] args) 
        {
            boolean result = false;
            try {
                result = ApplicationStartupUtil.checkExternalServices();
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("External services validation completed !! Result was :: "+ result);
        }
    }
     
    Output in console:
     
    Checking Network Service
    Checking Cache Service
    Checking Database Service
    Database Service is UP
    Cache Service is UP
    Network Service is UP
    External services validation completed !! Result was :: true
    View Code

    CyclicBarrier

    字面意思回环栅栏,通过它可以实现让一组线程等待至某个状态之后再全部同时执行。

    它更多是统一一组线程,做统一规划,协调线程组步调,比如在run方法的某一点设置一个标记点,指定线程组数量,在一个点中断-续做。

    叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。我们暂且把这个状态就叫做barrier,当调用await()方法之后,线程就处于barrier了。

      CyclicBarrier类位于java.util.concurrent包下,CyclicBarrier提供2个构造器:

    • public CyclicBarrier(int parties, Runnable barrierAction) 
    • public CyclicBarrier(int parties) 

      参数parties指让多少个线程或者任务等待至barrier状态;参数barrierAction为当这些线程都达到barrier状态时会执行的内容。

      最重要的方法就是await方法,它有2个重载版本:

    • public int await() throws InterruptedException, BrokenBarrierException { };
    • public int await(long timeout, TimeUnit unit)throws InterruptedException,BrokenBarrierException,TimeoutException { };

      第一个比较常用,用来挂起当前线程,直至所有线程都到达barrier状态再同时执行后续任务;

      第二个是让这些线程等待至一定的时间,如果还有线程没有到达barrier状态就直接让到达barrier的线程执行后续任务。

    例子:

    public class HelloHP {
      public static void main(String args[])
          throws InterruptedException, BrokenBarrierException {
        CyclicBarrier barrier = new CyclicBarrier(4);
        Party first = new Party(1000, barrier, "PARTY-1");
        Party second = new Party(2000, barrier, "PARTY-2");
        Party third = new Party(3000, barrier, "PARTY-3");
        Party fourth = new Party(4000, barrier, "PARTY-4");
        first.start();
        second.start();
        third.start();
        fourth.start();
      }
    }
    
    class Party extends Thread {
      private int duration;
      private CyclicBarrier barrier;
    
      public Party(int duration, CyclicBarrier barrier, String name) {
        super(name);
        this.duration = duration;
        this.barrier = barrier;
      }
    
      @Override
      public void run() {
        try {
          Thread.sleep(duration);
          System.out
              .println(Thread.currentThread().getName() + " 第一次列队");
          barrier.await();
          System.out.println(
              Thread.currentThread().getName() + " 最后一次列队");
        } catch (InterruptedException | BrokenBarrierException e) {
          e.printStackTrace();
        }
      }
    }
    
    PARTY-1 第一次列队
    PARTY-2 第一次列队
    PARTY-3 第一次列队
    PARTY-4 第一次列队
    PARTY-1 最后一次列队
    PARTY-3 最后一次列队
    PARTY-4 最后一次列队
    PARTY-2 最后一次列队
    View Code

    包含2个构造函数的用法。参数barrierAction为当这些线程都达到barrier状态时会执行的内容

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;
     
    public class CyclicDemo {
     
        public static void main(String[] args) {
            
            List<Integer> dataList = Collections.synchronizedList(new ArrayList<Integer>());
            // Initializing cyclicbarrier
            CyclicBarrier cb = new CyclicBarrier(3, new ListBarrierAction(dataList));
            // starting threads
            for(int i = 0; i < 3; i++) {
                new Thread(new ListWorker(dataList, cb)).start();;
            }
     
        }    
    }
     
    class ListWorker implements Runnable{
        private CyclicBarrier cb;
        private List<Integer> dataList;
        ListWorker(List<Integer> dataList, CyclicBarrier cb) {
            this.dataList = dataList;
            this.cb = cb;
        }
        @Override
        public void run() {
            System.out.println("Executing run method for thread - " + Thread.currentThread().getName());
            for(int i = 0; i < 10; i++) {
                dataList.add(i);
            }
            
            try {
                System.out.println("Calling await.. " + Thread.currentThread().getName());
                cb.await();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        
        }
    }
    // Barrier action to be executed when barrier is tripped
    class ListBarrierAction implements Runnable {
        private List<Integer> dataList;
        ListBarrierAction(List<Integer> dataList){
            this.dataList = dataList;
        }
        @Override
        public void run() {
            System.out.println("In ListBarrierAction, start further processing on list with length " + dataList.size());
        }
    }
    
    
    Executing run method for thread - Thread-0
    Calling await.. Thread-0
    Executing run method for thread - Thread-2
    Executing run method for thread - Thread-1
    Calling await.. Thread-2
    Calling await.. Thread-1
    In ListBarrierAction, start further processing on list with length 30
    View Code

    注意,使用了Collections.synchronizedList来保证线程安全。

    Semaphore

    Semaphore翻译成字面意思为 信号量,Semaphore可以控同时访问的线程个数,通过 acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可。

      Semaphore类位于java.util.concurrent包下,它提供了2个构造器:

    public Semaphore(int permits) {          //参数permits表示许可数目,即同时可以允许多少线程进行访问
        sync = new NonfairSync(permits);
    }
    public Semaphore(int permits, boolean fair) {    //这个多了一个参数fair表示是否是公平的,即等待时间越久的越先获取许可
        sync = (fair)? new FairSync(permits) : new NonfairSync(permits);
    }

      下面说一下Semaphore类中比较重要的几个方法,首先是acquire()、release()方法:

    • public void acquire() throws InterruptedException { } //获取一个许可
    • public void acquire(int permits) throws InterruptedException { } //获取permits个许可
    • public void release() { } //释放一个许可
    • public void release(int permits) { } //释放permits个许可

      acquire()用来获取一个许可,若无许可能够获得,则会一直等待,直到获得许可。

      release()用来释放许可。注意,在释放许可之前,必须先获获得许可。
    假若一个工厂有5台机器,但是有8个工人,一台机器同时只能被一个工人使用,只有使用完了,其他工人才能继续使用

    public class Test {
        public static void main(String[] args) {
            int N = 8;            //工人数
            Semaphore semaphore = new Semaphore(5); //机器数目
            for(int i=0;i<N;i++)
                new Worker(i,semaphore).start();
        }
         
        static class Worker extends Thread{
            private int num;
            private Semaphore semaphore;
            public Worker(int num,Semaphore semaphore){
                this.num = num;
                this.semaphore = semaphore;
            }
             
            @Override
            public void run() {
                try {
                    semaphore.acquire();
                    System.out.println("工人"+this.num+"占用一个机器在生产...");
                    Thread.sleep(2000);
                    System.out.println("工人"+this.num+"释放出机器");
                    semaphore.release();           
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    View Code
    • CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不同:CountDownLatch一般用于某个线程A等待若干个其他线程执行完任务之后,它才执行;而CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行;
    • 另外,CountDownLatch是不能够重用的,而CyclicBarrier是可以重用的。
    • Semaphore其实和锁有点类似,它一般用于控制对某组资源的访问权限。

    Exchanger

    Exchanger 是 JDK 1.5 开始提供的一个用于两个工作线程之间交换数据的封装工具类,简单说就是一个线程在完成一定的事务后想与另一个线程交换数据,则第一个先拿出数据的线程会一直等待第二个线程,直到第二个线程拿着数据到来时才能彼此交换对应数据。其定义为 Exchanger<V> 泛型类型,其中 V 表示可交换的数据类型,对外提供的接口很简单,具体如下:

    • Exchanger():无参构造方法。
    • V exchange(V v):等待另一个线程到达此交换点(除非当前线程被中断),然后将给定的对象传送给该线程,并接收该线程的对象。
    • V exchange(V v, long timeout, TimeUnit unit):等待另一个线程到达此交换点(除非当前线程被中断或超出了指定的等待时间),然后将给定的对象传送给该线程,并接收该线程的对象。

    可以看出,当一个线程到达 exchange 调用点时,如果其他线程此前已经调用了此方法,则其他线程会被调度唤醒并与之进行对象交换,然后各自返回;如果其他线程还没到达交换点,则当前线程会被挂起,直至其他线程到达才会完成交换并正常返回,或者当前线程被中断或超时返回。
    例子:

    import java.util.concurrent.Exchanger;
    
    public class RunExchange {
      
      public static void main(String[] args) {
        Exchanger<String> exchanger = new Exchanger<>();
        Car car = new Car(exchanger);
        Bike bike = new Bike(exchanger);
        car.start();
        bike.start();
        System.out.println("exchanger end by:");
        System.out.println("Main end!");
    }
    
    }
    
    class Bike extends Thread {
      private Exchanger<String> exchanger;
    
      public Bike(Exchanger<String> exchanger) {
          super();
          this.exchanger = exchanger;
      }
    
      @Override
      public void run() {
          try {
              System.out.println(Thread.currentThread().getName() + ": " + exchanger.exchange("Bike"));
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
      }
    }
    
    class Car extends Thread {
      private Exchanger<String> exchanger;
    
      public Car(Exchanger<String> exchanger) {
          super();
          this.exchanger = exchanger;
      }
    
      @Override
      public void run() {
          try {
              System.out.println(Thread.currentThread().getName() + ": " + exchanger.exchange("Car"));
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
      }
    }
    View Code

    阻塞队列

    阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入和移除方法。
    1)支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。
    2)支持阻塞的移除方法:意思是在队列为空时,获取元素的线程会等待队列变为非空。
    阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。阻塞队列就是生产者用来存放元素、消费者用来获取元素的容器。

    • ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。基于数组实现的一个阻塞队列,在创建ArrayBlockingQueue对象时必须制定容量大小。并且可以指定公平性与非公平性,默认情况下为非公平的,即不保证等待时间最长的队列最优先能够访问队列。
    • LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。基于链表实现的一个阻塞队列,在创建LinkedBlockingQueue对象时如果不指定容量大小,则默认大小为Integer.MAX_VALUE。
    • PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列.以上2种队列都是先进先出队列,而PriorityBlockingQueue却不是,它会按照元素的优先级对元素进行排序,按照优先级顺序出队,每次出队的元素都是优先级最高的元素。注意,此阻塞队列为无界阻塞队列,即容量没有上限(通过源码就可以知道,它没有容器满的信号标志),前面2种都是有界队列。
    • DelayQueue:一个使用优先级队列实现的无界阻塞队列.基于PriorityQueue,一种延时阻塞队列,DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue也是一个无界队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。
    • SynchronousQueue:一个生产和消费作业均衡的阻塞队列。


    ArrayBlockingQueue

    特点:

    • ArrayBlockingQueue是由数组支持的固定大小的有界队列。
    • 元素FIFO(先进先出)。
    • 元素被插入到尾部,并从队列的头部检索。
    • 一旦创建,就不能更改队列的容量。
    • 它提供阻塞插入和检索操作。
    • 它不允许空对象。
    • ArrayBlockingQueue是线程安全的。
    • 方法Iterator()中提供的迭代器按从第一个(head)到最后一个(tail)的顺序遍历元素。
    • ArrayBlockingQueue支持可选的公平性策略,用于排序等待的生产者和使用者线程。公平性设置为true时,队列按FIFO顺序授予线程访问权。ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true);

    1.非阻塞队列中的几个主要方法:

    •   add(E e):将元素e插入到队列末尾,如果插入成功,则返回true;如果插入失败(即队列已满),则会抛出异常;
    •   remove():移除队首元素,若移除成功,则返回true;如果移除失败(队列为空),则会抛出异常;
    •   offer(E e):将元素e插入到队列末尾,如果插入成功,则返回true;如果插入失败(即队列已满),则返回false;
    •   poll():移除并获取队首元素,若成功,则返回队首元素;否则返回null;
    •   peek():获取队首元素,若成功,则返回队首元素;否则返回null
    •   对于非阻塞队列,一般情况下建议使用offer、poll和peek三个方法,不建议使用add和remove方法。因为使用offer、poll和peek三个方法可以通过返回值判断操作成功与否,而使用add和remove方法却不能达到这样的效果。注意,非阻塞队列中的方法都没有进行同步措施。

    2.阻塞队列中的几个主要方法:
      阻塞队列包括了非阻塞队列中的大部分方法,上面列举的5个方法在阻塞队列中都存在,但是要注意这5个方法在阻塞队列中都进行了同步措施。除此之外,阻塞队列提供了另外4个非常有用的方法:

    •   put(E e)
    •   take()
    •   offer(E e,long timeout, TimeUnit unit)
    •   poll(long timeout, TimeUnit unit)

    原生代码:
    使用可重入锁ReentrantLock保障公平性,使用Condition来判满和判空。

    public class ArrayBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
     
    private static final long serialVersionUID = -817911632652898426L;
     
    /** The queued items  */
    private final E[] items;
    /** items index for next take, poll or remove */
    private int takeIndex;
    /** items index for next put, offer, or add. */
    private int putIndex;
    /** Number of items in the queue */
    private int count;
     
    /*
    * Concurrency control uses the classic two-condition algorithm
    * found in any textbook.
    */
     
    /** Main lock guarding all access */
    private final ReentrantLock lock;
    /** Condition for waiting takes */
    private final Condition notEmpty;
    /** Condition for waiting puts */
    private final Condition notFull;
    
    ...
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (count == items.length)
                    notFull.await();
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            insert(e);
        } finally {
            lock.unlock();
        }
    }
    ...
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (count == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            E x = extract();
            return x;
        } finally {
            lock.unlock();
        }
    }
    }

    一个应用的例子:

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.TimeUnit;
     
    public class ArrayBlockingQueueExample 
    {
        public static void main(String[] args) throws InterruptedException 
        {
            ArrayBlockingQueue<Integer> priorityBlockingQueue = new ArrayBlockingQueue<>(5);
     
            //Producer thread
            new Thread(() -> 
            {
                int i = 0;
                try
                {
                    while (true) 
                    {
                        priorityBlockingQueue.put(++i);
                        System.out.println("Added : " + i);
                         
                        Thread.sleep(TimeUnit.SECONDS.toMillis(1));
                    }
     
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
     
            }).start();
     
            //Consumer thread
            new Thread(() -> 
            {
                try
                {
                    while (true) 
                    {
                        Integer poll = priorityBlockingQueue.take();
                        System.out.println("Taked : " + poll);
                         
                        Thread.sleep(TimeUnit.SECONDS.toMillis(2));
                    }
     
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
     
            }).start();
        }
    }
    View Code

    LinkedBlockingQueue

    LinkedBlockingQueue基于链接节点,其中每个节点保存对下一个节点的引用。每次插入时都会动态创建链接节点,除非这会使队列超过容量。
    此队列对元素FIFO(先进先出)进行排序。队列的头是队列中出现时间最长的元素。队列的尾部是在队列中出现的时间最短的元素。在队列的尾部插入新元素,队列检索操作在队列的头部获取元素。
    特点:

    • LinkedBlockingQueue是可选的有界阻塞队列,默认无界容量(Integer.MAX_VALUE),也可以指定固定大小(public LinkedBlockingQueue(int capacity))。
    • 是线程安全的。类中的所有排队方法都在内部使用ReentrantLock原子地实现其效果。
    • LinkedBlockingQueue不允许空元素。它在尝试添加、放置或提供null时抛出NullPointerException。
    • 其他的和ArrayBlockingQueue类似。

    以下是指定一个有界为5的队列,满5生产者就阻塞。

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;
    
    public class LinkedBQ {
      
      public static void main(String[] args) {        
          // shared queue
          BlockingQueue<Integer> bQueue = new LinkedBlockingQueue<>(5);
          ExecutorService executor = Executors.newFixedThreadPool(2);
          executor.execute(new LinkedProducer(bQueue));
          executor.execute(new LinkedConsumer(bQueue));
          executor.shutdown();
      }
    }
    
    //Producer
    class LinkedProducer implements Runnable{
      BlockingQueue<Integer> bQueue;
      LinkedProducer(BlockingQueue<Integer> bQueue){
          this.bQueue = bQueue;
      }
      @Override
      public void run() {
          for(int i = 1; i < 11; i++){
              try {
                  bQueue.put(i);
                  System.out.println("Added to queue:" + i);
              } catch (InterruptedException e) {
                  // TODO Auto-generated catch block
                  e.printStackTrace();
              }
          }
      }
    }
    //Consumer
    class LinkedConsumer implements Runnable{
      BlockingQueue<Integer> bQueue;
      LinkedConsumer(BlockingQueue<Integer> bQueue){
          this.bQueue = bQueue;
      }
      @Override
      public void run() {
          for(int i = 1; i < 11; i++){
              try {
                  System.out.println("Consumer retrieved: " + bQueue.take());
              } catch (InterruptedException e) {
                  // TODO Auto-generated catch block
                  e.printStackTrace();
              }
          }
      }
    }
    
    Added to queue:1
    Added to queue:2
    Added to queue:3
    Added to queue:4
    Added to queue:5
    Consumer retrieved: 1
    Consumer retrieved: 2
    Consumer retrieved: 3
    Consumer retrieved: 4
    Consumer retrieved: 5
    Added to queue:6
    Consumer retrieved: 6
    Added to queue:7
    Added to queue:8
    Added to queue:9
    Consumer retrieved: 7
    Added to queue:10
    Consumer retrieved: 8
    Consumer retrieved: 9
    Consumer retrieved: 10
    View Code

    PriorityBlockingQueue

    是一个支持优先级的无界阻塞队列。默认情况下元素采取自然顺序升序排列。也可以通过自定义类实现compareTo()方法来指定元素排序规则,或者初始化PriorityBlockingQueue时,指定构造参数Comparator对元素进行排序,但不能保证同优先级元素的顺序。
    例如:

    public class PriorityBQ {
     
        public static void main(String[] args) {
            BlockingQueue<Integer> bQueue = new PriorityBlockingQueue<>(10, new MyComparator());    
            // adding elements
            bQueue.add(10);
            bQueue.add(5);
            bQueue.add(1);
            bQueue.add(3);
            // retrieving (head of the queue)        
            System.out.println("Element- " + bQueue.poll());
        }
    }
    //Comparator class
    class MyComparator implements Comparator<Integer>{
     @Override
     public int compare(Integer o1, Integer o2) {
         return o2.compareTo(o1);
     }    
    }
    View Code

    SynchronousQueue

    没有内部容量。他的名字已经很清晰了:“同步队列”,生产即必须同步消费。
    由于没有容量,每个插入操作都必须等待另一个线程执行相应的读取操作。例如,如果使用put()方法将元素插入同步队列,则该方法将被阻止,直到另一个线程接收到该元素。如果试图从同步队列中检索元素,并且队列方法中没有元素,则使用相同的方法等待另一个线程插入该元素。
    无法查看同步队列,因为只有在尝试删除某个元素时该元素才存在。因此peek()方法总是返回空值。
    无法迭代SynchronousQueue,因为没有要迭代的内容。所以iterator()和spliterator()方法分别返回一个空的iterator或spliterator。
    Java中的SynchronousQueue与其他BlockingQueue实现一样,不允许空元素。它在尝试添加、放置或提供null时抛出NullPointerException。

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.SynchronousQueue;
     
    public class SychroQ {
     
        public static void main(String[] args) {
            BlockingQueue<Integer> bQueue = new SynchronousQueue<>();
            // Producer 
            new Thread(()->{
                for(int i = 0; i < 5; i++){
                    try {
                        System.out.println("Added to queue " + i);
                        bQueue.put(i);
                        Thread.sleep(200);                                 
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                  }
            }).start();
            
            // Consumer
            new Thread(()->{
                for(int i = 0; i < 5; i++){
                    try {
                        Thread.sleep(3000);
                        System.out.println("Consumer retrieved " + bQueue.take());                    
                        
                    } catch (InterruptedException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                    }
                }
            }).start();
        }
    }
    View Code


    DelayQueue

    DelayQueue中的元素只能在其延迟过期时获取。
    其元素必须实现以下接口

    public interface Delayed extends Comparable<Delayed> {
        long getDelay(TimeUnit unit);
    }

    以下的例子是生产者指定5秒后才能消费。

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.DelayQueue;
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;
    
    public class DQDemo {
      public static void main(String[] args) {
          // delay of 5 seconds
          final long delay = 5000;
          BlockingQueue<DelayQElement> delayQ = new DelayQueue<DelayQElement>();
          // Producer thread
          new Thread(()->{
              for(int i = 0; i < 5; i++){
                  try {
                      delayQ.put(new DelayQElement("Element"+i, delay));
                      Thread.sleep(500);
                  } catch (InterruptedException e) {
                      // TODO Auto-generated catch block
                      e.printStackTrace();
                  }
              }
          }).start();
          
          // Consumer thread
          new Thread(()->{
              for(int i = 0; i < 5; i++){
                  try {
                      System.out.println(" Consumer got - " + delayQ.take().toString());
                      Thread.sleep(1000);
                  } catch (InterruptedException e) {
                      // TODO Auto-generated catch block
                      e.printStackTrace();
                  }
              }
      
          }).start();
      }
    }
    
    class DelayQElement implements Delayed{
      private String queueElement;
      private long expiryTime;
      DelayQElement(String queueElement, long delay){
          this.queueElement = queueElement;
          // Expirytime is current time + delay
          this.expiryTime = System.currentTimeMillis() + delay;
          System.out.println("Putting queueElement "  + queueElement + " expiry " + this.expiryTime);
      }
      
      @Override
      public long getDelay(TimeUnit unit) {
          long diff = expiryTime - System.currentTimeMillis();
          return unit.convert(diff, TimeUnit.MILLISECONDS);
      }
      
      @Override
      public int compareTo(Delayed o) {
          if(this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)){ 
              return -1; 
          } 
          if(this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)){
              return 1;
          }
          return 0;     
      }
      public String toString(){
          return queueElement + " Expiry Time= " + expiryTime;
      } 
     }
    View Code
  • 相关阅读:
    Kettle初使用
    Datax初使用
    代码层次上的软件质量属性
    第二周周总结
    软件质量属性---可修改性
    淘宝网中的软件质量属性
    第一周周总结
    2020寒假(12)
    2020寒假(11)
    2020寒假(10)
  • 原文地址:https://www.cnblogs.com/starcrm/p/12469364.html
Copyright © 2011-2022 走看看