WebFlux之DispatcherHandler
DispatcherHandler
处理服务器请求
public Mono<Void> handle(ServerWebExchange exchange) {
if (this.handlerMappings == null) {
return createNotFoundError();
}
return Flux.fromIterable(this.handlerMappings)
.concatMap(mapping -> mapping.getHandler(exchange))
.next()
.switchIfEmpty(createNotFoundError())
.flatMap(handler -> invokeHandler(exchange, handler))
.flatMap(result -> handleResult(exchange, result));
}
private Mono<HandlerResult> invokeHandler(ServerWebExchange exchange, Object handler) {
if (this.handlerAdapters != null) {
for (HandlerAdapter handlerAdapter : this.handlerAdapters) {
if (handlerAdapter.supports(handler)) {
return handlerAdapter.handle(exchange, handler);
}
}
}
return Mono.error(new IllegalStateException("No HandlerAdapter: " + handler));
}
private Mono<Void> handleResult(ServerWebExchange exchange, HandlerResult result) {
return getResultHandler(result).handleResult(exchange, result)
.onErrorResume(ex -> result.applyExceptionHandler(ex).flatMap(exceptionResult ->
getResultHandler(exceptionResult).handleResult(exchange, exceptionResult)));
}
在DispatcherHandler中三个依次调用的核心接口是HandlerMapping、HandlerAdapter、HandlerResultHandler。DispatcherHandler实现了ApplicationContextAware接口,会自动调用setApplicationContext方法,然后在initStrategies中注入三个接口的所有实例。
protected void initStrategies(ApplicationContext context) {
Map<String, HandlerMapping> mappingBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
context, HandlerMapping.class, true, false);
ArrayList<HandlerMapping> mappings = new ArrayList<>(mappingBeans.values());
AnnotationAwareOrderComparator.sort(mappings);
this.handlerMappings = Collections.unmodifiableList(mappings);
Map<String, HandlerAdapter> adapterBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
context, HandlerAdapter.class, true, false);
this.handlerAdapters = new ArrayList<>(adapterBeans.values());
AnnotationAwareOrderComparator.sort(this.handlerAdapters);
Map<String, HandlerResultHandler> beans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
context, HandlerResultHandler.class, true, false);
this.resultHandlers = new ArrayList<>(beans.values());
AnnotationAwareOrderComparator.sort(this.resultHandlers);
}
HandlerMapping
HandlerMapping负责路径到handler的映射
public interface HandlerMapping {
/**
* Return a handler for this request.
* @param exchange current server exchange
* @return a {@link Mono} that emits one value or none in case the request
* cannot be resolved to a handler
*/
Mono<Object> getHandler(ServerWebExchange exchange);
}
实现类有
RequestMappingHandlerMapping | 生成@Controller和@RequestMapping封装的接口 |
---|---|
RouterFunctionMapping | 支持RouterFunction |
SimpleUrlHandlerMapping | 支持ResourceWebHandler |
RequestMappingHandlerMapping
-
注册bean
org.springframework.web.reactive.config.WebFluxConfigurationSupport#requestMappingHandlerMapping
@Bean public RequestMappingHandlerMapping requestMappingHandlerMapping() { RequestMappingHandlerMapping mapping = createRequestMappingHandlerMapping(); mapping.setOrder(0); mapping.setContentTypeResolver(webFluxContentTypeResolver()); mapping.setCorsConfigurations(getCorsConfigurations()); PathMatchConfigurer configurer = getPathMatchConfigurer(); Boolean useTrailingSlashMatch = configurer.isUseTrailingSlashMatch(); if (useTrailingSlashMatch != null) { mapping.setUseTrailingSlashMatch(useTrailingSlashMatch); } Boolean useCaseSensitiveMatch = configurer.isUseCaseSensitiveMatch(); if (useCaseSensitiveMatch != null) { mapping.setUseCaseSensitiveMatch(useCaseSensitiveMatch); } Map<String, Predicate<Class<?>>> pathPrefixes = configurer.getPathPrefixes(); if (pathPrefixes != null) { mapping.setPathPrefixes(pathPrefixes); } return mapping; }
-
初始化mapping
@Override public void afterPropertiesSet() { initHandlerMethods(); // Total includes detected mappings + explicit registrations via registerMapping.. int total = this.getHandlerMethods().size(); if ((logger.isTraceEnabled() && total == 0) || (logger.isDebugEnabled() && total > 0) ) { logger.debug(total + " mappings in " + formatMappingName()); } }
initHandlerMethods遍历所有bean,如果bean标注了Controller或RequestMapping的注解,则调用detectHandlerMethods
protected void initHandlerMethods() { String[] beanNames = obtainApplicationContext().getBeanNamesForType(Object.class); for (String beanName : beanNames) { if (!beanName.startsWith(SCOPED_TARGET_NAME_PREFIX)) { Class<?> beanType = null; try { beanType = obtainApplicationContext().getType(beanName); } catch (Throwable ex) { // An unresolvable bean type, probably from a lazy bean - let's ignore it. if (logger.isTraceEnabled()) { logger.trace("Could not resolve type for bean '" + beanName + "'", ex); } } if (beanType != null && isHandler(beanType)) { detectHandlerMethods(beanName); } } } handlerMethodsInitialized(getHandlerMethods()); }
detectHandlerMethods遍历所有标注了RequestMapping的方法,注册到MappingRegistry. 当请求匹配路径是,lookupHandlerMethod会遍历MappingRegistry中的mapping.
protected void detectHandlerMethods(final Object handler) { Class<?> handlerType = (handler instanceof String ? obtainApplicationContext().getType((String) handler) : handler.getClass()); if (handlerType != null) { final Class<?> userType = ClassUtils.getUserClass(handlerType); Map<Method, T> methods = MethodIntrospector.selectMethods(userType, (MethodIntrospector.MetadataLookup<T>) method -> getMappingForMethod(method, userType)); if (logger.isTraceEnabled()) { logger.trace(formatMappings(userType, methods)); } methods.forEach((key, mapping) -> { Method invocableMethod = AopUtils.selectInvocableMethod(key, userType); registerHandlerMethod(handler, invocableMethod, mapping); }); } }
RouterFunctionMapping
同RequestMappingHandler类似,在WebFluxConfigurationSupport中注册RouterFunctionMapping的bean,实例化过程中导入所有的RouterFunction
public void afterPropertiesSet() throws Exception {
if (CollectionUtils.isEmpty(this.messageReaders)) {
ServerCodecConfigurer codecConfigurer = ServerCodecConfigurer.create();
this.messageReaders = codecConfigurer.getReaders();
}
if (this.routerFunction == null) {
initRouterFunctions();
}
}
initRouterFunctions方法从ApplicationContext中查找所有的RouterFunction
protected void initRouterFunctions() {
List<RouterFunction<?>> routerFunctions = routerFunctions();
this.routerFunction = routerFunctions.stream().reduce(RouterFunction::andOther).orElse(null);
logRouterFunctions(routerFunctions);
}
private List<RouterFunction<?>> routerFunctions() {
List<RouterFunction<?>> functions = obtainApplicationContext()
.getBeanProvider(RouterFunction.class)
.orderedStream()
.map(router -> (RouterFunction<?>)router)
.collect(Collectors.toList());
return (!CollectionUtils.isEmpty(functions) ? functions : Collections.emptyList());
}
一个简单的RouterFunction示例
@Bean
public RouterFunction<ServerResponse> routes(PostHandler postController) {
return route(GET("/posts"), postController::all)
.andRoute(POST("/posts"), postController::create)
.andRoute(GET("/posts/{id}"), postController::get)
.andRoute(PUT("/posts/{id}"), postController::update)
.andRoute(DELETE("/posts/{id}"), postController::delete);
}
SimpleUrlHandlerMapping
SimpleUrlHandlerMapping可以提供静态资源的访问
例如org.springframework.web.reactive.config.WebFluxConfigurationSupport#resourceHandlerMapping方法返回的是SimpleUrlHandlerMapping的实例对象
@Bean
public HandlerMapping resourceHandlerMapping() {
ResourceLoader resourceLoader = this.applicationContext;
if (resourceLoader == null) {
resourceLoader = new DefaultResourceLoader();
}
ResourceHandlerRegistry registry = new ResourceHandlerRegistry(resourceLoader);
registry.setResourceUrlProvider(resourceUrlProvider());
addResourceHandlers(registry);
AbstractHandlerMapping handlerMapping = registry.getHandlerMapping();
if (handlerMapping != null) {
PathMatchConfigurer configurer = getPathMatchConfigurer();
Boolean useTrailingSlashMatch = configurer.isUseTrailingSlashMatch();
Boolean useCaseSensitiveMatch = configurer.isUseCaseSensitiveMatch();
if (useTrailingSlashMatch != null) {
handlerMapping.setUseTrailingSlashMatch(useTrailingSlashMatch);
}
if (useCaseSensitiveMatch != null) {
handlerMapping.setUseCaseSensitiveMatch(useCaseSensitiveMatch);
}
}
else {
handlerMapping = new EmptyHandlerMapping();
}
return handlerMapping;
}
addResourceHandlers通过WebFluxConfigurer的addResourceHandlers配置静态资源(每个路径对应ResourceHandlerRegistration对象)
WebFluxConfig继承WebFluxConfigurer,并默认注入了几个静态资源访问路径,分别是
- /webjars/**
- /** (WebFluxProperties的默认值)
public void addResourceHandlers(ResourceHandlerRegistry registry) {
if (!this.resourceProperties.isAddMappings()) {
logger.debug("Default resource handling disabled");
return;
}
if (!registry.hasMappingForPattern("/webjars/**")) {
ResourceHandlerRegistration registration = registry.addResourceHandler("/webjars/**")
.addResourceLocations("classpath:/META-INF/resources/webjars/");
configureResourceCaching(registration);
customizeResourceHandlerRegistration(registration);
}
String staticPathPattern = this.webFluxProperties.getStaticPathPattern();
if (!registry.hasMappingForPattern(staticPathPattern)) {
ResourceHandlerRegistration registration = registry.addResourceHandler(staticPathPattern)
.addResourceLocations(this.resourceProperties.getStaticLocations());
configureResourceCaching(registration);
customizeResourceHandlerRegistration(registration);
}
}
ResourceHandlerRegistry.getHandleerMapping将ResourceHandlerRegistration转成ResourceWebHandler,然后封装成SimpleUrlHandlerMapping
protected AbstractUrlHandlerMapping getHandlerMapping() {
if (this.registrations.isEmpty()) {
return null;
}
Map<String, WebHandler> urlMap = new LinkedHashMap<>();
for (ResourceHandlerRegistration registration : this.registrations) {
for (String pathPattern : registration.getPathPatterns()) {
ResourceWebHandler handler = registration.getRequestHandler();
handler.getResourceTransformers().forEach(transformer -> {
if (transformer instanceof ResourceTransformerSupport) {
((ResourceTransformerSupport) transformer).setResourceUrlProvider(this.resourceUrlProvider);
}
});
try {
handler.afterPropertiesSet();
}
catch (Throwable ex) {
throw new BeanInitializationException("Failed to init ResourceHttpRequestHandler", ex);
}
urlMap.put(pathPattern, handler);
}
}
SimpleUrlHandlerMapping handlerMapping = new SimpleUrlHandlerMapping();
handlerMapping.setOrder(this.order);
handlerMapping.setUrlMap(urlMap);
return handlerMapping;
}
HandlerAdapter
public interface HandlerAdapter {
boolean supports(Object handler);
Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler);
}
HandlerAdapter负责适配HandlerMapping返回的handler,调用handle方法生成HandlerResult
HandlerMapping | getHandler | HandlerAdapter |
---|---|---|
RequestMappingHandlerMapping | HandlerMethod | RequestMappingHandlerAdapter |
SimpleUrlHandlerMapping | ResourceWebHandler | SimpleHandlerAdapter |
RouterFunctionMapping | HandlerFunction | HandlerFunctionAdapter |
WebFluxConfigurationSupport
WebFluxConfigurationSupport注册了上面三个HandlerAdapter的bean。
@Bean
public RequestMappingHandlerAdapter requestMappingHandlerAdapter() {
RequestMappingHandlerAdapter adapter = createRequestMappingHandlerAdapter();
adapter.setMessageReaders(serverCodecConfigurer().getReaders());
adapter.setWebBindingInitializer(getConfigurableWebBindingInitializer());
adapter.setReactiveAdapterRegistry(webFluxAdapterRegistry());
ArgumentResolverConfigurer configurer = new ArgumentResolverConfigurer();
configureArgumentResolvers(configurer);
adapter.setArgumentResolverConfigurer(configurer);
return adapter;
}
@Bean
public HandlerFunctionAdapter handlerFunctionAdapter() {
return new HandlerFunctionAdapter();
}
@Bean
public SimpleHandlerAdapter simpleHandlerAdapter() {
return new SimpleHandlerAdapter();
}
HandlerResultHandler
HandlerResultHandler | |
---|---|
ResponseBodyResultHandler | @ResponseBody |
ResponseEntityResultHandler | HttpEntity、RequestEntity、HttpHeaders |
ServerResponseResultHandler | ServerResponse |
ViewResolutionResultHandler | @ModelAttribute、CharSequence、Render、Model、Map、Void、View、"Simple" property(a primitive, a String or other CharSequence, a Number, a Date, a URI, a URL, a Locale, a Class, or a corresponding array) |
ReactiveAdapter
将泛型类映射为实体类,Reactor支持Mono、Flux、Publisher、CompletableFuture
例如如下示例,返回值为Flux时,Response自动转为成Message对象的json数组
@GetMapping
Flux<Message> allMessages(){
return Flux.just(
Message.builder().body("hello Spring 5").build(),
Message.builder().body("hello Spring Boot 2").build()
);
}
ReactiveAdapterRegistry
负责加载ClassPath中的ReactiveAdapter
public ReactiveAdapterRegistry() {
ClassLoader classLoader = ReactiveAdapterRegistry.class.getClassLoader();
// Reactor
boolean reactorRegistered = false;
if (ClassUtils.isPresent("reactor.core.publisher.Flux", classLoader)) {
new ReactorRegistrar().registerAdapters(this);
reactorRegistered = true;
}
this.reactorPresent = reactorRegistered;
// RxJava1
if (ClassUtils.isPresent("rx.Observable", classLoader) &&
ClassUtils.isPresent("rx.RxReactiveStreams", classLoader)) {
new RxJava1Registrar().registerAdapters(this);
}
// RxJava2
if (ClassUtils.isPresent("io.reactivex.Flowable", classLoader)) {
new RxJava2Registrar().registerAdapters(this);
}
// Java 9+ Flow.Publisher
if (ClassUtils.isPresent("java.util.concurrent.Flow.Publisher", classLoader)) {
new ReactorJdkFlowAdapterRegistrar().registerAdapter(this);
}
// If not present, do nothing for the time being...
// We can fall back on "reactive-streams-flow-bridge" (once released)
}
以Reactor为例,通过ReactorRegistrar注册支持的ReactiveType
void registerAdapters(ReactiveAdapterRegistry registry) {
// Register Flux and Mono before Publisher...
registry.registerReactiveType(
ReactiveTypeDescriptor.singleOptionalValue(Mono.class, Mono::empty),
source -> (Mono<?>) source,
Mono::from
);
registry.registerReactiveType(
ReactiveTypeDescriptor.multiValue(Flux.class, Flux::empty),
source -> (Flux<?>) source,
Flux::from);
registry.registerReactiveType(
ReactiveTypeDescriptor.multiValue(Publisher.class, Flux::empty),
source -> (Publisher<?>) source,
source -> source);
registry.registerReactiveType(
ReactiveTypeDescriptor.singleOptionalValue(CompletableFuture.class, () -> {
CompletableFuture<?> empty = new CompletableFuture<>();
empty.complete(null);
return empty;
}),
source -> Mono.fromFuture((CompletableFuture<?>) source),
source -> Mono.from(source).toFuture()
);
}