zoukankan      html  css  js  c++  java
  • disruptor 高效队列

    disruptor 是什么: disruptor 是一个 低延时的 无锁 环形 队列.  相较于 java的 队列 ,他有明显的优点  ,无界,无锁,低延时(解决了为内存共享问题 ) disruptor 就一个 高效的生产者消费者队列.

    EventHandler ,WorkHandler 区别 ,  WorkHandler  适用于 多个消费者 中的一个消费这个 消息, EventHandler  适用于 每个消费者都要处理一下这个消息.

    package com.cxygg.test;
    
    import java.util.concurrent.ThreadFactory;
    
    import com.lmax.disruptor.BlockingWaitStrategy;
    import com.lmax.disruptor.EventFactory;
    import com.lmax.disruptor.EventHandler;
    import com.lmax.disruptor.EventTranslatorOneArg;
    import com.lmax.disruptor.ExceptionHandler;
    import com.lmax.disruptor.RingBuffer;
    import com.lmax.disruptor.WorkHandler;
    import com.lmax.disruptor.dsl.Disruptor;
    import com.lmax.disruptor.dsl.ProducerType;
    
    public class DisruptorTest {
        /**
         * 消息事件类
         */
        public static class MessageEvent{
            /**
             * 原始消息
             */
            private String message;
    
            public String getMessage() {
                return message;
            }
    
            public void setMessage(String message) {
                this.message = message;
            }
        }
    
        /**
         * 消息事件工厂类
         */
        public static class MessageEventFactory implements EventFactory<MessageEvent>{
            @Override
            public MessageEvent newInstance() {
                return new MessageEvent();
            }
        }
    
        /**
         * 消息转换类,负责将消息转换为事件
         */
        public static class MessageEventTranslator implements EventTranslatorOneArg<MessageEvent,String> {
            @Override
            public void translateTo(MessageEvent messageEvent, long l, String s) {
                messageEvent.setMessage(s);
            }
        }
    
        /**
         * 消费者线程工厂类
         */
        public static class MessageThreadFactory implements ThreadFactory{
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r,"Simple Disruptor Test Thread");
            }
        }
    
        /**
         * 消息事件处理类,这里只打印消息
         */
        public static class MessageEventHandler1 implements WorkHandler<MessageEvent>{
    
    		@Override
    		public void onEvent(MessageEvent event) throws Exception {
    			System.out.println("消费1:" + event.getMessage());
    		}
         
        	
        }
        /**
         * 消息事件处理类,这里只打印消息
         */
        public static class MessageEventHandler2 implements WorkHandler<MessageEvent>{
        	@Override
        	public void onEvent(MessageEvent messageEvent) throws Exception {
        		System.out.println("消费2:" + messageEvent.getMessage());
        	}
        }
        /**
         * 消息事件处理类,这里只打印消息
         */
        public static class MessageEventHandler3 implements WorkHandler<MessageEvent>{
        	@Override
        	public void onEvent(MessageEvent messageEvent) throws Exception {
        		System.out.println("消费3:" + messageEvent.getMessage());
        	}
        }
    
        /**
         * 异常处理类
         */
        public static class MessageExceptionHandler implements ExceptionHandler<MessageEvent>{
            @Override
            public void handleEventException(Throwable ex, long sequence, MessageEvent event) {
                ex.printStackTrace();
            }
    
            @Override
            public void handleOnStartException(Throwable ex) {
                ex.printStackTrace();
    
            }
    
            @Override
            public void handleOnShutdownException(Throwable ex) {
                ex.printStackTrace();
    
            }
        }
    
        /**
         * 消息生产者类
         */
        
        public static class MessageEventProducer{
            private RingBuffer<MessageEvent> ringBuffer;
    
            public MessageEventProducer(RingBuffer<MessageEvent> ringBuffer) {
                this.ringBuffer = ringBuffer;
            }
    
            /**
             * 将接收到的消息输出到ringBuffer
             * @param message
             */
            public void onData(String message){
                EventTranslatorOneArg<MessageEvent,String> translator = new MessageEventTranslator();
                ringBuffer.publishEvent(translator,message);
            }
        }
    
        public static void main(String[] args) {
            String message = "Hello Disruptor!";
            int ringBufferSize = 2;//必须是2的N次方
            Disruptor<MessageEvent> disruptor = new Disruptor<MessageEvent>(new MessageEventFactory(),ringBufferSize,new MessageThreadFactory(),ProducerType.SINGLE,new BlockingWaitStrategy());
            
            
            
            disruptor.handleEventsWithWorkerPool(new MessageEventHandler1(),new MessageEventHandler2()).thenHandleEventsWithWorkerPool( new MessageEventHandler3() );
            disruptor.handleEventsWithWorkerPool(new MessageEventHandler2(),new MessageEventHandler1()).thenHandleEventsWithWorkerPool( new MessageEventHandler3() );
            disruptor.setDefaultExceptionHandler(new MessageExceptionHandler());
            RingBuffer<MessageEvent> ringBuffer = disruptor.start();
            
            
            
            MessageEventProducer producer = new MessageEventProducer(ringBuffer);
            producer.onData( message +"");
            
            
            
        }
    }
    

      

     参考:https://blog.csdn.net/twypx/article/details/80398886

  • 相关阅读:
    什么是用户画像——从零开始搭建实时用户画像(一)
    一站式Kafka平台解决方案——KafkaCenter
    Druid 0.17入门(4)—— 数据查询方式大全
    流媒体与实时计算,Netflix公司Druid应用实践
    解读银行卡支付背后的原理
    求求你了,不要再自己实现这些逻辑了,开源工具类不香吗?
    编程坑太多,Map 集合怎么也有这么多坑?一不小心又踩了好几个!
    设计数据库 ER 图太麻烦?不妨试试这两款工具,自动生成数据库 ER 图!!!
    一口气带你踩完五个 List 的大坑,真的是处处坑啊!
    轻轻一扫,立刻扣款,付款码背后的原理你不想知道吗?|原创
  • 原文地址:https://www.cnblogs.com/cxygg/p/11835581.html
Copyright © 2011-2022 走看看