zoukankan      html  css  js  c++  java
  • JAVA并发工具类

    一、分而治之

    fork/join     二叉树  二分查找  快速排序  归并排序  mapreduce  动态规划

    1、fork/join(工作密取)

      RecursiveTask要有返回值

      RecursiveAction没有返回值

      invoke(同步)

      submit(有返回结果异步)

      execute(没有返回结果异步)

    2、countDownLatch(闭锁)只能使用一次

    作用:直指一个线程等待其他的线程执行完后再执行,(就是一个加强版的join())

    用一个线程可以有多个扣除点,扣除点可以在程序的中间,await可以有多个。

    3、CyclicBarrier可以使用一次

    作用:一组工作线程之间需要进行协作

    4、Semaphore(控制同时访问某个特定资源的线程数量,例如用于流量控制)

    Semaphore的大坑就是:release方法可以无限增加;所以一般会定义两个Semaphore变量,结合来使用。

    Semaphore只是控制流量,获取许可证;需要和锁配合使用才能完成总的并发控制

    5、Exchanger用于两个线程间的数据交换

    6、Future

      isDone()不管正常或者异常结束又或者自己取消,都会返回true

      isCancelled如果任务在完成前被取消,返回true;其他情况返回false

      cancel()(1)如果任务已经完成,或者已经取消或者由于某些原因不能取消,则返回false;

           (2)如果任务还没有执行,则会返回true,并且异步任务不会执行

         (3)如果任务已经开始,但是还没有执行完成,则返回true(mayInterrupIfRunning为true时,会中断线程)

    FutureTask  treiber  AQS实现

    二、原子类(CAS原理)乐观锁的思想

    悲观锁容易发生死锁

    原子操作实现方式:1、synchronized;2、原子类

    1、cas的原理:

    cpu硬件指令集的支持

    包括三个运算参数:内存地址V,期望值A,新值B

    不能更新成功的话,会一直自旋

    2、cas的问题

    ABA问题

    开销问题

    只能保证一个共享变量的原子操作

    3、AtomicBoolean,AtomicInteger,AtomicLong

    4、AtomicReference(多个变量同时进行更新,把多个变量包装成引用类),AtomicStampedReference()(有具体的版本号),AtomicMarkableReference(boolean标志版本)(只关心有没有变化)

    5、AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray[原子数组类实质上又另外创建了一个和原数组完全独立,但一模一样的数组]

    6、源自更新字段类AtomicIntegerFieldUpdate,AtomicLongFieldUpdate,AtomicReferenceFieldUpdate

    原子操作底层用的是CAS操作

    三、显示锁

    1、ReentrantLock(优先使用synchronized)

    排它锁

    线程退出时(执行完时),它所拥有的资源都会被释放

    Synchronized进入调锁的过程中后:Synchronized不提供中断和超时机制

    Synchronized消耗更少一点,Synchronized是语言特性,而显示锁是一种类

    Synchronized方法块内抛异常,代码脱离方法快,锁会自动释放

    Lock获取锁可以被中断,超时获取锁,尝试获取锁

    非公平锁充分的利用了线程被唤醒的这段时间

    公平锁(排队)与非公平锁(可以插队,效率更高)

    synchronized尽量使用notifyAll;ReentrantLock尽量使用signal

    一个Lock对应一个一个Condition

    一个Lock对应多个Condition

    lockInterruptibly

    tryLock

    tryLock(timeout)

    2、读写锁ReadWriteLock

    读是共享的,写是排它锁,最适合于读多写少的场景

    四、LockSupport(基础工具)

    五、AQS(抽象队列同步器)

            final void lock() {
                if (compareAndSetState(0, 1))
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    acquire(1);
            }

    这个方法是ReentrantLock.NonfairSync.lock()方法

        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }

    这个方法就是尝试获取独占所同步状态,这是AQS的方法

            protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }

     这个方法是由ReentrantLock.NonfairSync.tryAcquire实现

            final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }

    这个方法是由ReentrantLock.Sync.nonfairTryAcquire实现(尝试获取独占锁,或者重入独占锁)

        private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            enq(node);
            return node;
        }

    这个方法是AQS的方法,这个方法是(1)先设置尾节点compareAndSetTail(2)如果前面设置没成功,就进入死循环enq方法,这个方法里会判断为节点是否为空,如果为空,就先设置head指向New Node()空节点,然后再循环;如果不为空,就设置compareAndSetTail,如果设置成功就可以退出了,否则继续循环。

        private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }

    这个就是AQS的enq方法

        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }

     这个方法就是加入到AQS队列以后发生的等待操作

    Springtemplate就是模板设计模式

    state有三个方法(getState,setState,compareAndSetState)

    state为0时,可以直接获取锁,不需要加入AQS队列

    state不为0时,需要尝试获取锁

    模板方法设计模式

    重入锁:

    读写锁:

    分布式锁:抢到本地锁的线程再去抢分布式锁;redis性能好;zookeeper稳定性好。

    读写锁的实现

    读锁的实现

        public final void acquireShared(int arg) {
            if (tryAcquireShared(arg) < 0)
                doAcquireShared(arg);
        }

    AQS的模板方法:

           protected final int tryAcquireShared(int unused) {
                Thread current = Thread.currentThread();
                int c = getState();
                if (exclusiveCount(c) != 0 &&
                    getExclusiveOwnerThread() != current)
                    return -1;
                int r = sharedCount(c);
                if (!readerShouldBlock() &&
                    r < MAX_COUNT &&
                    compareAndSetState(c, c + SHARED_UNIT)) {
                    if (r == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        HoldCounter rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            cachedHoldCounter = rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                    }
                    return 1;
                }
                return fullTryAcquireShared(current);
            }

    尝试获取读锁:

      1、如果当前线程是写线程,直接返回-1(加入等待队列)

      2、如果当前头结点的后继节点是读线程直接返回1获取到所

      3、如果当前头结点的后继节点是写线程,直接执行fullTryAcquireShared方法(这个策略完全是为了照顾写线程,或者说是防止写线程过分的饥渴)

            final boolean readerShouldBlock() {
                return apparentlyFirstQueuedIsExclusive();
            }

    as

        final boolean apparentlyFirstQueuedIsExclusive() {
            Node h, s;
            return (h = head) != null &&
                (s = h.next)  != null &&
                !s.isShared()         &&
                s.thread != null;
        }

    as

            final int fullTryAcquireShared(Thread current) {
                /*
                 * This code is in part redundant with that in
                 * tryAcquireShared but is simpler overall by not
                 * complicating tryAcquireShared with interactions between
                 * retries and lazily reading hold counts.
                 */
                HoldCounter rh = null;
                for (;;) {
                    int c = getState();
                    if (exclusiveCount(c) != 0) {
                        if (getExclusiveOwnerThread() != current)
                            return -1;
                        // else we hold the exclusive lock; blocking here
                        // would cause deadlock.
                    } else if (readerShouldBlock()) {
                        // Make sure we're not acquiring read lock reentrantly
                        if (firstReader == current) {
                            // assert firstReaderHoldCount > 0;
                        } else {
                            if (rh == null) {
                                rh = cachedHoldCounter;
                                if (rh == null || rh.tid != getThreadId(current)) {
                                    rh = readHolds.get();
                                    if (rh.count == 0)
                                        readHolds.remove();
                                }
                            }
                            if (rh.count == 0)
                                return -1;
                        }
                    }
                    if (sharedCount(c) == MAX_COUNT)
                        throw new Error("Maximum lock count exceeded");
                    if (compareAndSetState(c, c + SHARED_UNIT)) {
                        if (sharedCount(c) == 0) {
                            firstReader = current;
                            firstReaderHoldCount = 1;
                        } else if (firstReader == current) {
                            firstReaderHoldCount++;
                        } else {
                            if (rh == null)
                                rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current))
                                rh = readHolds.get();
                            else if (rh.count == 0)
                                readHolds.set(rh);
                            rh.count++;
                            cachedHoldCounter = rh; // cache for release
                        }
                        return 1;
                    }
                }
            }

    循环尝试获取读锁:

      1、如果当前线程是写线程,直接返回-1(加入等待队列)

      2、如果当前头结点的后继节点是写线程且不是锁的重入状态,那么就直接退出-1(加入等待队列)

      3、如果写线程超量,直接抛出异常

      4、如果修改状态成功,直接返回1,获取读锁成功

        private void doAcquireShared(int arg) {
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            if (interrupted)
                                selfInterrupt();
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }

     写锁的实现

        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }

    as

            protected final boolean tryAcquire(int acquires) {
                Thread current = Thread.currentThread();
                int c = getState();
                int w = exclusiveCount(c);
                if (c != 0) {
                    // (Note: if c != 0 and w == 0 then shared count != 0)
                    if (w == 0 || current != getExclusiveOwnerThread())
                        return false;
                    if (w + exclusiveCount(acquires) > MAX_COUNT)
                        throw new Error("Maximum lock count exceeded");
                    // Reentrant acquire
                    setState(c + acquires);
                    return true;
                }
                if (writerShouldBlock() ||
                    !compareAndSetState(c, c + acquires))
                    return false;
                setExclusiveOwnerThread(current);
                return true;
            }

    1、如果当前锁是共享锁,那么直接返回false

    2、如果可重入写线程数量太多超过最大值,抛出异常

    3、可重入线程正常直接返回true

    4、如果当前没有锁,且自己抢到了锁,直接返回true

    5、如果当前没有锁,且没有抢到锁,直接返回false

        private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            enq(node);
            return node;
        }

    as

        private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }

    as

        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }

    五、并发容器

    位运算的关键【a%2^n==a&(2^n-1)】

    hashmap会造成死循环:扩容时头插法惹的祸

    1、ConcurrentHashMap

    get   put   putIfAbsent

    putIfAbsent返回原来那个key所对应的值

     取模操作相当于位与操作

    位运算用途:权限控制和商品属性

    jdk1.7实现concurrenthashmap:segment数组(16缺省并发度这个数必须是2^n,这个数组也不允许扩容,使用hash值的高位部分进行hash)(ReentrantLock)创建segment元素时使用CAS保证线程安全  每一个segment下面包含一个hashentry数组table[]这个数组大小也必须是2^n,通过整个hash值进行hash,负载因子是0.75;①定位segment②定位hashentry③循环链表;get方法不加锁,只用了volatile{final hash;final key;volatile value;volatile next}      put方法 拿锁tryLock() 和lock()结合使用,提前new Node(key,value)   扩容也最好是2的倍数,迁移的时候会变更迁移方便,扩容会扩与扩原来的2倍   

    这两个方法尽量少使用:(size方法首先算两遍然后可能会全部加锁   containsValue也需要全部加锁)

    存在的问题:弱一致性,获取的过程中整个结构可能发生调整,因为get方法是没有加锁的

    扩容:stride步长+forwarding

    再设置sizeCtl

    size()=basecount+countercells

    强一致性:Collections.synchronizedMap(m),HashTable

    jdk1.8实现concurrenthashmap:CAS+synchronized

    数组+链表+红黑树(6~8)

    元素是Node(final,final,volatile,volatile)不是Entry(TreeNode继承于node(树叶),TreeBin(树根),Node);treebin之中使用了读写锁;

    forwardingnode扩容的时候需要用到占位

    sizeCtl(-1正在初始化)(0是默认值)(-n表示有多少线程正在扩容)(正数相当于负载因子)

    tabAt,setTabAt,casTabAt

    构造函数什么也没做

    get方法:数组+红黑树+链表

    put方法:初始化table,只有一个线程能够初始化成功(sizectl开控制这个并发);CAS操作把sizectl置为-1的线程初始化table;sizectl设置为阈值;CAS操作向数组里放入值;正在扩容时,帮助扩容;synchronized(锁住树根)网链表或者红黑树里面插入值【尾插法】,树与链表的转换;扩容

    负载因子仍然是0.75

    sizeCtl

    Node

    get put

    jkd1.8实现hashmap:treenode(继承于linkedhashmap.entry)

    2、跳表

    增加链表的快速查找性(redis,lucence)

    3、ConcurrentLinkedQueue

    无界非租塞队列,线程安全

    4、CopyOnWriteArrayList

    CopyOnWriteArraySet

    最适应读多写少的并发场景(白名单,黑名单)

    内存占用严重,只能保证最终一致性

    批量提交

    5、阻塞队列(尽量使用有界队列)(等待通知模式)

    有界就是put会阻塞的队列

    无界就是put不会阻塞的队列

    生产者消费者模式

    ArrayBlockingQueue(有界阻塞队列,先进先出原则,必须设定初始大小,只用一个锁,直接插入元素)【生产者和消费者是同一把锁】

    LinkedBlockingQueue(有界阻塞队列,先进先出原则,可以不设定初始大小默认就是Integer.MAX_Value,用了两个锁,插入元素时需要转换)【生产者和消费者是两把锁】

    PriorityBlockingQueue(无界阻塞队列)(默认,按照自然顺序,要么实现compareTo()方法,指定构造参数Comparator)

    DelayQueue(无界阻塞队列)(实现自己的缓冲系统,订单到期,限时支付)(延时获取元素)消息中间件

    SynchronousQueue(不存储元素)(每一个put操作都要等待一个take操作)(Exchanger)

    LinkedTransferQueue(无界阻塞队列)相比LinkedBlockingQueue,多了两个方法transfer()必须要消费者消费了以后方法才会返回(先直接给消费者,消费者没有则放入队列阻塞);tryTransfer()无论消费者是否接收,方法都立即返回(先给消费者,消费者没有就直接返回)

    LinkedBlockingDeque(双向阻塞队列(fork/join工作密取))可以从头和尾插入和删除元素

    六、线程池

    如果开启prestartAllCoreThreads,那么提交任务就会把线程直接放到queue里面;当queue里面满了的时候,就直接执行线程数直到线程最大值;如果队列设置太大;那么最大线程数和拒绝策略就没什么意义了。;队列的大小最好大于核心线程数,但是不能过大。

    1、ThreadPoolExecutor

      keepalivetime不对coresthread起作用,请对其他线程起作用

      ThreadFactory(作用是创建线程名字)

      RejectedExecutionHandler(拒绝策略)

        AbortPolicy(直接抛出异常,默认策略)

        CallerRunsPolicy让提交的线程执行该线程

    prestartAllCoreThreads设置一开始就会起来corepool线程

    allowCoreThreadTimeOut是否允许核心线程数也超时退出

        DiscardOldestPolicy(丢弃队列里面最老的任务)

        DiscardPolicy直接抛弃

    线程池的AOP模式

    计算密集型:加密,大数分解,正则【线程数适当小一点,机器的cpu核心数+1,操作系统调用线程就会造成页缺失,+1为了防止也确实】

    IO密集型:读取文件,数据库连接,网络通讯,rpc【线程数适当大一点:cpu的核心数×2】(NCPU*UCPU*(1+W/C))

    混合型:尽量拆分成为IO密集型和计算密集型(如果使用的时间相差过大,就不需要拆分了)

    FixedThreadPool(创建固定数量的线程池,适用于负载较重的服务器,使用了无界队列)

    CachedThreadPool会根据需要创建新线程(执行很多短期异步任务的程序)使用了SynchronousQueue阻塞队列

    SingleThreadPool创建单个线程,需要顺序保证执行任务,没有线程安全问题,使用了无界队列(保证任务串行执行)

    WorkStealingPool工作密取(fork/join)

    2、ScheduledThreadPoolExecutor

      (1)newSingleThreadScheduledExecutor只包含一个线程,只需要单个线程执行周期任务,保证顺序的执行各个任务

      (2)newScheduledThreadPool可以包含多个线程的,线程执行周期任务,适度控制后台线程数量的时候

      方法说明:

        schedule只执行一次,可以延时执行(抛出异常以后,任务直接中止执行;需要用try---catch)

        scheduleAtFixedRate任务超时怎么办?下一个任务马上开始执行(抛出异常以后,任务直接中止执行;需要用try---catch)

        scheduleWithFixedDelay(抛出异常以后,任务直接中止执行;需要用try---catch)

    3、CompletionService

    4、CompleteableFuture

    七、线程安全性

    栈封闭

    无状态(无状态的类即没有任何成员变量的类)【servlet是线程不安全的】

    让类不可变(final,String,包装类)只要不变,所有的成员变量都加上final关键字(Akka)

    volatile保证类的可见性(最适合一个线程写,多个线程读的情景)(1)可见性(2)禁止重排序

    加锁和CAS

    安全的发布

    ThreadLocal

    Collections.synchronizedList()和使用synchronized包装普通类(继承和委托)

    AtomicReference,CopyOnWriteArrayList

    死锁是由于加锁造成的(争夺资源顺序,操作者>资源数)(数据库事务也可能产生死锁)jps -v    ;jstack 7412

    解决办法:强制拿锁顺序必须一致,动态死锁原生hashcode解决(拿第三次锁);tryLock尝试加两个锁(但为了避免活锁,也要等待随机时间);

    活锁

    线程饥饿:读写锁的机制

    性能:线程上下文切换,锁同步,页缺失

    vmstat

    内存屏障指令,对性能有影响

    阻塞:将导致线程的挂起

    解决办法:

    同时锁两个锁

    减少锁的范围

    减少锁的力度(通过增加锁的数量)

    锁分段

    替换独占锁(读写锁的应用,CAS替换synchronized,并发容器)

    避免多余的锁(就是把两把锁合成一把锁,锁的粗化)

        

  • 相关阅读:
    Codeforces Beta Round #92 (Div. 2 Only) B. Permutations 模拟
    POJ 3281 Dining 最大流 Dinic算法
    POJ 2441 Arrange the BUlls 状压DP
    URAL 1152 Faise Mirrors 状压DP 简单题
    URAL 1039 Anniversary Party 树形DP 水题
    URAL 1018 Binary Apple Tree 树形DP 好题 经典
    pytorch中的forward前向传播机制
    .data()与.detach()的区别
    Argparse模块
    pytorch代码调试工具
  • 原文地址:https://www.cnblogs.com/erdanyang/p/12333302.html
Copyright © 2011-2022 走看看