  • 15. Spring Boot Learning (15): Spring Boot WebFlux Routing

    1. Introduction

    1.1 Overview

    In WebFlux.fn, an HTTP request is handled with a HandlerFunction: a function that takes ServerRequest and returns a delayed ServerResponse (i.e. Mono<ServerResponse>). Both the request and the response object have immutable contracts that offer JDK 8-friendly access to the HTTP request and response. HandlerFunction is the equivalent of the body of a @RequestMapping method in the annotation-based programming model.

    Incoming requests are routed to a handler function with a RouterFunction: a function that takes ServerRequest and returns a delayed HandlerFunction (i.e. Mono<HandlerFunction>). When the router function matches, a handler function is returned; otherwise an empty Mono. RouterFunction is the equivalent of a @RequestMapping annotation, but with the major difference that router functions provide not just data, but also behavior.

    RouterFunctions.route() provides a router builder that facilitates the creation of routers.
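    The relationship between the two function types can be sketched in plain Java without Spring. This is only a model under stated assumptions: Request, Handler, Router, route and dispatch are hypothetical names, Optional stands in for Mono, and a String stands in for ServerResponse. It shows a router returning a handler on a match and an empty result otherwise.

```java
import java.util.Optional;

public class RoutingModel {

    /** Hypothetical stand-in for ServerRequest. */
    static final class Request {
        final String method;
        final String path;

        Request(String method, String path) {
            this.method = method;
            this.path = path;
        }
    }

    /** Models HandlerFunction: request in, response out (a String replaces Mono<ServerResponse>). */
    interface Handler {
        String handle(Request request);
    }

    /** Models RouterFunction: a handler on a match, otherwise empty (Optional replaces Mono). */
    interface Router {
        Optional<Handler> route(Request request);

        /** Tries this router first and falls back to the other one when nothing matches. */
        default Router and(Router other) {
            return request -> {
                Optional<Handler> match = route(request);
                return match.isPresent() ? match : other.route(request);
            };
        }
    }

    /** Builds a router that matches one HTTP method and one exact path. */
    static Router route(String method, String path, Handler handler) {
        return request -> method.equals(request.method) && path.equals(request.path)
                ? Optional.of(handler)
                : Optional.empty();
    }

    /** Routes the request and falls back to a 404 text when no handler matches. */
    static String dispatch(Router router, Request request) {
        return router.route(request)
                .map(handler -> handler.handle(request))
                .orElse("404 Not Found");
    }

    public static void main(String[] args) {
        Router router = route("GET", "/user/list", request -> "200 user list")
                .and(route("GET", "/hello", request -> "200 hello"));

        System.out.println(dispatch(router, new Request("GET", "/user/list")));  // prints "200 user list"
        System.out.println(dispatch(router, new Request("POST", "/user/list"))); // prints "404 Not Found"
    }
}
```

    Because the router is an ordinary function, it carries behavior (the matching logic) as well as data, which is the difference from @RequestMapping that the quoted text points out.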

    1.2 Features

    Router functions are used to route the requests to the corresponding HandlerFunction. Typically, you do not write router functions yourself, but rather use a method on the RouterFunctions utility class to create one. RouterFunctions.route() (no parameters) provides you with a fluent builder for creating a router function, whereas RouterFunctions.route(RequestPredicate, HandlerFunction) offers a direct way to create a router.

    Generally, it is recommended to use the route() builder, as it provides convenient short-cuts for typical mapping scenarios without requiring hard-to-discover static imports. For instance, the router function builder offers the method GET(String, HandlerFunction) to create a mapping for GET requests; and POST(String, HandlerFunction) for POSTs.

    Besides HTTP method-based mapping, the route builder offers a way to introduce additional predicates when mapping to requests. For each HTTP method there is an overloaded variant that takes a RequestPredicate as a parameter, through which additional constraints can be expressed.
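    The idea of narrowing a mapping with additional predicates can be modelled with java.util.function.Predicate. The sketch below is an assumption-laden illustration, not Spring's implementation: Request, GET and accept are hypothetical stand-ins named after their Spring counterparts, and this accept only compares the Accept header for exact equality, whereas the real predicate performs media type matching.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

public class PredicateModel {

    /** Hypothetical stand-in for ServerRequest: method, path and headers only. */
    static final class Request {
        final String method;
        final String path;
        final Map<String, String> headers = new HashMap<>();

        Request(String method, String path) {
            this.method = method;
            this.path = path;
        }

        /** Adds a header and returns this request so calls can be chained. */
        Request header(String name, String value) {
            headers.put(name, value);
            return this;
        }
    }

    /** Matches GET requests for one exact path. */
    static Predicate<Request> GET(String path) {
        return request -> "GET".equals(request.method) && path.equals(request.path);
    }

    /** Matches requests whose Accept header equals the given media type (simplified). */
    static Predicate<Request> accept(String mediaType) {
        return request -> mediaType.equals(request.headers.get("Accept"));
    }

    public static void main(String[] args) {
        // Both conditions must hold, as with GET(...).and(accept(...)) in the router below.
        Predicate<Request> listUsers = GET("/user/list").and(accept("application/json"));

        Request json = new Request("GET", "/user/list").header("Accept", "application/json");
        Request html = new Request("GET", "/user/list").header("Accept", "text/html");

        System.out.println(listUsers.test(json)); // prints "true"
        System.out.println(listUsers.test(html)); // prints "false"
    }
}
```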

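    2. Demo environment

    1. JDK 1.8.0_201
    2. Spring Boot 2.2.0.RELEASE
    3. Build tool (Apache Maven 3.6.3)
    4. IDE (IntelliJ IDEA)

    3. Demo code

    3.1 Code description

    Uses WebFlux through RouterFunction, the functional routing model.

    3.2 Code structure

    [Image: project structure screenshot]

    3.3 Maven dependencies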

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
    </dependencies>
    

    3.4 Configuration file

    None.

    3.5 Java code

    UserModel.java

    public class UserModel {
    
        private Long id;
        private String name;
        private Integer age;
        private String birthday;
        private String address;
        private String phone;
    
        public UserModel() {}
    
        public UserModel(Long id, String name, Integer age, String birthday, String address, String phone) {
            this.id = id;
            this.name = name;
            this.age = age;
            this.birthday = birthday;
            this.address = address;
            this.phone = phone;
        }
    	
        // getters, setters and toString() omitted
    }
    

    UserRepository.java

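    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.atomic.AtomicLong;

    import javax.annotation.PostConstruct;

    import org.springframework.stereotype.Repository;

    @Repository
    public class UserRepository {
    
        // Two records are preloaded, so the ID counter starts at 2
        private static final AtomicLong ID_GENERATOR = new AtomicLong(2);
    
        // Simulates a database; a plain HashMap is not thread-safe, which is acceptable only for a demo
        private static final Map<Long, UserModel> USER_MAP = new HashMap<>();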
    
        @PostConstruct
        public void init() {
            UserModel user1 = new UserModel(1L, "zhangsan", 20, "2000-01-02", "beijing", "13666666666");
            UserModel user2 = new UserModel(2L, "lisi", 30, "1990-03-23", "shanghai", "13888888888");
            USER_MAP.put(user1.getId(), user1);
            USER_MAP.put(user2.getId(), user2);
        }
    
        public List<UserModel> findAll() {
            return new ArrayList<>(USER_MAP.values());
        }
    
        public UserModel findById(Long id) {
            return USER_MAP.get(id);
        }
    
        public UserModel add(UserModel userModel) {
            long id = ID_GENERATOR.incrementAndGet();
            userModel.setId(id);
            USER_MAP.put(id, userModel);
            return userModel;
        }
    
        public UserModel update(UserModel userModel) {
            USER_MAP.put(userModel.getId(), userModel);
            return USER_MAP.get(userModel.getId());
        }
    
        public UserModel deleteById(Long id) {
            UserModel userModel = USER_MAP.get(id);
            USER_MAP.remove(id);
            return userModel;
        }
    }
    

    UserHandler.java

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    import org.springframework.web.reactive.function.server.ServerRequest;
    import org.springframework.web.reactive.function.server.ServerResponse;

    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    @Component
    public class UserHandler {
    
        @Autowired
        private UserRepository userRepository;
    
        // GET /user/list: return all users as the response body
        public Mono<ServerResponse> list(ServerRequest request) {
            return ServerResponse.ok().body(Flux.fromIterable(userRepository.findAll()), UserModel.class);
        }
    
        // GET /user/find/{id}: 200 with the user, or 404 when the id is unknown
        public Mono<ServerResponse> findById(ServerRequest request) {
            Long id = Long.valueOf(request.pathVariable("id"));
            return Mono.justOrEmpty(userRepository.findById(id))
                .flatMap(user -> ServerResponse.ok().bodyValue(user))
                .switchIfEmpty(ServerResponse.notFound().build()); // responds with 404 Not Found
        }
    
        // POST /user/add: store the request body and return it with its generated id
        public Mono<ServerResponse> add(ServerRequest request) {
            return ServerResponse.ok().body(
                request.bodyToMono(UserModel.class).map(userRepository::add),
                UserModel.class);
        }
    
        // PUT /user/update: copy the body's fields onto the stored user, or 404 when the id is unknown
        public Mono<ServerResponse> update(ServerRequest request) {
            return request.bodyToMono(UserModel.class)
                .flatMap(body -> Mono.justOrEmpty(userRepository.findById(body.getId()))
                    .map(foundUser -> {
                        foundUser.setAge(body.getAge());
                        foundUser.setName(body.getName());
                        foundUser.setBirthday(body.getBirthday());
                        foundUser.setAddress(body.getAddress());
                        foundUser.setPhone(body.getPhone());
                        return foundUser; // same instance as the one stored in the map
                    })
                    .flatMap(u -> ServerResponse.ok().bodyValue(u))
                    .switchIfEmpty(ServerResponse.notFound().build()));
        }
    
        // Variant that raises an error instead of returning 404; not wired into the router
        public Mono<ServerResponse> deleteById(ServerRequest request) {
            Long id = Long.valueOf(request.pathVariable("id"));
            return Mono.justOrEmpty(userRepository.findById(id))
                // an unknown id ends in a RuntimeException shown on the console
                .switchIfEmpty(Mono.error(new RuntimeException(id + " not found!")))
                // delete lazily, only after the lookup has succeeded
                .flatMap(user -> {
                    userRepository.deleteById(id);
                    return ServerResponse.ok().bodyValue(user);
                });
        }
    
        // DELETE /user/delete/{id}: 200 with the deleted user, or 404 when the id is unknown
        public Mono<ServerResponse> deleteId(ServerRequest request) {
            Long id = Long.valueOf(request.pathVariable("id"));
            return Mono.justOrEmpty(userRepository.findById(id))
                .flatMap(user -> {
                    userRepository.deleteById(id);
                    return ServerResponse.ok().bodyValue(user);
                })
                .switchIfEmpty(ServerResponse.notFound().build());
        }
    }
    
    

    UserRouter.java

    import static org.springframework.web.reactive.function.server.RequestPredicates.DELETE;
    import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
    import static org.springframework.web.reactive.function.server.RequestPredicates.POST;
    import static org.springframework.web.reactive.function.server.RequestPredicates.PUT;
    import static org.springframework.web.reactive.function.server.RequestPredicates.accept;
    import static org.springframework.web.reactive.function.server.RequestPredicates.contentType;
    import static org.springframework.web.reactive.function.server.RouterFunctions.route;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.http.MediaType;
    import org.springframework.web.reactive.function.server.RouterFunction;
    import org.springframework.web.reactive.function.server.ServerResponse;
    
    @Configuration
    public class UserRouter {
    
        @Autowired
        private UserHandler userHandler;
    
        @Bean
        public RouterFunction<ServerResponse> routerFunction() {
            return route(GET("/user/list").and(accept(MediaType.APPLICATION_JSON)), userHandler::list)
                .andRoute(GET("/user/find/{id}").and(accept(MediaType.APPLICATION_JSON)), userHandler::findById)
                .andRoute(POST("/user/add").and(contentType(MediaType.APPLICATION_JSON)), userHandler::add)
                .andRoute(PUT("/user/update").and(contentType(MediaType.APPLICATION_JSON)), userHandler::update)
                .andRoute(DELETE("/user/delete/{id}"), userHandler::deleteId);
        }
    }
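
    The same routes could also be declared with the no-argument route() builder recommended in section 1.2. The following is a sketch rather than the project's code: the class name UserRouterBuilderStyle and the bean name userRoutes are hypothetical, UserHandler is assumed to be in the same package, and only one of the two router configurations should be registered at a time.

```java
import static org.springframework.web.reactive.function.server.RequestPredicates.accept;
import static org.springframework.web.reactive.function.server.RequestPredicates.contentType;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;

// Hypothetical alternative to UserRouter, written with the fluent builder.
@Configuration
public class UserRouterBuilderStyle {

    @Bean
    public RouterFunction<ServerResponse> userRoutes(UserHandler userHandler) {
        return RouterFunctions.route()
            // Each HTTP method has an overload that takes an extra RequestPredicate.
            .GET("/user/list", accept(MediaType.APPLICATION_JSON), userHandler::list)
            .GET("/user/find/{id}", accept(MediaType.APPLICATION_JSON), userHandler::findById)
            .POST("/user/add", contentType(MediaType.APPLICATION_JSON), userHandler::add)
            .PUT("/user/update", contentType(MediaType.APPLICATION_JSON), userHandler::update)
            .DELETE("/user/delete/{id}", userHandler::deleteId)
            .build();
    }
}
```

    With the builder, only the accept and contentType predicates need static imports; the HTTP method shortcuts are methods on the builder itself.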
    

    3.6 Git repository

    spring-boot/spring-boot-07-webflux/spring-boot-webflux-router

    4. Results

    Run SpringBootWebfluxRouterApplication.main, then send the following requests from spring-boot-webflux-router.http and check that the output matches expectations.

    List users

    ### GET /user/list
    GET http://localhost:8080/user/list
    Accept: application/json
    

    [Screenshot: response to GET /user/list]

    Find a user by id

    ### GET /user/find/{id}
    GET http://localhost:8080/user/find/1
    Accept: application/json
    

    [Screenshot: response to GET /user/find/1]

    Add a user

    ### POST /user/add
    POST http://localhost:8080/user/add
    Accept: application/json
    Content-Type: application/json
    
    {
      "name": "wangwu",
      "age": 25,
      "birthday": "1995-06-23",
      "address": "guangzhou",
      "phone": "13777777777"
    }
    

    [Screenshot: response to POST /user/add]

    Update a user (success)

    ### PUT /user/update success
    PUT http://localhost:8080/user/update
    Accept: application/json
    Content-Type: application/json
    
    {
      "id": 2,
      "name": "lisi2",
      "age": 32,
      "birthday": "1988-03-23",
      "address": "shanghai2",
      "phone": "13888888882"
    }
    

    [Screenshot: response to PUT /user/update with an existing id]

    Update a user (failure)

    ### PUT /user/update fail  // 404 Not Found (id)
    PUT http://localhost:8080/user/update
    Accept: application/json
    Content-Type: application/json
    
    {
      "id": 222,
      "name": "lisi2",
      "age": 32,
      "birthday": "1988-03-23",
      "address": "shanghai2",
      "phone": "13888888882"
    }
    

    [Screenshot: 404 response to PUT /user/update with an unknown id]

    Delete a user

    ### DELETE /user/delete/{id}
    DELETE http://localhost:8080/user/delete/3
    Accept: application/json
    

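    [Screenshot: response to DELETE /user/delete/3]

    5. Source code analysis

    5.1 How does the WebFlux server start?

    With WebFlux, the default ApplicationContext is AnnotationConfigReactiveWebServerApplicationContext, as SpringApplication.createApplicationContext() shows: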

    protected ConfigurableApplicationContext createApplicationContext() {
        Class<?> contextClass = this.applicationContextClass;
        if (contextClass == null) {
            try {
                switch (this.webApplicationType) {
                    case SERVLET:
                        contextClass = Class.forName(DEFAULT_SERVLET_WEB_CONTEXT_CLASS);
                        break;
                    case REACTIVE:
                        contextClass = Class.forName(DEFAULT_REACTIVE_WEB_CONTEXT_CLASS);
                        break;
                    default:
                        contextClass = Class.forName(DEFAULT_CONTEXT_CLASS);
                }
            }
            catch (ClassNotFoundException ex) {
                throw new IllegalStateException(
                    "Unable create a default ApplicationContext, please specify an ApplicationContextClass", ex);
            }
        }
        return (ConfigurableApplicationContext) BeanUtils.instantiateClass(contextClass);
    }
    

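    Initialization and startup therefore happen inside AnnotationConfigReactiveWebServerApplicationContext.

    5.2 Why is Netty the default WebFlux server?

    First, look at the dependency tree of spring-boot-starter-webflux:

    [Image: dependency tree of spring-boot-starter-webflux]

    The spring.factories file of spring-boot-autoconfigure lists an auto-configuration class, ReactiveWebServerFactoryAutoConfiguration:

    [Image: the ReactiveWebServerFactoryAutoConfiguration entry in spring.factories]

    Its content is as follows: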

    @AutoConfigureOrder(Ordered.HIGHEST_PRECEDENCE)
    @Configuration(proxyBeanMethods = false)
    @ConditionalOnClass(ReactiveHttpInputMessage.class)
    @ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.REACTIVE)
    @EnableConfigurationProperties(ServerProperties.class)
    @Import({ ReactiveWebServerFactoryAutoConfiguration.BeanPostProcessorsRegistrar.class,
             ReactiveWebServerFactoryConfiguration.EmbeddedTomcat.class,
             ReactiveWebServerFactoryConfiguration.EmbeddedJetty.class,
             ReactiveWebServerFactoryConfiguration.EmbeddedUndertow.class,
             ReactiveWebServerFactoryConfiguration.EmbeddedNetty.class })
    public class ReactiveWebServerFactoryAutoConfiguration {
    
        @Bean
        public ReactiveWebServerFactoryCustomizer reactiveWebServerFactoryCustomizer(ServerProperties serverProperties) {
            return new ReactiveWebServerFactoryCustomizer(serverProperties);
        }
    
        @Bean
        @ConditionalOnClass(name = "org.apache.catalina.startup.Tomcat")
        public TomcatReactiveWebServerFactoryCustomizer tomcatReactiveWebServerFactoryCustomizer(
            ServerProperties serverProperties) {
            return new TomcatReactiveWebServerFactoryCustomizer(serverProperties);
        }
    
        @Bean
        @ConditionalOnMissingBean
        @ConditionalOnProperty(value = "server.forward-headers-strategy", havingValue = "framework")
        public ForwardedHeaderTransformer forwardedHeaderTransformer() {
            return new ForwardedHeaderTransformer();
        }
    
        /**
        * Registers a {@link WebServerFactoryCustomizerBeanPostProcessor}. Registered via
        * {@link ImportBeanDefinitionRegistrar} for early registration.
        */
        public static class BeanPostProcessorsRegistrar implements ImportBeanDefinitionRegistrar, BeanFactoryAware {
    
            private ConfigurableListableBeanFactory beanFactory;
    
            @Override
            public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
                if (beanFactory instanceof ConfigurableListableBeanFactory) {
                    this.beanFactory = (ConfigurableListableBeanFactory) beanFactory;
                }
            }
    
            @Override
            public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
                if (this.beanFactory == null) {
                    return;
                }
                registerSyntheticBeanIfMissing(registry, "webServerFactoryCustomizerBeanPostProcessor",
                                               WebServerFactoryCustomizerBeanPostProcessor.class);
            }
    
            private void registerSyntheticBeanIfMissing(BeanDefinitionRegistry registry, String name, Class<?> beanClass) {
                if (ObjectUtils.isEmpty(this.beanFactory.getBeanNamesForType(beanClass, true, false))) {
                    RootBeanDefinition beanDefinition = new RootBeanDefinition(beanClass);
                    beanDefinition.setSynthetic(true);
                    registry.registerBeanDefinition(name, beanDefinition);
                }
            }
        }
    }
    

    Through @Import it pulls in EmbeddedTomcat, EmbeddedJetty, EmbeddedUndertow and EmbeddedNetty, all of which are nested classes of ReactiveWebServerFactoryConfiguration:

    abstract class ReactiveWebServerFactoryConfiguration {
    
        @Configuration(proxyBeanMethods = false)
        @ConditionalOnMissingBean(ReactiveWebServerFactory.class)
        @ConditionalOnClass({ HttpServer.class })
        static class EmbeddedNetty {
    
            @Bean
            @ConditionalOnMissingBean
            ReactorResourceFactory reactorServerResourceFactory() {
                return new ReactorResourceFactory();
            }
    
            @Bean
            NettyReactiveWebServerFactory nettyReactiveWebServerFactory(ReactorResourceFactory resourceFactory,
                                                                        ObjectProvider<NettyRouteProvider> routes, ObjectProvider<NettyServerCustomizer> serverCustomizers) {
                NettyReactiveWebServerFactory serverFactory = new NettyReactiveWebServerFactory();
                serverFactory.setResourceFactory(resourceFactory);
                routes.orderedStream().forEach(serverFactory::addRouteProviders);
                serverFactory.getServerCustomizers().addAll(serverCustomizers.orderedStream().collect(Collectors.toList()));
                return serverFactory;
            }
    
        }
    
        @Configuration(proxyBeanMethods = false)
        @ConditionalOnMissingBean(ReactiveWebServerFactory.class)
        @ConditionalOnClass({ org.apache.catalina.startup.Tomcat.class })
        static class EmbeddedTomcat {
    
            @Bean
            TomcatReactiveWebServerFactory tomcatReactiveWebServerFactory(
                ObjectProvider<TomcatConnectorCustomizer> connectorCustomizers,
                ObjectProvider<TomcatContextCustomizer> contextCustomizers,
                ObjectProvider<TomcatProtocolHandlerCustomizer<?>> protocolHandlerCustomizers) {
                TomcatReactiveWebServerFactory factory = new TomcatReactiveWebServerFactory();
                factory.getTomcatConnectorCustomizers()
                    .addAll(connectorCustomizers.orderedStream().collect(Collectors.toList()));
                factory.getTomcatContextCustomizers()
                    .addAll(contextCustomizers.orderedStream().collect(Collectors.toList()));
                factory.getTomcatProtocolHandlerCustomizers()
                    .addAll(protocolHandlerCustomizers.orderedStream().collect(Collectors.toList()));
                return factory;
            }
    
        }
    
        @Configuration(proxyBeanMethods = false)
        @ConditionalOnMissingBean(ReactiveWebServerFactory.class)
        @ConditionalOnClass({ org.eclipse.jetty.server.Server.class })
        static class EmbeddedJetty {
    
            @Bean
            @ConditionalOnMissingBean
            JettyResourceFactory jettyServerResourceFactory() {
                return new JettyResourceFactory();
            }
    
            @Bean
            JettyReactiveWebServerFactory jettyReactiveWebServerFactory(JettyResourceFactory resourceFactory,
                                                                        ObjectProvider<JettyServerCustomizer> serverCustomizers) {
                JettyReactiveWebServerFactory serverFactory = new JettyReactiveWebServerFactory();
                serverFactory.getServerCustomizers().addAll(serverCustomizers.orderedStream().collect(Collectors.toList()));
                serverFactory.setResourceFactory(resourceFactory);
                return serverFactory;
            }
    
        }
    
        @Configuration(proxyBeanMethods = false)
        @ConditionalOnMissingBean(ReactiveWebServerFactory.class)
        @ConditionalOnClass({ Undertow.class })
        static class EmbeddedUndertow {
    
            @Bean
            UndertowReactiveWebServerFactory undertowReactiveWebServerFactory(
                ObjectProvider<UndertowBuilderCustomizer> builderCustomizers) {
                UndertowReactiveWebServerFactory factory = new UndertowReactiveWebServerFactory();
                factory.getBuilderCustomizers().addAll(builderCustomizers.orderedStream().collect(Collectors.toList()));
                return factory;
            }
        }
    }
    

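    The conditions under which each one takes effect are:

    • EmbeddedTomcat: @ConditionalOnClass -- Tomcat.class (Apache Tomcat)
    • EmbeddedJetty: @ConditionalOnClass -- Server.class (Jetty)
    • EmbeddedUndertow: @ConditionalOnClass -- Undertow.class
    • EmbeddedNetty: @ConditionalOnClass -- HttpServer.class

    Combined with the dependency analysis above: spring-boot-starter-webflux depends on reactor-netty.jar, and HttpServer.class lives in that jar, so the EmbeddedNetty condition is satisfied. The Tomcat, Jetty and Undertow classes are not on the classpath by default, so those three configurations are skipped and the server is started by EmbeddedNetty.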
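
    The classpath check behind @ConditionalOnClass can be approximated in plain Java. The sketch below is an illustration, not Spring Boot's condition implementation: EmbeddedServerProbe, isPresent, markerClasses and firstAvailable are hypothetical names, and the first-match-wins loop is a simplification of the @Import order combined with @ConditionalOnMissingBean(ReactiveWebServerFactory.class).

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class EmbeddedServerProbe {

    /** Returns true when the named class can be found, without initializing it. */
    static boolean isPresent(String className) {
        try {
            Class.forName(className, false, EmbeddedServerProbe.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError ex) {
            return false;
        }
    }

    /** Marker classes of the four Embedded* configurations, in @Import order. */
    static Map<String, String> markerClasses() {
        Map<String, String> markers = new LinkedHashMap<>();
        markers.put("Tomcat", "org.apache.catalina.startup.Tomcat");
        markers.put("Jetty", "org.eclipse.jetty.server.Server");
        markers.put("Undertow", "io.undertow.Undertow");
        markers.put("Netty", "reactor.netty.http.server.HttpServer");
        return markers;
    }

    /** Picks the first server whose marker class is present, or "none". */
    static String firstAvailable(Map<String, String> markers) {
        for (Map.Entry<String, String> entry : markers.entrySet()) {
            if (isPresent(entry.getValue())) {
                return entry.getKey();
            }
        }
        return "none";
    }

    public static void main(String[] args) {
        // The result depends entirely on the classpath this program runs with.
        markerClasses().forEach((server, className) ->
                System.out.println(server + " present: " + isPresent(className)));
        System.out.println("Selected: " + firstAvailable(markerClasses()));
    }
}
```

    Run inside a project that only declares spring-boot-starter-webflux, the probe should find just the Netty marker class, which matches the conclusion above.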

    6. References

    1. Spring Framework/WebFlux
  • Original article: https://www.cnblogs.com/col-smile/p/13423606.html