zoukankan      html  css  js  c++  java
  • 多线程——工具类之Semaphore

    一、Semaphore功能介绍

    Semaphore类相当于线程计数器,在获取Semaphore对象时设定可以产生的线程总数(线程并不是Semaphore类生成的,它只是统计线程的数量),创建Semaphore类对象如下方法所示:

    //创建一个Semaphore对象,Sync sync对象赋值为NonfairSync对象
    Semaphore sp = new Semaphore(1);
    
    
    //创建一个Semaphore对象,Sync sync对象赋值为FairSync对象
    Semaphore sp = new Semaphore(1,true);

    在创建线程以前调用Semaphore类的acquire()方法来判断是否还可以创建线程,acquire()方法每调用一次当前可创建的线程总数减一,并且这个方法是一个阻塞式的方法,如果当前线程数量已经达到上限线程会被阻塞,当满足创建线程的条件时程序就会继续,在线程运行结束以后调用Semaphore类release()方法来释放占用的可创建线程的数量。

    结论:Semaphore类可以控制并发情况下创建的线程总数

    二、Semaphore类方法分解

    如下是Semaphore类的构造方法:

        public Semaphore(int permits) {
            sync = new NonfairSync(permits);
        }
    
        public Semaphore(int permits, boolean fair) {
            sync = fair ? new FairSync(permits) : new NonfairSync(permits);
        }

    如下是对NonfairSync类和FairSync类的源码,从代码看似乎两个类对tryAcquireShared(int acquires)方法的实现完全不同,其实它们的实现基本相同,NonfairSync类调用的父类的nonfairTryAcquireShared(acquires)方法,此方法的实现如下所示,对比来看区别在于FairSync类在方法入口调用了hasQueuedPredecessors()方法添加了if判断,hasQueuedPredecessors代码如下所示。

     /**
         * NonFair version
         */
        static final class NonfairSync extends Sync {
            private static final long serialVersionUID = -2694183684443567898L;
    
            NonfairSync(int permits) {
                super(permits);
            }
    
            protected int tryAcquireShared(int acquires) {
                return nonfairTryAcquireShared(acquires);
            }
        }
    
        /**
         * Fair version
         */
        static final class FairSync extends Sync {
            private static final long serialVersionUID = 2014338818796000944L;
    
            FairSync(int permits) {
                super(permits);
            }
    
            protected int tryAcquireShared(int acquires) {
                for (;;) {
                    if (hasQueuedPredecessors())
                        return -1;
                    int available = getState();
                    int remaining = available - acquires;
                    if (remaining < 0 ||
                        compareAndSetState(available, remaining))
                        return remaining;
                }
            }
        }
    abstract static class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 1192457210091910933L;
    
            final int nonfairTryAcquireShared(int acquires) {
                for (;;) {
                    int available = getState();
                    int remaining = available - acquires;
                    if (remaining < 0 ||
                        compareAndSetState(available, remaining))
                        return remaining;
                }
            }
        }
    public final boolean hasQueuedPredecessors() {
            // The correctness of this depends on head being initialized
            // before tail and on head.next being accurate if the current
            // thread is first in queue.
            Node t = tail; // Read fields in reverse initialization order
            Node h = head;
            Node s;
            return h != t &&
                ((s = h.next) == null || s.thread != Thread.currentThread());
        }

    1、acquire()/acquire(int)方法介绍

    如下所示,acquire()方法调用的是父类的acquireSharedInterruptibly(int arg)方法,这个方法调用子类的tryAcquireShared(int arg)如果没有线程数达到上限时则执行doAcquireSharedInterruptibly(arg),如下所示这个方法里面有一个死循环,当可创建的线程数量满足参数arg时,跳出死循环,创建线程的代码继续。

    结论:acquire()是一个阻塞式的方法,从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞,或者当前线程中断时抛出InterruptedException异常,中断阻塞。

        public void acquire() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
        public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            if (tryAcquireShared(arg) < 0)
                doAcquireSharedInterruptibly(arg);
        }
    
        private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }

     2、acquireUninterruptibly()/acquireUninterruptibly()方法介绍

    这两个方法和acquire()的两个方法基本是一样的,唯一不同是,这两个调用的方法acquireShared(int)没有了当前线程是否中断的if判断并且当前这个方法不抛InterruptedException异常,所以在当前线程被中断时当前阻塞的方法不会中断。

    结论:acquireUninterruptibly是一个阻塞式的方法,从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞。

    public final void acquireShared(int arg) {
            if (tryAcquireShared(arg) < 0)
                doAcquireShared(arg);
        }

    三、样例演示 

    如下代码是一个简单的样例,运行下面代码,从打印信息的顺序就可以验证获取信号量的方法是一个阻塞时的,其它方法的功能验证,网友自己完成吧!

    public class ThreadTest {
    
        public static void main(String[] args) throws Exception {
            semaphoreTest();
        }
    
        public static void semaphoreTest() throws InterruptedException {
            final Semaphore semaphore = new Semaphore(1);
            System.out.println("1");
            semaphore.acquire();
            Thread t1 = new Thread() {
                @Override
                public void run() {
                    try {
                        sleep(3000L);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("释放");
                    semaphore.release();
                }
            };
            t1.start();
            semaphore.acquire();
            System.out.println("2");
        }
    }
  • 相关阅读:
    CalISBN.java
    Int2BinaryString.java
    PrintNumber.java
    AllSame.java
    第一个程序
    将博客搬至CSDN
    sqoop 1.4.4-cdh5.1.2快速入门
    hadoop的关键进程
    HIVE快速入门
    公钥,私钥和数字签名这样最好理解
  • 原文地址:https://www.cnblogs.com/cymiao/p/8405530.html
Copyright © 2011-2022 走看看