zoukankan      html  css  js  c++  java
  • 实现一个独占锁和3元共享锁及其思路

    public class SelfLock implements Lock{
        
        //state 表示获取到锁 state=1 获取到了锁,state=0,表示这个锁当前没有线程拿到
        private static class Sync extends AbstractQueuedSynchronizer{
            
            //是否占用
            protected boolean isHeldExclusively() {
                return getState()==1;
            }
            
            protected boolean tryAcquire(int arg) {
                if(compareAndSetState(0,1)) {
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
                return false;
            }
            
            protected boolean tryRelease(int arg) {
                if(getState()==0) {
                    throw new UnsupportedOperationException();
                }
                setExclusiveOwnerThread(null);
                setState(0);
                return true;
            }
            
            Condition newCondition() {
                return new ConditionObject();
            }
        }
        
        private final Sync sycn = new Sync();
    
        @Override
        public void lock() {
            sycn.acquire(1);
            
        }
    
        @Override
        public void lockInterruptibly() throws InterruptedException {
            sycn.acquireInterruptibly(1);
            
        }
    
        @Override
        public boolean tryLock() {
            return sycn.tryAcquire(1);
        }
    
        @Override
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            return sycn.tryAcquireNanos(1, unit.toNanos(time));
        }
    
        @Override
        public void unlock() {
            sycn.release(1);
            
        }
    
        @Override
        public Condition newCondition() {
            return sycn.newCondition();
        }
    
    
    }
    public class Test {
        private static SelfLock sl = new SelfLock();
    
        private static int a = 0;
        private static Condition con = sl.newCondition();
    
        public static void increment() {
            sl.lock();
            a++;
            sl.unlock();
        }
    
        private static CyclicBarrier cb = new CyclicBarrier(31);// 设置一个同步屏障,等30个线程+主线程都执行完后再打印最终a的值
    
        public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
    
            for (int i = 0; i < 30; i++) {
                new Thread(new Runnable() {
    
                    @Override
                    public void run() {
                        for (int j = 0; j < 100000; j++) {
                            increment();
                        }
                        try {
                            cb.await();
                            //System.out.println(Thread.currentThread().getId() + " wo hao le");
                        } catch (InterruptedException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        } catch (BrokenBarrierException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                    }
                }).start();
    
            }
            cb.await();
            System.out.println(a);
        }
    }

    public class TrinityLock   {
    
        //为3表示允许两个线程同时获得锁
        private final Sync sync = new Sync(3);
    
        private static final class Sync extends AbstractQueuedSynchronizer {
            //private static final long serialVersionUID = -7889272986162341211L;
    
            Sync(int count) {
                if (count <= 0) {
                    throw new IllegalArgumentException("count must large than zero.");
                }
                setState(count);//设置3个许可证,最多允许3个共享线程
            }
    
            public int tryAcquireShared(int reduceCount) {
                for (;;) {
                    int current = getState();
                    int newCount = current - reduceCount;
                    if (newCount < 0 || compareAndSetState(current, newCount)) {
                        return newCount;
                    }
                }
            }
    
            public boolean tryReleaseShared(int returnCount) {
                for (;;) {
                    int current = getState();
                    int newCount = current + returnCount;
                    if (compareAndSetState(current, newCount)) {
                        return true;
                    }
                }
            }
    
            final ConditionObject newCondition() {
                return new ConditionObject();
            }
        }
    
        public void lock() {
            sync.acquireShared(1);
        }
    
        public void unlock() {
            sync.releaseShared(1);
        }
    
        public void lockInterruptibly() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
    
        public boolean tryLock() {
            return sync.tryAcquireShared(1) >= 0;
        }
    
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            return sync.tryAcquireSharedNanos(1, unit.toNanos(time));
        }
    
        public Condition newCondition() {
            return sync.newCondition();
        }
    }
    三元共享锁

    实现思路:一个锁的基本行为范式都定义在Lock接口中,我们只要实现Lock接口即可。lock接口中主要的lock和unlock方法的实现AQS已经帮我们实现大部分了,我们只要实现对state共享资源的细化操作就可以实现不同的锁也就是tryAcquire,tryRelease,tryAcquireShared,tryReleaseShared方法。

    那么为啥我们只要实现上述4个方法就能实现不同的锁?

        比如:
        public void lock() {
            sycn.acquire(1);
            
        }

    它的实现依赖于AQS中acquire方法 如下面代码

     public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }

     final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    acquireQueued

    这里的tryAcquire就是我们重写的方法,我们重写的方法假如state为0,就将state置为1,并将当前线程设为独占线程,

    假如state为1,返回false,然后addWaiter将当前线程置入同步队列的尾部,acquireQueued是自旋中,假如head节点指向了当前线程Node,就tryAcquire,返回interrupted值,然后根据条件值执行selfInterrupt阻塞当前线程。

    总结:AQS为我们定义好了顶层的处理实现逻辑,我们在使用AQS构建符合我们需求的同步组件时,只需重写tryAcquire,tryAcquireShared,tryRelease,tryReleaseShared几个方法,来决定同步状态的释放和获取即可,至于背后复杂的线程排队,线程阻塞/唤醒,如何保证线程安全,都由AQS为我们完成了,这也是非常典型的模板方法的应用。AQS定义好顶级逻辑的骨架,并提取出公用的线程入队列/出队列,阻塞/唤醒等一系列复杂逻辑的实现,将部分简单的可由使用者决定的操作逻辑延迟到子类中去实现。

  • 相关阅读:
    shapely and geos break在distance方法
    linux运维
    【未完待补充】linux 设置So动态库链接路径
    用python建立最简单的web服务器
    nginx + keepalived 双机热备
    nagios监控linux主机监控内存脚本
    linux普通用户权限设置为超级用户权限方法、sudo不用登陆密码
    zato server启动后自动关闭问题解决
    Linux下几种文件传输命令 sz rz sftp scp
    python风味之大杂烩
  • 原文地址:https://www.cnblogs.com/ljjnbpy/p/10030018.html
Copyright © 2011-2022 走看看