zoukankan      html  css  js  c++  java
  • java多线程

    锁的底层

    1,java代码   synchronized

    2,java字节码   monitorenter  monitorexit monitor监视器监视这把锁

    3,执行过程中自动锁升级   从new出来升级到偏向锁,从偏向锁升级到自旋锁,从自旋锁升级到重量级锁

    4,lock comxchg

     

    synchronized底层

    1,无锁状态

    2,偏向锁,markword记录这个线程id,环境中没有其他线程来争抢这块资源

    3,自旋锁,当去执行一块资源的时候,发现其他线程已经占用了该资源,此时本线程等待其他线程执行完,等待过程中持续消耗cpu资源,默认自旋10次升级为重量级锁,自旋锁存在于用户态,不经过内核态,因此效率高。应用场景:执行时间短,线程数少。

    4,重量级锁,去操作系统申请资源加这把锁synchronized

     

    sychronized属于可重入锁,因为只要锁对象相同sychronized方法可以调用其他sychronized方法

     

    线程通信的方式:

    1,wait/notify

    2,locksupport.park/unpark

    3,semaphore.acquire/release 配合join

    4,condition.await/singleAll  condition的本质是等待队列个数,生产者和消费者都存在队列

    wait释放锁,notify不会释放锁,wait回来继续执行的时候还需要拿到锁才会继续执行

    Thread的API

    start开启

    yied舍弃当前线程cpu执行权,让给其他线程,然后排队,属于从运行状态变为可运行状态

    join加入到另一个线程,等待另一个线程执行完之后继续执行当前线程,属于运行状态变为wait状态

    park等待唤醒,不会阻塞

    interrupt可以打断睡眠状态和阻塞状态,但是需要对异常进行处理

     

    volatile

    1,线程可见性       volatile尽量修饰简单的值,不要修饰引用的值,如果修饰的是一个对象,当对象内部属性改变之后volatile是不可见的

    2,阻止指令重排序  

    3,不能保证原子性  

    Timer

    Schedule(task, Date) 在未来时间执行一次任务,也可以多个任务在之前的时间执行
    Schedule(TimerTask task, Date firsttime,long period) 此方法用于在指定的时间执行一次之后任务之后,在指定的period的时间间隔后不停的执
    行任务
    Schedule(TimerTask task, long delay)以当前时间为参考,在延迟指定的秒数后执行一次性任务
    Schedule(TimerTask task, long delay,long period)以当前时间为参考,在延迟指定的秒数后执行一次性任务并且在period后重复执行任务,执行时间是从上次任务结束时间开始计算。凡是带period的都会在时间间隔后重复执行。
    Schedule和ScheduleAtFixedRate的区别在于,如果指定开始执行的时间在当前系统运行时间之前,scheduleAtFixedRate会把已经过去的时间也作为周期执行,而schedule不会把过去的时间算上
    ScheduleAtFixedRate(TimerTask task,long delay,long period)   有延迟,下次任务相对于上次任务的开始时间开始执行:
    scheduleAtFixedRate(TimerTask task,Date startTime,long period)   指定开始执行的时间在当前系统运行时间之前,会把已经过去的时间也作为周期执行TimerTask有一个cancel方法用于将当前任务从任务队列删除。(也就是队列中不会执行该任务,如果正在执行中调用该方法会执行完毕)
    Timer的cancel方法用于清空所有任务队列(如果有任务正在执行会等任务执行完清空;有时候不一定会停掉,因为cancel方法不一定能抢到queue对象的锁)

    并发库

    atomic  AtomicInteger 保证原子性  所有的Atomic开头的类都是通过UnSafe里面compareAndSwap的CAS操作来实现的

    LongAdder保证原子性,底层是分段锁,每个锁分开计算最后求总的数据,在线程较多时,自增比atomicInteger和synchronized更快,由于synchronized需要申请系统资源,因此synchronized比atomicInteger慢

    AQS队列同步器

    reentantLock可以替代synchronized,必须手动解锁,即在finally执行解锁,
    synchronized是系统自带的,属于自动解锁,只要任务执行完就解锁了,reentantLock可以出现各种condition,也就是不同的等待队列,reentantLock底层是CAS的实现,而synchronized默认经过4种状态的锁升级
    reentantLock可以tryLock尝试锁定,如果锁不了可以不继续拿锁,做其他事情,也就是reentantLock不会阻塞
    reentantLock可以用lockInterruptibly申请打断锁,也就是上锁后能被打断
    reentantLock构造方法传true可以申请公平锁,公平锁底层是队列结构,先到的线程排在队列前面,依次往后排,讲求先来后到,而默认的非公平锁是谁抢到算谁的。synchronized默认非公平锁。

     CAS操作存在ABA问题,ABA问题即一条线程拿到一条数据做了更改,另一个线程拿过去改了之后又改回来,然后原来的线程发现值并没变,其实值已经被改过,虽然是同一个值,解决办法:加版本号控制

    CountDownLatch t.join()方法只会使主线程(或者说调用t.join()的线程)进入等待池并等待t线程执行完毕后才会被唤醒。并不影响同一时刻处在运行状态的其他线程。
    CyclicBarrier同步屏障

    如何让多线程发挥最大效率?因为单核cpu是线程调度,cpu在同一时刻只能执行一条计算机指令,如何让多核cpu同时对多个线程执行指令?
    MarriagePhaser阶段锁
    ReadWriteLock 读写锁  读锁加了之后,如果是正在读这条数据,其他线程也能进来读,如果是写线程,需要在外面等待,如果正在写这条数据,那
    么其他线程即不能进来读也不能进来写。所以写锁的好处是,大大的提高了读的效率,如果说一边读一边写,写的时候会自动加锁,读的时候无法访问
    SELECT ... LOCK IN SHARE MODE  加读锁
    SELECT ... FOR UPDATE   加写锁,update语句自动加锁

    semaphore信号量,信号量指定个数,如果个数为0,其他线程等待,如果有,其他线程拿取,执行。作用:限流,最多允许多少线程同时运行。
    它与FixedThreadPool的区别是,FixedThreadPool线程池内,永远只有固定个数的线程,比如5个。而semaphore可以有无数个线程,但是同时运行的个数只能是信号量的固定个数。场景,分批发短信

    exchanger交换者,线程之间交换数据用的,只能两个线程交换,如果3个,可以自定义逻辑

    locksupport有park和unpark方法,类似于wait/notify,不过locksupport不需要同步代码块,并且可以先unpark提前告知已解锁,park时不会被阻塞,locksupport基于Unsafe

    park-unpark和wait-notify的作用是一样的,当然这两个有一点不一样就是park-unpark是可以顺序反着来的,

    可以先unpark(发放许可),然后park,在park的时候不会阻塞;

    但是wait-notify就不行,如果先执行notify,再wait的时候,那么当前线程会一直处于阻塞状态

    阻塞队列

    ArrayBlockingQueue是定长的阻塞队列,静态数组结构,不存在扩容机制,锁是用的ReentrantLock,锁对象就是数组对象,保证先进先出。
    LinkedBlockingQueue是链表结构,node节点,存取分别是不同的node节点,所以采用的是锁分离技术,也就是上两把锁,写与写之间是互斥的,读与读之间也是互斥的,所以读写之间互不排斥,性能更高,存putLock,取takeLock,也保证先进先出。
    SynchronousQueue 使用cas加自旋达到乐观锁的效果,自旋达到一定次数之后采用LockSupport.unpark()来阻塞,以公平设置为true的队列以先进先出顺序授予线程访问权限
    LinkedTransferQueue 与SynchronousQueue差不多,不同点在于存元素LinkedTransferQueue 能控制是否阻塞
    PriorityBlockingQueue优先级队列 使用数组加二叉堆,存不会阻塞二叉堆结构无限堆上去,出队会阻塞。锁是用的ReentrantLock。
    优先级队列和通常的栈和队列一样,只不过里面的每一个元素都有一个"优先级”,在处理的时候,首先处理优先级最高的。如果两个元素具有相同的优先级,则按照他们插入到队列中的先后顺序处理。优先级队列可以通过链表,数组,堆或者其他数据结构实现
    LinkedBlockingDeque和LinkedBlockingQueue差不多,但是LinkedBlockingDeque是双向链表,即可以从头存也可以从尾存,锁是用的ReentrantLock
    DelayQueue延时队列,和PriorityBlockingQueue差不多存不会阻塞,取会阻塞同时取的时候需要判断堆顶元素是否过期。

     

    什么是守护线程

    在t.start()之前设置t.setDaemon(true)即守护线程,主线程执行完之后守护线程立即销毁,不会继续运行

     synchronizedCollection,synchronizedMap,synchronizedList,synchronizedSet

    Collections.synchronizedList(new ArrayList())同步容器

    public class Demo1 {
        public static void main(String[] args){
            List<String> list = Collections.synchronizedList(new ArrayList<String>());
            list.add("1");
            list.add("2");
            list.add("3");
            synchronized (list) {
                for (String s : list) {
                    System.out.println(s);
                }
            }
        }
    }

    执行add()等方法的时候是加了synchronized关键字的,但是listIterator(),iterator()却没有加.所以在使用的时候需要加上synchronized

    ThreadFactory的作用,工厂模式,将创建线程并且控制线程数量的任务交给工厂来完成

    public class TestThreadFactory {
        public static void main(String[] args) {
            //创建线程(并发)池,自动调节线程池大小
            ExecutorService es =  Executors.newCachedThreadPool(new WorkThreadFactory());
    
            //同时并发5个工作线程
            es.execute(new WorkRunnable());
            es.execute(new WorkRunnable());
            es.execute(new WorkRunnable());
            es.execute(new WorkRunnable());
            es.execute(new WorkRunnable());
            //指示当所有线程执行完毕后关闭线程池和工作线程,如果不调用此方法,jvm不会自动关闭
            es.shutdown();
    
            try {
                //等待线程执行完毕,不能超过2*60秒,配合shutDown
                es.awaitTermination(2*60, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    class WorkThreadFactory implements ThreadFactory {
    
        private AtomicInteger atomicInteger = new AtomicInteger(0);
    
        @Override
        public Thread newThread(Runnable r)
        {
            int c = atomicInteger.incrementAndGet();
            System.out.println("create no " + c + " Threads");
            return new WorkThread(r, atomicInteger);//通过计数器,可以更好的管理线程
        }
    
    }
    
    class WorkRunnable implements Runnable{
        @Override
        public void run() {
            System.out.println("complete a task");
        }
    }
    
    class WorkThread extends Thread{
        private Runnable target;   //线程执行目标
        private AtomicInteger counter;
    
        public WorkThread(Runnable target, AtomicInteger counter) {
            this.target = target;
            this.counter = counter;
        }
        @Override
        public void run() {
            try {
                target.run();
            } finally {
                int c = counter.getAndDecrement();
                System.out.println("terminate no " + c + " Threads");
            }
        }
    }

    Executor框架
    通过Executor框架来启动线程比使用Thread的start方法更好,
    除了更易管理,效率更好(用线程池实现,节约开销)外,
    还有关键的一点:有助于避免this逃逸问题——如果我们在构造器中启动一个线程,因为另一个任务可能会在构造器结束之前开始执行,此时可能会访问到初始化了一半的对象用Executor在构造器中。
    Executor作为灵活且强大的异步执行框架,基于生产者-消费者模式,其提交任务的线程相当于生产者,执行任务的线程相当于消费者,并用Runnable来表示任务,Executor的实现还提供了对生命周期的支持,以及统计信息收集,应用程序管理机制和性能监视等机制。


    ThreadPoolExecutor是自定义线程池,默认的四种线程池都是基于ThreadPoolExecutor实现的
    Executor 和 ExecutorService 这两个接口主要的区别是:ExecutorService 接口继承了 Executor 接口,是 Executor 的子接口
    Executor 和 ExecutorService 第二个区别是:Executor 接口定义了 execute()方法用来接收一个Runnable接口的对象,而 ExecutorService 接口中的 submit()方法可以接受Runnable和Callable接口的对象。
    Executor 和 ExecutorService 接口第三个区别是 Executor 中的 execute() 方法不返回任何结果,而 ExecutorService 中的 submit()方法可以通
    过一个 Future 对象返回运算结果。
    Executor 和 ExecutorService 接口第四个区别是除了允许客户端提交一个任务,ExecutorService 还提供用来控制线程池的方法。比如:调用
    shutDown() 方法终止线程池。
    Executors类,提供了一系列工厂方法用于创建线程池,返回的线程池都实现了ExecutorService接口。Executors 类提供工厂方法用来创建不同类型的线程池。

    CompletionService接口定义了一组任务管理接口:
        submit() - 提交任务
        take() - 获取任务结果
        poll() - 获取任务结果
    ExecutorCompletionService类是CompletionService接口的实现
        ExecutorCompletionService内部管理者一个已完成任务的阻塞队列
        ExecutorCompletionService引用了一个Executor, 用来执行任务
        submit()方法最终会委托给内部的executor去执行任务
        take/poll方法的工作都委托给内部的已完成任务阻塞队列
        如果阻塞队列中有已完成的任务, take方法就返回任务的结果, 否则阻塞等待任务完成
        poll与take方法不同, poll有两个版本:
            无参的poll方法 --- 如果完成队列中有数据就返回, 否则返回null
            有参数的poll方法 --- 如果完成队列中有数据就直接返回, 否则等待指定的时间, 到时间后如果还是没有数据就返回null
            ExecutorCompletionService主要用与管理异步任务 (有结果的任务, 任务完成后要处理结果)
    ExecutorCompletionService是如何执行任务, 又是如何将任务的结果存储到完成队列中的呢?
        ExecutorCompletionService在submit任务时, 会创建一个QueueingFuture, 然后将创建的QueueingFuture丢给executor, 让executor完成任务的执行工作
        QueueingFuture继承与FutureTask类, 而FutureTask实现了两个接口Runnable和Future.
        Runnable一般表示要执行的任务的过程, 而Future则表述执行任务的 结果 (或者说是任务的一个句柄, 可获取结果, 取消任务等).
        因此FutureTask就是一个有结果可期待的任务. FutureTask实现了run方法, 我们指定此方法一般是在在工作线程(不是submit线程) 执行的。
        FutureTask构造的时候需要一个Callable<V>参数, Callable表示一个任务的执行过程, 在run方法中恰好调用了Callable.call(), 也就是任务工作在工作线程中执行.
        那么任务执行完了会返回结果, 这个结果是要在submit线程(就是提交任务的线程)中使用的, 那么如何让submit线程可以反问到呢? 答案也是在FutureTask类中, 我们可以看到run方法中执行任务(Callable.call())获取结果后, 会掉用一个set()方法, set() 将获取的结果存储到FuturnTask的一个outcome字段中, 这个过程是同步的, 所以其他线程稍后访问是可以读取到值的
        ExecutorCompletionService中的完成队列中正好存储的是FuturnTask的子类, 当然可以调用FutureTask的get方法, FutureTask的get方法就是获取outcome值 (get()方法中调用了report()方法, report中返回了outcome字段).

    发令枪  (也叫倒计数锁存器)

    倒计数锁存器(CountDown Latch)是异常性障碍,允许一个或多个线程等待一个或者多个其他线程来做某些事情。

    再比如:主线程中开4个线程A、B、C、D去执行任务,分别是计算上一年1、2、3、4四个季度的某某数据,主线程一直等待到 A、B、C、D 执行完毕(数据计算完毕)后再进行数据汇总。

    使用发令枪完成一个100减到1的实例

    public class CountDownLatchTest {
    public static int nomal = 100;
    // TODO 发令枪数量设置
    private static final int N = 10;

    public static void main(String rgs[]) throws InterruptedException {
    //构造一个用给定计数初始化的 CountDownLatch。
    CountDownLatch doneSignal = new CountDownLatch(1);
    Executor service = Executors.newCachedThreadPool();
    for (int i = 0; i < N; ++i)
        //假如实际开的线程小于CountDownLatch构造器中的值,那么主线程会永远休眠
        //假如实际开的线程大于CountDownLatch构造器中的值,那么当锁存器计数减到0时,主线程会提交运行
    service.execute(new WorkerRunnable(doneSignal, i));
    //使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断或超出了指定的等待时间
    doneSignal.await();
    System.out.println("主线程继续执行");
    ((ExecutorService) service).shutdownNow();
    }
    }


    class WorkerRunnable implements Runnable {
    private final CountDownLatch doneSignal;
    private final int i;
    WorkerRunnable(CountDownLatch doneSignal, int i) {
    this.doneSignal = doneSignal;
    this.i = i;
    }
    public void run() {
    doWork(i);
    //递减锁存器的计数,如果计数到达零,则释放所有等待的线程。
    doneSignal.countDown();
    }

    // TODO 其他业务测试在此方法内修改代码即可
    private synchronized void doWork(int i) {
    CountDownLatchDemo.print(i);;
    }
    }
    public class CountDownLatchDemo {
        public static void print(int i){
            while (CountDownLatchTest.nomal>0){
                CountDownLatchTest.nomal-=1;
                System.out.println("当前线程:"+i+"   nomal:"+CountDownLatchTest.nomal);
            }
        }
    }

     关于ThreadPoolExecutor

    引入线程池的好处:

    1、重用线程池中的线程,避免因频繁创建和销毁线程造成的性能消耗。
    2、更加有效的控制线程的最大并发数,防止线程过多抢占资源造成的系统阻塞。
    3、对线程进行有效的管理。

     构造函数

    public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler)

    corePoolSize

    核心线程数。在创建线程池之后,默认情况下线程池中并没有任何的线程,而是等待任务到来才创建线程去执行任务,当线程池中的线程数目达到 corePoolSize后,新来的任务将会被添加到缓存队列中,也就是那个workQueue,除非调用ThreadPoolExecutor#prestartAllCoreThreads() 方法或者是 ThreadPoolExecutor # prestartCoreThread() 方法(从这两个方法的名字就可以看出是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或一个线程)。

    PS:很多人不知道这个数该填多少合适,其实也不必特别纠结,根据实际情况填写就好,实在不知道,就按照阿里工程师的写法取下列值就好了:

    int NUMBER_OF_CORES = Runtime.getRuntime().availableProcessors();

    maximumPoolSize

    线程池中的最大线程数。表示线程池中最多可以创建多少个线程,很多人以为它的作用是这样的:”当线程池中的任务数超过 corePoolSize 后,线程池会继续创建线程,直到线程池中的线程数小于maximumPoolSize“,其实这种理解是完全错误的。它真正的作用是:当线程池中的线程数等于 corePoolSize 并且 workQueue 已满,这时就要看当前线程数是否大于 maximumPoolSize,如果小于maximumPoolSize 定义的值,则会继续创建线程去执行任务, 否则将会调用去相应的任务拒绝策略来拒绝这个任务。另外超过 corePoolSize的线程被称做"Idle Thread", 这部分线程会有一个最大空闲存活时间(keepAliveTime),如果超过这个空闲存活时间还没有任务被分配,则会将这部分线程进行回收。

    keepAliveTime

    控制"idle Thread"的空闲存活时间。这个idle Thread就是上面提到的超过 corePoolSize 后新创建的那些线程,默认情况下,只有当线程池中的线程数大于corePoolSize,且这些"idle Thread"并没有被分配任务时,这个参数才会起作用。另外,如果调用了ThreadPoolExecutor#allowCoreThreadTimeOut(boolean) 的方法,在线程池中的线程数不大于corePoolSize,且这些core Thread 也没有被分配任务时,keepAliveTime 参数也会起作用。

    unit

    参数keepAliveTime的时间单位

    workQueue

    阻塞队列。如果当前线程池中的线程数目>=corePoolSize,则每来一个任务,会尝试将其添加到该队列当中,注意只要超过了 corePoolSize 就会把任务添加到该缓存队列,添加可能成功也可能不成功,如果成功的话就会等待空闲线程去执行该任务,若添加失败(一般是队列已满),就会根据当前线程池的状态决定如何处理该任务(若线程数 < maximumPoolSize 则新建线程;若线程数 >= maximumPoolSize,则会根据拒绝策略做具体处理)。

    常用的阻塞队列有:

    1)ArrayBlockingQueue       //基于数组的先进先出队列,此队列创建时必须指定大小;
    2)LinkedBlockingQueue      //基于链表的先进先出队列,如果创建时没有指定此队列大小,
    则默认为Integer.MAX_VALUE; 3)synchronousQueue        //这个队列比较特殊,它不会保存提交的任务,
    而是将直接新建一个线程来执行新来的任务。

    threadFactory

    线程工厂。用来为线程池创建线程,当我们不指定线程工厂时,线程池内部会调用Executors.defaultThreadFactory()创建默认的线程工厂,其后续创建的线程优先级都是Thread.NORM_PRIORITY。如果我们指定线程工厂,我们可以对产生的线程进行一定的操作。

    handler

    拒绝执行策略。当线程池的缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:

    ThreadPoolExecutor.AbortPolicy:         // 丢弃任务并抛出RejectedExecutionException异常。
    ThreadPoolExecutor.DiscardPolicy:       // 也是丢弃任务,但是不抛出异常。
    ThreadPoolExecutor.DiscardOldestPolicy:    // 丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
    ThreadPoolExecutor.CallerRunsPolicy:      // 由调用线程处理该任务
  • 相关阅读:
    01、MyBatis HelloWorld
    02、Spring-HelloWorld
    【Java多线程】多线程虚假唤醒
    【Java基础】Java 8 对接口做了什么新支持
    【Mongodb】分片集群
    【Mongodb】 Mongodb 事务
    【Mongodb】聚合运算
    【Mongodb】导入文件
    【Mongodb】 可复制集搭建
    【Mongodb】Mongodb QuickStart
  • 原文地址:https://www.cnblogs.com/yeg0zj/p/14493995.html
Copyright © 2011-2022 走看看