zoukankan      html  css  js  c++  java
  • java多线程基本概述(二十四)——Semaphore

    正常的锁在任何时刻都只允许一个任务访问一项资源,而计数信号量允许n个任务同时访问这个资源。

    一个计数信号量。从概念上讲,信号量维护了一个许可集。如有必要,在许可可用前会阻塞每一个 acquire(),然后再获取该许可。每个 release() 添加一个许可,从而可能释放一个正在阻塞的获取者。但是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数,并采取相应的行动。

    Semaphore 通常用于限制可以访问某些资源(物理或逻辑的)的线程数目。例如,下面的类使用信号量控制对内容池的访问:

     class Pool {
       private static final int MAX_AVAILABLE = 100;
       private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
    
       public Object getItem() throws InterruptedException {
         available.acquire();
         return getNextAvailableItem();
       }
    
       public void putItem(Object x) {
         if (markAsUnused(x))
           available.release();
       }
    
       // Not a particularly efficient data structure; just for demo
    
       protected Object[] items = ... whatever kinds of items being managed
       protected boolean[] used = new boolean[MAX_AVAILABLE];
    
       protected synchronized Object getNextAvailableItem() {
         for (int i = 0; i < MAX_AVAILABLE; ++i) {
           if (!used[i]) {
              used[i] = true;
              return items[i];
           }
         }
         return null; // not reached
       }
    
       protected synchronized boolean markAsUnused(Object item) {
         for (int i = 0; i < MAX_AVAILABLE; ++i) {
           if (item == items[i]) {
              if (used[i]) {
                used[i] = false;
                return true;
              } else
                return false;
           }
         }
         return false;
       }
    
     }
     

    获得一项前,每个线程必须从信号量获取许可,从而保证可以使用该项。该线程结束后,将项返回到池中并将许可返回到该信号量,从而允许其他线程获取该项。注意,调用 acquire() 时无法保持同步锁,因为这会阻止将项返回到池中。信号量封装所需的同步,以限制对池的访问,这同维持该池本身一致性所需的同步是分开的。

    将信号量初始化为 1,使得它在使用时最多只有一个可用的许可,从而可用作一个相互排斥的锁。这通常也称为二进制信号量,因为它只能有两种状态:一个可用的许可,或零个可用的许可。按此方式使用时,二进制信号量具有某种属性(与很多 Lock 实现不同),即可以由线程释放“锁”,而不是由所有者(因为信号量没有所有权的概念)。在某些专门的上下文(如死锁恢复)中这会很有用。

    此类的构造方法可选地接受一个公平 参数。当设置为 false 时,此类不对线程获取许可的顺序做任何保证。特别地,闯入 是允许的,也就是说可以在已经等待的线程前为调用 acquire() 的线程分配一个许可,从逻辑上说,就是新线程将自己置于等待线程队列的头部。当公平设置为 true 时,信号量保证对于任何调用获取方法的线程而言,都按照处理它们调用这些方法的顺序(即先进先出;FIFO)来选择线程、获得许可。注意,FIFO 排序必然应用到这些方法内的指定内部执行点。所以,可能某个线程先于另一个线程调用了 acquire,但是却在该线程之后到达排序点,并且从方法返回时也类似。还要注意,非同步的 tryAcquire 方法不使用公平设置,而是使用任意可用的许可。

    通常,应该将用于控制资源访问的信号量初始化为公平的,以确保所有线程都可访问资源。为其他的种类的同步控制使用信号量时,非公平排序的吞吐量优势通常要比公平考虑更为重要。

    此类还提供便捷的方法来同时 acquire 和释放多个许可。小心,在未将公平设置为 true 时使用这些方法会增加不确定延期的风险。

    package tij;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.*;
    
    /**
     * Created by huaox on 2017/4/2.
     *
     */
    
    class Pool<T>{
        private int size;
        private List<T> items = new ArrayList<T>();
        private volatile boolean[] checkedOut;//被签出的对象
        private Semaphore semaphore;
        Pool(Class<T> tClass,int size) {
            this.size = size;
            checkedOut = new boolean[size];
            semaphore = new Semaphore(size, true);
            for (int i = 0; i < size; i++) {
                try {
                    items.add(tClass.newInstance());
                } catch (IllegalAccessException | InstantiationException e) {
                    e.printStackTrace();
                }
            }
        }
        T checkOut() throws InterruptedException {
                semaphore.acquire();//在许可可用前将被阻塞,直到许可可用
                return getItem();
        }
        void checkIin(T item){
           if (realease(item))//
               semaphore.release();
        }
        synchronized T getItem(){
            for (int i = 0; i < size; i++)
                if(!checkedOut[i]){
                    checkedOut[i]=true;
                    return items.get(i);
                }
            return null;
        }
        synchronized boolean realease(T item){
            int index = items.indexOf(item);
            if(index<0)
                return false;
            if(checkedOut[index]){
               checkedOut[index]=false;
                return true;
            }
            return false;
        }
    }
    class Flat{
        private volatile double d ;
        private static int count = 0;
        private final int id = count++;
        public Flat() {
            for (int i = 0; i < 10000; i++) {
                d+=(Math.E+Math.PI)/(double) i;
            }
            //System.out.println("creating tij.Flat object "+id+" ");
        }
        public String toString() {
            return "tij.Flat id "+id;
        }
    }
    class CheckOutTask<T> implements Runnable{
        private static int count = 0;
        private final int id = count++;
        private Pool<T> pool;
        CheckOutTask(Pool<T> pool){
            this.pool = pool;
        }
        public void run() {
               try {
                   T item = pool.checkOut();
                   System.out.println(this + " is checking out "+ item+" ");
                   TimeUnit.SECONDS.sleep(2);
                   pool.checkIin(item);
                   System.out.println(this + " is checking  in "+ item+" ");
               }catch (InterruptedException e) {
                   e.printStackTrace();
               }
        }
        public String toString() {
            return "tij.CheckOutTask " + id;
        }
    }
    
    
    public class Test5 {
        private final static int SIZE = 5;
        public static void main(String[] args) throws Exception{
            final Pool<Flat> pool = new Pool<>(Flat.class,SIZE);
            ExecutorService executorService = Executors.newCachedThreadPool();
            for (int i = 0; i < SIZE; i++)
                executorService.execute(new CheckOutTask<Flat>(pool));
            System.out.println("All checkoutTask created");
            List<Flat> list =new ArrayList<Flat>();
            for (int i = 0; i < SIZE; i++) {
                Flat flat = pool.checkOut();
                System.out.println("main         "+i+" is checking out "+flat);
                list.add(flat);
            }
            Future<?> future = executorService.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        Flat flat = pool.checkOut();
                        System.out.println("another is checking "+flat);
                    } catch (InterruptedException e) {
                        System.out.println("InterruptedException form checkout");
                    }
                }
            });
            TimeUnit.SECONDS.sleep(3);
            future.cancel(true);
            System.out.println("checking in objects in"+list);
            for (Flat flat : list)
                pool.checkIin(flat);
            executorService.shutdown();
        }
    }

    输出结果:

    All checkoutTask created
    tij.CheckOutTask 0 is checking out tij.Flat id 0 
    tij.CheckOutTask 1 is checking out tij.Flat id 1 
    main         0 is checking out tij.Flat id 2
    main         1 is checking out tij.Flat id 3
    main         2 is checking out tij.Flat id 4
    tij.CheckOutTask 1 is checking  in tij.Flat id 1 
    tij.CheckOutTask 2 is checking out tij.Flat id 1 
    main         3 is checking out tij.Flat id 0
    tij.CheckOutTask 0 is checking  in tij.Flat id 0 
    tij.CheckOutTask 3 is checking out tij.Flat id 1 
    tij.CheckOutTask 2 is checking  in tij.Flat id 1 
    tij.CheckOutTask 4 is checking out tij.Flat id 1 
    tij.CheckOutTask 3 is checking  in tij.Flat id 1 
    tij.CheckOutTask 4 is checking  in tij.Flat id 1 
    main         4 is checking out tij.Flat id 1
    InterruptedException form checkout
    checking in objects in[tij.Flat id 2, tij.Flat id 3, tij.Flat id 4, tij.Flat id 0, tij.Flat id 1]
    
    Process finished with exit code 0
  • 相关阅读:
    [转]The Machine SID Duplication Myth (and Why Sysprep Matters)
    The Machine SID Duplication Myth (and Why Sysprep Matters)
    [转]Shell(Bash) mysql数据库操作
    [转]GNU Sourcehighlight 语法高亮代码
    [原]DRBD双主模式问题
    mpstat
    DRBD试用
    drbd双主模式问题
    虚IP的添加和删除
    为什么5%的技术人员开发效率是其他95%的20倍?
  • 原文地址:https://www.cnblogs.com/soar-hu/p/6742376.html
Copyright © 2011-2022 走看看