zoukankan      html  css  js  c++  java
  • Java笔记(十五) 并发包

    并发包

    Java中还有一套并发工具包,位于包java.util.concurrent下,里面包括很多易用

    且很多高性能的并发开发工具。

    一、原子变量和CAS

    为什么需要原子变量,因为对于例如count++这种操作,使用

    synchronized成本太高了。Java并发包的基本原子变量有:

    AtomicBoolean、AtomicInteger、AtomitLong和AtomicReference(原子引用类型)

    一)AtomicInteger

    1.用法 

    构造方法:

    public AtomicInteger(int initialValue)
    public AtomicInteger() //初始值为0

    get/set方法:

    public final int get()
    public final void set(int newValue)

    之所以称为原子变量,是因为它们包含一些以原子方式实现组合操作的方法。部分方法如下:

    //以原子方式获取旧值并设置新值
    public final int getAndSet(int newValue)
    //以原子方式获取旧值并给当前值加一
    public final int getAndIncrement()
    //以原子方式获取旧值并给当前值减一
    public final int getAndDecrement()
    //以原子方式获取旧值并给当前值加delta
    public final int getAndAdd(int delta)
    //以原子方式给当前值加一并获取新值
    public final int incrementAndGet()
    //以原子方式给当前值减一并获取新值
    public final int decrementAndGet()
    //以原子方式给当前值加delta并获取新值
    public final int addAndGet(int delta)

    以上方法都依赖于方法:

    /**
    * @param expect 如果当前值等于expect,则更新当前值为update
    * @param update 更新后的值
    * @return 更新成功返回true,否则返回false
    * */
    public final boolean compareAndSet(int expect, int update)

    compareAndSet是一个非常重要的方法,比较并设置,我们以后简称为CAS。

    应用示例:

    public class AtomicIntegerDemo {
        private static AtomicInteger counter = new AtomicInteger(0);
        static class Vistor extends Thread {
            @Override
            public void run() {
                for (int i = 0; i < 1000; i++) {
                    counter.incrementAndGet();
                }
            }
        }
        public static void main(String[] args) throws InterruptedException {
             int num = 1000;
             Thread[] threads = new Thread[num];
             for (int i = 0; i < num; i++) {
                 threads[i] = new Vistor();
                 threads[i].start();
                 threads[i].join();
             }
             System.out.println(counter.get()); //1000000
        }
    }

    2.基本原理和思维 

    主要内部成员:

    private volatile int value; //volatile可保证内存可见性
    public final int incrementAndGet() {
        for(;;) {
            //获取当前值
            int current = get();
            //计算更新值
            int next = current + 1;
            //如果更新没有成功,说明value被别的线程改了,则再去取最新值
            //并尝试更新直到成功为止
            if(compareAndSet(current, next))
                return next;
        }
    }

    原子变量的更新逻辑是非阻塞式,更新冲突的时候,它就重试,

    不会阻塞,不会有上下文切换开销。对于大部分比较简单的操作,

    这种方式性能都远高于使用阻塞的方式。

    接下来是compareAndSet实现源代码:

    public final boolean compareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }

    其中usafe是sun.misc.Unsafe类型,定义为:

    private static final Unsafe unsafe = Unsafe.getUnsafe();

    一般应用程序不应该直接使用。原理上,一般的计算机系统都在硬件层次上直接支持CAS指令,

    Java的实现都会利用这些特殊的指令。

    3.实现锁 

    基于CAS,除了可以实现乐观非阻塞外,还可以实现悲观阻塞式算法,比如锁。

    实际上,Java并发包的所有阻塞式工具、容器、算法都是基于CAS的(不过,也需要一些别的支持)。

    我们自己来实现一个简单的锁:

    /**
     * 一般而言这种阻塞方式过于消耗CPU,实际开发中应该使用并发包中的类
     * 这里只是一个演示
     */
    public class MyLock {
        //status表示锁的状态,0表示未锁定,1表示锁定
        private AtomicInteger status = new AtomicInteger(0);
        public void lock() {
            //更新成功后才退出
            while (!status.compareAndSet(0, 1)){
                Thread.yield();
            }
        }
        public void unlock() {
            status.compareAndSet(1, 0);
        }
    }

    4.ABA问题 

    假设当前值为A,如果另一个线程先将A修改成B,再修改A,

    当前线程的CAS操作无法分辨当前值发生变化。解决办法

    是使用AtomicStampedReference,在修改值的同时附件一个时间戳,

    只有值和时间戳都相同才能进行修改,其CAS声明方法为:

    public boolean compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp)
    Pair pair = new Pair(100, 200);
    int stamp = 1;
    AtomicStampedReference<Pair> pairRef = new AtomicStampedReference<Pair>(pair, stamp);
    int newStamp = 2;
    pairRef.compareAndSet(pair, new Pair(200, 200), stamp, newStamp);

     二、显式锁

    Java并发包中的显式锁接口和类位于包java.util.concurrent.locks下,主要接口和类如下:

    1)锁接口Lock,主要实现类是ReentrantLock;

    2)读写锁和接口:ReadWriteLock,主要实现类是ReentrantReadWriteLock;

    一)接口Lock

    public interface Lock {
        void lock();
        void lockInterruptibly() throws InterruptedException;
        boolean tryLock();
        boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
        void unlock();
        Condition newCondition();
    }

    1)lock()/unlock():获取锁和释放锁,lock会阻塞直到成功。

    2)lockInterruptibly():与lock()不同的是,它可以响应中断,如果被其他线程中断了,则抛出InterruptedException.

    3)tryLock():只是尝试获取锁,立即返回,不阻塞,如果成功,返回true,否则返回false。

    4)tryLock(long time, TimeUnit unit):在限定时间内等待,如果成功返回true,否则阻塞,在等待同时响应中断,超出时间没有获得锁,抛出false。

    5)newCondition():

    二)可重入锁ReentrantLock

    1.基本用法 

    该类方法lock()和unlock()实现了synchronized一样的语义,但:

    1)可重入,一个线程在持有锁的情况下,可以继续获得该锁;

    2)可以解决竞态条件问题;

    3)可以保证内存可见性;

    ReentrantLock有两个构造方法:

    //默认为不公平锁
    public ReentrantLock()
    //设定true为公平锁
    public ReentrantLock(boolean fair)

    所谓公平是指,等待时间最长的线程优先获得锁。保证公平会英雄性能,一般也不需要,所以默认不保证。

    public class Counter {
        private final Lock lock = new ReentrantLock();
        private volatile int count;
        public void incr() {
            lock.lock();
            try {
                count ++;
            //注意一定不要忘记释放锁
            } finally {
                lock.unlock();
            }
        }
        public int getCount() {
            return count;
        }
    }

    2.使用tryLock()避免死锁 

    在持有一个锁同时获取另一个锁而获取不到的时候,可以释放

    已持有的锁,给其他线程获取锁的机会,然后重试获取所有锁。

    账户之间转账的例子:

    public class Account {
        private Lock lock = new ReentrantLock();
        private volatile double money;
        public Account(double money) {
            this.money = money;
        }
        public void add(double money) {
            lock.lock();
            try {
                this.money += money;
            } finally {
                lock.unlock();
            }
        }
        public void reduce(double money) {
            lock.lock();
            try {
                this.money -= money;
            } finally {
                lock.unlock();
            }
        }
        public double getMoney() {return this.money;}
        void lock() {
            lock.lock();
        }
    
        void unlock() {
            lock.unlock();
        }
    
        boolean tryLock() {
            return lock.tryLock();
        }
    }
    public class AccountManager {
        public static class NoEnoughMoneyException extends Exception {}
        //该方法问题:如果两个账户同时给对方转账,都先获得了第一个锁,则会发生死锁
        public static void transfer(Account from, Account to, double money) throws NoEnoughMoneyException {
            from.lock();
            try {
                to.lock();
                try {
                    if (from.getMoney() >= money) {
                        from.reduce(money);
                        to.add(money);
                    } else {
                        System.out.println("The from account money is " + from.getMoney());
                        System.out.println("The money is " + money);
                        throw new NoEnoughMoneyException();
                    }
                } finally {
                    to.unlock();
                }
            } finally {
                from.unlock();
            }
        }
    
        //避免死锁的方法
        public static boolean tryTransfer(Account from, Account to, double money) throws NoEnoughMoneyException {
            if (from.tryLock()) {
                try {
                    if (to.tryLock()) {
                        try {
                            if (from.getMoney() >= money) {
                                from.reduce(money);
                                to.add(money);
                            } else {
                                throw new NoEnoughMoneyException();
                            }
                            return true;
                        } finally {
                            to.unlock();
                        }
                    }
                } finally {
                    from.unlock();
                }
            }
            return false;
        }
    
        //模拟账户转账的死锁过程
        public static void simulateDeadLock() throws InterruptedException {
            final int accountNum = 10;
            final Account[] accounts = new Account[accountNum];
            final Random random = new Random();
            for (int i = 0; i < accountNum; i++) {
                accounts[i] = new Account(random.nextInt(1000000));
                System.out.println("account[" + i + "]" + accounts[i].getMoney());
            }
    
    
            int threadNum = 100;
            Thread[] threads = new Thread[threadNum];
            for (int i = 0; i < threadNum; i++) {
                threads[i] = new Thread() {
                    @Override
                    public void run() {
                        int loopNum = 100;
                        for (int k = 0; k < loopNum; k++) {
                            int i = random.nextInt(accountNum);
                            int j = random.nextInt(accountNum);
                            int money = random.nextInt(10);
                            if (i != j) {
                                try {
                                    System.out.println( "Thread account[" + i + "]"+ accounts[i].getMoney());
                                    //会发生死锁
                                    transfer(accounts[i], accounts[j], money);
                                } catch (Exception e) {
                                    e.printStackTrace();
                                }
                            }
                        }
                    }
                };
                threads[i].start();
            }
        }
    
        public static void main(String[] args) {
            try {
                simulateDeadLock();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    3.ReentrantLock的实现原理 

    在最底层,该类依赖于CAS方法,另外,它依赖于LockSupport的一些方法。

    1)LockSupport

    LockSupport类的基本方法:

    //使当前线程放弃CPU,进入等待(WAITING)状态
    public static void park()
    //使指定线程恢复可运行状态
    public static void unpark(Thread thread)
    Thread t = new Thread(){
        @Override
        public void run() {
            LockSupport.park();
            System.out.println("exit");
        }
    };
    t.start();
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    LockSupport.unpark(t);

    park不同于Thread.yield(),yield只是告诉操作系统可以先让其他线程运行,

    但自己仍然是可运行状态。另外,park是响应中断的,当有中断发生时,park

    会返回,线程的中断状态会被设置。另外,还需要说明的,park可能会无缘无故

    地返回,程序应该重新检查park等待条件是否满足。park的两个变体:

    //可指定最长等待时间,参数是相对于当前时间的纳秒数
    public static void parkNanos(long nanos)
    //指定最长等待到什么时候,参数是绝对时间
    public static void parkUntil(long deadline)

    当等待超时,它们也会返回。

    也可以指定一个对象,表示是由于该对象而进行等待,以便调试:

    public static void park(Object blocker)

    返回一个线程的blocker对象:

    public static Object getBlocker(Thread t)

    2)AQS

    Java提供了一个抽象类AbstractQueuedSynchronizer简称AQS,用于各种锁的代码复用。

    AQS封装了一个状态,给子类提供查询和设置状态的方法:

    private volatile int state;
    protected final int getState()
    protected final void setState(int newState)
    protected final boolean compareAndSetState(int expect, int update)

    用于实现锁时,AQS可以保存锁的当前持有线程,提供了方法进行查询和设置:

    private transient Thread exclusiveOwnerThread();
    protected final void setExclusiveOwnerThread(Thread t);
    protected final Thread getExclusiveOwnerThread();

    下面,我们以ReentrantLock的使用为例简要介绍AQS的原理:

    ReentrantLock内部使用AQS,有三个内部类:

    abstract static class Sync extends AbstractQueuedSynchronizer
    //fair为false使用该类
    static final class NonfairSync extends Sync
    //fair为true时使用该类
    static final class FairSync extends Sync

    ReentrantLock内部有一个Sync成员:

    private final Sync sync;

    在构造方法中被赋值:

    public ReentrantLock() {
        sync = new NonfairSync();
    }

    lock/unlock方法的实现:

    public void lock() {
        sync.lock();
    }

    sync默认类型是NonfairSync,NonfairSync的lock代码为:

    final void lock() {
        if(compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    ReentrantLock使用state表示是否被锁和持有数量,如果当前未被锁定,则立即获得锁,

    否则调用acquire(1)获得锁。acquire()是AQS中的方法,代码为:

    public final void acquire(int arg) {
        if(!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();
    }

     它调用tryAcquire()获取锁,tryAcquire必须被子类重写。NofairSync的实现为:

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        //如果未被锁定,则进行锁定
        if(c == 0) {
            if(compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        //如果已经锁定,增加锁定次数,这里state存放了锁定次数
        else if(current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if(nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

    如果tryAcquire()返回false,则AQS会调用:

    //addWaiter()会新建一个节点Node,代表当前线程,然后加入内部等待队列
    //放入等待队列后,调用acquireQueued尝试获得锁
    acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for(;;) {
                //首先检查当前节点是不是第一个等待节点
                //如果是且能得到锁则将当前节点从等待队列中移除并返回
                final Node p = node.predecessor();
                if(p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //否则最终调用LockSupport.park放弃CPU,进入等待,
                //被唤醒后检查是否中断,记录中断标志
                if(shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if(failed)
                cancelAcquire(node);
        }
    }
    private static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

    总结lock方法的基本过程:能获得锁就立即获得,否则加入等待队列,被唤醒后检查

    自己是否是第一个等待的线程,如果是且能获得锁,则返回,否则继续等待。这个过程

    如果发生了中断,lock会记录中断标志位,但不会提前返回或抛出异常。

    ReentrantLock的unlock代码为:

    public void unlock() {
        sync.release(1);
    }
    public final boolean release(int arg) {
        //tryRelease会修改状态释放锁
        if(tryRelease(arg)) {
            Node h = head;
            if(h != null && h.waitStatus != 0)
                //会调用LockSupport.unpark将第一个等待的线程唤醒
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    4.对比ReentrantLock和synchronized 

    相比synchronized,ReentrantLock,支持以非阻塞方式获取锁,

    可以响应中断,可以限时,更为灵活。不过synchronized使用更简单

    性能通常更好,总之能用synchronized就不要用ReentrantLock。

    三、显式条件

    1.用法

    锁用于解决竞态条件问题,条件是线程间的协作机制。显式锁与synchronized相对应,

    而显式条件与wait/notify相对应。显式锁与显式条件配合使用。Lock接口中定义的创建

    显示条件的方法:

    Condition newCondition();

    Condition接口:

    public interface Condition {
        //对应于Object的wait
        void await() throws InterruptedException;
        //不响应中断的等待,该方法不会由于中断结束,但当它返回时
        //如果等待过程中发生中断,中断标志会被设置
        void awaitUninterruptibly();
        //等待相对时间,单位纳秒
        long awaitNanos(long nanosTimeout) throws InterruptedException;
        //等待相对时间
        boolean await(long time, TimeUnit unit) throws InterruptedException;
        //等待绝对时间
        boolean awaitUntil(Date deadline) throws InterruptedException;
        //对应于notify
        void signal();
        void signalAll();
    }

    与Object的wait方法一样,调用await方法前先要索取锁,如果没有锁会抛出异常。

    当await在进入等待队列后,会释放锁,释放CPU,当其他线程唤醒它后,或等待

    超时后,或发生中断异常后,它都需要重新获取锁,获取锁后,才会从await方法中退出。

     另外,与Object的wait方法一样,await返回后就不一定代表等待的条件就满足了,

    通常要将await的调用放到一个循环内,只有条件满足后才退出。示例:

    public class WaitThread  extends Thread{
        private volatile boolean fire = false;
        private Lock lock = new ReentrantLock();
        private Condition condition = lock.newCondition();
        @Override
        public void run() {
            try {
                lock.lock();
                try {
                    while (!fire){
                        condition.await();
                    }
                } finally {
                    lock.unlock();
                }
                System.out.println("fired");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        public void fire() {
            lock.lock();
            try {
                this.fire = true;
                condition.signal();
            } finally {
                lock.unlock();
            }
        }
        public static void main(String[] args) throws InterruptedException {
            WaitThread waitThread = new WaitThread();
            waitThread.start();
            Thread.sleep(1000);
            System.out.println("fire");
            waitThread.fire();
        }
    }

    2.生产者/消费者模式

    在使用wait/notify实现生产者/消费者的时候有一个局限就是只能有一个条件队列,

    而使用显式锁和条件不存在这个问题,例如:

    public class MyBlockingQueue<E> {
        private Queue<E> queue = null;
        private int limit;
        private Lock lock = new ReentrantLock();
        private Condition notFull = lock.newCondition();
        private Condition notEmpty = lock.newCondition();
    
        public MyBlockingQueue(int limit) {
            this.limit = limit;
            queue = new ArrayDeque<E>(limit);
        }
    
        public void put(E e) throws InterruptedException {
            lock.lockInterruptibly();
            try {
                while (queue.size() == limit) {
                    notFull.await();
                }
                queue.add(e);
                notEmpty.signal();
            } finally {
                lock.unlock();
            }
        }
    
        public E take() throws InterruptedException {
            lock.lockInterruptibly();
            try {
                while (queue.isEmpty()) {
                    notEmpty.await();
                }
                E e = queue.poll();
                notFull.signal();
                return e;
            } finally {
                lock.unlock();
            }
        }
    }

    3.实现原理

    ReentrantLock的newCondition():

    public Condition newCondition() {
        return sync.newCondition();
    }

    sync的newCondition方法:

    final ConditionObject newCondition() {
        return new ConditionObject();
    }

    ConditionObject是AQS的一个内部类,其内部有一个队列,

    表示条件等待队列,其成员声明:

    //条件队列的头节点
    private transient Node firstWaiter;
    //条件队列的尾节点
    private transient Node lastWaiter;
    public final void await() throws InterruptedException {
        //如果等待前中断标志位已经被设置,直接抛异常
        if (Thread.interrupted())
            throw new InterruptedException();
        //1.为当前进程创建节点,加入条件等待队列
        Node node = addConditionWaiter();
        //2.释放持有的锁
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        //3.放弃CPU进行等待,直到被中断或isOnSyncQueue变为true
        //isOnSyncQueue,表示该节点被其他线程从条件等待队列
        //移到了外部锁等待队列,等待的条件已经满足
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        //重新获取锁
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }
  • 相关阅读:
    007 连接
    006 group by having
    005 运算null 排序 模糊查询 聚合函数
    004 列、distinct、between、in、null
    003 约束和关系
    002 表的概念操作
    5-04用Sql语句创建表
    5-03使用视图创建表
    5-01表达基本概念
    4-04数据库的备份与还原
  • 原文地址:https://www.cnblogs.com/Shadowplay/p/10044255.html
Copyright © 2011-2022 走看看