zoukankan      html  css  js  c++  java
  • 阅读源码中一些常用方法的记录

    看到的LOCK实现,记录下备用

    /**
     * Minimal lock abstraction with a paired acquire ({@link #lock()}) and
     * release ({@link #unlock()}) operation.
     */
    public interface PutMessageLock {
        /** Acquires the lock. */
        void lock();
        /** Releases the lock. */
        void unlock();
    }
    
    /**
     * Exclusive lock implementation to put message, delegating to a
     * {@link ReentrantLock} created with the default (non-fair) policy.
     */
    public class PutMessageReentrantLock implements PutMessageLock {
        // final: the lock reference must never change once other threads can see this
        // object, and a final field is safely published together with the instance.
        private final ReentrantLock putMessageNormalLock = new ReentrantLock(); // NonfairSync

        /** Blocks until the underlying {@link ReentrantLock} is acquired. */
        @Override
        public void lock() {
            putMessageNormalLock.lock();
        }

        /**
         * Releases the underlying {@link ReentrantLock}.
         *
         * @throws IllegalMonitorStateException if the calling thread does not hold the lock
         */
        @Override
        public void unlock() {
            putMessageNormalLock.unlock();
        }
    }
    
    /**
     * Spin lock Implementation to put message, suggest using this with low race conditions.
     *
     * <p>The lock state is a single {@link AtomicBoolean}; no owner is tracked, so the lock
     * is not reentrant (a second {@code lock()} from the holder spins forever) and
     * {@code unlock()} may be called from any thread.
     */
    public class PutMessageSpinLock implements PutMessageLock {
        // true: can lock, false: in lock.
        // final: the state holder must never be swapped out while threads are spinning on it.
        private final AtomicBoolean putMessageSpinLock = new AtomicBoolean(true);

        /** Busy-waits until this thread succeeds in flipping the state from free to held. */
        @Override
        public void lock() {
            while (!this.putMessageSpinLock.compareAndSet(true, false)) {
                // keep spinning until the CAS wins
            }
        }

        /** Marks the lock free again; a no-op if the lock is already free. */
        @Override
        public void unlock() {
            this.putMessageSpinLock.compareAndSet(false, true);
        }
    }

    另一种

    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
        /**
         * Destroys the given index files and removes each one from {@code indexFileList}
         * while holding the write lock. Processing stops at the first file that cannot be
         * destroyed or removed; any exception is logged and not rethrown.
         *
         * @param files the files to delete; nothing happens when the list is empty
         */
        private void deleteExpiredFile(List<IndexFile> files) {
            if (files.isEmpty()) {
                return;
            }

            // Acquire BEFORE entering try: if lock() itself fails, the finally block must
            // not call unlock() on a lock this thread never obtained (that would raise
            // IllegalMonitorStateException and hide the real failure).
            this.readWriteLock.writeLock().lock();
            try {
                for (IndexFile file : files) {
                    // 3000 is passed straight to destroy(); presumably a timeout in ms - confirm.
                    boolean destroyed = file.destroy(3000);
                    destroyed = destroyed && this.indexFileList.remove(file);
                    if (!destroyed) {
                        log.error("deleteExpiredFile remove failed.");
                        break;
                    }
                }
            } catch (Exception e) {
                log.error("deleteExpiredFile has exception.", e);
            } finally {
                this.readWriteLock.writeLock().unlock();
            }
        }
            // Snapshot of indexFileList taken under the read lock; stays null unless the
            // first file's end physical offset is below `offset`.
            Object[] files = null;
            // Acquire BEFORE entering try so the finally block only ever unlocks a read
            // lock that this thread actually holds.
            this.readWriteLock.readLock().lock();
            try {
                if (this.indexFileList.isEmpty()) {
                    return;
                }

                long endPhyOffset = this.indexFileList.get(0).getEndPhyOffset();
                if (endPhyOffset < offset) {
                    files = this.indexFileList.toArray();
                }
            } catch (Exception e) {
                log.error("destroy exception", e);
            } finally {
                this.readWriteLock.readLock().unlock();
            }

    还有这种普通的synchronized,基于类的同步

        /**
         * Returns the ZooKeeper client held in {@code zk}, creating it on first use and
         * making sure the node at {@code zkRootPath} exists.
         *
         * <p>The client is built and fully prepared in a local variable and assigned to
         * {@code zk} only as the last step. This way a caller on the unsynchronized fast
         * path never receives a client whose root path has not been set up yet, and a
         * failed initialization does not leave a broken client cached: the half-built
         * client is closed and the next call retries.
         *
         * <p>NOTE(review): the declaration of {@code zk} is not visible here; the
         * unsynchronized {@code zk == null} check is only safe if that field is
         * {@code volatile} - confirm.
         *
         * @return the initialized ZooKeeper client
         * @throws TransactionIOException if the client cannot be created or the root path
         *                                cannot be checked/created
         */
        private ZooKeeper getZk() {
            if (zk == null) {
                synchronized (ZooKeeperTransactionRepository.class) {
                    if (zk == null) {
                        ZooKeeper client = null;
                        try {
                            client = new ZooKeeper(zkServers, zkTimeout, new Watcher() {
                                @Override
                                public void process(WatchedEvent watchedEvent) {
                                    // events are intentionally ignored
                                }
                            });

                            Stat stat = client.exists(zkRootPath, false);

                            if (stat == null) {
                                client.create(zkRootPath, zkRootPath.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                            }

                            // Publish only after the client is completely set up.
                            zk = client;
                        } catch (Exception e) {
                            if (client != null) {
                                // Do not leak the connection of a client that never got published.
                                try {
                                    client.close();
                                } catch (Exception closeFailure) {
                                    if (closeFailure instanceof InterruptedException) {
                                        Thread.currentThread().interrupt();
                                    }
                                    e.addSuppressed(closeFailure);
                                }
                            }
                            throw new TransactionIOException(e);
                        }
                    }
                }
            }
            return zk;
        }

    基于某个成员的同步

        /**
         * Adds {@code topic} to {@code registerTopics} and, when a listener is supplied,
         * installs it as the current {@code messageQueueListener}. Both steps run while
         * holding the monitor of {@code registerTopics}.
         *
         * @param topic    the topic to register
         * @param listener the listener to install; {@code null} keeps the current one
         */
        @Override
        public void registerMessageQueueListener(String topic, MessageQueueListener listener) {
            synchronized (this.registerTopics) {
                this.registerTopics.add(topic);
                if (null == listener) {
                    return;
                }
                this.messageQueueListener = listener;
            }
        }

    还有特意定义一个private final Object compileLock = new Object(); 在synchronized的时候用来做对象锁的

            // Fragment: registers a new filter class entry in filterClassTable.
            // All work is serialized on the dedicated compileLock monitor.
            if (registerNew) {
                synchronized (this.compileLock) {
                    // Re-read the table under the lock: if an entry with the same CRC is
                    // already present there is nothing to do.
                    filterClassInfoPrev = this.filterClassTable.get(key);
                    if (null != filterClassInfoPrev && filterClassInfoPrev.getClassCRC() == classCRC) {
                        return true;
                    }
    
                    try {
    
                        // Start from a placeholder entry: CRC 0 and no message filter.
                        FilterClassInfo filterClassInfoNew = new FilterClassInfo();
                        filterClassInfoNew.setClassName(className);
                        filterClassInfoNew.setClassCRC(0);
                        filterClassInfoNew.setMessageFilter(null);
    
                        // Only when client-uploaded filter classes are enabled: compile the
                        // uploaded source, instantiate it, and record the filter and its CRC.
                        if (this.filtersrvController.getFiltersrvConfig().isClientUploadFilterClassEnable()) {
                            String javaSource = new String(filterSourceBinary, MixAll.DEFAULT_CHARSET);
                            Class<?> newClass = DynaCode.compileAndLoadClass(className, javaSource);
                            Object newInstance = newClass.newInstance();
                            filterClassInfoNew.setMessageFilter((MessageFilter) newInstance);
                            filterClassInfoNew.setClassCRC(classCRC);
                        }
    
                        this.filterClassTable.put(key, filterClassInfoNew);
                    } catch (Throwable e) {
                        // NOTE(review): every failure (including compile errors and Errors) is
                        // silently discarded here; this excerpt shows no logging or error
                        // return - confirm against the full method before reusing.
                    }
                }
            }

    还有这种用BIT位标识各种状态的方法,在网络编程里也比较多见

    /**
     * A set of status bits packed into one int: not-readable, not-writeable,
     * logics-queue write error, index-file write error and disk-full.
     *
     * <p>Every mutation is an atomic read-modify-write (compare-and-set loop), so
     * concurrent callers cannot lose each other's updates. A plain {@code |=} / {@code &=}
     * on a {@code volatile int} would only guarantee visibility, not atomicity.
     */
    public class RunningFlags {
        private static final int NOT_READABLE_BIT = 1;
        private static final int NOT_WRITEABLE_BIT = 1 << 1;
        private static final int WRITE_LOGICS_QUEUE_ERROR_BIT = 1 << 2;
        private static final int WRITE_INDEX_FILE_ERROR_BIT = 1 << 3;
        private static final int DISK_FULL_BIT = 1 << 4;

        /** Bits that block consume-queue writes (disk-full is deliberately excluded). */
        private static final int CQ_WRITE_BLOCKING_MASK =
            NOT_WRITEABLE_BIT | WRITE_LOGICS_QUEUE_ERROR_BIT | WRITE_INDEX_FILE_ERROR_BIT;
        /** Bits that block regular writes. */
        private static final int WRITE_BLOCKING_MASK = CQ_WRITE_BLOCKING_MASK | DISK_FULL_BIT;

        // Fully qualified so this class needs no additional import.
        private final java.util.concurrent.atomic.AtomicInteger flagBits =
            new java.util.concurrent.atomic.AtomicInteger(0);

        public RunningFlags() {
        }

        /** @return the raw bit set */
        public int getFlagBits() {
            return flagBits.get();
        }

        /**
         * Clears the not-readable bit.
         *
         * @return {@code true} if the flags were already readable before the call
         */
        public boolean getAndMakeReadable() {
            return (getAndClearBits(NOT_READABLE_BIT) & NOT_READABLE_BIT) == 0;
        }

        /** @return {@code true} when the not-readable bit is clear */
        public boolean isReadable() {
            return (flagBits.get() & NOT_READABLE_BIT) == 0;
        }

        /**
         * Sets the not-readable bit.
         *
         * @return {@code true} if the flags were readable before the call
         */
        public boolean getAndMakeNotReadable() {
            return (getAndSetBits(NOT_READABLE_BIT) & NOT_READABLE_BIT) == 0;
        }

        /**
         * Clears the not-writeable bit (error and disk-full bits are left untouched).
         *
         * @return {@code true} if {@link #isWriteable()} already held before the call
         */
        public boolean getAndMakeWriteable() {
            return (getAndClearBits(NOT_WRITEABLE_BIT) & WRITE_BLOCKING_MASK) == 0;
        }

        /** @return {@code true} when none of the write-blocking bits (including disk-full) is set */
        public boolean isWriteable() {
            return (flagBits.get() & WRITE_BLOCKING_MASK) == 0;
        }

        //for consume queue, just ignore the DISK_FULL_BIT
        public boolean isCQWriteable() {
            return (flagBits.get() & CQ_WRITE_BLOCKING_MASK) == 0;
        }

        /**
         * Sets the not-writeable bit, but only if the flags are currently writeable.
         *
         * @return {@code true} if the flags were writeable (and the bit was therefore set);
         *         {@code false} if some write-blocking bit was already set, in which case
         *         nothing is changed
         */
        public boolean getAndMakeNotWriteable() {
            for (; ; ) {
                int current = flagBits.get();
                if ((current & WRITE_BLOCKING_MASK) != 0) {
                    return false;
                }
                if (flagBits.compareAndSet(current, current | NOT_WRITEABLE_BIT)) {
                    return true;
                }
            }
        }

        /** Sets the logics-queue write error bit. */
        public void makeLogicsQueueError() {
            getAndSetBits(WRITE_LOGICS_QUEUE_ERROR_BIT);
        }

        /** @return {@code true} when the logics-queue write error bit is set */
        public boolean isLogicsQueueError() {
            return (flagBits.get() & WRITE_LOGICS_QUEUE_ERROR_BIT) == WRITE_LOGICS_QUEUE_ERROR_BIT;
        }

        /** Sets the index-file write error bit. */
        public void makeIndexFileError() {
            getAndSetBits(WRITE_INDEX_FILE_ERROR_BIT);
        }

        /** @return {@code true} when the index-file write error bit is set */
        public boolean isIndexFileError() {
            return (flagBits.get() & WRITE_INDEX_FILE_ERROR_BIT) == WRITE_INDEX_FILE_ERROR_BIT;
        }

        /**
         * Sets the disk-full bit.
         *
         * @return {@code true} if the disk-full bit was clear before the call
         */
        public boolean getAndMakeDiskFull() {
            return (getAndSetBits(DISK_FULL_BIT) & DISK_FULL_BIT) == 0;
        }

        /**
         * Clears the disk-full bit.
         *
         * @return {@code true} if the disk-full bit was already clear before the call
         */
        public boolean getAndMakeDiskOK() {
            return (getAndClearBits(DISK_FULL_BIT) & DISK_FULL_BIT) == 0;
        }

        /** Atomically ORs {@code mask} into the flags and returns the previous value. */
        private int getAndSetBits(int mask) {
            for (; ; ) {
                int current = flagBits.get();
                int next = current | mask;
                if (next == current || flagBits.compareAndSet(current, next)) {
                    return current;
                }
            }
        }

        /** Atomically clears the bits of {@code mask} and returns the previous value. */
        private int getAndClearBits(int mask) {
            for (; ; ) {
                int current = flagBits.get();
                int next = current & ~mask;
                if (next == current || flagBits.compareAndSet(current, next)) {
                    return current;
                }
            }
        }
    }

    这里还重新实现了一个CountDownLatch(即CountDownLatch2),在原有基础上添加了reset方法,其余与JDK的CountDownLatch一样

    /**
     * A count-down latch that can be re-armed: besides {@code await} and
     * {@code countDown} it offers {@link #reset()}, which restores the count the
     * latch was constructed with.
     */
    public class CountDownLatch2 {
        private final Sync sync;

        /**
         * @param count the number of {@link #countDown()} calls needed to open the latch
         * @throws IllegalArgumentException if {@code count} is negative
         */
        public CountDownLatch2(int count) {
            if (count < 0) {
                throw new IllegalArgumentException("count < 0");
            }
            this.sync = new Sync(count);
        }

        /**
         * Blocks until the count reaches zero.
         *
         * @throws InterruptedException if the waiting thread is interrupted
         */
        public void await() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }

        /**
         * Blocks until the count reaches zero or the timeout elapses.
         *
         * @return {@code true} if the count reached zero, {@code false} on timeout
         * @throws InterruptedException if the waiting thread is interrupted
         */
        public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
            final long nanos = unit.toNanos(timeout);
            return sync.tryAcquireSharedNanos(1, nanos);
        }

        /** Decrements the count; has no effect once the count is zero. */
        public void countDown() {
            sync.releaseShared(1);
        }

        /** @return the current count */
        public long getCount() {
            return sync.getCount();
        }

        /** Sets the count back to the value given at construction. */
        public void reset() {
            sync.reset();
        }

        /** Synchronizer whose AQS state holds the remaining count. */
        private static final class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 4982264981922014374L;
            private final int startCount;

            Sync(int count) {
                this.startCount = count;
                setState(count);
            }

            int getCount() {
                return getState();
            }

            @Override
            protected int tryAcquireShared(int acquires) {
                if (getState() == 0) {
                    return 1;
                }
                return -1;
            }

            @Override
            protected boolean tryReleaseShared(int releases) {
                // Decrement count; signal when transition to zero
                int observed;
                int decremented;
                do {
                    observed = getState();
                    if (observed == 0) {
                        return false;
                    }
                    decremented = observed - 1;
                } while (!compareAndSetState(observed, decremented));
                return decremented == 0;
            }

            protected void reset() {
                setState(startCount);
            }
        }
    }

    同样一些常见用法,线程间通信,锁

    // Latch the service thread waits on; created with a count of 1.
    protected final CountDownLatch2 waitPoint = new CountDownLatch2(1);
    // true while a notification is pending and has not yet been consumed by the waiter.
    // NOTE(review): the reference is never reassigned in the visible code, so `volatile`
    // adds nothing over `final` here - confirm no subclass reassigns it before changing.
    protected volatile AtomicBoolean hasNotified = new AtomicBoolean(false);
    // Set to true by stop(...).
    protected volatile boolean stopped = false;
    
        /**
         * Marks the service as stopped, wakes a waiter through {@code waitPoint} if no
         * notification is pending yet, and optionally interrupts the service thread.
         *
         * @param interrupt whether to interrupt {@code thread} after signalling
         */
        public void stop(final boolean interrupt) {
            this.stopped = true;
            log.info("stop thread " + this.getServiceName() + " interrupt " + interrupt);

            // Only the caller that flips the flag from false to true counts the latch down.
            final boolean firstToNotify = hasNotified.compareAndSet(false, true);
            if (firstToNotify) {
                waitPoint.countDown(); // notify
            }

            if (!interrupt) {
                return;
            }
            this.thread.interrupt();
        }
    
        /**
         * Waits for a notification for at most {@code interval} milliseconds, then calls
         * {@code onWaitEnd()}. If a notification is already pending it is consumed and the
         * method returns without blocking.
         *
         * @param interval maximum time to wait, in milliseconds
         */
        protected void waitForRunning(long interval) {
            // A notification is already pending: consume it and skip the wait.
            if (hasNotified.compareAndSet(true, false)) {
                this.onWaitEnd();
                return;
            }
            // Restore the latch count before blocking on it.
            waitPoint.reset();
            try {
                waitPoint.await(interval, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                // NOTE(review): the interrupt is only logged; the thread's interrupt status
                // is not restored here.
                log.error("Interrupted", e);
            } finally {
                // Whatever ended the wait, clear the pending flag and run the end-of-wait hook.
                hasNotified.set(false);
                this.onWaitEnd();
            }
        }

    别的地方看到的

    /**
     * Process-wide registry of string-keyed, non-blocking locks backed by a
     * {@link ConcurrentHashMap}. A key is "locked" while it is present in the map.
     * No owner is recorded, so any caller may release any key.
     */
    public class LockManager {

        // Eagerly created singleton; final (instead of volatile) because it is never reassigned.
        private static final LockManager instance = new LockManager();
        private LockManager() {}

        /** @return the single shared instance */
        public static LockManager getInstance() {
            return instance;
        }

        private final ConcurrentHashMap<String, String> locks = new ConcurrentHashMap<>();

        /**
         * Tries to take the lock for {@code key} without blocking.
         *
         * @param key the lock key; {@code null} is treated as "nothing to lock"
         * @return {@code true} if the lock was taken (or {@code key} is {@code null}),
         *         {@code false} if the key is already locked
         */
        public boolean tryLock(String key) {
            if (key == null) {
                return true;
            }
            // putIfAbsent returns null only for the caller that inserted the key.
            return locks.putIfAbsent(key, "") == null;
        }

        /**
         * Releases the lock for {@code key}; a no-op for {@code null} or unlocked keys.
         *
         * @param key the lock key
         */
        public void releaseLock(String key) {
            if (key == null) {
                return;
            }
            locks.remove(key);
        }

        /**
         * Reports whether {@code key} is currently locked.
         *
         * @param key the lock key
         * @return {@code true} if the key is locked; {@code false} otherwise, including for
         *         {@code null}, which is never stored (consistent with {@link #tryLock} and
         *         {@link #releaseLock}, and avoids the NullPointerException that
         *         {@code ConcurrentHashMap.containsKey(null)} would throw)
         */
        public boolean queryLock(String key) {
            if (key == null) {
                return false;
            }
            return locks.containsKey(key);
        }
    }

    普通的ReentrantLock

    // Lock held while groupChannelTable is copied in the snippet below.
    private final Lock groupChannelLock = new ReentrantLock();
    
                // Bounded acquire: wait at most LOCK_TIMEOUT_MILLIS for the lock. tryLock is
                // called outside the try block, so unlock() in finally only runs when the
                // lock was actually obtained. If the timeout elapses, the copy is skipped
                // (this excerpt shows no else branch).
                if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                    try {
                        newGroupChannelTable.putAll(groupChannelTable);
                    } finally {
                        groupChannelLock.unlock();
                    }
                }

    对于信号量的使用

        /**
         * Semaphore to limit maximum number of on-going one-way requests, which protects system memory footprint.
         */
        protected final Semaphore semaphoreOneway;
            // Try to obtain a permit, waiting at most timeoutMillis milliseconds.
            boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
            if (acquired) {
                // Wrapper used to give the permit back; presumably it guards against releasing
                // twice - confirm against SemaphoreReleaseOnlyOnce.
                final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
                try {
                    channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture f) throws Exception {
                            // The write finished (successfully or not): return the permit.
                            once.release();
                            if (!f.isSuccess()) {
                                log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");
                            }
                        }
                    });
                } catch (Exception e) {
                    // Submitting the write failed: return the permit here as well.
                    once.release();
                    log.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed.");
                    throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
                }
            } 

    每种业务请求对应不同的处理器处理

        /**
         * Holds the processor registered for each request code: for an incoming request, the
         * corresponding processor and the executor service paired with it are looked up in
         * this map by the request's code.
         *
         * <p>NOTE(review): this is a plain {@code HashMap}; whether it is only populated
         * before request handling starts cannot be seen from here - confirm.
         */
        protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =
            new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);
  • 相关阅读:
    Codeforces 960B(优先队列)
    “景驰科技杯”2018年华南理工大学程序设计竞赛 H-对称与反对称(逆元)
    AcWing
    POJ
    POJ
    AtCoder
    HRBUST
    CodeForces
    HYSBZ
    HDU
  • 原文地址:https://www.cnblogs.com/it-worker365/p/10082536.html
Copyright © 2011-2022 走看看