zoukankan      html  css  js  c++  java
  • ReactorTest

    <?xml version="1.0" encoding="UTF-8"?>
    <!-- Maven build descriptor for the Reactor demo classes: declares the
         Reactor, JUnit, Lombok and SLF4J/Logback dependencies and compiles
         the sources at Java 1.8 level. -->
    <project xmlns="http://maven.apache.org/POM/4.0.0"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <!-- Project coordinates. -->
      <groupId>com</groupId>
      <artifactId>test</artifactId>
      <version>1.0-SNAPSHOT</version>
    
      <dependencies>
      <!-- Reactor core library (Flux / Mono). -->
      <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId>
        <version>3.2.8.RELEASE</version>
      </dependency>
        <!-- Reactor test support; test scope only, same version as reactor-core. -->
        <dependency>
          <groupId>io.projectreactor</groupId>
          <artifactId>reactor-test</artifactId>
          <version>3.2.8.RELEASE</version>
          <scope>test</scope>
        </dependency>
        <!-- JUnit 4; test scope only. -->
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>4.12</version>
          <scope>test</scope>
        </dependency>
    
        <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
        <!-- Declared with the default (compile) scope. -->
        <dependency>
          <groupId>org.projectlombok</groupId>
          <artifactId>lombok</artifactId>
          <version>1.18.0</version>
        </dependency>
    
        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
        <dependency>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-api</artifactId>
          <version>1.7.25</version>
        </dependency>
    
        <!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-classic -->
        <dependency>
          <groupId>ch.qos.logback</groupId>
          <artifactId>logback-classic</artifactId>
          <version>1.2.3</version>
        </dependency>
      </dependencies>
    
      <build>
        <plugins>
          <!-- Compile with Java 1.8 source and target levels. -->
          <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
              <source>1.8</source>
              <target>1.8</target>
            </configuration>
          </plugin>
        </plugins>
      </build>
    
    </project>
    import lombok.extern.slf4j.Slf4j;
    import org.reactivestreams.Subscriber;
    import org.reactivestreams.Subscription;
    import reactor.core.publisher.Flux;
    
    @Slf4j
    public class ReactorTest {
    
    
      public static void main(String[] args) {
       /* Mono.fromCallable(System::currentTimeMillis)
            .doOnSuccess(r -> log.info("1"))
            .subscribe(r -> log.info("2"));
        Flux.interval(Duration.of(500, ChronoUnit.MILLIS)).subscribe(r -> log.info(""+r));
        CompletableFuture.supplyAsync(System::currentTimeMillis).join();*/
    
        //防止程序过早退出,放一个CountDownLatch拦住
       /* CountDownLatch latch = new CountDownLatch(1);
        try {
          latch.await();
        } catch (InterruptedException e) {
          e.printStackTrace();
        }*/
        Flux.just(1, 2, 3, 4, 5).subscribe(new Subscriber<Integer>() { // 1
    
          @Override
          public void onSubscribe(Subscription s) {
            log.info("onSubscribe");
            s.request(6);   // 2
          }
    
          @Override
          public void onNext(Integer integer) {
            log.info("onNext:" + integer);
          }
    
          @Override
          public void onError(Throwable t) {
    
          }
    
          @Override
          public void onComplete() {
            log.info("onComplete");
          }
        });
      }
    }
    import java.util.Random;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.FutureTask;
    import lombok.extern.slf4j.Slf4j;
    import org.reactivestreams.Subscriber;
    import org.reactivestreams.Subscription;
    import reactor.core.publisher.Flux;
    
    /**
     * Demo that computes a value on a background thread through a
     * {@link FutureTask}, feeds the result into a single-element {@link Flux},
     * and reads the value back from the subscriber.
     */
    @Slf4j
    public class ReactorTest {

      /**
       * Runs the demo.
       *
       * @param args unused
       * @throws ExecutionException if the background task threw
       * @throws InterruptedException if the wait for the task result is interrupted
       */
      public static void main(String[] args) throws ExecutionException, InterruptedException {

        // Wrap the task in a FutureTask so its result can be fetched later.
        FutureTask<Integer> pending = new FutureTask<>(new MyTask());
        // FutureTask implements Runnable, so a plain Thread can run it
        // (it could equally be handed to a thread pool).
        new Thread(pending).start();

        log.info("begin");
        MySubscriber subscriber = new MySubscriber();
        // get() blocks until MyTask has produced its value.
        Integer computed = pending.get();
        Flux.just(computed).subscribe(subscriber);

        log.info("result: "+ subscriber.getI());
      }
    }
    
    /**
     * Task that sleeps for two seconds and then yields a random integer
     * in the range 0 (inclusive) to 5 (exclusive).
     */
    @Slf4j
    class MyTask  implements Callable<Integer>{

      /**
       * Performs the task's work.
       *
       * @return the random value that was picked
       * @throws Exception if the sleep is interrupted
       */
      @Override
      public Integer call() throws Exception {
        Thread.sleep(2000);
        final int picked = new Random().nextInt(5);
        log.info("set i ="+picked);
        return picked;
      }
    }
    
    @Slf4j
    class MySubscriber  implements Subscriber<Integer>{
    
      private Integer i;
    
      @Override
      public void onSubscribe(Subscription s) {
        log.info("onSubscribe");
        s.request(2);
      }
    
      @Override
      public void onNext(Integer integer) {
        log.info("onNext "+integer);
        i = integer;
        log.info("onNext i="+i);
      }
    
      @Override
      public void onError(Throwable throwable) {
        log.info("onError");
      }
    
      @Override
      public void onComplete() {
        log.info("onComplete");
      }
    
      public Integer getI() {
        return i;
      }
    }
    import java.time.Duration;
    import java.time.temporal.ChronoUnit;
    import java.util.concurrent.CountDownLatch;
    import lombok.extern.slf4j.Slf4j;
    import org.reactivestreams.Subscriber;
    import org.reactivestreams.Subscription;
    import reactor.core.publisher.Flux;
    
    /**
     * Demo that subscribes to a {@link Flux#interval} ticking every 500 ms and
     * logs each signal, keeping the main thread parked on a latch so the
     * program stays alive.
     */
    @Slf4j
    public class ReactorTestOld {

      /**
       * Runs the demo.
       *
       * @param args unused
       */
      public static void main(String[] args) {
       // Flux.interval(Duration.of(500, ChronoUnit.MILLIS)).subscribe(r -> log.info(""+r));
        Flux.interval(Duration.of(500, ChronoUnit.MILLIS)).subscribe(new Subscriber<Long>() { // 1

          @Override
          public void onSubscribe(Subscription s) {
            log.info("onSubscribe");
           //s.request(3);   // 2
            s.request(Long.MAX_VALUE);
          }

          @Override
          public void onNext(Long integer) {
            log.info("onNext:" + integer);
          }

          @Override
          public void onError(Throwable e) {
            log.error("onError:" ,e);
          }

          @Override
          public void onComplete() {
            log.info("onComplete");
          }
        });

        // Nothing ever counts this latch down; it only keeps main() from returning.
        CountDownLatch latch = new CountDownLatch(1);
        try {
          latch.await();
        } catch (InterruptedException e) {
          // Restore the interrupt flag instead of swallowing the interruption,
          // and report it through the logger rather than printStackTrace().
          Thread.currentThread().interrupt();
          log.warn("main thread interrupted while waiting", e);
        }
      }
    }
    <!-- Maven build descriptor for the "test" jar: Reactor core, Commons Lang 3
         and SLF4J/Logback logging, compiled at the Java level set in
         the java.version property. -->
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>com</groupId>
      <artifactId>test</artifactId>
      <version>0.0.1-SNAPSHOT</version>
      <packaging>jar</packaging>
    
      <name>test</name>
      <url>http://maven.apache.org</url>
    
      <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>utf-8</project.build.sourceEncoding>
      </properties>
    
        
        
      <dependencies>
         <!-- <dependency>  
                <groupId>org.springframework.boot</groupId>  
                <artifactId>spring-boot-starter-web</artifactId>  
                <exclusions>remove the default logging configuration  
                    <exclusion>  
                        <groupId>org.springframework.boot</groupId>  
                        <artifactId>spring-boot-starter-logging</artifactId>  
                    </exclusion>  
                    </exclusions> 
         </dependency>  
           <dependency> pull in the log4j2 dependency  
                <groupId>org.springframework.boot</groupId>  
                <artifactId>spring-boot-starter-log4j2</artifactId>  
            </dependency>
        <dependency>
        <groupId>org.springframework.retry</groupId>
        <artifactId>spring-retry</artifactId>
    </dependency> -->
    
        <!--<dependency>
          <groupId>io.reactivex.rxjava2</groupId>
          <artifactId>rxjava</artifactId>
          <version>2.1.7</version>
        </dependency>-->
    
        <dependency>
          <groupId>io.projectreactor</groupId>
          <artifactId>reactor-core</artifactId>
          <version>3.2.8.RELEASE</version>
        </dependency>
    
        <!-- Commons Lang 3 is published under the org.apache.commons groupId;
             the old "commons-lang" groupId only hosts the 2.x artifact
             "commons-lang", so commons-lang:commons-lang3 cannot be resolved. -->
        <dependency>
          <groupId>org.apache.commons</groupId>
          <artifactId>commons-lang3</artifactId>
          <version>3.1</version>
        </dependency>
    
        <!-- Logging dependencies start here (SLF4J API + Logback). -->
        <dependency>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-api</artifactId>
          <version>1.7.25</version>
        </dependency>
        <dependency>
          <groupId>ch.qos.logback</groupId>
          <artifactId>logback-core</artifactId>
          <version>1.2.3</version>
        </dependency>
        <dependency>
          <groupId>ch.qos.logback</groupId>
          <artifactId>logback-classic</artifactId>
          <version>1.2.3</version>
        </dependency>
      </dependencies>
      
      <build>
        <plugins>
          <!-- Compile with the Java level defined by the java.version property. -->
          <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.6.1</version>
            <configuration>
              <source>${java.version}</source>
              <target>${java.version}</target>
            </configuration>
          </plugin>
        </plugins>
      </build>
      
    </project>
    package com.test.reactor;
    
    import lombok.extern.slf4j.Slf4j;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;
    import reactor.core.scheduler.Schedulers;
    
    /**
     * Scratch class for experimenting with Reactor operators and schedulers.
     * Earlier experiments are kept as commented-out code.
     */
    @Slf4j
    public class MonoTest {

      /**
       * Emits 1..4, logging each element, and on completion subscribes to a
       * second {@code Mono} on the elastic scheduler that logs the value 6.
       *
       * @param args unused
       */
      public static void main(String[] args) {
        /*Mono.fromCallable(System::currentTimeMillis)
            .flatMap(time -> Mono.first(serviceA.findRecent(time), serviceB.findRecent(time)))
            .timeout(Duration.ofSeconds(3), errorHandler::fallback)
            .doOnSuccess(r -> serviceM.incrementSuccess())
            .subscribe(System.out::println);*/
       /* Flux.range(1,4)

            .filter(e -> {
          log.info("filter thread:[{}]",Thread.currentThread().getName());
          return e % 2 == 0;
        })
            .subscribeOn(Schedulers.newSingle("newSingle1"))
            .publishOn(Schedulers.newSingle("newSingle2"))
            .subscribe(e-> {
              log.info("log thread:[{}]",Thread.currentThread().getName());
              System.out.println(e);});*/
        Flux.range(1,4)

            //.delayUntil(MonoTest::request)
            /*.subscribeOn(Schedulers.elastic())
            .filter(e -> {
              log.info("filter thread:[{}]",Thread.currentThread().getName());
              return e % 2 == 0;
            })*/
            // .publishOn(Schedulers.newSingle("newSingle2"))
            // NOTE(review): the inner subscription runs asynchronously on the
            // elastic scheduler while main() returns right away; whether "6" is
            // logged before the JVM exits presumably depends on the scheduler's
            // thread type - confirm (see the parked LockSupport call below).
            .doOnComplete(()->Mono.just(6).subscribeOn(Schedulers.elastic()).subscribe(e-> log.info(""+e)))
            .subscribe(e->  log.info(""+e));


        //LockSupport.park();
      }

      /**
       * Simulates a slow blocking call: sleeps five seconds on a dedicated
       * "newSingle" scheduler thread, logs that thread's name, then completes.
       *
       * @param i the element that triggered the request; used only in the
       *          interruption log message
       * @return a {@code Mono} that completes empty once the simulated call is done
       */
      public static Mono<Void> request(int i) {
        // run in a separate thread (RabbitTemplate uses blocking calls)
        return Mono.fromRunnable(() -> {
          try {
            Thread.sleep(5000);
          } catch (InterruptedException e) {
            // Restore the interrupt flag so the worker thread can still observe
            // the interruption, and log through SLF4J rather than
            // printStackTrace().
            Thread.currentThread().interrupt();
            log.warn("request({}) interrupted while sleeping", i, e);
          }
          log.info("log thread:[{}]",Thread.currentThread().getName());
          })
            .subscribeOn(Schedulers.newSingle("newSingle"))
            .then();
      }
    }
  • 相关阅读:
    【Distributed】缓存技术
    【Redis】基本数据类型
    【Redis】安装、开启以及关闭
    【Ehcache】基础知识学习
    VS2012 改C# 模版
    C# Windows Services 启动和结束其它进程
    .net Console.ReadLine无效
    VS2012在解决方案资源管理器显示解决方案名称
    Mysql 数据库中9大对象
    C# 开发 Windows 服务 使用Log4net 组件 不能生成日志文件
  • 原文地址:https://www.cnblogs.com/tonggc1668/p/13039755.html
Copyright © 2011-2022 走看看