zoukankan      html  css  js  c++  java
  • Flux转Mono next()

    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.NoSuchElementException;
    import java.util.function.Function;
    import lombok.extern.slf4j.Slf4j;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;
    
    @Slf4j
    public class TestFindResult {
      private static final Map<String, String> templates;
      private static final int sleep = 1000;
    
      static {
        templates = new LinkedHashMap<>();
        templates.put("aDB", "a");
        templates.put("bDB", "b");
        templates.put("cDB", "c");
      }
    
      public Mono<String> findResult(Function<String, Mono<String>> query) {
        return Flux.fromIterable(templates.values())
            .flatMap(query)
            .next()
            .onErrorResume(NoSuchElementException.class, e -> Mono.empty())
            .onErrorMap(IndexOutOfBoundsException.class, MultipleUpstreamException::new);
      }
    
      public static void main(String[] args) {
        TestFindResult test = new TestFindResult();
        Function<String, Mono<String>> query = (value) -> {
          try {
            Thread.sleep(sleep); // mock DB query
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
          log.info(
              "Thread id:{}, Thread name:{}, value:{}, used ms:{}",
              Thread.currentThread().getId(),
              Thread.currentThread().getName(),
              value,
              sleep);
          return Mono.just(value);
        };
        System.out.println(test.findResult(query).subscribe());
      }
    }
    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.Map.Entry;
    import java.util.NoSuchElementException;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.commons.lang3.time.StopWatch;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;
    
    @Slf4j
    public class TestFindMongo {
      private static final Map<String, String> templates;
      private static final int sleep = 1000;
    
      static {
        templates = new LinkedHashMap<>();
        templates.put("aDB", "a");
        templates.put("bDB", "b");
        templates.put("cDB", "c");
      }
    
      public Mono<String> findMongo() {
        StopWatch stopWatch = StopWatch.createStarted();
        return Flux.fromIterable(templates.entrySet())
            .filterWhen(
                template -> {
                  String key = template.getKey();
                  String value = template.getValue();
                  try {
                    Thread.sleep(sleep); // mock DB query
                  } catch (InterruptedException e) {
                    e.printStackTrace();
                  }
                  log.info(
                      "Thread id:{}, Thread name:{}, query:{}, value:{} , used ms:{}",
                      Thread.currentThread().getId(),
                      Thread.currentThread().getName(),
                      key,
                      value,
                      sleep);
                  return Mono.just(value.equals("b"));
                })
            .next()
            .doOnSuccess(templateEntry -> log.info("Match {} ", templateEntry.getKey()))
            .map(Entry::getValue)
            .onErrorResume(NoSuchElementException.class, e -> Mono.empty())
            .onErrorMap(IndexOutOfBoundsException.class, MultipleUpstreamException::new)
            .doOnTerminate(() -> log.info("Database recon took {} ms", stopWatch.getTime()));
      }
    
      public static void main(String[] args) {
        TestFindMongo test = new TestFindMongo();
        System.out.println(test.findMongo().subscribe());
      }
    }
    import static org.springframework.http.HttpStatus.*;
    
    import org.springframework.web.server.ResponseStatusException;
    import reactor.core.publisher.Flux;
    
    public class MultipleUpstreamException extends ResponseStatusException {
    
      private static final String MULTILPLE_UPSTREAM_MATCH_ERR =
          "Your query contains properties matching multiple upstreams. "
              + "Data for multiple upstreams can't be returned in one query. "
              + "Please either specify upstream by providing publisherSystem "
              + "(GSM,MUNI_ITICKET,MUNI_OASYS,TPSDERIV,EDLR) "
              + "and region or request deal properties matching only one upstream";
    
      MultipleUpstreamException() {
        super(BAD_REQUEST, MULTILPLE_UPSTREAM_MATCH_ERR);
      }
    
      /**
       * This constructor has syntax adapted to Mono API
       *
       * @param indexOutOfBoundsException emitted on {@link Flux#single()} when Flux has more than one
       *     elements
       * @see Mono#onErrorMap(Class, java.util.function.Function))
       */
      MultipleUpstreamException(IndexOutOfBoundsException indexOutOfBoundsException) {
        this();
      }
    }
  • 相关阅读:
    Mybatis的动态sql以及分页
    Mybatis入门
    使用java代码操作Redis
    Redis安装和基本操作
    idea安装以及使用
    卢卡斯定理 Lucas (p为素数)
    三分/优选法(黄金分割法)求单峰函数极值
    缩点tarjan
    tarjan 求割点
    tarjan
  • 原文地址:https://www.cnblogs.com/tonggc1668/p/12055711.html
Copyright © 2011-2022 走看看