本篇博客只是记录websocket在自己的项目中的应用,只是记录,不做说明(后来替换为GoEasy了)。
/** * 握手的设置,这其实是为了获取session */ public class GetHttpSessionConfigurator extends ServerEndpointConfig.Configurator { @Override public void modifyHandshake(ServerEndpointConfig config, HandshakeRequest request, HandshakeResponse response) { HttpSession httpSession = (HttpSession) request.getHttpSession(); config.getUserProperties().put(HttpSession.class.getName(), httpSession); } }
/** * JSR356定义了WebSocket的规范,Tomcat7中实现了该标准。 * JSR356 的 WebSocket 规范使用 javax.websocket.*的 API, * 可以将一个普通 Java 对象(POJO)使用 @ServerEndpoint 注释作为 WebSocket 服务器的端点。 * value必须以"/"开始,是否已"/"结束无所谓, * configurator指的是ServerEndpointConfig 的配置信息,可以配置握手协议等信息,而在本例中是为了获取session */ @ServerEndpoint(value = "/websocket/task", configurator = GetHttpSessionConfigurator.class) public class TaskServer { private static Logger logger = LoggerFactory.getLogger(TaskServer.class); //ConcurrentHashMap在线程安全的基础上提供了更好的写并发能力,但同时降低了对读一致性的要求 private static final Map<String, Session> doctorMap = new ConcurrentHashMap<String, Session>(); //使用ServerEndpoint注释的类必须有一个公共的无参数构造函数 public TaskServer() { } @OnOpen public void onOpen(Session session, EndpointConfig config) { HttpSession httpSession = (HttpSession) config.getUserProperties().get(HttpSession.class.getName()); if (httpSession == null) { return; } // DoctorUser doctor = (DoctorUser) httpSession.getAttribute(Constants.SESSION_DOCTOR); Doctoruser doctor = (Doctoruser) httpSession.getAttribute(Constants.SESSION_DOCTOR_USER); String doctorID = doctor.getId(); logger.info("doctor " + doctor.getName() + " id = " + doctor.getId() + " 建立连接"); doctorMap.put(doctorID, session); // attributes.put(Constants.WEBSOCKET_DOCTORNAME,userName); } @OnClose public void onClose(Session session) { // doctorMap.remove( doctorID); boolean has = doctorMap.containsValue(session); String doctorId = null; if (has == true) { for (Map.Entry entry : doctorMap.entrySet()) { if (entry.getValue().equals(session)) { doctorId = (String) entry.getKey(); doctorMap.remove(doctorId); } } } else { logger.info("on close error"); } logger.info("doctor " + doctorId + " 断开连接"); } @OnMessage public void incoming(String message, Session session) { //logger.info("recv message: " + message); sendMessage("pong",session); } @OnError public void onError(Throwable t) throws Throwable { logger.info("socket 连接错误 " + t.toString()); } //把消息推送给前端,在本例中推送是给doctorID推送message(数据库表中任务表的id) public static void broadcast(String doctorID, String message) { Session session = doctorMap.get(doctorID); if (session == null) { logger.warn("用户:" + doctorID + "socket连接不存在"); return; } session.getAsyncRemote().sendText(message); //非阻塞、异步 /*try { //详见 《 websocket getAsyncRemote()和getBasicRemote()区别》 //http://blog.csdn.net/who_is_xiaoming/article/details/53287691 //session.getBasicRemote().sendText(message); //阻塞、同步 session.getAsyncRemote().sendText(message); //非阻塞、异步 } catch (IOException e) { logger.error("推送消息错误--" + e); doctorMap.remove(doctorID); try { session.close(); } catch (IOException e1) { e1.printStackTrace(); } }*/ } /** * 发送信息 * * @param message 发送内容 * @param session 用户session */ public void sendMessage(String message, Session session) { try { synchronized (session) { if (session.isOpen()) { session.getBasicRemote().sendText(message); } } } catch (Exception e) { logger.error("send message exception", e); } } public static boolean hasConnection(String doctorID) { if (doctorMap.get(doctorID) == null) return false; return true; } public static List<String> getDoctorList() { ArrayList<String> doctors = new ArrayList<>(); for (Map.Entry entry : doctorMap.entrySet()) { doctors.add((String) entry.getKey()); } return doctors; } public static Map<String, Session> getDoctorMap() { return doctorMap; } }
<script type="text/javascript"> /* 1.连接上之后,每秒发送一个心跳,服务器同样返回一个心跳,用来表示服务器没挂。 2.断线重连(我们测试的环境是断开网络连接),断开网络后,心跳包无法发送出去,所以如果当前时间距离上次成功心跳的时间超过20秒,说明连接已经出现问题了,此时需要关闭连接。 3.第一次关闭连接时websocket会尝试重连,设置了一个时间期限,10秒。10秒内如果能连上(恢复网络连接)就可以继续收发消息,连不上就关闭了,并且不会重连。 4.30秒内收不到服务器消息(心跳每秒发送),我就认为服务器已经挂了,就会调用close事件,然后进入第3步。 */ /* •一个定时器ws.keepAliveTimer,用来每秒发送一次心跳。 •上次心跳成功的时间ws.last_health_time以及当前时间let time = new Date().getTime();。 •断开连接(ws.close())时的时间reconnect,因为在close事件发生后需要重连10秒。 •是否已经重连过reconnectMark。 •断开连接(ws.close())时需要保存ws对象tempWs。我曾试图ws = { ...ws }发现会丢失绑定的事件。 •一个定时时间为30秒的setTimeout定时器ws.receiveMessageTimer,用来表示服务器是否在30秒内返回了消息。 */ var ws = null; var url = 'ws://' + window.location.host + "${rootPath}/websocket/task"; var queue = new Queue(); //js的队列先进先出,把推送任务放到里面 $(function () { connect(url); getTask(); }); //==============================================推送================================================ function connect(url) { // 用户登录了并且没有连接过websocket if ("${sessionScope.SESSION_DOCTOR_ACCOUNT.id}".length == 32) { ws = new WebSocket(url); ws.last_health_time = -1; // 上次心跳成功的时间 //保持连接 ws.keepalive = function () { var time = new Date().getTime(); //当前时间 /*断线重连(我们测试的环境是断开网络连接),断开网络后,心跳包无法发送出去, 所以如果当前时间距离上次成功心跳的时间超过20秒,说明连接已经出现问题了,此时需要关闭连接。*/ if (ws.last_health_time !== -1 && time - ws.last_health_time > 20000) { ws.close(); console.log("Info: connection closed."); } else { /*连接上之后,每秒发送一个心跳,服务器同样返回一个心跳,用来表示服务器没挂。 ws.bufferedAmount === 0 && ws.readyState === 1 均表示连接是正常的*/ if (ws.bufferedAmount === 0 && ws.readyState === 1) { ws.send("ping"); ws.last_health_time = time; } } } if (ws) { var reconnectTime = 0; //断开连接的时间 var reconnectMark = false; //是否重连过 ws.onopen = function () { reconnectTime = 0; reconnectMark = false; // 30s没收到信息,代表服务器出问题了,关闭连接。如果收到消息了,重置该定时器。 ws.receiveMessageTimer = setTimeout(function () { ws.close(); console.log("Info: connection closed."); }, 30000); // 为1表示连接处于open状态,keepAliveTimer用来每秒发送一次心跳; if (ws.readyState === 1) { ws.keepAliveTimer = setInterval(function () { ws.keepalive(); }, 5000); } console.log("Info: connection opened. " + url); } ws.onerror = function () { console.error("onerror"); } ws.onmessage = function (event) { //console.log("Received:" + event.data+" "+new Date().getTime()); //真正的逻辑代码 if(event.data!="pong"){ queue.push(event.data); playSound(); $("#taskNum").html(queue.size()); var src1 = $("#iframe").attr("src"); if (src1 == "doctor/noTask.do") { srcValue("doctor/ecgTask.do"); } } // 收到消息,重置定时器 clearTimeout(ws.receiveMessageTimer); // 30s没收到信息,代表服务器出问题了,关闭连接。 ws.receiveMessageTimer = setTimeout(function () { ws.close(); console.log("Info: connection closed."); }, 30000); } ws.onclose = function () { clearTimeout(ws.receiveMessageTimer); clearInterval(ws.keepAliveTimer); // 如果没有重连过,进行重连。 if (!reconnectMark) { reconnectTime = new Date().getTime(); reconnectMark = true; } var tempWs = ws; // 保存ws对象 /*第一次关闭连接时websocket会尝试重连,设置了一个时间期限,10秒。 10秒内如果能连上(恢复网络连接)就可以继续收发消息,连不上就关闭了,并且不会重连。*/ if (new Date().getTime() - reconnectTime >= 10000) { ws.close(); console.log("Info: connection closed."); } else { ws = new WebSocket(url); ws.onopen = tempWs.onopen; ws.onmessage = tempWs.onmessage; ws.onerror = tempWs.onerror; ws.onclose = tempWs.onclose; ws.keepalive = tempWs.keepalive; ws.last_health_time = -1; } } } else { alert("This browser does not support webSocket"); return false; } } } <!--断开连接--> function disconnect() { if (ws!= null) { ws.close(); ws = null; window.location = document.getElementsByTagName("base")[0].getAttribute("href")+ "doctor/logout.do"; } } <!--播放音乐--> function playSound() { var borswer = window.navigator.userAgent.toLowerCase(); if (borswer.indexOf("ie") >= 0) { //IE内核浏览器 var strEmbed = '<embed name="embedPlay" src="sounds/doctorTask.wav" autostart="true" hidden="true" loop="false"></embed>'; if ($("body").find("embed").length <= 0) $("body").append(strEmbed); var embed = document.embedPlay; //浏览器不支持 audion,则使用 embed 播放 embed.volume = 100; //embed.play();这个不需要 } else { var audio = new Audio("sounds/doctorTask.wav"); //audio.loop="loop"; audio.play(); } } <!--onbeforeunload 事件在即将离开当前页面(刷新或关闭)时触发。--> window.onbeforeunload = signout; function signout() { disconnect(); } //改变iframe的src值 function srcValue(val) { $("#iframe").attr("src", val); } //退出 function logout(){ disconnect(); $("#exitModal").modal("hide"); } </script>