zoukankan      html  css  js  c++  java
  • ReentrantLock实现阻塞队列

    package com.test;
    
    import java.util.Map;
    import java.util.Objects;
    import java.util.concurrent.ConcurrentSkipListMap;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
    
    /**
     * Demo of a hand-written bounded blocking queue built on a {@link ReentrantLock}
     * and two {@link Condition}s, exercised by one producer and one consumer thread.
     *
     * @author movie
     * @since 2020/3/29 15:42
     */
    public class Concurrent {

        /**
         * Prints a small {@link ConcurrentSkipListMap}, then runs a producer that puts
         * 20 integers into a capacity-5 {@link MyBlockQueue} while a slower consumer
         * takes them, and finally prints the queue size once both threads are done.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            Map<String, String> map = new ConcurrentSkipListMap<>();
            map.put("a", "1");
            map.put("b", "12");
            map.put("aa", "3");
            map.put("ba", "4");
            map.forEach((k, v) -> System.out.println(String.format("%s:%s", k, v)));

            MyBlockQueue myBlockQueue = new MyBlockQueue(5);
            int len = 20;

            Thread producer = new Thread(() -> {
                for (int i = 0; i < len; i++) {
                    try {
                        TimeUnit.MILLISECONDS.sleep(300);
                    } catch (InterruptedException e) {
                        // Keep the interrupt visible and stop producing.
                        Thread.currentThread().interrupt();
                        return;
                    }
                    myBlockQueue.put(i);
                    if (Thread.currentThread().isInterrupted()) {
                        // put() gave up waiting; the element was not enqueued.
                        return;
                    }
                    System.out.println("produce:" + i);
                }
            });

            Thread consumer = new Thread(() -> {
                for (int i = 0; i < len; i++) {
                    try {
                        TimeUnit.SECONDS.sleep(1);
                        myBlockQueue.take();
                    } catch (InterruptedException e) {
                        // Keep the interrupt visible and stop consuming.
                        Thread.currentThread().interrupt();
                        return;
                    }
                    System.out.println("take:" + i);
                }
            });

            producer.start();
            consumer.start();

            // Wait for both workers explicitly; polling Thread.activeCount() never
            // terminates when the runtime keeps additional threads in the group.
            try {
                producer.join();
                consumer.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println(myBlockQueue.size());
        }

        /**
         * Fixed-capacity FIFO queue backed by a circular array. All state is guarded
         * by a single {@link ReentrantLock}; {@code notFull} and {@code notEmpty}
         * are conditions of that lock used to block producers and consumers.
         */
        static class MyBlockQueue {
            /** Circular buffer; a slot is {@code null} when it holds no element. */
            private final Object[] items;
            /** Index of the slot the next {@link #put(Object)} writes to. */
            private int putIndex;
            /** Index of the slot the next {@link #take()} reads from. */
            private int takeIndex;
            /** Number of elements currently stored. */
            private int count;
            private final Condition notFull;
            private final Condition notEmpty;
            private final ReentrantLock lock;

            /**
             * Creates an empty queue.
             *
             * @param cap maximum number of elements the queue can hold
             * @throws IllegalArgumentException if {@code cap} is not positive
             */
            public MyBlockQueue(int cap) {
                if (cap <= 0) {
                    throw new IllegalArgumentException("capacity must be positive: " + cap);
                }
                this.items = new Object[cap];
                this.lock = new ReentrantLock();
                this.notFull = lock.newCondition();
                this.notEmpty = lock.newCondition();
            }

            /**
             * Appends {@code element}, blocking while the queue is full.
             *
             * <p>If the calling thread is interrupted while waiting for space, the
             * element is <em>not</em> enqueued, the method returns normally, and the
             * thread's interrupt status is left set so the caller can detect it.
             *
             * @param element the element to add; must not be {@code null}
             * @throws NullPointerException if {@code element} is {@code null}
             */
            public void put(Object element) {
                Objects.requireNonNull(element);
                final ReentrantLock lock = this.lock;
                lock.lock();
                try {
                    while (count == items.length) {
                        notFull.await();
                    }
                    enqueue(element);
                } catch (InterruptedException e) {
                    // The signature cannot propagate the checked exception, so
                    // restore the flag instead of swallowing the interruption.
                    Thread.currentThread().interrupt();
                } finally {
                    lock.unlock();
                }
            }

            /** Stores {@code element} at {@code putIndex}; caller must hold the lock. */
            private void enqueue(Object element) {
                final Object[] items = this.items;
                items[putIndex] = element;
                // Wrap exactly once. A loop here would re-increment after the reset
                // and leave the index at 1 (or spin forever when capacity is 1).
                if (++putIndex == items.length) {
                    putIndex = 0;
                }
                count++;
                notEmpty.signal();
            }

            /**
             * Removes and returns the oldest element, blocking while the queue is empty.
             *
             * @return the element at the head of the queue
             * @throws InterruptedException if interrupted while waiting
             */
            public Object take() throws InterruptedException {
                final ReentrantLock lock = this.lock;
                // Acquire outside the try so unlock() only runs when the lock is held.
                lock.lock();
                try {
                    while (count == 0) {
                        notEmpty.await();
                    }
                    return dequeue();
                } finally {
                    lock.unlock();
                }
            }

            /** Removes the element at {@code takeIndex}; caller must hold the lock. */
            private Object dequeue() {
                final Object[] items = this.items;
                Object element = items[takeIndex];
                items[takeIndex] = null;
                // Wrap exactly once, mirroring enqueue().
                if (++takeIndex == items.length) {
                    takeIndex = 0;
                }
                count--;
                notFull.signal();
                return element;
            }

            /**
             * Returns the number of elements currently in the queue.
             *
             * @return the current element count
             */
            public int size() {
                final ReentrantLock lock = this.lock;
                lock.lock();
                try {
                    return count;
                } finally {
                    lock.unlock();
                }
            }
        }

    }
  • 相关阅读:
    错误、异常与自定义异常
    关于使用第三方库、代码复用的一些思考
    [Scheme]一个Scheme的Metacircular evaluator
    [Scheme]Understanding the Yin-Yang Puzzle
    [Lua]50行代码的解释器,用来演示lambda calculus
    将jar包安装到本地仓库
    PowerDesigner安装教程(含下载+汉化+破解)
    Jmeter如何录制APP客户端脚本
    jdk1.8 stream 求和
    VMware的快照和克隆总结
  • 原文地址:https://www.cnblogs.com/lijiale/p/14042549.html
Copyright © 2011-2022 走看看