zoukankan      html  css  js  c++  java
  • springboot websocket集群(stomp协议)连接时候传递参数

    最近在公司项目中接到个需求。就是后台跟前端浏览器要保持长连接,后台主动往前台推数据。

    网上查了下,websocket stomp协议处理这个很简单。尤其是跟springboot 集成。

    但是由于开始是单机玩的,很顺利。

    但是后面部署到生产搞集群的话,就会出问题了。

    假如集群两个节点,浏览器A与节点A建立连接,A节点发的消息浏览器A节点肯定能收到。但是B节点由于没有跟浏览器A建立连接。B节点发的消息浏览器就收不到了。

    网上也查了好多,但是没有一个说的很清楚的,也很多都是理论层面的。

    还有很多思路都是通过session获取信息的。但是这都不是我需要的。我需要的是从前台传递参数,连接的时候每个节点保存下。然后通过SimpleUserRegistry.getUser获取。

    话不多说,直接上代码。

    <script type="text/javascript" src="${request.contextPath}/scripts/sockjs.min.js"></script>
    <script type="text/javascript" src="${request.contextPath}/scripts/stomp.min.js"></script>
    var WEB_SOCKET = {
        
            topic : "",
            url : "",
            stompClient : null,
            
            connect : function(url, topic, callback,userid) {
                this.url = url;
                this.topic = topic;
                var socket = new SockJS(url); //连接SockJS的endpoint名称为"endpointOyzc"
                WEB_SOCKET.stompClient = Stomp.over(socket);//使用STMOP子协议的WebSocket客户端
                WEB_SOCKET.stompClient.connect({userid:userid},function(frame){//连接WebSocket服务端
                    // console.log('Connected:' + frame);
                    //通过stompClient.subscribe订阅/topic/getResponse 目标(destination)发送的消息
                    WEB_SOCKET.stompClient.subscribe(topic, callback);
                });
            }
    };

    这是响应的前端代码。只需要引入两个js。调用new SockJS(url) 就代表跟服务器建立连接了。

    @Configuration
    
    //注解开启使用STOMP协议来传输基于代理(message broker)的消息,这时控制器支持使用@MessageMapping,就像使用@RequestMapping一样
    @EnableWebSocketMessageBroker
    public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
    
        @Autowired
        private GetHeaderParamInterceptor getHeaderParamInterceptor;
    
        @Override
        //注册STOMP协议的节点(endpoint),并映射指定的url
        public void registerStompEndpoints(StompEndpointRegistry registry) {
            //注册一个STOMP的endpoint,并指定使用SockJS协议
            registry.addEndpoint("/endpointOyzc")
                    .setAllowedOrigins("*")
                    .withSockJS();
           /* registry.addEndpoint("/endpointOyzc")
                    .setAllowedOrigins("*")
                    .setHandshakeHandler(xlHandshakeHandler)
                    .withSockJS();*/
        }
    
        @Override
        //配置消息代理(Message Broker)
        public void configureMessageBroker(MessageBrokerRegistry registry) {
            //点对点应配置一个/user消息代理,广播式应配置一个/topic消息代理
            registry.enableSimpleBroker("/topic", "/user");
            // 全局使用的消息前缀(客户端订阅路径上会体现出来)
            //registry.setApplicationDestinationPrefixes("/app");
            //点对点使用的订阅前缀(客户端订阅路径上会体现出来),不设置的话,默认也是/user/
            registry.setUserDestinationPrefix("/user");
        }
    
        /**
         * 采用自定义拦截器,获取connect时候传递的参数
         *
         * @param registration
         */
        @Override
        public void configureClientInboundChannel(ChannelRegistration registration) {
            registration.interceptors(getHeaderParamInterceptor);
        }
    }

    注:上面的endpointOyzc就是前端的url。后面注册端点,前台链接。

    然后注意下configureClientInboundChannel这个方法,这个方法里面注入拦截器就是为了链接时候接收参数的。

    /**
     * @author : hao
     * @description : websocket建立链接的时候获取headeri里认证的参数拦截器。
     * @time : 2019/7/3 20:42
     */
    @Component
    public class GetHeaderParamInterceptor extends ChannelInterceptorAdapter {
    
        @Override
        public Message<?> preSend(Message<?> message, MessageChannel channel) {
            StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
            if (StompCommand.CONNECT.equals(accessor.getCommand())) {
                Object raw = message.getHeaders().get(SimpMessageHeaderAccessor.NATIVE_HEADERS);
                if (raw instanceof Map) {
                    Object name = ((Map) raw).get("userid");
                    if (name instanceof LinkedList) {
                        // 设置当前访问的认证用户
                        accessor.setUser(new JqxxPrincipal(((LinkedList) name).get(0).toString()));
                    }
                }
            }
            return message;
        }
    }
    /**
     * @author : hao
     * @description : 自定义的java.security.Principal
     * @time : 2019/7/3 20:42
     */
    public class JqxxPrincipal implements Principal {
    
        private String loginName;
    
        public JqxxPrincipal(String loginName) {
            this.loginName = loginName;
        }
    
        @Override
        public String getName() {
            return loginName;
        }
    }

    这样就存入的前台传的参数。

    后台发消息的时候怎么发呢?

    /**
     * @author : hao
     * @description : websocket发送代理,负责发送消息
     * @time : 2019/7/4 11:01
     */
    @Component
    @Slf4j
    public class WebsocketSendProxy<T> {
        @Autowired
        private SimpMessagingTemplate template;
    
        @Autowired
        private SimpUserRegistry userRegistry;
    
        @Resource(name = "redisServiceImpl")
        private RedisService redisService;
    
        @Value("spring.redis.message.topic-name")
        private String topicName;
    
        public void sendMsg(RedisWebsocketMsg<T> redisWebsocketMsg) {
            SimpUser simpUser = userRegistry.getUser(redisWebsocketMsg.getReceiver());
            log.info("发送消息前获取接收方为{},根据Registry获取本节点上这个用户{}", redisWebsocketMsg.getReceiver(), simpUser);
            if (simpUser != null && StringUtils.isNotBlank(simpUser.getName())) {
                //2. 获取WebSocket客户端的订阅地址
                WebSocketChannelEnum channelEnum = WebSocketChannelEnum.fromCode(redisWebsocketMsg.getChannelCode());
                if (channelEnum != null) {
                    //3. 给WebSocket客户端发送消息
                    template.convertAndSendToUser(redisWebsocketMsg.getReceiver(), channelEnum.getSubscribeUrl(), redisWebsocketMsg.getContent());
                }
            } else {
                //给其他订阅了主题的节点发消息,因为本节点没有
                redisService.convertAndSend(topicName, redisWebsocketMsg);
            }
    
        }
    }

    可以发现上面代码利用了redis监听模型,也就是redis模型的消息队列

    /**
     * @author : hao
     * @description : redis消息监听实现类,接收处理类
     * @time : 2019/7/3 14:00
     */
    @Component
    @Slf4j
    public class MessageReceiver {
    
        @Autowired
        private SimpMessagingTemplate messagingTemplate;
    
        @Autowired
        private SimpUserRegistry userRegistry;
    
        /**
         * 处理WebSocket消息
         */
        public void receiveMessage(RedisWebsocketMsg redisWebsocketMsg) {
            log.info(MessageFormat.format("Received Message: {0}", redisWebsocketMsg));
            //1. 取出用户名并判断是否连接到当前应用节点的WebSocket
            SimpUser simpUser = userRegistry.getUser(redisWebsocketMsg.getReceiver());
    
            if (simpUser != null && StringUtils.isNotBlank(simpUser.getName())) {
                //2. 获取WebSocket客户端的订阅地址
                WebSocketChannelEnum channelEnum = WebSocketChannelEnum.fromCode(redisWebsocketMsg.getChannelCode());
                if (channelEnum != null) {
                    //3. 给WebSocket客户端发送消息
                    messagingTemplate.convertAndSendToUser(redisWebsocketMsg.getReceiver(), channelEnum.getSubscribeUrl(), redisWebsocketMsg.getContent());
                }
            }
        }
    }

    redis消息模型只贴部分代码就好了

    /**
         * 消息监听器
         */
        @Bean
        MessageListenerAdapter messageListenerAdapter(MessageReceiver messageReceiver, Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer){
            //消息接收者以及对应的默认处理方法
            MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(messageReceiver, "receiveMessage");
            //消息的反序列化方式
            messageListenerAdapter.setSerializer(jackson2JsonRedisSerializer);
    
            return messageListenerAdapter;
        }
    
        /**
         * message listener container
         */
        @Bean
        RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory
                , MessageListenerAdapter messageListenerAdapter){
            RedisMessageListenerContainer container = new RedisMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            //添加消息监听器
            container.addMessageListener(messageListenerAdapter, new PatternTopic(topicName));
    
            return container;
        }

    上面的思路大体如下:客户端简历链接时候,传过来userid保存起来。发消息的时候 通过userRegistry获取,能获取到就证明是跟本节点建立的链接,直接用本节点发消息就好了。

    如果不是就利用redis消息队列,把消息推出去。每个节点去判断获取看下是不是本节点的userid。这样就实现了集群的部署。

  • 相关阅读:
    分享一款手机端的PDF文件编辑软件「PDF处理助手」简单、免费
    30个学习大数据挖掘的重要知识点!建议收藏
    网站数据挖掘--从零开始建立数据分析师个人成长体系
    30个学习大数据挖掘的重要知识点!建议收藏
    一个 SAP 开发工程师十余年的技术写作之路回顾
    关于问题 SAP ABAP ME2O 事物码如何(是否可以)打印发货单?如何自己找到答案
    SAP Spartacus Popover Directive 构造函数的用途分析
    SAP Spartacus B2B OrgUnit 和 OrgUser 的路由映射差异比较
    SAP Spartacus B2B 列表页面的 (i) icon popover Component 的声明位置
    SAP 3D visual enterprise viewer 9.0 的下载方式
  • 原文地址:https://www.cnblogs.com/haoerlv/p/11138538.html
Copyright © 2011-2022 走看看