zoukankan      html  css  js  c++  java
  • 图解Disruptor框架(二):核心概念

    图解Disruptor框架(二):核心概念

    概述

    上一个章节简单的介绍了了下Disruptor,这节就是要好好的理清楚Disruptor中的核心的概念。并且会给出个HelloWorld的小例子。
    在正式开始之前,我觉得有一点我感触非常的深刻,那就是:外国人取的类名真的真的非常的合适以及形象!看看接下来的内容就知道了!

    核心概念介绍

    下面这张图,非常好的总结了Disruptor中需要了解的核心概念:

    多生产者多消费者模型
    多生产者多消费者模型

    • RingBuffer: Disruptor中的数据结构,用于存储生产者生产的数据(在Disruptor中,叫做Event)。

    • Sequence:序号。在Disruptor框架中,任何地方都有序号。生产者生产的数据放在RingBuffer中的哪个位置,消费者应该消费哪个位置的数据,RingBuffer中的某个位置的数据是什么,这些都是由这个序号来决定的。这个序号可以简单的理解为一个AtomicLong类型的变量。其使用了padding的方法去消除缓存的伪共享问题。

    • Sequencer:序号生成器。这个类主要是用来协调生产者的。在生产者生产数据的时候,Sequencer会产生一个可用的序号(Sequence),然后生产者就乖乖的把数据放在那里了。(此处不严谨,后续会说明原因。)

    • SequencerBarrier:序号屏障。(我觉得这个名字真的太形象了!)我们都知道,消费者在消费数据的时候,需要知道消费哪个位置的数据。消费者总不能自己想取哪个数据消费,就取哪个数据消费吧。(这样多混乱啊!)这个SequencerBarrier起到的就是这样一个“栅栏”般的阻隔作用。你消费者想消费数据,得,我告诉你一个序号(Sequence),你去消费那个位置上的数据。要是没有数据,就好好等着吧(怎么等也是有讲究的)。

    先小结一下:Sequence、Sequencer、SequencerBarrier这三个概念一开始我是比较难以理解的。但是总结一下,无非就是Ringbuffer中哪里都需要用到序号,而Sequencer用于生产者生产的时候产生序号,SeqencerBarrier就是协调生产者与消费者,并且告诉消费者一个可以用于消费的序号!

    • Wait Strategy:等待策略。设想一种这样的情景:生产者生产的非常慢,而消费者消费的非常快。那么必然会出现数据不够的情况,这个时候消费者怎么进行等待呢?WaitStrategy就是为了解决问题而诞生的。

    • Event:数据、事件。这个Event就是我们希望RingBuffer存储的数据类型。这个是我们用户自己定义的。我们可以定义为任何事情,比如处理订单、消息等等。

    • EventHandler:事件处理器。当RingBuffer中有数据的时候,消费者怎么去对数据进行处理,就是由这个类去决定的。

    • Producer:生产者。用于生产数据。

    上述就是Disruptor框架中的核心概念。如果不是非常的理解,可以先跟着我下面的例子去手敲一遍入门的程序,去体验一下。然后看看源码加深理解。

    第一个入门小程序

    首先是订单类Order

    1.  
    2. @Data 
    3. public class Order
    4.  
    5. /** 
    6. * 订单ID 
    7. */ 
    8. private String id; 
    9.  
    10. /** 
    11. * 订单名字 
    12. */ 
    13. private String name; 
    14.  
    15. /** 
    16. * 用于记录这个对象创建的时间 
    17. */ 
    18. private Date createTime; 
    19.  

    订单工厂类OrderFactory

    1. public class OrderFactory implements EventFactory<Order>
    2. public Order newInstance()
    3. Order order = new Order(); 
    4. order.setCreateTime(new Date()); 
    5. return order; 

    这个类主要用于ringbuffer构造的时候对其中存放的数据进行预加载。
    相应的源码如下面所示:

    1.  
    2. /** 
    3. * 用于构造的时候需加载数据 
    4. * @param eventFactory 
    5. */ 
    6. private void fill(EventFactory<E> eventFactory) 
    7. for (int i = 0; i < bufferSize; i++) 
    8. // entries数组就是ringbuffer中用于存放数据的数组 
    9. entries[BUFFER_PAD + i] = eventFactory.newInstance(); 

    生产者Producer

    1.  
    2. public class Producer
    3.  
    4. private RingBuffer<Order> ringBuffer; 
    5.  
    6.  
    7. public Producer(RingBuffer<Order> ringBuffer)
    8. this.ringBuffer = ringBuffer; 
    9.  
    10. public void sendData(String s)
    11. long sequence = ringBuffer.next(); 
    12. Order order = ringBuffer.get(sequence); 
    13. order.setId(s); 
    14. ringBuffer.publish(sequence); 

    消费者Comsumer

    1.  
    2. @Data 
    3. public class Comsumer implements WorkHandler<Order>
    4.  
    5. /** 
    6. * 消费者ID 
    7. */ 
    8. private String comsumerId; 
    9.  
    10. /** 
    11. * 记录消费的次数 
    12. */ 
    13. public static final AtomicInteger count = new AtomicInteger(0); 
    14.  
    15.  
    16. public Comsumer(String comsumerId)
    17. this.comsumerId = comsumerId; 
    18.  
    19. @Override 
    20. public void onEvent(Order event) throws Exception
    21. System.out.println("消费者:"+comsumerId+"消费了数据,ID="+event.getId()+"Name="+event.getName()); 
    22. count.incrementAndGet(); 

    主函数Main

    1.  
    2. public class Main
    3. public static void main(String[] args) throws InterruptedException
    4.  
    5. // 构造ringbuffer 
    6. RingBuffer<Order> ringBuffer = RingBuffer.create( 
    7. ProducerType.MULTI, 
    8. new OrderFactory(), 
    9. 1024*1024
    10. new BlockingWaitStrategy() 
    11. ); 
    12.  
    13. // 创建一个序号屏障 
    14. SequenceBarrier barrier = ringBuffer.newBarrier(); 
    15.  
    16. // 创建消费者数组 
    17. Comsumer[] comsumers = new Comsumer[10]; 
    18. for (int i = 0; i < comsumers.length; i++) { 
    19. comsumers[i] = new Comsumer("comsumer"+i); 
    20.  
    21. // 创建多消费者的工作池 
    22. WorkerPool<Order> workerPool = new WorkerPool<Order>( 
    23. ringBuffer, 
    24. barrier, 
    25. new EventExceptionHandler(), 
    26. comsumers 
    27. ); 
    28.  
    29. // 设置多个消费者的sequence序号 用于单独统计消费进度, 并且设置到ringbuffer中 
    30. ringBuffer.addGatingSequences(workerPool.getWorkerSequences()); 
    31.  
    32. workerPool.start(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())); 
    33.  
    34. // -----消费者创建完毕----- 
    35.  
    36.  
    37. // -----创建生产者------ 
    38. // 用于阻塞生产者生产数据 
    39. CountDownLatch latch = new CountDownLatch(1); 
    40.  
    41. for(int i = 0; i < 100; i++) { 
    42. final Producer producer = new Producer(ringBuffer); 
    43. new Thread(new Runnable() { 
    44. public void run()
    45. try
    46. // 阻塞生产者 
    47. latch.await(); 
    48. } catch (Exception e) { 
    49. e.printStackTrace(); 
    50. for(int j = 0; j<100; j++) { 
    51. producer.sendData(UUID.randomUUID().toString()); 
    52. }).start(); 
    53.  
    54. System.err.println("----------线程创建完毕,开始生产数据----------"); 
    55. // 放行生产者,让其生产数据 
    56. latch.countDown(); 
    57.  
    58. // 等待消费者消费完数据 
    59. Thread.sleep(2000); 
    60.  
    61. System.err.println("消费次数:"+Comsumer.count); 
    62.  
    63.  
    64. static class EventExceptionHandler implements ExceptionHandler<Order>
    65. public void handleEventException(Throwable ex, long sequence, Order event)
    66.  
    67. public void handleOnStartException(Throwable ex)
    68.  
    69. public void handleOnShutdownException(Throwable ex)

    运行结果:
    运行结果

    上述的代码主要就是生产者生产了10000个数据,然后消费者再消费这些数据。可以看到结果是正确的。

    上述这个小小的demo就结束了。接下来会再看看Disruptor的操作,例如链式操作、菱形操作、多边形操作等等。

    总结

    这一个小节,阐述了Disruptor中的一些核心概念,并编写了一个helloworld程序。如果读者觉得还是比较难理解,那就多敲几遍。这个没有办法的。
    项目源码地址: https://gitee.com/cjh95/disruptor_blog/tree/master

    参考资料

    1. 官网的简单介绍 https://github.com/LMAX-Exchange/disruptor/wiki/Introduction
    2. 伪共享 https://www.cnblogs.com/cyfonly/p/5800758.html
  • 相关阅读:
    delphi xe10 FMX 启动参数
    delphi xe6 JSON 测试
    oracle实现http请求,oracle发送http请求。
    ORACLE存储过程调用Web Service
    新搭建的iis服务器,运行网站报 System.BadImageFormatException:未能加载文件或程序集”....“或它的某一个依赖项。
    c#的http请求工具类核心代码
    vue-cli3 取消关闭eslint 校验代码
    quartz.net数据库持久化教程
    sql备份一张表的数据
    iis 长期无访问导致定时任务不执行的解决方案
  • 原文地址:https://www.cnblogs.com/junhong1995/p/10146399.html
Copyright © 2011-2022 走看看