zoukankan      html  css  js  c++  java
  • JAVA并发工具类

    一、分而治之

    fork/join     二叉树  二分查找  快速排序  归并排序  mapreduce  动态规划

    1、fork/join(工作密取)

      RecursiveTask要有返回值

      RecursiveAction没有返回值

      invoke(同步)

      submit(有返回结果异步)

      execute(没有返回结果异步)

    2、countDownLatch(闭锁)只能使用一次

    作用:直指一个线程等待其他的线程执行完后再执行,(就是一个加强版的join())

    用一个线程可以有多个扣除点,扣除点可以在程序的中间,await可以有多个。

    3、CyclicBarrier可以使用一次

    作用:一组工作线程之间需要进行协作

    4、Semaphore(控制同时访问某个特定资源的线程数量,例如用于流量控制)

    Semaphore的大坑就是:release方法可以无限增加;所以一般会定义两个Semaphore变量,结合来使用。

    Semaphore只是控制流量,获取许可证;需要和锁配合使用才能完成总的并发控制

    5、Exchanger用于两个线程间的数据交换

    6、Future

      isDone()不管正常或者异常结束又或者自己取消,都会返回true

      isCancelled如果任务在完成前被取消,返回true;其他情况返回false

      cancel()(1)如果任务已经完成,或者已经取消或者由于某些原因不能取消,则返回false;

           (2)如果任务还没有执行,则会返回true,并且异步任务不会执行

         (3)如果任务已经开始,但是还没有执行完成,则返回true(mayInterrupIfRunning为true时,会中断线程)

    FutureTask  treiber  AQS实现

    二、原子类(CAS原理)乐观锁的思想

    悲观锁容易发生死锁

    原子操作实现方式:1、synchronized;2、原子类

    1、cas的原理:

    cpu硬件指令集的支持

    包括三个运算参数:内存地址V,期望值A,新值B

    不能更新成功的话,会一直自旋

    2、cas的问题

    ABA问题

    开销问题

    只能保证一个共享变量的原子操作

    3、AtomicBoolean,AtomicInteger,AtomicLong

    4、AtomicReference(多个变量同时进行更新,把多个变量包装成引用类),AtomicStampedReference()(有具体的版本号),AtomicMarkableReference(boolean标志版本)(只关心有没有变化)

    5、AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray[原子数组类实质上又另外创建了一个和原数组完全独立,但一模一样的数组]

    6、源自更新字段类AtomicIntegerFieldUpdate,AtomicLongFieldUpdate,AtomicReferenceFieldUpdate

    原子操作底层用的是CAS操作

    三、显示锁

    1、ReentrantLock(优先使用synchronized)

    排它锁

    线程退出时(执行完时),它所拥有的资源都会被释放

    Synchronized进入调锁的过程中后:Synchronized不提供中断和超时机制

    Synchronized消耗更少一点,Synchronized是语言特性,而显示锁是一种类

    Synchronized方法块内抛异常,代码脱离方法快,锁会自动释放

    Lock获取锁可以被中断,超时获取锁,尝试获取锁

    非公平锁充分的利用了线程被唤醒的这段时间

    公平锁(排队)与非公平锁(可以插队,效率更高)

    synchronized尽量使用notifyAll;ReentrantLock尽量使用signal

    一个Lock对应一个一个Condition

    一个Lock对应多个Condition

    lockInterruptibly

    tryLock

    tryLock(timeout)

    2、读写锁ReadWriteLock

    读是共享的,写是排它锁,最适合于读多写少的场景

    四、LockSupport(基础工具)

    五、AQS(抽象队列同步器)

            final void lock() {
                if (compareAndSetState(0, 1))
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    acquire(1);
            }

    这个方法是ReentrantLock.NonfairSync.lock()方法

        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }

    这个方法就是尝试获取独占所同步状态,这是AQS的方法

            protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }

     这个方法是由ReentrantLock.NonfairSync.tryAcquire实现

            final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }

    这个方法是由ReentrantLock.Sync.nonfairTryAcquire实现(尝试获取独占锁,或者重入独占锁)

        private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            enq(node);
            return node;
        }

    这个方法是AQS的方法,这个方法是(1)先设置尾节点compareAndSetTail(2)如果前面设置没成功,就进入死循环enq方法,这个方法里会判断为节点是否为空,如果为空,就先设置head指向New Node()空节点,然后再循环;如果不为空,就设置compareAndSetTail,如果设置成功就可以退出了,否则继续循环。

        private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }

    这个就是AQS的enq方法

        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }

     这个方法就是加入到AQS队列以后发生的等待操作

    Springtemplate就是模板设计模式

    state有三个方法(getState,setState,compareAndSetState)

    state为0时,可以直接获取锁,不需要加入AQS队列

    state不为0时,需要尝试获取锁

    模板方法设计模式

    重入锁:

    读写锁:

    分布式锁:抢到本地锁的线程再去抢分布式锁;redis性能好;zookeeper稳定性好。

    读写锁的实现

    读锁的实现

        public final void acquireShared(int arg) {
            if (tryAcquireShared(arg) < 0)
                doAcquireShared(arg);
        }

    AQS的模板方法:

           protected final int tryAcquireShared(int unused) {
                Thread current = Thread.currentThread();
                int c = getState();
                if (exclusiveCount(c) != 0 &&
                    getExclusiveOwnerThread() != current)
                    return -1;
                int r = sharedCount(c);
                if (!readerShouldBlock() &&
                    r < MAX_COUNT &&
                    compareAndSetState(c, c + SHARED_UNIT)) {
                    if (r == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        HoldCounter rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            cachedHoldCounter = rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                    }
                    return 1;
                }
                return fullTryAcquireShared(current);
            }

    尝试获取读锁:

      1、如果当前线程是写线程,直接返回-1(加入等待队列)

      2、如果当前头结点的后继节点是读线程直接返回1获取到所

      3、如果当前头结点的后继节点是写线程,直接执行fullTryAcquireShared方法(这个策略完全是为了照顾写线程,或者说是防止写线程过分的饥渴)

            final boolean readerShouldBlock() {
                return apparentlyFirstQueuedIsExclusive();
            }

    as

        final boolean apparentlyFirstQueuedIsExclusive() {
            Node h, s;
            return (h = head) != null &&
                (s = h.next)  != null &&
                !s.isShared()         &&
                s.thread != null;
        }

    as

            final int fullTryAcquireShared(Thread current) {
                /*
                 * This code is in part redundant with that in
                 * tryAcquireShared but is simpler overall by not
                 * complicating tryAcquireShared with interactions between
                 * retries and lazily reading hold counts.
                 */
                HoldCounter rh = null;
                for (;;) {
                    int c = getState();
                    if (exclusiveCount(c) != 0) {
                        if (getExclusiveOwnerThread() != current)
                            return -1;
                        // else we hold the exclusive lock; blocking here
                        // would cause deadlock.
                    } else if (readerShouldBlock()) {
                        // Make sure we're not acquiring read lock reentrantly
                        if (firstReader == current) {
                            // assert firstReaderHoldCount > 0;
                        } else {
                            if (rh == null) {
                                rh = cachedHoldCounter;
                                if (rh == null || rh.tid != getThreadId(current)) {
                                    rh = readHolds.get();
                                    if (rh.count == 0)
                                        readHolds.remove();
                                }
                            }
                            if (rh.count == 0)
                                return -1;
                        }
                    }
                    if (sharedCount(c) == MAX_COUNT)
                        throw new Error("Maximum lock count exceeded");
                    if (compareAndSetState(c, c + SHARED_UNIT)) {
                        if (sharedCount(c) == 0) {
                            firstReader = current;
                            firstReaderHoldCount = 1;
                        } else if (firstReader == current) {
                            firstReaderHoldCount++;
                        } else {
                            if (rh == null)
                                rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current))
                                rh = readHolds.get();
                            else if (rh.count == 0)
                                readHolds.set(rh);
                            rh.count++;
                            cachedHoldCounter = rh; // cache for release
                        }
                        return 1;
                    }
                }
            }

    循环尝试获取读锁:

      1、如果当前线程是写线程,直接返回-1(加入等待队列)

      2、如果当前头结点的后继节点是写线程且不是锁的重入状态,那么就直接退出-1(加入等待队列)

      3、如果写线程超量,直接抛出异常

      4、如果修改状态成功,直接返回1,获取读锁成功

        private void doAcquireShared(int arg) {
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            if (interrupted)
                                selfInterrupt();
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }

     写锁的实现

        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }

    as

            protected final boolean tryAcquire(int acquires) {
                Thread current = Thread.currentThread();
                int c = getState();
                int w = exclusiveCount(c);
                if (c != 0) {
                    // (Note: if c != 0 and w == 0 then shared count != 0)
                    if (w == 0 || current != getExclusiveOwnerThread())
                        return false;
                    if (w + exclusiveCount(acquires) > MAX_COUNT)
                        throw new Error("Maximum lock count exceeded");
                    // Reentrant acquire
                    setState(c + acquires);
                    return true;
                }
                if (writerShouldBlock() ||
                    !compareAndSetState(c, c + acquires))
                    return false;
                setExclusiveOwnerThread(current);
                return true;
            }

    1、如果当前锁是共享锁,那么直接返回false

    2、如果可重入写线程数量太多超过最大值,抛出异常

    3、可重入线程正常直接返回true

    4、如果当前没有锁,且自己抢到了锁,直接返回true

    5、如果当前没有锁,且没有抢到锁,直接返回false

        private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            enq(node);
            return node;
        }

    as

        private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }

    as

        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }

    五、并发容器

    位运算的关键【a%2^n==a&(2^n-1)】

    hashmap会造成死循环:扩容时头插法惹的祸

    1、ConcurrentHashMap

    get   put   putIfAbsent

    putIfAbsent返回原来那个key所对应的值

     取模操作相当于位与操作

    位运算用途:权限控制和商品属性

    jdk1.7实现concurrenthashmap:segment数组(16缺省并发度这个数必须是2^n,这个数组也不允许扩容,使用hash值的高位部分进行hash)(ReentrantLock)创建segment元素时使用CAS保证线程安全  每一个segment下面包含一个hashentry数组table[]这个数组大小也必须是2^n,通过整个hash值进行hash,负载因子是0.75;①定位segment②定位hashentry③循环链表;get方法不加锁,只用了volatile{final hash;final key;volatile value;volatile next}      put方法 拿锁tryLock() 和lock()结合使用,提前new Node(key,value)   扩容也最好是2的倍数,迁移的时候会变更迁移方便,扩容会扩与扩原来的2倍   

    这两个方法尽量少使用:(size方法首先算两遍然后可能会全部加锁   containsValue也需要全部加锁)

    存在的问题:弱一致性,获取的过程中整个结构可能发生调整,因为get方法是没有加锁的

    扩容:stride步长+forwarding

    再设置sizeCtl

    size()=basecount+countercells

    强一致性:Collections.synchronizedMap(m),HashTable

    jdk1.8实现concurrenthashmap:CAS+synchronized

    数组+链表+红黑树(6~8)

    元素是Node(final,final,volatile,volatile)不是Entry(TreeNode继承于node(树叶),TreeBin(树根),Node);treebin之中使用了读写锁;

    forwardingnode扩容的时候需要用到占位

    sizeCtl(-1正在初始化)(0是默认值)(-n表示有多少线程正在扩容)(正数相当于负载因子)

    tabAt,setTabAt,casTabAt

    构造函数什么也没做

    get方法:数组+红黑树+链表

    put方法:初始化table,只有一个线程能够初始化成功(sizectl开控制这个并发);CAS操作把sizectl置为-1的线程初始化table;sizectl设置为阈值;CAS操作向数组里放入值;正在扩容时,帮助扩容;synchronized(锁住树根)网链表或者红黑树里面插入值【尾插法】,树与链表的转换;扩容

    负载因子仍然是0.75

    sizeCtl

    Node

    get put

    jkd1.8实现hashmap:treenode(继承于linkedhashmap.entry)

    2、跳表

    增加链表的快速查找性(redis,lucence)

    3、ConcurrentLinkedQueue

    无界非租塞队列,线程安全

    4、CopyOnWriteArrayList

    CopyOnWriteArraySet

    最适应读多写少的并发场景(白名单,黑名单)

    内存占用严重,只能保证最终一致性

    批量提交

    5、阻塞队列(尽量使用有界队列)(等待通知模式)

    有界就是put会阻塞的队列

    无界就是put不会阻塞的队列

    生产者消费者模式

    ArrayBlockingQueue(有界阻塞队列,先进先出原则,必须设定初始大小,只用一个锁,直接插入元素)【生产者和消费者是同一把锁】

    LinkedBlockingQueue(有界阻塞队列,先进先出原则,可以不设定初始大小默认就是Integer.MAX_Value,用了两个锁,插入元素时需要转换)【生产者和消费者是两把锁】

    PriorityBlockingQueue(无界阻塞队列)(默认,按照自然顺序,要么实现compareTo()方法,指定构造参数Comparator)

    DelayQueue(无界阻塞队列)(实现自己的缓冲系统,订单到期,限时支付)(延时获取元素)消息中间件

    SynchronousQueue(不存储元素)(每一个put操作都要等待一个take操作)(Exchanger)

    LinkedTransferQueue(无界阻塞队列)相比LinkedBlockingQueue,多了两个方法transfer()必须要消费者消费了以后方法才会返回(先直接给消费者,消费者没有则放入队列阻塞);tryTransfer()无论消费者是否接收,方法都立即返回(先给消费者,消费者没有就直接返回)

    LinkedBlockingDeque(双向阻塞队列(fork/join工作密取))可以从头和尾插入和删除元素

    六、线程池

    如果开启prestartAllCoreThreads,那么提交任务就会把线程直接放到queue里面;当queue里面满了的时候,就直接执行线程数直到线程最大值;如果队列设置太大;那么最大线程数和拒绝策略就没什么意义了。;队列的大小最好大于核心线程数,但是不能过大。

    1、ThreadPoolExecutor

      keepalivetime不对coresthread起作用,请对其他线程起作用

      ThreadFactory(作用是创建线程名字)

      RejectedExecutionHandler(拒绝策略)

        AbortPolicy(直接抛出异常,默认策略)

        CallerRunsPolicy让提交的线程执行该线程

    prestartAllCoreThreads设置一开始就会起来corepool线程

    allowCoreThreadTimeOut是否允许核心线程数也超时退出

        DiscardOldestPolicy(丢弃队列里面最老的任务)

        DiscardPolicy直接抛弃

    线程池的AOP模式

    计算密集型:加密,大数分解,正则【线程数适当小一点,机器的cpu核心数+1,操作系统调用线程就会造成页缺失,+1为了防止也确实】

    IO密集型:读取文件,数据库连接,网络通讯,rpc【线程数适当大一点:cpu的核心数×2】(NCPU*UCPU*(1+W/C))

    混合型:尽量拆分成为IO密集型和计算密集型(如果使用的时间相差过大,就不需要拆分了)

    FixedThreadPool(创建固定数量的线程池,适用于负载较重的服务器,使用了无界队列)

    CachedThreadPool会根据需要创建新线程(执行很多短期异步任务的程序)使用了SynchronousQueue阻塞队列

    SingleThreadPool创建单个线程,需要顺序保证执行任务,没有线程安全问题,使用了无界队列(保证任务串行执行)

    WorkStealingPool工作密取(fork/join)

    2、ScheduledThreadPoolExecutor

      (1)newSingleThreadScheduledExecutor只包含一个线程,只需要单个线程执行周期任务,保证顺序的执行各个任务

      (2)newScheduledThreadPool可以包含多个线程的,线程执行周期任务,适度控制后台线程数量的时候

      方法说明:

        schedule只执行一次,可以延时执行(抛出异常以后,任务直接中止执行;需要用try---catch)

        scheduleAtFixedRate任务超时怎么办?下一个任务马上开始执行(抛出异常以后,任务直接中止执行;需要用try---catch)

        scheduleWithFixedDelay(抛出异常以后,任务直接中止执行;需要用try---catch)

    3、CompletionService

    4、CompleteableFuture

    七、线程安全性

    栈封闭

    无状态(无状态的类即没有任何成员变量的类)【servlet是线程不安全的】

    让类不可变(final,String,包装类)只要不变,所有的成员变量都加上final关键字(Akka)

    volatile保证类的可见性(最适合一个线程写,多个线程读的情景)(1)可见性(2)禁止重排序

    加锁和CAS

    安全的发布

    ThreadLocal

    Collections.synchronizedList()和使用synchronized包装普通类(继承和委托)

    AtomicReference,CopyOnWriteArrayList

    死锁是由于加锁造成的(争夺资源顺序,操作者>资源数)(数据库事务也可能产生死锁)jps -v    ;jstack 7412

    解决办法:强制拿锁顺序必须一致,动态死锁原生hashcode解决(拿第三次锁);tryLock尝试加两个锁(但为了避免活锁,也要等待随机时间);

    活锁

    线程饥饿:读写锁的机制

    性能:线程上下文切换,锁同步,页缺失

    vmstat

    内存屏障指令,对性能有影响

    阻塞:将导致线程的挂起

    解决办法:

    同时锁两个锁

    减少锁的范围

    减少锁的力度(通过增加锁的数量)

    锁分段

    替换独占锁(读写锁的应用,CAS替换synchronized,并发容器)

    避免多余的锁(就是把两把锁合成一把锁,锁的粗化)

        

  • 相关阅读:
    css 网站
    python操作redis
    排名函数row_number(),rank(),
    行转列面试题
    hive建表failed: ParseException line 6:0 cannot recognize
    windows10系统安装
    between ...and...
    系统变量path误删之后的恢复方法
    python的第三方库
    1054, "Unknown column 'serviceDate' in 'where clause'
  • 原文地址:https://www.cnblogs.com/erdanyang/p/12333302.html
Copyright © 2011-2022 走看看