zoukankan      html  css  js  c++  java
  • disruptor 高效队列

    disruptor 是什么: disruptor 是一个 低延时的 无锁 环形 队列.  相较于 java的 队列 ,他有明显的优点  ,无界,无锁,低延时(解决了为内存共享问题 ) disruptor 就一个 高效的生产者消费者队列.

    EventHandler ,WorkHandler 区别 ,  WorkHandler  适用于 多个消费者 中的一个消费这个 消息, EventHandler  适用于 每个消费者都要处理一下这个消息.

    package com.cxygg.test;
    
    import java.util.concurrent.ThreadFactory;
    
    import com.lmax.disruptor.BlockingWaitStrategy;
    import com.lmax.disruptor.EventFactory;
    import com.lmax.disruptor.EventHandler;
    import com.lmax.disruptor.EventTranslatorOneArg;
    import com.lmax.disruptor.ExceptionHandler;
    import com.lmax.disruptor.RingBuffer;
    import com.lmax.disruptor.WorkHandler;
    import com.lmax.disruptor.dsl.Disruptor;
    import com.lmax.disruptor.dsl.ProducerType;
    
    public class DisruptorTest {
        /**
         * 消息事件类
         */
        public static class MessageEvent{
            /**
             * 原始消息
             */
            private String message;
    
            public String getMessage() {
                return message;
            }
    
            public void setMessage(String message) {
                this.message = message;
            }
        }
    
        /**
         * 消息事件工厂类
         */
        public static class MessageEventFactory implements EventFactory<MessageEvent>{
            @Override
            public MessageEvent newInstance() {
                return new MessageEvent();
            }
        }
    
        /**
         * 消息转换类,负责将消息转换为事件
         */
        public static class MessageEventTranslator implements EventTranslatorOneArg<MessageEvent,String> {
            @Override
            public void translateTo(MessageEvent messageEvent, long l, String s) {
                messageEvent.setMessage(s);
            }
        }
    
        /**
         * 消费者线程工厂类
         */
        public static class MessageThreadFactory implements ThreadFactory{
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r,"Simple Disruptor Test Thread");
            }
        }
    
        /**
         * 消息事件处理类,这里只打印消息
         */
        public static class MessageEventHandler1 implements WorkHandler<MessageEvent>{
    
    		@Override
    		public void onEvent(MessageEvent event) throws Exception {
    			System.out.println("消费1:" + event.getMessage());
    		}
         
        	
        }
        /**
         * 消息事件处理类,这里只打印消息
         */
        public static class MessageEventHandler2 implements WorkHandler<MessageEvent>{
        	@Override
        	public void onEvent(MessageEvent messageEvent) throws Exception {
        		System.out.println("消费2:" + messageEvent.getMessage());
        	}
        }
        /**
         * 消息事件处理类,这里只打印消息
         */
        public static class MessageEventHandler3 implements WorkHandler<MessageEvent>{
        	@Override
        	public void onEvent(MessageEvent messageEvent) throws Exception {
        		System.out.println("消费3:" + messageEvent.getMessage());
        	}
        }
    
        /**
         * 异常处理类
         */
        public static class MessageExceptionHandler implements ExceptionHandler<MessageEvent>{
            @Override
            public void handleEventException(Throwable ex, long sequence, MessageEvent event) {
                ex.printStackTrace();
            }
    
            @Override
            public void handleOnStartException(Throwable ex) {
                ex.printStackTrace();
    
            }
    
            @Override
            public void handleOnShutdownException(Throwable ex) {
                ex.printStackTrace();
    
            }
        }
    
        /**
         * 消息生产者类
         */
        
        public static class MessageEventProducer{
            private RingBuffer<MessageEvent> ringBuffer;
    
            public MessageEventProducer(RingBuffer<MessageEvent> ringBuffer) {
                this.ringBuffer = ringBuffer;
            }
    
            /**
             * 将接收到的消息输出到ringBuffer
             * @param message
             */
            public void onData(String message){
                EventTranslatorOneArg<MessageEvent,String> translator = new MessageEventTranslator();
                ringBuffer.publishEvent(translator,message);
            }
        }
    
        public static void main(String[] args) {
            String message = "Hello Disruptor!";
            int ringBufferSize = 2;//必须是2的N次方
            Disruptor<MessageEvent> disruptor = new Disruptor<MessageEvent>(new MessageEventFactory(),ringBufferSize,new MessageThreadFactory(),ProducerType.SINGLE,new BlockingWaitStrategy());
            
            
            
            disruptor.handleEventsWithWorkerPool(new MessageEventHandler1(),new MessageEventHandler2()).thenHandleEventsWithWorkerPool( new MessageEventHandler3() );
            disruptor.handleEventsWithWorkerPool(new MessageEventHandler2(),new MessageEventHandler1()).thenHandleEventsWithWorkerPool( new MessageEventHandler3() );
            disruptor.setDefaultExceptionHandler(new MessageExceptionHandler());
            RingBuffer<MessageEvent> ringBuffer = disruptor.start();
            
            
            
            MessageEventProducer producer = new MessageEventProducer(ringBuffer);
            producer.onData( message +"");
            
            
            
        }
    }
    

      

     参考:https://blog.csdn.net/twypx/article/details/80398886

  • 相关阅读:
    cs231n --- 3 : Convolutional Neural Networks (CNNs / ConvNets)
    cs231n --- 1:线性svm与softmax
    Python的下划线_
    Python的类(class)
    python self
    MFC中应用对象的成员:窗口指针m_pMainWnd说明
    MSTP+VRRP组合组网
    VRRP组网下同网段内配置基于全局地址池的DHCP服务器
    路由器/交换机Console口登录密码丢失后如何恢复
    交换机处于同一网络作为DHCP中继与服务器
  • 原文地址:https://www.cnblogs.com/cxygg/p/11835581.html
Copyright © 2011-2022 走看看