zoukankan      html  css  js  c++  java
  • Semaphore信号量深度解析

    1. 使用指南

    package com.multthread;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    
    public class SemaphoreTest {
        //由于信号量是计数器递增,初始值可以随便设置
        static volatile Semaphore sh = new Semaphore(2);
    
        public static void main(String[] args) throws InterruptedException {
    
            ExecutorService es = Executors.newFixedThreadPool(2);
            // 将任务A加入线程池
            es.submit(()->{
                try {
                    System.out.println("t1...");
                    sh.release();
                }catch (Exception e){}
            });
            // 将任务B加入线程池
            es.submit(()->{
                try {
                    System.out.println("t2...");
                    sh.release();
                }catch (Exception e){}
            });
    
            // 等待子线程执行完release方法返回,注意这里release可以是同一个线程执行,只要调用了两次就行
            // 此函数入参=当初始信号计数+调用次数时,才会放行,同时将计数器state重置为0
            sh.acquire(4);
            
            // 将任务C加入到线程池
            es.submit(()->{
                try {
                    Thread.sleep(100);
                    System.out.println("t1...");
                    sh.release();
                }catch (Exception e){}
            });
    
            // 将任务D加入到线程池
            es.submit(()->{
                try {
                    System.out.println("t2...");
                    sh.release();
                }catch (Exception e){}
            });
            //由于state被重置为0了,所有所以这里入参写调用次数
            sh.acquire(2);
            System.out.println("main.....");
            es.shutdown();
        }
    }

    2.

      基于AQS实现,与CountDownLatch不同的是,Semaphore内部的计数器是递增的。初始化的时候可以执行一个计数器的值,但是需要在需要同步的地方调用acquire方法执行需要同步的线程数。并且,内部的AQS实现(sync)获取信号量有公平策略和非公平策略之分。

    3. 源码分析

    • 构造函数
    // 构造函数,默认采用非公平策略
    public Semaphore(int permits) {
            sync = new NonfairSync(permits);
        }
    
    public Semaphore(int permits, boolean fair) {
            sync = fair ? new FairSync(permits) : new NonfairSync(permits);
        }
    • release函数
    public void release() {
            sync.releaseShared(1);
    }
    
    public final boolean releaseShared(int arg) {
         // 尝试释放资源
    if (tryReleaseShared(arg)) {
           // 资源释放成功则调用park方法唤醒aqs队列里面最先挂起的线程 doReleaseShared();
    return true; } return false; } protected final boolean tryReleaseShared(int releases) {
           // CAS循环修改state值,直到修改成功
    for (;;) {
              // 获取当前的信号量值
    int current = getState();
              // 信号量值加releases,即+1
    int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded");
              // 使用CAS更新state的值 if (compareAndSetState(current, next)) return true; } } // 释放资源完毕,调用唤醒挂起线程 private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !h.compareAndSetWaitStatus(0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
    • acquire方法
    public void acquire(int permits) throws InterruptedException {
            if (permits < 0) throw new IllegalArgumentException();
            sync.acquireSharedInterruptibly(permits);
    }
    
    public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
         // 尝试获取
            if (tryAcquireShared(arg) < 0)
           // 如果获取失败则加入到阻塞队列,然后再次尝试,如果失败则调用park方法挂起当前线程 doAcquireSharedInterruptibly(arg); }
    protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
  • 相关阅读:
    paramiko模块
    JMeter 在beanshell中进行时间戳加减
    订阅号服务开发-05实战总结
    订阅号服务开发-04主动消息推送
    订阅号服务开发02-素材资源管理
    订阅号服务开发03-被动消息应答
    订阅号服务开发01-搭建开发环境
    Docker11-实战-部署多套环境
    Docker10-实战-构建Java Web运行环境
    Docker09-实战-快速搭建wordpress
  • 原文地址:https://www.cnblogs.com/zjting/p/12830629.html
Copyright © 2011-2022 走看看