zoukankan      html  css  js  c++  java
  • 线程之Semaphore之事例

    3. Semaphore使用事例

    2.信号量介绍:

    1.Java并发编程——信号量与互斥量

    ====

    3. Semaphore使用事例

    前面说到过锁和synchronized, 现在说的同步机制信号量(Semaphore)是个什么概念呢。又有什么区别呢?
     Lock和synchronized是锁的互斥,一个线程如果锁定了一资源,那么其它线程只能等待资源的释放。也就是一次只有一个线程执行,这到这个线程执行完毕或者unlock。而Semaphore可以控制多个线程同时对某个资源的访问。

    Semaphore实现的功能就类似厕所有5个坑,假如有10个人要上厕所,那么同时只能有多少个人去上厕所呢?同时只能有5个人能够占用,当5个人中 的任何一个人让开后,其中等待的另外5个人中又有一个人可以占用了。另外等待的5个人中可以是随机获得优先机会,也可以是按照先来后到的顺序获得机会,这取决于构造Semaphore对象时传入的参数选项。

    当然单个信号量的Semaphore对象可以实现互斥锁的功能,并且可以是由一个线程获得了“锁”,再由另一个线程释放“锁”,这可应用于死锁恢复的一些场合。
     信号量用在多线程多任务同步的,一个线程完成了某一个动作就通过信号量告诉别的线程,别的线程再进行某些动作。

    也就是说Semaphore不一定是锁定某个资源,而是流程上的概念。比方说有A,B两个线程,B线程的操作可能要等A线程执行完毕之后才执行,这个任务 并不一定是锁定某一资源,还可以是进行一些计算或者数据处理之类,它们也许并不访问共享变量,只是逻辑上的先后顺序。
     java中计数信号量(Semaphore)维护着一个许可集。调用acquire()获取一个许可,release()释放一个许可。

    在java中,还可以设置该信号量是否采用公平模式,如果以公平方式执行,则线程将会按到达的顺序(FIFO)执行,如果是非公平,则可以后请求的有可能排在队列的头部。Semaphore当前在多线程环境下被扩放使用,操作系统的信号量是个很重要的概念,在进程控制方面都有应用。Java并发库Semaphore 可以很轻松完成信号量控制,Semaphore可以控制某个资源可被同时访问的个数。

    使用信号量解决死锁问题

    public class BadLockTest {   
     protected Object obj1 = new Object();
     protected Object obj2 = new Object();  
     protected ExecutorService executorService = Executors.newCachedThreadPool();   
     protected Task1 test1=new Task1();   
     protected Task2 test2=new Task2();  
      public static void main(String[] args) {    
        BadLockTest test = new BadLockTest();      
      for(int i=0;i<50;i++){         
       test.test1.setCount(i);     
       test.test2.setCount(i);          
       test.executorService.execute(test.test1);           
    test.executorService.execute(test.test2); } } class Task1 implements Runnable { public int count; public void setCount(int count){ this.count=count; } @Override public void run() { synchronized (obj1) { System.out.println("task1得到obj1对象锁"+count); synchronized (obj2) { System.out.println("task1得到obj2对象锁"+count); } } } } class Task2 implements Runnable { public int count; public void setCount(int count){ this.count=count; } @Override public void run() { synchronized (obj2) { System.out.println("task1得到obj1对象锁"+count); synchronized (obj1) { System.out.println("task1得到obj2对象锁"+count); } } } }}

    得到结果:

    task1得到obj1对象锁1
    task1得到obj1对象锁1
    

    可从结果就知道已经发生了死锁。
     信号量可以控制资源能被多少线程访问,这里我们指定只能被一个线程访问,就做到了类似锁住。

     而信号量可以指定去获取的超时时间,我们可以根据这个超时时间,去做一个额外处理。
     对于无法成功获取的情况,一般就是重复尝试,或指定尝试的次数,也可以马上退出。

    public class BadLockTest {   
      protected ExecutorService executorService = Executors.newCachedThreadPool();   
     protected Task1 test1=new Task1();  
      protected Task2 test2=new Task2();  
      protected Semaphore s1=new Semaphore(1);  
      protected Semaphore s2=new Semaphore(1); 
    public static void main(String[] args) { BadLockTest test = new BadLockTest(); for(int i=0;i<50;i++){ test.test1.setCount(i); test.test2.setCount(i); test.executorService.execute(test.test1); test.executorService.execute(test.test2); } } class Task1 implements Runnable { public int count; public void setCount(int count){ this.count=count; } @Override public void run() { try { if(s2.tryAcquire(1, TimeUnit.SECONDS)){ System.out.println("task1得到obj1对象锁"+count); if(s1.tryAcquire(1, TimeUnit.SECONDS)){ System.out.println("task1得到obj2对象锁"+count); } } s2.release(); s1.release(); } catch (InterruptedException e) { e.printStackTrace(); } } } class Task2 implements Runnable { public int count; public void setCount(int count){ this.count=count; } @Override public void run() { // synchronized (obj2) { try { if(s1.tryAcquire(1, TimeUnit.SECONDS)){ System.out.println("task2得到obj1对象锁"+count); if(s2.tryAcquire(1, TimeUnit.SECONDS)){ System.out.println("task2得到obj2对象锁"+count); } } s1.release(); s2.release(); } catch (InterruptedException e) { e.printStackTrace(); } } }}

    结果:

    task1得到obj2对象锁49
    task2得到obj2对象锁49

    概述

    为了提高接口的响应速度,可以使用ThreadPoolExecutor + Runnable 或者ThreadPoolExecutor 并发调用 技术来并行执行task。但是ThreadPoolExecutor有个特点,就是当core线程不足以应付请求的时候,会将task加入到队列中。一旦使用队列,那么就可能出现队列爆掉或者队列导致的内存溢出问题。

    为了尽快提供接口响应速度,但是又不想使用队列特性的话。可以使用信号量来做到。

    Semaphore信号量管理着一组许可,在执行操作时需要首先获得许可,并在使用后释放许可。如果已经没有许可了, acquire方法将一直阻塞,直到有许可。

    信号量简单例子

    ThreadPoolExecutor中使用信号量

    在ThreadPoolExecutor中,我们在定义core线程参数的时候,比如定义为10个,那么使用信号量的时候,初始化参数也设置为10.

    Semaphore<Integer> sem= new Semaphore<>(10);

    ThreadPoolExecutor中,如果不想用到队列,就必须保证线程池中始终只有core线程在工作。那么当请求太多,core线程处理不过来的时候,用信号量进行阻塞,

    保证只有当core线程的某些线程执行完后,阻塞才解开。

    这里使用JAVA并发编程一书中的例子来说明信号量的基本用法。

    public class BoundedHashSet<T> {
        public static void main(String[] args) throws Exception {
            BoundedHashSet<String> set = new  BoundedHashSet<>(2);
            set.add("1");
            set.add("2");
            set.remove("2");
            set.add("3");
            System.out.println(JSON.toJSONString(set));
        }
        
        private final Set<T> tempSet;
        private final Semaphore sem;
        public BoundedHashSet(int size) {
            this.tempSet = Collections.synchronizedSet(new HashSet<T>());
            sem = new Semaphore(size);
        }
        
        public boolean add(T o) throws Exception {
            sem.acquire();
            boolean isAdd = false;
            try{
                isAdd = tempSet.add(o);
                return isAdd;
            }
            finally {
                if (!isAdd) {
                    sem.release();
                }
            }
        }
        
        public boolean remove(Object o) {
            boolean isRemoved = tempSet.remove(o);
            if (isRemoved) {
                sem.release();
            }
            return isRemoved;
        }
    }
    

    这里例子实现了有界阻塞的HashSet。只允许这个HashSet存放两个元素,如果想存第三个元素,必须等到有人把HashSet中的元素remove掉。

    每次add之前先申请一个许可,如果能申请到,则正常添加元素。申请不到,则acquire()方法会一直阻塞。remove操作里面,则有一个释放许可的操作。

     

    2.信号量介绍:

    原文地址  By Jakob Jenkov  翻译:寒桐  校对:方腾飞

    Semaphore(信号量) 是一个线程同步结构,用于在线程间传递信号,以避免出现信号丢失(译者注:下文会具体介绍),或者像锁一样用于保护一个关键区域。自从5.0开始,jdk在java.util.concurrent包里提供了Semaphore 的官方实现,因此大家不需要自己去实现Semaphore。但是还是很有必要去熟悉如何使用Semaphore及其背后的原理

    本文的涉及的主题如下:

    1. 简单的Semaphore实现
    2. 使用Semaphore来发出信号
    3. 可计数的Semaphore
    4. 有上限的Semaphore
    5. 把Semaphore当锁来使用

    一、简单的Semaphore实现

    下面是一个信号量的简单实现:

    public class Semaphore {
    private boolean signal = false;
    
    public synchronized void take() {
    this.signal = true;
    this.notify();
    }
    
    public synchronized void release() throws InterruptedException{
    while(!this.signal) wait();
    this.signal = false;
    
    }
    
    }

    Take方法发出一个被存放在Semaphore内部的信号,而Release方法则等待一个信号,当其接收到信号后,标记位signal被清空,然后该方法终止。

    使用这个semaphore可以避免错失某些信号通知。用take方法来代替notify,release方法来代替wait。如果某线程在调用release等待之前调用take方法,那么调用release方法的线程仍然知道take方法已经被某个线程调用过了,因为该Semaphore内部保存了take方法发出的信号。而wait和notify方法就没有这样的功能。

    当用semaphore来产生信号时,take和release这两个方法名看起来有点奇怪。这两个名字来源于后面把semaphore当做锁的例子,后面会详细介绍这个例子,在该例子中,take和release这两个名字会变得很合理。

    二、使用Semaphore来产生信号

    下面的例子中,两个线程通过Semaphore发出的信号来通知对方

    Semaphore semaphore = new Semaphore();
    SendingThread sender = new SendingThread(semaphore);
    ReceivingThread receiver = new ReceivingThread(semaphore);
    receiver.start();
    sender.start();
    public class SendingThread { Semaphore semaphore = null; public SendingThread(Semaphore semaphore){ this.semaphore = semaphore; } public void run(){ while(true){ //do something, then signal this.semaphore.take(); } } } public class RecevingThread { Semaphore semaphore = null;
    public ReceivingThread(Semaphore semaphore){ this.semaphore = semaphore; } public void run(){ while(true){ this.semaphore.release(); //receive signal, then do something... } } }

    三、可计数的Semaphore

    上面提到的Semaphore的简单实现并没有计算通过调用take方法所产生信号的数量。可以把它改造成具有计数功能的Semaphore。下面是一个可计数的Semaphore的简单实现。

    public class CountingSemaphore {
    private int signals = 0;
    public synchronized void take() { this.signals++; this.notify(); } public synchronized void release() throws InterruptedException{ while(this.signals == 0) wait(); this.signals--; } }

    四、有上限的Semaphore

    上面的CountingSemaphore并没有限制信号的数量。下面的代码将CountingSemaphore改造成一个信号数量有上限的BoundedSemaphore。

    public class BoundedSemaphore {
    private int signals = 0;
    private int bound   = 0;
    public BoundedSemaphore(int upperBound){
    this.bound = upperBound;
    }
    
    public synchronized void take() throws InterruptedException{
    while(this.signals == bound) wait();
    this.signals++;
    this.notify();
    }
    
    public synchronized void release() throws InterruptedException{
    while(this.signals == 0) wait();
    this.signals--;
    this.notify();
    }
    
    }

    在BoundedSemaphore中,当已经产生的信号数量达到了上限,take方法将阻塞新的信号产生请求,直到某个线程调用release方法后,被阻塞于take方法的线程才能传递自己的信号。

    五、把Semaphore当锁来使用

    当信号量的数量上限是1时,Semaphore可以被当做锁来使用。通过take和release方法来保护关键区域。请看下面的例子:

    BoundedSemaphore semaphore = new BoundedSemaphore(1);
    ...
    semaphore.take();
    try{
    //critical section
    } finally {
    semaphore.release();
    } 

    在前面的例子中,Semaphore被用来在多个线程之间传递信号,这种情况下,take和release分别被不同的线程调用。但是在锁这个例子中,take和release方法将被同一线程调用,因为只允许一个线程来获取信号(允许进入关键区域的信号),其它调用take方法获取信号的线程将被阻塞,知道第一个调用take方法的线程调用release方法来释放信号。对release方法的调用永远不会被阻塞,这是因为任何一个线程都是先调用take方法,然后再调用release。

    通过有上限的Semaphore可以限制进入某代码块的线程数量。设想一下,在上面的例子中,如果BoundedSemaphore 上限设为5将会发生什么?意味着允许5个线程同时访问关键区域,但是你必须保证,这个5个线程不会互相冲突。否则你的应用程序将不能正常运行。

    必须注意,release方法应当在finally块中被执行。这样可以保在关键区域的代码抛出异常的情况下,信号也一定会被释放。

    原创文章,转载请注明: 转载自并发编程网 – ifeve.com本文链接地址: 信号量

    1.Java并发编程——信号量与互斥量

    信号量用于线程同步,互斥量用户保护资源的互斥访问。

    信号量与互斥量的区别

    • 互斥量用于线程的互斥,信号线用于线程的同步。
    • 互斥量值只能为0/1,信号量值可以为非负整数。信号量可以实现多个同类资源的多线程互斥和同步。
    • 互斥量的加锁和解锁必须由同一线程分别对应使用,信号量可以由一个线程释放,另一个线程得到。

    信号量Semaphore

    信号量是在多线程环境中,线程间传递信号的一种方式。

    简单的Semaphore实现

    public class Semaphore {
    private boolean signal = false;   //使用signal可以避免信号丢失
    public synchronized void take() {
        this.signal = true;
        this.notify();
    }
    public synchronized void release() throws InterruptedException{
        while(!this.signal) //使用while避免假唤醒
            wait();
        this.signal = false;
        }
    }
    

    使用场景

    Semaphore semaphore = new Semaphore();
    SendingThread sender = new SendingThread(semaphore);
    ReceivingThread receiver = new ReceivingThread(semaphore);
    receiver.start();
    sender.start();
    
    public class SendingThread {
        Semaphore semaphore = null;
        public SendingThread(Semaphore semaphore){
            this.semaphore = semaphore;
        }
        public void run(){
            while(true){
                //do something, then signal
                this.semaphore.take();
            }
        }
    }
    
    public class RecevingThread {
        Semaphore semaphore = null;
        public ReceivingThread(Semaphore semaphore){
            this.semaphore = semaphore;
        }
        public void run(){
            while(true){
            this.semaphore.release();
            //receive signal, then do something...
            }
        }
    }
    

    可计数的Semaphore

    上面提到的Semaphore的简单实现并没有计算通过调用take方法所产生信号的数量。可以把它改造成具有计数功能的Semaphore。

    public class CountingSemaphore {
        private int signals = 0;
        public synchronized void take() {
            this.signals++;
            this.notify();
        }
    public synchronized void release() throws InterruptedException{
        while(this.signals == 0) 
            wait();
        this.signals--;
        }
    }
    

    有上限的Semaphore

    可以将上面的CountingSemaphore改造成一个信号数量有上限的BoundedSemaphore

    public class BoundedSemaphore {
        private int signals = 0;
        private int bound   = 0;
        public BoundedSemaphore(int upperBound){
            this.bound = upperBound;
        }
        public synchronized void take() throws InterruptedException{
            while(this.signals == bound) 
                wait();
            this.signals++;
            this.notify();
        }
        public synchronized void release() throws InterruptedException{
            while(this.signals == 0) 
                wait();
            this.signals--;
            this.notify();
        }
    }
    

    在BoundedSemaphore中,当已经产生的信号数量达到了上限,take方法将阻塞新的信号产生请求,直到某个线程调用release方法后,被阻塞于take方法的线程才能传递自己的信号。

    Java内置的Semaphore

    java.util.concurrent包中有Semaphore的实现,可以设置参数,控制同时访问的个数。
    下面的Demo中申明了一个只有5个许可的Semaphore,而有20个线程要访问这个资源,通过acquire()和release()获取和释放访问许可。

    final Semaphore semp = new Semaphore(5);
    ExecutorService exec = Executors.newCachedThreadPool();
    for (int index = 0; index < 20; index++) {
        final int NO = index;
        Runnable run = new Runnable() {
            public void run() {
                try {
                    // 获取许可
                    semp.acquire();
                    System.out.println("Accessing: " + NO);
                    Thread.sleep((long) (Math.random() * 10000));
                    // 访问完后,释放
                    semp.release();
                    System.out.println("-----------------" + semp.availablePermits());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        exec.execute(run);
    }
    exec.shutdown();
    

    互斥量Mutex

    互斥量:提供对资源的独占访问,只能为0/1,如果某一个资源同时只能允许一个访问者对其访问,可以使用互斥量控制线程对其访问。

    互斥量实现:

    public class Mutex {
    private boolean isLocked = false;
    public synchronized void lock() {
        while(this.isLocked) //使用while可以避免线程 假唤醒
            wait();
        this.isLocked= true;
        }
    }
    public synchronized void unlock() throws InterruptedException{
        this.isLocked= false;
        this.notify();
        }
    }
    

    在Mutex中,我们添加了一个signal用于保存信号。

    将互斥量当作来使用:

    Mutex mutex = new Mutex();
    mutex.lock();
    ...
    //临界区
    mutex.unlock();
    

    互斥量的加锁和解锁必须由同一个线程分别对应使用。

    参考

  • 相关阅读:
    JS中的call、apply、bind 用法解疑
    style、currentStyle、getComputedStyle(不同浏览器获取css样式)区别介绍
    Ajax 整理总结(进阶)
    Ajax 整理总结(入门)
    js_面向对象编程
    李炎恢bootstarp_项目实战__瓢城企业(注释+源码)
    bootstrap学习笔记--bootstrap排版类的使用
    bootstrap学习笔记--bootstrap网格系统
    对mysql快速批量修改,查重
    MyISAM 和 InnoDB 的区别与优化
  • 原文地址:https://www.cnblogs.com/awkflf11/p/12637521.html
Copyright © 2011-2022 走看看