zoukankan      html  css  js  c++  java
  • 架构师养成记--17.disrunptor 多生产者多消费者

    入口:

     1 import java.nio.ByteBuffer;
     2 import java.util.UUID;
     3 import java.util.concurrent.CountDownLatch;
     4 import java.util.concurrent.Executors;
     5 
     6 import com.lmax.disruptor.EventFactory;
     7 import com.lmax.disruptor.ExceptionHandler;
     8 import com.lmax.disruptor.RingBuffer;
     9 import com.lmax.disruptor.SequenceBarrier;
    10 import com.lmax.disruptor.WorkHandler;
    11 import com.lmax.disruptor.WorkerPool;
    12 import com.lmax.disruptor.YieldingWaitStrategy;
    13 import com.lmax.disruptor.dsl.ProducerType;
    14 
    15 public class Main {
    16     
    17     public static void main(String[] args) throws Exception {
    18 
    19         //创建ringBuffer
    20         RingBuffer<Order> ringBuffer = 
    21                 RingBuffer.create(ProducerType.MULTI, 
    22                         new EventFactory<Order>() {  
    23                             @Override  
    24                             public Order newInstance() {  
    25                                 return new Order();  
    26                             }  
    27                         }, 
    28                         1024 * 1024, 
    29                         new YieldingWaitStrategy());
    30         
    31         SequenceBarrier barriers = ringBuffer.newBarrier();
    32         
    33         Consumer[] consumers = new Consumer[3];
    34         for(int i = 0; i < consumers.length; i++){
    35             consumers[i] = new Consumer("c" + i);
    36         }
    37         
    38         WorkerPool<Order> workerPool = 
    39                 new WorkerPool<Order>(ringBuffer, 
    40                         barriers, 
    41                         new IntEventExceptionHandler(),
    42                         consumers);
    43         
    44         ringBuffer.addGatingSequences(workerPool.getWorkerSequences());  
    45         workerPool.start(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));  
    46         
    47         final CountDownLatch latch = new CountDownLatch(1);
    48         for (int i = 0; i < 100; i++) {  
    49             final Producer p = new Producer(ringBuffer);
    50             new Thread(new Runnable() {
    51                 @Override
    52                 public void run() {
    53                     try {
    54                         latch.await();
    55                     } catch (InterruptedException e) {
    56                         e.printStackTrace();
    57                     }
    58                     for(int j = 0; j < 100; j ++){
    59                         p.onData(UUID.randomUUID().toString());
    60                     }
    61                 }
    62             }).start();
    63         } 
    64         Thread.sleep(2000);
    65         System.out.println("---------------开始生产-----------------");
    66         latch.countDown();
    67         Thread.sleep(5000);
    68         System.out.println("总数:" + consumers[0].getCount() );
    69     }
    70     
    71     static class IntEventExceptionHandler implements ExceptionHandler {  
    72         public void handleEventException(Throwable ex, long sequence, Object event) {}  
    73         public void handleOnStartException(Throwable ex) {}  
    74         public void handleOnShutdownException(Throwable ex) {}  
    75     } 
    76 }

    生产者

     1 import com.lmax.disruptor.RingBuffer;
    11 public class Producer {
    12 
    13     private final RingBuffer<Order> ringBuffer;
    14     
    15     public Producer(RingBuffer<Order> ringBuffer){
    16         this.ringBuffer = ringBuffer;
    17     }
    18     
    19     /**
    20      * onData用来发布事件,每调用一次就发布一次事件
    21      * 它的参数会用过事件传递给消费者
    22      */
    23     public void onData(String data){
    24         //可以把ringBuffer看做一个事件队列,那么next就是得到下面一个事件槽
    25         long sequence = ringBuffer.next();
    26         try {
    27             //用上面的索引取出一个空的事件用于填充(获取该序号对应的事件对象)
    28             Order order = ringBuffer.get(sequence);
    29             //获取要通过事件传递的业务数据
    30             order.setId(data);
    31         } finally {
    32             //发布事件
    33             //注意,最后的 ringBuffer.publish 方法必须包含在 finally 中以确保必须得到调用;如果某个请求的 sequence 未被提交,将会堵塞后续的发布操作或者其它的 producer。
    34             ringBuffer.publish(sequence);
    35         }
    36     }
    37     
    38     
    39 }

    消费者:

     1 import java.util.concurrent.atomic.AtomicInteger;
     2 
     3 import com.lmax.disruptor.WorkHandler;
     4 
     5 public class Consumer implements WorkHandler<Order>{
     6     
     7     private String consumerId;
     8     
     9     private static AtomicInteger count = new AtomicInteger(0);
    10     
    11     public Consumer(String consumerId){
    12         this.consumerId = consumerId;
    13     }
    14 
    15     @Override
    16     public void onEvent(Order order) throws Exception {
    17         System.out.println("当前消费者: " + this.consumerId + ",消费信息:" + order.getId());
    18         count.incrementAndGet();
    19     }
    20     
    21     public int getCount(){
    22         return count.get();
    23     }
    24 
    25 }

    数据对象

     1 public class Order {  
     2     
     3     private String id;//ID  
     4     private String name;
     5     private double price;//金额  
     6     
     7     public String getId() {
     8         return id;
     9     }
    10     public void setId(String id) {
    11         this.id = id;
    12     }
    13     public String getName() {
    14         return name;
    15     }
    16     public void setName(String name) {
    17         this.name = name;
    18     }
    19     public double getPrice() {
    20         return price;
    21     }
    22     public void setPrice(double price) {
    23         this.price = price;
    24     }
    25       
    26 }  
  • 相关阅读:
    漫游Kafka介绍章节简介
    poj 2309 BST 使用树阵lowbit
    华为-on练习--小写字符数的统计显示
    OpenMp高速分拣
    eclipse 于 Tomcat于 热部署 project
    2015第49周二
    2015第49周一
    2015第48周六
    2015第48周五
    2015第48周四
  • 原文地址:https://www.cnblogs.com/sigm/p/6295463.html
Copyright © 2011-2022 走看看