zoukankan      html  css  js  c++  java
  • 使用BlockingQueue的生产者消费者模式

    BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。使用场景。

    首先它是一个队列,而一个队列在数据结构中所起的作用大致如下图所示:

    通过一个共享的队列,可以使得数据由队列的一端输入,从另外一端输出;在生产者消费者模式中,通过队列的方式可以很方便的实现两者之间的数据共享。强大的BlockingQueue使我们不用关心什么时候需要阻塞线程,什么时候需要唤醒线程。

    BlockingQueue的核心方法:

    放入数据:

      offer(anObject) 如果BlockingQueue可以容纳,返回为true,否则返回false.

      offer(E o,long timeout,TimeUnit unit),设置等待时间,如果指定时间内,还不能往队列中加入BlockingQueue,则返回失败。

      put(anObject)把anObject加到BlockingQueue中,如果BlockQueue没有空间,则调用此方法的线程被阻断,直到BlockingQueue里面有空间再继续。

    获取数据:
      poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,
        取不到时返回null;
      poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,
        队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。
      take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到
        BlockingQueue有新的数据被加入; 
      drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数), 
        通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。

    测试代码:

    package BlockingQueue;
    
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    
    public class BlockingQueueTest {
        public static void main(String args[]) throws InterruptedException{
            BlockingQueue<String> queue = new ArrayBlockingQueue(10);
            
            Producer producer1 = new Producer(queue);
            Producer producer2 = new Producer(queue);
            Producer producer3 = new Producer(queue);
            Consumer consumer = new Consumer(queue);
            
            ExecutorService service = Executors.newCachedThreadPool();
            
            service.execute(producer1);
            service.execute(producer2);
            service.execute(producer3);
            service.execute(consumer);
            
            Thread.sleep(10 * 1000);
            producer1.stop();
            producer2.stop();
            producer3.stop();
            
            Thread.sleep(2000);
            // 退出Executor
            service.shutdown();
        }
    }

    生产者:

    package BlockingQueue;
    import java.util.Random;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    
    
    public class Producer implements Runnable{
        
        private volatile boolean      isRunning               = true;
        private BlockingQueue<String> queue;
        private static AtomicInteger  count                   = new AtomicInteger();
        private static final int      DEFAULT_RANGE_FOR_SLEEP = 1000;
            
        public Producer(BlockingQueue queue){
            this.queue = queue;
        }
        
        public void run(){
            String data = null;
            Random r = new Random();
            System.out.println("启动生产者线程");
            try{
                while(isRunning){
                    System.out.println("正在生产数据.....");
                    Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
                    
                    data = "data:" + count.incrementAndGet();
                    System.out.println("将数据:" + data + "放入队列...");
                    if (!queue.offer(data, 2, TimeUnit.SECONDS)) {
                        System.out.println("放入数据失败:" + data);
                    }
                }
            }catch (InterruptedException e) {
                e.printStackTrace();
                Thread.currentThread().interrupt();
            }
            finally{
                System.out.println("退出生产者线程!");
            }
        }
        
        public void stop(){
            isRunning = false;    
        }
        
        
    }

    消费者:

    package BlockingQueue;
    
    import java.util.Random;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;
    
    public class Consumer implements Runnable{
         private BlockingQueue<String> queue;
         private static final int      DEFAULT_RANGE_FOR_SLEEP = 1000;
        
        public Consumer(BlockingQueue<String> queue){
            this.queue = queue;
        }
        
        public void run(){
            System.out.println("启动消费者线程:");
            Random r = new Random();
            boolean isRunning = true;
            try{
                while(isRunning){
                    System.out.println("正从队列获取数据...");
                    String data = queue.poll(2,TimeUnit.SECONDS);
                    if(null != data){
                         System.out.println("拿到数据:" + data);
                         System.out.println("正在消费数据:" + data);
                         Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
                    }else{
                        isRunning = false;
                    }
                }
            }catch(InterruptedException e){
                 e.printStackTrace();
                 Thread.currentThread().interrupt();
            }finally{
                System.out.println("退出消费者线程!");
            }
        }
    }

     

    参考:http://wsmajunfeng.iteye.com/blog/1629354

  • 相关阅读:
    Redis 连接
    Redis 脚本
    Redis 事务
    Redis 发布订阅
    redis 字符串数据(string)
    Redis 键(key)
    Redis 命令
    Redis的五种数据类型
    java.lang.NoClassDefFoundError: org/apache/commons/logging/LogFactory 解决方案
    在命令行中运行eclipse中创建的java项目
  • 原文地址:https://www.cnblogs.com/yangsy0915/p/5428627.html
Copyright © 2011-2022 走看看