zoukankan      html  css  js  c++  java
  • 【JAVACONCURRENT】 利用SEMAPHORE 实现有界阻塞容器

    package concurrentTest;
    
    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.atomic.AtomicInteger;
    /**
     * A bounded, blocking set: a {@link Set} with a fixed capacity whose
     * {@link #add(Object)} blocks while the set is full.
     *
     * <p>Capacity is tracked with a {@link Semaphore} that holds one permit per
     * free slot. A successful add keeps its permit; a successful remove hands one
     * back. The backing set is wrapped with
     * {@link Collections#synchronizedSet(Set)}, so individual set operations are
     * safe to call from several threads.
     *
     * @param <T> element type
     */
    public class BoundedSet<T> {

        /** Backing storage; only mutated through {@link #add} and {@link #remove}. */
        private final Set<T> set;
        /** One permit per free slot in {@link #set}. */
        private final Semaphore sem;

        /**
         * Creates an empty set that holds at most {@code bound} elements.
         *
         * @param bound maximum number of elements; must be positive
         * @throws IllegalArgumentException if {@code bound} is zero or negative,
         *         because every call to {@link #add} would then block forever
         */
        public BoundedSet(int bound){
            if (bound <= 0) {
                throw new IllegalArgumentException("bound must be positive: " + bound);
            }
            this.set = Collections.synchronizedSet(new HashSet<T>());
            this.sem = new Semaphore(bound);
        }

        /**
         * Returns a read-only live view of the current contents.
         *
         * <p>The view is unmodifiable on purpose: adding or removing through it
         * would bypass the semaphore and desynchronise the permit count from the
         * real size of the set.
         *
         * @return an unmodifiable view of the backing set
         */
        public Set<T> getSet() {
            return Collections.unmodifiableSet(set);
        }

        /**
         * Adds {@code o}, waiting for a free slot if the set is full.
         *
         * @param o element to add
         * @return {@code true} if the element was added, {@code false} if it was
         *         already present
         * @throws InterruptedException if the calling thread is interrupted while
         *         waiting for a free slot
         */
        public boolean add(T o) throws InterruptedException{
            sem.acquire();
            boolean wasAdded = false;
            try{
                wasAdded = set.add(o);
                return wasAdded;
            }finally{
                // Nothing was stored (duplicate or exception): give the slot back.
                if(!wasAdded) {
                    sem.release();
                }
            }
        }

        /**
         * Removes {@code o} and, if it was present, frees one slot.
         *
         * @param o element to remove
         * @return {@code true} if the element was present and has been removed
         */
        public boolean remove(Object o){
            boolean wasRemoved = set.remove(o);
            if(wasRemoved) {
                sem.release();
            }
            return wasRemoved;
        }

        /** Thread factory for the demo: numbered thread names, configurable daemon flag. */
        class ThreadFactoryDemo implements ThreadFactory{

            private final boolean daemon;
            private final String threadName;
            /** Suffix appended to {@link #threadName}; incremented per created thread. */
            private final AtomicInteger inc = new AtomicInteger(0);

            /**
             * @param isDeamon   whether created threads are daemon threads
             * @param threadName prefix for the name of every created thread
             */
            public ThreadFactoryDemo(boolean isDeamon,String threadName){
                this.daemon = isDeamon;
                this.threadName = threadName;
            }

            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setDaemon(this.daemon);
                t.setName(this.threadName + inc.getAndIncrement());
                return t;
            }
        }

        /** Demo task: adds one value to the set (blocking if full) and prints the contents. */
        class RunnableDemo implements Runnable{

            private final int i;
            private final BoundedSet<Integer> test;

            public RunnableDemo(int i,BoundedSet<Integer> test){
                this.i = i;
                this.test = test;
            }

            @Override
            public void run() {
                try {
                    test.add(i);
                    // The print is not atomic with the add, so the contents shown
                    // may already include changes made by other threads.
                    System.out.println("Set add something already!" + Thread.currentThread().getName() + " Now set content =" + test.getSet().toString());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        /** Demo task: retries until one value has been removed, then prints the contents. */
        class RemoveRunnableDemo implements Runnable{

            private final int i;
            private final BoundedSet<Integer> test;

            public RemoveRunnableDemo(int i,BoundedSet<Integer> test){
                this.i = i;
                this.test = test;
            }

            @Override
            public void run() {
                while(!test.remove(i)) {
                    // Stop retrying if asked to, instead of spinning forever.
                    if (Thread.currentThread().isInterrupted()) {
                        return;
                    }
                    // Let other threads run rather than burning a core.
                    Thread.yield();
                }
                System.out.println("Set remove something already! Now set content =" + test.getSet().toString());
            }
        }

        /**
         * Demo: six tasks try to add to a set of capacity five, so one of them
         * blocks until a separate thread removes an element.
         */
        public static void main(String[] args) {

            int boundSize = 5;
            final int[] array = {0,1,2,3,4,5};
            final BoundedSet<Integer> test = new BoundedSet<Integer>(boundSize);
            // Non-daemon workers plus shutdown() below: the JVM stays alive until
            // the blocked add has completed and printed, then exits cleanly.
            ExecutorService executor = Executors.newFixedThreadPool(boundSize+1,test.new ThreadFactoryDemo(false,"semaphore-thread-"));
            for(int i=0;i<boundSize+1;i++){
                executor.execute(test.new RunnableDemo(array[i],test));
            }
            // No further tasks will be submitted; already submitted ones still run.
            executor.shutdown();
            sleep(1000);

            // Which of the six adds is the blocked one depends on scheduling, so
            // remove an element that is actually present. Removing a fixed value
            // could spin forever if that value's add is the one still waiting.
            int victim = array[0];
            for (int candidate : array) {
                if (test.getSet().contains(candidate)) {
                    victim = candidate;
                    break;
                }
            }

            Thread thread = new Thread(test.new RemoveRunnableDemo(victim,test));
            thread.setName("remove task");
            thread.setDaemon(false);
            thread.start();
        }

        /** Sleeps for {@code time} milliseconds, restoring the interrupt flag if interrupted. */
        private static void sleep(int time){

            try {
                Thread.sleep(time);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

     注意:测试中的打印语句与 add/remove 操作并不是一个原子步骤,打印出的集合内容可能已包含其他线程的修改,因此输出的顺序或内容看起来可能"不对";但有界容器本身的实现是正确的。 

  • 相关阅读:
    redis集群报Jedis does not support password protected Redis Cluster configurations异常解决办法
    redis集群密码设置
    Redis 3.2.4集群实战
    Redis3.2.4 Cluster集群搭建
    redis集群环境的搭建和错误分析
    Centos iptables防火墙关闭启动详解
    动态的表格数据
    ubuntu使用抓包工具,charles
    layer结合art实现弹出功能
    禅道开源版源码安装
  • 原文地址:https://www.cnblogs.com/lixusign/p/2469454.html
Copyright © 2011-2022 走看看