zoukankan      html  css  js  c++  java
  • java 可伸缩阻塞队列实现

    最近一年多写的最虐心的代码。必须好好复习java并发了。搞了一晚上终于测试都跑通过了,特此纪念,以资鼓励!

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    /**
     * 实现可调整大小的阻塞队列,支持数据迁移平衡reader,writer读取吸入速度,达到最大吞吐
     * @author hanjie
     *
     */
    public class RecordBuffer {
    
        public static final Record CLOSE_RECORD = new Record() {
    
            @Override
            public Object getColumnValue(String columnName) {
                // TODO Auto-generated method stub
                return null;
            }
        };
        
        public static final Record SWITCH_QUEUE_RECORD = new Record() {
    
            @Override
            public Object getColumnValue(String columnName) {
                // TODO Auto-generated method stub
                return null;
            }
        };
        
        public Lock switchingQueueLock = new ReentrantLock();
        public Condition readerSwitched = switchingQueueLock.newCondition();
        public Condition writerSwitched = switchingQueueLock.newCondition();
        public Condition switchFinished = switchingQueueLock.newCondition();
        
        public volatile boolean readerSwitchSuccess = true;
        public volatile boolean writerSwitchSuccess = true;
        public volatile boolean switchingQueue = false;
        public volatile boolean closed = false;
        private volatile ArrayBlockingQueue<Record> queue;
        private TaskCounter taskCounter;
        
    
        public RecordBuffer(TaskCounter taskCounter, int size) {
            this.queue = new ArrayBlockingQueue<Record>(size);
            this.taskCounter = taskCounter;
        }
    
    
    
        public void resize(int newSize) {
            try {
                
                if(closed){
                    return;
                }
        
                switchingQueueLock.lock();
                try {
                    //double check下,要不可能writer收到CLOSED_record已经 退出了。writerSwitched.await() 会hang住
                    if(closed){
                        return;
                    }
                    this.switchingQueue = true;
            
                    ArrayBlockingQueue<Record> oldQueue = queue;
                    queue = new ArrayBlockingQueue<Record>(newSize);
                    this.readerSwitchSuccess = false;
                    this.writerSwitchSuccess = false;
                    
                    //先拯救下writer,可能writer刚好阻塞到take上,失败也没关系,说明老队列不空,writer不会阻塞到take
                    oldQueue.offer(SWITCH_QUEUE_RECORD);
    
                    while (!writerSwitchSuccess) {
                        writerSwitched.await();
                    }
                    //writer先切换队列,然后reader可能阻塞在最后一个put上,清空下老队列拯救reader,让它顺利醒来
                    transferOldQueueRecordsToNewQueue(oldQueue);
                    
                    
                    while (!readerSwitchSuccess) {
                        readerSwitched.await();
                    }
                    //前面的清空,刚好碰到reader要put最后一个,非阻塞式清空动作就有残留最后一个put
                    transferOldQueueRecordsToNewQueue(oldQueue);
                    
                    this.switchingQueue = false;
                    this.switchFinished.signalAll();
    
                } finally {
                    switchingQueueLock.unlock();
                }
    
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
    
        private void transferOldQueueRecordsToNewQueue(ArrayBlockingQueue<Record> oldQueue)
                throws InterruptedException {
            List<Record> oldRecords = new ArrayList<Record>(oldQueue.size());
            Record record = null;
            while ((record = oldQueue.poll()) != null) {
                oldRecords.add(record);
            }
            // 转移老队列剩下的记录到新队列
            for (int i = 0; i < oldRecords.size(); i++) {
                queue.put(oldRecords.get(i));
            }
        }
    
        public void close() {
            this.closed = true;
            switchingQueueLock.lock();
            try {
                //如果正在切换队列, 等切换做完才能,发送最后一个CLOSE
                while (switchingQueue) {
                    switchFinished.await();
                }
                
                this.queue.put(CLOSE_RECORD);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            finally{
                switchingQueueLock.unlock();
            }
        }
    
        public void put(Record record) {
            try {
                
                if (!queue.offer(record)) {
                    taskCounter.incrBufferFullCount();
                    if (!readerSwitchSuccess) {
                        notifyReaderSwitchSuccess();
                    }
                    queue.put(record);
                }
                taskCounter.incrReadCount();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
    
        private void notifyReaderSwitchSuccess() {
            System.out.println("reader switch");
            switchingQueueLock.lock();
            try {
                readerSwitchSuccess = true;
                readerSwitched.signalAll();
            } finally {
                switchingQueueLock.unlock();
            }
        }
    
        public Record take() {
            try {
                
                Record record = queue.poll();
                //如果拿到了切换记录,则切换队列重试
                if(record == SWITCH_QUEUE_RECORD){
                    if (!writerSwitchSuccess) {
                        notifyWriterSwitchSuccess();
                    }
                    record = queue.poll();
                }
                
                if (record == null) {
                    taskCounter.incrBufferEmptyCount();
                    
                    //调用take先检查是否正在切换,保证拿到新的队列
                    if (!writerSwitchSuccess) {
                        notifyWriterSwitchSuccess();
                    }
                    record = queue.take();
                    //如果很不幸刚好在take阻塞时候,切换,只能发送一个切换记录将其唤醒
                    if(record == SWITCH_QUEUE_RECORD){
                        if (!writerSwitchSuccess) {
                            notifyWriterSwitchSuccess();
                        }
                        record = queue.take();
                    }
                }
                if (record == CLOSE_RECORD) {
                    if (!writerSwitchSuccess) {
                        notifyWriterSwitchSuccess();
                    }
                    return null;
                }
                taskCounter.incrWriteCount();
                return record;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
    
        private void notifyWriterSwitchSuccess() {
    
            System.out.println("writer switch");
            switchingQueueLock.lock();
            try {
                writerSwitchSuccess = true;
                writerSwitched.signalAll();
            } finally {
                switchingQueueLock.unlock();
            }
    
        }
    
    }
  • 相关阅读:
    高德地图 Android编程中 如何设置使 标记 marker 能够被拖拽
    Android编程 高德地图 中如何重写 定位按键 的触发事件 (com.amap.api.maps2d.LocationSource)点击定位后不仅定位在地图中心点上而且可以设置地图的缩放大小和提示
    Android 编程 高德地图 (实现显示地图以及定位功能)
    Android 编程 AMapLocationClientOption 类中的 setMockEnable (高德地图 com.amap.api.location.AMapLocationClientOption 中的类)
    Android 编程 AMapLocationClientOption 类中的 setNeedAddress 方法用处 (高德地图 com.amap.api.location.AMapLocationClientOption 中的类)
    数据挖掘、目标检测中的cnn和cn---卷积网络和卷积神经网络
    mfc学习之gdi---gdi空间
    图像处理之特效---光剑特效
    模式识别之线性回归---最小二乘和线性回归
    matlab 学习之常用函数2
  • 原文地址:https://www.cnblogs.com/xhan/p/4621642.html
Copyright © 2011-2022 走看看