zoukankan      html  css  js  c++  java
  • Java高并发--------JDK并发包-------3

    JDK并发包

    3.1同步控制

    synchronized、obj.wait()、obj.notify()

    3.1.1关键字synchronized的功能扩展:重入锁

    java.util.concurrent.locks.ReentrantLock类来实现的

    手动加锁:lock.lock()

    手动解锁:lock.unlock()

    重复锁:

      lock.lock()

      lock.lock()

        操作

      lock.unlock()

      lock.unlock()

    中断响应

    如果一个线程正在等待锁,那么它依然可以收到一个通知,被告知无须等待,就可以爬了。

    import java.util.concurrent.locks.ReentrantLock;
    
    public class IntLock implements Runnable {
        public static ReentrantLock lock1 = new ReentrantLock();
        public static ReentrantLock lock2 = new ReentrantLock();
        int lock;
    
        public IntLock(int lock){
            this.lock = lock;
        }
    
        @Override
        public void run() {
    
            try {
                if (lock == 1){
                    lock1.lockInterruptibly();
                    Thread.sleep(500);
                    lock2.lockInterruptibly();
                }else {
                    lock2.lockInterruptibly();
                    Thread.sleep(500);
                    lock1.lockInterruptibly();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
    
                if (lock1.isHeldByCurrentThread()){
                    lock1.unlock();
                }
                if (lock2.isHeldByCurrentThread()){
                    lock2.unlock();
                }
                System.out.println(Thread.currentThread().getId() + "线程退出");
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
    
            IntLock r1 = new IntLock(1);
            IntLock r2 = new IntLock(2);
    
            Thread t1 = new Thread(r1);
            Thread t2 = new Thread(r2);
            t1.start();
            t2.start();
            Thread.sleep(1000);
            //中断其中一个线程
            t2.interrupt();
    
    
        }
    }

    lockInterruptibly
    public void lockInterruptibly() throws InterruptedException
    1)如果当前线程未被中断,则获取锁。

    2)如果该锁没有被另一个线程保持,则获取该锁并立即返回,将锁的保持计数设置为 1。

    3)如果当前线程已经保持此锁,则将保持计数加 1,并且该方法立即返回。

    4)如果锁被另一个线程保持,则出于线程调度目的,禁用当前线程,并且在发生以下两种情况之一以
    前,该线程将一直处于休眠状态:
    1)锁由当前线程获得;或者

    2)其他某个线程中断当前线程。

    5)如果当前线程获得该锁,则将锁保持计数设置为 1。
    如果当前线程:
    1)在进入此方法时已经设置了该线程的中断状态;或者

    2)在等待获取锁的同时被中断。

    则抛出 InterruptedException,并且清除当前线程的已中断状态。

    6)在此实现中,因为此方法是一个显式中断点,所以要优先考虑响应中断,而不是响应锁的普通获取或
    重入获取。

    指定者: 接口 Lock 中的 lockInterruptibly
    抛出: InterruptedException 如果当前线程已中断。

    锁申请等待限时

    tryLock(等待时间, 单位)

    带参数:就申请一段时间,没有申请成功就爬

    不带参数:就是相当于时间为0,没有就直接爬

    公平锁

    就是先来后到等待咯

    ReentrantLock lock = new ReentrantLock(true)

    非公平锁,就随机

    分类

    锁:

      lock():直接锁,一直等

      lockInterruptiby():获得锁,但是优先响应中断

      tryLock():尝试获得锁,获取不到,就直接爬了

      tryLock(long time, TimeUnit unit):获得锁,等一会获取不到就直接爬

    解锁

      unlock():释放锁

    3.1.2重复锁的好搭档:Condition

    Condition与ReentrantLock的关系,类似Object.wait()和Object.notify()的关系

    void await()  //使线程等待,释放所有锁
    
    void awaitUninterruptibly() //但是不会再等待过程中响应中断
    
    long awatiNanos(long nanosTimeout) 
    
    boolean await(long time, TimeUnit unit)
    
    boolean awaitUntil(Date deadline)
    
    void signal()  // 唤醒一个等待中的线程
    
    void signalAll()  //唤醒所有线程
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class ReenterLockCondition implements Runnable{
    
        public static ReentrantLock lock = new ReentrantLock();
        public static Condition condition = lock.newCondition();
        @Override
        public void run() {
            try {
                lock.lock();
                condition.await();
                System.out.println("thread is going on");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
    
            ReenterLockCondition t1 = new ReenterLockCondition();
            Thread t = new Thread(t1);
            t.start();
            Thread.sleep(2000);
    
            //主线程设置唤醒
            lock.lock();
            condition.signal();
            lock.unlock();
        }
    }

    执行步骤:

      t线程执行到await(),就等待

      main线程获得锁,唤醒等待队列中的一个线程,释放锁

      t线程被唤醒,执行!

    3.1.3允许多个线程同时访问:信号量(Semphore)

    public Semaphore(int permits)
    
    public Semaphore(int permits, boolean fair)  //第二参数指定是否公平

     方法:

    public void acquire() : 申请一个准入的许可
    
    public void acquireUninterruptibly() :申请一个准入许可,但是不会响应中断
    
    public boolean tryAcquire()  :申请一个许可,但是不等待
    
    public boolean tryAcquire(long timeout, TimeUnit unit):申请一个许可,申请一段时间
    
    public void release():释放

    其实这个使建立再reentryrantLock基础上的

    使用方法跟ReentrantLock一样的

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    
    public class SemapDemo implements Runnable{
    
        final Semaphore semp = new Semaphore(5);
        @Override
        public void run() {
            try {
                semp.acquire();
                Thread.sleep(2000);
                System.out.println(Thread.currentThread().getId() + ":done");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                semp.release();
            }
        }
    
        public static void main(String[] args) {
            ExecutorService exec = Executors.newFixedThreadPool(20);
            final SemapDemo demo = new SemapDemo();
            for (int i = 0; i < 20; i++) {
                exec.submit(demo);
            }
            
            exec.shutdown();
        }
    }

    3.1.4ReadWriteLock读写锁

    读写分离锁

      允许多个线程同时读

      但是读写,写写使不允许同时的

    如果,读操作的次数远远大于写操作的次数,则读写锁就可以发挥最大的功效,提升系统的性能

    import java.util.concurrent.locks.Lock;
    
    import java.util.concurrent.locks.ReentrantReadWriteLock;
    
    public class ReadWriteLockDemo {
        
        private static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
        private static Lock readLock = readWriteLock.readLock();
        private static Lock writeLock = readWriteLock.writeLock();
        private int value;
    
        public void handleRead(Lock lock) throws InterruptedException{
            try{
                lock.lock();
                Thread.sleep(1000);
                System.out.println(value);
            }finally {
                lock.unlock();
            }
        }
    
        public void handleWrite(Lock lock, int index) throws InterruptedException{
            try{
                lock.lock();
                Thread.sleep(1);
                value = index;
            }finally {
                lock.unlock();
            }
        }
    
        public static void main(String[] args) {
            final ReadWriteLockDemo demo = new ReadWriteLockDemo();
            Runnable readRunnable = new Runnable() {
                @Override
                public void run() {
                    try {
                        demo.handleRead(readLock);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
    
            Runnable writeRunnable = new Runnable() {
                @Override
                public void run() {
                    try {
                        demo.handleWrite(writeLock, 1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
    
            for (int i = 0; i < 18; i++) {
                new Thread(readRunnable).start();
            }
    
            for (int i = 0; i < 20; i++) {
                new Thread(writeRunnable).start();
            }
        }
    }

    3.1.5倒计数器CountDownLatch 和循环栅栏CyclicBarrier

    import java.util.Random;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class CountDownLatchDemo implements Runnable {
    
        static final CountDownLatch end = new CountDownLatch(10);
        static final CountDownLatchDemo demo = new CountDownLatchDemo();
        @Override
        public void run() {
    
            try {
                Thread.sleep(new Random().nextInt(10) * 1000);
                System.out.println("check complete");
                end.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
    
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
    
            ExecutorService exec = Executors.newFixedThreadPool(10);
            for (int i = 0; i < 20; i++) {
                exec.submit(demo);
            }
            end.await(); // 等待10个线程全部完成,才能执行接下来的步骤
            System.out.println("Fire");
            exec.shutdown();
        }
    }

    CyclicBarrier:凑个10个来一轮。凑够10个来一轮

    能响应中断

    import java.util.concurrent.*;
    
    public class CyclicBarrierDemo implements Runnable {
    
        final CyclicBarrier cyclicBarrier = new CyclicBarrier(10);
        static final CyclicBarrierDemo demo = new CyclicBarrierDemo();
        @Override
        public void run() {
            try {
                cyclicBarrier.await();
                Thread.sleep(2000);
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
            System.out.println("check complete");
        }
    
        public static void main(String[] args) throws InterruptedException {
            ExecutorService exec = Executors.newFixedThreadPool(10);
            for (int i = 0; i < 20; i++) {
                exec.submit(demo);
            }
            System.out.println("Fire");
            exec.shutdown();
        }
    }

    3.1.7线程阻塞工具类LockSupport

    限流

    桶漏发:不管你来势汹汹,我都稳定输出

    令牌发:每隔一段时间产生一个令牌,你们自己抢

    3.2线程复用:线程池

    线程创建关闭开销大

    大量线程抢夺资源

    线程太多拖垮应用系统

    线程池种类

    1 public static ExecutorService newFixedThreadPool(int nThreads)  // 固定大小
    2 
    3 public static ExecutorService newSingleThreadExecutor()    // 一个线程的线程池
    4 
    5 public static ExecutorService new CachedThreadPool()     //根据当下任务,动态的扩充
    6 
    7 public static ScheduledExecutorService newSingleThreadScheduledExecutor()   // 一个线程的线程池,但是可以指定延时执行
    8  
    9 public static ScheduledExecutorService  newScheduledThreadPool(int corePoolSize)   //再上面的基础上,可以指定线程池的大小

    线程池内部的实现

    以固定线程池为例

    public ThreadPoolExecutor(int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    ThreadFactory threadFactory,
    RejectedExecutionHandler handler)
    }
    int corePoolSize
    指定线程池中的线程数量
    int maximumPoolSize
    指定了线程池中的最大线程数量
    long keepAliveTime
    当前线程数大于corePoolSize时,多余的空闲线程的存活时间
    TimeUnit unit
    时间单位
    BlockingQueue<Runnable> workQueue)
    任务队列,等待的任务队列
    ThreadFactory threadFactory
    线程工厂,用来创建线程的
    RejectedExecutionHandler handler
    任务太多时,拒绝策略

    BlockingQueue阻塞队列;专门用来存未执行的任务

    ctrl + alt + B 查看接口的实现类

    SynchronousQueue(直接提交队列)

    每一个插入,就必定伴随一个删除

    相当于容量为1;

    但是总体上相当于:任务没有被真实保存,而总是任务提交给线程执行;如果没有空闲线程,则尝试创建新的线程,如果线程达到最大值,就丢弃任务,所以使用SynchronousQueue需要很大的maximumPoolSize

    ArrayBlockingQueue(有界的任务队列)

    public ArrayBlockingQueue(int capacity)

    任务来了,如果线程 < corePoolSize 则创建线程;corePoolSize满了,则放入队列;若队列满了,扩充线程;若大于maximumPoolSize,则丢弃;

    LinkedBlockingQueue(无界任务队列)

    线程数维持在corePoolSize,然后有任务来,一直加入

    PriorityBlockingQueue (优先任务队列)

    特殊的无界队列,任务都按照先进先出执行

    3.2.4拒绝策略

    AbortPolicy 该策略会直接抛出异常,组织系统正常工作
    CallerPRunsPolicy 只要线程池未关闭,肯定会执行
    DiscardOldestPolicy 该策略会丢弃一个最老的请求
    DiscardPolicy 该策略会默默的丢弃无法处理的任务

    3.2.5线程工厂

     用execute()方法,可以显示哪个线程出错

     3.2.9分治

     3.3JDK的并发容器

    • ConcurrentHashMap:高效的并发HashMap
    • CopyOnWriteArrayList:list
    • ConcurrentLinkedQueue:高效的并发队列,使用链表实现,线程安全的LinkedList
    • BlockingQueue:链表、数组实现的阻塞队列
    • ConcurrentSkipListMap:跳表

    3.3.2线程安全的HashMap

    1.  ConcurrentHashMap
    2. Collections.synchronized(new HashMap())

     3.3.3有关List的线程安全

    public static List<String> 1 = Collections.synchronizedList(new LinkedList<String>)-----线程安全的lsit

    3.3.4高效读写的队列:深度剖析ConcurrentLinkedQueue类

    高并发环境中性能最好的队列

    太难了,不想看具体内容,日后补充吧

    3.3.5高效读取:不变模式下的CopyOnWriteArrayList

    在读写不会阻塞,只是在写写的时候阻塞

    当这个List需要修改时,我并不修改原有的内容,而是对原有的数据进行一次复制,将修改的内容写入副本当中,写完之后,再用修改完的副本替换原有的数据,这样就不影响读了。

    3.3.6数据共享通道:BlockingQueue

    我们希望线程A能够通知线程B,又希望线程A不知道线程B的存在。

    其实是用来存储服务线程的

    offer():压入队列,满了,就false

    poll():取出队列,空的,就null

    put():压入队列,满了,就等

    take():取出,空的,就等

    ArrayBlockingQueue:环状数组

    3.3.7随机数据结构:跳表

    查询时间:(logn),在并发结构中用来存储Map,Map会是一个有序的

  • 相关阅读:
    3. 无重复字符的最长子串
    字节跳动 最小栈
    排序
    线程的优先级
    线程的操作方法
    线程的生命周期
    实现线程的方式:Thread类重写run();Runnable类重写run();Callable类重写call();实现线程的方式
    Java thread run() start() 是干什么的以及区别
    Java thread 多线程
    助教工作学期总结
  • 原文地址:https://www.cnblogs.com/sicheng-li/p/13036720.html
Copyright © 2011-2022 走看看