zoukankan      html  css  js  c++  java
  • PriorityQueue和Queue的一种变体的实现

    队列和优先队列是我们十分熟悉的数据结构。提供了所谓的“先进先出”功能,优先队列则按照某种规则“先进先出”。但是他们都没有提供:“固定大小的队列”和“固定大小的优先队列”的功能。

    比如我们要实现:记录按照时间排序的最近的登录网站的20个人;按照分数排序的最高的30个人;比如在游戏中一场两两PK的战斗,得分最高的6个人;要实现这些功能时,需要的数据结构,在java类库中没有现成的类。我们需要利用现有的类库来实现它们。

    1. 固定大小的“先进先出”队列

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.LinkedBlockingQueue;
    
    public class TopQueue<E> {
        private final LinkedBlockingQueue<E> blockQueue;
        
        public TopQueue(int size){
            this.blockQueue = new LinkedBlockingQueue<E>(size);
        }
        
        public synchronized void put(E e) throws InterruptedException{
            if(blockQueue.offer(e)){
                return;
            }else{
                blockQueue.take();
                blockQueue.offer(e);
            }
        }
        
        public List<E> getAll(){
            return new ArrayList<E>(blockQueue);
        }
        
        public static void main(String[] args) throws InterruptedException{
            TopQueue<Integer> tq = new TopQueue<Integer>(3);
            tq.put(1);
            tq.put(2);
            tq.put(3);
            System.out.println(Arrays.toString(tq.getAll().toArray()));
            
            tq.put(4);
            System.out.println(Arrays.toString(tq.getAll().toArray()));
            
            tq.put(5);
            System.out.println(Arrays.toString(tq.getAll().toArray()));
            
            tq.put(6);
            System.out.println(Arrays.toString(tq.getAll().toArray()));
        }    
    }

    输出的结果为:

    [1, 2, 3]
    [2, 3, 4]
    [3, 4, 5]
    [4, 5, 6]

    上面的TopQueue实现了“固定大小的线程安全的”队列。无论有多少个线程,向TopQueue中放入了多少个元素,在TopQueue中只保留最后放进去的n个元素。

    2. 固定大小的优先队列(实现一)

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.PriorityBlockingQueue;
    import com.alibaba.fastjson.JSON;
    
    public class TopPriorityQueue<E> {
        private final PriorityBlockingQueue<E> blockQueue;
        private final int size;
    
        public TopPriorityQueue(int size){
            this.blockQueue = new PriorityBlockingQueue<E>(size + 1);
            this.size = size + 1;    // 这里多加1的原因是防止put方法中将大的删除了,反而降小的插入了,所以多加1个用做"哨卡"
        }
        
        public synchronized void put(E e) throws InterruptedException{
            if(blockQueue.size() >= size)
                blockQueue.take();
            blockQueue.offer(e);
        }
        
        public List<E> getAll() throws InterruptedException{
    synchronized(this){
    if(blockQueue.size() >= size) blockQueue.take(); // 前面构造函数中多加了1,这里减掉一个 } return new ArrayList<E>(blockQueue); } public static void main(String[] args) throws InterruptedException{ final TopPriorityQueue<User> tq = new TopPriorityQueue<User>(3); User u1 = new User(1, "bbb", 10); User u2 = new User(2, "ccc", 20); User u3 = new User(3, "ddd", 30); User u4 = new User(4, "fff", 40); User u5 = new User(5, "fff", 50); User u6 = new User(6, "ddd", 60); User u7 = new User(7, "ggg", 70); User u8 = new User(8, "hhh", 80); tq.put(u4); //4 System.out.println(JSON.toJSONString(tq.getAll())); tq.put(u8); //4,8 System.out.println(JSON.toJSONString(tq.getAll())); tq.put(u7); //4,8,7 System.out.println(JSON.toJSONString(tq.getAll())); tq.put(u5); //5,8,7 System.out.println(JSON.toJSONString(tq.getAll())); tq.put(u2); //5,8,7 System.out.println(JSON.toJSONString(tq.getAll())); tq.put(u3); //5,8,7 System.out.println(JSON.toJSONString(tq.getAll())); tq.put(u1); //5,8,7 System.out.println(JSON.toJSONString(tq.getAll())); tq.put(u6); //6,8,7 System.out.println(JSON.toJSONString(tq.getAll())); } }

    User类:

    import java.util.Comparator;
    
    public class User implements Comparable<User>{
        private int id;
        private String name;
        private long score;    // 得分
        // ... ...
        
        public User(int id, String name, long score){
            this.id = id;
            this.name = name;
            this.score = score;
        }
        
        public int getId() {
            return id;
        }
        public void setId(int id) {
            this.id = id;
        }
        public String getName() {
            return name;
        }
        public void setName(String name) {
            this.name = name;
        }
        public long getScore() {
            return score;
        }
        public void setScore(long score) {
            this.score = score;
        }
    
        @Override
        public int compareTo(User o) {
            return this.getScore() > o.getScore() ? 1 : this.getScore() < o.getScore() ? -1 : 0;
        }
    }

    输入的结果为:

    [{"id":4,"name":"fff","score":40}]
    [{"id":4,"name":"fff","score":40},{"id":8,"name":"hhh","score":80}]
    [{"id":4,"name":"fff","score":40},{"id":8,"name":"hhh","score":80},{"id":7,"name":"ggg","score":70}]
    [{"id":5,"name":"fff","score":50},{"id":8,"name":"hhh","score":80},{"id":7,"name":"ggg","score":70}]
    [{"id":5,"name":"fff","score":50},{"id":8,"name":"hhh","score":80},{"id":7,"name":"ggg","score":70}]
    [{"id":5,"name":"fff","score":50},{"id":8,"name":"hhh","score":80},{"id":7,"name":"ggg","score":70}]
    [{"id":5,"name":"fff","score":50},{"id":8,"name":"hhh","score":80},{"id":7,"name":"ggg","score":70}]
    [{"id":6,"name":"ddd","score":60},{"id":8,"name":"hhh","score":80},{"id":7,"name":"ggg","score":70}]

    TopPriorityQueue实现了“固定大小的优先队列”,的实现原理是:

    public synchronized void put(E e) throws InterruptedException{
            if(blockQueue.size() >= size)
                blockQueue.take();
            blockQueue.offer(e);
     }

    当队列满了,还要插入时,就删除队列中最小的一个,然后再插入。但是这里涉及到一个问题,如果这个要被插入的元素优先级要比那个被删除的元素优先级低呢?那岂不是将大的删除了,反而将小的插入了。所以这里我们采取的办法是,比实际要求的size的基础上多保留一个,用做“哨卡”。当队列满了时,我们将“哨卡”删掉,然后再插入我们的元素,然后队列中新的最小的元素就成为了新的“哨卡”。而“哨卡”因为是最小的一个,不是我们需要的,返回最终结果时会被删除掉。所以不会出现删除了大的,插入了小的问题。这里有点小技巧。

    3. 固定大小的优先队列(实现二)

    上面的实现,需要我们插入队列的元素Comparable这个接口,但是实际环境中,我们不太可能去进行这样的修改,所以我们还有另外一种方法——使用Comparator来搞定,看代码:

    import java.util.Comparator;
    import com.coin.User;
    
    public class MyComparator implements Comparator<User> {
        @Override
        public int compare(User u1, User u2) {
            if(u1.getScore() > u2.getScore())
                return 1;
            if(u1.getScore() < u2.getScore())
                return -1;
            return 0;
        }
    }
    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;
    import java.util.concurrent.PriorityBlockingQueue;
    
    import com.alibaba.fastjson.JSON;
    
    public class TopPriorityQueue<E> {
        private final PriorityBlockingQueue<E> blockQueue;
        private final int size;
        
        public TopPriorityQueue(int size, Comparator<E> comparator){
            this.blockQueue = new PriorityBlockingQueue<E>(size + 1, comparator);
            this.size = size + 1;    // 这里多加1的原因是防止put方法中将大的删除了,反而降小的插入了,所以多加1个用做"哨卡"
        }
        
        public synchronized void put(E e) throws InterruptedException{
            if(blockQueue.size() >= size)
                blockQueue.take();
            blockQueue.offer(e);
        }
        
        public List<E> getAll() throws InterruptedException{
            synchronized(this){
                if(blockQueue.size() >= size)
                    blockQueue.take();    // 前面构造函数中多加了1,这里减掉一个
            }
            
            return new ArrayList<E>(blockQueue);
        }
        
        public static void main(String[] args) throws InterruptedException{
            MyComparator myComparator = new MyComparator();
            final TopPriorityQueue<User> tq = new TopPriorityQueue<User>(3, myComparator);
            User u1 = new User(1, "bbb", 10);
            User u2 = new User(2, "ccc", 20);
            User u3 = new User(3, "ddd", 30);
            User u4 = new User(4, "fff", 40);
            User u5 = new User(5, "fff", 50);
            User u6 = new User(6, "ddd", 60);
            User u7 = new User(7, "ggg", 70);
            User u8 = new User(8, "hhh", 80);
    
            tq.put(u4);    //4
            System.out.println(JSON.toJSONString(tq.getAll()));
            tq.put(u8);    //4,8
            System.out.println(JSON.toJSONString(tq.getAll()));
            tq.put(u7);    //4,8,7
            System.out.println(JSON.toJSONString(tq.getAll()));
            tq.put(u5);    //5,8,7
            System.out.println(JSON.toJSONString(tq.getAll()));
            tq.put(u2);    //5,8,7
            System.out.println(JSON.toJSONString(tq.getAll()));
            tq.put(u3);    //5,8,7
            System.out.println(JSON.toJSONString(tq.getAll()));
            tq.put(u1);    //5,8,7
            System.out.println(JSON.toJSONString(tq.getAll()));
            tq.put(u6);    //6,8,7
            System.out.println(JSON.toJSONString(tq.getAll()));
        }
    }

    所以我们在使用PriorityBlockingQueue时,要么我们插入的元素实现了Comparable这个接口,要么我定义一个Comparator,传入到PriorityBlockingQueue的构造函数中,我们可以看下PriorityBlockingQueue.offer(e)方法的源码,它会对这两种情况进行判断:

    public boolean offer(E e) {
            if (e == null)
                throw new NullPointerException();
            final ReentrantLock lock = this.lock;
            lock.lock();
            int n, cap;
            Object[] array;
            while ((n = size) >= (cap = (array = queue).length))
                tryGrow(array, cap);
            try {
                Comparator<? super E> cmp = comparator;
                if (cmp == null)
                    siftUpComparable(n, e, array);
                else
                    siftUpUsingComparator(n, e, array, cmp);
                size = n + 1;
                notEmpty.signal();
            } finally {
                lock.unlock();
            }
            return true;
        }

    其中的代码:

                Comparator<? super E> cmp = comparator;
                if (cmp == null)
                    siftUpComparable(n, e, array);
                else
                    siftUpUsingComparator(n, e, array, cmp);

    就是判断我们是否在PriorityBlockingQueue的构造函数中是否传入了Comparator。这样User类就不需要实现Comparable接口了。

    另外我们要注意 LinkedBlockingQueue  和  PriorityBlockingQueue 有一点不同,BlockingQueue.offer(e)在队列满了时,会返回false,而PriorityBlockingQueue.offer()即使队列满了,它会进行扩展,永远只返回true.

    LinkedBlockingQueue .offer() 的源码如下:

    /**
         * Inserts the specified element at the tail of this queue if it is
         * possible to do so immediately without exceeding the queue's capacity,
         * returning {@code true} upon success and {@code false} if this queue
         * is full.
         * When using a capacity-restricted queue, this method is generally
         * preferable to method {@link BlockingQueue#add add}, which can fail to
         * insert an element only by throwing an exception.
         *
         * @throws NullPointerException if the specified element is null
         */
        public boolean offer(E e) {
            if (e == null) throw new NullPointerException();
            final AtomicInteger count = this.count;
            if (count.get() == capacity)
                return false;
            int c = -1;
            Node<E> node = new Node<E>(e);
            final ReentrantLock putLock = this.putLock;
            putLock.lock();
            try {
                if (count.get() < capacity) {
                    enqueue(node);
                    c = count.getAndIncrement();
                    if (c + 1 < capacity)
                        notFull.signal();
                }
            } finally {
                putLock.unlock();
            }
            if (c == 0)
                signalNotEmpty();
            return c >= 0;
        }

    当满了时返回false:

    if (count.get() == capacity)
           return false;

    PriorityBlockingQueue.offer() 的源码如下:

    /**
         * Inserts the specified element into this priority queue.
         * As the queue is unbounded, this method will never return {@code false}.
         *
         * @param e the element to add
         * @return {@code true} (as specified by {@link Queue#offer})
         * @throws ClassCastException if the specified element cannot be compared
         *         with elements currently in the priority queue according to the
         *         priority queue's ordering
         * @throws NullPointerException if the specified element is null
         */
        public boolean offer(E e) {
            if (e == null)
                throw new NullPointerException();
            final ReentrantLock lock = this.lock;
            lock.lock();
            int n, cap;
            Object[] array;
            while ((n = size) >= (cap = (array = queue).length))
                tryGrow(array, cap);
            try {
                Comparator<? super E> cmp = comparator;
                if (cmp == null)
                    siftUpComparable(n, e, array);
                else
                    siftUpUsingComparator(n, e, array, cmp);
                size = n + 1;
                notEmpty.signal();
            } finally {
                lock.unlock();
            }
            return true;
        }

    当满了时,会扩容:

    while ((n = size) >= (cap = (array = queue).length))
                tryGrow(array, cap);

    As the queue is unbounded, this method will never return {@code false}.

    另外TopQueue 和 TopPriorityQueue 都是线程安全的,但是并不保证插入队列中的元素自身的线程安全性。

  • 相关阅读:
    【SAS NOTE】OUTPUT
    【SAS NOTES】_NULL_
    【SAS NOTE】sas 9.2 安装
    【SAS NOTE】FREQ
    纯数学教程 Page 203 例XLI (1)
    纯数学教程 Page 203 例XLI (3)
    纯数学教程 Page 203 例XLI (2)
    Prove Cauchy's inequality by induction
    纯数学教程 Page 325 例LXVIII (15) 调和级数发散
    纯数学教程 Page 325 例LXVIII (15) 调和级数发散
  • 原文地址:https://www.cnblogs.com/digdeep/p/4437797.html
Copyright © 2011-2022 走看看