前端:
<%@ page language="java" import="java.util.*" pageEncoding="UTF-8" %>
<%--
  Manual test page for the index-config WebSocket endpoint.

  It opens a socket as soon as the page loads, sends a heartbeat every 3 seconds while the
  connection is open, and lets the user send an arbitrary JSON text ("发送") or a
  "pausePush" request ("暂停"). Everything sent and received is appended to #msgcount.
--%>
<%
    String path = request.getContextPath();
    String basePath = request.getScheme() + "://" + request.getServerName() + ":" + request.getServerPort() + path + "/";
    // Host, port and context path WITHOUT a trailing slash: every endpoint below already starts
    // with "/", so a trailing slash here would produce "//" inside the socket URL.
    String ppp = request.getServerName() + ":" + request.getServerPort() + path;
    // Browsers refuse an insecure ws:// socket from a page that was served over https.
    String wsScheme = request.isSecure() ? "wss" : "ws";
%>
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
<html>
<head>
    <base href="<%=basePath%>">
    <title>My JSP 'MyJsp.jsp' starting page</title>
    <meta http-equiv="pragma" content="no-cache">
    <meta http-equiv="cache-control" content="no-cache">
    <meta http-equiv="expires" content="0">
    <meta http-equiv="keywords" content="keyword1,keyword2,keyword3">
    <meta http-equiv="description" content="This is my page">
    <!-- <link rel="stylesheet" type="text/css" href="styles.css"> -->
    <script src="baseui/js/plugins/jquery.js"></script>
    <script>
        var wsUrl = "<%=wsScheme%>://<%=ppp%>/app/indexConfig/indexConfigWebSocket.do";
        var websocket;
        var heartbeatTimer = null;

        /**
         * Appends one line to the #msgcount log.
         * The text is inserted as a text node, never as HTML, so neither the user's input
         * nor a server message can inject markup into the page.
         */
        function appendLog(label, text, color, wrapInParens) {
            var $log = $("#msgcount");
            $log.append("<br>");
            $log.append(document.createTextNode(label + (wrapInParens ? "(" : "")));
            $log.append($("<font>").attr("color", color).text(String(text)));
            if (wrapInParens) {
                $log.append(document.createTextNode(")"));
            }
        }

        /** Sends the heartbeat message ({"optType":"hb"}) over the given socket. */
        function keepalive(ws) {
            var device = {};
            device.optType = 'hb';
            ws.send(JSON.stringify(device));
        }

        if ('WebSocket' in window) {
            websocket = new WebSocket(wsUrl);
        } else if ('MozWebSocket' in window) {
            websocket = new MozWebSocket("<%=wsScheme%>://<%=ppp%>/IBMS/point/webSocketServer.do");
        } else {
            // NOTE(review): this page does not load the SockJS client script itself; this branch
            // only works if SockJS is provided by some other include - confirm.
            websocket = new SockJS("<%=request.getScheme()%>://<%=ppp%>/IBMS/sockjs/webSocketServer.do");
        }

        websocket.onopen = function (evnt) {
            appendLog("", "onopen 方法", "green");
            appendLog("ws URL:", wsUrl, "green");
            heartbeatTimer = setInterval(function () {
                keepalive(websocket);
            }, 3000);
        };

        websocket.onmessage = function (evnt) {
            appendLog("", evnt.data, "red", true);
            $("#msgcount").scrollTop($("#msgcount")[0].offsetHeight);
        };

        websocket.onerror = function (e) {
            // Dump every property of the error event to help with debugging.
            for (var p in e) {
                appendLog("onerror 方法:", p + "=" + e[p], "green");
            }
        };

        websocket.onclose = function (evnt) {
            // Stop the heartbeat: sending on a closed socket would throw every 3 seconds.
            if (heartbeatTimer !== null) {
                clearInterval(heartbeatTimer);
                heartbeatTimer = null;
            }
            appendLog("onclose 方法:", "onclose", "green");
        };

        /**
         * Sends the raw content of #msgText to the server. Example payload:
         * {"data":[{"statisticsId":"设备1","statisticsTypeId":"1","statisticsData":"dt1"}],"optType":""}
         */
        function send() {
            var t = $('#msgText').val();
            console.log(t);
            appendLog("请求报文:", t, "blue");
            websocket.send(t);
        }

        /** Sends a "pausePush" request carrying the content of #msgText as deviceIds. */
        function pause() {
            var device = {};
            device.deviceIds = $('#msgText').val();
            device.optType = 'pausePush';
            var t = JSON.stringify(device);
            appendLog("请求报文:", t, "blue");
            websocket.send(t);
        }
    </script>
