zoukankan      html  css  js  c++  java
  • websocket随笔

    https://github.com/littlechaser/push-service

    我们在浏览器调服务器的时候使用http连接通常就能实现,但是如果要服务器数据发生变化,需要主动推送给客户端(如订单的状态发送变化,由新建变成待支付,需要通知客户端去进行支付),那这时候http请求是不是就显得乏力呢?众所周知的轮询技术,不但会增加用户的流量消耗(移动端),并且如果客户端的数量比较大的话,轮询对服务器的压力也是非常巨大的,尽管有集群分散压力,那也不如寻找一种更好的技术来替代这种场景下的http请求。

    这时候websocket的发挥空间就来了。websocket协议是基于TCP的一种新的网络协议。它实现了浏览器与服务器全双工通信——允许服务器主动发送信息给客户端。

    以上的连接是本人在github上面写的一点东西,spring与websocket实现推送,支持集群的session共享,下面稍作讲解。

    1.首先搭建springmvc环境,这个不做过多讲解。

    2.然后基于springmvc环境进行整合

    ①.引入依赖包

    <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-websocket</artifactId>
          <version>${spring.version}</version>
    </dependency>

    ②.写一个自定义的处理器TextWebSocketHandler,需要实现WebSocketHandler接口并重写其中的方法。

    ③.写一个握手拦截器,继承HttpSessionHandshakeInterceptor并重写其中的beforeHandshake和afterHandshake方法,这个拦截器一般是做一些特殊处理,比如从http请求中获取参数,并传入attributes这个map中,然后就可以在TextWebSocketHandler的方法中通过webSocketSession.getAttributes()取到相关的参数,具体见demo,demo也是如此实现的。

    ④.最后编写一个配置类TextWebSocketConfig,需要确保能被spring扫描到。

    package com.allen.websocket;
    
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.web.socket.config.annotation.EnableWebSocket;
    import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
    import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
    
    @Configuration
    @EnableWebSocket
    public class TextWebSocketConfig implements WebSocketConfigurer {
    
        @Override
        public void registerWebSocketHandlers(WebSocketHandlerRegistry webSocketHandlerRegistry) {
            webSocketHandlerRegistry.addHandler(textWebSocketHandler(), "/websocket").addInterceptors(new UserHandshakeInterceptor()).setAllowedOrigins("*");
            webSocketHandlerRegistry.addHandler(textWebSocketHandler(), "/sockjs/websocket").addInterceptors(new UserHandshakeInterceptor()).setAllowedOrigins("*").withSockJS();
        }
    
        @Bean
        public TextWebSocketHandler textWebSocketHandler() {
            return new TextWebSocketHandler();
        }
    }

    注意setAllowedOrigins方法可以解决跨域问题,指定可以访问的ip或域名,配置为*是所有的客户端都可以访问。withSockJS是提供对sockjs的支持。UserHandshakeInterceptor是拦截用户的握手请求的。

    至此,服务器的websocket已经完成,编写客户端的代码,如项目中的chat.html(WEB-INF/static/html/chat.html),编写完运行项目,打开http://localhost:8080/push-service/static/html/chat.html即可实现通信。再打开http://localhost:8080/push-service/push/test?username=allen,通过http请求给页面发一条消息,发现页面收到。再打开一个页面http://localhost:8080/push-service/static/html/chat2.html,通过http请求给页面发一条消息http://localhost:8080/push-service/push/test?username=all,发现打开的两个页面均收到消息。

    至此,基于websocket实现的服务器主动推送消息给客户端的项目基本完成。

    但是还有一个问题,眼尖的同学会发现,我们的session是需要服务端进行保存的,因为我们需要通过session发送消息。然而本项目我们是直接将session存在内存中的(map结构),单台服务器是可行的。

    那么,如果服务器是集群模式的呢?这时候一个请求说要发消息给allen用户,allen用户的session保存在A服务器,但是发消息的请求被转发到了B服务器,B服务器并没有保存allen的session,这时候B服务器是没法发送消息给allen用户的。所以这里就存在了session共享的问题。

    如何解决呢?通常的方法有如下

    1.粘性session

    粘性session是指Ngnix每次都将同一用户的所有请求转发至同一台服务器上,即将用户与服务器绑定。

    2.服务器session复制

    即每次session发生变化时,创建或者修改,就广播给所有集群中的服务器,使所有的服务器上的session相同。

    3.session共享

    缓存session,使用redis, memcached。

    4.session持久化

    将session存储至数据库中,像操作数据一样才做session。

    其实,最简单的两种方案,就是方案一和方案三。方案一可以实现且不用改代码,但是也存在弊端,在此不做讨论,有兴趣的可以自行查阅资料搜一下粘性session。方案三在这里不可行,因为websocket的session没有实现序列化接口,无法序列化,所以无法使用redis之类的缓存工具。

    本例采用的是发布订阅的方式实现,即发送消息接口不是真正的发送消息,而是把消息广播出去,所有集群的节点订阅同一个topic,保证都能接收到广播。节点接收到广播后,去查询session,查到了就真正的对用户发送消息,没查到,则表明此用户没有连接到该服务器节点,无需处理。

    具体实现是采用redis的发布订阅功能(其他的如rabbitmq,kafka也可实现),参考spring-redis.xml的配置以及其中的类。

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
           xmlns:redis="http://www.springframework.org/schema/redis"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/redis http://www.springframework.org/schema/redis/spring-redis.xsd">
        <bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig"
              p:maxTotal="100"
              p:maxIdle="10"
              p:maxWaitMillis="500"
              p:testWhileIdle="true"
              p:softMinEvictableIdleTimeMillis="600000"
              p:timeBetweenEvictionRunsMillis="1800000"/>
        <bean id="jedisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory"
              p:hostName="10.104.3.27"
              p:port="6379"
              p:password=""
              p:timeout="1000"
              p:poolConfig-ref="jedisPoolConfig"/>
    
        <bean id="fastJsonRedisConvertor" class="com.allen.redis.FastJsonRedisConvertor"/>
    
        <bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate"
              p:connectionFactory-ref="jedisConnectionFactory"
              p:defaultSerializer-ref="fastJsonRedisConvertor"/>
    
        <bean id="redisMessageListener" class="com.allen.redis.RedisMessageListener"/>
    
        <redis:listener-container connection-factory="jedisConnectionFactory">
            <redis:listener ref="redisMessageListener" topic="push-topic"/>
        </redis:listener-container>
    </beans>
    redisTemplate是发布者,fastJsonRedisConvertor是在发布之前对数据进行转换的转换器,发布时指定一个发布主题和内容即可。redisMessageListener是监听者,当监听的主题有消息时即可收到消息,收到消息进行相关处理即可。

    由于客户端的行为是不可预测的,有的情况服务器对客户端的行为是无感知的(比如断网,或者崩溃了,客户端的连接断了),这时候服务器的连接还是保持无用的连接不会释放,如果这种情况发生的太多,就会出现内存溢出的现象。
    为了防止这种现象的出现,我们做了个客户端心跳检测的线程,当然本例的写法这并不一定是比较好的实现方式,但却是一种思路。
    心跳检测线程定时检测所有的session最后一次收到客户端回复的时间,超过一定时间逐出并关闭连接,同时给所有已连接的客户端发送ping消息,收到后服务器更新session的最后一次收到客户端回复的时间。详见HeartBeatExecutor相关代码


    PS:本例还有个jsr303 的内容,不做过多详解,
    写法是引入依赖
    <dependency>
                <groupId>javax.validation</groupId>
                <artifactId>validation-api</artifactId>
                <version>${validation-api.version}</version>
    </dependency>

    springmvc配置文件配置

    <mvc:annotation-driven validator="validator"/>
    <bean id="validator" class="org.springframework.validation.beanvalidation.LocalValidatorFactoryBean"
              p:providerClass="org.hibernate.validator.HibernateValidator"/>

    java代码

    DTO

    package com.allen.dto;
    
    import lombok.Data;
    import org.hibernate.validator.constraints.NotBlank;
    import org.hibernate.validator.constraints.Range;
    
    import javax.validation.constraints.NotNull;
    import javax.validation.constraints.Past;
    import java.util.Date;
    
    /**
     * @author yang_tao@<yangtao.letzgo.com.cn>
     * @version 1.0
     * @date 2018-04-10 9:16
     */
    @Data
    public class UserDTO {
        @NotBlank(message = "姓名不能为空")
        private String name;
    
        @NotNull(message = "年龄不能为空")
        @Range(min = 0, max = 120, message = "年龄必须在{min}和{max}之间")
        private Integer age;
    
        @NotNull(message = "性别不能为空")
        @Range(min = 0, max = 1, message = "性别必须在{min}和{max}之间")
        private Integer sex;
    
        @NotNull(message = "生日不能为空")
        @Past(message = "生日必须为当前时间之前的一个时间")
        private Date birthday;
    }
    @Data是lombok的注解,自动生成setter&getter方法,代码更简洁

    controller接口

        /**
         * JSR 303 - Bean Validation
         */
        @RequestMapping(value = "/jsr303",
                method = RequestMethod.POST,
                consumes = MediaType.APPLICATION_JSON_VALUE,
                produces = MediaType.APPLICATION_JSON_VALUE)
        public ResponseData jsr303(@RequestBody @Valid UserDTO userDTO) {
            return ResponseData.OK(userDTO);
        }
    @Valid注解的作用是开启Bean Validation,DTO中的字段相关注解即可起到校验参数的作用
  • 相关阅读:
    leetcode 122. Best Time to Buy and Sell Stock II
    leetcode 121. Best Time to Buy and Sell Stock
    python 集合(set)和字典(dictionary)的用法解析
    leetcode 53. Maximum Subarray
    leetcode 202. Happy Number
    leetcode 136.Single Number
    leetcode 703. Kth Largest Element in a Stream & c++ priority_queue & minHeap/maxHeap
    [leetcode]1379. Find a Corresponding Node of a Binary Tree in a Clone of That Tree
    正则表达式
    十种排序算法
  • 原文地址:https://www.cnblogs.com/xiao-tao/p/8761706.html
Copyright © 2011-2022 走看看