zoukankan      html  css  js  c++  java
  • SpringBoot整合WebSocket的客户端和服务端的实现

    本文是项目中使用了websocket进行一些数据的推送,对比项目做了一个demo,ws的相关问题不做细数,仅做一下记录。

        此demo针对ws的搭建主要逻辑背景是一个服务端B:通讯层    产生消息推送出去,另外一个项目A充当客户端和服务端,A的客户端:是接收通讯层去无差别接收这些消息,A的服务端:根据地址ip去订阅。用户通过订阅A的ws,同时记录下自己的信息,项目B推送的消息,项目A接收到之后通过当初订阅的逻辑和一些权限过滤条件对项目B产生的消息进行过滤再推送到用户客户端上。

        一、项目中服务端的创建

    首先引入maven仓库

            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-websocket</artifactId>
            </dependency>

    websocket的服务端搭建

     同时注意springboot要开启ws服务

    启动类加上@EnableScheduling

    简要解读demo

     /webSocket/{id}:链接的id是业务上的一个id,这边之前做过类似拍卖的,相当于一个服务端或者业务上的一个标识,是客户端指明链接到哪一个拍卖间的标识

     @ServerEndpoint:作为服务端的注解。

    package com.ghh.myproject.websocket;
    
    import cn.hutool.core.lang.UUID;
    import com.alibaba.fastjson.JSON;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.stereotype.Component;
    
    import javax.websocket.*;
    import javax.websocket.server.PathParam;
    import javax.websocket.server.ServerEndpoint;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    @ServerEndpoint("/webSocket/{id}")
    @Component
    public class WebSocket {
    
        private Logger log = LoggerFactory.getLogger(WebSocket.class);
    
        private static int onlineCount = 0;
        /** 创建一个map存放   产生的ws链接推送 */
        private static Map<String, WebSocket> clients = new ConcurrentHashMap<>();
        /** 创建一个map存放   当前接入的客户端 */
        private static Map<String, String> idMap = new ConcurrentHashMap<>();
        
        private Session session;
        /** 链接进入的一个场景id */
        private String id;
        /** 每一个链接的一个唯一标识 */
        private String userNo;
    
        /**
        * @Description: 第三方文接入当前项目websocket后的记录信息
        * @DateTime: 2021/7/5 10:02
        * @Author: GHH
        * @Params: [id, session]
        * @Return void
        */
        @OnOpen
        public void onOpen(@PathParam("id") String id, Session session) throws IOException {
            log.info("已连接到id:{}竞拍场,当前竞拍场人数:{}", id, getUserNosById(id).size());
            this.id = id;
            this.session = session;
            // 生成一个随机序列号来存储一个id下的所有用户
            this.userNo = UUID.fastUUID().toString();
            addOnlineCount();
            //根据随机序列号存储一个socket连接
            clients.put(userNo, this);
            idMap.put(userNo, id);
    
    
        }
    
    
        /**
        * @Description: 关闭连接
        * @DateTime: 2021/7/5 10:02
        * @Author: GHH
        * @Params: []
        * @Return void
        */
        @OnClose
        public void onClose() throws IOException {
            clients.remove(userNo);
            idMap.remove(userNo);
            subOnlineCount();
    
        }
    
        /**
        * @Description: 客户端发送消息调用此方法
        * @DateTime: 2021/6/16 15:35
        * @Author: GHH
        * @Params: [message]
        * @Return void
        */
        @OnMessage
        public void onMessage(String message) throws IOException {
    //        JSONObject jsonTo = JSONObject.parseObject(message);
    //        String mes = (String) jsonTo.get("message");
    //        if (!("All").equals(jsonTo.get("To"))) {
    //            sendMessageTo(mes, jsonTo.get("To").toString());
    //        } else {
    //            sendMessageAll(message);
    //        }
            log.info("onMessage方法成功");
        }
    
        @OnError
        public void onError(Session session, Throwable error) {
            log.error("{}", error);
        }
    
        public static void sendMessageTo(String message, String userNo) throws IOException {
            // session.getBasicRemote().sendText(message);
            //session.getAsyncRemote().sendText(message);
            WebSocket webSocket = clients.get(userNo);
            if (webSocket != null && webSocket.session.isOpen()) {
                webSocket.session.getAsyncRemote().sendText(JSON.toJSONString(message));
            }
    
        }
    
    
        /**
        * @Description: 推送到指定的id值的记录
        * @DateTime: 2021/6/15 17:11
        * @Author: GHH
        * @Params: [message, id]
        * @Return void
        */
        public static void sendMessageToById(String message, String id) {
            // session.getBasicRemote().sendText(message);
            //session.getAsyncRemote().sendText(message);
            //根据id获取所有的userNo链接的用户
            List<String> userNos = getUserNosById(id);
    
            for (WebSocket item : clients.values()) {
                //遍历链接的value值,如果当前传入的id中链接的用户包含value值,则推送。
                if (userNos.contains(item.userNo)) {
                    item.session.getAsyncRemote().sendText(message);
                }
            }
        }
    
    
        /**
        * @Description: 推送所有开启的信息
        * @DateTime: 2021/6/15 17:13
        * @Author: GHH
        * @Params: [message]
        * @Return void
        */
        public static void sendMessageAll(String message){
            for (WebSocket item : clients.values()) {
                item.session.getAsyncRemote().sendText(message);
            }
        }
    
    
        public static synchronized int getOnlineCount() {
            return onlineCount;
        }
    
    
        public static synchronized void addOnlineCount() {
            WebSocket.onlineCount++;
        }
    
    
        public static synchronized void subOnlineCount() {
            WebSocket.onlineCount--;
        }
    
    
        public static synchronized Map<String, WebSocket> getClients() {
            return clients;
        }
    
    
        /**
        * @Description: 根据相应场景的一些逻辑处理
        * @DateTime: 2021/7/5 10:03
        * @Author: GHH
        * @Params: [id]
        * @Return java.util.List<java.lang.String>
        */
        public static List<String> getUserNosById(String id) {
            ArrayList<String> userNos = new ArrayList<>();
    
            for (Map.Entry<String, String> entry : idMap.entrySet()) {
                if (entry.getValue().equals(id)) {
                    userNos.add(entry.getKey());
                }
            }
            return userNos;
        }
    
    }

     demo中模拟的是定时器推送,第一个参数是消息内容,第二个是推送到哪一个拍卖间或者其他业务上的内容。方法的具体内容上一段代码有详细解释,有通过id,或者发送给全部ws链接的客户端

    WebSocket.sendMessageToById(""+count,2+"");
        @Scheduled(cron = "*/5 * * * * ?")
        public void job1(){
            log.info("测试生成次数:{}",count);
            redisTemplate.opsForValue().set("测试"+count, ""+count++);
            if (count%2==0){
                WebSocket.sendMessageToById(""+count,2+"");
            }else {
                WebSocket.sendMessageToById(""+count,1+"");
            }
    
            log.info("websocket发送"+count);
        }

    二、java充当客户端链接ws。

      上述是java作为ws服务端推送当前业务信息的一个demo。我们项目目前做的是一个通讯层的概念,只能够推送数据内容,却无法根据用户权限去推送不同的数据。

       ws客户端的搭建,首先链接ws服务端。首先是我们另外一个服务的ws配置信息,我这边demo是模拟链接上面的ws服务

      1、ws客户端的配置

    package com.ghh.websocketRecive.wsMessage;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.stereotype.Component;
    
    import javax.websocket.ContainerProvider;
    import javax.websocket.Session;
    import javax.websocket.WebSocketContainer;
    import java.net.URI;
    
    /**
     * @author ghh
     * @date 2019-08-16 16:02
     */
    @Component
    @Slf4j
    public class WSClient {
        public static Session session;
    
    
        public static void startWS() {
            try {
                if (WSClient.session != null) {
                    WSClient.session.close();
                }
                WebSocketContainer container = ContainerProvider.getWebSocketContainer();
                //设置消息大小最大为10M
                container.setDefaultMaxBinaryMessageBufferSize(10*1024*1024);
                container.setDefaultMaxTextMessageBufferSize(10*1024*1024);
                // 客户端,开启服务端websocket。
                String uri = "ws://192.168.0.108:8082/webSocket/1";
                Session session = container.connectToServer(WSHandler.class, URI.create(uri));
                WSClient.session = session;
            } catch (Exception ex) {
                log.info(ex.getMessage());
            }
        }
    }

       2、配置信息需要在项目启动的时候去启用和链接ws服务

    package com.ghh.websocketRecive;
    
    
    import com.ghh.websocketRecive.wsMessage.WSClient;
    import lombok.extern.slf4j.Slf4j;
    import org.mybatis.spring.annotation.MapperScan;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.scheduling.annotation.EnableScheduling;
    
    import javax.annotation.PostConstruct;
    
    @Slf4j
    @EnableScheduling
    @SpringBootApplication
    @MapperScan("com.ghh.websocketRecive.dao")
    public class WebsocketReciveApplication {
        public static void main(String[] args) {
            SpringApplication.run(WebsocketReciveApplication.class, args);
        }
    
        @PostConstruct
        public void init(){
            log.info("初始化应用程序");
         // 初始化ws,链接服务端 WSClient.startWS(); } }

      3、接收服务端推送的消息进行权限过滤demo

    @ClientEndpoint:作为ws的客户端注解,@OnMessage接收服务端推送的消息。
    package com.ghh.websocketRecive.wsMessage;
    
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import com.ghh.websocketRecive.entity.Student;
    import com.ghh.websocketRecive.service.UserService;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.PostConstruct;
    import javax.websocket.*;
    import java.util.Objects;
    import java.util.Set;
    
    import static com.ghh.websocketRecive.wsMessage.WSClient.startWS;
    
    
    @ClientEndpoint
    @Slf4j
    @Component
    public class WSHandler {
        @Autowired
        RedisTemplate<String,String> redisTemplate;
        private static RedisTemplate<String,String> redisTemplateService;
    
        @PostConstruct
        public void init() {
            redisTemplateService=redisTemplate;
        }
    
        @OnOpen
        public void onOpen(Session session) {
            WSClient.session = session;
        }
    
        @OnMessage
        public void processMessage(String message) {
            log.info("websocketRecive接收推送消息"+message);
            int permission = Integer.parseInt(message)%5;
            //查询所有订阅的客户端的ip。
            Set<String> keys = redisTemplateService.keys("ip:*");
            for (String key : keys) {
                // 根据登录后存储的客户端ip,获取权限地址
                String s = redisTemplateService.opsForValue().get(key);
                String[] split = s.split(",");
                for (String s1 : split) {
                    //向含有推送过来的数据权限地址的客户端推送告警数据。
                    if (s1.equals(permission+"")){
                        WebSocket.sendMessageToByIp(message,key.split(":")[1]);
                    }
                }
            }
        }
    
        @OnError
        public void processError(Throwable t) {
            WSClient.session = null;
            try {
                Thread.sleep(5000);
                startWS();
            } catch (InterruptedException e) {
                log.error("---websocket processError InterruptedException---", e);
            }
            log.error("---websocket processError error---", t);
        }
    
        @OnClose
        public void processClose(Session session, CloseReason closeReason) {
            log.error(session.getId() + closeReason.toString());
        }
    
        public void send(String sessionId, String message) {
            try {
                log.info("send Msg:" + message);
                if (Objects.nonNull(WSClient.session)) {
                    WSClient.session.getBasicRemote().sendText(message);
                } else {
                    log.info("---websocket error----");
                }
    
            } catch (Exception e) {
                log.error("---websocket send error---", e);
            }
    
        }
    }

     4、ws客户端推送消息,推送消息和上面服务端类似。这边是根据ip

    package com.ghh.websocketRecive.wsMessage;
    
    import cn.hutool.core.lang.UUID;
    import com.alibaba.fastjson.JSON;
    import com.ghh.websocketRecive.service.UserService;
    import lombok.Builder;
    import lombok.Data;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.PostConstruct;
    import javax.websocket.*;
    import javax.websocket.server.PathParam;
    import javax.websocket.server.ServerEndpoint;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    @ServerEndpoint("/webSocket/{ip}")
    @Component
    public class WebSocket {
    
        private Logger log = LoggerFactory.getLogger(WebSocket.class);
    
        private static int onlineCount = 0;
    
        private static Map<String, WebSocket> clients = new ConcurrentHashMap<>();
    
        private Session session;
        /** 当前连接服务端的客户端ip */
        private String ip;
    
        @Autowired
        RedisTemplate<String,String> redisTemplate;
        private static RedisTemplate<String,String> redisTemplateService;
        @PostConstruct
        public void init() {
            redisTemplateService = redisTemplate;
        }
    
    
        @OnOpen
        public void onOpen(@PathParam("ip") String ip, Session session) throws IOException {
            log.info("ip:{}客户端已连接:,当前客户端数量:{}", ip, onlineCount+1);
            this.ip = ip;
            this.session = session;
            // 接入一个websocket则生成一个随机序列号
            addOnlineCount();
            //根据随机序列号存储一个socket连接
            clients.put(ip, this);
        }
    
    
        @OnClose
        public void onClose() throws IOException {
            clients.remove(ip);
            onlineCount--;
            subOnlineCount();
        }
    
        /**
        * @Description: 客户端发送消息调用此方法
        * @DateTime: 2021/6/16 15:35
        * @Author: GHH
        * @Params: [message]
        * @Return void
        */
        @OnMessage
        public void onMessage(String message) throws IOException {
            log.info("客户端发送消onMessage方法成功");
        }
    
        @OnError
        public void onError(Session session, Throwable error) {
            log.error("{}", error);
        }
    
        public static void sendMessageTo(String message, String userNo) throws IOException {
            WebSocket webSocket = clients.get(userNo);
            if (webSocket != null && webSocket.session.isOpen()) {
                webSocket.session.getAsyncRemote().sendText(JSON.toJSONString(message));
            }
        }
    
        /**
        * @Description: 推送到指定的ip值的记录
        * @DateTime: 2021/6/15 17:11
        * @Author: GHH
        * @Params: [message, id]
        * @Return void
        */
        public static void sendMessageToByIp(String message, String ip) {
            for (WebSocket item : clients.values()) {
                //遍历链接的value值,如果当前传入的ip中链接的用户包含value值,则推送。
                if (item.ip.equals(ip)) {
                    item.session.getAsyncRemote().sendText(message);
                }
            }
        }
    
    
        /**
        * @Description: 推送所有开启的信息
        * @DateTime: 2021/6/15 17:13
        * @Author: GHH
        * @Params: [message]
        * @Return void
        */
        public static void sendMessageAll(String message){
            for (WebSocket item : clients.values()) {
                item.session.getAsyncRemote().sendText(message);
            }
        }
    
    
        public static synchronized int getOnlineCount() {
            return onlineCount;
        }
    
    
        public static synchronized void addOnlineCount() {
            WebSocket.onlineCount++;
        }
    
    
        public static synchronized void subOnlineCount() {
            WebSocket.onlineCount--;
        }
    
    
        public static synchronized Map<String, WebSocket> getClients() {
            return clients;
        }
    
    }

    概述:

        至此,简易的demo搭建完成,项目gitee网址:https://gitee.com/ghhNB/study.git

        如有疑惑部分,欢迎大家积极探讨

  • 相关阅读:
    2017普及组D1T3 洛谷P3956 棋盘
    2017提高组D1T1 洛谷P3951 小凯的疑惑
    Title
    Title
    Title
    Title
    Title
    Title
    Title
    Title
  • 原文地址:https://www.cnblogs.com/guanyuehao0107/p/14971494.html
Copyright © 2011-2022 走看看