zoukankan      html  css  js  c++  java
  • spring与disruptor集成的简单示例[z]

    [z]https://www.jb51.net/article/135475.htm

    disruptor 不过多介绍了,先描述下当前的业务场景:有两个应用 A、B,应用 A 向应用 B 传递数据。数据传送比较快,如果用 http 直接 push 数据然后入库,效率不高,有可能给应用 A 造成比较大的压力。使用 mq 又太重量级,所以选择了 disruptor,也可以使用 Reactor。

    BaseQueueHelper.java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    /**
     * lmax.disruptor 高效队列处理模板. 支持初始队列,即在init()前进行发布。
     *
     * 调用init()时才真正启动线程开始处理 系统退出自动清理资源.
     *
     * @author xielongwang
     * @create 2018-01-18 下午3:49
     * @email xielong.wang@nvr-china.com
     * @description
     */
    public abstract class BaseQueueHelper<D, E extends ValueWrapper<D>, H extends WorkHandler<E>> {
     
      /**
       * 记录所有的队列,系统退出时统一清理资源
       */
      private static List<BaseQueueHelper> queueHelperList = new ArrayList<BaseQueueHelper>();
      /**
       * Disruptor 对象
       */
      private Disruptor<E> disruptor;
      /**
       * RingBuffer
       */
      private RingBuffer<E> ringBuffer;
      /**
       * initQueue
       */
      private List<D> initQueue = new ArrayList<D>();
     
      /**
       * 队列大小
       *
       * @return 队列长度,必须是2的幂
       */
      protected abstract int getQueueSize();
     
      /**
       * 事件工厂
       *
       * @return EventFactory
       */
      protected abstract EventFactory<E> eventFactory();
     
      /**
       * 事件消费者
       *
       * @return WorkHandler[]
       */
      protected abstract WorkHandler[] getHandler();
     
      /**
       * 初始化
       */
      public void init() {
        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("DisruptorThreadPool").build();
        disruptor = new Disruptor<E>(eventFactory(), getQueueSize(), namedThreadFactory, ProducerType.SINGLE, getStrategy());
        disruptor.setDefaultExceptionHandler(new MyHandlerException());
        disruptor.handleEventsWithWorkerPool(getHandler());
        ringBuffer = disruptor.start();
     
        //初始化数据发布
        for (D data : initQueue) {
          ringBuffer.publishEvent(new EventTranslatorOneArg<E, D>() {
            @Override
            public void translateTo(E event, long sequence, D data) {
              event.setValue(data);
            }
          }, data);
        }
     
        //加入资源清理钩子
        synchronized (queueHelperList) {
          if (queueHelperList.isEmpty()) {
            Runtime.getRuntime().addShutdownHook(new Thread() {
              @Override
              public void run() {
                for (BaseQueueHelper baseQueueHelper : queueHelperList) {
                  baseQueueHelper.shutdown();
                }
              }
            });
          }
          queueHelperList.add(this);
        }
      }
     
      /**
       * 如果要改变线程执行优先级,override此策略. YieldingWaitStrategy会提高响应并在闲时占用70%以上CPU,
       * 慎用SleepingWaitStrategy会降低响应更减少CPU占用,用于日志等场景.
       *
       * @return WaitStrategy
       */
      protected abstract WaitStrategy getStrategy();
     
      /**
       * 插入队列消息,支持在对象init前插入队列,则在队列建立时立即发布到队列处理.
       */
      public synchronized void publishEvent(D data) {
        if (ringBuffer == null) {
          initQueue.add(data);
          return;
        }
        ringBuffer.publishEvent(new EventTranslatorOneArg<E, D>() {
          @Override
          public void translateTo(E event, long sequence, D data) {
            event.setValue(data);
          }
        }, data);
      }
     
      /**
       * 关闭队列
       */
      public void shutdown() {
        disruptor.shutdown();
      }
    }

    EventFactory.java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    /**
     * @author xielongwang
     * @create 2018-01-18 下午6:24
     * @email xielong.wang@nvr-china.com
     * @description
     */
    public class EventFactory implements com.lmax.disruptor.EventFactory<SeriesDataEvent> {
     
      @Override
      public SeriesDataEvent newInstance() {
        return new SeriesDataEvent();
      }
    }

    MyHandlerException.java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    public class MyHandlerException implements ExceptionHandler {
     
      private Logger logger = LoggerFactory.getLogger(MyHandlerException.class);
     
      /*
       * (non-Javadoc) 运行过程中发生时的异常
       *
       * @see
       * com.lmax.disruptor.ExceptionHandler#handleEventException(java.lang.Throwable
       * , long, java.lang.Object)
       */
      @Override
      public void handleEventException(Throwable ex, long sequence, Object event) {
        ex.printStackTrace();
        logger.error("process data error sequence ==[{}] event==[{}] ,ex ==[{}]", sequence, event.toString(), ex.getMessage());
      }
     
      /*
       * (non-Javadoc) 启动时的异常
       *
       * @see
       * com.lmax.disruptor.ExceptionHandler#handleOnStartException(java.lang.
       * Throwable)
       */
      @Override
      public void handleOnStartException(Throwable ex) {
        logger.error("start disruptor error ==[{}]!", ex.getMessage());
      }
     
      /*
       * (non-Javadoc) 关闭时的异常
       *
       * @see
       * com.lmax.disruptor.ExceptionHandler#handleOnShutdownException(java.lang
       * .Throwable)
       */
      @Override
      public void handleOnShutdownException(Throwable ex) {
        logger.error("shutdown disruptor error ==[{}]!", ex.getMessage());
      }
    }

    SeriesData.java (代表应用A发送给应用B的消息)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    /**
     * Message payload sent from application A to application B: a single string of
     * device information.
     */
    public class SeriesData {
      /** Device information as received; may be {@code null}. */
      private String deviceInfoStr;

      /** Creates an empty payload with no device information. */
      public SeriesData() {
      }

      /**
       * Creates a payload carrying the given device information.
       *
       * @param deviceInfoStr the device information string
       */
      public SeriesData(String deviceInfoStr) {
        this.deviceInfoStr = deviceInfoStr;
      }

      /**
       * @return the device information string, or {@code null} if none was set
       */
      public String getDeviceInfoStr() {
        return deviceInfoStr;
      }

      /**
       * @param deviceInfoStr the device information string
       */
      public void setDeviceInfoStr(String deviceInfoStr) {
        this.deviceInfoStr = deviceInfoStr;
      }

      /**
       * @return a text form such as {@code SeriesData{deviceInfoStr='abc'}}
       */
      @Override
      public String toString() {
        // The closing quote must be the escaped char literal '\''.
        return "SeriesData{" +
            "deviceInfoStr='" + deviceInfoStr + '\'' +
            '}';
      }
    }

    SeriesDataEvent.java

    1
    2
    /**
     * Ring-buffer event carrying a {@link SeriesData} payload. Adds no members of its own;
     * the value accessors are inherited from {@link ValueWrapper}.
     */
    public class SeriesDataEvent extends ValueWrapper<SeriesData> {
    }

    SeriesDataEventHandler.java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    public class SeriesDataEventHandler implements WorkHandler<SeriesDataEvent> {
      private Logger logger = LoggerFactory.getLogger(SeriesDataEventHandler.class);
      @Autowired
      private DeviceInfoService deviceInfoService;
     
      @Override
      public void onEvent(SeriesDataEvent event) {
        if (event.getValue() == null || StringUtils.isEmpty(event.getValue().getDeviceInfoStr())) {
          logger.warn("receiver series data is empty!");
        }
        //业务处理
        deviceInfoService.processData(event.getValue().getDeviceInfoStr());
      }
    }

    SeriesDataEventQueueHelper.java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    @Component
    public class SeriesDataEventQueueHelper extends BaseQueueHelper<SeriesData, SeriesDataEvent, SeriesDataEventHandler> implements InitializingBean {
      private static final int QUEUE_SIZE = 1024;
      @Autowired
      private List<SeriesDataEventHandler> seriesDataEventHandler;
     
      @Override
      protected int getQueueSize() {
        return QUEUE_SIZE;
      }
     
      @Override
      protected com.lmax.disruptor.EventFactory eventFactory() {
        return new EventFactory();
      }
     
      @Override
      protected WorkHandler[] getHandler() {
        int size = seriesDataEventHandler.size();
        SeriesDataEventHandler[] paramEventHandlers = (SeriesDataEventHandler[]) seriesDataEventHandler.toArray(new SeriesDataEventHandler[size]);
        return paramEventHandlers;
      }
     
      @Override
      protected WaitStrategy getStrategy() {
        return new BlockingWaitStrategy();
        //return new YieldingWaitStrategy();
      }
     
      @Override
      public void afterPropertiesSet() throws Exception {
        this.init();
      }
    }

    ValueWrapper.java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    /**
     * Mutable holder for a single value; the base type for ring-buffer events.
     *
     * @param <T> type of the wrapped value
     */
    public abstract class ValueWrapper<T> {

      /** The wrapped value; {@code null} until one is supplied. */
      private T value;

      /** Creates a wrapper holding no value. */
      public ValueWrapper() {
        this(null);
      }

      /**
       * Creates a wrapper holding the given value.
       *
       * @param value the initial value
       */
      public ValueWrapper(final T value) {
        setValue(value);
      }

      /**
       * @return the wrapped value, possibly {@code null}
       */
      public final T peek() {
        return this.value;
      }

      /**
       * @return the wrapped value, possibly {@code null}
       */
      public T getValue() {
        return peek();
      }

      /**
       * Replaces the wrapped value.
       *
       * @param value the new value
       */
      public void setValue(final T value) {
        this.value = value;
      }
    }

    DisruptorConfig.java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    @Configuration
    @ComponentScan(value = {"com.portal.disruptor"})
    //多实例几个消费者
    public class DisruptorConfig {
     
      /**
       * smsParamEventHandler1
       *
       * @return SeriesDataEventHandler
       */
      @Bean
      public SeriesDataEventHandler smsParamEventHandler1() {
        return new SeriesDataEventHandler();
      }
     
      /**
       * smsParamEventHandler2
       *
       * @return SeriesDataEventHandler
       */
      @Bean
      public SeriesDataEventHandler smsParamEventHandler2() {
        return new SeriesDataEventHandler();
      }
     
      /**
       * smsParamEventHandler3
       *
       * @return SeriesDataEventHandler
       */
      @Bean
      public SeriesDataEventHandler smsParamEventHandler3() {
        return new SeriesDataEventHandler();
      }
     
     
      /**
       * smsParamEventHandler4
       *
       * @return SeriesDataEventHandler
       */
      @Bean
      public SeriesDataEventHandler smsParamEventHandler4() {
        return new SeriesDataEventHandler();
      }
     
      /**
       * smsParamEventHandler5
       *
       * @return SeriesDataEventHandler
       */
      @Bean
      public SeriesDataEventHandler smsParamEventHandler5() {
        return new SeriesDataEventHandler();
      }
    }

    测试

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    //注入SeriesDataEventQueueHelper消息生产者
    @Autowired
    private SeriesDataEventQueueHelper seriesDataEventQueueHelper;
     
    @RequestMapping(value = "/data", method = RequestMethod.POST, produces = MediaType.APPLICATION_JSON_VALUE)
    public DataResponseVo<String> receiverDeviceData(@RequestBody String deviceData) {
      long startTime1 = System.currentTimeMillis();
     
      if (StringUtils.isEmpty(deviceData)) {
        logger.info("receiver data is empty !");
        return new DataResponseVo<String>(400, "failed");
      }
      seriesDataEventQueueHelper.publishEvent(new SeriesData(deviceData));
      long startTime2 = System.currentTimeMillis();
      logger.info("receiver data ==[{}] millisecond ==[{}]", deviceData, startTime2 - startTime1);
      return new DataResponseVo<String>(200, "success");
    }

    应用 A 通过 /data 接口把数据发送到应用 B,应用 B 再通过 seriesDataEventQueueHelper 把消息发布到 disruptor 队列,由消费者去消费,整个过程都不会堵塞应用 A。该方案可接受消息丢失;可以通过扩展 SeriesDataEventQueueHelper 来实现对 disruptor 队列的监控。

  • 相关阅读:
    ●BZOJ 3894 文理分科
    ●BZOJ 1797 [Ahoi2009]Mincut 最小割
    ●BZOJ 1934 [Shoi2007]Vote 善意的投票
    ●BZOJ 3996 [TJOI2015]线性代数
    php--->self与static区别
    php--->使用callable强制指定回调类型
    php--->依赖注入(DI)实现控制反转(IOC)
    php--->cookie和session
    php--->注册模式
    linux---> siege压力测试工具使用
  • 原文地址:https://www.cnblogs.com/jjj250/p/10470787.html
Copyright © 2011-2022 走看看