zoukankan      html  css  js  c++  java
  • JAVA-AbstractQueuedSynchronizer-AQS

     

    import lombok.extern.slf4j.Slf4j;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    @Slf4j
    public class CountDownLatchExample1 {
        private final   static  int threadCount=200;
        public static void main(String[] args) throws  Exception{
            ExecutorService executorService= Executors.newCachedThreadPool();
            final CountDownLatch countDownLatch=new CountDownLatch(threadCount);
            for(int i=0;i<threadCount;i++){
                final int threadNum=i;
                executorService.execute(()->{
                    try{
                        test(threadNum);
                    }catch (Exception e){
                        log.error("exception",e);
                    }finally {
                        countDownLatch.countDown();
                    }
                });
            }
            //countDownLatch.await();  //保证前面的线程都执行完
            countDownLatch.await(10, TimeUnit.MILLISECONDS);  //在规定时间内执行     
            log.info("finish");
        }
        private static void test(int threadNum) throws Exception{
            Thread.sleep(100);
            log.info("{}",threadNum);
            Thread.sleep(100);
        }
    }
    

      

    import lombok.extern.slf4j.Slf4j;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    
    @Slf4j
    public class SemaphoreExample1 {
        private final   static  int threadCount=20;
        public static void main(String[] args) throws  Exception{
            ExecutorService executorService= Executors.newCachedThreadPool();
            final Semaphore semaphore=new Semaphore(3);
            for(int i=0;i<threadCount;i++){
                final int threadNum=i;
                executorService.execute(()->{
                    try{
                        if(semaphore.tryAcquire()){  //可以指定执行的时间
                            test(threadNum);
                            semaphore.release();
                        }
                        //semaphore.acquire(3);//获取多个许可
                        //semaphore.acquire();//获取一个许可
                       // test(threadNum);
                       // semaphore.release();//释放一个许可
                        //semaphore.release(3);//释放多个许可
                    }catch (Exception e){
                        log.error("exception",e);
                    }
                });
            }
    
            log.info("finish");
        }
        private static void test(int threadNum) throws Exception{
            log.info("{}",threadNum);
            Thread.sleep(100);
        }
    }
    

      

    import lombok.extern.slf4j.Slf4j;
    import java.util.concurrent.*;
    
    @Slf4j
    public class CyclicBarrierExamle1 {
        //private static CyclicBarrier barrier=new CyclicBarrier(5);
        private static CyclicBarrier barrier=new CyclicBarrier(5,()->{
           log.info("callback is running");
        });
        public static void main(String[] args) throws Exception{
            ExecutorService executor= Executors.newCachedThreadPool();
            for(int i=0;i<10;i++){
                final int threadNum=i;
                Thread.sleep(1000);
                executor.execute(()->{
                    try {
                        race(threadNum);
                    }catch (Exception e){
                        log.error("exception",e);
                    }
                });
            }
            executor.shutdown();
        }
        private static void race(int threadNum) throws Exception{
            Thread.sleep(100);
            log.info("{} is ready",threadNum);
            //barrier.await();
            try {
                barrier.await(2000, TimeUnit.MILLISECONDS);
            }catch (BrokenBarrierException |TimeoutException e){
                log.warn("BrokenBarrierException",e);
            }
    
            log.info("{}continue",threadNum);
        }
    }
    

      

    在读取中写入的方法:ReentrantReadWriteLock

    线程安全的写法。

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    @Slf4j
    public class LockExample1 {
        //请求总数
        public static  int clientTotal=5000;
        //同时并发执行的线程数
        public static int threadTotal=200;
        public static int count=0;
        private final static Lock lock=new ReentrantLock();
    
        public static void main(String[] args) throws Exception{
            ExecutorService executorService= Executors.newCachedThreadPool();
            final Semaphore semaphore=new Semaphore(threadTotal);
            final CountDownLatch countDownLatch=new CountDownLatch(clientTotal);
            for(int i=0;i<clientTotal;i++){
                final  int count=i;
                executorService.execute(()->{
                    try {
                        semaphore.acquire();
                        add(count);
                        semaphore.release();
                    }catch (Exception e){
                        log.error("exception",e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("size{}",count);
        }
        private static void add(int i){
             lock.lock();
             try {
                 count++;
             }finally {
                 lock.unlock();
             }
        }
    }
    

      

    import lombok.extern.slf4j.Slf4j;
    import java.util.Map;
    import java.util.Set;
    import java.util.TreeMap;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;
    
    @Slf4j
    public class LockExample2 {
        private final Map<String, Data> map=new TreeMap<>();
        private final ReentrantReadWriteLock lock=new ReentrantReadWriteLock();
        private final Lock readLock=lock.readLock();
        private final Lock writeLock=lock.writeLock();
        public Data get(String key){
          readLock.lock();
          try {
              return map.get(key);
          }finally {
              readLock.unlock();
          }
        }
        public Set<String> getAllKeys(){
           readLock.lock();
           try {
               return map.keySet();
           }finally {
               readLock.unlock();
           }
        }
        public Data put(String key, Data value){
            writeLock.lock();
            try {
                return map.put(key, value);
            }finally {
                readLock.unlock();
            }
        }
        class Data{
    
        }
    }
    

      

    import lombok.extern.slf4j.Slf4j;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.locks.StampedLock;
    
    @Slf4j
    public class LockExample5 {
        //请求总数
        public static  int clientTotal=5000;
        //同时并发执行的线程数
        public static int threadTotal=200;
        public static int count=0;
        private final static StampedLock lock=new StampedLock();
    
        public static void main(String[] args) throws Exception{
            ExecutorService executorService= Executors.newCachedThreadPool();
            final Semaphore semaphore=new Semaphore(threadTotal);
            final CountDownLatch countDownLatch=new CountDownLatch(clientTotal);
            for(int i=0;i<clientTotal;i++){
                final  int count=i;
                executorService.execute(()->{
                    try {
                        semaphore.acquire();
                        add(count);
                        semaphore.release();
                    }catch (Exception e){
                        log.error("exception",e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("size{}",count);
        }
        private static void add(int i){
            long stamp= lock.writeLock();
            try {
                count++;
            }finally {
                lock.unlock(stamp);
            }
        }
    }
    

      

    import java.util.concurrent.locks.StampedLock;
    public class LockExample4 {
        class Point{
            private double x,y;
            private  final StampedLock sl=new StampedLock();
            void move(double deltaX,double deltaY){
                long stamp=sl.writeLock();
                try{
                    x+=deltaX;
                    y+=deltaY;
                }finally {
                    sl.unlockWrite(stamp);
                }
            }
    
            //乐观锁案例
            double distanceFromOrigin(){
                long stamp=sl.tryOptimisticRead();//获得一个乐观读锁
                double currentX=x,currentY=y;//将两个字段读入本地局部变量
                if(!sl.validate(stamp)){  //检查发出乐观锁后同时是否有其他锁发生
                    stamp=sl.readLock();//没有,再次获取一个读悲观锁
                    try{
                        currentX=x;  //将两个字段读入本地局部变量
                        currentY=y; //将两个字段读入本地局部变量
                    }finally {
                        sl.unlockRead(stamp);
                    }
                }
                return Math.sqrt(currentX*currentX+currentY*currentY);
            }
    
            //悲观锁读写案例
            void moveIfAtOrigin(double newX,double newY){
                long stamp=sl.readLock();
                try {
                    while (x==0.0&&y==0.0){ //循环,检查当前状态是否符合
                        long ws=sl.tryConvertToWriteLock(stamp);//将读锁转化为写锁
                        if(ws!=0L){  //确认转为写锁是否成功
                           stamp=ws;  //如果成功,替换票据
                           x=newX;   //进行状态改变
                           y=newY;   //进行状态改变
                            break;
                        }else {
                            sl.unlockRead(stamp);//如果不能成功转化为写锁
                            stamp=sl.writeLock();//显示直接进行写锁,然后通过循环再试
                        }
                    }
                }finally {
                    sl.unlock(stamp);//释放读锁和写锁
                }
            }
        }
    }
    

      

    import lombok.extern.slf4j.Slf4j;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
    @Slf4j
    public class LockExample6 {
        public static void main(String[] args) {
            ReentrantLock reentrantLock=new ReentrantLock();
            Condition condition=reentrantLock.newCondition();
            new Thread(()->{
                try {
                    reentrantLock.lock();
                    log.info("wait signal");
                    condition.await();
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
                log.info("get signal");
                reentrantLock.unlock();
            }).start();
    
            new Thread(()->{
                reentrantLock.lock();
                log.info("get lock");
                try {
                    Thread.sleep(300);
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
                condition.signalAll();
                log.info("send signal~");
                reentrantLock.unlock();
            }).start();
        }
    }
    

      

  • 相关阅读:
    http协议之状态码
    HTTP协议入门基础
    CI框架使用(一)
    memcache常见现象(一)雪崩现象
    memcached分布式一致性哈希算法
    编译php扩展
    memcached--delete--replace--set--get--incr--decr--stats
    memcached--add使用
    php5.3之命名空间
    MySQL优化(一)
  • 原文地址:https://www.cnblogs.com/sunliyuan/p/11259994.html
Copyright © 2011-2022 走看看