zoukankan      html  css  js  c++  java
  • JAVA线程&&JUC

    线程

    一、线程创建的3种方式

    1、继承Thread 类
    public class Thread implements Runnable
    

    重写run方法即可

    public class ThreadTest extends Thread {
        @Override
        public void run() {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("继承Thread方法实现多线程");
        }
    }
    
    //调用
        ThreadTest th1=new ThreadTest();
        th1.start();
    
    2、实现Runnable接口

    本质上和继承Thread类一样,因为Thread类也实现了Rannable接口

    public class ThreadTest1 implements Runnable{
        @Override
        public void run() {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("实现Runable的线程开启方式");
        }
    }
    
    3、实现Callable接口并返回值
    public class ThreadTest2 implements Callable<String> {
        @Override
        public String call() throws Exception {
            Thread.sleep(2000);
            return "hello";
        }
    }
    
    //调用
    Callable<String> callable = new ThreadTest2();
    FutureTask<String> task=new FutureTask<>(callable);
    Thread th4 = new Thread(task);
    th4.start();
    //task.get() 获取返回值
    //这个get方法可能会产生阻塞
    System.out.println("从子线程中获取:"+task.get());
    

    分析:FutureTask 实现RunnableFuture,RunnableFuture继承了Runnable接口

    public class FutureTask<V> implements RunnableFuture<V>
    
    public interface RunnableFuture<V> extends Runnable, Future<V>
    

    二、线程池

    //ExecutorService threadPool = Executors.newSingleThreadExecutor();//单个线程
    //ExecutorService threadPool =Executors.newFixedThreadPool(5);//创建固定的线程池大小
    //ExecutorService threadPool =Executors.newCachedThreadPool();//可伸缩的线程池
    
    /**
     * 最大线程该如何定义
     *
     * 1. CPU密集型,几核CPU就定义为几,可以保证CPU效率最高 Runtime.getRuntime().availableProcessors()
     * 2. IO 密集型 判断你程序中十分消耗IO的线程数
     */
    System.out.println("CPU核心数:"+Runtime.getRuntime().availableProcessors());
    //自定义线程池  工作中,使用ThreadPoolExecutor 自己创建
    ExecutorService threadPool=new ThreadPoolExecutor(2,//核心线程池大小
            5,//最大线程池大小
            3,//非核心线程闲置存活时间
            TimeUnit.SECONDS,//超时单位
            new LinkedBlockingQueue<>(5),//阻塞队列
            Executors.defaultThreadFactory(),//线程工厂,创建线程,一般不修改
            new ThreadPoolExecutor.AbortPolicy()//拒绝策略:对新来的线程不做处理,抛出异常
    );
    //阻塞队列存放个数+最大线程池个数=同时允许使用线程池的个数
    for(int i=0;i<10;i++){
        threadPool.execute(()->{
            System.out.println(Thread.currentThread().getName());
        });
    }
    //线程池使用完,程序结束,要关闭线程池
    threadPool.shutdown();
    
    拒绝策略
    AbortPolicy()//对新来的线程不做处理,抛出异常
    CallerRunsPolicy()//哪来的回哪里,用线程池的线程处理
    DiscardPolicy()//队列满了,丢弃当前任务,不会抛出异常
    DiscardOldestPolicay()//队列满了,把队列首个poll丢出,自己加入队列,不抛出异常 e.getQueue().poll();
    

    并发

    1、synchronized

    非静态方法上

    ​ 锁的对象是 该方法的类的实例

    静态方法上

    ​ 锁的对象是类的字节码 如:Person.class

    同步代码块

    ​ 锁的对象是自己定义的

    2、ReentrantLock

    创建锁
    Lock lock = new ReentrantLock();
    
    加锁
    lock.lock();
    
    解锁
    lock.unlock();
    

    3、CAS

    java.util.concurrent.atomic
    //CAS  compareAndSet() 比较并交换
    public static void main(String[] args) {
        AtomicInteger atomicInteger = new AtomicInteger(2020);
    
        //如果我期望的值达到了,就更新
        atomicInteger.compareAndSet(2020,2021);
    
        System.out.println(atomicInteger.get());
    }
    

    线程之间的通信

    等待唤醒 synchronized

    public synchronized void incrnumber() throws InterruptedException {
        while (number!=0){//不要用if判断,应该用while防止虚假唤醒
            //等待
           this.wait();
        }
        number ++;
        System.out.println(Thread.currentThread().getName()+"number="+number);
        //通知其他线程
        this.notifyAll();
    }
    

    等待唤醒 ReentrantLock

    	private int number=0;
        final Lock lock=new ReentrantLock();
        final Condition conditionA = lock.newCondition();
        final Condition conditionB = lock.newCondition();
        //condition.wait();//等待
        //condition.signalAll();//唤醒全部
    
        //+1
        public  void incrnumber() throws InterruptedException {
                //1、加锁
                lock.lock();
    
                try{
                    while (number!=0){//不要用if判断,应该用while防止虚假唤醒
                        // A 等待
                        conditionA.await();
                    }
                    number ++;
                    System.out.println(Thread.currentThread().getName()+" number="+number);
                    //唤醒 B
                    conditionB.signal();
                }catch (Exception e){}finally {
                    //解锁
                    lock.unlock();
                }
        }
    
        //-1
        public  void decrnumber() throws InterruptedException {
            //加锁
                lock.lock();
                try{
                    while(number==0){
                        //B 等待
                        conditionB.await();
                    }
                    number-- ;
                    System.out.println(Thread.currentThread().getName()+" number="+number);
                    //唤醒 A
                    conditionA.signal();
                }catch (Exception e){}
                finally {
                    //解锁
                    lock.unlock();
                }
        }
    

    减法计数器CountDownLatch

    CountDownLatch countDownLatch = new CountDownLatch(6);
    
    for (int i=0;i<6;i++){
        new Thread(()->{
            System.out.println(Thread.currentThread().getName()+"---go---out");
            //数量 减1
            countDownLatch.countDown();
        },String.valueOf(i)).start();
    }
    
    //等待计数器归零 再向下执行
    countDownLatch.await();
    
    System.out.println("执行 完成 6");
    

    加法计数器CyclicBarrier

    /**
     * 集齐7颗龙珠召唤神龙
     */
    
    //计数,线程
    //如果收集不齐,会阻塞
    CyclicBarrier cyclicBarrier=new CyclicBarrier(7,()->{
        System.out.println("召唤神龙成功");
    });
    
    for(int i=0;i<7;i++){
        new Thread(()->{
            //这里面如果要拿外面的变量,需要拿 final 类型变量
            System.out.println(Thread.currentThread().getName()+":收集龙珠");
            try {
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }).start();
    }
    

    读写锁ReadWriteLock

    /**
     * 独占锁:写锁 一次只能被一个线程占有
     * 共享锁:读锁 可以被多个线程拥有
     *
     * 读-读:共存
     * 读-写:不能共存
     * 写-写:不能共存
     */
    class MyCacheLock{
        private volatile Map<String,Object> map=new HashMap<>();
        //读写锁
        private ReadWriteLock readWriteLock =  new ReentrantReadWriteLock();
    
        //存 写入的时候只希望同时只有一个线程 写
        public void put(String key,Object value){
            //写锁
            readWriteLock.writeLock().lock();
    
            try {
                System.out.println(Thread.currentThread().getName()+"写入");
                map.put(key,value);
                System.out.println(Thread.currentThread().getName()+"写入OK");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                //释放锁
                readWriteLock.writeLock().unlock();
            }
        }
    
        //取 所有人都可以读
        public void get(String key){
            //读锁
            readWriteLock.readLock().lock();
            try {
                map.get(key);
                System.out.println(Thread.currentThread().getName()+"读取");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                //释放锁
                readWriteLock.readLock().unlock();
            }
    
        }
    }
    

    信号量Semaphore

    Semaphore也是一个线程同步的辅助类,可以维护当前访问自身的线程个数,并提供了同步机制。使用Semaphore可以控制同时访问资源的线程个数,例如,实现一个文件允许的并发访问数。

    Semaphore的主要方法摘要:

      void acquire():从此信号量获取一个许可,在提供一个许可前一直将线程阻塞,否则线程被中断。

      void release():释放一个许可,将其返回给信号量。

      int availablePermits():返回此信号量中当前可用的许可数。

      boolean hasQueuedThreads():查询是否有线程正在等待获取。

    /**
     * 抢车位
     * 如果已经满了,一直等待直到释放为止
     */
    //作用:多个共享资源互斥的使用  并发限流,控制最大线程数
    public static void main(String[] args) {
        //线程数量  有3个停车位
        final Semaphore semaphore = new Semaphore(3);
        for (int i=0;i<6;i++){
            new Thread(()->{
                //得到/获取  semaphore.acquire();
    
                try {
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName()+"抢到车位");
                    //停两秒钟
                    TimeUnit.SECONDS.sleep(2);
    
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    //离开车位
                    semaphore.release();
                }
                //释放 semaphore.release();
    
            },String.valueOf("线程"+i)).start();
        }
    }
    

    并发集合

    1、List

     //并发下arraylist是不安全的
    // List<String> list = new ArrayList<>();
    
     //1、解决方案 1
     //List<String> list = new Vector<>();
    
     //解决方案 2
    // List<String> list =Collections.synchronizedList(new ArrayList<>());
    
     //解决方案 3 写入时复制COW 计算机程序设计领域的一种优化策略
     //CopyOnWriteArrayList 比Vector好,Vector有syncthronized,效率会降低
     List<String> list=new CopyOnWriteArrayList<>();
     for(int i=0;i<10;i++){
         new Thread(()->{
             list.add(UUID.randomUUID().toString().substring(0,5));
             System.out.println(list);
         },String.valueOf(i)).start();
     }
    

    2、Set

    //Set集合在多线程下 也是不安全的
    // Set<String> set = new HashSet<>();
    
     //1解决方案
     Set<String> set= Collections.synchronizedSet(new HashSet<>());
    
     //2解决方案
    // Set<String> set=new CopyOnWriteArraySet<>();
    
     for(int i=0;i<30;i++){
         new Thread(()->{
             set.add(UUID.randomUUID().toString().substring(0,5));
             System.out.println(set);
         },String.valueOf(i)).start();
     }
    

    3、Map

    //Map集合在多线程下 也是不安全的
    Map<String,String> map = new HashMap<>();
    
    //1解决方案
    //Map<String,String> map= Collections.synchronizedMap(new HashMap<>());
    
    //2解决方案
    //Map<String,String> map=new ConcurrentHashMap<>();
    
    for(int i=0;i<30;i++){
        new Thread(()->{
            map.put(UUID.randomUUID().toString().substring(0,5),"AAAA");
            System.out.println(map);
        },String.valueOf(i)).start();
    }
    

    BlockingQueue

    ArrayBlockingQueue

    public static void main(String[] args) {
        //collection
        //List
        //Set
        //BlockingQueue 不是新的东西
        /**
         * 什么情况下使用 阻塞队列
         * 多线程:线程池,多线程并发处理 A 调用 B
         *
         */
    }
    
    //抛出异常
    @Test
    public void test1(){
        //要设置队列的大小
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(3);
        System.out.println(arrayBlockingQueue.add("a"));
        System.out.println(arrayBlockingQueue.add("b"));
        System.out.println(arrayBlockingQueue.add("c"));
    
        //java.lang.IllegalStateException: Queue full
        //超过大小会抛出异常
        //System.out.println(arrayBlockingQueue.add("c"));
    
    
        //移除元素并返回 移除的元素
        System.out.println(arrayBlockingQueue.remove());
        System.out.println(arrayBlockingQueue.remove());
        System.out.println(arrayBlockingQueue.remove());
    
        //java.util.NoSuchElementException
        //没有元素后移除会抛出异常
        //System.out.println(arrayBlockingQueue.remove());
    
    }
    
    /**
     * 不抛出异常
     */
    @Test
    public  void test2(){
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(3);
    
        //element()监测 队列首个元素,要抛出异常
        //arrayBlockingQueue.element();
    
        //peek监测 队列首个元素 没有返回null 不抛出异常
        //System.out.println(arrayBlockingQueue.peek());
    
        //存值 offer
        System.out.println(arrayBlockingQueue.offer("a"));
        System.out.println(arrayBlockingQueue.offer("b"));
        System.out.println(arrayBlockingQueue.offer("c"));
       //满了  不抛出异常,返回 Boolean 值
        System.out.println(arrayBlockingQueue.offer("d"));
    
        // element()  返回队列首个元素值,不做移除
        System.out.println(arrayBlockingQueue.element());
        System.out.println(arrayBlockingQueue.peek());
    
        //移除 poll 返回移除的值
        System.out.println(arrayBlockingQueue.poll());
        System.out.println(arrayBlockingQueue.poll());
        System.out.println(arrayBlockingQueue.poll());
    
        //若移除空了 没有值返回null
        System.out.println(arrayBlockingQueue.poll());
    }
    
    /**
     * 等待,阻塞(一直阻塞)
     */
    public void test3() throws InterruptedException {
        ArrayBlockingQueue<Object> arrayBlockingQueue = new ArrayBlockingQueue<>(3);
    
        //存值 put
        arrayBlockingQueue.put("a");
        arrayBlockingQueue.put("b");
        arrayBlockingQueue.put("c");
        //队列没有位置了,会一直等待 阻塞
        //arrayBlockingQueue.put("d");
    
        //取值 take()
        System.out.println(arrayBlockingQueue.take());
        System.out.println(arrayBlockingQueue.take());
        System.out.println(arrayBlockingQueue.take());
        //如果没有元素 会一直阻塞
        System.out.println(arrayBlockingQueue.take());
    
    }
    
    /**
     * 等待,阻塞(等待超时)
     */
    @Test
    public void test4() throws InterruptedException {
        ArrayBlockingQueue<Object> arrayBlockingQueue = new ArrayBlockingQueue<>(3);
    
        //存 offer
        arrayBlockingQueue.offer("a",2, TimeUnit.SECONDS);
        arrayBlockingQueue.offer("a",2, TimeUnit.SECONDS);
        arrayBlockingQueue.offer("a",2, TimeUnit.SECONDS);
        //队列满了等待2秒 等待超过两秒就退出
        arrayBlockingQueue.offer("a",2, TimeUnit.SECONDS);
    
        //取 poll
        arrayBlockingQueue.poll(2,TimeUnit.SECONDS);
        arrayBlockingQueue.poll(2,TimeUnit.SECONDS);
        arrayBlockingQueue.poll(2,TimeUnit.SECONDS);
    
        //等待超过两秒 队列中还没有值就 超时退出
        arrayBlockingQueue.poll(2,TimeUnit.SECONDS);
    }
    
    总结:

    1、add() remove() 添加 移除,当添加时超过大小,移除时没有元素时抛出异常

    2、offer() poll() 存值、取值,存值时空间不够返回false,取值时没有时返回Null

    3、put() take() 存值、取值 ,存值时空间不够会阻塞,取值时没有元素会一直阻塞

    4、offer() pool(),设置阻塞等待的时间,超过时间没有存值或取值就放弃等待退出

    SynchronousQueue

    /**
     * 同步队列
     * 和其他的BlockingQueue 不一样 ,SynchronousQueue不存元素
     * put 了一个元素,必须从里面先take取出来,否则不能在put进去值
     */
    public class SynchronousQueueDemo {
        @Test
        public void Test1() throws InterruptedException {
            CountDownLatch countDownLatch = new CountDownLatch(2);
    
            BlockingQueue<String> queue = new SynchronousQueue<>();
    
            new Thread(()->{
                try {
                    System.out.println(Thread.currentThread().getName()+"put 1");
                    queue.put("1");
                    System.out.println(Thread.currentThread().getName()+"put 2");
                    queue.put("2");
                    System.out.println(Thread.currentThread().getName()+"put 3");
                    queue.put("3");
                } catch (Exception e) {
                    e.printStackTrace();
                }finally {
                    countDownLatch.countDown();
                }
            },"t1").start();
    
            new Thread(()->{
                try {
                    TimeUnit.SECONDS.sleep(3);
                    System.out.println(Thread.currentThread().getName()+"take"+queue.take());
                    TimeUnit.SECONDS.sleep(3);
                    System.out.println(Thread.currentThread().getName()+"take"+queue.take());
                    TimeUnit.SECONDS.sleep(3);
                    System.out.println(Thread.currentThread().getName()+"take"+queue.take());
                } catch (Exception e) {
                    e.printStackTrace();
                }finally {
                    countDownLatch.countDown();
                }
            },"t2").start();
    
            countDownLatch.await();
        }
    }
    
  • 相关阅读:
    if
    C#
    C#
    C#
    .net 5.0
    .net 5.0
    .net 5.0
    设计模式
    GAN网络中采用导向滤波的论文
    pytorch~多loss的选择
  • 原文地址:https://www.cnblogs.com/harriets-zhang/p/14652255.html
Copyright © 2011-2022 走看看