zoukankan      html  css  js  c++  java
  • 图解Disruptor框架(二):核心概念

    图解Disruptor框架(二):核心概念

    概述

    上一个章节简单的介绍了了下Disruptor,这节就是要好好的理清楚Disruptor中的核心的概念。并且会给出个HelloWorld的小例子。
    在正式开始之前,我觉得有一点我感触非常的深刻,那就是:外国人取的类名真的真的非常的合适以及形象!看看接下来的内容就知道了!

    核心概念介绍

    下面这张图,非常好的总结了Disruptor中需要了解的核心概念:

    多生产者多消费者模型
    多生产者多消费者模型

    • RingBuffer: Disruptor中的数据结构,用于存储生产者生产的数据(在Disruptor中,叫做Event)。

    • Sequence:序号。在Disruptor框架中,任何地方都有序号。生产者生产的数据放在RingBuffer中的哪个位置,消费者应该消费哪个位置的数据,RingBuffer中的某个位置的数据是什么,这些都是由这个序号来决定的。这个序号可以简单的理解为一个AtomicLong类型的变量。其使用了padding的方法去消除缓存的伪共享问题。

    • Sequencer:序号生成器。这个类主要是用来协调生产者的。在生产者生产数据的时候,Sequencer会产生一个可用的序号(Sequence),然后生产者就乖乖的把数据放在那里了。(此处不严谨,后续会说明原因。)

    • SequencerBarrier:序号屏障。(我觉得这个名字真的太形象了!)我们都知道,消费者在消费数据的时候,需要知道消费哪个位置的数据。消费者总不能自己想取哪个数据消费,就取哪个数据消费吧。(这样多混乱啊!)这个SequencerBarrier起到的就是这样一个“栅栏”般的阻隔作用。你消费者想消费数据,得,我告诉你一个序号(Sequence),你去消费那个位置上的数据。要是没有数据,就好好等着吧(怎么等也是有讲究的)。

    先小结一下:Sequence、Sequencer、SequencerBarrier这三个概念一开始我是比较难以理解的。但是总结一下,无非就是Ringbuffer中哪里都需要用到序号,而Sequencer用于生产者生产的时候产生序号,SeqencerBarrier就是协调生产者与消费者,并且告诉消费者一个可以用于消费的序号!

    • Wait Strategy:等待策略。设想一种这样的情景:生产者生产的非常慢,而消费者消费的非常快。那么必然会出现数据不够的情况,这个时候消费者怎么进行等待呢?WaitStrategy就是为了解决问题而诞生的。

    • Event:数据、事件。这个Event就是我们希望RingBuffer存储的数据类型。这个是我们用户自己定义的。我们可以定义为任何事情,比如处理订单、消息等等。

    • EventHandler:事件处理器。当RingBuffer中有数据的时候,消费者怎么去对数据进行处理,就是由这个类去决定的。

    • Producer:生产者。用于生产数据。

    上述就是Disruptor框架中的核心概念。如果不是非常的理解,可以先跟着我下面的例子去手敲一遍入门的程序,去体验一下。然后看看源码加深理解。

    第一个入门小程序

    首先是订单类Order

    1.  
    2. @Data 
    3. public class Order
    4.  
    5. /** 
    6. * 订单ID 
    7. */ 
    8. private String id; 
    9.  
    10. /** 
    11. * 订单名字 
    12. */ 
    13. private String name; 
    14.  
    15. /** 
    16. * 用于记录这个对象创建的时间 
    17. */ 
    18. private Date createTime; 
    19.  

    订单工厂类OrderFactory

    1. public class OrderFactory implements EventFactory<Order>
    2. public Order newInstance()
    3. Order order = new Order(); 
    4. order.setCreateTime(new Date()); 
    5. return order; 

    这个类主要用于ringbuffer构造的时候对其中存放的数据进行预加载。
    相应的源码如下面所示:

    1.  
    2. /** 
    3. * 用于构造的时候需加载数据 
    4. * @param eventFactory 
    5. */ 
    6. private void fill(EventFactory<E> eventFactory) 
    7. for (int i = 0; i < bufferSize; i++) 
    8. // entries数组就是ringbuffer中用于存放数据的数组 
    9. entries[BUFFER_PAD + i] = eventFactory.newInstance(); 

    生产者Producer

    1.  
    2. public class Producer
    3.  
    4. private RingBuffer<Order> ringBuffer; 
    5.  
    6.  
    7. public Producer(RingBuffer<Order> ringBuffer)
    8. this.ringBuffer = ringBuffer; 
    9.  
    10. public void sendData(String s)
    11. long sequence = ringBuffer.next(); 
    12. Order order = ringBuffer.get(sequence); 
    13. order.setId(s); 
    14. ringBuffer.publish(sequence); 

    消费者Comsumer

    1.  
    2. @Data 
    3. public class Comsumer implements WorkHandler<Order>
    4.  
    5. /** 
    6. * 消费者ID 
    7. */ 
    8. private String comsumerId; 
    9.  
    10. /** 
    11. * 记录消费的次数 
    12. */ 
    13. public static final AtomicInteger count = new AtomicInteger(0); 
    14.  
    15.  
    16. public Comsumer(String comsumerId)
    17. this.comsumerId = comsumerId; 
    18.  
    19. @Override 
    20. public void onEvent(Order event) throws Exception
    21. System.out.println("消费者:"+comsumerId+"消费了数据,ID="+event.getId()+"Name="+event.getName()); 
    22. count.incrementAndGet(); 

    主函数Main

    1.  
    2. public class Main
    3. public static void main(String[] args) throws InterruptedException
    4.  
    5. // 构造ringbuffer 
    6. RingBuffer<Order> ringBuffer = RingBuffer.create( 
    7. ProducerType.MULTI, 
    8. new OrderFactory(), 
    9. 1024*1024
    10. new BlockingWaitStrategy() 
    11. ); 
    12.  
    13. // 创建一个序号屏障 
    14. SequenceBarrier barrier = ringBuffer.newBarrier(); 
    15.  
    16. // 创建消费者数组 
    17. Comsumer[] comsumers = new Comsumer[10]; 
    18. for (int i = 0; i < comsumers.length; i++) { 
    19. comsumers[i] = new Comsumer("comsumer"+i); 
    20.  
    21. // 创建多消费者的工作池 
    22. WorkerPool<Order> workerPool = new WorkerPool<Order>( 
    23. ringBuffer, 
    24. barrier, 
    25. new EventExceptionHandler(), 
    26. comsumers 
    27. ); 
    28.  
    29. // 设置多个消费者的sequence序号 用于单独统计消费进度, 并且设置到ringbuffer中 
    30. ringBuffer.addGatingSequences(workerPool.getWorkerSequences()); 
    31.  
    32. workerPool.start(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())); 
    33.  
    34. // -----消费者创建完毕----- 
    35.  
    36.  
    37. // -----创建生产者------ 
    38. // 用于阻塞生产者生产数据 
    39. CountDownLatch latch = new CountDownLatch(1); 
    40.  
    41. for(int i = 0; i < 100; i++) { 
    42. final Producer producer = new Producer(ringBuffer); 
    43. new Thread(new Runnable() { 
    44. public void run()
    45. try
    46. // 阻塞生产者 
    47. latch.await(); 
    48. } catch (Exception e) { 
    49. e.printStackTrace(); 
    50. for(int j = 0; j<100; j++) { 
    51. producer.sendData(UUID.randomUUID().toString()); 
    52. }).start(); 
    53.  
    54. System.err.println("----------线程创建完毕,开始生产数据----------"); 
    55. // 放行生产者,让其生产数据 
    56. latch.countDown(); 
    57.  
    58. // 等待消费者消费完数据 
    59. Thread.sleep(2000); 
    60.  
    61. System.err.println("消费次数:"+Comsumer.count); 
    62.  
    63.  
    64. static class EventExceptionHandler implements ExceptionHandler<Order>
    65. public void handleEventException(Throwable ex, long sequence, Order event)
    66.  
    67. public void handleOnStartException(Throwable ex)
    68.  
    69. public void handleOnShutdownException(Throwable ex)

    运行结果:
    运行结果

    上述的代码主要就是生产者生产了10000个数据,然后消费者再消费这些数据。可以看到结果是正确的。

    上述这个小小的demo就结束了。接下来会再看看Disruptor的操作,例如链式操作、菱形操作、多边形操作等等。

    总结

    这一个小节,阐述了Disruptor中的一些核心概念,并编写了一个helloworld程序。如果读者觉得还是比较难理解,那就多敲几遍。这个没有办法的。
    项目源码地址: https://gitee.com/cjh95/disruptor_blog/tree/master

    参考资料

    1. 官网的简单介绍 https://github.com/LMAX-Exchange/disruptor/wiki/Introduction
    2. 伪共享 https://www.cnblogs.com/cyfonly/p/5800758.html
  • 相关阅读:
    泛微云桥e-Bridge 目录遍历,任意文件读取
    (CVE-2020-8209)XenMobile-控制台存在任意文件读取漏洞
    selenium 使用初
    将HTML文件转换为MD文件
    Python对word文档进行操作
    使用java安装jar包出错,提示不是有效的JDK java主目录
    Windows server 2012安装VM tools异常解决办法
    ifconfig 命令,改变主机名,改DNS hosts、关闭selinux firewalld netfilter 、防火墙iptables规则
    iostat iotop 查看硬盘的读写、 free 查看内存的命令 、netstat 命令查看网络、tcpdump 命令
    使用w uptime vmstat top sar nload 等命令查看系统负载
  • 原文地址:https://www.cnblogs.com/junhong1995/p/10146399.html
Copyright © 2011-2022 走看看