zoukankan      html  css  js  c++  java
  • spring websocket集群问题的简单记录

    前言

    最近公司里遇到一个问题,在集群中一些websocket的消息丢失了。
    产生问题的原理很简单,发送消息的服务和接收者连接的服务不是同一个服务。

    解决方案

    用中间件(mq, redis etc.)来在服务之间进行通信。

    不直接发送websocket消息,而是将消息放在mq或者redis的list中。
    并在redis中维护连接信息,服务根据连接信息来判断自己是否需要处理消息,或者将消息发给接收者连接的服务。

    代码示例

    我们的项目中使用的是Spring WebSocket,并且使用了STOMP协议,可以去官网查看文档。

    代码示例只做维护连接信息的代码示例,其他部分就不放上来了。

    维护连接信息的代码示例

    想要在维护STOMP协议的连接信息,可以查看文档的这一部分Listening To ApplicationContext Events and Intercepting Messages

    这里的连接信息只要是能够标识出不同的服务就OK。

    一下是监听了订阅事件的Listener的部分代码:

    package cn.fjhdtp.websocket.interceptor;
    
    import java.util.Map;
    
    import org.apache.commons.lang.StringUtils;
    import org.springframework.http.server.ServerHttpRequest;
    import org.springframework.http.server.ServerHttpResponse;
    import org.springframework.web.socket.WebSocketHandler;
    import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor;
    
    public class LoginInfoInterceptor extends HttpSessionHandshakeInterceptor {
    
        @Override
        public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
                WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
    			//握手前,往attributes中增加所需信息
    
    
            Object loginBean = ...;//获取登录的用户信息(或其他信息)
            attributes.put(WebSocketConstant.WEBSOKET_LOGINBEAN,loginBean);
    
            return super.beforeHandshake(request, response, wsHandler, attributes);
        }
    }
    
    package cn.fjhdtp.listener;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.ApplicationListener;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    import org.springframework.stereotype.Component;
    import org.springframework.web.socket.messaging.SessionSubscribeEvent;
    
    import java.util.Map;
    
    @Component
    public class SessionSubscribeEventListener implements ApplicationListener<SessionSubscribeEvent> {
    
        @Autowired
        @Qualifier("serversideMessageTaskExecutor")
        private ThreadPoolTaskExecutor threadPoolTaskExecutor;
        @Autowired
        private IMessageHandler messageHandler;
    
        @Override
        public void onApplicationEvent(SessionSubscribeEvent event) {
    			//获取订阅的destination
              String destination = (String) event.getMessage().getHeaders().get("simpDestination");
                //获取登录信息
                Object loginBean = ((Map) event.getMessage().getHeaders().get("simpSessionAttributes")).get(WebSocketConstant.WEBSOKET_LOGINBEAN);
    			//TODO 向redis中增加连接信息
        }
    }
    
    package cn.fjhdtp.message.listener;
    
    import org.springframework.context.ApplicationListener;
    import org.springframework.stereotype.Component;
    import org.springframework.web.socket.messaging.SessionDisconnectEvent;
    
    
    import java.util.Map;
    
    @Component
    public class SessionDisconnectEventListener implements ApplicationListener<SessionDisconnectEvent> {
    
        @Override
        public void onApplicationEvent(SessionDisconnectEvent event) {
            // stomp连接断开,清除连接信息
    		//从attributes中获取登录信息(或其他信息)
            Object loginBean = ((Map) event.getMessage().getHeaders().get("simpSessionAttributes")).get(WebSocketConstant.WEBSOKET_LOGINBEAN);
    
            //从redis中移除连接信息
        }
    }
    

    当然,有些情况下可能不会正常的触发断开连接的事件(在was下就不会有这个事件),因此还会需要HeartBeat。

  • 相关阅读:
    vue.js 系列教程
    vue.js 生命周期
    MVVM代码例子
    vue.js windows下开发环境搭建
    Vue.js 之修饰符详解
    elementUi——适合于Vue的UI框架
    Vue.js——60分钟快速入门
    Keil sct分散加载文件
    Keil ARM-CM3 printf输出调试信息到Debug (printf) Viewer
    Cortex-M3(NXP LPC 1788) 启动代码
  • 原文地址:https://www.cnblogs.com/FJH1994/p/8998900.html
Copyright © 2011-2022 走看看