zoukankan      html  css  js  c++  java
  • 生产者与消费者的阻塞队列实现以及无锁缓存框架的实现

    阻塞队列的实现

    package PandC;
    
    import java.text.MessageFormat;
    import java.util.Random;
    import java.util.concurrent.BlockingQueue;
    
    /**
     * Consumer half of the blocking-queue demo: takes {@code PCData} items from the
     * shared queue and prints the square of each item's value.
     */
    public class Consumer implements Runnable{
        /** Exclusive upper bound, in milliseconds, of the random pause after each item. */
        private static final int SLEEPTIME = 1000;
        /** Queue the items are taken from. */
        private BlockingQueue<PCData> queue;

        /**
         * @param queue queue this consumer takes its items from
         */
        public Consumer(BlockingQueue<PCData> queue){
            this.queue = queue;
        }

        /**
         * Loops forever taking items and printing their squares; the loop only ends
         * when the thread is interrupted, in which case the stack trace is printed
         * and the interrupt flag is restored.
         */
        @Override
        public void run() {
            System.out.println("start Consumer id=" + Thread.currentThread().getId());
            final Random pause = new Random();
            try{
                for(;;){
                    final PCData item = queue.take();
                    if(item == null){
                        continue;
                    }
                    final int squared = item.getData() * item.getData();
                    System.out.println(MessageFormat.format("{0} * {1} = {2}", item.getData(), item.getData(), squared));
                    // Random pause before the next take.
                    Thread.sleep(pause.nextInt(SLEEPTIME));
                }
            }catch(InterruptedException interrupted){
                interrupted.printStackTrace();
                Thread.currentThread().interrupt();
            }
        }
    }
    package PandC;
    
    
    /**
     * Immutable payload passed from producers to consumers; wraps a single int.
     */
    public class PCData {
        /** The wrapped value; assigned once in the constructor. */
        private final int intData;

        /**
         * @param d value to wrap
         */
        public PCData(int d){
            intData = d;
        }

        /**
         * @param d decimal text of the value to wrap
         * @throws NumberFormatException if {@code d} cannot be parsed as an int
         */
        public PCData(String d){
            // parseInt returns the primitive directly; valueOf would box and then unbox.
            intData = Integer.parseInt(d);
        }

        /**
         * @return the text {@code "data:"} followed by the wrapped value
         */
        @Override
        public String toString() {
            return "data:" + intData;
        }

        /**
         * @return the wrapped value
         */
        public int getData(){
            return intData;
        }
    }
    package PandC;
    
    import java.util.Random;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * Producer half of the blocking-queue demo: creates sequentially numbered
     * {@code PCData} items and offers them to the shared queue until {@link #stop()}
     * is called or the thread is interrupted.
     */
    public class Producer implements Runnable {
        /** Exclusive upper bound, in milliseconds, of the random pause before each item. */
        private static final int SLEEPTIME = 1000;
        /** Counter shared by all producers; supplies the value of each new item. */
        private static final AtomicInteger count = new AtomicInteger();
        /** Queue the items are offered to. */
        private final BlockingQueue<PCData> queue;
        /** Cleared by {@link #stop()}; volatile so the running thread sees the change. */
        private volatile boolean isRunning = true;

        /**
         * @param queue queue this producer offers its items to
         */
        public Producer(BlockingQueue<PCData> queue){
            this.queue = queue;
        }

        /**
         * Produces items until stopped. Each offer waits at most two seconds for
         * space in the queue; an item that cannot be enqueued in that time is
         * reported on {@code System.err} and dropped.
         */
        @Override
        public void run() {
            Random r = new Random();
            System.out.println("start producer id=" + Thread.currentThread().getId());
            try{
                while(isRunning){
                    Thread.sleep(r.nextInt(SLEEPTIME));
                    PCData data = new PCData(count.incrementAndGet());
                    // Report success only after the offer actually succeeded, so a
                    // timed-out offer is not logged as both "put" and "fail".
                    if(queue.offer(data, 2, TimeUnit.SECONDS)){
                        System.out.println(data + " is put into queue");
                    }else{
                        System.err.println("fail to put data:" + data);
                    }
                }
            }catch(InterruptedException e){
                e.printStackTrace();
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Asks the producer to finish; the loop exits after the current iteration.
         */
        public void stop(){
            isRunning = false;
        }
    }
    package PandC;
    
    import java.awt.image.VolatileImage;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;
    
    /**
     * Entry point of the blocking-queue demo: runs three producers and three
     * consumers against one bounded queue for about ten seconds, then shuts down.
     */
    public class Main {
        /** Queue shared by all producers and consumers; holds at most 10 items. */
        public static volatile BlockingQueue<PCData> queue = new LinkedBlockingQueue<PCData>(10);

        /**
         * Starts the workers, lets them run, stops the producers, gives the
         * consumers time to drain the queue and finally terminates the pool.
         *
         * @param args unused
         * @throws InterruptedException if the main thread is interrupted while sleeping
         */
        public static void main(String[] args) throws InterruptedException {
            Producer producer1 = new Producer(queue);
            Producer producer2 = new Producer(queue);
            Producer producer3 = new Producer(queue);
            Consumer consumer1 = new Consumer(queue);
            Consumer consumer2 = new Consumer(queue);
            Consumer consumer3 = new Consumer(queue);
            ExecutorService service = Executors.newCachedThreadPool();
            service.execute(producer1);
            service.execute(producer2);
            service.execute(producer3);
            service.execute(consumer1);
            service.execute(consumer2);
            service.execute(consumer3);
            Thread.sleep(10 * 1000);
            producer1.stop();
            producer2.stop();
            producer3.stop();
            // Give the consumers time to drain whatever is still queued.
            Thread.sleep(3000);
            // The consumers loop forever and block in queue.take(); a plain
            // shutdown() would never end them and the JVM could not exit.
            // shutdownNow() interrupts the worker threads so their loops terminate.
            service.shutdownNow();
        }
    }

    但是阻塞队列并不是高性能的实现:它完全使用锁和阻塞来实现线程之间的同步,在高并发的场合性能并不卓越,它仅仅是方便了数据的共享。

    ConcurrentLinkedQueue是一个高性能的队列,大量使用无锁的CAS操作,会获得可观的性能提升(不过CAS并不是在所有情况下都比悲观锁的性能好,比如在线程冲突严重的情况下,CAS会不断地自旋,更加耗费CPU)。

    ConcurrentLinkedQueue实现生产者与消费者模式:

    package concurrentLinkedQueue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * Demo of the producer/consumer pattern on a lock-free
     * {@link ConcurrentLinkedQueue}: the main thread enqueues 100000 integers and
     * {@code count} pool threads drain and print them.
     */
    public class ConcurrentLinkedQueueTest {
        /** Queue shared between the producing main thread and the consuming tasks. */
        private static final ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<Integer>();
        /** Number of consumer tasks. */
        private static final int count = 2;
        // CountDownLatch lets the main thread wait until every consumer task has finished.
        private static final CountDownLatch latch = new CountDownLatch(count);

        /**
         * Fills the queue, starts the consumers, waits for them and prints the elapsed time.
         *
         * @param args unused
         * @throws InterruptedException if the main thread is interrupted while waiting on the latch
         */
        public static void main(String[] args) throws InterruptedException {
            long timeStart = System.currentTimeMillis();
            ExecutorService es = Executors.newFixedThreadPool(4);
            try {
                ConcurrentLinkedQueueTest.offer();
                for (int i = 0; i < count; i++) {
                    es.submit(new Poll());
                }
                latch.await(); // blocks the main thread until the latch has been counted down to zero
                System.out.println("cost time " + (System.currentTimeMillis() - timeStart) + "ms");
            } finally {
                // Release the pool threads even if the wait above is interrupted.
                es.shutdown();
            }
        }

        /**
         * Producer: enqueues the integers 0 to 99999.
         */
        public static void offer() {
            for (int i = 0; i < 100000; i++) {
                queue.offer(i);
            }
        }

        /**
         * Consumer: prints elements until the queue is drained, then counts the latch down.
         */
        static class Poll implements Runnable {
            @Override
            public void run() {
                try {
                    // Test the result of poll() itself. Checking isEmpty() first and
                    // polling afterwards is a check-then-act race: another consumer can
                    // take the last element in between, and "null" would be printed.
                    Integer item;
                    while ((item = queue.poll()) != null) {
                        System.out.println(item);
                    }
                } finally {
                    // Always count down so main() cannot wait forever on a failed task.
                    latch.countDown();
                }
            }
        }
    }

    LMAX公司开发了无锁的内存并发组件:Disruptor,它同样采用CAS操作。用Disruptor实现生产者与消费者是最好的方式。Disruptor使用了环形队列来代替普通队列,只需要提供当前位置,利用这个指针就可以进行入队操作和出队操作。带来的代价是总大小必须事先指定,好处是可以做到完全的内存复用(我们可以自己实现阻塞队列,并且不指定大小)。Disruptor要求必须将数组的大小设置为2的整数次方,这样可以用位运算(sequence & (size - 1))代替取模运算,加快定位实际元素位置的速度。

    Disruptor甚至提供了不同的策略来提高消费者的响应时间,以及解决CPU的伪共享问题。将系统的吞吐量提升到极致。

    用Disruptor实现生产者与消费者模式:

    package disruptor;
    
    import com.lmax.disruptor.WorkHandler;
    
    /**
     * Event handler of the Disruptor demo: prints the square of each event's value
     * together with the id of the thread that handled it.
     */
    public class Consumer implements WorkHandler<PCData> {

        /**
         * Prints {@code <thread id>:Event: --<value squared>--} for the given event.
         *
         * @param event event whose value is squared and printed
         * @throws Exception declared by the handler interface; this body throws none explicitly
         */
        @Override
        public void onEvent(PCData event) throws Exception {
            final long threadId = Thread.currentThread().getId();
            final long squared = event.get() * event.get();
            System.out.println(threadId + ":Event: --" + squared + "--");
        }

    }
    package disruptor;
    
    /**
     * Mutable event object of the Disruptor demo; carries one {@code long}.
     */
    public class PCData {
        /** Current payload; 0 until {@link #set(long)} is called. */
        private long value;

        /**
         * @return the value most recently stored with {@link #set(long)}
         */
        public long get(){
            return this.value;
        }

        /**
         * @param value new payload, replacing the previous one
         */
        public void set(long value){
            this.value = value;
        }
    }
    package disruptor;
    
    import com.lmax.disruptor.EventFactory;
    
    /**
     * Factory handed to the Disruptor; produces new, empty {@code PCData} instances.
     */
    public class PCDataFactory implements EventFactory<PCData> {

        /**
         * @return a newly constructed {@code PCData}
         */
        @Override
        public PCData newInstance() {
            final PCData blank = new PCData();
            return blank;
        }

    }
    package disruptor;
    
    import java.nio.ByteBuffer;
    
    import com.lmax.disruptor.RingBuffer;
    
    /**
     * Publishing side of the Disruptor demo: copies a {@code long} out of a
     * {@link ByteBuffer} into the next slot of the ring buffer.
     */
    public class Producer {
        /** Ring buffer the events are published to. */
        private final RingBuffer<PCData> ringBuffer;

        /**
         * @param ringBuffer ring buffer this producer publishes to
         */
        public Producer(RingBuffer<PCData> ringBuffer){
            this.ringBuffer = ringBuffer;
        }

        /**
         * Obtains the next sequence, stores the long found at offset 0 of {@code bb}
         * in the event at that sequence, and publishes the sequence.
         *
         * @param bb buffer holding the value to publish in its first eight bytes
         */
        public void pushData(ByteBuffer bb){
            final long slot = ringBuffer.next();
            try{
                ringBuffer.get(slot).set(bb.getLong(0));
            }finally{
                // Publish in finally so the obtained sequence is published even if the write throws.
                ringBuffer.publish(slot);
            }
        }
    }
    package disruptor;
    
    import java.nio.ByteBuffer;
    import java.util.concurrent.Executor;
    import java.util.concurrent.Executors;
    
    import com.lmax.disruptor.BlockingWaitStrategy;
    import com.lmax.disruptor.RingBuffer;
    import com.lmax.disruptor.dsl.Disruptor;
    import com.lmax.disruptor.dsl.ProducerType;
    
    /**
     * Entry point of the Disruptor demo: wires four consumers to a 1024-slot ring
     * buffer and publishes an increasing counter roughly every 100 ms, forever.
     */
    public class Main {
        /**
         * Builds and starts the Disruptor, then publishes values in an endless loop.
         *
         * @param args unused
         * @throws InterruptedException if the publishing thread is interrupted while sleeping
         */
        public static void main(String[] args) throws InterruptedException {
            final Executor executor = Executors.newCachedThreadPool();
            final PCDataFactory factory = new PCDataFactory();
            final int bufferSize = 1024;
            final Disruptor<PCData> disruptor = new Disruptor<PCData>(
                    factory, bufferSize, executor, ProducerType.MULTI, new BlockingWaitStrategy());
            disruptor.handleEventsWithWorkerPool(
                    new Consumer(), new Consumer(), new Consumer(), new Consumer());
            disruptor.start();

            final RingBuffer<PCData> ringBuffer = disruptor.getRingBuffer();
            final Producer producer = new Producer(ringBuffer);
            // One 8-byte buffer is reused; each round overwrites the long at offset 0.
            final ByteBuffer bb = ByteBuffer.allocate(8);
            long next = 0;
            while(true){
                bb.putLong(0, next);
                producer.pushData(bb);
                Thread.sleep(100);
                System.out.println("add data " + next);
                next++;
            }
        }
    }
  • 相关阅读:
    MySQL-keepalived做高可用
    Linux-服务管理
    MySQL-CentOS7上安装Mysql5.7
    MySQL-查看DB文件位置
    游戏编程与游戏种类
    计算机
    python
    python中的构造函数
    IndentationError:expected an indented block错误解决
    python程序的pdb调试方法
  • 原文地址:https://www.cnblogs.com/caobojia/p/6841519.html
Copyright © 2011-2022 走看看