zoukankan      html  css  js  c++  java
  • WebSocket实现(nginx、后端)

    一个完整的WebSocket流程(java实现):

    nginx配置

    upstream paas_gateway {
        least_conn;
        server ip:port;
        keepalive 1000;
        keepalive_timeout 65;
    }
    
    server{
        listen 9001;
        server_name localhost;
        root /paas-web;
        index index.html;
        location /paas-web {
            ....
        }
        location /paas-ws {
            rewrite ^/paas-ws/(.*)$ /$1 break; 
            proxy_pass http://paas_gateway; 
            proxy_http_version 1.1; 
            proxy_set_header Upgrade $http_upgrade; 
            proxy_set_header Connection "upgrade"; 
            proxy_redirect off;
            proxy_set_header Host $host; 
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_Forwarded_for;
            proxy_next_upstream error timeout invalid_header http_500 http_502 http_503 http_504; 
            proxy_max_temp_file_size 0;
            proxy_connect_timeout 90s; 
            proxy_send_timeout 90s;
            proxy_read_timeout 90s; 
            proxy_buffer_size 4k; 
            proxy_buffers 4 32k;
            proxy_busy_buffers_size 64k; 
            proxy_temp_file_write_size 64k;
        }
    }
    

    设置WebSocket路由请求,当前端请求/paas-ws走以上配置。

    gateWay网关设置

    spring:
        cloud:
            gateway:
                routes: #具体的路由信息,是一个数组,每一个路由基本包含部分:
                - id: bbb(模块名称)
                   uri: lb://bbb
                   predicates:
                   - Path=/bbbapi/**
                 - id: bbb-ws
                   uri: lb:ws://bbb
                   predicates:
                   - Path=/bbbwsapi/**
    

    java代码实现

    1.SpringBoot启动设置(WSServer为websocket请求类):

    @SpringBootApplication
    public static void main(String[] args) {
        SpringApplication springApplication = new SpringApplication(RunApplication.class);
        ConfigurableApplicationContext configurableApplicationContext = springApplication.run(args);
        //解决WebSocket不能注入的问题
        WSServer.setApplicationContext(configurableApplicationContext);
    }
    
    

    2.WebSocket实现:

    @ServerEndpoint(value = "server/{userId}")
    @Component
    @Slf4j
    public class WSServer {
    
        //此处是解决无法注入的关键
        private static ApplicationContext applicationContext;
        //要注入的service或者dao
        private WSService wsService;
    
        public static void setApplicationContext(ApplicationContext applicationContext) {
            WSServer.applicationContext = applicationContext;
        }
    
    
        /**
         * 当前服务端的连接数
         */
        private static int onlineCount = 0;
    
        /**
         * 与某个客户端的连接会话,需要通过Session对象来向客户端发送数据
         */
        private Session session;
    
        /**
         * 用户id
         */
        private String userId;
        /**
         * 存放每个客户端对应的webSocket对象
         */
        private static Map<Session, String> webSocketMap = new ConcurrentHashMap<>();
    
        /**
         * 增加连接数
         */
        public static synchronized void addOnlineCount() {
            onlineCount++;
        }
    
        /**
         * 减少连接数
         */
        public static synchronized void subOnlineCount() {
            onlineCount--;
        }
    
        /**
         * 连接成功时调用的方法
         *
         * @param session 与某个客户端的连接回话,通过这个参数给客户端发送数据
         * @param userId  用户id
         */
        @OnOpen
        public void onOpen(Session session, @PathParam("userId") String userId) {
            // 连接数增加
            addOnlineCount();
            this.session = session;
            this.userId = userId;
            // 保存连接关联信息
            webSocketMap.put(session, userId);
            log.error("有新连接加入!当前在线人数为" + getOnlineCount());
            log.error("当前用户开始[{}]", webSocketMap);
            this.onConnectQueryAndSend(userId);
        }
    
        @OnClose
        public void onClose(Session session) {
            webSocketMap.remove(session);
            subOnlineCount();
            log.error("有一连接关闭!当前在线人数为" + getOnlineCount());
            log.error("当前用户关闭[{}]", webSocketMap);
        }
    
        public static synchronized int getOnlineCount() {
            return onlineCount;
        }
    
        /**
         * 设置前后端通信
         */
        @OnMessage
        public void onMessage(String message, Session session) {
            try {
                if (webSocketMap.containsKey(userId)) {
                    if (message.equals("ping")) {
                        session.getBasicRemote().sendText("pong");
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        @OnError
        public void onError(Session session, Throwable throwable) {
            subOnlineCount();
            webSocketMap.remove(userId);
            if (throwable.getClass().equals(EOFException.class)) {
                log.info("用户:{}的连接因nginx连接超时重连,等待客户端重连", userId);
            } else {
                // TODO: 2018/11/26 统一异常信息
                throwable.printStackTrace();
            }
        }
        
         public void onConnectQueryAndSend(String userId) {
            // 查询用户当前所有的消息
       		....
        }
    }
    

    前端请求:localhost:9001/paas-ws/bbbwsapi/server/1

    最后:
    前端WebSocket实现,笔者不会,这里没有列出。

  • 相关阅读:
    Storm-源码分析-Stats (backtype.storm.stats)
    Storm-源码分析-Topology Submit-Task-TopologyContext (backtype.storm.task)
    Storm-源码分析-Streaming Grouping (backtype.storm.daemon.executor)
    Storm-源码分析-Topology Submit-Worker
    Storm-源码分析- Messaging (backtype.storm.messaging)
    Storm-源码分析-LocalState (backtype.storm.utils)
    Storm-源码分析- Disruptor在storm中的使用
    LMAX Disruptor 原理
    Shiro学习(7)与Web整合
    MQTT---HiveMQ源代码具体解释(十四)Persistence-LocalPersistence
  • 原文地址:https://www.cnblogs.com/snail-gao/p/11876820.html
Copyright © 2011-2022 走看看