zoukankan      html  css  js  c++  java
  • PriorityQueue和Queue的一种变体的实现

    队列和优先队列是我们十分熟悉的数据结构。提供了所谓的“先进先出”功能,优先队列则按照某种规则“先进先出”。但是他们都没有提供:“固定大小的队列”和“固定大小的优先队列”的功能。

    比如我们要实现:记录按照时间排序的最近的登录网站的20个人;按照分数排序的最高的30个人;比如在游戏中一场两两PK的战斗,得分最高的6个人;要实现这些功能时,需要的数据结构,在java类库中没有现成的类。我们需要利用现有的类库来实现它们。

    1. 固定大小的“先进先出”队列

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.LinkedBlockingQueue;
    
    public class TopQueue<E> {
        private final LinkedBlockingQueue<E> blockQueue;
        
        public TopQueue(int size){
            this.blockQueue = new LinkedBlockingQueue<E>(size);
        }
        
        public synchronized void put(E e) throws InterruptedException{
            if(blockQueue.offer(e)){
                return;
            }else{
                blockQueue.take();
                blockQueue.offer(e);
            }
        }
        
        public List<E> getAll(){
            return new ArrayList<E>(blockQueue);
        }
        
        public static void main(String[] args) throws InterruptedException{
            TopQueue<Integer> tq = new TopQueue<Integer>(3);
            tq.put(1);
            tq.put(2);
            tq.put(3);
            System.out.println(Arrays.toString(tq.getAll().toArray()));
            
            tq.put(4);
            System.out.println(Arrays.toString(tq.getAll().toArray()));
            
            tq.put(5);
            System.out.println(Arrays.toString(tq.getAll().toArray()));
            
            tq.put(6);
            System.out.println(Arrays.toString(tq.getAll().toArray()));
        }    
    }

    输出的结果为:

    [1, 2, 3]
    [2, 3, 4]
    [3, 4, 5]
    [4, 5, 6]

    上面的TopQueue实现了“固定大小的线程安全的”队列。无论有多少个线程,向TopQueue中放入了多少个元素,在TopQueue中只保留最后放进去的n个元素。

    2. 固定大小的优先队列(实现一)

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.PriorityBlockingQueue;
    import com.alibaba.fastjson.JSON;
    
    public class TopPriorityQueue<E> {
        private final PriorityBlockingQueue<E> blockQueue;
        private final int size;
    
        public TopPriorityQueue(int size){
            this.blockQueue = new PriorityBlockingQueue<E>(size + 1);
            this.size = size + 1;    // 这里多加1的原因是防止put方法中将大的删除了,反而降小的插入了,所以多加1个用做"哨卡"
        }
        
        public synchronized void put(E e) throws InterruptedException{
            if(blockQueue.size() >= size)
                blockQueue.take();
            blockQueue.offer(e);
        }
        
        public List<E> getAll() throws InterruptedException{
    synchronized(this){
    if(blockQueue.size() >= size) blockQueue.take(); // 前面构造函数中多加了1,这里减掉一个 } return new ArrayList<E>(blockQueue); } public static void main(String[] args) throws InterruptedException{ final TopPriorityQueue<User> tq = new TopPriorityQueue<User>(3); User u1 = new User(1, "bbb", 10); User u2 = new User(2, "ccc", 20); User u3 = new User(3, "ddd", 30); User u4 = new User(4, "fff", 40); User u5 = new User(5, "fff", 50); User u6 = new User(6, "ddd", 60); User u7 = new User(7, "ggg", 70); User u8 = new User(8, "hhh", 80); tq.put(u4); //4 System.out.println(JSON.toJSONString(tq.getAll())); tq.put(u8); //4,8 System.out.println(JSON.toJSONString(tq.getAll())); tq.put(u7); //4,8,7 System.out.println(JSON.toJSONString(tq.getAll())); tq.put(u5); //5,8,7 System.out.println(JSON.toJSONString(tq.getAll())); tq.put(u2); //5,8,7 System.out.println(JSON.toJSONString(tq.getAll())); tq.put(u3); //5,8,7 System.out.println(JSON.toJSONString(tq.getAll())); tq.put(u1); //5,8,7 System.out.println(JSON.toJSONString(tq.getAll())); tq.put(u6); //6,8,7 System.out.println(JSON.toJSONString(tq.getAll())); } }

    User类:

    import java.util.Comparator;
    
    public class User implements Comparable<User>{
        private int id;
        private String name;
        private long score;    // 得分
        // ... ...
        
        public User(int id, String name, long score){
            this.id = id;
            this.name = name;
            this.score = score;
        }
        
        public int getId() {
            return id;
        }
        public void setId(int id) {
            this.id = id;
        }
        public String getName() {
            return name;
        }
        public void setName(String name) {
            this.name = name;
        }
        public long getScore() {
            return score;
        }
        public void setScore(long score) {
            this.score = score;
        }
    
        @Override
        public int compareTo(User o) {
            return this.getScore() > o.getScore() ? 1 : this.getScore() < o.getScore() ? -1 : 0;
        }
    }

    输入的结果为:

    [{"id":4,"name":"fff","score":40}]
    [{"id":4,"name":"fff","score":40},{"id":8,"name":"hhh","score":80}]
    [{"id":4,"name":"fff","score":40},{"id":8,"name":"hhh","score":80},{"id":7,"name":"ggg","score":70}]
    [{"id":5,"name":"fff","score":50},{"id":8,"name":"hhh","score":80},{"id":7,"name":"ggg","score":70}]
    [{"id":5,"name":"fff","score":50},{"id":8,"name":"hhh","score":80},{"id":7,"name":"ggg","score":70}]
    [{"id":5,"name":"fff","score":50},{"id":8,"name":"hhh","score":80},{"id":7,"name":"ggg","score":70}]
    [{"id":5,"name":"fff","score":50},{"id":8,"name":"hhh","score":80},{"id":7,"name":"ggg","score":70}]
    [{"id":6,"name":"ddd","score":60},{"id":8,"name":"hhh","score":80},{"id":7,"name":"ggg","score":70}]

    TopPriorityQueue实现了“固定大小的优先队列”,的实现原理是:

    public synchronized void put(E e) throws InterruptedException{
            if(blockQueue.size() >= size)
                blockQueue.take();
            blockQueue.offer(e);
     }

    当队列满了,还要插入时,就删除队列中最小的一个,然后再插入。但是这里涉及到一个问题,如果这个要被插入的元素优先级要比那个被删除的元素优先级低呢?那岂不是将大的删除了,反而将小的插入了。所以这里我们采取的办法是,比实际要求的size的基础上多保留一个,用做“哨卡”。当队列满了时,我们将“哨卡”删掉,然后再插入我们的元素,然后队列中新的最小的元素就成为了新的“哨卡”。而“哨卡”因为是最小的一个,不是我们需要的,返回最终结果时会被删除掉。所以不会出现删除了大的,插入了小的问题。这里有点小技巧。

    3. 固定大小的优先队列(实现二)

    上面的实现,需要我们插入队列的元素Comparable这个接口,但是实际环境中,我们不太可能去进行这样的修改,所以我们还有另外一种方法——使用Comparator来搞定,看代码:

    import java.util.Comparator;
    import com.coin.User;
    
    public class MyComparator implements Comparator<User> {
        @Override
        public int compare(User u1, User u2) {
            if(u1.getScore() > u2.getScore())
                return 1;
            if(u1.getScore() < u2.getScore())
                return -1;
            return 0;
        }
    }
    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;
    import java.util.concurrent.PriorityBlockingQueue;
    
    import com.alibaba.fastjson.JSON;
    
    public class TopPriorityQueue<E> {
        private final PriorityBlockingQueue<E> blockQueue;
        private final int size;
        
        public TopPriorityQueue(int size, Comparator<E> comparator){
            this.blockQueue = new PriorityBlockingQueue<E>(size + 1, comparator);
            this.size = size + 1;    // 这里多加1的原因是防止put方法中将大的删除了,反而降小的插入了,所以多加1个用做"哨卡"
        }
        
        public synchronized void put(E e) throws InterruptedException{
            if(blockQueue.size() >= size)
                blockQueue.take();
            blockQueue.offer(e);
        }
        
        public List<E> getAll() throws InterruptedException{
            synchronized(this){
                if(blockQueue.size() >= size)
                    blockQueue.take();    // 前面构造函数中多加了1,这里减掉一个
            }
            
            return new ArrayList<E>(blockQueue);
        }
        
        public static void main(String[] args) throws InterruptedException{
            MyComparator myComparator = new MyComparator();
            final TopPriorityQueue<User> tq = new TopPriorityQueue<User>(3, myComparator);
            User u1 = new User(1, "bbb", 10);
            User u2 = new User(2, "ccc", 20);
            User u3 = new User(3, "ddd", 30);
            User u4 = new User(4, "fff", 40);
            User u5 = new User(5, "fff", 50);
            User u6 = new User(6, "ddd", 60);
            User u7 = new User(7, "ggg", 70);
            User u8 = new User(8, "hhh", 80);
    
            tq.put(u4);    //4
            System.out.println(JSON.toJSONString(tq.getAll()));
            tq.put(u8);    //4,8
            System.out.println(JSON.toJSONString(tq.getAll()));
            tq.put(u7);    //4,8,7
            System.out.println(JSON.toJSONString(tq.getAll()));
            tq.put(u5);    //5,8,7
            System.out.println(JSON.toJSONString(tq.getAll()));
            tq.put(u2);    //5,8,7
            System.out.println(JSON.toJSONString(tq.getAll()));
            tq.put(u3);    //5,8,7
            System.out.println(JSON.toJSONString(tq.getAll()));
            tq.put(u1);    //5,8,7
            System.out.println(JSON.toJSONString(tq.getAll()));
            tq.put(u6);    //6,8,7
            System.out.println(JSON.toJSONString(tq.getAll()));
        }
    }

    所以我们在使用PriorityBlockingQueue时,要么我们插入的元素实现了Comparable这个接口,要么我定义一个Comparator,传入到PriorityBlockingQueue的构造函数中,我们可以看下PriorityBlockingQueue.offer(e)方法的源码,它会对这两种情况进行判断:

    public boolean offer(E e) {
            if (e == null)
                throw new NullPointerException();
            final ReentrantLock lock = this.lock;
            lock.lock();
            int n, cap;
            Object[] array;
            while ((n = size) >= (cap = (array = queue).length))
                tryGrow(array, cap);
            try {
                Comparator<? super E> cmp = comparator;
                if (cmp == null)
                    siftUpComparable(n, e, array);
                else
                    siftUpUsingComparator(n, e, array, cmp);
                size = n + 1;
                notEmpty.signal();
            } finally {
                lock.unlock();
            }
            return true;
        }

    其中的代码:

                Comparator<? super E> cmp = comparator;
                if (cmp == null)
                    siftUpComparable(n, e, array);
                else
                    siftUpUsingComparator(n, e, array, cmp);

    就是判断我们是否在PriorityBlockingQueue的构造函数中是否传入了Comparator。这样User类就不需要实现Comparable接口了。

    另外我们要注意 LinkedBlockingQueue  和  PriorityBlockingQueue 有一点不同,BlockingQueue.offer(e)在队列满了时,会返回false,而PriorityBlockingQueue.offer()即使队列满了,它会进行扩展,永远只返回true.

    LinkedBlockingQueue .offer() 的源码如下:

    /**
         * Inserts the specified element at the tail of this queue if it is
         * possible to do so immediately without exceeding the queue's capacity,
         * returning {@code true} upon success and {@code false} if this queue
         * is full.
         * When using a capacity-restricted queue, this method is generally
         * preferable to method {@link BlockingQueue#add add}, which can fail to
         * insert an element only by throwing an exception.
         *
         * @throws NullPointerException if the specified element is null
         */
        public boolean offer(E e) {
            if (e == null) throw new NullPointerException();
            final AtomicInteger count = this.count;
            if (count.get() == capacity)
                return false;
            int c = -1;
            Node<E> node = new Node<E>(e);
            final ReentrantLock putLock = this.putLock;
            putLock.lock();
            try {
                if (count.get() < capacity) {
                    enqueue(node);
                    c = count.getAndIncrement();
                    if (c + 1 < capacity)
                        notFull.signal();
                }
            } finally {
                putLock.unlock();
            }
            if (c == 0)
                signalNotEmpty();
            return c >= 0;
        }

    当满了时返回false:

    if (count.get() == capacity)
           return false;

    PriorityBlockingQueue.offer() 的源码如下:

    /**
         * Inserts the specified element into this priority queue.
         * As the queue is unbounded, this method will never return {@code false}.
         *
         * @param e the element to add
         * @return {@code true} (as specified by {@link Queue#offer})
         * @throws ClassCastException if the specified element cannot be compared
         *         with elements currently in the priority queue according to the
         *         priority queue's ordering
         * @throws NullPointerException if the specified element is null
         */
        public boolean offer(E e) {
            if (e == null)
                throw new NullPointerException();
            final ReentrantLock lock = this.lock;
            lock.lock();
            int n, cap;
            Object[] array;
            while ((n = size) >= (cap = (array = queue).length))
                tryGrow(array, cap);
            try {
                Comparator<? super E> cmp = comparator;
                if (cmp == null)
                    siftUpComparable(n, e, array);
                else
                    siftUpUsingComparator(n, e, array, cmp);
                size = n + 1;
                notEmpty.signal();
            } finally {
                lock.unlock();
            }
            return true;
        }

    当满了时,会扩容:

    while ((n = size) >= (cap = (array = queue).length))
                tryGrow(array, cap);

    As the queue is unbounded, this method will never return {@code false}.

    另外TopQueue 和 TopPriorityQueue 都是线程安全的,但是并不保证插入队列中的元素自身的线程安全性。

  • 相关阅读:
    hdu 1.2.4
    交换机&&路由器
    AP、AC、无线路由器
    肩胛骨
    无线路由器
    背部肌肉
    胸部肌肉
    redis未授权访问
    进制
    攻防实验
  • 原文地址:https://www.cnblogs.com/digdeep/p/4437797.html
Copyright © 2011-2022 走看看