zoukankan      html  css  js  c++  java
  • 架构师养成记--16.disruptor并发框架中RingBuffer的使用

    很多时候我们只需要消息中间件这样的功能,那么直需要RinBuffer就可以了。

    入口:

     1 import java.util.concurrent.Callable;
     2 import java.util.concurrent.ExecutorService;
     3 import java.util.concurrent.Executors;
     4 import java.util.concurrent.Future;
     5 
     6 import com.lmax.disruptor.BatchEventProcessor;
     7 import com.lmax.disruptor.EventFactory;
     8 import com.lmax.disruptor.RingBuffer;
     9 import com.lmax.disruptor.SequenceBarrier;
    10 import com.lmax.disruptor.YieldingWaitStrategy;
    11 
    12 public class Main1 {  
    13    
    14     public static void main(String[] args) throws Exception {  
    15         int BUFFER_SIZE=1024;  
    16         int THREAD_NUMBERS=4;  
    17         /* 
    18          * createSingleProducer创建一个单生产者的RingBuffer, 
    19          * 第一个参数叫EventFactory,从名字上理解就是"事件工厂",其实它的职责就是产生数据填充RingBuffer的区块。 
    20          * 第二个参数是RingBuffer的大小,它必须是2的指数倍 目的是为了将求模运算转为&运算提高效率 
    21          * 第三个参数是RingBuffer的生产都在没有可用区块的时候(可能是消费者(或者说是事件处理器) 太慢了)的等待策略 
    22          */  
    23         final RingBuffer<Trade> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<Trade>() {  
    24             @Override  
    25             public Trade newInstance() {  
    26                 return new Trade();  
    27             }  
    28         }, BUFFER_SIZE, new YieldingWaitStrategy());  
    29         
    30         //创建线程池  
    31         ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMBERS);  
    32         
    33         //创建SequenceBarrier  
    34         SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();  
    35           
    36         /****************** @beg 消费者消费数据 2017-1-11******************/
    37         //创建消息处理器  
    38         BatchEventProcessor<Trade> transProcessor = new BatchEventProcessor<Trade>(  
    39                 ringBuffer, sequenceBarrier, new TradeHandler());  
    40           
    41         //这一步的目的就是把消费者的位置信息引用注入到生产者    如果只有一个消费者的情况可以省略 
    42         ringBuffer.addGatingSequences(transProcessor.getSequence());  
    43           
    44         //把消息处理器提交到线程池  
    45         executors.submit(transProcessor);  
    46         /****************** @end 消费者消费数据 2017-1-11******************/
    47         
    48         //如果存在多个消费者 那重复执行上面3行代码 把TradeHandler换成其它消费者类  
    49           
    50         /****************** @beg 生产者生产数据  2017-1-11******************/
    51 
    52         Future<?> future= executors.submit(new Callable<Void>() {  
    53             @Override  
    54             public Void call() throws Exception {  
    55                 long seq;  
    56                 for(int i=0;i<10;i++){  
    57                     seq = ringBuffer.next();//占个坑 --ringBuffer一个可用区块  
    58                     ringBuffer.get(seq).setPrice(Math.random()*9999);//给这个区块放入 数据 
    59                     ringBuffer.publish(seq);//发布这个区块的数据使handler(consumer)可见  
    60                 }  
    61                 return null;  
    62             }  
    63         }); 
    64 
    65         /****************** @end 生产者生产数据 2017-1-11******************/
    66         
    67         future.get();//等待生产者结束  
    68         Thread.sleep(1000);//等上1秒,等消费都处理完成  
    69         transProcessor.halt();//通知事件(或者说消息)处理器 可以结束了(并不是马上结束!!!)  
    70         executors.shutdown();//终止线程  
    71     }  
    72 }  

    消费者:

     1 import java.util.UUID;
     2 
     3 import com.lmax.disruptor.EventHandler;
     4 import com.lmax.disruptor.WorkHandler;
     5 
     6 public class TradeHandler implements EventHandler<Trade>, WorkHandler<Trade> {  
     7       
     8     @Override  
     9     public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {  
    10         this.onEvent(event);  
    11     }  
    12   
    13     @Override  
    14     public void onEvent(Trade event) throws Exception {  
    15         //这里做具体的消费逻辑  
    16         event.setId(UUID.randomUUID().toString());//简单生成下ID  
    17         System.out.println(event.getId());  
    18     }  
    19 }  

    数据对象:

     1 import java.util.concurrent.atomic.AtomicInteger;
     2 
     3 public class Trade {  
     4     
     5     private String id;//ID  
     6     private String name;
     7     private double price;//金额  
     8     private AtomicInteger count = new AtomicInteger(0);
     9     
    10     public String getId() {
    11         return id;
    12     }
    13     public void setId(String id) {
    14         this.id = id;
    15     }
    16     public String getName() {
    17         return name;
    18     }
    19     public void setName(String name) {
    20         this.name = name;
    21     }
    22     public double getPrice() {
    23         return price;
    24     }
    25     public void setPrice(double price) {
    26         this.price = price;
    27     }
    28     public AtomicInteger getCount() {
    29         return count;
    30     }
    31     public void setCount(AtomicInteger count) {
    32         this.count = count;
    33     } 
    34       
    35       
    36 }  
  • 相关阅读:
    MindSpore 建立神经网络
    MindSpore 数据加载及处理
    MindSpore 初探, 使用LeNet训练minist数据集
    Ubuntu 20.04 Xrdp Black Screen Ubuntu 20.04 Xrdp 远程桌面黑屏
    基于Transformer的ViT、DETR、Deformable DETR原理详解
    Ubuntu 18.04 / 20.04 自定义锁屏时间
    Transformer中K 、Q、V的设置以及为什么不能使用同一个值
    Auto-Encoding Scene Graphs for Image Captioning
    Eureka中读写锁的奇思妙想,学废了吗?
    PostgreSQL-查询所有索引
  • 原文地址:https://www.cnblogs.com/sigm/p/6272384.html
Copyright © 2011-2022 走看看