public interface WebSocketMessageBrokerConfigurer { // 添加这个Endpoint,这样在网页中就可以通过websocket连接上服务,也就是我们配置websocket的服务地址,并且可以指定是否使用socketjs void registerStompEndpoints(StompEndpointRegistry var1); // 配置发送与接收的消息参数,可以指定消息字节大小,缓存大小,发送超时时间 void configureWebSocketTransport(WebSocketTransportRegistration var1); // 设置输入消息通道的线程数,默认线程为1,可以自己自定义线程数,最大线程数,线程存活时间 void configureClientInboundChannel(ChannelRegistration var1); // 设置输出消息通道的线程数,默认线程为1,可以自己自定义线程数,最大线程数,线程存活时间 void configureClientOutboundChannel(ChannelRegistration var1); // 添加自定义的消息转换器,spring 提供多种默认的消息转换器,返回false,不会添加消息转换器,返回true,会添加默认的消息转换器,当然也可以把自己写的消息转换器添加到转换链中 boolean configureMessageConverters(List<MessageConverter> var1); // 配置消息代理,哪种路径的消息会进行代理处理 void configureMessageBroker(MessageBrokerRegistry var1); // 自定义控制器方法的参数类型,有兴趣可以百度google HandlerMethodArgumentResolver这个的用法 void addArgumentResolvers(List<HandlerMethodArgumentResolver> var1); // 自定义控制器方法返回值类型,有兴趣可以百度google HandlerMethodReturnValueHandler这个的用法 void addReturnValueHandlers(List<HandlerMethodReturnValueHandler> var1); }
package com.hainei.config; import com.hainei.interceptor.HttpHandShakeInterceptor; import org.springframework.context.annotation.Configuration; import org.springframework.messaging.simp.config.MessageBrokerRegistry; import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker; import org.springframework.web.socket.config.annotation.StompEndpointRegistry; import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer; /** * Created with IntelliJ IDEA. * User: lzx * Date: 2020/02/28 * Time: 14:52 * Description: No Description */ @Configuration @EnableWebSocketMessageBroker public class WebSocketConfig implements WebSocketMessageBrokerConfigurer { /** * 注册端点,发布或者订阅消息的时候需要连接此端点 * setAllowedOrigins 非必须,*表示允许其他域进行连接 * withSockJS 表示开始sockejs支持 */ @Override public void registerStompEndpoints(StompEndpointRegistry registry) { registry.addEndpoint("/samp-websocket") .addInterceptors(new HttpHandShakeInterceptor()) .setAllowedOrigins("*") .withSockJS(); } /** * 配置消息代理(中介) * enableSimpleBroker 服务端推送给客户端的路径前缀 * setApplicationDestinationPrefixes 客户端发送数据给服务器端的一个前缀 */ @Override public void configureMessageBroker(MessageBrokerRegistry registry) { //表示在topic和user这两个域上可以向客户端发消息 registry.enableSimpleBroker("/topic", "/user"); registry.setApplicationDestinationPrefixes("/app"); //给指定用户发送一对一的主题前缀是"/user" registry.setUserDestinationPrefix("/user"); } // @Override // public void configureClientInboundChannel(ChannelRegistration registration) { // registration.interceptors( new SocketChannelInterceptor()); // } // // @Override // public void configureClientOutboundChannel(ChannelRegistration registration) { // registration.interceptors( new SocketChannelInterceptor()); // } }
package com.hainei.interceptor; import com.hainei.common.constants.BaseConstant; import com.hainei.common.token.JwtTokenUtil; import org.apache.commons.lang3.StringUtils; import org.springframework.http.server.ServerHttpRequest; import org.springframework.http.server.ServerHttpResponse; import org.springframework.http.server.ServletServerHttpRequest; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.server.HandshakeInterceptor; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpSession; import java.net.URI; import java.util.Map; /** * Created with IntelliJ IDEA. * User: lzx * Date: 2020/03/02 * Time: 11:28 * Description: No Description */ public class HttpHandShakeInterceptor implements HandshakeInterceptor { @Override public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Map<String, Object> attributes) throws Exception { if(request instanceof ServletServerHttpRequest) { ServletServerHttpRequest servletRequest = (ServletServerHttpRequest)request; // String accessToken = servletRequest.getServletRequest().getHeader(BaseConstant.ACCESS_TOKEN); URI uri = servletRequest.getURI(); String userId = StringUtils.substringAfter(uri.toString(), "userId="); // String userId = JwtTokenUtil.getUserId(accessToken); System.out.println("【握手拦截器】beforeHandshake userId="+userId); attributes.put("userId", userId); } // if(request instanceof HttpServletRequest) { // HttpServletRequest httpServletRequest = (HttpServletRequest) request; // String accessToken=httpServletRequest.getHeader(BaseConstant.ACCESS_TOKEN); // if (StringUtils.isNotEmpty(accessToken)) { // System.out.println("token为空,设置为默认token"); // accessToken = "eyJhbGciOiJIUzI1NiJ9.eyJqd3QtcGVybWlzc2lvbnMta2V5XyI6bnVsbC" + // "wic3ViIjoiZGUzOTFmNGU4ZTg3NDQwMGFlYjRlMzhiOGE3NDIyZjQiLCJqd3QtbG9naW4taWQta2V5" + // "Ijoi5p6X5a2Q57-UIiwiand0LWlzLWFkbWluIjowLCJpc3MiOiJ5aW5neHVlLmNvbSIsImV4cCI6MTU4" + // "MzEzNjk0MywiaWF0IjoxNTgzMTI5NzQzfQ.hshSbSkn3o7y9CGLb3LXvGBp4z8T7qbHXx2ItAhDMdc"; // } // String userId = JwtTokenUtil.getUserId(accessToken); // System.out.println("【握手拦截器】beforeHandshake userId="+userId); // attributes.put("userId", userId); // } return true; } @Override public void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Exception e) { } }
spring-boot-starter-parent 1.5.1.RELEASE
编码,在不同的版本中部分方法有区别。2. 因为博客字数限制,拆分成了两篇文章第一篇地址:https://www.jianshu.com/p/4762494d42f1
第二篇地址:https://www.jianshu.com/p/9103c9c7e128
作者:TryCatch菌
链接:https://www.jianshu.com/p/9103c9c7e128
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
前面两种建立websocket通讯,不管是用javax的包还是spring的包都是用的比较底层的协议,下面我们来看看用上层的STOMP来建立websocket通讯
SockJs+Spring-WebSocket时,由于SockJs与Spring WebSocket之间采用JSON通讯,需要引入jackson 2的相关jar包
<!-- jackson-->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.6.3</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.6.3</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.6.3</version>
</dependency>
前面已经提到了STOMP是一个上层协议,STOMP 在 WebSocket 之上提供了一个基于 帧的线路格式层,用来定义消息语义。
STOMP 帧:该帧由命令,一个或多个 头信息 以及 负载所组成。如下就是发送 数据的一个 STOMP帧:
SEND
destination:/app/marco
content-length:20
{"message":"hello word!"}
- SEND:STOMP命令,表明会发送一些内容;
- destination:头信息,用来表示消息发送到哪里;
- content-length:头信息,用来表示 负载内容的 大小;
- 空行:
- 帧内容(负载)内容
要使用STOMP 通讯,服务端,和客户端都必须支持,服务端的准备步骤
服务端准备工作
-
我们已经配置了STOMP通讯的配置类 WebSocketStompConfig
-
配置了WebSocketChannelInterceptor 和 WebSocketHandshakeInterceptor 两个自定义拦截器
-
一个WebSocketStompController 用于接收客户端消息和响应客户端
-
一个简单的MVC controller 用于跳转websocket 页面
在Spring中启用STOMP通讯不用我们自己去写原生态的帧,spring的消息功能是基于代理模式构建,其实说得复杂,都是封装好了的,如果需要开启SOMP,只需要在websocket配置类上使用@EnableWebSocketMessageBroker
(注解的作用为能够在 WebSocket 上启用 STOMP),并实现WebSocketMessageBrokerConfigurer
接口,有些教程在这一步会继承AbstractWebSocketMessageBrokerConfigurer
类,我们看一下AbstractWebSocketMessageBrokerConfigurer
类的源码,可以看到都是空方法,也是实现的接口,这里推荐自己实现接口,因为官方API上AbstractWebSocketMessageBrokerConfigurer
已经标记为废弃
AbstractWebSocketMessageBrokerConfigurer
抽象类
public abstract class AbstractWebSocketMessageBrokerConfigurer implements WebSocketMessageBrokerConfigurer {
public AbstractWebSocketMessageBrokerConfigurer() {
}
public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
}
public void configureClientInboundChannel(ChannelRegistration registration) {
}
public void configureClientOutboundChannel(ChannelRegistration registration) {
}
public boolean configureMessageConverters(List<MessageConverter> messageConverters) {
return true;
}
public void addArgumentResolvers(List<HandlerMethodArgumentResolver> argumentResolvers) {
}
public void addReturnValueHandlers(List<HandlerMethodReturnValueHandler> returnValueHandlers) {
}
public void configureMessageBroker(MessageBrokerRegistry registry) {
}
}
WebSocketMessageBrokerConfigurer
接口
public interface WebSocketMessageBrokerConfigurer {
// 添加这个Endpoint,这样在网页中就可以通过websocket连接上服务,也就是我们配置websocket的服务地址,并且可以指定是否使用socketjs
void registerStompEndpoints(StompEndpointRegistry var1);
// 配置发送与接收的消息参数,可以指定消息字节大小,缓存大小,发送超时时间
void configureWebSocketTransport(WebSocketTransportRegistration var1);
// 设置输入消息通道的线程数,默认线程为1,可以自己自定义线程数,最大线程数,线程存活时间
void configureClientInboundChannel(ChannelRegistration var1);
// 设置输出消息通道的线程数,默认线程为1,可以自己自定义线程数,最大线程数,线程存活时间
void configureClientOutboundChannel(ChannelRegistration var1);
// 添加自定义的消息转换器,spring 提供多种默认的消息转换器,返回false,不会添加消息转换器,返回true,会添加默认的消息转换器,当然也可以把自己写的消息转换器添加到转换链中
boolean configureMessageConverters(List<MessageConverter> var1);
// 配置消息代理,哪种路径的消息会进行代理处理
void configureMessageBroker(MessageBrokerRegistry var1);
// 自定义控制器方法的参数类型,有兴趣可以百度google HandlerMethodArgumentResolver这个的用法
void addArgumentResolvers(List<HandlerMethodArgumentResolver> var1);
// 自定义控制器方法返回值类型,有兴趣可以百度google HandlerMethodReturnValueHandler这个的用法
void addReturnValueHandlers(List<HandlerMethodReturnValueHandler> var1);
}
在registerStompEndpoints 方法中,我们可以设置websocket服务的地址,同样,我们也可以根据自身业务需求,去添加拦截器,例如前文我们写的WebSocketHandshakeInterceptor拦截器,可以获取到httpsession,同样,当我们把信息存入map 后,都可以通过通过WebSocketSession的getAttributes()下提供get方法获取
/**
* 添加这个Endpoint,这样在网页中就可以通过websocket连接上服务,
* 也就是我们配置websocket的服务地址,并且可以指定是否使用socketjs
*
* @param registry
*/
@Override
public void registerStompEndpoints(StompEndpointRegistry registry)
{
/*
* 1. 将 /serviceName/stomp/websocketJs路径注册为STOMP的端点,
* 用户连接了这个端点后就可以进行websocket通讯,支持socketJs
* 2. setAllowedOrigins("*")表示可以跨域
* 3. withSockJS()表示支持socktJS访问
* 4. 添加自定义拦截器,这个拦截器是上一个demo自己定义的获取httpsession的拦截器
*/
registry.addEndpoint("/stomp/websocketJS")
.setAllowedOrigins("*")
.withSockJS()
.setInterceptors(new WebSocketHandshakeInterceptor())
;
/*
* 看了下源码,它的实现类是WebMvcStompEndpointRegistry ,
* addEndpoint是添加到WebMvcStompWebSocketEndpointRegistration的集合中,
* 所以可以添加多个端点
*/
registry.addEndpoint("/stomp/websocket");
}
如果我们业务关心,用户的数量,在线数量,连接状况等数据,我们也可以通过ChannelRegistration对象的setInterceptors方法添加监听,这里先展示一个完整的实现类,监听接口在后面会介绍,代码中的WebSocketHandshakeInterceptor
拦截器,是上一个例子已经实现的,用于存储httpsession,WebSocketChannelInterceptor
拦截器 ,在这个拦截器中可以做一些在线人数统计等操作,后面会介绍
package com.wzh.demo.websocket.config;
import com.wzh.demo.websocket.handler.MyPrincipalHandshakeHandler;
import com.wzh.demo.websocket.interceptor.WebSocketChannelInterceptor;
import com.wzh.demo.websocket.interceptor.WebSocketHandshakeInterceptor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
import org.springframework.messaging.handler.invocation.HandlerMethodReturnValueHandler;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.scheduling.concurrent.DefaultManagedTaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.util.AntPathMatcher;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketTransportRegistration;
import java.util.List;
/**
* <配置基于STOMP的websocket>
* <功能详细描述>
* @author wzh
* @version 2018-08-12 18:38
* @see [相关类/方法] (可选)
**/
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketStompConfig implements WebSocketMessageBrokerConfigurer {
/**
* 添加这个Endpoint,这样在网页中就可以通过websocket连接上服务,
* 也就是我们配置websocket的服务地址,并且可以指定是否使用socketjs
*
* @param registry
*/
@Override
public void registerStompEndpoints(StompEndpointRegistry registry)
{
/*
* 1. 将 /serviceName/stomp/websocketJs路径注册为STOMP的端点,
* 用户连接了这个端点后就可以进行websocket通讯,支持socketJs
* 2. setAllowedOrigins("*")表示可以跨域
* 3. withSockJS()表示支持socktJS访问
* 4. addInterceptors 添加自定义拦截器,这个拦截器是上一个demo自己定义的获取httpsession的拦截器
* 5. addInterceptors 添加拦截处理,这里MyPrincipalHandshakeHandler 封装的认证用户信息
*/
registry.addEndpoint("/stomp/websocketJS")
//.setAllowedOrigins("*")
.addInterceptors(new WebSocketHandshakeInterceptor())
.setHandshakeHandler(new MyPrincipalHandshakeHandler())
.withSockJS()
;
/*
* 看了下源码,它的实现类是WebMvcStompEndpointRegistry ,
* addEndpoint是添加到WebMvcStompWebSocketEndpointRegistration的集合中,
* 所以可以添加多个端点
*/
registry.addEndpoint("/stomp/websocket");
}
/**
* 配置消息代理
* @param registry
*/
@Override
public void configureMessageBroker(MessageBrokerRegistry registry)
{
/*
* enableStompBrokerRelay 配置外部的STOMP服务,需要安装额外的支持 比如rabbitmq或activemq
* 1. 配置代理域,可以配置多个,此段代码配置代理目的地的前缀为 /topicTest 或者 /userTest
* 我们就可以在配置的域上向客户端推送消息
* 3. 可以通过 setRelayHost 配置代理监听的host,默认为localhost
* 4. 可以通过 setRelayPort 配置代理监听的端口,默认为61613
* 5. 可以通过 setClientLogin 和 setClientPasscode 配置账号和密码
* 6. setxxx这种设置方法是可选的,根据业务需要自行配置,也可以使用默认配置
*/
//registry.enableStompBrokerRelay("/topicTest","/userTest")
//.setRelayHost("rabbit.someotherserver")
//.setRelayPort(62623);
//.setClientLogin("userName")
//.setClientPasscode("password")
//;
// 自定义调度器,用于控制心跳线程
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();