zoukankan      html  css  js  c++  java
  • Semaphore

      信号量,用于控制并发的线程的数目。信号量在JUC下的实现,每当一个线程进入临界区信号量减少,线程释放锁后信号量增加。

    1.1 简单使用

      初始化permit为10的信号量,acquire减少2,release增加2,本质上等价于permit=5,acquire release都是1的信号量,并发线程数目为5个。

    public class Service {
        private Semaphore semaphore = new Semaphore(10);
        public void testMethod(){
            try {
                semaphore.acquire(2);
                System.out.println(Thread.currentThread().getName()+"  begain time"+System.currentTimeMillis());
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            System.out.println(Thread.currentThread().getName()+"  end time"+System.currentTimeMillis());
            semaphore.release(2);
    
        }
    }

       Thread类把service对象注入。

    public class Thread extends java.lang.Thread {
        private Service service;
    
        public Thread(Service service) {
            this.service = service;
        }
    
        @Override
        public void run() {
            service.testMethod();
        }
    }

      测试类同时开启10个线程。

    public class Run {
        public static void main(String[] args) {
            Service service = new Service();
            for (int i = 0; i < 10; i++) {
                Thread thread = new Thread(service);
                thread.setName(i+"");
                thread.start();
            }
        }
    }

      从控制台的输出结果看到同时最多有5个线程同时在处理机上。

    0  begain time1556026437118
    1  begain time1556026437118
    3  begain time1556026437119
    2  begain time1556026437119
    4  begain time1556026437119
    0  end time1556026442120
    2  end time1556026442120
    5  begain time1556026442120
    3  end time1556026442120
    1  end time1556026442120
    7  begain time1556026442121
    6  begain time1556026442121
    4  end time1556026442120
    8  begain time1556026442121
    9  begain time1556026442121
    5  end time1556026447124
    9  end time1556026447124
    7  end time1556026447124
    6  end time1556026447124
    8  end time1556026447124

     1.2 release与permit

      release每次增加permit的数目,可以大于初始化Semaphore时permit的数量。

    public class testReleasePermit {
        public static void main(String[] args) throws InterruptedException {
            Semaphore semaphore = new Semaphore(4);
            System.out.println(semaphore.availablePermits());
            semaphore.acquire(4);
            System.out.println(semaphore.availablePermits());
            semaphore.release(1);
            semaphore.release(1);
            semaphore.release(1);
            semaphore.release(1);
            System.out.println(semaphore.availablePermits());
            semaphore.release(4);
            System.out.println(semaphore.availablePermits());
    
        }
    }

      初始化4,acquire4次变为0,release4次变为4,继续release变为8。初始化permit只是该信号量在初始化的时候允许的permit数目,而非最大的数目。

    4
    0
    4
    8

    1.3 信号量和中断

      当线程无法获得信号量的时候,该线程会被被park起来,即线程进入了WAITING状态,在WAITING状态的线程在被中断的时候会抛出异常,所以一个线程在acquire失败变成WAITING状态的时候是可以被中断的

    public class RunInter {
        public static void main(String[] args) throws InterruptedException {
            serviceInter serviceInter = new serviceInter();
            myThread thread1 = new myThread(serviceInter);
            myThread thread2 = new myThread(serviceInter);
            thread1.start();
            thread2.start();
            Thread.sleep(10);
            System.out.println(thread2.getState());
            thread2.interrupt();
    
        }
    }
    public class serviceInter {
        private Semaphore semaphore = new Semaphore(1);
    
        public void testMethod()  {
            try {
                semaphore.acquire();
                System.out.println(Thread.currentThread().getName()+"  begain"+System.currentTimeMillis());
                Thread.sleep(1000);
                System.out.println(Thread.currentThread().getName()+"  end"+System.currentTimeMillis());
                semaphore.release();
            } catch (InterruptedException e) {
                System.out.println(Thread.currentThread().getName()+"  进入了中断");
                e.printStackTrace();
            }
    
        }
    
    }

      Thread-0先获得了信号量并执行,Thread-1没有获得信号量进入WAITING状态,并且在main线程里调用thread2的interrupt方法后抛出了异常。

    Thread-0  begain1556029651021
    WAITING
    Thread-1  进入了中断
    java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
        at java.util.concurrent.Semaphore.acquire(Semaphore.java:312)
        at Semaphore.serviceInter.testMethod(serviceInter.java:12)
        at Semaphore.myThread.run(myThread.java:14)
    Thread-0  end1556029655608

      把acquire改成acquireUninterruptibly,禁止中断没有获得信号量的线程,同样的测试代码结果如下。虽然还是抛出了异常但是异常是在thread2获得了信号量之后抛出的。

    Thread-0  begain1556030019636
    WAITING
    Thread-0  end1556030020637
    Thread-1  begain1556030020638
    Thread-1  进入了中断
    java.lang.InterruptedException: sleep interrupted
        at java.lang.Thread.sleep(Native Method)
        at Semaphore.serviceNonInter.testMethod(serviceNonInter.java:12)
        at Semaphore.myThreadNoInter.run(myThreadNoInter.java:14)

     1.4获取可用的信号量的个数

    • availablePermits,返回可用的信号量的个数
    • drainPermits,返回可用的信号量的个数并清零
    public class getAvailable {
        public static void main(String[] args) throws InterruptedException {
            Semaphore semaphore = new Semaphore(10);
            semaphore.acquire(4);
            System.out.println(semaphore.availablePermits());
            System.out.println(semaphore.drainPermits());
            System.out.println(semaphore.availablePermits());
        }
    }

     1.5 获取在等待信号量线程的个数

    • getQueueLength,获取在排队获取线程的个数
    • hasQueueThreads,判断是否有线程在等待

      getQueueLength并不是准确的,比如同时开启30个线程,在第一个线程获取信号量的时候剩下29个线程还没有完成启动过程,所以理应有29个线程在等待但最终输出的数字个能小于等于29。

      比如我同时开启5个线程,在testMethod中打印getQueueLength,在第一个线程启动的时候并没有打印4而是2。

    public static void main(String[] args) throws InterruptedException {
            serviceInter serviceInter = new serviceInter();
            for (int i = 0; i < 5; i++) {
                myThread thread = new myThread(serviceInter);
                thread.start();
                
            }
    
        }
    Thread-0  begain1556030481874
    2
    Thread-0  end1556030482877
    Thread-1  begain1556030482878
    3
    Thread-1  end1556030483883
    Thread-2  begain1556030483883
    2
    Thread-2  end1556030484887
    Thread-3  begain1556030484888
    1
    Thread-3  end1556030485888
    Thread-4  begain1556030485888
    0
    Thread-4  end1556030486891

    1.5 公平和非公平信号量

      在信号量中,公平指的是获取信号量的顺序和启动线程的顺序相同,即先启动的线程能够先获取信号量,非公平则不提供这种保证,默认情况下是非公平的,因为这样效率更高。

      理论上非公平信号量获取信号量的顺序和启动的顺序无关,但是我测试的结果却是按照顺序来的。

    1.6 tryAcquire

      一种非阻塞的实现,获取线程失败的时候不进入WAITING状态而是返回false,配合if判断可实现一个简单的CAS乐观锁。

      把Service中的acquire改成truAcquire,可以看到开启的线程同时返回,无论是获得信号量的还是没有获得信号量的线程。

    public class Service {
        private Semaphore semaphore = new Semaphore(1);
        private void testMethod(){
            try {
                if (semaphore.tryAcquire(1)){
                    System.out.println(Thread.currentThread().getName()+"  进入");
                    try {
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    semaphore.release();
                }else {
                    System.out.println(Thread.currentThread().getName()+" 未成功进入");
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
    
        }
    }
    0  进入
    1 未成功进入
    2 未成功进入
    3 未成功进入
    4 未成功进入

      tryAcquire还有两个重载的方法,用于指定一次获取多少个信号量,和没有获得信号量等待的时间。

    1.7 多处理与单处理

    public class Service {
        private Semaphore semaphore = new Semaphore(3);
        private ReentrantLock lock = new ReentrantLock();
    
        public void sayHello(){
            try {
                semaphore.acquire();
                lock.lock();
                System.out.println(Thread.currentThread().getName()+" 获得了信号量");
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            lock.unlock();
            semaphore.release();
        }
    }

      作者这个例子想演示的效果是同时有多个线程获得信号量,但获得信号量的线程必须获得锁才能工作。用信号量来控制工作线程的数目,这就是所谓的多进路。用可重入锁来实现只能有一个线程获得锁,这就是单处理和多处理的区别。

      这和只使用一个锁有什么区别的呢?从打印的结果来看并没有任何区别,加不加信号量情况下的单处理结果是一样的。从对线程状态的改变来看也没看出有什么区别,线程无法获得信号量的时候被挂在信号量的AQS的队列里,线程获得了信号量没有获得锁的时候被挂在了锁的AQS的队列里,区别在哪里呢????

    1.8 字符串池

      该池实现这样一个功能:池中的字符串是有限的,同时最多有permit个线程可以从字符串池中获取字符串。

      看了这个介绍后我自己写了一版本,这个版本是错的。Run和myThread都和前面没什么区别,stringPool实现了字符串池,我的理解很简单设置一个permit为3的信号量,在get方法里的remove方法前面用acquire拦住,这样同时最多只能有三个线程执行remove方法。听起来是对的,但是报错了。

    public class stringPool {
        private int poolSize = 5;
        private int semaphorePermit = 3;
        private Semaphore semaphore = new Semaphore(semaphorePermit);
        private List<String> pool;
    
        public stringPool() {
            pool = new ArrayList<>(poolSize);
            for (int i=0;i<poolSize;i++){
                pool.add(i+"");
            }
        }
    
    
        public String get(){
            String string = null;
            try {
                semaphore.acquire();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
             string = pool.remove(0);
            //System.out.println(pool);
             return string;
        }
    
        public void put(String string){
            pool.add(string);
            //System.out.println(pool);
            semaphore.release();
        }
    }
    public class myThread extends Thread {
        private stringPool stringPool;
    
        public myThread(Semaphore.Pool.stringPool stringPool) {
            this.stringPool = stringPool;
        }
    
        @Override
        public void run() {
            String s = stringPool.get();
            System.out.println(Thread.currentThread().getName()+" 获得了"+s);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            stringPool.put(s);
            System.out.println(Thread.currentThread().getName()+" 归还了"+s);
        }
    }
    public class Run {
        public static void main(String[] args) {
            stringPool stringPool = new stringPool();
            for (int i = 0; i < 10; i++) {
                myThread thread = new myThread(stringPool);
                thread.setName(i+"");
                thread.start();
            }
        }
    }

      控制台抛出的异常,显然是List为空还执行remove方法。我的池子里最多有5个字符串,信号量设置的为3,为什么还会越界呢??????permit为3,也就是同时最多有3个线程可以从pool里取字符,poolSize为5,所以pool里字符的个数应该是大于等于2的。

    0 获得了0
    1 获得了1
    2 获得了2
    1 归还了1
    3 获得了3
    0 归还了0
    2 归还了2
    5 获得了1
    4 获得了4
    3 归还了3
    7 获得了3
    6 获得了2
    4 归还了4
    5 归还了1
    Exception in thread "8" java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
        at java.util.ArrayList.rangeCheck(ArrayList.java:657)
        at java.util.ArrayList.remove(ArrayList.java:496)
        at Semaphore.Pool.stringPool.get(stringPool.java:28)
        at Semaphore.Pool.myThread.run(myThread.java:12)
    7 归还了3
    9 获得了3
    6 归还了2
    9 归还了3
  • 相关阅读:
    Canvas
    Web API 接口-JavaScript全部api接口文档
    编程中的命名设计那点事
    线程池的使用
    SRW锁的使用
    内存屏障
    VC用Beep整几首歌听听~~~
    简单的多线程并发同步演示(4种同步方法)
    C语言生成程序问题
    文件操作(输出倒数第二行、逆序输出)
  • 原文地址:https://www.cnblogs.com/AshOfTime/p/10759629.html
Copyright © 2011-2022 走看看