zoukankan      html  css  js  c++  java
  • Java并发编程-Semaphore

      基于AQS的前世今生,来学习并发工具类Semaphore。本文将从Semaphore的应用场景、源码原理解析来学习这个并发工具类。

    1、 应用场景

      Semaphore用来控制同时访问某个特定资源的操作数量,或者同时执行某个指定操作的数量。还可以用来实现某种资源池限制,或者对容器施加边界。

    1.1   当成锁使用

      控制同时访问某个特定资源的操作数量,代码如下:

    public class SemaphoreLock {
        public static void main(String[] args) {
            //1、信号量为1时 相当于普通的锁  信号量大于1时 共享锁
            Output o = new Output();
            for (int i = 0; i < 5; i++) {
                new Thread(() -> o.output()).start();
            }
        }
    }
    class Output {
        Semaphore semaphore = new Semaphore(1);
    
        public void output() {
            try {
                semaphore.acquire();
                System.out.println(Thread.currentThread().getName() + " start at " + System.currentTimeMillis());
                Thread.sleep(1000);
                System.out.println(Thread.currentThread().getName() + " stop at " + System.currentTimeMillis());
            }catch(Exception e) {
                e.printStackTrace();
            }finally {
                semaphore.release();
            }
        }
    }

    1.2   线程通信信号

      线程间通信,代码如下:

    public class SemaphoreCommunication {
        public static void main(String[] args) {
            //2、线程间进行通信
            Semaphore semaphore = new Semaphore(1);
            new SendingThread(semaphore,"SendingThread");
            new ReceivingThread(semaphore,"ReceivingThread");
        }
    }
    class SendingThread extends Thread {
        Semaphore semaphore;
        String name;
    
        public SendingThread(Semaphore semaphore,String name) {
            this.semaphore = semaphore;
            this.name = name;
            new Thread(this).start();
        }
    
        public void run() {
            try {
                semaphore.acquire();
                for (int i = 0; i < 5; i++) {
                    System.out.println(name + ":" + i);
                    Thread.sleep(1000);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            semaphore.release();
        }
    }
    
    class ReceivingThread extends Thread {
        Semaphore semaphore;
        String name;
    
        public ReceivingThread(Semaphore semaphore,String name) {
            this.semaphore = semaphore;
            this.name = name;
            new Thread(this).start();
        }
    
        public void run() {
            try {
                semaphore.acquire();
                for (int i = 0; i < 5; i++) {
                    System.out.println(name + ":" + i);
                    Thread.sleep(1000);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            semaphore.release();
        }
    }

    1.3   资源池限制

      对资源池进行资源限制,代码如下:

    public class SemaphoreConnect {
        public static void main(String[] args) throws Exception {
            //3、模拟连接池数量限制
            ExecutorService executorService = Executors.newCachedThreadPool();
            for (int i = 0; i < 200; i++) {
                executorService.submit(new Runnable() {
                    @Override
                    public void run() {
                        Connection.getInstance().connect();
                    }
                });
            }
            executorService.shutdown();
            executorService.awaitTermination(1, TimeUnit.DAYS);
        }
    }
    class Connection {
        private static Connection instance = new Connection();
        private Semaphore semaphores = new Semaphore(10,true);
        private int connections = 0;
    
        private Connection() {
        }
    
        public static Connection getInstance() {
            return instance;
        }
    
        public void connect() {
            try {
                semaphores.acquire();
                doConnect();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                semaphores.release();
            }
        }
    
        private void doConnect() {
            synchronized (this) {
                connections ++;
                System.out.println("current get connections is : " + connections);
            }
    
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            synchronized (this) {
                connections --;
                System.out.println("after release current  connections is : " + connections);
            }
        }
    }

    1.4  容器边界限制

      对容器进行边界限制,代码如下:

    public class SemaphoreBoundedList {
        public static void main(String[] args) {
            //4、容器边界限制
            final BoundedList ba = new BoundedList(5);
            Runnable runnable1 = new Runnable() {
                    public void run() {
                        try {
                            ba.add("John");
                            ba.add("Martin");
                            ba.add("Adam");
                            ba.add("Prince");
                            ba.add("Tod");
                            System.out.println("Available Permits : " + ba.getSemaphore().availablePermits());
                            ba.add("Tony");
                            System.out.println("Final list: " + ba.getArrayList());
                        }catch (InterruptedException ie) {
                            Thread.interrupted();
                        }
                    }
            };
            Runnable runnable2 = new Runnable() {
                public void run() {
                    try {
                        System.out.println("Before removing elements: "+ ba.getArrayList());
                        Thread.sleep(5000);
                        ba.remove("Martin");
                        ba.remove("Adam");
                    }catch (InterruptedException ie) {
                        Thread.interrupted();
                    }
                }
            };
            Thread thread1 = new Thread(runnable1);
            Thread thread2 = new Thread(runnable2);
            thread1.start();
            thread2.start();
        }
    }
    class BoundedList<T> {
        private final Semaphore semaphore;
        private List arrayList;
    
        BoundedList(int limit) {
            this.arrayList = Collections.synchronizedList(new ArrayList());
            this.semaphore = new Semaphore(limit);
        }
    
    
        public boolean add(T t) throws InterruptedException {
            boolean added = false;
            semaphore.acquire();
            try {
                added = arrayList.add(t);
                return added;
            } finally {
                if (!added)
                    semaphore.release();
            }
    
        }
    
    
        public boolean remove(T t) {
            boolean wasRemoved = arrayList.remove(t);
            if (wasRemoved)
                semaphore.release();
            return wasRemoved;
        }
    
        public void remove(int index) {
            arrayList.remove(index);
            semaphore.release();
        }
    
        public List getArrayList() {
            return arrayList;
        }
    
    
        public Semaphore getSemaphore() {
            return semaphore;
        }
    }

    2、 源码原理解析

    2.1 获取信号

      获取信号的方法如下:

    public void acquire() throws InterruptedException {
       sync.acquireSharedInterruptibly(1);//共享式获取AQS的同步状态
    }

      调用的是AQS的acquireSharedInterruptibly方法:

    public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())//线程中断 说明信号量对线程中断敏感
                throw new InterruptedException();
            if (tryAcquireShared(arg) < 0) //获取信号量失败 线程进入同步队列自旋等待
                doAcquireSharedInterruptibly(arg);
        }

      其中tryAcquireShared依赖的是Sync的实现,Sync提供了公平和非公平式的方式,先看非公平式。

    protected int tryAcquireShared(int acquires) {
                return nonfairTryAcquireShared(acquires);
            }
    final int nonfairTryAcquireShared(int acquires) {
                for (;;) {
                    int available = getState();//同步状态 当前的信号量许可数
                    int remaining = available - acquires;//减去释放的信号量 剩余信号量许可数
                    if (remaining < 0 ||//剩余信号量小于0 直接返回remaining 不做CAS
                        compareAndSetState(available, remaining))//CAS更新
                        return remaining;
                }
            }

      再看下公平式的。

    protected int tryAcquireShared(int acquires) {
                for (;;) {
                    if (hasQueuedPredecessors())//判断同步队列如果存在前置节点 获取信号量失败  其他和非公平式是一致的
                        return -1;
                    int available = getState();
                    int remaining = available - acquires;
                    if (remaining < 0 ||
                        compareAndSetState(available, remaining))
                        return remaining;
                }
            }

      最后来看下,如果未获取到信号量的处理方法doAcquireSharedInterruptibly。

    private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
            final Node node = addWaiter(Node.SHARED);//线程进入同步队列
            boolean failed = true;
            try {
                for (;;) {//自旋
                    final Node p = node.predecessor();
                    if (p == head) {//当前节点的前置节点是AQS的头节点 即自己是AQS同步队列的第一个节点
                        int r = tryAcquireShared(arg); //再去获取信号量
                        if (r >= 0) {//获取成功
                            setHeadAndPropagate(node, r);//退出自旋
                            p.next = null; // help GC
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node); //获取失败 就取消获取
            }
        }

    2.2 释放信号

      释放信号的方法如下:

    public void release() {
            sync.releaseShared(1);
        }

      调用的是AQS的releaseShared方法:

    public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {//释放信号量
                doReleaseShared();//唤醒后续的线程节点
                return true;
            }
            return false;
    }

      tryReleaseShared交由子类Sync实现,代码如下:

    protected final boolean tryReleaseShared(int releases) {
                for (;;) {
                    int current = getState();//当前信号量许可数
                    int next = current + releases; //当前信号量许可数+释放的信号量许可数
                    if (next < current) // overflow 这个分支我看着永远走不进来呢
                        throw new Error("Maximum permit count exceeded");
                    if (compareAndSetState(current, next))//CAS更新当前信号量许可数
                        return true;
                }
            }

      释放成功后,则继续调用doReleaseShared,唤醒后续线程节点可以来争取信号量了。

    private void doReleaseShared() {
            for (;;) {
                Node h = head; //头节点
                if (h != null && h != tail) {//同步队列中存在线程等待
                    int ws = h.waitStatus; //头节点线程状态
                    if (ws == Node.SIGNAL) {//头节点线程状态为SIGNAL 唤醒后续线程节点
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue;            // loop to recheck cases
                        unparkSuccessor(h); //唤醒下个节点
                    }
                    else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;                // loop on failed CAS
                }
                if (h == head)                   // loop if head changed
                    break;
            }
        }

      总结:Semaphore使用AQS同步状态来保存信号量的当前计数。它里面定义的acquireSharedInterruptibly方法会减少计数,当计数为非正值时阻塞线程,releaseShared方法会增加计数,在计数不超过信号量限制时要解除线程的阻塞。

    参考资料:

    https://github.com/lingjiango/ConcurrentProgramPractice

    https://www.caveofprogramming.com/java-multithreading/java-multithreading-semaphores-part-12.html

    https://java2blog.com/java-semaphore-example/

    http://tutorials.jenkov.com/java-util-concurrent/semaphore.html

  • 相关阅读:
    与客服聊天功能测试点
    京东优惠券如何测试
    Linux笔试题
    线程与线程池原理
    PyCharm 介绍、安装、入门使用
    银行APP测试用户体验性方面
    python的闭包
    列表解析2
    深入函数
    再谈装饰器@@@
  • 原文地址:https://www.cnblogs.com/iou123lg/p/9689491.html
Copyright © 2011-2022 走看看