zoukankan      html  css  js  c++  java
  • Observable Flowable Test

    package com.test.rxjava;
    
    import java.time.Duration;
    import java.time.Instant;
    import java.util.LinkedList;
    import java.util.concurrent.CountDownLatch;
    
    import org.apache.commons.lang3.time.DurationFormatUtils;
    
    import io.reactivex.Flowable;
    import io.reactivex.Observable;
    import io.reactivex.ObservableOnSubscribe;
    import io.reactivex.schedulers.Schedulers;
    
    /**
     * Demo: an unbounded {@link Observable} producer feeding a very slow consumer.
     *
     * <p>The producer emits 1, 2, 3, ... for as long as the downstream has not disposed; the
     * subscription is started via {@code Schedulers.newThread()} and items are delivered via
     * {@code Schedulers.io()}, where each item is held for 100 seconds before being printed.
     *
     * <p>NOTE(review): the producer is far faster than the consumer, so items presumably pile up
     * between the two threads - confirm against the RxJava backpressure documentation before
     * running this for long.
     */
    public class ObservableTest {

      /**
       * Starts the demo and blocks until the stream terminates.
       *
       * <p>The emitter never calls {@code onComplete}, so the latch is only released if the stream
       * fails; otherwise the program keeps running until it is killed.
       *
       * @param args ignored
       */
      public static void main(String[] args) {
        ObservableTest test = new ObservableTest();
        CountDownLatch latch = new CountDownLatch(1);
        test.run(latch);
        try {
          latch.await();
        } catch (InterruptedException e) {
          // Restore the flag so code further up the stack can still see the interruption.
          Thread.currentThread().interrupt();
          e.printStackTrace();
        }
      }

      /**
       * Builds and subscribes to the producer/consumer pipeline.
       *
       * @param latch released once the stream terminates, either with an error or by completing
       */
      private void run(CountDownLatch latch) {
        Instant start = Instant.now();
        Observable
            .create((ObservableOnSubscribe<Integer>) emitter -> {
              // emitter: the handle used to push items downstream
              int i = 0;
              // Stop producing once the subscriber has disposed instead of spinning forever.
              while (!emitter.isDisposed()) {
                i++;
                System.out.println(i);
                emitter.onNext(i);
              }
            })
            .observeOn(Schedulers.io())
            .subscribeOn(Schedulers.newThread())
            .subscribe(this::next, throwable -> {
              throwable.printStackTrace();
              latch.countDown();
            }, () -> {
              System.out.println(DurationFormatUtils.formatDurationWords(
                  Duration.between(start, Instant.now()).toMillis(), true, true));
              latch.countDown();
            });
      }

      /**
       * Slow consumer: waits 100 seconds, then prints the current thread name and the item.
       *
       * @param i the item received from the stream
       */
      private void next(Integer i) {
        try {
          Thread.sleep(100000);
        } catch (InterruptedException e) {
          // Keep the interrupt status visible to the scheduler's worker thread.
          Thread.currentThread().interrupt();
          e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName()+":"+i);
      }
    }
    package com.test.rxjava;
    
    import java.time.Duration;
    import java.time.Instant;
    import java.util.Iterator;
    import java.util.concurrent.CountDownLatch;
    
    import org.apache.commons.lang3.time.DurationFormatUtils;
    
    import io.reactivex.Flowable;
    import io.reactivex.Observable;
    import io.reactivex.schedulers.Schedulers;
    
    /**
     * Demo: a {@link Flowable} built from a slow {@link Iterable} producer.
     *
     * <p>The instance itself is the data source: iterating it yields {@code begin + 1} through
     * {@code end}, pausing 100 ms before each value and logging the producing thread. The
     * consumer logs the thread on which each value is received.
     */
    public class ObservableTest implements Iterable<Integer> {

      /** Value the iteration starts after (exclusive). */
      private final int begin;
      /** Last value the iteration yields (inclusive). */
      private final int end;

      private ObservableTest(int begin, int end) {
        this.begin = begin;
        this.end = end;
      }

      /**
       * Runs the pipeline over 1..1_000_000 and blocks until it completes or fails.
       *
       * @param args ignored
       */
      public static void main(String[] args) {
        ObservableTest test = new ObservableTest(0, 1_000_000);
        CountDownLatch latch = new CountDownLatch(1);
        test.run(test, latch);
        try {
          latch.await();
        } catch (InterruptedException e) {
          // Restore the flag so code further up the stack can still see the interruption.
          Thread.currentThread().interrupt();
          e.printStackTrace();
        }
      }

      /**
       * Subscribes to the values of {@code test}; prints the elapsed time on completion.
       *
       * @param test source of the values to stream
       * @param latch released when the stream completes or fails
       */
      private void run(ObservableTest test, CountDownLatch latch) {
        Instant start = Instant.now();
        Flowable
            .fromIterable(test)
            .observeOn(Schedulers.io())
            .subscribeOn(Schedulers.newThread())
            .subscribe(this::handleNext, e -> {
              e.printStackTrace();
              latch.countDown();
            }, () -> {
              System.out.println(DurationFormatUtils
                  .formatDurationWords(Duration.between(start, Instant.now()).toMillis(), true, true));
              latch.countDown();
            });
      }

      /**
       * Logs the consuming thread and the received value.
       *
       * @param i the value received from the stream
       */
      private void handleNext(Integer i) {
        System.out.println("consumer: "+Thread.currentThread().getName() + ":" + i);
      }

      /** Returns a fresh iterator over {@code begin + 1 .. end}. */
      @Override
      public Iterator<Integer> iterator() {
        return new Itr(begin, end);
      }

      /**
       * Slow counting iterator. Static because it only needs the two bounds, not the enclosing
       * instance.
       */
      private static final class Itr implements Iterator<Integer> {
        /** Most recently returned value; starts at the exclusive lower bound. */
        private int current;
        private final int end;

        private Itr(int begin, int end) {
          this.current = begin;
          this.end = end;
        }

        @Override
        public boolean hasNext() {
          return current < end;
        }

        /**
         * Advances by one, sleeps 100 ms, logs the producing thread and returns the new value.
         *
         * @throws java.util.NoSuchElementException if the iteration is already exhausted
         */
        @Override
        public Integer next() {
          if (!hasNext()) {
            throw new java.util.NoSuchElementException("iteration ended at " + end);
          }
          current++;
          try {
            Thread.sleep(100);
          } catch (InterruptedException e) {
            // Keep the interrupt status visible to whoever owns this thread.
            Thread.currentThread().interrupt();
            e.printStackTrace();
          }
          System.out.println("producer: "+Thread.currentThread().getName() + ":" + current);
          return current;
        }
      }
    }
    package com.test.rxjava;
    
    import java.time.Duration;
    import java.time.Instant;
    import java.util.Iterator;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    
    import org.apache.commons.lang3.time.DurationFormatUtils;
    
    import io.reactivex.Observable;
    import io.reactivex.schedulers.Schedulers;
    
    /**
     * Timing run for an {@link Observable} pipeline: stream 1..100_000_000 from an
     * {@link Iterable}, keep the even values, group them into lists of up to 1000 and print the
     * last element of each list. The total elapsed time is printed when the stream completes.
     */
    public class ObservableTest implements Iterable<Integer> {

      /** Value the iteration starts after (exclusive). */
      private final int begin;
      /** Last value the iteration yields (inclusive). */
      private final int end;

      private ObservableTest(int begin, int end) {
        this.begin = begin;
        this.end = end;
      }

      /**
       * Runs the pipeline and blocks until it completes or fails.
       *
       * @param args ignored
       */
      public static void main(String[] args) {
        ObservableTest test = new ObservableTest(0, 100_000_000);
        CountDownLatch latch = new CountDownLatch(1);
        test.run(test, latch);
        try {
          latch.await();
        } catch (InterruptedException e) {
          // Restore the flag so code further up the stack can still see the interruption.
          Thread.currentThread().interrupt();
          e.printStackTrace();
        }
      }

      /**
       * Subscribes to the filtered, buffered values of {@code test}.
       *
       * @param test source of the values to stream
       * @param latch released when the stream completes or fails
       */
      private void run(ObservableTest test, CountDownLatch latch) {
        Instant start = Instant.now();
        Observable
            .fromIterable(test)
            .observeOn(Schedulers.io())
            .subscribeOn(Schedulers.newThread())
            .filter(value -> value % 2 == 0)
            .buffer(1000)
            .subscribe(this::handleNext, e -> {
              e.printStackTrace();
              latch.countDown();
            }, () -> {
              System.out.println("Observable:" + DurationFormatUtils
                  .formatDurationWords(Duration.between(start, Instant.now()).toMillis(), true, true));
              latch.countDown();
            });
      }

      /**
       * Logs the consuming thread and the last value of the received batch.
       *
       * @param i one buffered batch of values
       */
      private void handleNext(List<Integer> i) {
        System.out.println("consumer: "+Thread.currentThread().getName() + ":" + i.get(i.size()-1));
      }

      /** Returns a fresh iterator over {@code begin + 1 .. end}. */
      @Override
      public Iterator<Integer> iterator() {
        return new Itr(begin, end);
      }

      /**
       * Counting iterator. Static because it only needs the two bounds, not the enclosing
       * instance.
       */
      private static final class Itr implements Iterator<Integer> {
        /** Most recently returned value; starts at the exclusive lower bound. */
        private int current;
        private final int end;

        private Itr(int begin, int end) {
          this.current = begin;
          this.end = end;
        }

        @Override
        public boolean hasNext() {
          return current < end;
        }

        /**
         * Advances by one and returns the new value.
         *
         * @throws java.util.NoSuchElementException if the iteration is already exhausted
         */
        @Override
        public Integer next() {
          if (!hasNext()) {
            throw new java.util.NoSuchElementException("iteration ended at " + end);
          }
          current++;
          return current;
        }
      }
    }
    package com.test.rxjava;
    
    import java.time.Duration;
    import java.time.Instant;
    import java.util.Iterator;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    
    import org.apache.commons.lang3.time.DurationFormatUtils;
    
    import io.reactivex.Flowable;
    import io.reactivex.schedulers.Schedulers;
    
    /**
     * Timing run for a {@link Flowable} pipeline: stream 1..100_000_000 from an
     * {@link Iterable}, keep the even values, group them into lists of up to 1000 and print the
     * last element of each list. The total elapsed time is printed when the stream completes.
     */
    public class FlowableTest implements Iterable<Integer> {

      /** Value the iteration starts after (exclusive). */
      private final int begin;
      /** Last value the iteration yields (inclusive). */
      private final int end;

      private FlowableTest(int begin, int end) {
        this.begin = begin;
        this.end = end;
      }

      /**
       * Runs the pipeline and blocks until it completes or fails.
       *
       * @param args ignored
       */
      public static void main(String[] args) {
        FlowableTest test = new FlowableTest(0, 100_000_000);
        CountDownLatch latch = new CountDownLatch(1);
        test.run(test, latch);
        try {
          latch.await();
        } catch (InterruptedException e) {
          // Restore the flag so code further up the stack can still see the interruption.
          Thread.currentThread().interrupt();
          e.printStackTrace();
        }
      }

      /**
       * Subscribes to the filtered, buffered values of {@code test}.
       *
       * @param test source of the values to stream
       * @param latch released when the stream completes or fails
       */
      private void run(FlowableTest test, CountDownLatch latch) {
        Instant start = Instant.now();
        Flowable
            .fromIterable(test)
            .observeOn(Schedulers.io())
            .subscribeOn(Schedulers.newThread())
            .filter(value -> value % 2 == 0)
            .buffer(1000)
            .subscribe(this::handleNext, e -> {
              e.printStackTrace();
              latch.countDown();
            }, () -> {
              System.out.println("Flowable:" + DurationFormatUtils
                  .formatDurationWords(Duration.between(start, Instant.now()).toMillis(), true, true));
              latch.countDown();
            });
      }

      /**
       * Logs the consuming thread and the last value of the received batch.
       *
       * @param i one buffered batch of values
       */
      private void handleNext(List<Integer> i) {
        System.out.println("consumer: "+Thread.currentThread().getName() + ":" + i.get(i.size()-1));
      }

      /** Returns a fresh iterator over {@code begin + 1 .. end}. */
      @Override
      public Iterator<Integer> iterator() {
        return new Itr(begin, end);
      }

      /**
       * Counting iterator. Static because it only needs the two bounds, not the enclosing
       * instance.
       */
      private static final class Itr implements Iterator<Integer> {
        /** Most recently returned value; starts at the exclusive lower bound. */
        private int current;
        private final int end;

        private Itr(int begin, int end) {
          this.current = begin;
          this.end = end;
        }

        @Override
        public boolean hasNext() {
          return current < end;
        }

        /**
         * Advances by one and returns the new value.
         *
         * @throws java.util.NoSuchElementException if the iteration is already exhausted
         */
        @Override
        public Integer next() {
          if (!hasNext()) {
            throw new java.util.NoSuchElementException("iteration ended at " + end);
          }
          current++;
          return current;
        }
      }
    }

     去掉

    .subscribeOn(Schedulers.newThread())
  • 相关阅读:
    制作计算器的代码(C#)
    Oracle如何实现创建数据库、备份数据库及数据导出导入操作
    XmlDocument类
    Android强大的开源库与系统架构工具
    IO
    胎压监测设备
    福施福、爱乐维、玛特纳各成分比较(已换算成同一单位)
    用车不容忽视的细节
    汽车必备车饰和常用物品
    j2ee指导型框架或示例
  • 原文地址:https://www.cnblogs.com/tonggc1668/p/10113641.html
Copyright © 2011-2022 走看看