zoukankan      html  css  js  c++  java
  • springboot情操陶冶-web配置(四)

    承接前文springboot情操陶冶-web配置(三),本文将在DispatcherServlet应用的基础上谈下websocket的使用

    websocket

    websocket的简单了解可见维基百科WebSocket,在笔者看来其大多数应用在web浏览器上用于与服务端的持续性通信,并大多用于接收服务器的推送内容

    简单例子

    spring很友好的向我们展示了如何在springboot上整合websocket,并给出了一个hello例子。读者可参照官方例子走一遍便可对websocket有一定的了解。附上官方部分源码


    Controller响应层

    package hello;
    
    import org.springframework.messaging.handler.annotation.MessageMapping;
    import org.springframework.messaging.handler.annotation.SendTo;
    import org.springframework.stereotype.Controller;
    import org.springframework.web.util.HtmlUtils;
    
    @Controller
    public class GreetingController {
    
    
        @MessageMapping("/hello")
        @SendTo("/topic/greetings")
        public Greeting greeting(HelloMessage message) throws Exception {
            Thread.sleep(1000); // simulated delay
            return new Greeting("Hello, " + HtmlUtils.htmlEscape(message.getName()) + "!");
        }
    
    }
    

    客户端HTML界面

    <!DOCTYPE html>
    <html>
    <head>
        <title>Hello WebSocket</title>
        <link href="/webjars/bootstrap/css/bootstrap.min.css" rel="stylesheet">
        <link href="/main.css" rel="stylesheet">
        <script src="/webjars/jquery/jquery.min.js"></script>
        <script src="/webjars/sockjs-client/sockjs.min.js"></script>
        <script src="/webjars/stomp-websocket/stomp.min.js"></script>
        <script src="/app.js"></script>
    </head>
    <body>
    <noscript><h2 style="color: #ff0000">Seems your browser doesn't support Javascript! Websocket relies on Javascript being
        enabled. Please enable
        Javascript and reload this page!</h2></noscript>
    <div id="main-content" class="container">
        <div class="row">
            <div class="col-md-6">
                <form class="form-inline">
                    <div class="form-group">
                        <label for="connect">WebSocket connection:</label>
                        <button id="connect" class="btn btn-default" type="submit">Connect</button>
                        <button id="disconnect" class="btn btn-default" type="submit" disabled="disabled">Disconnect
                        </button>
                    </div>
                </form>
            </div>
            <div class="col-md-6">
                <form class="form-inline">
                    <div class="form-group">
                        <label for="name">What is your name?</label>
                        <input type="text" id="name" class="form-control" placeholder="Your name here...">
                    </div>
                    <button id="send" class="btn btn-default" type="submit">Send</button>
                </form>
            </div>
        </div>
        <div class="row">
            <div class="col-md-12">
                <table id="conversation" class="table table-striped">
                    <thead>
                    <tr>
                        <th>Greetings</th>
                    </tr>
                    </thead>
                    <tbody id="greetings">
                    </tbody>
                </table>
            </div>
        </div>
    </div>
    </body>
    </html>
    

    在阅读完官方的demo例子之后,读者务必再去阅览下WebSocket在springboot的基本概念>>>Web on Servlet Stack。虽然文档很长,但还是需要理解下其的工作原理,大致上和rabbitmq类似,采取的是订阅推送的模式。
    websocket_page

    源码层分析

    笔者优先关注下@EnableWebSocketMessageBroker注解,其用于开启websocket服务

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @Documented
    @Import(DelegatingWebSocketMessageBrokerConfiguration.class)
    public @interface EnableWebSocketMessageBroker {
    
    }
    

    由上可知,引入的DelegatingWebSocketMessageBrokerConfiguration类用于加载websocket的相关配置。
    本文不进行详细的源码分析,笔者则会根据上图的原理图寻找在springboot中的配置,这样应该会起到事半功倍的效果。

    RequestChannel和ResponseChannel

    请求与响应处理通道

    	@Bean
    	public AbstractSubscribableChannel clientInboundChannel() {
    		ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(clientInboundChannelExecutor());
    		// 拦截器,用户也可通过WebSocketMessageBrokerConfigurer接口增加拦截器
    		ChannelRegistration reg = getClientInboundChannelRegistration();
    		if (reg.hasInterceptors()) {
    			channel.setInterceptors(reg.getInterceptors());
    		}
    		return channel;
    	}
    	
    	protected final ChannelRegistration getClientInboundChannelRegistration() {
    		if (this.clientInboundChannelRegistration == null) {
    			ChannelRegistration registration = new ChannelRegistration();
    			// 加载请求通道,也可新增拦截器
    			configureClientInboundChannel(registration);
    			registration.interceptors(new ImmutableMessageChannelInterceptor());
    			this.clientInboundChannelRegistration = registration;
    		}
    		return this.clientInboundChannelRegistration;
    	}
    	
    	@Bean
    	public AbstractSubscribableChannel clientOutboundChannel() {
    		ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(clientOutboundChannelExecutor());
    		// 拦截器,用户也可通过WebSocketMessageBrokerConfigurer接口增加拦截器
    		ChannelRegistration reg = getClientOutboundChannelRegistration();
    		if (reg.hasInterceptors()) {
    			channel.setInterceptors(reg.getInterceptors());
    		}
    		return channel;
    	}
    

    不管是请求通道还是响应通道代码一模一样,使用的均是ExecutorSubscribableChannel,其内部整合了拦截器和线程池。此类基本是所有channel的公用类,笔者稍微看下里面有什么小花头


    No.1 消息处理ExecutorSubscribableChannel

    	@Override
    	public boolean sendInternal(Message<?> message, long timeout) {
    		// 消息处理者遍历
    		for (MessageHandler handler : getSubscribers()) {
    			// 统一由SendTask类处理消息
    			SendTask sendTask = new SendTask(message, handler);
    			if (this.executor == null) {
    				sendTask.run();
    			}
    			else {
    				this.executor.execute(sendTask);
    			}
    		}
    		return true;
    	}
    

    这里的消息处理者有直接处理注解的,也有直接返回给BorkerRelay的,读者可自行去查阅


    No.2 消息任务SendTask

    		@Override
    		public void run() {
    			Message<?> message = this.inputMessage;
    			try {
    				// 应用拦截器
    				message = applyBeforeHandle(message);
    				if (message == null) {
    					return;
    				}
    				// 通过messageHandler来处理消息
    				this.messageHandler.handleMessage(message);
    				triggerAfterMessageHandled(message, null);
    			}
    			catch (Exception ex) {
    				triggerAfterMessageHandled(message, ex);
    				if (ex instanceof MessagingException) {
    					throw (MessagingException) ex;
    				}
    				String description = "Failed to handle " + message + " to " + this + " in " + this.messageHandler;
    				throw new MessageDeliveryException(message, description, ex);
    			}
    			catch (Throwable err) {
    				String description = "Failed to handle " + message + " to " + this + " in " + this.messageHandler;
    				MessageDeliveryException ex2 = new MessageDeliveryException(message, description, err);
    				triggerAfterMessageHandled(message, ex2);
    				throw ex2;
    			}
    		}
    

    由此可得出通用的SendTask只是个消息中转器,最终的消息处理均是由MessageHandler来解决的。看来处理消息的路由核心得继续往下文分析了

    注解方式消息处理器MessageHandler

    即解析@MessageMapping/@SendTo等websocket注解的方法,其类似于MVC的@RequestMapping等注解。可见SimpAnnotationMethodMessageHandler

    	@Override
    	protected SimpAnnotationMethodMessageHandler createAnnotationMethodMessageHandler() {
    		// 请求和响应通道、broker消息模板
    		return new WebSocketAnnotationMethodMessageHandler(
    				clientInboundChannel(), clientOutboundChannel(), brokerMessagingTemplate());
    	}
    

    上述代码中的broker消息模板主要通过brokerChannel通道将注解方法返回的值经过订阅关系处理后再传入响应通道。笔者此处对WebSocketAnnotationMethodMessageHandler作下分步骤的解析


    No.1 入参解析器

    
    	protected List<HandlerMethodArgumentResolver> initArgumentResolvers() {
    		ApplicationContext context = getApplicationContext();
    		ConfigurableBeanFactory beanFactory = (context instanceof ConfigurableApplicationContext ?
    				((ConfigurableApplicationContext) context).getBeanFactory() : null);
    
    		List<HandlerMethodArgumentResolver> resolvers = new ArrayList<>();
    
    		// @Header和@Headers参数注解解析
    		resolvers.add(new HeaderMethodArgumentResolver(this.conversionService, beanFactory));
    		resolvers.add(new HeadersMethodArgumentResolver());
    		// @DestinationVariable注解参数解析
    		resolvers.add(new DestinationVariableMethodArgumentResolver(this.conversionService));
    
    		// 读取Principal类型的参数,读取的为头部的simpUser属性
    		resolvers.add(new PrincipalMethodArgumentResolver());
    		// 读取Message类型的参数
    		resolvers.add(new MessageMethodArgumentResolver(this.messageConverter));
    
    		resolvers.addAll(getCustomArgumentResolvers());//用户自定义,可扩展
    		// @Payload注解的参数解析
    		resolvers.add(new PayloadArgumentResolver(this.messageConverter, this.validator));
    
    		return resolvers;
    	}
    

    No.2 反参解析器

    	@Override
    	protected List<? extends HandlerMethodReturnValueHandler> initReturnValueHandlers() {
    		List<HandlerMethodReturnValueHandler> handlers = new ArrayList<>();
    
    		// Single-purpose return value types
    		handlers.add(new ListenableFutureReturnValueHandler());
    		handlers.add(new CompletableFutureReturnValueHandler());
    
    		// @SendTo和@SendToUser注解解析器
    		SendToMethodReturnValueHandler sendToHandler =
    				new SendToMethodReturnValueHandler(this.brokerTemplate, true);
    		if (this.headerInitializer != null) {
    			sendToHandler.setHeaderInitializer(this.headerInitializer);
    		}
    		handlers.add(sendToHandler);
    
    		// @SubscribeMapping注解解析器
    		SubscriptionMethodReturnValueHandler subscriptionHandler =
    				new SubscriptionMethodReturnValueHandler(this.clientMessagingTemplate);
    		subscriptionHandler.setHeaderInitializer(this.headerInitializer);
    		handlers.add(subscriptionHandler);
    
    		// custom return value types
    		handlers.addAll(getCustomReturnValueHandlers());
    
    		// 默认处理
    		sendToHandler = new SendToMethodReturnValueHandler(this.brokerTemplate, false);
    		sendToHandler.setHeaderInitializer(this.headerInitializer);
    		handlers.add(sendToHandler);
    
    		return handlers;
    	}
    

    No.3 HandlerMethod对象创建

    	@Override
    	public void afterPropertiesSet() {
    		// 入参和反参解析器配置
    		if (this.argumentResolvers.getResolvers().isEmpty()) {
    			this.argumentResolvers.addResolvers(initArgumentResolvers());
    		}
    
    		if (this.returnValueHandlers.getReturnValueHandlers().isEmpty()) {
    			this.returnValueHandlers.addHandlers(initReturnValueHandlers());
    		}
    
    		ApplicationContext context = getApplicationContext();
    		if (context == null) {
    			return;
    		}
    		for (String beanName : context.getBeanNamesForType(Object.class)) {
    			if (!beanName.startsWith(SCOPED_TARGET_NAME_PREFIX)) {
    				Class<?> beanType = null;
    				try {
    					beanType = context.getType(beanName);
    				}
    				catch (Throwable ex) {
    					// An unresolvable bean type, probably from a lazy bean - let's ignore it.
    					if (logger.isDebugEnabled()) {
    						logger.debug("Could not resolve target class for bean with name '" + beanName + "'", ex);
    					}
    				}
    				// 查找被@Controller注解下修饰的带有@MessageMapping和@SubscribeMapping注解的方法集合并存放至handlerMethods映射集合中
    				if (beanType != null && isHandler(beanType)) {
    					detectHandlerMethods(beanName);
    				}
    			}
    		}
    	}
    

    主要是搜索带有@MessageMapping@SubscribeMapping注解的方法注册至MessageHandler对象中的handlerMethods属性,方便后续对请求的路由


    No.4 请求通道注册SimpAnnotationMethodMessageHandler处理类

    	@Override
    	public final void start() {
    		synchronized (this.lifecycleMonitor) {
    			// 请求通道注入此处理器
    			this.clientInboundChannel.subscribe(this);
    			this.running = true;
    		}
    	}
    

    针对注解方式的消息路由处理我们基本知道了,那这个针对websocket的发过来的请求如何被路由至相应的HandlerMethod中呢?

    HandlerMapping

    笔者此处找寻下针对websocket的请求的路由

    	@Bean
    	public HandlerMapping stompWebSocketHandlerMapping() {
    		WebSocketHandler handler = decorateWebSocketHandler(subProtocolWebSocketHandler());
    		WebMvcStompEndpointRegistry registry = new WebMvcStompEndpointRegistry(
    				handler, getTransportRegistration(), messageBrokerTaskScheduler());
    		ApplicationContext applicationContext = getApplicationContext();
    		if (applicationContext != null) {
    			registry.setApplicationContext(applicationContext);
    		}
    		registerStompEndpoints(registry);
    		// 返回的是AbstractUrlHandlerMapping的继承类
    		return registry.getHandlerMapping();
    	}
    

    具体的读者可查询源码,内容还是很多的,笔者只知道上述代码返回的是AbstractUrlHandlerMapping的继承类,其存储的urlMap中的key值为websocket的端点路径,比如/ws-demo/**,而value值则是HttpRequestHandler接口的实现类,其主要处理基于HTTP的websocket请求。

    @FunctionalInterface
    public interface HttpRequestHandler {
    
    	/**
    	 * Process the given request, generating a response.
    	 * @param request current HTTP request
    	 * @param response current HTTP response
    	 * @throws ServletException in case of general errors
    	 * @throws IOException in case of I/O errors
    	 */
    	void handleRequest(HttpServletRequest request, HttpServletResponse response)
    			throws ServletException, IOException;
    
    }
    

    感兴趣的读者可进行深入的研究,其中有包括对ajax/sockJs/handshake等的支持。

    消息流

    主要是由AbstractHttpReceivingTransportHandler接收客户端的请求,然后通过StompSubProtocolHandler类将消息发送至ExecutorSubscribableChannel,由其调用sendInternal()方法遍历注册的MessageHandlers,由后者去真正的处理消息并回包。具体的代码逻辑还是很复杂的,笔者此处罗列下注解方式的处理以及响应给客户端的消息处理


    No.1 注解消息处理AbstractMethodMessageHandler

    	@Override
    	public void handleMessage(Message<?> message) throws MessagingException {
    		// 获取目的地址
    		String destination = getDestination(message);
    		if (destination == null) {
    			return;
    		}
    		// 确保请求的发过来的地址是指定的前缀,否则消息就会被直接丢弃
    		String lookupDestination = getLookupDestination(destination);
    		if (lookupDestination == null) {
    			return;
    		}
    
    		MessageHeaderAccessor headerAccessor = MessageHeaderAccessor.getMutableAccessor(message);
    		headerAccessor.setHeader(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER, lookupDestination);
    		headerAccessor.setLeaveMutable(true);
    		message = MessageBuilder.createMessage(message.getPayload(), headerAccessor.getMessageHeaders());
    
    		if (logger.isDebugEnabled()) {
    			logger.debug("Searching methods to handle " +
    					headerAccessor.getShortLogMessage(message.getPayload()) +
    					", lookupDestination='" + lookupDestination + "'");
    		}
    		// 找寻注解进行相应的方法响应
    		handleMessageInternal(message, lookupDestination);
    		headerAccessor.setImmutable();
    	}
    

    此处需要注意的是请求的路径前缀必须是指定的前缀,此前缀可通过WebSocketMessageBrokerConfigurer#configureMessageBroker()方法来设置,如下

        @Override
        public void configureMessageBroker(MessageBrokerRegistry registry) {
            // response destination prefix
            registry.enableSimpleBroker("/topic");
            // request destination prefix
            registry.setApplicationDestinationPrefixes("/app");
        }
    

    No.2 注解消息响应处理SimpleBrokerMessageHandler

    @Override
    	protected void handleMessageInternal(Message<?> message) {
    		MessageHeaders headers = message.getHeaders();
    		SimpMessageType messageType = SimpMessageHeaderAccessor.getMessageType(headers);
    		String destination = SimpMessageHeaderAccessor.getDestination(headers);
    		String sessionId = SimpMessageHeaderAccessor.getSessionId(headers);
    
    		updateSessionReadTime(sessionId);
    		
    		// 此处确保回包的路径是以指定的BrokerPath作为前缀,否则则会被丢弃,配置同上
    		if (!checkDestinationPrefix(destination)) {
    			return;
    		}
    		
    		// 针对消息的发送,会根据多个订阅者进行广播发送
    		if (SimpMessageType.MESSAGE.equals(messageType)) {
    			logMessage(message);
    			sendMessageToSubscribers(destination, message);
    		}
    		// 连接请求响应
    		else if (SimpMessageType.CONNECT.equals(messageType)) {
    				....
    			}
    		}
    		// 关闭请求响应
    		else if (SimpMessageType.DISCONNECT.equals(messageType)) {
    			....
    		}
    		// 订阅请求响应
    		else if (SimpMessageType.SUBSCRIBE.equals(messageType)) {
    			...
    		}
    		// 取消订阅请求响应
    		else if (SimpMessageType.UNSUBSCRIBE.equals(messageType)) {
    			....
    		}
    	}
    

    No.3 消息响应处理StompBrokerRelayMessageHandler,其作为真实的处理响应的出处

    @Override
    	protected void handleMessageInternal(Message<?> message) {
    		String sessionId = SimpMessageHeaderAccessor.getSessionId(message.getHeaders());
    
    		.....
    		.....
    
    		// 回包路径,默认以用户设定的BrokerPath为前缀;不满足就将包丢弃
    		String destination = stompAccessor.getDestination();
    		if (command != null && command.requiresDestination() && !checkDestinationPrefix(destination)) {
    			return;
    		}
    		
    		// 连接请求
    		if (StompCommand.CONNECT.equals(command)) {
    			if (logger.isDebugEnabled()) {
    				logger.debug(stompAccessor.getShortLogMessage(EMPTY_PAYLOAD));
    			}
    			stompAccessor = (stompAccessor.isMutable() ? stompAccessor : StompHeaderAccessor.wrap(message));
    			stompAccessor.setLogin(this.clientLogin);
    			stompAccessor.setPasscode(this.clientPasscode);
    			if (getVirtualHost() != null) {
    				stompAccessor.setHost(getVirtualHost());
    			}
    			StompConnectionHandler handler = new StompConnectionHandler(sessionId, stompAccessor);
    			this.connectionHandlers.put(sessionId, handler);
    			this.stats.incrementConnectCount();
    			Assert.state(this.tcpClient != null, "No TCP client available");
    			this.tcpClient.connect(handler);
    		}
    		// 关闭请求
    		else if (StompCommand.DISCONNECT.equals(command)) {
    			StompConnectionHandler handler = this.connectionHandlers.get(sessionId);
    			if (handler == null) {
    				if (logger.isDebugEnabled()) {
    					logger.debug("Ignoring DISCONNECT in session " + sessionId + ". Connection already cleaned up.");
    				}
    				return;
    			}
    			stats.incrementDisconnectCount();
    			handler.forward(message, stompAccessor);
    		}
    		else {
    			StompConnectionHandler handler = this.connectionHandlers.get(sessionId);
    			if (handler == null) {
    				if (logger.isDebugEnabled()) {
    					logger.debug("No TCP connection for session " + sessionId + " in " + message);
    				}
    				return;
    			}
    			// 直接调用连接返回,内含sessionId以及订阅者id等等
    			handler.forward(message, stompAccessor);
    		}
    	}
    

    小结

    先了解websocket的原理,然后再结合源码加深对原理的理解,这便是了解一个新技术的必要步骤。笔者此处针对官方的例子作以下小贴士
    1.配置websocket的请求响应前缀以及端点配置,务必实现WebSocketMessageBrokerConfigurer接口

    @Configuration
    @EnableWebSocketMessageBroker
    public class WebsocketConfig implements WebSocketMessageBrokerConfigurer {
    
        @Override
        public void registerStompEndpoints(StompEndpointRegistry registry) {
            // register web socket contextPath and allow any origin
            registry.addEndpoint("/ws-demo").setAllowedOrigins("*").withSockJS();
        }
    
        @Override
        public void configureMessageBroker(MessageBrokerRegistry registry) {
            // response destination prefix
            registry.enableSimpleBroker("/topic");
            // request destination prefix
            registry.setApplicationDestinationPrefixes("/app");
        }
    }
    

    2.针对回包处理时,一般我们需要指定路径,如果采用注解方式,默认情况下@SendTo不指定的时候,会采用用户设置的回包路径前缀,比如@MessageMapping("/app/hello")-->/topic/hello
    当然用户也可以采用SimpMessageTemplate#convertAndSend()方法直接发送至指定的回包路径

    3.客户端采用sockJs相关API时,其支持通过HTTP/HTTPS协议连接指定的websocket端点,但是务必在订阅或者发送消息的时候,指定的目的地址必须以/为开头,否则发送不成功

    4.客户端采用sockJs时,针对发起的subscribe请求作如下总结

    // 当服务端采取@SubscribeMapping注解时,则会对/app/subscribe直接请求有返回值
    stompClient.subscribe('/app/subscribe', function (greeting) {
                showGreeting(JSON.parse(greeting.body));
            });
    
    // 当服务端没有采取@SubsribeMapping注解时,下述代码则实现对/topic/subscribe的消息接收
    stompClient.subscribe('/topic/subscribe', function (greeting) {
                showGreeting(JSON.parse(greeting.body).content);
            });
    

    5.本文的例子读者也可访问该地址获取,建议了解原理再去阅读源码会事半功倍的

  • 相关阅读:
    软件构造—— 实验二 lex词法分析
    软件构造-实验1 根据状态转换图手工构造词法扫描器
    PHP——实验四 PHP操作数据库
    判断是不是素数
    hexo和github pages的关系
    Python的map,reduce,filter函数
    CentOS源码更新Linux最新内核
    CentOS打Meltdown等漏洞的补丁包
    let申明与const申明
    正则表达式
  • 原文地址:https://www.cnblogs.com/question-sky/p/9636756.html
Copyright © 2011-2022 走看看