zoukankan      html  css  js  c++  java
  • 架构师养成记--17.disruptor 多生产者多消费者

    入口:

     1 import java.nio.ByteBuffer;
     2 import java.util.UUID;
     3 import java.util.concurrent.CountDownLatch;
     4 import java.util.concurrent.Executors;
     5 
     6 import com.lmax.disruptor.EventFactory;
     7 import com.lmax.disruptor.ExceptionHandler;
     8 import com.lmax.disruptor.RingBuffer;
     9 import com.lmax.disruptor.SequenceBarrier;
    10 import com.lmax.disruptor.WorkHandler;
    11 import com.lmax.disruptor.WorkerPool;
    12 import com.lmax.disruptor.YieldingWaitStrategy;
    13 import com.lmax.disruptor.dsl.ProducerType;
    14 
    15 public class Main {
    16     
    17     public static void main(String[] args) throws Exception {
    18 
    19         //创建ringBuffer
    20         RingBuffer<Order> ringBuffer = 
    21                 RingBuffer.create(ProducerType.MULTI, 
    22                         new EventFactory<Order>() {  
    23                             @Override  
    24                             public Order newInstance() {  
    25                                 return new Order();  
    26                             }  
    27                         }, 
    28                         1024 * 1024, 
    29                         new YieldingWaitStrategy());
    30         
    31         SequenceBarrier barriers = ringBuffer.newBarrier();
    32         
    33         Consumer[] consumers = new Consumer[3];
    34         for(int i = 0; i < consumers.length; i++){
    35             consumers[i] = new Consumer("c" + i);
    36         }
    37         
    38         WorkerPool<Order> workerPool = 
    39                 new WorkerPool<Order>(ringBuffer, 
    40                         barriers, 
    41                         new IntEventExceptionHandler(),
    42                         consumers);
    43         
    44         ringBuffer.addGatingSequences(workerPool.getWorkerSequences());  
    45         workerPool.start(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));  
    46         
    47         final CountDownLatch latch = new CountDownLatch(1);
    48         for (int i = 0; i < 100; i++) {  
    49             final Producer p = new Producer(ringBuffer);
    50             new Thread(new Runnable() {
    51                 @Override
    52                 public void run() {
    53                     try {
    54                         latch.await();
    55                     } catch (InterruptedException e) {
    56                         e.printStackTrace();
    57                     }
    58                     for(int j = 0; j < 100; j ++){
    59                         p.onData(UUID.randomUUID().toString());
    60                     }
    61                 }
    62             }).start();
    63         } 
    64         Thread.sleep(2000);
    65         System.out.println("---------------开始生产-----------------");
    66         latch.countDown();
    67         Thread.sleep(5000);
    68         System.out.println("总数:" + consumers[0].getCount() );
    69     }
    70     
    71     static class IntEventExceptionHandler implements ExceptionHandler {  
    72         public void handleEventException(Throwable ex, long sequence, Object event) {}  
    73         public void handleOnStartException(Throwable ex) {}  
    74         public void handleOnShutdownException(Throwable ex) {}  
    75     } 
    76 }

    生产者

     1 import com.lmax.disruptor.RingBuffer;
    11 public class Producer {
    12 
    13     private final RingBuffer<Order> ringBuffer;
    14     
    15     public Producer(RingBuffer<Order> ringBuffer){
    16         this.ringBuffer = ringBuffer;
    17     }
    18     
    19     /**
    20      * onData用来发布事件,每调用一次就发布一次事件
    21      * 它的参数会用过事件传递给消费者
    22      */
    23     public void onData(String data){
    24         //可以把ringBuffer看做一个事件队列,那么next就是得到下面一个事件槽
    25         long sequence = ringBuffer.next();
    26         try {
    27             //用上面的索引取出一个空的事件用于填充(获取该序号对应的事件对象)
    28             Order order = ringBuffer.get(sequence);
    29             //获取要通过事件传递的业务数据
    30             order.setId(data);
    31         } finally {
    32             //发布事件
    33             //注意,最后的 ringBuffer.publish 方法必须包含在 finally 中以确保必须得到调用;如果某个请求的 sequence 未被提交,将会堵塞后续的发布操作或者其它的 producer。
    34             ringBuffer.publish(sequence);
    35         }
    36     }
    37     
    38     
    39 }

    消费者:

     1 import java.util.concurrent.atomic.AtomicInteger;
     2 
     3 import com.lmax.disruptor.WorkHandler;
     4 
     5 public class Consumer implements WorkHandler<Order>{
     6     
     7     private String consumerId;
     8     
     9     private static AtomicInteger count = new AtomicInteger(0);
    10     
    11     public Consumer(String consumerId){
    12         this.consumerId = consumerId;
    13     }
    14 
    15     @Override
    16     public void onEvent(Order order) throws Exception {
    17         System.out.println("当前消费者: " + this.consumerId + ",消费信息:" + order.getId());
    18         count.incrementAndGet();
    19     }
    20     
    21     public int getCount(){
    22         return count.get();
    23     }
    24 
    25 }

    数据对象

     /**
      * Mutable event object carried through the ring buffer: an order with an id,
      * a name and a price.
      */
     public class Order {

         /** Order id. */
         private String id;

         /** Order name. */
         private String name;

         /** Order amount. */
         private double price;

         /** @return the order id, or {@code null} if it has not been set */
         public String getId() {
             return this.id;
         }

         /** @param id the order id */
         public void setId(String id) {
             this.id = id;
         }

         /** @return the order name, or {@code null} if it has not been set */
         public String getName() {
             return this.name;
         }

         /** @param name the order name */
         public void setName(String name) {
             this.name = name;
         }

         /** @return the order amount; {@code 0.0} if it has not been set */
         public double getPrice() {
             return this.price;
         }

         /** @param price the order amount */
         public void setPrice(double price) {
             this.price = price;
         }
     }
  • 相关阅读:
    centos 用户管理
    rsync 实验
    文件共享和传输
    PAT 1109 Group Photo
    PAT 1108 Finding Average
    PAT 1107 Social Clusters
    PAT 1106 Lowest Price in Supply Chain
    PAT 1105 Spiral Matrix
    PAT 1104 Sum of Number Segments
    PAT 1103 Integer Factorization
  • 原文地址:https://www.cnblogs.com/sigm/p/6295463.html
Copyright © 2011-2022 走看看