zoukankan      html  css  js  c++  java
  • java.util.concurrent.Semaphore 使用

    1. 概述

       Semaphore(信号)  并不存在真正的许可 只是维护一个计数器, 通常用来限定进入一些资源的线程数

     accquire()  方法获取许可 成功则计数器值-1 没有则阻塞直到一个可用的许可(即计数器>0)

       release() 方法 潜在的释放了申请人(通过给计数器值+1)

    2. 示例一(单独测试信号量增减  availabelPermits对超出数量的线程的阻塞)

    package com.rocky.semaphore;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    
    public class SemaphoreTest {
    
        public static void main(String[] args) {
            Semaphore semaphore = new Semaphore(5, false);//no fair 并行最大为5,阻塞后来的
            ExecutorService service = Executors.newCachedThreadPool();
            for(int i=0; i<10; i++){
                service.execute(new Worker(semaphore));
            }
            service.shutdown();
        }
    
    }
    
    class Worker implements Runnable{
    
        private Semaphore semaphore;
        Worker(Semaphore semaphore){
            this.semaphore = semaphore;
        }
        @Override
        public void run() {
            try {
                semaphore.acquire();//获取许可
                try {
                    System.out.println(Thread.currentThread().getName()+" accessing...");
                    Thread.sleep((long) (Math.random()*3000));
                } finally{
                    semaphore.release();//释放许可
                    System.out.println(Thread.currentThread().getName()+" leaving...");
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }}

    说明:   默认是非公平的 进来一个线程获取许可, 则state减1,直到值为0 以下是源码片段

            final int nonfairTryAcquireShared(int acquires) {//acquires值为1
                for (;;) {
                    int available = getState();//当前state值
                    int remaining = available - acquires;
                    if (remaining < 0 ||
                        compareAndSetState(available, remaining))//remaining>0 则CAS修改state 成功获取许可
                        return remaining;
                }
            }
    
      //remainimg<0 返回后,执行下面方法
        private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
            final Node node = addWaiter(Node.SHARED);//创建共享型节点加入等待队列 队列为空则仿制头结点并建立联系
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head) {
                        int r = tryAcquireShared(arg);//再次尝试获取许可
                        if (r >= 0) {//成功获取许可
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        break;
                }
            } catch (RuntimeException ex) {
                cancelAcquire(node);
                throw ex;
            }
            // Arrive here only if interrupted
            cancelAcquire(node);
            throw new InterruptedException();
        }

    3. 示例二(一个生产者与一个消费者 两组信号量 此消彼长 为0阻塞)

    package com.rocky.semaphore;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    
    public class ProducerCustomRealizeWithSemaphore {
    
        public static void main(String[] args) {
            SemaphoreBuffer semaphoreBuffer = new SemaphoreBuffer();
            Producer producer = new Producer(semaphoreBuffer);
            Customer customer = new Customer(semaphoreBuffer);
            ExecutorService service = Executors.newCachedThreadPool();
            service.execute(customer);
            service.execute(producer);
            service.shutdown();
        }
        
    }
    
    class SemaphoreBuffer{
        List<Integer> list = new ArrayList<Integer>();
        Semaphore producerSemaphore = new Semaphore(1);// 允许并行的线程数为1
        Semaphore customerSemaphore = new Semaphore(0);// 0即state的初始值 则一开始消费就阻塞了 见上例说明中remaining<0
        
        public void put(int num){
            try {
                producerSemaphore.acquire();
                try {
                    list.add(num);
                } finally{
                    customerSemaphore.release();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        
        public int get(){
            try {
                customerSemaphore.acquire();
                try{
                    return list.remove(0);
                }finally{
                    producerSemaphore.release();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 0;
        }
        
    }
    
    class Customer implements Runnable{
        private SemaphoreBuffer buffer;
        Customer(SemaphoreBuffer buffer){
            this.buffer = buffer;
        }
        @Override
        public void run() {
            while(!Thread.interrupted()){
                int num = buffer.get();
                System.out.println("Customer get the num "+num);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            
        }
    }
    
    class Producer implements Runnable{
        private SemaphoreBuffer buffer;
        Producer(SemaphoreBuffer buffer){
            this.buffer = buffer;
        }
        int c =0;
        @Override
        public void run() {
    
            while(!Thread.interrupted()){
                buffer.put(c);
                System.out.println("Producer put the num "+c);
                c++;
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        
    }

     4. 示例三(多个生产者与多个消费者 运用阻塞队列  线程安全 )

    package com.rocky.semaphore;
    
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingDeque;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class ProducerCustomRealizeWithSemaphoreLinkedBlockingQueue {
    
        static AtomicInteger c = new AtomicInteger(1);
        public static void main(String[] args) {
            ExecutorService service = Executors.newCachedThreadPool();
            CakeStand stand = new CakeStand();
            service.execute(new CakeProducer(stand, "producer1", c));
            service.execute(new CakeProducer(stand, "producer2", c));
            service.execute(new CakeProducer(stand, "producer3", c));
            service.execute(new CakeCustomer(stand, "customer1"));
            service.execute(new CakeCustomer(stand, "customer2"));
        }
    }
    
    class Cake{
        private String name;
        Cake(String name){
            this.name = name;
        }
        public String toString(){
            return name;
        }
    }
    
    class CakeStand{
        BlockingQueue<Cake> queue = new LinkedBlockingQueue<Cake>(15);
        Semaphore notFull = new Semaphore(10);//生产信号量
        Semaphore notEmpty = new Semaphore(0);//消费信号量
        public void put(Cake cake){
            try {
                notFull.acquire();
                try{
                    queue.put(cake);
                }finally{
                    notEmpty.release();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        
        }
        
        public Cake take(){
            try {
                notEmpty.acquire();
                try{
                    Cake cake =  queue.take();
                    return cake;
                }finally{
                    notFull.release();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
                
            }
            return null;
        }
    }
    
    class CakeProducer implements Runnable{
    
        private CakeStand stand;
        private String name;
        private AtomicInteger c;
        public CakeProducer(CakeStand stand, String name, AtomicInteger c) {
            this.stand = stand;
            this.name = name;
            this.c = c;
        }
        @Override
        public void run() {
            while(!Thread.interrupted()){
                String str = "cake-"+c.getAndIncrement();
                System.out.println("生产:"+name+"-"+str);
                stand.put(new Cake(str));
                try {
                    Thread.sleep(1500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            
        }
    }
    class CakeCustomer implements Runnable{
    
        private CakeStand stand;
        private String name;
        public CakeCustomer(CakeStand stand, String name){
            this.stand = stand;
            this.name = name;
            
        }
        
        @Override
        public void run() {
            while(!Thread.interrupted()){
                Cake cake = stand.take();
                System.err.println("消费:"+name+"-"+cake.toString());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    说明1): 传统的消费者 生产者使用wait/notify模式等待和相互唤醒, Semaphore通过信号量的值控制运行(>0)和阻塞(<=0),

    两组信号量可以使两组角色彼此唤醒,使用阻塞队列可以确保线程安全。

    2) 可以额外创建一个信号量Semaphore mutex = new Semaphore(1); 在获取本组信号量之后再获取metex信号量可以实现互斥锁效果

  • 相关阅读:
    Kubernetes(k8s)中namespace的作用、反向代理访问k8s中的应用、k8s监控服务heapster
    Kubernetes(k8s)中dashboard的部署。
    Kubernetes(k8s)中Pod资源的健康检查
    Kubernetes(k8s)安装dns附件组件以及使用
    Kubernetes(k8s)的deployment资源
    Kubernetes(k8s)的Service资源
    Kubernetes(k8s)的RC(Replication Controller)副本控制器
    Kubernetes(k8s)常用资源的使用、Pod的常用操作
    简易图书管理系统(主要是jsp+servlet的练习),基于jsp+servlet的图书管理系统
    js 提取 sql 条件 表名 limit
  • 原文地址:https://www.cnblogs.com/rocky-fang/p/6768142.html
Copyright © 2011-2022 走看看