zoukankan      html  css  js  c++  java
  • WebFlux响应式编程简单示例

    WebFlux介绍

    WebFlux是一个异步非阻塞框架

    什么是异步非阻塞

    同步和异步

    针对的是调用者,调用者发出请求,如果等着对方回应之后才去做其它的事情,那就是同步;如果发送请求之后不等着对方回应就去做其它的事情,那就是异步。

    阻塞与非阻塞

    针对被调用者而言, 如果收到请求之后,做完请求任务之后才反馈就是阻塞;如果收到请求之后就马上反馈,然后再去做事情 ,就是非阻塞

    WebFlux的特点

    • 非阻塞式: 在有限的资源下,提高系统的吞吐量和伸缩性, 以Reactor为基础,实现响应式编程
    • 函数式编程

    Reactor响应式编程

    (1) 在响应式编程中,Reactor是满足Reactive规范的框架

    (2) Reactor有两个核心类,Mono和Flux ,这两个类实现了接口Publisher, 提供了丰富的操作符。 Flux对象实现发布者,返回N个元素;Mono实现翻版发布,返回0或者1 个元素。

    (3) Flux和Mono都是数据流的发布者,使用Flux和Mono都可以发出三种信号:元素值 ,错误信号, 完成信号 。 其中错误信息和完成信号都是终止信号。 终止信号用于告诉订阅者数据流已经结束 了,错误信号终止数据流时,还会将错误信息返回给订阅者。

    3种信号的特点

    1. 错误信号和完成信号都是终止信号,不能并存
    2. 如果没有发送任何元素值 ,而是直接发送错误或者完成信号,表示是一个空的数据流。
    3. 如果没有错误信号也没有完成信号,表示无限数据流。

    操作符

    对我们的数据流进行一道道的操作,就是操作符

    1. map : 把元素映射成一个新的元素
    2. flatmap : 把每个元素转成流,把多个流合并成一个大的流,然后进行输出

    代码示例

    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;
    
    /**
     * @Author ZhengQinfeng
     * @Date 2020/12/5 21:04
     * @dec
     */
    public class TestReactor {
        public static void main(String[] args) {
            // just方法中直接声明数据流,而且这个数据流还没有发出。 subscribe是订阅数据流,订阅之后数据流才会发出
            Flux.just(1, 2, 3, 4).subscribe(System.out::println);
            Mono.just(1).subscribe(System.out::println);
    
            // 其它方法
    //        Integer[] arr = {1, 2, 3, 4};
    //        Flux.fromArray(arr);
    //
    //        List<Integer> list = Arrays.asList(arr);
    //        Flux.fromIterable(list);
    //
    //        Stream<Integer> stream = list.stream();
    //        Flux.fromStream(stream);
    //
    //
    //        // 发送一个错误信号
    //        Flux.error(new RuntimeException());
        }
    }
    
    

    SpringWebFlux

    SpringWebFlux基于Reactor,默认使用的容器是Netty , Netty是高性能的IO框架 , 异步非阻塞框架

    SpringWebFlux执行过程

    核心控制器DispatchHandler, 实现了WebHandler

    两个接口

    1. RouterFunction

      路由请求

    2. HandlerFunction

      处理的具体的函数

    编程的两种方式

    基于注解编程

    与springMmc使用相似,只需要把相关依赖引用入项目中即可,springboot自动配置相关的运行容器,默认使用Netty服务器

    1. 引入依赖

         <dependency>
                  <groupId>org.springframework.boot</groupId>
                  <artifactId>spring-boot-starter-webflux</artifactId>
              </dependency>
      
    2. 代码示例

      /**
       * @Author ZhengQinfeng
       * @Date 2020/12/5 21:47
       * @dec UserService 操作,定义一些操作方法
       */
      public interface UserService {
          // 根据id查询用户
          // Mono表示返回一个或0个元素
          Mono<User> getUserById(Integer id);
      
          // 查询所有用户
          Flux<User> getAllUser();
      
          // 添加用户
          Mono<Void> saveUserInfo(Mono<User> user);
      }
      
      
      @Service
      public class UserServiceImpl implements UserService {
          // 创建一个Map集合,存储数据
          private final Map<Integer, User> users = new HashMap<>();
      
          public UserServiceImpl() {
              this.users.put(1, new User(1, "admin", "男", 20));
              this.users.put(2, new User(2, "jack", "女", 20));
          }
      
          @Override
          public Mono<User> getUserById(Integer id) {
              return Mono.justOrEmpty(this.users.get(id));
          }
      
          @Override
          public Flux<User> getAllUser() {
              return Flux.fromIterable(this.users.values());
          }
      
          @Override
          public Mono<Void> saveUserInfo(Mono<User> userMono) {
              return userMono.doOnNext(user -> {
                  // 放到Map
                  this.users.put(user.getId(), user);
              }).thenEmpty(Mono.empty());// 终止信号
      
          }
      }
      
      
      /**
       * @Author ZhengQinfeng
       * @Date 2020/12/5 21:58
       * @dec
       */
      @RestController
      public class UserController {
      
          @Autowired
          private UserService userService;
      
          @GetMapping("/user/{id}")
          public Mono<User> getUserById(@PathVariable Integer id) {
              return userService.getUserById(id);
          }
      
          @GetMapping("/users")
          public Flux<User> getAllUsers() {
              return userService.getAllUser();
          }
      
      
          @PostMapping("/save/user")
          public Mono<Void> getAllUsers(@RequestBody User user) {
              Mono<User> userMono = Mono.just(user);
              return userService.saveUserInfo(userMono);
          }
      
      
      }
      

    基于函数式编程

    在使用函数式编程时,需要我们自己初始化服务器

    两个核心接口,RouterFunction(实现路由功能,将请求转发给对应的handler)和HandlerFunction(处理请求生成响应的函数). 核心任务定义这两个接口的实现并且启动需要的服务器

    SpringWebFlux请求和响应不再是ServetRequest和ServetResponse, 而是ServerRequest 和 ServerResponse

    代码示例

    Service层代码和之前一样

    不需要Controller层代码, 使用Handler替换之前的Controller

    import org.springframework.http.MediaType;
    import org.springframework.web.reactive.function.server.ServerRequest;
    import org.springframework.web.reactive.function.server.ServerResponse;
    import qinfeng.zheng.springwebflux.entity.User;
    import qinfeng.zheng.springwebflux.service.UserService;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;
    
    import static org.springframework.web.reactive.function.BodyInserters.fromValue;
    
    /**
     * @Author ZhengQinfeng
     * @Date 2020/12/5 22:38
     * @dec 函数式编程模型
     */
    public class UserHandler {
        private final UserService userService;
    
        public UserHandler(UserService userService) {
            this.userService = userService;
        }
    
    
        // 根据id查询
        public Mono<ServerResponse> getUserById(ServerRequest request) {
            Integer id = Integer.valueOf(request.pathVariable("id"));
            // 非空处理
            Mono<ServerResponse> notFound = ServerResponse.notFound().build();
            Mono<User> userMono = this.userService.getUserById(id);
            //把userMono 进行转换
            //使用Reactor 操作符flatMap
            return userMono
                    .flatMap(user -> ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(fromValue(user)))
                    .switchIfEmpty(notFound);
        }
        
        // 查询所有
        public Mono<ServerResponse> getAllUsers(ServerRequest request) {
            Flux<User> users = this.userService.getAllUser();
            return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(users, User.class);
        }
        // 添加User
    
        public Mono<ServerResponse> saveUser(ServerRequest request) {
            // 获取user 对象
            Mono<User> userMono = request.bodyToMono(User.class);
            return ServerResponse.ok().build(this.userService.saveUserInfo(userMono));
        }
    }
    
    

    服务端代码

    import org.springframework.http.MediaType;
    import org.springframework.http.server.reactive.HttpHandler;
    import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter;
    import org.springframework.web.reactive.function.server.RequestPredicates;
    import org.springframework.web.reactive.function.server.RouterFunction;
    import org.springframework.web.reactive.function.server.RouterFunctions;
    import org.springframework.web.reactive.function.server.ServerResponse;
    import qinfeng.zheng.springwebflux.handler.UserHandler;
    import qinfeng.zheng.springwebflux.service.UserService;
    import qinfeng.zheng.springwebflux.service.impl.UserServiceImpl;
    import reactor.netty.http.server.HttpServer;
    
    import static org.springframework.web.reactive.function.server.RouterFunctions.toHttpHandler;
    
    /**
     * @Author ZhengQinfeng
     * @Date 2020/12/5 22:56
     * @dec
     */
    public class Server {
        public static void main(String[] args) throws InterruptedException {
            Server server = new Server();
            server.createReactorServer();
            Thread.sleep(Integer.MAX_VALUE);  // 主进程一直运行
    
        }
    
        // 1. 创建Router路由
        public RouterFunction<ServerResponse> routingFunction() {
            UserService userService = new UserServiceImpl();
            UserHandler userHandler = new UserHandler(userService);
    
            // 创建 路由与 handler方法的映射关系
            return RouterFunctions
                    .route(
                            RequestPredicates.GET("/user/{id}").and(RequestPredicates.accept(MediaType.APPLICATION_JSON)), userHandler::getUserById)
                    .andRoute(
                            RequestPredicates.GET("/users").and(RequestPredicates.accept(MediaType.APPLICATION_JSON)), userHandler::getAllUsers)
                    .andRoute(
                            RequestPredicates.POST("/save/user").and(RequestPredicates.accept(MediaType.APPLICATION_JSON)), userHandler::saveUser)
    
            ;
    
        }
    
        //2. 创建服务器,完成适配
        public void createReactorServer() {
            // 路由与handler适配
            RouterFunction<ServerResponse> router = routingFunction();
    
            HttpHandler httpHandler = toHttpHandler(router);
            ReactorHttpHandlerAdapter handlerAdapter = new ReactorHttpHandlerAdapter(httpHandler);
    
            // 创建服务器
            HttpServer httpServer = HttpServer.create();
    
            httpServer.port(9999); // 端口没生效,不知咋回事, 启动时会随机生效一个端口
            
            httpServer.handle(handlerAdapter).bindNow();  // 立即启动服务器
        }
    
    }
    
    
  • 相关阅读:
    这个博客的由来
    PLSQL Developer 不能连接 oracle 12c 64位 的解决办法 for win 64
    Netflix 是如何推荐电影的
    Hadoop中国技术峰会引发Hadoop 2.0风暴
    JS如何导出Div的内容为图片
    jquery如何获取标签本生的文本内容,不获取本身子级元素及子级元素的内容
    superagent抓取gb2312网页出来是十六进制的解决办法
    一些常见html css遇到的问题
    es6 三行代码数组去重
    es6中的 Set、Map 和 WeakMap
  • 原文地址:https://www.cnblogs.com/z-qinfeng/p/14091732.html
Copyright © 2011-2022 走看看