zoukankan      html  css  js  c++  java
  • WebFlux源码之DispatcherHandler

    WebFlux之DispatcherHandler

    DispatcherHandler

    处理服务器请求

    	/**
    	 * Handle the exchange: query each configured HandlerMapping in order, take the first
    	 * handler that is produced, invoke it and then process the resulting HandlerResult.
    	 * @param exchange current server exchange
    	 * @return the Mono from {@code createNotFoundError()} when no mappings are configured
    	 * or no mapping yields a handler; otherwise the outcome of result handling
    	 */
    	public Mono<Void> handle(ServerWebExchange exchange) {
    		if (this.handlerMappings == null) {
    			return createNotFoundError();
    		}
    		// concatMap keeps the mapping order; next() keeps only the first handler emitted.
    		Mono<Object> firstHandler = Flux.fromIterable(this.handlerMappings)
    				.concatMap(mapping -> mapping.getHandler(exchange))
    				.next()
    				.switchIfEmpty(createNotFoundError());
    		Mono<HandlerResult> invocation = firstHandler.flatMap(handler -> invokeHandler(exchange, handler));
    		return invocation.flatMap(result -> handleResult(exchange, result));
    	}
    
    	/**
    	 * Invoke the handler through the first registered HandlerAdapter that supports it.
    	 * @param exchange current server exchange
    	 * @param handler the handler selected for this request
    	 * @return the adapter's result, or an error Mono carrying an IllegalStateException
    	 * when no adapters are configured or none supports the handler
    	 */
    	private Mono<HandlerResult> invokeHandler(ServerWebExchange exchange, Object handler) {
    		HandlerAdapter selected = null;
    		if (this.handlerAdapters != null) {
    			for (HandlerAdapter candidate : this.handlerAdapters) {
    				if (candidate.supports(handler)) {
    					selected = candidate;
    					break;
    				}
    			}
    		}
    		if (selected == null) {
    			return Mono.error(new IllegalStateException("No HandlerAdapter: " + handler));
    		}
    		return selected.handle(exchange, handler);
    	}
    
    	/**
    	 * Process a handler result with the matching result handler. If that fails, apply the
    	 * result's exception handler and process the replacement result it produces.
    	 * @param exchange current server exchange
    	 * @param result the value produced by the handler invocation
    	 * @return completion signal for result handling
    	 */
    	private Mono<Void> handleResult(ServerWebExchange exchange, HandlerResult result) {
    		Mono<Void> primary = getResultHandler(result).handleResult(exchange, result);
    		return primary.onErrorResume(ex -> {
    			Mono<HandlerResult> recovered = result.applyExceptionHandler(ex);
    			return recovered.flatMap(exceptionResult ->
    					getResultHandler(exceptionResult).handleResult(exchange, exceptionResult));
    		});
    	}
    

    在DispatcherHandler中三个依次调用的核心接口是HandlerMapping、HandlerAdapter、HandlerResultHandler。DispatcherHandler实现了ApplicationContextAware接口,会自动调用setApplicationContext方法,然后在initStrategies中注入三个接口的所有实例。

    	/**
    	 * Look up all HandlerMapping, HandlerAdapter and HandlerResultHandler beans in the
    	 * given context (including ancestor contexts), sort each group with
    	 * AnnotationAwareOrderComparator and store them on this dispatcher.
    	 * @param context the application context to search
    	 */
    	protected void initStrategies(ApplicationContext context) {
    		// Only the mappings list is exposed as an unmodifiable view.
    		this.handlerMappings = Collections.unmodifiableList(sortedBeansOfType(context, HandlerMapping.class));
    		this.handlerAdapters = sortedBeansOfType(context, HandlerAdapter.class);
    		this.resultHandlers = sortedBeansOfType(context, HandlerResultHandler.class);
    	}
    
    	/**
    	 * Collect every bean of the given type from the context and its ancestors into a
    	 * new mutable list, sorted with AnnotationAwareOrderComparator.
    	 */
    	private static <T> List<T> sortedBeansOfType(ApplicationContext context, Class<T> type) {
    		Map<String, T> beansByName = BeanFactoryUtils.beansOfTypeIncludingAncestors(
    				context, type, true, false);
    		List<T> ordered = new ArrayList<>(beansByName.values());
    		AnnotationAwareOrderComparator.sort(ordered);
    		return ordered;
    	}
    

    HandlerMapping

    HandlerMapping负责路径到handler的映射

    /**
     * Strategy for mapping a request to a handler object.
     *
     * <p>The handler is returned as a plain {@code Object}; the caller is responsible
     * for finding a component that knows how to invoke it.
     */
    public interface HandlerMapping {
    
    	/**
    	 * Return a handler for this request.
    	 * @param exchange current server exchange
    	 * @return a {@link Mono} that emits one value or none in case the request
    	 * cannot be resolved to a handler
    	 */
    	Mono<Object> getHandler(ServerWebExchange exchange);
    
    }
    

    实现类有

    RequestMappingHandlerMapping 生成@Controller和@RequestMapping封装的接口
    RouterFunctionMapping 支持RouterFunction
    SimpleUrlHandlerMapping 支持ResourceWebHandler

    RequestMappingHandlerMapping

    • 注册bean

      org.springframework.web.reactive.config.WebFluxConfigurationSupport#requestMappingHandlerMapping

      	/**
      	 * Declare the RequestMappingHandlerMapping bean with order 0, the content type
      	 * resolver, the CORS configurations and any path-matching options that were
      	 * explicitly set on the PathMatchConfigurer.
      	 */
      	@Bean
      	public RequestMappingHandlerMapping requestMappingHandlerMapping() {
      		RequestMappingHandlerMapping handlerMapping = createRequestMappingHandlerMapping();
      		handlerMapping.setOrder(0);
      		handlerMapping.setContentTypeResolver(webFluxContentTypeResolver());
      		handlerMapping.setCorsConfigurations(getCorsConfigurations());
      
      		// Each option is applied only when non-null, so the mapping's own defaults survive otherwise.
      		PathMatchConfigurer pathConfig = getPathMatchConfigurer();
      
      		Boolean trailingSlash = pathConfig.isUseTrailingSlashMatch();
      		if (trailingSlash != null) {
      			handlerMapping.setUseTrailingSlashMatch(trailingSlash);
      		}
      
      		Boolean caseSensitive = pathConfig.isUseCaseSensitiveMatch();
      		if (caseSensitive != null) {
      			handlerMapping.setUseCaseSensitiveMatch(caseSensitive);
      		}
      
      		Map<String, Predicate<Class<?>>> prefixes = pathConfig.getPathPrefixes();
      		if (prefixes != null) {
      			handlerMapping.setPathPrefixes(prefixes);
      		}
      
      		return handlerMapping;
      	}
      
    • 初始化mapping

      	/**
      	 * Initialize the handler methods, then log how many mappings are registered.
      	 */
      	@Override
      	public void afterPropertiesSet() {
      		initHandlerMethods();
      
      		// The count covers detected mappings as well as explicit registerMapping registrations.
      		int mappingCount = this.getHandlerMethods().size();
      
      		// An empty result is reported only at trace level, a non-empty one at debug level.
      		boolean reportEmpty = logger.isTraceEnabled() && mappingCount == 0;
      		if (reportEmpty || (logger.isDebugEnabled() && mappingCount > 0)) {
      			logger.debug(mappingCount + " mappings in " + formatMappingName());
      		}
      	}
      

      initHandlerMethods遍历所有bean,如果bean标注了Controller或RequestMapping的注解,则调用detectHandlerMethods

      	/**
      	 * Scan every bean in the application context and detect handler methods on those
      	 * whose type passes {@code isHandler}. Scoped-target beans are skipped, and beans
      	 * whose type cannot be resolved are ignored.
      	 */
      	protected void initHandlerMethods() {
      		for (String candidateName : obtainApplicationContext().getBeanNamesForType(Object.class)) {
      			if (candidateName.startsWith(SCOPED_TARGET_NAME_PREFIX)) {
      				continue;
      			}
      			Class<?> candidateType;
      			try {
      				candidateType = obtainApplicationContext().getType(candidateName);
      			}
      			catch (Throwable ex) {
      				// Type could not be resolved (probably a lazy bean): treat it as unknown and move on.
      				candidateType = null;
      				if (logger.isTraceEnabled()) {
      					logger.trace("Could not resolve type for bean '" + candidateName + "'", ex);
      				}
      			}
      			if (candidateType != null && isHandler(candidateType)) {
      				detectHandlerMethods(candidateName);
      			}
      		}
      		handlerMethodsInitialized(getHandlerMethods());
      	}
      

      detectHandlerMethods遍历所有标注了RequestMapping的方法,注册到MappingRegistry。当请求匹配路径时,lookupHandlerMethod会遍历MappingRegistry中的mapping。

      	/**
      	 * Find the methods of a handler that have a mapping and register each of them.
      	 * @param handler either a bean name (resolved through the application context)
      	 * or a handler instance
      	 */
      	protected void detectHandlerMethods(final Object handler) {
      		Class<?> handlerType;
      		if (handler instanceof String) {
      			handlerType = obtainApplicationContext().getType((String) handler);
      		}
      		else {
      			handlerType = handler.getClass();
      		}
      		if (handlerType == null) {
      			return;
      		}
      
      		// Introspect the user-declared class returned by ClassUtils.getUserClass.
      		final Class<?> userType = ClassUtils.getUserClass(handlerType);
      		Map<Method, T> mappingsByMethod = MethodIntrospector.selectMethods(userType,
      				(MethodIntrospector.MetadataLookup<T>) method -> getMappingForMethod(method, userType));
      		if (logger.isTraceEnabled()) {
      			logger.trace(formatMappings(userType, mappingsByMethod));
      		}
      		for (Map.Entry<Method, T> entry : mappingsByMethod.entrySet()) {
      			Method invocableMethod = AopUtils.selectInvocableMethod(entry.getKey(), userType);
      			registerHandlerMethod(handler, invocableMethod, entry.getValue());
      		}
      	}
      

    RouterFunctionMapping

    同RequestMappingHandlerMapping类似,在WebFluxConfigurationSupport中注册RouterFunctionMapping的bean,实例化过程中导入所有的RouterFunction

    	/**
    	 * Fall back to the readers of a freshly created ServerCodecConfigurer when no
    	 * message readers were set, and initialize the router functions when none was set.
    	 */
    	public void afterPropertiesSet() throws Exception {
    		boolean readersMissing = CollectionUtils.isEmpty(this.messageReaders);
    		if (readersMissing) {
    			this.messageReaders = ServerCodecConfigurer.create().getReaders();
    		}
    
    		boolean routerMissing = (this.routerFunction == null);
    		if (routerMissing) {
    			initRouterFunctions();
    		}
    	}
    

    initRouterFunctions方法从ApplicationContext中查找所有的RouterFunction

    	/**
    	 * Collect the RouterFunction beans, chain them together with {@code andOther} in
    	 * order, and store the composite. The composite is {@code null} when there are none.
    	 */
    	protected void initRouterFunctions() {
    		List<RouterFunction<?>> discovered = routerFunctions();
    		Optional<RouterFunction<?>> composite = discovered.stream().reduce(RouterFunction::andOther);
    		this.routerFunction = composite.orElse(null);
    		logRouterFunctions(discovered);
    	}
    
    	/**
    	 * Return the RouterFunction beans from the application context in their defined
    	 * order, or an immutable empty list when there are none.
    	 */
    	private List<RouterFunction<?>> routerFunctions() {
    		List<RouterFunction<?>> discovered = obtainApplicationContext()
    				.getBeanProvider(RouterFunction.class)
    				.orderedStream()
    				.map(router -> (RouterFunction<?>)router)
    				.collect(Collectors.toList());
    		if (CollectionUtils.isEmpty(discovered)) {
    			return Collections.emptyList();
    		}
    		return discovered;
    	}
    

    一个简单的RouterFunction示例

        /**
         * Functional routes for the "/posts" resource: list, create, fetch by id,
         * update by id and delete by id, each delegating to the given handler.
         */
        @Bean
        public RouterFunction<ServerResponse> routes(PostHandler postController) {
            RouterFunction<ServerResponse> router = route(GET("/posts"), postController::all);
            router = router.andRoute(POST("/posts"), postController::create);
            router = router.andRoute(GET("/posts/{id}"), postController::get);
            router = router.andRoute(PUT("/posts/{id}"), postController::update);
            router = router.andRoute(DELETE("/posts/{id}"), postController::delete);
            return router;
        }
    

    SimpleUrlHandlerMapping

    SimpleUrlHandlerMapping可以提供静态资源的访问

    例如org.springframework.web.reactive.config.WebFluxConfigurationSupport#resourceHandlerMapping方法返回的是SimpleUrlHandlerMapping的实例对象

    	/**
    	 * Declare the handler mapping for static resources. Resource handlers are
    	 * collected through {@code addResourceHandlers}; when the registry yields no
    	 * mapping, an EmptyHandlerMapping is returned instead.
    	 */
    	@Bean
    	public HandlerMapping resourceHandlerMapping() {
    		ResourceLoader loader = (this.applicationContext != null ?
    				this.applicationContext : new DefaultResourceLoader());
    		ResourceHandlerRegistry registry = new ResourceHandlerRegistry(loader);
    		registry.setResourceUrlProvider(resourceUrlProvider());
    		addResourceHandlers(registry);
    
    		AbstractHandlerMapping mapping = registry.getHandlerMapping();
    		if (mapping == null) {
    			return new EmptyHandlerMapping();
    		}
    
    		// Apply only the path-matching options that were explicitly configured.
    		PathMatchConfigurer pathConfig = getPathMatchConfigurer();
    		Boolean trailingSlash = pathConfig.isUseTrailingSlashMatch();
    		Boolean caseSensitive = pathConfig.isUseCaseSensitiveMatch();
    		if (trailingSlash != null) {
    			mapping.setUseTrailingSlashMatch(trailingSlash);
    		}
    		if (caseSensitive != null) {
    			mapping.setUseCaseSensitiveMatch(caseSensitive);
    		}
    		return mapping;
    	}
    

    addResourceHandlers通过WebFluxConfigurer的addResourceHandlers配置静态资源(每个路径对应ResourceHandlerRegistration对象)

    WebFluxConfig实现了WebFluxConfigurer接口,并默认注册了几个静态资源访问路径,分别是

    • /webjars/**
    • /** (WebFluxProperties的默认值)
    		/**
    		 * Register the default static resource handlers, unless default mappings are
    		 * disabled: "/webjars/**" and the configured static path pattern. A pattern
    		 * that already has a mapping in the registry is left untouched.
    		 * @param registry the registry to add resource handlers to
    		 */
    		public void addResourceHandlers(ResourceHandlerRegistry registry) {
    			boolean mappingsEnabled = this.resourceProperties.isAddMappings();
    			if (!mappingsEnabled) {
    				logger.debug("Default resource handling disabled");
    				return;
    			}
    
    			if (!registry.hasMappingForPattern("/webjars/**")) {
    				ResourceHandlerRegistration webjars = registry.addResourceHandler("/webjars/**");
    				webjars = webjars.addResourceLocations("classpath:/META-INF/resources/webjars/");
    				configureResourceCaching(webjars);
    				customizeResourceHandlerRegistration(webjars);
    			}
    
    			String staticPattern = this.webFluxProperties.getStaticPathPattern();
    			if (!registry.hasMappingForPattern(staticPattern)) {
    				ResourceHandlerRegistration statics = registry.addResourceHandler(staticPattern);
    				statics = statics.addResourceLocations(this.resourceProperties.getStaticLocations());
    				configureResourceCaching(statics);
    				customizeResourceHandlerRegistration(statics);
    			}
    		}
    

    ResourceHandlerRegistry.getHandlerMapping将ResourceHandlerRegistration转成ResourceWebHandler,然后封装成SimpleUrlHandlerMapping

    	/**
    	 * Build a SimpleUrlHandlerMapping that maps every registered path pattern to an
    	 * initialized ResourceWebHandler.
    	 * @return the handler mapping with this registry's order and URL map, or
    	 * {@code null} when no resource handlers have been registered
    	 * @throws BeanInitializationException if a ResourceWebHandler fails to initialize
    	 */
    	protected AbstractUrlHandlerMapping getHandlerMapping() {
    		if (this.registrations.isEmpty()) {
    			return null;
    		}
    		// LinkedHashMap keeps the patterns in registration order.
    		Map<String, WebHandler> urlMap = new LinkedHashMap<>();
    		for (ResourceHandlerRegistration registration : this.registrations) {
    			for (String pathPattern : registration.getPathPatterns()) {
    				ResourceWebHandler handler = registration.getRequestHandler();
    				// Hand the shared ResourceUrlProvider to every transformer that can accept one.
    				handler.getResourceTransformers().forEach(transformer -> {
    					if (transformer instanceof ResourceTransformerSupport) {
    						((ResourceTransformerSupport) transformer).setResourceUrlProvider(this.resourceUrlProvider);
    					}
    				});
    				try {
    					handler.afterPropertiesSet();
    				}
    				catch (Throwable ex) {
    					// Name the type actually being initialized and the pattern that failed.
    					throw new BeanInitializationException(
    							"Failed to init ResourceWebHandler for path pattern '" + pathPattern + "'", ex);
    				}
    				urlMap.put(pathPattern, handler);
    			}
    		}
    		SimpleUrlHandlerMapping handlerMapping = new SimpleUrlHandlerMapping();
    		handlerMapping.setOrder(this.order);
    		handlerMapping.setUrlMap(urlMap);
    		return handlerMapping;
    	}
    

    HandlerAdapter

    /**
     * Strategy for invoking a handler object and producing a {@code HandlerResult}.
     */
    public interface HandlerAdapter {
    
    	/**
    	 * Whether this adapter is able to invoke the given handler.
    	 * @param handler the handler object to check
    	 * @return {@code true} if this adapter supports the handler
    	 */
    	boolean supports(Object handler);
    
    
    	/**
    	 * Invoke the given handler for the current exchange.
    	 * @param exchange current server exchange
    	 * @param handler the handler to invoke
    	 * @return a {@code Mono} for the result of the invocation
    	 */
    	Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler);
    
    }
    

    HandlerAdapter负责适配HandlerMapping返回的handler,调用handle方法生成HandlerResult

    HandlerMapping getHandler HandlerAdapter
    RequestMappingHandlerMapping HandlerMethod RequestMappingHandlerAdapter
    SimpleUrlHandlerMapping ResourceWebHandler SimpleHandlerAdapter
    RouterFunctionMapping HandlerFunction HandlerFunctionAdapter

    WebFluxConfigurationSupport

    WebFluxConfigurationSupport注册了上面三个HandlerAdapter的bean。

    	/**
    	 * Declare the RequestMappingHandlerAdapter bean, wired with the codec readers,
    	 * the web binding initializer, the reactive adapter registry and the argument
    	 * resolvers contributed through {@code configureArgumentResolvers}.
    	 */
    	@Bean
    	public RequestMappingHandlerAdapter requestMappingHandlerAdapter() {
    		RequestMappingHandlerAdapter handlerAdapter = createRequestMappingHandlerAdapter();
    		handlerAdapter.setMessageReaders(serverCodecConfigurer().getReaders());
    		handlerAdapter.setWebBindingInitializer(getConfigurableWebBindingInitializer());
    		handlerAdapter.setReactiveAdapterRegistry(webFluxAdapterRegistry());
    
    		// Populate the configurer through the callback before handing it to the adapter.
    		ArgumentResolverConfigurer resolverConfigurer = new ArgumentResolverConfigurer();
    		configureArgumentResolvers(resolverConfigurer);
    		handlerAdapter.setArgumentResolverConfigurer(resolverConfigurer);
    
    		return handlerAdapter;
    	}
    
    	/**
    	 * Declare the HandlerFunctionAdapter bean.
    	 */
    	@Bean
    	public HandlerFunctionAdapter handlerFunctionAdapter() {
    		HandlerFunctionAdapter functionAdapter = new HandlerFunctionAdapter();
    		return functionAdapter;
    	}
    
    	/**
    	 * Declare the SimpleHandlerAdapter bean.
    	 */
    	@Bean
    	public SimpleHandlerAdapter simpleHandlerAdapter() {
    		SimpleHandlerAdapter simpleAdapter = new SimpleHandlerAdapter();
    		return simpleAdapter;
    	}
    

    HandlerResultHandler

    HandlerResultHandler
    ResponseBodyResultHandler @ResponseBody
    ResponseEntityResultHandler HttpEntity、ResponseEntity、HttpHeaders
    ServerResponseResultHandler ServerResponse
    ViewResolutionResultHandler @ModelAttribute、CharSequence、Rendering、Model、Map、Void、View、"Simple" property(a primitive, a String or other CharSequence, a Number, a Date, a URI, a URL, a Locale, a Class, or a corresponding array)

    ReactiveAdapter

    ReactiveAdapter负责在各种响应式/异步类型与Reactive Streams的Publisher之间相互转换。以Reactor为例,支持Mono、Flux、Publisher、CompletableFuture

    例如如下示例,返回值为Flux时,Response会自动转换成Message对象的json数组

        /**
         * Return two sample messages as a Flux.
         */
        @GetMapping
        Flux<Message> allMessages(){
            Message first = Message.builder().body("hello Spring 5").build();
            Message second = Message.builder().body("hello Spring Boot 2").build();
            return Flux.just(first, second);
        }
    

    ReactiveAdapterRegistry

    负责加载ClassPath中的ReactiveAdapter

    	/**
    	 * Create a registry and register adapters for each reactive library that is
    	 * detected on the classpath: Reactor, RxJava 1, RxJava 2 and the JDK 9+ Flow API.
    	 */
    	public ReactiveAdapterRegistry() {
    		ClassLoader loader = ReactiveAdapterRegistry.class.getClassLoader();
    
    		// Reactor
    		boolean reactorOnClasspath = ClassUtils.isPresent("reactor.core.publisher.Flux", loader);
    		if (reactorOnClasspath) {
    			new ReactorRegistrar().registerAdapters(this);
    		}
    		this.reactorPresent = reactorOnClasspath;
    
    		// RxJava1 (needs the RxReactiveStreams bridge as well)
    		if (ClassUtils.isPresent("rx.Observable", loader)) {
    			if (ClassUtils.isPresent("rx.RxReactiveStreams", loader)) {
    				new RxJava1Registrar().registerAdapters(this);
    			}
    		}
    
    		// RxJava2
    		if (ClassUtils.isPresent("io.reactivex.Flowable", loader)) {
    			new RxJava2Registrar().registerAdapters(this);
    		}
    
    		// Java 9+ Flow.Publisher
    		if (ClassUtils.isPresent("java.util.concurrent.Flow.Publisher", loader)) {
    			new ReactorJdkFlowAdapterRegistrar().registerAdapter(this);
    		}
    		// If not present, do nothing for the time being...
    		// We can fall back on "reactive-streams-flow-bridge" (once released)
    	}
    

    以Reactor为例,通过ReactorRegistrar注册支持的ReactiveType

    void registerAdapters(ReactiveAdapterRegistry registry) {
    			// Registration order matters: Mono and Flux go in ahead of the general Publisher.
    
    			ReactiveTypeDescriptor monoType =
    					ReactiveTypeDescriptor.singleOptionalValue(Mono.class, Mono::empty);
    			registry.registerReactiveType(monoType, source -> (Mono<?>) source, Mono::from);
    
    			ReactiveTypeDescriptor fluxType =
    					ReactiveTypeDescriptor.multiValue(Flux.class, Flux::empty);
    			registry.registerReactiveType(fluxType, source -> (Flux<?>) source, Flux::from);
    
    			// A plain Publisher needs no conversion in either direction.
    			ReactiveTypeDescriptor publisherType =
    					ReactiveTypeDescriptor.multiValue(Publisher.class, Flux::empty);
    			registry.registerReactiveType(publisherType, source -> (Publisher<?>) source, source -> source);
    
    			// The "empty" CompletableFuture is one that has already completed with null.
    			ReactiveTypeDescriptor futureType =
    					ReactiveTypeDescriptor.singleOptionalValue(CompletableFuture.class, () -> {
    						CompletableFuture<?> completed = new CompletableFuture<>();
    						completed.complete(null);
    						return completed;
    					});
    			registry.registerReactiveType(
    					futureType,
    					source -> Mono.fromFuture((CompletableFuture<?>) source),
    					source -> Mono.from(source).toFuture());
    		}
    
  • 相关阅读:
    MYSQL/HIVESQL笔试题(一):HIVESQL(一)分组求TopN/行转列/列转行
    ALINK(七):ALINK使用技巧(二)
    Hive实战(6):完整案例(二)业务分析
    Hive实战(5):完整案例(一)准备
    Mysql基础(二十四):数据类型/常见约束
    Mysql基础(二十三):视图/存储过程
    数据可视化基础专题(三十四):Pandas基础(十四) 分组(二)Aggregation/apply
    Daily Coding Problem: Problem #677
    1027. Longest Arithmetic Subsequence (Solution 1)
    346. Moving Average from Data Stream
  • 原文地址:https://www.cnblogs.com/huiyao/p/14444280.html
Copyright © 2011-2022 走看看