zoukankan      html  css  js  c++  java
  • 从构建分布式秒杀系统聊聊Disruptor高性能队列

    前言

    秒杀架构持续优化中,基于自身认知不足之处在所难免,也请大家指正,共同进步。文章标题来自码友的建议,希望可以把阻塞队列ArrayBlockingQueue这个队列替换成Disruptor,由于之前曾接触过这个东西,听说很不错,正好借此机会整合进来。

    简介

    LMAX Disruptor是一个高性能的线程间消息库。它源于LMAX对并发性,性能和非阻塞算法的研究,如今构成了Exchange基础架构的核心部分。

    • Disruptor它是一个开源的并发框架,并获得2011 Duke’s 程序框架创新奖,能够在无锁的情况下实现网络的Queue并发操作。

    • Disruptor是一个高性能的异步处理框架,或者可以认为是最快的消息框架(轻量的JMS),也可以认为是一个观察者模式的实现,或者事件监听模式的实现。

    在这里你可以跟BlockingQueue队列作比对,简单的理解为它是一种高效的"生产者-消费者"模型,先了解后深入底层原理。

    核心

    写代码案例之前,大家最好先了解 Disruptor 的核心概念,至少知道它是如何运作的。

    • Ring Buffer
      如其名,环形的缓冲区。曾经 RingBuffer 是 Disruptor 中的最主要的对象,但从3.0版本开始,其职责被简化为仅仅负责对通过 Disruptor 进行交换的数据(事件)进行存储和更新。在一些更高级的应用场景中,Ring Buffer 可以由用户的自定义实现来完全替代。
    • Sequence Disruptor
      通过顺序递增的序号来编号管理通过其进行交换的数据(事件),对数据(事件)的处理过程总是沿着序号逐个递增处理。一个 Sequence 用于跟踪标识某个特定的事件处理者( RingBuffer/Consumer )的处理进度。虽然一个 AtomicLong 也可以用于标识进度,但定义 Sequence 来负责该问题还有另一个目的,那就是防止不同的 Sequence 之间的CPU缓存伪共享(Flase Sharing)问题。
    • Sequencer
      Sequencer 是 Disruptor 的真正核心。此接口有两个实现类 SingleProducerSequencer、MultiProducerSequencer ,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法。
    • Sequence Barrier
      用于保持对RingBuffer的 main published Sequence 和Consumer依赖的其它Consumer的 Sequence 的引用。 Sequence Barrier 还定义了决定 Consumer 是否还有可处理的事件的逻辑。
    • Wait Strategy
      定义 Consumer 如何进行等待下一个事件的策略。 (注:Disruptor 定义了多种不同的策略,针对不同的场景,提供了不一样的性能表现)
    • Event
      在 Disruptor 的语义中,生产者和消费者之间进行交换的数据被称为事件(Event)。它不是一个被 Disruptor 定义的特定类型,而是由 Disruptor 的使用者定义并指定。
    • EventProcessor
      EventProcessor 持有特定消费者(Consumer)的 Sequence,并提供用于调用事件处理实现的事件循环(Event Loop)。
    • EventHandler
      Disruptor 定义的事件处理接口,由用户实现,用于处理事件,是 Consumer 的真正实现。
    • Producer
      即生产者,只是泛指调用 Disruptor 发布事件的用户代码,Disruptor 没有定义特定接口或类型。

    优点

    • 剖析Disruptor:为什么会这么快?(一)锁的缺点
    • 剖析Disruptor:为什么会这么快?(二)神奇的缓存行填充
    • 剖析Disruptor:为什么会这么快?(三)伪共享
    • 剖析Disruptor:为什么会这么快?(四)揭秘内存屏障

    有兴趣的参考:
    https://coolshell.cn/articles/9169.html

    https://www.cnblogs.com/daoqidelv/p/6995888.html

    使用案例

    这里以我们系统中的秒杀作为案例,后面有相对复杂的场景介绍。

    定义秒杀事件对象:

    /**
     * 事件对象(秒杀事件)
     * 创建者 科帮网
     */
    public class SeckillEvent implements Serializable {
    	private static final long serialVersionUID = 1L;
    	private long seckillId;
    	private long userId;
    	
    	public SeckillEvent(){
    		
    	}
    
    	public long getSeckillId() {
    		return seckillId;
    	}
    
    	public void setSeckillId(long seckillId) {
    		this.seckillId = seckillId;
    	}
    
    	public long getUserId() {
    		return userId;
    	}
    
    	public void setUserId(long userId) {
    		this.userId = userId;
    	}
    }
    

    为了让Disruptor为我们预先分配这些事件,我们需要一个将执行构造的EventFactory:

    /**
     * 事件生成工厂(用来初始化预分配事件对象)
     * 创建者 科帮网
     */
    public class SeckillEventFactory implements EventFactory<SeckillEvent> {
    
    	public SeckillEvent newInstance() {
    		return new SeckillEvent();
    	}
    }
    

    然后,我们需要创建一个处理这些事件的消费者:

    /**
     * 消费者(秒杀处理器)
     * 创建者 科帮网
     */
    public class SeckillEventConsumer implements EventHandler<SeckillEvent> {
    	//业务处理、这里是无法注入的,需要手动获取,见源码
    	private ISeckillService seckillService = (ISeckillService) SpringUtil.getBean("seckillService");
    	
    	public void onEvent(SeckillEvent seckillEvent, long seq, boolean bool) throws Exception {
    		seckillService.startSeckil(seckillEvent.getSeckillId(), seckillEvent.getUserId());
    	}
    }
    

    既然有消费者,我们将需要这些秒杀事件的来源:

    /**
     * 使用translator方式生产者
     * 创建者 科帮网
     */
    public class SeckillEventProducer {
    	
    	private final static EventTranslatorVararg<SeckillEvent> translator = new EventTranslatorVararg<SeckillEvent>() {
    		public void translateTo(SeckillEvent seckillEvent, long seq, Object... objs) {
    			seckillEvent.setSeckillId((Long) objs[0]);
    			seckillEvent.setUserId((Long) objs[1]);
    		}
    	};
    
    	private final RingBuffer<SeckillEvent> ringBuffer;
    	
    	public SeckillEventProducer(RingBuffer<SeckillEvent> ringBuffer){
    		this.ringBuffer = ringBuffer;
    	}
    	
    	public void seckill(long seckillId, long userId){
    		this.ringBuffer.publishEvent(translator, seckillId, userId);
    	}
    }
    

    最后,我们来写一个测试类,运行一下(跑不通,需要修改消费者):

    /**
     * 測試類
     * 创建者 科帮网
     */
    public class SeckillEventMain {
    
    	public static void main(String[] args) {
    		producerWithTranslator();
    	}
    	public static void producerWithTranslator(){
    		SeckillEventFactory factory = new SeckillEventFactory();
    		int ringBufferSize = 1024;
    		ThreadFactory threadFactory = new ThreadFactory() {
    			public Thread newThread(Runnable runnable) {
    				return new Thread(runnable);
    			}
    		};
    		//创建disruptor
    		Disruptor<SeckillEvent> disruptor = new Disruptor<SeckillEvent>(factory, ringBufferSize, threadFactory);
    		//连接消费事件方法
    		disruptor.handleEventsWith(new SeckillEventConsumer());
    		//启动
    		disruptor.start();
    		RingBuffer<SeckillEvent> ringBuffer = disruptor.getRingBuffer();
    		SeckillEventProducer producer = new SeckillEventProducer(ringBuffer);
    		for(long i = 0; i<10; i++){
    			producer.seckill(i, i);
    		}
    		disruptor.shutdown();//关闭 disruptor,方法会堵塞,直至所有的事件都得到处理;
    	}
    }
    

    使用场景

    • PCP (生产者-消费者问题)
    • 网上搜了下国内实战案例并不多,大厂可能有在使用

    这里举一个大家日常的例子,停车场景。当汽车进入停车场时(A),系统首先会记录汽车信息(B)。同时也会发送消息到其他系统处理相关业务(C),最后发送短信通知车主收费开始(D)。

    一个生产者A与三个消费者B、C、D,D的事件处理需要B与C先完成。则该模型结构如下:

    在这个结构下,每个消费者拥有各自独立的事件序号Sequence,消费者之间不存在共享竞态。SequenceBarrier1监听RingBuffer的序号cursor,消费者B与C通过SequenceBarrier1等待可消费事件。SequenceBarrier2除了监听cursor,同时也监听B与C的序号Sequence,从而将最小的序号返回给消费者D,由此实现了D依赖B与C的逻辑。

    代码案例:从0到1构建分布式秒杀系统

    参考:
    https://github.com/LMAX-Exchange/disruptor/wiki

    https://github.com/LMAX-Exchange/disruptor/wiki/Getting-Started

    http://wiki.jikexueyuan.com/project/disruptor-getting-started/lmax-framework.html

  • 相关阅读:
    我的浏览器收藏夹分类
    我的浏览器收藏夹分类
    Java实现 LeetCode 318 最大单词长度乘积
    Java实现 LeetCode 318 最大单词长度乘积
    Java实现 LeetCode 318 最大单词长度乘积
    Java实现 LeetCode 316 去除重复字母
    Java实现 LeetCode 316 去除重复字母
    Java实现 LeetCode 316 去除重复字母
    Java实现 LeetCode 315 计算右侧小于当前元素的个数
    Java实现 LeetCode 315 计算右侧小于当前元素的个数
  • 原文地址:https://www.cnblogs.com/smallSevens/p/9081000.html
Copyright © 2011-2022 走看看