zoukankan      html  css  js  c++  java
  • Disruptor 多生产者多消费者实战 四

    一、创建event类 Order

    /**
     * Mutable event object carried through the ring buffer.
     *
     * <p>Instances are plain data holders: every property has a getter and a
     * setter and no validation is performed.
     */
    public class Order {

        /** Identifier of the order. */
        private String id;

        /** Display name of the order. */
        private String name;

        /** Price of the order. */
        private double price;

        /** @return the current identifier, or {@code null} if none was set */
        public String getId() { return this.id; }

        /** @return the current name, or {@code null} if none was set */
        public String getName() { return this.name; }

        /** @return the current price ({@code 0.0} until one is set) */
        public double getPrice() { return this.price; }

        /** @param id the identifier to store */
        public void setId(String id) { this.id = id; }

        /** @param name the name to store */
        public void setName(String name) { this.name = name; }

        /** @param price the price to store */
        public void setPrice(double price) { this.price = price; }
    }

    二、创建消费者类 Consumer

    import com.lmax.disruptor.WorkHandler;
    
    import java.util.Random;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * Work-pool consumer that prints the id of each {@link Order} it receives
     * and bumps a counter shared by all {@code Consumer} instances.
     */
    public class Consumer implements WorkHandler<Order> {

        /**
         * Number of events handled so far by ALL consumers. The field is static,
         * so every instance increments and reads the same counter.
         */
        private static final AtomicInteger count = new AtomicInteger(0);

        /** Label used in the log line to tell the consumers apart. */
        private final String consumerId;

        /** Source of the simulated per-event processing delay. */
        private final Random random = new Random();

        /**
         * @param consumerId label printed with every consumed event
         */
        public Consumer(String consumerId) {
            this.consumerId = consumerId;
        }

        /**
         * Handles one event: sleeps 0-4 ms to simulate work, prints the consumer
         * label and the event id, then increments the shared counter.
         *
         * @param event the order taken from the ring buffer
         * @throws Exception if the simulated delay is interrupted
         */
        @Override
        public void onEvent(Order event) throws Exception {
            Thread.sleep(random.nextInt(5));
            System.out.println("当前消费者:" + this.consumerId + ",消费信息ID:"+event.getId());
            count.incrementAndGet();
        }

        /**
         * @return the number of events processed so far by every consumer
         *         instance, not only by this one (the counter is static)
         */
        public int getCount() {
            return count.get();
        }
    }

    三、创建生产者类 Producer

    import com.lmax.disruptor.RingBuffer;
    
    public class Producer {
    
        private RingBuffer<Order> ringBuffer;
    
        public Producer(RingBuffer<Order> ringBuffer) {
            this.ringBuffer = ringBuffer;
        }
    
        public void sendData(String data) {
            long sequnce = ringBuffer.next();
    
            try {
                Order order = ringBuffer.get(sequnce);
                order.setId(data);
            } finally {
                ringBuffer.publish(sequnce);
            }
        }

    四、创建测试类 TestMain

    import com.lmax.disruptor.*;
    import com.lmax.disruptor.dsl.ProducerType;
    
    import java.util.UUID;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * Demo driver: starts a pool of {@link Consumer}s on a multi-producer ring
     * buffer, lets many producer threads publish orders concurrently, waits for
     * everything to be consumed, prints the total and shuts down cleanly.
     */
    public class TestMain {

        /** Number of consumers in the worker pool (and threads backing it). */
        private static final int CONSUMER_COUNT = 10;

        /** Number of concurrent producer threads. */
        private static final int PRODUCER_COUNT = 100;

        /** Number of events each producer thread publishes. */
        private static final int EVENTS_PER_PRODUCER = 100;

        /**
         * Runs the demo.
         *
         * @param args unused
         * @throws Exception if the main thread is interrupted while waiting
         */
        public static void main(String[] args) throws Exception{

            // 1. Create the ring buffer in multi-producer mode.
            RingBuffer<Order> ringBuffer = RingBuffer.create(ProducerType.MULTI,
                    Order::new,
                    1024 * 1024,
                    new YieldingWaitStrategy());

            // 2. Create a sequence barrier from the ring buffer.
            SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();

            // 3. Create the consumers.
            Consumer[] consumers = new Consumer[CONSUMER_COUNT];
            for (int i = 0; i < consumers.length; i++) {
                consumers[i] = new Consumer("C" + i);
            }

            // 4. Build the multi-consumer worker pool.
            WorkerPool<Order> workerPool = new WorkerPool<Order>(
                    ringBuffer,
                    sequenceBarrier,
                    new EventExceptionHandler(),
                    consumers);

            // 5. Register the workers' sequences as gating sequences on the ring buffer.
            ringBuffer.addGatingSequences(workerPool.getWorkerSequences());

            // 6. Start the worker pool. Keep the executor so it can be shut down;
            //    its threads are non-daemon and would otherwise keep the JVM alive.
            ExecutorService executor = Executors.newFixedThreadPool(CONSUMER_COUNT);
            workerPool.start(executor);

            try {
                // Holds every producer thread back until all of them have been started.
                CountDownLatch startSignal = new CountDownLatch(1);
                // Counts producer threads that have finished (or given up).
                CountDownLatch producersDone = new CountDownLatch(PRODUCER_COUNT);

                for (int i = 0; i < PRODUCER_COUNT; i++) {
                    Producer producer = new Producer(ringBuffer);
                    new Thread(() -> {
                        try {
                            startSignal.await();
                            for (int j = 0; j < EVENTS_PER_PRODUCER; j++) {
                                producer.sendData(UUID.randomUUID().toString());
                            }
                        } catch (InterruptedException e) {
                            // Interrupted before the start signal: restore the flag
                            // and publish nothing instead of producing anyway.
                            Thread.currentThread().interrupt();
                        } finally {
                            producersDone.countDown();
                        }
                    }).start();
                }

                System.out.println("--------线程创建完毕,开始生产数据----------");
                startSignal.countDown();

                // Wait for real completion instead of sleeping for a guessed duration:
                // first all producers, then the workers catching up with the cursor.
                producersDone.await();
                workerPool.drainAndHalt();

                System.out.println("消费者处理的任务总数:" + consumers[0].getCount());
            } finally {
                // Stop the workers on the failure path too, then release the pool threads.
                workerPool.halt();
                executor.shutdown();
            }
        }

        /**
         * Exception handler for the worker pool. Reports every failure on
         * standard error instead of silently discarding it.
         */
        static class EventExceptionHandler implements ExceptionHandler<Order> {

            @Override
            public void handleEventException(Throwable ex, long sequence, Order event) {
                System.err.println("Exception while handling event at sequence " + sequence);
                ex.printStackTrace();
            }

            @Override
            public void handleOnStartException(Throwable ex) {
                System.err.println("Exception during consumer start");
                ex.printStackTrace();
            }

            @Override
            public void handleOnShutdownException(Throwable ex) {
                System.err.println("Exception during consumer shutdown");
                ex.printStackTrace();
            }
        }
    }
  • 相关阅读:
    ThinkInJava4读书笔记之第二章一切都是对象
    工具类Excel相关处理
    工具类ID生成器工具类
    工具类获取地址
    工具类反射工具类
    工具类Md5加密方法
    工具类通用http工具封装
    工具类Base64工具类
    工具类通用http发送方法
    工具类spring工具类 方便在非spring管理环境中获取bean
  • 原文地址:https://www.cnblogs.com/gyli20170901/p/10272336.html
Copyright © 2011-2022 走看看