zoukankan      html  css  js  c++  java
  • Disruptor并发框架(一)简介&上手demo

    框架简介

    • Martin Fowler在自己网站上写了一篇LMAX架构的文章,在文章中他介绍了LMAX是一种新型零售金融交易平台,它能够以很低的延迟产生大量交易。这个系统是建立在JVM平台上,其核心是一个业务逻辑处理器,它能够在一个线程里每秒处理6百万订单。业务逻辑处理器完全是运行在内存中,使`用事件源驱动方式。业务逻辑处理器的核心是Disruptor。
    • Disruptor它是一个开源的并发框架,并获得2011 Duke’s 程序框架创新奖,能够在无锁的情况下实现网络的Queue并发操作。
    • Disruptor是一个高性能的异步处理框架,或者可以认为是最快的消息框架(轻量的JMS),也可以认为是一个观察者模式的实现,或者事件监听模式的实现。

    在使用之前,首先说明disruptor主要功能加以说明,你可以理解为他是一种高效的"生产者-消费者"模型。也就性能远远高于传统的BlockingQueue容器。

    上手demo

    • 首先声明一个Event来包含需要传递的数据:
    public class LongEvent { 
        private long value;
        public long getValue() { 
            return value; 
        } 
     
        public void setValue(long value) { 
            this.value = value; 
        } 
    } 
    
    
    • 于需要让Disruptor为我们创建事件,我们同时还声明了一个EventFactory来实例化Event对象。
    // 需要让disruptor为我们创建事件,我们同时还声明了一个EventFactory来实例化Event对象。
    public class LongEventFactory implements EventFactory { 
    
        @Override 
        public Object newInstance() { 
            return new LongEvent(); 
        } 
    } 
    
    
    • 我们还需要一个事件消费者,也就是一个事件处理器。这个事件处理器简单地把事件中存储的数据打印到终端:
    public class LongEventHandler implements EventHandler<LongEvent>  {
    
    	@Override
    	public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception {
    		System.out.println(longEvent.getValue()); 		
    	}
    
    }
    
    
    • 事件都会有一个生成事件的源,这个例子中假设事件是由于磁盘IO或者network读取数据的时候触发的,事件源使用一个ByteBuffer来模拟它接受到的数据,也就是说,事件源会在IO读取到一部分数据的时候触发事件(触发事件不是自动的,程序员需要在读取到数据的时候自己触发事件并发布)
    
    public class LongEventProducer {
    
    	private final RingBuffer<LongEvent> ringBuffer;
    	
    	public LongEventProducer(RingBuffer<LongEvent> ringBuffer){
    		this.ringBuffer = ringBuffer;
    	}
    	
    	/**
    	 * onData用来发布事件,每调用一次就发布一次事件
    	 * 它的参数会用过事件传递给消费者
    	 */
    	public void onData(ByteBuffer bb){
    		//1.可以把ringBuffer看做一个事件队列,那么next就是得到下面一个事件槽
            long sequence = ringBuffer.next();
    		try {
    			//2.用上面的索引取出一个空的事件用于填充(获取该序号对应的事件对象)
    			LongEvent event = ringBuffer.get(sequence);
    			//3.获取要通过事件传递的业务数据
    			event.setValue(bb.getLong(0));
    		} finally {
    			//4.发布事件
    			//注意,最后的 ringBuffer.publish 方法必须包含在 finally 中以确保必须得到调用;
                // 如果某个请求的 sequence 未被提交,将会堵塞后续的发布操作或者其它的 producer。
    			ringBuffer.publish(sequence);
    		}
    	}
    	
    }
    
    • main函数执行调用
    public class LongEventMain {
    
    	public static void main(String[] args) throws Exception {
    		//创建缓冲池
    		ExecutorService  executor = Executors.newCachedThreadPool();
    		//创建工厂
    		LongEventFactory factory = new LongEventFactory();
    		//创建bufferSize ,也就是RingBuffer大小,必须是2的N次方
    		int ringBufferSize = 1024 * 1024; // 
    
    		/**
    		//BlockingWaitStrategy 是最低效的策略,但其对CPU的消耗最小并且在各种不同部署环境中能提供更加一致的性能表现
    		WaitStrategy BLOCKING_WAIT = new BlockingWaitStrategy();
    		//SleepingWaitStrategy 的性能表现跟BlockingWaitStrategy差不多,对CPU的消耗也类似,但其对生产者线程的影响最小,适合用于异步日志类似的场景
    		WaitStrategy SLEEPING_WAIT = new SleepingWaitStrategy();
    		//YieldingWaitStrategy 的性能是最好的,适合用于低延迟的系统。在要求极高性能且事件处理线数小于CPU逻辑核心数的场景中,推荐使用此策略;例如,CPU开启超线程的特性
    		WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();
    		*/
    
    
            /**
             *  参数说明:
             */
    		
    		//创建disruptor
    		Disruptor<LongEvent> disruptor = 
                        new Disruptor<LongEvent>(factory, ringBufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy());
    		// 连接消费事件方法
    		disruptor.handleEventsWith(new LongEventHandler());
    		
    		// 启动
    		disruptor.start();
    		
    		//Disruptor 的事件发布过程是一个两阶段提交的过程:
    		//发布事件
    		RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
    		
    		LongEventProducer producer = new LongEventProducer(ringBuffer); 
    		//LongEventProducerWithTranslator producer = new LongEventProducerWithTranslator(ringBuffer);
    		ByteBuffer byteBuffer = ByteBuffer.allocate(8);
    		for(long l = 0; l<100; l++){
    			byteBuffer.putLong(0, l);
    			producer.onData(byteBuffer);
    			//Thread.sleep(1000);
    		}
    
    		
    		disruptor.shutdown();//关闭 disruptor,方法会堵塞,直至所有的事件都得到处理;
    		executor.shutdown();//关闭 disruptor 使用的线程池;如果需要的话,必须手动关闭, disruptor 在 shutdown 时不会自动关闭;		
    	 
    	}
    }
    

    参考资料:http://ifeve.com/disruptor-getting-started/

  • 相关阅读:
    jython resources
    Installing a Library of Jython ScriptsPart of the WebSphere Application Server v7.x Administration Series Series
    jython好资料
    ulipad install on 64bit win7 has issue
    an oracle article in high level to descibe how to archtichre operator JAVA relevet project
    table的宽度,单元格内换行问题
    Linux常用命令大全
    dedecms系统后台登陆提示用户名密码不存在
    登录织梦后台提示用户名不存在的解决方法介绍
    Shell常用命令整理
  • 原文地址:https://www.cnblogs.com/jiangjun-x/p/8111315.html
Copyright © 2011-2022 走看看