zoukankan      html  css  js  c++  java
  • Disruptor高性能缓存队列入门指导

    Disruptor是什么,怎么使用,网上有很多教材,但有些过于复杂,剖析了Disruptor的方方面面,实际上对应普通的开发人员,使用这个工具,只需要指导知道大概原理和使用方法,并不需要知道非常深入的原理。

    有些文章则是写了错误的实例,或者只是摘取了项目的一段代码,实际上,要把这些代码转化成项目的实际代码,却发现困难重重。

    这个文章主要是针对想提高性能,在项目组使用Disruptor的开发人员写的,会简单讲解它的一些原理,尽量把代码简单,但又包括项目使用中必须的方方面面。

    ——Disruptor的使用场景

    一个字,就是快,经过测试,Disruptor的速度比LinkedBlockingQueue提高了七倍。所以,当你在使用LinkedBlockingQueue出现性能瓶颈的时候,你就可以考虑采用Disruptor的代替。

    当然,Disruptor性能高并不是必然的,所以,是否使用还得经过测试。

    Disruptor的最常用的场景就是“生产者-消费者”场景,对场景的就是“一个生产者、多个消费者”的场景,并且要求顺序处理。

    举个例子,我们从MySQL的BigLog文件中顺序读取数据,然后写入到ElasticSearch(搜索引擎)中。在这种场景下,BigLog要求一个文件一个生产者,那个是一个生产者。而写入到ElasticSearch,则严格要求顺序,否则会出现问题,所以通常意义上的多消费者线程无法解决该问题,如果通过加锁,则性能大打折扣。

    ——一个生产者,多消费者例子

             在贴出代码之前,这里简单讲一下代码的结构和说明。简单的图示如下,先有Producer来作为生产者,发送事件。由EventHandler作为消费者,处理事件。

             这个实例代码很简单,就是由生产者发送一个字符串到Disruptor,然后消费者处理该事件,并把字符串打印处理。

             下面是各个Java类的说明:

    HelloEventProducer:生产者,负责传递一个字符串,并发布事件

    HelloEventHandler:消费者,负责消费事件,并打印字符串

    HelloEventFactory:事件工厂类,负责初始化一个事件

    HelloEvent:表示一个事件

    DisruptorMain:运行的主程序,负责将整个逻辑连接起来

    1.  
      package com.disruptor;
    2.  
       
    3.  
      import java.util.concurrent.ExecutorService;
    4.  
      import java.util.concurrent.Executors;
    5.  
       
    6.  
      import com.lmax.disruptor.EventFactory;
    7.  
      import com.lmax.disruptor.EventHandler;
    8.  
      import com.lmax.disruptor.RingBuffer;
    9.  
      import com.lmax.disruptor.WaitStrategy;
    10.  
      import com.lmax.disruptor.YieldingWaitStrategy;
    11.  
      import com.lmax.disruptor.dsl.Disruptor;
    12.  
      import com.lmax.disruptor.dsl.ProducerType;
    13.  
       
    14.  
      public class DisruptorMain {
    15.  
      public static void main(String[] args){
    16.  
      ExecutorService executor = Executors.newFixedThreadPool(3);
    17.  
       
    18.  
      // WaitStrategy blockingWaitStrategy = new BlockingWaitStrategy();
    19.  
      // WaitStrategy sleepingWaitStrategy = new SleepingWaitStrategy();
    20.  
      WaitStrategy yieldingWaitStrategy = new YieldingWaitStrategy();
    21.  
       
    22.  
      EventFactory<HelloEvent> eventFactory = new HelloEventFactory();
    23.  
       
    24.  
      int ringBufferSize = 1024 * 1024;
    25.  
       
    26.  
      Disruptor<HelloEvent> disruptor = new Disruptor<HelloEvent>(eventFactory,
    27.  
      ringBufferSize, executor, ProducerType.SINGLE
    28.  
      , yieldingWaitStrategy);
    29.  
       
    30.  
      EventHandler<HelloEvent> eventHandler = new HelloEventHandler();
    31.  
       
    32.  
      disruptor.handleEventsWith(eventHandler);
    33.  
       
    34.  
      disruptor.start();
    35.  
       
    36.  
      RingBuffer<HelloEvent> ringBuffer = disruptor.getRingBuffer();
    37.  
       
    38.  
      HelloEventProducer producer = new HelloEventProducer(ringBuffer);
    39.  
       
    40.  
      for(long l = 0; l<100; l++){
    41.  
      producer.onData("黄育源:Hello World!!!:" + l);
    42.  
      }
    43.  
      }
    44.  
      }
    1.  
      package com.disruptor;
    2.  
       
    3.  
      public class HelloEvent {
    4.  
      private String value;
    5.  
       
    6.  
      public String getValue() {
    7.  
      return value;
    8.  
      }
    9.  
       
    10.  
      public void setValue(String value) {
    11.  
      this.value = value;
    12.  
      }
    13.  
      }
    1.  
      package com.disruptor;
    2.  
       
    3.  
      import com.lmax.disruptor.EventFactory;
    4.  
       
    5.  
      public class HelloEventFactory implements EventFactory<HelloEvent>{
    6.  
       
    7.  
      @Override
    8.  
      public HelloEvent newInstance() {
    9.  
      return new HelloEvent();
    10.  
      }
    11.  
       
    12.  
      }
    1.  
      package com.disruptor;
    2.  
       
    3.  
      import com.lmax.disruptor.EventHandler;
    4.  
       
    5.  
      public class HelloEventHandler implements EventHandler<HelloEvent>{
    6.  
       
    7.  
      @Override
    8.  
      public void onEvent(HelloEvent event, long sequence, boolean endOfBatch) throws Exception {
    9.  
      System.out.println(event.getValue());
    10.  
      }
    11.  
       
    12.  
      }
    1.  
      package com.disruptor;
    2.  
       
    3.  
      import com.lmax.disruptor.RingBuffer;
    4.  
       
    5.  
      public class HelloEventProducer implements Runnable{
    6.  
      private final RingBuffer<HelloEvent> ringBuffer;
    7.  
       
    8.  
      public HelloEventProducer(RingBuffer<HelloEvent> ringBuffer){
    9.  
      this.ringBuffer = ringBuffer;
    10.  
      }
    11.  
       
    12.  
      /**
    13.  
      * onData用来发布事件,每调用一次就发布一次事件
    14.  
      * 它的参数会用过事件传递给消费者
    15.  
      */
    16.  
      public void onData(String str){
    17.  
      long sequence = ringBuffer.next();
    18.  
      System.out.println(sequence);
    19.  
      try{
    20.  
      HelloEvent event = ringBuffer.get(sequence);
    21.  
       
    22.  
      event.setValue(str);
    23.  
      }finally{
    24.  
      ringBuffer.publish(sequence);
    25.  
      }
    26.  
      }
    27.  
       
    28.  
      @Override
    29.  
      public void run() {
    30.  
      for(long l = 0; l<100; l++){
    31.  
      this.onData("黄育源:Hello World!!!:" + l);
    32.  
      }
    33.  
      }
    34.  
      }
  • 相关阅读:
    poj1141
    poj1260
    poj1080
    poj1221
    用Microsoft Office SharePoint Designer 2007开发aspx
    在Web Part中使用User Control
    MOSS中的WebPart开发
    在MOSS中开发一个模块化的feature
    SharePoint Web Service的身份验证
    MOSS中对列表的一些操作(创建,查询等)
  • 原文地址:https://www.cnblogs.com/kekexuanxaun/p/9480302.html
Copyright © 2011-2022 走看看