</head>
<body>
<input type="text" id="msgText">
<input type="button" onclick="send()" value="发送">
<input type="button" onclick="pause()" value="暂停">
<br>
<div id="msgcount"></div>
</body>
</html>
后端需要三个类:注册类、握手类、处理类(终端类)
握手类:
import java.net.InetSocketAddress; import java.net.URI; import java.util.Map; import javax.servlet.http.HttpSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.http.server.ServerHttpRequest; import org.springframework.http.server.ServerHttpResponse; import org.springframework.http.server.ServletServerHttpRequest; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.server.HandshakeInterceptor; public class WebSocketHandshakeInterceptor implements HandshakeInterceptor { private static Logger logger = LoggerFactory .getLogger(HandshakeInterceptor.class); @Override public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception { if (request instanceof ServletServerHttpRequest) { // ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) request; // HttpSession session = servletRequest.getServletRequest() // .getSession(true); // 保存session中已登录的用户到websocket的上下文环境中。在推送消息的时候,需要根据当前登录用户获取点位权限 final IbmsUser user = IbmsUserHolder.getUser(); attributes.put(IBMSConstant.SESSION_WEBSOCKET_LOGINED_USER, user); // if (session != null) { // // 使用userName区分WebSocketHandler,以便定向发送消息 // String userName = (String) session // .getAttribute(Constants.SESSION_USERNAME); // if(userName==null){ // userName = "qianshihua"; // } // attributes.put(Constants.WEBSOCKET_USERNAME, userName); // } } return true; } @Override public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) { URI uri = request.getURI(); InetSocketAddress remoteAddress = request.getRemoteAddress(); String msg = "afterHandshake******************* uri:" + uri + "; remoteAddress;" + remoteAddress; System.err.println(msg); logger.debug(msg); } }
websocket注册类,注册类依赖握手类,可以编码实现,也可以直接通过spring配置实现:
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.servlet.config.annotation.EnableWebMvc; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.config.annotation.EnableWebSocket; import org.springframework.web.socket.config.annotation.WebSocketConfigurer; import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry; @Configuration @EnableWebMvc @EnableWebSocket public class WebSocketConfig implements WebSocketConfigurer { @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { registry.addHandler(systemWebSocketHandler(),"/point/webSocketServer.do").addInterceptors(new WebSocketHandshakeInterceptor()) // .setAllowedOrigins("http://localhost:8087","http://10.16.38.21:8087","http://localhost:63342") ; registry.addHandler(systemWebSocketHandler(), "/point/sockjs/webSocketServer.do").addInterceptors(new WebSocketHandshakeInterceptor()) .withSockJS(); registry.addHandler(indexConfigWebSocketHandler(),"/app/indexConfig/indexConfigWebSocket.do").addInterceptors(new WebSocketHandshakeInterceptor()); } @Bean public WebSocketHandler systemWebSocketHandler(){ return new IbmsWebSocketHandler(); } @Bean public WebSocketHandler indexConfigWebSocketHandler(){ return new IndexConfigWebSocketHandler(); } }
后端可以注册多个handler,如上面的配置代码所示。
handler:
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject;import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.WebSocketMessage; import org.springframework.web.socket.WebSocketSession; import java.util.List; /** * 数据订阅处理类 */ public class IndexConfigWebSocketHandler implements WebSocketHandler { private static final Logger logger = LoggerFactory.getLogger(IndexConfigWebSocketHandler.class); @Override public void afterConnectionEstablished(WebSocketSession webSocketSession) throws Exception { logger.debug("indexconfig connect to the websocket success......"); } /** * 处理前端发起的订阅信息 * 订阅列表中的id包含fmt前缀 * * @param session * @param message * @throws Exception */ @Override public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception { String jsontext = (String) message.getPayload(); logger.info("收到统计项订阅:::" + jsontext); Object objUser = session.getAttributes().get( IBMSConstant.SESSION_WEBSOCKET_LOGINED_USER); if (objUser == null) { // 取不到session中的用户信息 throw new RuntimeException("会话中无用户信息"); } JSONObject jsonObject = JSONObject.parseObject(jsontext); Object optType = jsonObject.get("optType");//状态字段 String data = jsonObject.getString("data");//数据字段 //将数据字段解析成SubscribeBO列表 List<SubscribeBO> subscribeBOs = JSON.parseArray(data, SubscribeBO.class); boolean ignoreSession = false; if (subscribeBOs == null || subscribeBOs.size() == 0) { if ("pausePush".equals(optType)) { //如果data为空,并且optType==pausePush,关闭该session的所有推送 this.removeReader(session); } return; } if (optType != null && "hb".equals(optType)) { //心跳 return; } if (optType != null && "pausePush".equals(optType)) { //暂时关闭推送 ignoreSession = true; } for (int i = 0; i < subscribeBOs.size(); i++) { SubscribeBO item = subscribeBOs.get(i); String id = item.getSubscribeId(); String type = 
item.getSubscribeTypeId(); if (StringUtils.isBlank(id) || StringUtils.isBlank(type)) { continue; } /*if(SubscribeType.WEATHER.getCode().equals(type)){ //如果是天气预报,构造唯一的天气订阅 item.setSubscribeData(JOBDATA_KEY_WEATHER); item.setSubscribeId(JOBDATA_KEY_WEATHER); }*/ //根据类型不同,选择不同的存储空间 BaseWSSHolder holder = this.getHolderByType(type); //根据SubscribeBO获取已订阅的session列表 List<WebSocketSession> sessions = holder.getSessionBySubscribe(item); boolean exists = false; for (WebSocketSession wss : sessions) { //将本次session与session列表进行比对,已存在则 exists = true; if (wss.equals(session)) { exists = true; } } String msg = "关注"; if (ignoreSession == true) { msg = "取消关注"; } logger.info("websocket会话:" + session + msg + "了:" + SubscribeType.getDes(item.getSubscribeTypeId()) + " " + item.getSubscribeData()); //如果session列表中不存在本次session,则加入 if (exists == false && ignoreSession == false) { holder.putSession(session, item); } if (exists == true && ignoreSession == true) { //暂时关闭推送 sessions.remove(session); } } } @Override public void handleTransportError(WebSocketSession webSocketSession, Throwable throwable) throws Exception { if (webSocketSession.isOpen()) { webSocketSession.close(); } logger.debug("indexconfig websocket connection closed......"); } @Override public void afterConnectionClosed(WebSocketSession webSocketSession, CloseStatus closeStatus) throws Exception { logger.debug("indexconfig websocket connection closed......"); } @Override public boolean supportsPartialMessages() { return false; } /** * 根据类型获取session holder * * @param type * @return * @throws Exception */ private BaseWSSHolder getHolderByType(String type) throws Exception { SubscribeType subscribeType = SubscribeType.getByCode(type); BaseWSSHolder holder = null; if (subscribeType == null) { throw new Exception("数据传入错误"); } switch (subscribeType) { case DEVICE_COUNT: //设备数量 holder = DeviceWSSHolder.getInstance(); break; case ALARM_DEVICE_COUNT: //报警数量 holder = AlarmDeviceWSSHolder.getInstance(); break; case STATE_DEVICE_COUNT: 
//某状态设备数量 holder = StateDeviceWSSHolder.getInstance(); break; case POINT_COUNT: //点位值 holder = PointWSSHolder.getInstance(); break; case WEATHER: //点位值 holder = WeatherWSSHolder.getInstance(); break; } if (holder == null) { logger.error("不存在对应的存储:" + type); throw new Exception("不存在对应的存储:" + type); } return holder; } private void removeReader(WebSocketSession session) { AlarmDeviceWSSHolder.getInstance().removeReader(session, null); DeviceWSSHolder.getInstance().removeReader(session, null); PointWSSHolder.getInstance().removeReader(session, null); StateDeviceWSSHolder.getInstance().removeReader(session, null); WeatherWSSHolder.getInstance().removeReader(session, null); } }
订阅的信息存储在内存中,形式为<SubscribeBO, List<WebSocketSession>>的键值对(即每个订阅项对应订阅了它的session列表)
存储工具类:
import com.google.common.collect.Lists; import org.springframework.web.socket.WebSocketSession; import java.util.*; /** * 保存设备数量的订阅(哪些session订阅了设备数量) * 不存储统计数据值 */ public class DeviceWSSHolder implements BaseWSSHolder { /** * key值为统计,value值回哪些session关心这个点位 */ private Map<SubscribeBO, List<WebSocketSession>> sessions; private DeviceWSSHolder() { } private static class SingletonHolder { public final static DeviceWSSHolder holder = new DeviceWSSHolder(); } public static DeviceWSSHolder getInstance() { return SingletonHolder.holder; } /** * 保存统计ID和websocket会话的关系 * * @param s * @param subscribeBO */ @Override public void putSession(WebSocketSession s, SubscribeBO subscribeBO) { if (getInstance().sessions == null) { getInstance().sessions = new HashMap<SubscribeBO, List<WebSocketSession>>(); } if (getInstance().sessions.get(subscribeBO) == null) { getInstance().sessions.put(subscribeBO, new ArrayList<WebSocketSession>()); } final List<WebSocketSession> list = getInstance().sessions.get(subscribeBO); list.add(s); } @Override public void removeReader(WebSocketSession reader, SubscribeBO subscribeBO) { if (getInstance().sessions != null && reader != null) { if (subscribeBO != null) { //移除该session的某个具体订阅 List<WebSocketSession> readers = this.getSessionBySubscribe(subscribeBO); if (readers.size() > 0 && readers.contains(reader)) { readers.remove(reader); } } else { //移除该session的所有订阅 for (Map.Entry<SubscribeBO, List<WebSocketSession>> entry : getInstance().sessions.entrySet()) { List<WebSocketSession> readers = entry.getValue(); //确定有session订阅 if (readers.size() > 0 && readers.contains(reader)) { readers.remove(reader); break; } } } } } /** * 根据统计ID获取websocket的会话信息 * * @param subscribeBO * @return */ @Override public List<WebSocketSession> getSessionBySubscribe(SubscribeBO subscribeBO) { if (getInstance().sessions == null) { getInstance().sessions = new HashMap<SubscribeBO, List<WebSocketSession>>(); } if (getInstance().sessions.get(subscribeBO) == null) { 
getInstance().sessions.put(subscribeBO, new ArrayList<WebSocketSession>()); } return getInstance().sessions.get(subscribeBO); } /** * 获取所有有session订阅的业务ID * 业务ID带de前缀 * @return */ public List<String> getEffectDataIds() { List<String> ids = Lists.newArrayList(); if (getInstance().sessions != null) { for (Map.Entry<SubscribeBO, List<WebSocketSession>> entry : getInstance().sessions.entrySet()) { List<WebSocketSession> readers = entry.getValue(); //确定有session订阅 if (readers != null && readers.size() > 0) { SubscribeBO bo = entry.getKey(); ids.add(bo.getSubscribeData());//真正的业务id } } } //String idsStr = Joiner.on(",").join(ids); return ids; } /** * 根据SubscribeBO获取一条订阅信息 * @param condition * @return */ public Map.Entry<SubscribeBO, List<WebSocketSession>> getItemBySubscribeBO(SubscribeBO condition) { if (getInstance().sessions != null && condition != null) { for (Map.Entry<SubscribeBO, List<WebSocketSession>> entry : getInstance().sessions.entrySet()) { if (entry.getKey().equals(condition)) { return entry; } } } return null; } /*public SubscribeBO getSubscribeByData(Long data) { Set<SubscribeBO> boSet = getInstance().sessions.keySet(); for (SubscribeBO bo : boSet) { System.out.println(str); } List<Long> ids = Lists.newArrayList(); if (getInstance().sessions != null) { for (Map.Entry<SubscribeBO, List<WebSocketSession>> entry : getInstance().sessions.entrySet()) { List<WebSocketSession> readers = entry.getValue(); //确定有session订阅 if (readers != null && readers.size() > 0) { SubscribeBO bo = entry.getKey(); ids.add(Long.parseLong(bo.getData()));//真正的业务id } } } //String idsStr = Joiner.on(",").join(ids); return ids; }*/ }
订阅工具类(subscribeBO):主要就是将接收到的websocket信息转成java对象
import com.alibaba.fastjson.annotation.JSONField;

/**
 * One subscription item exchanged with the front end over the WebSocket.
 *
 * <p>The JSON field names ({@code statisticsId}, {@code statisticsTypeId},
 * {@code statisticsData}, {@code statisticsValue}) are mapped to the properties below.
 * Equality and hash code are based on {@code subscribeTypeId} and {@code subscribeData}
 * only; {@code subscribeId} and {@code subscribeValue} do not take part.
 */
public class SubscribeBO {

    @JSONField(name = "statisticsId")
    private String subscribeId;

    @JSONField(name = "statisticsTypeId")
    private String subscribeTypeId;

    @JSONField(name = "statisticsData")
    private String subscribeData;

    @JSONField(name = "statisticsValue")
    private String subscribeValue;

    public SubscribeBO() {
    }

    public SubscribeBO(String subscribeTypeId, String subscribeData) {
        this.subscribeTypeId = subscribeTypeId;
        this.subscribeData = subscribeData;
    }

    public SubscribeBO(String subscribeId, String subscribeTypeId, String subscribeData) {
        this.subscribeId = subscribeId;
        this.subscribeTypeId = subscribeTypeId;
        this.subscribeData = subscribeData;
    }

    public String getSubscribeId() {
        return subscribeId;
    }

    public void setSubscribeId(String subscribeId) {
        this.subscribeId = subscribeId;
    }

    public String getSubscribeTypeId() {
        return subscribeTypeId;
    }

    public void setSubscribeTypeId(String subscribeTypeId) {
        this.subscribeTypeId = subscribeTypeId;
    }

    public String getSubscribeData() {
        return subscribeData;
    }

    public void setSubscribeData(String subscribeData) {
        this.subscribeData = subscribeData;
    }

    public String getSubscribeValue() {
        return subscribeValue;
    }

    public void setSubscribeValue(String subscribeValue) {
        this.subscribeValue = subscribeValue;
    }

    /**
     * Two items are equal when both their type id and their data are equal.
     * Either field may be {@code null} (for example after JSON binding with a missing field);
     * two {@code null} values count as equal.
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        SubscribeBO bo = (SubscribeBO) o;
        return nullSafeEquals(subscribeTypeId, bo.subscribeTypeId)
                && nullSafeEquals(subscribeData, bo.subscribeData);
    }

    /**
     * Consistent with {@link #equals(Object)}; a {@code null} field contributes 0.
     * For non-null fields the value is the same as before the null handling was added.
     */
    @Override
    public int hashCode() {
        int result = subscribeTypeId != null ? subscribeTypeId.hashCode() : 0;
        result = 31 * result + (subscribeData != null ? subscribeData.hashCode() : 0);
        return result;
    }

    /** Null-tolerant string comparison. */
    private static boolean nullSafeEquals(String a, String b) {
        return a == null ? b == null : a.equals(b);
    }
}
推送代码太多,主要是通过spring+Quartz进行后台运算,运算完毕之后将值按照订阅(DeviceWSSHolder)反查session,发送到客户端
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.google.common.base.Joiner; import com.google.common.base.Splitter; import com.google.common.collect.Lists; import org.apache.commons.lang.math.RandomUtils; import org.quartz.DisallowConcurrentExecution; import org.quartz.JobDataMap; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; import org.springframework.scheduling.quartz.QuartzJobBean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketSession; import java.io.IOException; import java.util.*; /** * 设备数量推送任务 */ @DisallowConcurrentExecution public class DevicePushJob extends QuartzJobBean { private Logger log = LoggerFactory.getLogger(DevicePushJob.class); /* @Autowired private DeviceService deviceService;*/ @SuppressWarnings("unused") @Override protected void executeInternal(JobExecutionContext context) throws JobExecutionException { final JobDataMap jobDataMap = context.getJobDetail().getJobDataMap(); final Object object = jobDataMap.get(JOBDATA_KEY_APPCTX); final ApplicationContext appCtx = (ApplicationContext) object; final DeviceWSSHolder deviceWSSHolder = (DeviceWSSHolder) jobDataMap .get(JOBDATA_KEY_INDEXCONFIG_DEVICE); List<String> ids = deviceWSSHolder.getEffectDataIds(); if(ids.size()==0){ return; } //String idsStr = Joiner.on(",").join(ids); //System.out.println("××××××××××××××××××要查询的设备类别是:"+idsStr); //log.info("××××××××××××××××××要查询的设备类别是:"+idsStr); //查询数据 id,type,value 把数据都装到新的List<SubscribeBO>中发送,DeviceWSSHolder数据仅作为字典查询用 List<Integer> integers = Lists.newArrayList(); for (String typeIdFmt : ids) { List<String> result = Splitter.onPattern(PORTLET_TREE_DEVICETYPE_FORMAT) .omitEmptyStrings().splitToList(typeIdFmt); Integer id = Integer.parseInt(result.get(0)); 
integers.add(id); } List<SubscribeBO> subscribeBOs = Lists.newArrayList(); DeviceService deviceService = appCtx.getBean(DeviceService.class); Map<Integer,Integer> deviceCounts = deviceService.countDeviceByDeviceType(integers,false); if (deviceCounts == null || deviceCounts.size()==0) { return; } for (Map.Entry<Integer, Integer> deviceCount : deviceCounts.entrySet()) { Integer deviceTypeId = deviceCount.getKey(); Integer count = deviceCount.getValue(); SubscribeBO sb = new SubscribeBO(SubscribeType.DEVICE_COUNT .getCode(),PORTLET_TREE_DEVICETYPE_FORMAT+deviceTypeId.toString()); sb.setSubscribeValue(""+count); subscribeBOs.add(sb); } for(SubscribeBO bo:subscribeBOs){ Map.Entry<SubscribeBO, List<WebSocketSession>> entry = DeviceWSSHolder .getInstance().getItemBySubscribeBO(bo); if(entry !=null){ SubscribeBO temp = entry.getKey(); bo.setSubscribeId(temp.getSubscribeId()); List<WebSocketSession> sessions = entry.getValue(); Iterator<WebSocketSession> iterator = sessions.iterator(); while (iterator.hasNext()) { WebSocketSession session = iterator.next(); if (session != null && session.isOpen()) { try { JSONObject ret = new JSONObject(); ret.put("success", true); List<SubscribeBO> retSbo = Lists.newArrayList(bo); ret.put("data", retSbo); String jsonString = JSON.toJSONString(ret); //System.err.println(jsonString); log.info(jsonString); session.sendMessage(new TextMessage(jsonString)); } catch (IOException e) { log.error(e.getMessage()); } }else{ iterator.remove(); } } } } } }
附:seajs封装的前端js工具类
/**
 * seajs module wrapping the index-config WebSocket.
 *
 * Usage: var socket = new indexConfigSocket(); socket.send(items); socket.close(items);
 * Incoming messages are published on window.PubSub under 'indexConfigSocket-onMessage'.
 */
define(function (require, exports, module) {
    // The socket is module-level: every indexConfigSocket instance shares the most recently
    // created connection.
    var devWebSocket = {};
    // Messages requested while the socket is still connecting; flushed in onopen.
    var pendingMessages = [];
    // readyState value for "connecting" (WebSocket.CONNECTING; SockJS is documented as
    // mirroring the WebSocket API).
    var CONNECTING = 0;

    /**
     * Sends the text right away, or queues it when the connection is not open yet.
     * Calling send() on a connecting WebSocket throws, which would break callers that
     * subscribe immediately after constructing the socket.
     */
    function sendWhenReady(text) {
        if (devWebSocket.readyState === CONNECTING) {
            pendingMessages.push(text);
            return;
        }
        devWebSocket.send(text);
    }

    /**
     * Opens the connection, choosing the best transport the browser offers.
     *
     * @param opt currently unused
     */
    var indexConfigSocket = function (opt) {
        // Queued messages belong to the previous connection; start clean.
        pendingMessages = [];

        if ('WebSocket' in window) {
            devWebSocket = new WebSocket("ws://" + window.location.host + basePath + "/app/indexConfig/indexConfigWebSocket.do");
        } else if ('MozWebSocket' in window) {
            devWebSocket = new MozWebSocket("ws://" + window.location.host + basePath + "/ws/point/webSocketServer.do");
        } else {
            devWebSocket = new SockJS("http://" + window.location.host + basePath + "/ws/point/webSocketServer.do");
        }

        devWebSocket.onopen = function (evnt) {
            // Deliver everything that was requested before the connection was ready.
            var queued = pendingMessages;
            pendingMessages = [];
            for (var i = 0; i < queued.length; i++) {
                devWebSocket.send(queued[i]);
            }
        };

        devWebSocket.onmessage = function (evnt) {
            window.PubSub.publish('indexConfigSocket-onMessage', evnt);
        };

        devWebSocket.onerror = function (e) {
            console.log('indexConfig webSocket error...');
        };

        devWebSocket.onclose = function (evnt) {
            console.log('indexConfig webSocket closed...');
        };
    };

    /**
     * Subscribes to the given items.
     *
     * @param indexConfigIdsStr value sent as the "data" field of the request
     */
    indexConfigSocket.prototype.send = function (indexConfigIdsStr) {
        var indexConfig = {};
        indexConfig.data = indexConfigIdsStr;
        indexConfig.optType = '';
        var t = JSON.stringify(indexConfig);
        console.log("</br>请求报文:<font color='blue'>" + t + "</font>");
        sendWhenReady(t);
    };

    /**
     * Sends a "pausePush" request. Note that this does NOT close the underlying connection.
     *
     * @param indexConfigIdsStr value sent as the "data" field; null or undefined is sent as
     *                          an empty array
     */
    indexConfigSocket.prototype.close = function (indexConfigIdsStr) {
        var indexConfig = {};
        indexConfig.data = indexConfigIdsStr == null ? [] : indexConfigIdsStr;
        indexConfig.optType = 'pausePush';
        var t = JSON.stringify(indexConfig);
        console.log("关闭报文:" + t);
        sendWhenReady(t);
    };

    module.exports = indexConfigSocket;
})