zoukankan      html  css  js  c++  java
  • AQS 同步组件学习(一)

    CountDownLatch 实例代码:

    package com.mmall.concurrency.example.aqs;
    
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    @Slf4j
    public class CountDownLatchExample1 {
    
        private final static int threadCount = 200;
    
        public static void main(String[] args) throws Exception {
    
            ExecutorService exec = Executors.newCachedThreadPool();
    
            final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
    
            for (int i = 0; i < threadCount; i++) {
                final int threadNum = i;
                exec.execute(() -> {
                    try {
                        test(threadNum);
                    } catch (Exception e) {
                        log.error("exception", e);
                    } finally {
                        countDownLatch.countDown();
                    }
                });
            }
            countDownLatch.await();
            log.info("finish");
            exec.shutdown();
        }
    
        private static void test(int threadNum) throws Exception {
            Thread.sleep(100);
            log.info("{}", threadNum);
            Thread.sleep(100);
        }
    }

    semaphore : 控制并发访问的线程个数

     通过提供同步机制,来控制当前访问的线程个数

     tryacquire: 尝试获取可用资源,如果获取不到就丢弃

    package com.mmall.concurrency.example.aqs;
    
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.TimeUnit;
    
    @Slf4j
    public class SemaphoreExample3 {
    
        private final static int threadCount = 20;
    
        public static void main(String[] args) throws Exception {
    
            ExecutorService exec = Executors.newCachedThreadPool();
    
            final Semaphore semaphore = new Semaphore(3);
    
            for (int i = 0; i < threadCount; i++) {
                final int threadNum = i;
                exec.execute(() -> {
                    try {
                        if (semaphore.tryAcquire()) { // 尝试获取一个许可
                            test(threadNum);
                            semaphore.release(); // 释放一个许可
                        }
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                });
            }
            exec.shutdown();
        }
    
        private static void test(int threadNum) throws Exception {
            log.info("{}", threadNum);
            Thread.sleep(1000);
        }
    }

    cyclicBarrier: 使用场景和countDownLatch的使用场景很类似,但是可以循环的使用

    实现了多个线程之间的相互等待,知道所有的线程都执行完成之后,才进行下一步的操作

    package com.mmall.concurrency.example.aqs;
    
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.CyclicBarrier;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    @Slf4j
    public class CyclicBarrierExample2 {
    
        private static CyclicBarrier barrier = new CyclicBarrier(5);
    
        public static void main(String[] args) throws Exception {
    
            ExecutorService executor = Executors.newCachedThreadPool();
    
            for (int i = 0; i < 10; i++) {
                final int threadNum = i;
                Thread.sleep(1000);
                executor.execute(() -> {
                    try {
                        race(threadNum);
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                });
            }
            executor.shutdown();
        }
    
        private static void race(int threadNum) throws Exception {
            Thread.sleep(1000);
            log.info("{} is ready", threadNum);
            try {
                barrier.await(2000, TimeUnit.MILLISECONDS);
            } catch (Exception e) {
                log.warn("BarrierException", e);
            }
            log.info("{} continue", threadNum);
        }
    }

    设置达到资源屏障时优先执行的方法:

    package com.mmall.concurrency.example.aqs;
    
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.CyclicBarrier;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    @Slf4j
    public class CyclicBarrierExample3 {
    
        private static CyclicBarrier barrier = new CyclicBarrier(5, () -> {
            log.info("callback is running");
        });
    
        public static void main(String[] args) throws Exception {
    
            ExecutorService executor = Executors.newCachedThreadPool();
    
            for (int i = 0; i < 10; i++) {
                final int threadNum = i;
                Thread.sleep(1000);
                executor.execute(() -> {
                    try {
                        race(threadNum);
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                });
            }
            executor.shutdown();
        }
    
        private static void race(int threadNum) throws Exception {
            Thread.sleep(1000);
            log.info("{} is ready", threadNum);
            barrier.await();
            log.info("{} continue", threadNum);
        }
    }
  • 相关阅读:
    idea常用快捷键及操作
    Ubuntu 装nexus
    ubuntu安装gitlab
    ubuntu安装jdk,maven,tomcat
    ubuntu安装gitlab-ci-runner、注册
    ubuntu开启远程shell,开启上传下载
    Ubuntu安装软件提示boot空间不足
    POJ3461 KMP简单变形输出模式串在主串出现的次数
    涨姿势stl map['a']['a']=b;
    对链表的操作(数据结构线性表算法设计练习)
  • 原文地址:https://www.cnblogs.com/wcgstudy/p/11509780.html
Copyright © 2011-2022 走看看