一、什么都不用说,导入个依赖先
<!-- Maven dependency on Spring Boot's WebSocket starter. No <version> is declared here, so it is presumably managed by the project's Spring Boot parent/BOM (confirm against the full pom.xml). -->
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency>
二、推送到前端的消息实体类
import java.io.Serializable;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Envelope for a notification that the server pushes to the browser.
 *
 * <p>Accessors, {@code equals}/{@code hashCode}/{@code toString}, the builder and both
 * constructors are generated by Lombok. The field declaration order is significant because
 * it determines the parameter order of the all-args constructor.
 *
 * @param <T> type of the optional payload carried in {@link #data}
 */
@Builder
@Data
@AllArgsConstructor
@NoArgsConstructor
public class NotifyBean<T> implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Numeric message category; its meaning is defined by the producers and consumers. */
    private int type;

    /** Human-readable text of the notification. */
    private String message;

    /** Optional payload attached to the notification. */
    private T data;
}
三、因为要实现点对点的推送,所以需要创建一个监听器来获取到websocket的session,如下:
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationListener; import org.springframework.messaging.simp.stomp.StompHeaderAccessor; import org.springframework.web.socket.messaging.SessionConnectEvent; public class STOMPConnectEventListener implements ApplicationListener<SessionConnectEvent> { @Autowired private RedisHelper redisHelper; @Override public void onApplicationEvent(SessionConnectEvent event) { StompHeaderAccessor sha = StompHeaderAccessor.wrap(event.getMessage()); //login get from browser if(sha.getNativeHeader("userid")==null){ return; } String userid = sha.getNativeHeader("userid").get(0); String sessionId = sha.getSessionId(); redisHelper.redisTemplate.opsForValue().set("websocket:"+userid,sessionId); } }
四、最重要的配置类
import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.simp.config.ChannelRegistration; import org.springframework.messaging.simp.config.MessageBrokerRegistry; import org.springframework.messaging.simp.stomp.StompCommand; import org.springframework.messaging.simp.stomp.StompHeaderAccessor; import org.springframework.messaging.support.ChannelInterceptorAdapter; import org.springframework.messaging.support.MessageHeaderAccessor; import org.springframework.web.socket.config.annotation.*; @Configuration @EnableWebSocketMessageBroker @Slf4j public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer { //STOMP监听类 @Bean public STOMPConnectEventListener applicationStartListener(){ return new STOMPConnectEventListener(); } @Override public void registerStompEndpoints(StompEndpointRegistry stompEndpointRegistry) { //建立连接端点,注册一个STOMP的协议节点,并指定使用SockJS协议 stompEndpointRegistry.addEndpoint("/nmpSocketWeb") .setAllowedOrigins("*") .withSockJS(); } @Override public void configureMessageBroker(MessageBrokerRegistry messageBrokerRegistry) { //配置消息代理(MessageBroker)。 messageBrokerRegistry.enableSimpleBroker("/topic");// 推送消息前缀 messageBrokerRegistry.setApplicationDestinationPrefixes("/app");// 应用请求前缀,前端发过来的消息将会带有“/app”前缀。 } @Override public void configureClientInboundChannel(ChannelRegistration registration) { //token认证 registration.setInterceptors(new ChannelInterceptorAdapter() { @Override public Message<?> preSend(Message<?> message, MessageChannel channel) { StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); if (StompCommand.CONNECT.equals(accessor.getCommand()) || StompCommand.SEND.equals(accessor.getCommand())) { String token = accessor.getFirstNativeHeader("token"); try { 
tokenValidate(token); } catch (Exception e) { log.error(e.toString()); return null; } } return message; } }); } public boolean tokenValidate(String token) throws Exception { if (token == null || token.isEmpty()) { throw new Exception("webSocket:token为空!"); } if (JwtUtil.validateToken(token)==null) { throw new Exception("webSoc:token无效!"); } return true; } }
代码中有详细的解释,认真看可以看明白的。
五、controller
import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.handler.annotation.MessageMapping; import org.springframework.messaging.handler.annotation.SendTo; import org.springframework.messaging.simp.SimpMessageHeaderAccessor; import org.springframework.messaging.simp.SimpMessageType; import org.springframework.messaging.simp.SimpMessagingTemplate; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; @Api(tags="WebSocket控制器",description="WebSocket控制器") @Controller @RequestMapping(value = "/webSocket") public class WebSocketController extends BaseController { @Autowired private SimpMessagingTemplate simpMessagingTemplate; @Autowired private RedisHelper redisHelper; @ApiOperation(value = "测试主动发送消息", notes = "测试主动发送消息", httpMethod = "GET") @RequestMapping(value = "/sendMsg") @ResponseBody public void sendMsg(){ System.out.println("测试主动发送消息"); NotifyBean notifyBean = NotifyBean.builder().message("服务器给你发消息啦!").build(); simpMessagingTemplate.convertAndSend(WebConstant.WEB_SC_TOPIC_NOTIFY,notifyBean); } @MessageMapping("/test") //当浏览器向服务端发送请求时,通过@MessageMapping映射/welcome这个地址,类似于@ResponseMapping @SendTo(WebConstant.WEB_SC_TOPIC_NOTIFY)//当服务器有消息时,会对订阅了@SendTo中的路径的浏览器发送消息 public NotifyBean test(UserVo userVo) { try { //睡眠1秒 Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } NotifyBean notifyBean = NotifyBean.builder().message("welcome!"+ userVo.getName()).build(); return notifyBean; } /** * 点对点发送消息demo * 根据用户key发送消息 * @param userVo * @return * @throws Exception */ @MessageMapping("/test/toOne") public void toOne(UserVo userVo) throws Exception { String sessionId=(String)redisHelper.redisTemplate.opsForValue().get("websocket:"+userVo.getId()); NotifyBean 
notifyBean = NotifyBean.builder().message("welcome!"+ userVo.getName()).build(); //convertAndSendToUser该方法会在订阅路径前拼接"/user",所以前端订阅的路径全路径是"/user/topic/notify" simpMessagingTemplate.convertAndSendToUser(sessionId, WebConstant.WEB_SC_TOPIC_NOTIFY,notifyBean,createHeaders(sessionId)); } private MessageHeaders createHeaders(String sessionId) { SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE); headerAccessor.setSessionId(sessionId); headerAccessor.setLeaveMutable(true); return headerAccessor.getMessageHeaders(); } }
六、前端页面
<!DOCTYPE html>
<html lang="en" xmlns:th="http://www.thymeleaf.org">
<head>
    <link href="/webjars/bootstrap/3.3.7/css/bootstrap.min.css" rel="stylesheet"/>
    <script th:src="@{sockjs.min.js}"></script>
    <script th:src="@{stomp.min.js}"></script>
    <script th:src="@{jquery-1.11.3.min.js}"></script>
</head>
<body>
<blockquote class="layui-elem-quote">/user/topic-message</blockquote>
<div id="main-content" class="container">
    <div class="row">
        <div class="col-md-6">
            <form class="form-inline">
                <div class="form-group">
                    <label for="connect">WebSocket connection:</label>
                    <button id="connect" class="btn btn-default" type="submit">Connect</button>
                    <button id="disconnect" class="btn btn-default" type="submit" disabled="disabled">Disconnect
                    </button>
                </div>
            </form>
        </div>
        <div class="col-md-6">
            <form class="form-inline">
                <div class="form-group">
                    <label for="name">What is your name?</label>
                    <input type="text" id="name" class="form-control" placeholder="Your name here..."/>
                </div>
                <button id="send" class="btn btn-default" type="submit">Send</button>
            </form>
        </div>
    </div>
    <div class="row">
        <div class="col-md-12">
            <table id="conversation" class="table table-striped">
                <thead>
                <tr>
                    <th>Greetings</th>
                </tr>
                </thead>
                <tbody id="greetings">
                </tbody>
            </table>
        </div>
        <div id="message"></div>
    </div>
</div>
<script>
    var stompClient = null;

    // User key. The server-side connect listener reads it from the "userid" STOMP header
    // and stores the session id under it.
    var login = "ricky";

    // Token checked by the server on every CONNECT and SEND frame (native header "token").
    // TODO: set this to the JWT issued to the current user; with an empty value the server
    // drops the frames.
    var token = "";

    // Destination for point-to-point pushes. Per the controller comment the server sends to
    // "/user" + WebConstant.WEB_SC_TOPIC_NOTIFY, i.e. "/user/topic/notify" (confirm against
    // the constant's actual value).
    var USER_NOTIFY_DESTINATION = '/user/topic/notify';

    // Toggles the buttons and the conversation table according to the connection state.
    function setConnected(connected) {
        $("#connect").prop("disabled", connected);
        $("#disconnect").prop("disabled", !connected);
        if (connected) {
            $("#conversation").show();
        } else {
            $("#conversation").hide();
        }
        $("#greetings").html("");
    }

    // Opens the SockJS/STOMP connection and subscribes to the user's private destination.
    function connect() {
        var socket = new SockJS('/nmpSocketWeb');
        stompClient = Stomp.over(socket);
        // "userid" is what the server listener looks for; "login" is kept for compatibility.
        var connectHeaders = {login: login, userid: login, token: token};
        stompClient.connect(connectHeaders, function (frame) {
            setConnected(true);
            console.log('Connected: ' + frame);
            stompClient.subscribe(USER_NOTIFY_DESTINATION, function (greeting) {
                var text = JSON.parse(greeting.body).message;
                setMessageInnerHTML(text);
                console.log(text);
            });
        });
    }

    function disconnect() {
        if (stompClient != null) {
            stompClient.disconnect();
        }
        setConnected(false);
        console.log("Disconnected");
    }

    // Asks the server to push a welcome message to this user only.
    function sendName() {
        if (stompClient == null) {
            console.log("Not connected");
            return;
        }
        stompClient.send("/app/test/toOne", {token: token},
            JSON.stringify({'name': $("#name").val(), 'id': login}));
    }

    // Appends a row to the greetings table; the text is inserted as plain text, not HTML.
    function showGreeting(message) {
        $("#greetings").append($("<tr></tr>").append($("<td></td>").text(message)));
    }

    $(function () {
        $("form").on('submit', function (e) {
            e.preventDefault();
        });
        $("#connect").click(function () {
            connect();
        });
        $("#disconnect").click(function () {
            disconnect();
        });
        $("#send").click(function () {
            sendName();
        });
    });

    // Shows a message on the page. The message contains user-supplied text (the name), so it
    // is added as a text node rather than through innerHTML to prevent script injection.
    function setMessageInnerHTML(innerHTML) {
        console.log(innerHTML);
        var box = document.getElementById('message');
        box.appendChild(document.createTextNode(innerHTML));
        box.appendChild(document.createElement('br'));
    }
</script>
</body>
</html>
最后,来试试点对点推送。
第一个页面:
第二个页面:
可以看到,后台推送的消息只有一个页面接收到,完事!