zoukankan      html  css  js  c++  java
  • Java web项目使用webSocket

    前端:

    复制代码
    <%@ page language="java" import="java.util.*" pageEncoding="UTF-8" %>
    <%
        String path = request.getContextPath();
        String basePath = request.getScheme() + "://"
                + request.getServerName() + ":" + request.getServerPort()
                + path + "/";
        String ppp = request.getServerName() + ":" + request.getServerPort()
                + path + "/";
    
    
    %>
    
    <!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
    <html>
    <head>
        <base href="<%=basePath%>">
    
        <title>My JSP 'MyJsp.jsp' starting page</title>
    
        <meta http-equiv="pragma" content="no-cache">
        <meta http-equiv="cache-control" content="no-cache">
        <meta http-equiv="expires" content="0">
        <meta http-equiv="keywords" content="keyword1,keyword2,keyword3">
        <meta http-equiv="description" content="This is my page">
        <!--
            <link rel="stylesheet" type="text/css" href="styles.css">
            -->
    
        <script src="baseui/js/plugins/jquery.js"></script>
    </head>
    <script>
    
        function keepalive(ws) {
            var device = new Object();
    //        device.deviceIds = $('#msgText').val();
            device.optType = 'hb';
            var t = JSON.stringify(device);
            ws.send(t);
        }
    
        var websocket;
        if ('WebSocket' in window) {
            websocket = new WebSocket("ws://<%=ppp%>/app/indexConfig/indexConfigWebSocket.do");
        } else if ('MozWebSocket' in window) {
            websocket = new MozWebSocket("ws://<%=ppp%>/IBMS/point/webSocketServer.do");
        } else {
            websocket = new SockJS("http://<%=ppp%>/IBMS/sockjs/webSocketServer.do");
        }
        websocket.onopen = function (evnt) {
            var old = $("#msgcount").html();
            $("#msgcount").html(old + "</br><font color='green'>" + "onopen 方法" + "</font>");
    
            var old = $("#msgcount").html();
            $("#msgcount").html(old + "</br>ws URL:<font color='green'>" + "ws://<%=ppp%>/app/indexConfig/indexConfigWebSocket.do" + "</font>");
            setInterval(function () {
                keepalive(websocket)
            }, 3000);
        };
        websocket.onmessage = function (evnt) {
    
            var old = $("#msgcount").html();
            $("#msgcount").html(old + "</br>(<font color='red'>" + evnt.data + "</font>)");
            $("#msgcount").scrollTop($("#msgcount")[0].offsetHeight);
        };
        websocket.onerror = function (e) {
            for (var p in e) {
                var old = $("#msgcount").html();
                $("#msgcount").html(old + "</br>onerror 方法:<font color='green'>" + p + "=" + e[p] + "</font>");
            }
        };
        websocket.onclose = function (evnt) {
            var old = $("#msgcount").html();
            $("#msgcount").html(old + "</br>onclose 方法:<font color='green'>" + "onclose" + "</font>");
        }
    
        function send() {
    
            var device = new Object();
            //device = {"data":[{"statisticsId":"设备1","statisticsTypeId":"1","statisticsData":"dt1"}, {"statisticsId":"报警1","statisticsTypeId":"2","statisticsData":"dt1"}, {"statisticsId":"点位1","statisticsTypeId":"3","statisticsData":"po9884"}, {"statisticsId":"属性1","statisticsTypeId":"4","statisticsData":"st32,sv91"}], "optType":""};
            var t = $('#msgText').val();
            //var t = JSON.stringify(device);
            console.log(t)
            var old = $("#msgcount").html();
            $("#msgcount").html(old + "</br>请求报文:<font color='blue'>" + t + "</font>")
            websocket.send(t);
            if (true)return;
            var param = new Array();
            var point = new Object();
            point.pointId = '1';
            var point2 = new Object();
            point2.pointId = '2';
            point2.newValue = '789';
            var json = JSON.stringify(point);
            var json2 = JSON.stringify(point2);
            param[0] = point;
            param[1] = point2;
            var t = JSON.stringify(param);
            t = eval(t);
            var arrParam = JSON.stringify(t);
            websocket.send(arrParam);
        }
        function pause() {
    
            var device = new Object();
            device.deviceIds = $('#msgText').val();
            device.optType = 'pausePush';
            var t = JSON.stringify(device);
    
            var old = $("#msgcount").html();
            $("#msgcount").html(old + "</br>请求报文:<font color='blue'>" + t + "</font>")
    
            websocket.send(t);
        }
    </script>
    
    
    <body>
    <input type="text" id="msgText">
    <input type="button" onclick="send()" value="发送">
    <input type="button" onclick="pause()" value="暂停">
    
    <br>
    <div id="msgcount"></div>
    </body>
    </html>
    复制代码

    后端需要三个类:注册类、握手类、处理类(终端类)

    握手类:

    复制代码
    import java.net.InetSocketAddress;
    import java.net.URI;
    import java.util.Map;
    
    import javax.servlet.http.HttpSession;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.http.server.ServerHttpRequest;
    import org.springframework.http.server.ServerHttpResponse;
    import org.springframework.http.server.ServletServerHttpRequest;
    import org.springframework.web.socket.WebSocketHandler;
    import org.springframework.web.socket.server.HandshakeInterceptor;
    
    
    public class WebSocketHandshakeInterceptor implements HandshakeInterceptor {
    
        private static Logger logger = LoggerFactory
                .getLogger(HandshakeInterceptor.class);
    
        @Override
        public boolean beforeHandshake(ServerHttpRequest request,
                ServerHttpResponse response, WebSocketHandler wsHandler,
                Map<String, Object> attributes) throws Exception {
            if (request instanceof ServletServerHttpRequest) {
    //            ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) request;
    //            HttpSession session = servletRequest.getServletRequest()
    //                    .getSession(true);
                // 保存session中已登录的用户到websocket的上下文环境中。在推送消息的时候,需要根据当前登录用户获取点位权限
                final IbmsUser user = IbmsUserHolder.getUser();
                attributes.put(IBMSConstant.SESSION_WEBSOCKET_LOGINED_USER, user);
                // if (session != null) {
                // // 使用userName区分WebSocketHandler,以便定向发送消息
                // String userName = (String) session
                // .getAttribute(Constants.SESSION_USERNAME);
                // if(userName==null){
                // userName = "qianshihua";
                // }
                // attributes.put(Constants.WEBSOCKET_USERNAME, userName);
                // }
            }
            return true;
        }
    
        @Override
        public void afterHandshake(ServerHttpRequest request,
                ServerHttpResponse response, WebSocketHandler wsHandler,
                Exception exception) {
            URI uri = request.getURI();
            InetSocketAddress remoteAddress = request.getRemoteAddress();
            String msg = "afterHandshake*******************
    uri:" + uri + ";
    remoteAddress;" + remoteAddress;
            System.err.println(msg);
            logger.debug(msg);
    
        }
    }
    复制代码

    websocket注册类,注册类依赖握手类,可以编码实现,也可以直接通过spring配置实现:

    复制代码
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.web.servlet.config.annotation.EnableWebMvc;
    import org.springframework.web.socket.WebSocketHandler;
    import org.springframework.web.socket.config.annotation.EnableWebSocket;
    import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
    import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
    
    @Configuration
    @EnableWebMvc
    @EnableWebSocket
    public class WebSocketConfig  implements WebSocketConfigurer {
        @Override
        public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
            registry.addHandler(systemWebSocketHandler(),"/point/webSocketServer.do").addInterceptors(new WebSocketHandshakeInterceptor())
    //        .setAllowedOrigins("http://localhost:8087","http://10.16.38.21:8087","http://localhost:63342")
            ;
    
            registry.addHandler(systemWebSocketHandler(), "/point/sockjs/webSocketServer.do").addInterceptors(new WebSocketHandshakeInterceptor())
                    .withSockJS();
    
            registry.addHandler(indexConfigWebSocketHandler(),"/app/indexConfig/indexConfigWebSocket.do").addInterceptors(new WebSocketHandshakeInterceptor());
        }
    
        @Bean
        public WebSocketHandler systemWebSocketHandler(){
            return new IbmsWebSocketHandler();
        }
    
        @Bean
        public WebSocketHandler indexConfigWebSocketHandler(){
            return new IndexConfigWebSocketHandler();
        }
    
    }
    复制代码

    后端可以注册多个handler,如上图配置。

    handler:

    复制代码
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;import org.apache.commons.lang3.StringUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.web.socket.CloseStatus;
    import org.springframework.web.socket.WebSocketHandler;
    import org.springframework.web.socket.WebSocketMessage;
    import org.springframework.web.socket.WebSocketSession;
    
    import java.util.List;
    
    /**
     * 数据订阅处理类
     */
    public class IndexConfigWebSocketHandler implements WebSocketHandler {
    
        private static final Logger logger = LoggerFactory.getLogger(IndexConfigWebSocketHandler.class);
    
        @Override
        public void afterConnectionEstablished(WebSocketSession webSocketSession) throws Exception {
            logger.debug("indexconfig connect to the websocket success......");
        }
    
        /**
         * 处理前端发起的订阅信息
         * 订阅列表中的id包含fmt前缀
         *
         * @param session
         * @param message
         * @throws Exception
         */
        @Override
        public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
            String jsontext = (String) message.getPayload();
            logger.info("收到统计项订阅:::" + jsontext);
            Object objUser = session.getAttributes().get(
                    IBMSConstant.SESSION_WEBSOCKET_LOGINED_USER);
            if (objUser == null) {
                // 取不到session中的用户信息
                throw new RuntimeException("会话中无用户信息");
            }
            JSONObject jsonObject = JSONObject.parseObject(jsontext);
            Object optType = jsonObject.get("optType");//状态字段
            String data = jsonObject.getString("data");//数据字段
            //将数据字段解析成SubscribeBO列表
            List<SubscribeBO> subscribeBOs = JSON.parseArray(data, SubscribeBO.class);
            boolean ignoreSession = false;
            if (subscribeBOs == null || subscribeBOs.size() == 0) {
                if ("pausePush".equals(optType)) {
                    //如果data为空,并且optType==pausePush,关闭该session的所有推送
                    this.removeReader(session);
                }
                return;
            }
            if (optType != null && "hb".equals(optType)) {
                //心跳
                return;
            }
            if (optType != null && "pausePush".equals(optType)) {
                //暂时关闭推送
                ignoreSession = true;
            }
    
    
            for (int i = 0; i < subscribeBOs.size(); i++) {
                SubscribeBO item = subscribeBOs.get(i);
                String id = item.getSubscribeId();
                String type = item.getSubscribeTypeId();
                if (StringUtils.isBlank(id) || StringUtils.isBlank(type)) {
                    continue;
                }
                /*if(SubscribeType.WEATHER.getCode().equals(type)){
                    //如果是天气预报,构造唯一的天气订阅
                    item.setSubscribeData(JOBDATA_KEY_WEATHER);
                    item.setSubscribeId(JOBDATA_KEY_WEATHER);
                }*/
                //根据类型不同,选择不同的存储空间
                BaseWSSHolder holder = this.getHolderByType(type);
                //根据SubscribeBO获取已订阅的session列表
                List<WebSocketSession> sessions = holder.getSessionBySubscribe(item);
                boolean exists = false;
                for (WebSocketSession wss : sessions) {
                    //将本次session与session列表进行比对,已存在则 exists = true;
                    if (wss.equals(session)) {
                        exists = true;
                    }
                }
                String msg = "关注";
                if (ignoreSession == true) {
                    msg = "取消关注";
                }
                logger.info("websocket会话:" + session + msg + "了:"
                        + SubscribeType.getDes(item.getSubscribeTypeId()) + "  " + item.getSubscribeData());
                //如果session列表中不存在本次session,则加入
                if (exists == false && ignoreSession == false) {
                    holder.putSession(session, item);
                }
                if (exists == true && ignoreSession == true) {
                    //暂时关闭推送
                    sessions.remove(session);
                }
            }
        }
    
    
        @Override
        public void handleTransportError(WebSocketSession webSocketSession, Throwable throwable) throws Exception {
            if (webSocketSession.isOpen()) {
                webSocketSession.close();
            }
            logger.debug("indexconfig websocket connection closed......");
        }
    
        @Override
        public void afterConnectionClosed(WebSocketSession webSocketSession, CloseStatus closeStatus) throws Exception {
            logger.debug("indexconfig websocket connection closed......");
        }
    
        @Override
        public boolean supportsPartialMessages() {
            return false;
        }
    
        /**
         * 根据类型获取session holder
         *
         * @param type
         * @return
         * @throws Exception
         */
        private BaseWSSHolder getHolderByType(String type) throws Exception {
            SubscribeType subscribeType = SubscribeType.getByCode(type);
            BaseWSSHolder holder = null;
            if (subscribeType == null) {
                throw new Exception("数据传入错误");
            }
            switch (subscribeType) {
                case DEVICE_COUNT:
                    //设备数量
                    holder = DeviceWSSHolder.getInstance();
                    break;
                case ALARM_DEVICE_COUNT:
                    //报警数量
                    holder = AlarmDeviceWSSHolder.getInstance();
                    break;
                case STATE_DEVICE_COUNT:
                    //某状态设备数量
                    holder = StateDeviceWSSHolder.getInstance();
                    break;
                case POINT_COUNT:
                    //点位值
                    holder = PointWSSHolder.getInstance();
                    break;
                case WEATHER:
                    //点位值
                    holder = WeatherWSSHolder.getInstance();
                    break;
            }
            if (holder == null) {
                logger.error("不存在对应的存储:" + type);
                throw new Exception("不存在对应的存储:" + type);
            }
            return holder;
        }
    
        private void removeReader(WebSocketSession session) {
            AlarmDeviceWSSHolder.getInstance().removeReader(session, null);
            DeviceWSSHolder.getInstance().removeReader(session, null);
            PointWSSHolder.getInstance().removeReader(session, null);
            StateDeviceWSSHolder.getInstance().removeReader(session, null);
            WeatherWSSHolder.getInstance().removeReader(session, null);
        }
    }
    复制代码

    订阅的信息存储在内存中,形式为<session,List<data>>的键值对

    存储工具类:

    复制代码
    import com.google.common.collect.Lists;
    import org.springframework.web.socket.WebSocketSession;
    
    import java.util.*;
    
    /**
     * 保存设备数量的订阅(哪些session订阅了设备数量)
     * 不存储统计数据值
     */
    public class DeviceWSSHolder implements BaseWSSHolder {
        /**
         * key值为统计,value值回哪些session关心这个点位
         */
        private Map<SubscribeBO, List<WebSocketSession>> sessions;
    
        private DeviceWSSHolder() {
        }
    
        private static class SingletonHolder {
            public final static DeviceWSSHolder holder = new DeviceWSSHolder();
        }
    
        public static DeviceWSSHolder getInstance() {
            return SingletonHolder.holder;
        }
    
        /**
         * 保存统计ID和websocket会话的关系
         *
         * @param s
         * @param subscribeBO
         */
        @Override
        public void putSession(WebSocketSession s, SubscribeBO subscribeBO) {
            if (getInstance().sessions == null) {
                getInstance().sessions = new HashMap<SubscribeBO, List<WebSocketSession>>();
            }
            if (getInstance().sessions.get(subscribeBO) == null) {
                getInstance().sessions.put(subscribeBO,
                        new ArrayList<WebSocketSession>());
            }
            final List<WebSocketSession> list = getInstance().sessions.get(subscribeBO);
            list.add(s);
        }
    
        @Override
        public void removeReader(WebSocketSession reader, SubscribeBO subscribeBO) {
            if (getInstance().sessions != null && reader != null) {
                if (subscribeBO != null) {
                    //移除该session的某个具体订阅
                    List<WebSocketSession> readers = this.getSessionBySubscribe(subscribeBO);
                    if (readers.size() > 0 && readers.contains(reader)) {
                        readers.remove(reader);
                    }
                } else {
                    //移除该session的所有订阅
                    for (Map.Entry<SubscribeBO, List<WebSocketSession>> entry :
                            getInstance().sessions.entrySet()) {
                        List<WebSocketSession> readers = entry.getValue();
                        //确定有session订阅
                        if (readers.size() > 0 && readers.contains(reader)) {
                            readers.remove(reader);
                            break;
                        }
                    }
                }
            }
        }
    
        /**
         * 根据统计ID获取websocket的会话信息
         *
         * @param subscribeBO
         * @return
         */
        @Override
        public List<WebSocketSession> getSessionBySubscribe(SubscribeBO subscribeBO) {
            if (getInstance().sessions == null) {
                getInstance().sessions = new HashMap<SubscribeBO, List<WebSocketSession>>();
            }
            if (getInstance().sessions.get(subscribeBO) == null) {
                getInstance().sessions.put(subscribeBO,
                        new ArrayList<WebSocketSession>());
            }
            return getInstance().sessions.get(subscribeBO);
        }
    
        /**
         * 获取所有有session订阅的业务ID
         * 业务ID带de前缀
         * @return
         */
        public List<String> getEffectDataIds() {
            List<String> ids = Lists.newArrayList();
            if (getInstance().sessions != null) {
                for (Map.Entry<SubscribeBO, List<WebSocketSession>> entry :
                        getInstance().sessions.entrySet()) {
                    List<WebSocketSession> readers = entry.getValue();
                    //确定有session订阅
                    if (readers != null && readers.size() > 0) {
                        SubscribeBO bo = entry.getKey();
                        ids.add(bo.getSubscribeData());//真正的业务id
                    }
                }
            }
            //String idsStr = Joiner.on(",").join(ids);
            return ids;
        }
    
        /**
         * 根据SubscribeBO获取一条订阅信息
         * @param condition
         * @return
         */
        public Map.Entry<SubscribeBO, List<WebSocketSession>> getItemBySubscribeBO(SubscribeBO condition) {
            if (getInstance().sessions != null && condition != null) {
                for (Map.Entry<SubscribeBO, List<WebSocketSession>> entry :
                        getInstance().sessions.entrySet()) {
                    if (entry.getKey().equals(condition)) {
                        return entry;
                    }
                }
            }
            return null;
        }
    
        /*public SubscribeBO getSubscribeByData(Long data) {
            Set<SubscribeBO> boSet = getInstance().sessions.keySet();
            for (SubscribeBO bo : boSet) {
    
                System.out.println(str);
            }
    
            List<Long> ids = Lists.newArrayList();
            if (getInstance().sessions != null) {
                for (Map.Entry<SubscribeBO, List<WebSocketSession>> entry :
                        getInstance().sessions.entrySet()) {
                    List<WebSocketSession> readers = entry.getValue();
                    //确定有session订阅
                    if (readers != null && readers.size() > 0) {
                        SubscribeBO bo = entry.getKey();
                        ids.add(Long.parseLong(bo.getData()));//真正的业务id
                    }
                }
            }
            //String idsStr = Joiner.on(",").join(ids);
            return ids;
        }*/
    }
    复制代码

    订阅工具类(subscribeBO):主要就是将接收到的websocket信息转成java对象

    复制代码
    import com.alibaba.fastjson.annotation.JSONField;
    
    /**
     * 订阅的对象
     */
    public class SubscribeBO {
        @JSONField(name="statisticsId")
        private String subscribeId;
        @JSONField(name="statisticsTypeId")
        private String subscribeTypeId;
        @JSONField(name="statisticsData")
        private String subscribeData;
        @JSONField(name="statisticsValue")
        private String subscribeValue;
    
    
        public SubscribeBO() {
        }
    
        public SubscribeBO(String subscribeTypeId, String subscribeData) {
            this.subscribeTypeId = subscribeTypeId;
            this.subscribeData = subscribeData;
        }
    
        public SubscribeBO(String subscribeId, String subscribeTypeId, String subscribeData) {
            this.subscribeId = subscribeId;
            this.subscribeTypeId = subscribeTypeId;
            this.subscribeData = subscribeData;
        }
    
        public String getSubscribeId() {
            return subscribeId;
        }
    
        public void setSubscribeId(String subscribeId) {
            this.subscribeId = subscribeId;
        }
    
        public String getSubscribeTypeId() {
            return subscribeTypeId;
        }
    
        public void setSubscribeTypeId(String subscribeTypeId) {
            this.subscribeTypeId = subscribeTypeId;
        }
    
        public String getSubscribeData() {
            return subscribeData;
        }
    
        public void setSubscribeData(String subscribeData) {
            this.subscribeData = subscribeData;
        }
    
        public String getSubscribeValue() {
            return subscribeValue;
        }
    
        public void setSubscribeValue(String subscribeValue) {
            this.subscribeValue = subscribeValue;
        }
    
        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
    
            SubscribeBO bo = (SubscribeBO) o;
    
            if (!subscribeTypeId.equals(bo.subscribeTypeId)) return false;
            return subscribeData != null ? subscribeData.equals(bo.subscribeData) : bo.subscribeData == null;
    
        }
    
        @Override
        public int hashCode() {
            int result = subscribeTypeId.hashCode();
            result = 31 * result + (subscribeData != null ? subscribeData.hashCode() : 0);
            return result;
        }
    }
    复制代码

    推送代码太多,主要是通过spring+Quartz进行后台运算,运算完毕之后将值按照订阅(DeviceWSSHolder)反查session,发送到客户端

    复制代码
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import com.google.common.base.Joiner;
    import com.google.common.base.Splitter;
    import com.google.common.collect.Lists;
    import org.apache.commons.lang.math.RandomUtils;
    import org.quartz.DisallowConcurrentExecution;
    import org.quartz.JobDataMap;
    import org.quartz.JobExecutionContext;
    import org.quartz.JobExecutionException;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.ApplicationContext;
    import org.springframework.scheduling.quartz.QuartzJobBean;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.web.socket.TextMessage;
    import org.springframework.web.socket.WebSocketSession;
    
    import java.io.IOException;
    import java.util.*;
    
    /**
     * 设备数量推送任务
     */
    @DisallowConcurrentExecution
    public class DevicePushJob extends QuartzJobBean {
        private Logger log = LoggerFactory.getLogger(DevicePushJob.class);
       /* @Autowired
        private DeviceService deviceService;*/
    
        @SuppressWarnings("unused")
        @Override
        protected void executeInternal(JobExecutionContext context)
                throws JobExecutionException {
            final JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
            final Object object = jobDataMap.get(JOBDATA_KEY_APPCTX);
            final ApplicationContext appCtx = (ApplicationContext) object;
            final DeviceWSSHolder deviceWSSHolder = (DeviceWSSHolder) jobDataMap
                    .get(JOBDATA_KEY_INDEXCONFIG_DEVICE);
            List<String> ids  = deviceWSSHolder.getEffectDataIds();
            if(ids.size()==0){
                return;
            }
            //String idsStr = Joiner.on(",").join(ids);
            //System.out.println("××××××××××××××××××要查询的设备类别是:"+idsStr);
            //log.info("××××××××××××××××××要查询的设备类别是:"+idsStr);
            //查询数据 id,type,value 把数据都装到新的List<SubscribeBO>中发送,DeviceWSSHolder数据仅作为字典查询用
            List<Integer> integers = Lists.newArrayList();
            for (String typeIdFmt : ids) {
                List<String> result = Splitter.onPattern(PORTLET_TREE_DEVICETYPE_FORMAT)
                        .omitEmptyStrings().splitToList(typeIdFmt);
                Integer id = Integer.parseInt(result.get(0));
                integers.add(id);
            }
            List<SubscribeBO> subscribeBOs = Lists.newArrayList();
            DeviceService deviceService = appCtx.getBean(DeviceService.class);
            Map<Integer,Integer> deviceCounts =  deviceService.countDeviceByDeviceType(integers,false);
            if (deviceCounts == null || deviceCounts.size()==0) {
                return;
            }
            for (Map.Entry<Integer, Integer> deviceCount : deviceCounts.entrySet()) {
                Integer deviceTypeId = deviceCount.getKey();
                Integer count = deviceCount.getValue();
                SubscribeBO sb = new SubscribeBO(SubscribeType.DEVICE_COUNT
                        .getCode(),PORTLET_TREE_DEVICETYPE_FORMAT+deviceTypeId.toString());
                sb.setSubscribeValue(""+count);
                subscribeBOs.add(sb);
            }
            for(SubscribeBO bo:subscribeBOs){
                Map.Entry<SubscribeBO, List<WebSocketSession>> entry = DeviceWSSHolder
                        .getInstance().getItemBySubscribeBO(bo);
                if(entry !=null){
                    SubscribeBO temp = entry.getKey();
                    bo.setSubscribeId(temp.getSubscribeId());
                    List<WebSocketSession> sessions = entry.getValue();
                    Iterator<WebSocketSession> iterator = sessions.iterator();
                    while (iterator.hasNext()) {
                        WebSocketSession session = iterator.next();
                        if (session != null && session.isOpen()) {
                            try {
                                JSONObject ret = new JSONObject();
                                ret.put("success", true);
                                List<SubscribeBO> retSbo = Lists.newArrayList(bo);
                                ret.put("data", retSbo);
                                String jsonString = JSON.toJSONString(ret);
                                //System.err.println(jsonString);
                                log.info(jsonString);
                                session.sendMessage(new TextMessage(jsonString));
                            } catch (IOException e) {
                                log.error(e.getMessage());
                            }
                        }else{
                            iterator.remove();
                        }
                    }
                }
            }
    
        }
    
    }
    复制代码

     附:seajs封装的前端js工具类

    复制代码
    define(function (require, exports, module) {
    
        var devWebSocket = {};
        var indexConfigSocket = function (opt) {
            if ('WebSocket' in window) {
                devWebSocket = new WebSocket("ws://" + window.location.host + basePath + "/app/indexConfig/indexConfigWebSocket.do");
            } else if ('MozWebSocket' in window) {
                devWebSocket = new MozWebSocket(
                    "ws://" + window.location.host + basePath + "/ws/point/webSocketServer.do");
            } else {
                devWebSocket = new SockJS(
                    "http://" + window.location.host + basePath + "/ws/point/webSocketServer.do");
            }
            devWebSocket.onopen = function (evnt) {
            };
            devWebSocket.onmessage = function (evnt) {
                //console.log("onMessage:"+"</br>(<font color='red'>" + evnt.data + "</font>)")
                window.PubSub.publish('indexConfigSocket-onMessage', evnt);
            };
            devWebSocket.onerror = function (e) {
                console.log('indexConfig webSocket error...');
                for (var p in e) {
                    //alert(p + "=" + e[p]);
                }
            };
            devWebSocket.onclose = function (evnt) {
                console.log('indexConfig webSocket error...');
            };
        };
    
        indexConfigSocket.prototype.send = function (indexConfigIdsStr) {
            var indexConfig = {};
            indexConfig.data = indexConfigIdsStr;
            indexConfig.optType = '';
            var t = JSON.stringify(indexConfig);
            console.log("</br>请求报文:<font color='blue'>" + t + "</font>")
            devWebSocket.send(t);
        };
    
        indexConfigSocket.prototype.close = function (indexConfigIdsStr) {
            var indexConfig = {};
            indexConfig.data = indexConfigIdsStr == null ? [] : indexConfigIdsStr;
            indexConfig.optType = 'pausePush';
            var t = JSON.stringify(indexConfig);
            console.log("关闭报文:" + t);
            devWebSocket.send(t);
        };
    
    
        module.exports = indexConfigSocket;
    
    })
    复制代码
     
     
     
  • 相关阅读:
    ICMPv6 Type 和 rfc
    Redis学习
    Vue学习(一)
    《一线架构师实践指南》读后感(五)
    《一线架构师实践指南》读后感(四)
    Stream流
    泛型
    《架构漫谈》读后感
    《一线架构师实践指南》读后感(三)
    《一线架构师实践指南》读后感(二)
  • 原文地址:https://www.cnblogs.com/daxiongblog/p/7048650.html
Copyright © 2011-2022 走看看