zoukankan      html  css  js  c++  java
  • 并发框架Disruptor场景应用

    今天用一个停车场问题来加深对Disruptor的理解。一个有关汽车进入停车场的问题。当汽车进入停车场时,系统首先会记录汽车信息。同时也会发送消息到其他系统处理相关业务,最后发送短信通知车主收费开始。看了很多文章,里面的代码都是大同小异的,可能代码真的是很经典。以下代码也是来源网络,只是自己手动敲的,加了一些注释。
     
    代码包含以下内容:
    1) 事件对象Event
    2)三个消费者Handler
    3)一个生产者Processer
    4)执行Main方法
    Event类:汽车信息
    1.  
      public class MyInParkingDataEvent {
    2.  
       
    3.  
      private String carLicense; // 车牌号
    4.  
       
    5.  
      public String getCarLicense() {
    6.  
      return carLicense;
    7.  
      }
    8.  
       
    9.  
      public void setCarLicense(String carLicense) {
    10.  
      this.carLicense = carLicense;
    11.  
      }
    12.  
       
    13.  
      }
    Handler类:一个负责存储汽车数据,一个负责发送kafka信息到其他系统中,最后一个负责给车主发短信通知
    1.  
      import com.lmax.disruptor.EventHandler;
    2.  
      import com.lmax.disruptor.WorkHandler;
    3.  
       
    4.  
      /**
    5.  
      * Handler 第一个消费者,负责保存进场汽车的信息
    6.  
      *
    7.  
      */
    8.  
      public class MyParkingDataInDbHandler implements EventHandler<MyInParkingDataEvent> , WorkHandler<MyInParkingDataEvent>{
    9.  
       
    10.  
      @Override
    11.  
      public void onEvent(MyInParkingDataEvent myInParkingDataEvent) throws Exception {
    12.  
      long threadId = Thread.currentThread().getId(); // 获取当前线程id
    13.  
      String carLicense = myInParkingDataEvent.getCarLicense(); // 获取车牌号
    14.  
      System.out.println(String.format("Thread Id %s 保存 %s 到数据库中 ....", threadId, carLicense));
    15.  
      }
    16.  
       
    17.  
      @Override
    18.  
      public void onEvent(MyInParkingDataEvent myInParkingDataEvent, long sequence, boolean endOfBatch)
    19.  
      throws Exception {
    20.  
      this.onEvent(myInParkingDataEvent);
    21.  
      }
    22.  
       
    23.  
      }
    1.  
      import com.lmax.disruptor.EventHandler;
    2.  
       
    3.  
      /**
    4.  
      * 第二个消费者,负责发送通知告知工作人员(Kafka是一种高吞吐量的分布式发布订阅消息系统)
    5.  
      */
    6.  
      public class MyParkingDataToKafkaHandler implements EventHandler<MyInParkingDataEvent>{
    7.  
       
    8.  
      @Override
    9.  
      public void onEvent(MyInParkingDataEvent myInParkingDataEvent, long sequence, boolean endOfBatch)
    10.  
      throws Exception {
    11.  
      long threadId = Thread.currentThread().getId(); // 获取当前线程id
    12.  
      String carLicense = myInParkingDataEvent.getCarLicense(); // 获取车牌号
    13.  
      System.out.println(String.format("Thread Id %s 发送 %s 进入停车场信息给 kafka系统...", threadId, carLicense));
    14.  
      }
    15.  
       
    16.  
      }
    1.  
      import com.lmax.disruptor.EventHandler;
    2.  
       
    3.  
      /**
    4.  
      * 第三个消费者,sms短信服务,告知司机你已经进入停车场,计费开始。
    5.  
      */
    6.  
      public class MyParkingDataSmsHandler implements EventHandler<MyInParkingDataEvent>{
    7.  
       
    8.  
      @Override
    9.  
      public void onEvent(MyInParkingDataEvent myInParkingDataEvent, long sequence, boolean endOfBatch)
    10.  
      throws Exception {
    11.  
      long threadId = Thread.currentThread().getId(); // 获取当前线程id
    12.  
      String carLicense = myInParkingDataEvent.getCarLicense(); // 获取车牌号
    13.  
      System.out.println(String.format("Thread Id %s 给 %s 的车主发送一条短信,并告知他计费开始了 ....", threadId, carLicense));
    14.  
      }
    15.  
       
    16.  
      }
    Producer类:负责上报停车数据
    1.  
      import java.util.concurrent.CountDownLatch;
    2.  
      import com.lmax.disruptor.EventTranslator;
    3.  
      import com.lmax.disruptor.dsl.Disruptor;
    4.  
       
    5.  
      /**
    6.  
      * 生产者,进入停车场的车辆
    7.  
      */
    8.  
      public class MyInParkingDataEventPublisher implements Runnable{
    9.  
       
    10.  
      private CountDownLatch countDownLatch; // 用于监听初始化操作,等初始化执行完毕后,通知主线程继续工作
    11.  
      private Disruptor<MyInParkingDataEvent> disruptor;
    12.  
      private static final Integer NUM = 1; // 1,10,100,1000
    13.  
       
    14.  
      public MyInParkingDataEventPublisher(CountDownLatch countDownLatch,
    15.  
      Disruptor<MyInParkingDataEvent> disruptor) {
    16.  
      this.countDownLatch = countDownLatch;
    17.  
      this.disruptor = disruptor;
    18.  
      }
    19.  
       
    20.  
      @Override
    21.  
      public void run() {
    22.  
      MyInParkingDataEventTranslator eventTranslator = new MyInParkingDataEventTranslator();
    23.  
      try {
    24.  
      for(int i = 0; i < NUM; i ++) {
    25.  
      disruptor.publishEvent(eventTranslator);
    26.  
      Thread.sleep(1000); // 假设一秒钟进一辆车
    27.  
      }
    28.  
      } catch (InterruptedException e) {
    29.  
      e.printStackTrace();
    30.  
      } finally {
    31.  
      countDownLatch.countDown(); // 执行完毕后通知 await()方法
    32.  
      System.out.println(NUM + "辆车已经全部进入进入停车场!");
    33.  
      }
    34.  
      }
    35.  
       
    36.  
      }
    37.  
       
    38.  
      class MyInParkingDataEventTranslator implements EventTranslator<MyInParkingDataEvent> {
    39.  
       
    40.  
      @Override
    41.  
      public void translateTo(MyInParkingDataEvent myInParkingDataEvent, long sequence) {
    42.  
      this.generateData(myInParkingDataEvent);
    43.  
      }
    44.  
       
    45.  
      private MyInParkingDataEvent generateData(MyInParkingDataEvent myInParkingDataEvent) {
    46.  
      myInParkingDataEvent.setCarLicense("车牌号: 鄂A-" + (int)(Math.random() * 100000)); // 随机生成一个车牌号
    47.  
      System.out.println("Thread Id " + Thread.currentThread().getId() + " 写完一个event");
    48.  
      return myInParkingDataEvent;
    49.  
      }
    50.  
       
    51.  
      }
    执行的Main方法:
    1.  
      import com.lmax.disruptor.EventFactory;
    2.  
      import com.lmax.disruptor.YieldingWaitStrategy;
    3.  
      import com.lmax.disruptor.dsl.Disruptor;
    4.  
      import com.lmax.disruptor.dsl.EventHandlerGroup;
    5.  
      import com.lmax.disruptor.dsl.ProducerType;
    6.  
       
    7.  
      /**
    8.  
      * 执行的Main方法 ,
    9.  
      * 一个生产者(汽车进入停车场);
    10.  
      * 三个消费者(一个记录汽车信息,一个发送消息给系统,一个发送消息告知司机)
    11.  
      * 前两个消费者同步执行,都有结果了再执行第三个消费者
    12.  
      */
    13.  
      public class MyInParkingDataEventMain {
    14.  
       
    15.  
      public static void main(String[] args) {
    16.  
      long beginTime=System.currentTimeMillis();
    17.  
      int bufferSize = 2048; // 2的N次方
    18.  
      try {
    19.  
      // 创建线程池,负责处理Disruptor的四个消费者
    20.  
      ExecutorService executor = Executors.newFixedThreadPool(4);
    21.  
       
    22.  
      // 初始化一个 Disruptor
    23.  
      Disruptor<MyInParkingDataEvent> disruptor = new Disruptor<MyInParkingDataEvent>(new EventFactory<MyInParkingDataEvent>() {
    24.  
      @Override
    25.  
      public MyInParkingDataEvent newInstance() {
    26.  
      return new MyInParkingDataEvent(); // Event 初始化工厂
    27.  
      }
    28.  
      }, bufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy());
    29.  
       
    30.  
      // 使用disruptor创建消费者组 MyParkingDataInDbHandler 和 MyParkingDataToKafkaHandler
    31.  
      EventHandlerGroup<MyInParkingDataEvent> handlerGroup = disruptor.handleEventsWith(
    32.  
      new MyParkingDataInDbHandler(), new MyParkingDataToKafkaHandler());
    33.  
       
    34.  
      // 当上面两个消费者处理结束后在消耗 smsHandler
    35.  
      MyParkingDataSmsHandler myParkingDataSmsHandler = new MyParkingDataSmsHandler();
    36.  
      handlerGroup.then(myParkingDataSmsHandler);
    37.  
       
    38.  
      // 启动Disruptor
    39.  
      disruptor.start();
    40.  
       
    41.  
      CountDownLatch countDownLatch = new CountDownLatch(1); // 一个生产者线程准备好了就可以通知主线程继续工作了
    42.  
      // 生产者生成数据
    43.  
      executor.submit(new MyInParkingDataEventPublisher(countDownLatch, disruptor));
    44.  
      countDownLatch.await(); // 等待生产者结束
    45.  
       
    46.  
      disruptor.shutdown();
    47.  
      executor.shutdown();
    48.  
      } catch (Exception e) {
    49.  
      e.printStackTrace();
    50.  
      }
    51.  
       
    52.  
      System.out.println("总耗时:"+(System.currentTimeMillis()-beginTime));
    53.  
      }
    54.  
       
    55.  
      }

    --------------------- 本文来自 ITDragon龙 的CSDN 博客 ,全文地址请点击:https://blog.csdn.net/qq_19558705/article/details/77247912?utm_source=copy 

  • 相关阅读:
    CKEditor 4 上传视频
    CKEditor 4 上传图片
    azure跨域问题(访问azure存储账户数据,blob)
    azure 上传blob到ams(CreateFromBlob)
    js 时间格式化成字符串
    js正则表达式替换HTML标签以及空格(&nbsp;)
    js 获取上传视频的时长、大小、后缀名
    webapi 跨域问题
    requests(二): json请求中固定键名顺序&消除键和值之间的空格
    requests(一): 发送一个json格式的post请求
  • 原文地址:https://www.cnblogs.com/felixzh/p/9711357.html
Copyright © 2011-2022 走看看