zoukankan      html  css  js  c++  java
  • SpringCloud Gateway转发Websocket并修改消息体大小限制

    前言

    在生产环境中, 为了保障业务数据的安全, 我们往往会将业务服务部署在内网环境, 并通过一个网关服务将需要提供给外部调用的接口暴露出去, HTTP请求如此, Websocket亦是如此, 今天就来讲一下如何使用SpringCloud Gateway网关服务代理转发Websocket请求, 以及如何解决其中的消息体大小问题!

     
    根据上图, 我准备了两个案例服务, 一个是 Gateway网关 , 一个是 Websocket服务, 代码如下:

    网关

    项目结构

    pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <!-- Applications should inherit from spring-boot-starter-parent;
             spring-boot-parent is the parent used by Spring Boot's own build. -->
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.1.3.RELEASE</version>
            <relativePath/>
        </parent>
    
        <groupId>org.example</groupId>
        <artifactId>sample-gateway</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <properties>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
    
            <spring.cloud.version>Greenwich.SR3</spring.cloud.version>
        </properties>
    
        <dependencyManagement>
            <dependencies>
                <dependency>
                    <groupId>org.springframework.cloud</groupId>
                    <artifactId>spring-cloud-dependencies</artifactId>
                    <version>${spring.cloud.version}</version>
                    <type>pom</type>
                    <scope>import</scope>
                </dependency>
            </dependencies>
        </dependencyManagement>
    
        <dependencies>
            <!-- 以下为核心依赖 -->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-gateway</artifactId>
            </dependency>
        </dependencies>
    
    </project> 

    Application.java

    package com.my.gateway;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    /**
     * Entry point of the sample gateway service.
     *
     * @author yjy
     * Created at 2022/1/2 3:24 PM
     */
    @SpringBootApplication
    public class Application {

        /**
         * Starts the Spring Boot application with this class as its primary source.
         *
         * @param args command-line arguments, handed to Spring Boot unchanged
         */
        public static void main(String[] args) {
            SpringApplication gateway = new SpringApplication(Application.class);
            gateway.run(args);
        }

    }

    application.properties

    spring.application.name=sample-gateway
    server.port=7070
     
    spring.cloud.gateway.routes[0].id = test-websocket
    spring.cloud.gateway.routes[0].uri = ws://127.0.0.1:18080
    spring.cloud.gateway.routes[0].predicates[0] = Path=/websocket
    spring.cloud.gateway.routes[0].predicates[1] = Header=Connection,Upgrade
     

    Websocket服务

    项目结构

    pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>spring-boot-starter-parent</artifactId>
            <groupId>org.springframework.boot</groupId>
            <version>2.1.3.RELEASE</version>
            <relativePath/>
        </parent>
        <modelVersion>4.0.0</modelVersion>
    
        <artifactId>example-spring-cloud-simple-websocket</artifactId>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-websocket</artifactId>
            </dependency>
        </dependencies>
    
    </project>

    Application.java

    package com.idanchuang.example.spring.cloud.simple.websocket;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.web.socket.server.standard.ServerEndpointExporter;
    
    /**
     * Entry point of the sample WebSocket service.
     *
     * <p>{@code @SpringBootApplication} already marks this class as a configuration
     * class, so no separate {@code @Configuration} annotation is needed for the
     * {@code @Bean} methods below.
     */
    @SpringBootApplication
    public class Application {

        /**
         * Starts the Spring Boot application with this class as its primary source.
         *
         * @param args command-line arguments, handed to Spring Boot unchanged
         */
        public static void main(String[] args) {
            SpringApplication.run(Application.class, args);
        }

        /**
         * Exposes a {@link ServerEndpointExporter} bean.
         * NOTE(review): presumably required so that the {@code @ServerEndpoint}
         * bean declared below gets registered with the embedded container - confirm.
         *
         * @return a new exporter instance
         */
        @Bean
        public ServerEndpointExporter serverEndpointExporter() {
            return new ServerEndpointExporter();
        }

        /**
         * Declares the {@link WebsocketServer} endpoint as a Spring bean.
         *
         * @return a new endpoint instance
         */
        @Bean
        public WebsocketServer websocketServer() {
            return new WebsocketServer();
        }

    }

    WebsocketServer.java 

    package com.idanchuang.example.spring.cloud.simple.websocket;
    
    import javax.websocket.*;
    import javax.websocket.server.ServerEndpoint;
    import java.io.IOException;
    
    @ServerEndpoint("/websocket")
    public class WebsocketServer {
    
        private void sendMessage(Session session, String message) throws IOException {
            session.getBasicRemote().sendText(message);
        }
    
        @OnOpen
        public void onOpen(Session session) throws IOException {
            sendMessage(session, "收到请求, 连接成功");
        }
    
        @OnClose
        public void onClose(Session session){
            System.out.println(session.getBasicRemote() + " > 断开连接");
        }
    
        @OnMessage
        public void onMessage(String message, Session session) throws IOException{
            message = message.length() > 100 ? message.substring(0, 100) + "..." : message;
            message = "收到消息: " + message;
            System.out.println(message);
            sendMessage(session, message);
        }
    
        @OnError
        public void onError(Session session, Throwable throwable){
            throwable.printStackTrace();
        }
    
    } 

    application.yml

    server.port: 18080
    spring.application.name: example-simple-websocket
     

    存在问题

    一般情况下, 以上配置不会有什么问题, 但是有一个隐藏的限制是, Websocket消息的接收与发送限制了长度在 65535 以内, 否则连接就会异常并中断, 如下:

    解决方案

    Websocket服务配置

    首先调整Websocket服务端的消息体大小限制, 具体的方式就是指定 @OnMessage 注解的 maxMessageSize 属性, 比如修改到 3000000字节 (约3MB)
        /**
         * Handles an incoming text message; the annotation sets
         * {@code maxMessageSize} to 3000000 for this handler.
         *
         * <p>The text is shortened to its first 100 characters plus "..." when it
         * is longer than that, prefixed, printed to standard output and sent back
         * to the sender.
         *
         * @param message the received text message
         * @param session the session the message arrived on
         * @throws IOException if the reply cannot be sent
         */
        @OnMessage(maxMessageSize = 3000000)
        public void onMessage(String message, Session session) throws IOException{
            String preview = message;
            if (message.length() > 100) {
                preview = message.substring(0, 100) + "...";
            }
            String reply = "收到消息: " + preview;
            System.out.println(reply);
            sendMessage(session, reply);
        }
    这样修改后, 如果客户端直接连接Websocket服务端, 那么消息体大小限制的问题就解决了, 但是如果通过Gateway网关代理, 还需要对网关进行配置
     

    Gateway网关配置

    假如使用的SpringCloud版本为 H (Hoxton.???)

    那么仅需要在网关添加一行配置即可, 如下
    spring.cloud.gateway.httpclient.websocket.max-frame-payload-length=3000000

    假如使用的SpringCloud版本为H以下

    比如我们正在使用的是G版本, 如果不能升级到H, 那可以通过覆盖源码的方案来解决了, 具体如下
    在我们的Gateway项目中创建两个类, 注意包名和类名必须与下面的全限定名完全一致
    org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient
    org.springframework.web.reactive.socket.server.upgrade.ReactorNettyRequestUpgradeStrategy
    调整好的项目结构如下
    将两个类的代码贴上来, 对源码做了修改的地方写了一行注释 // Modified
    ReactorNettyWebSocketClient.java
    package org.springframework.web.reactive.socket.client;
    
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.springframework.core.io.buffer.NettyDataBufferFactory;
    import org.springframework.http.HttpHeaders;
    import org.springframework.util.Assert;
    import org.springframework.util.StringUtils;
    import org.springframework.web.reactive.socket.HandshakeInfo;
    import org.springframework.web.reactive.socket.WebSocketHandler;
    import org.springframework.web.reactive.socket.WebSocketSession;
    import org.springframework.web.reactive.socket.adapter.ReactorNettyWebSocketSession;
    import reactor.core.publisher.Mono;
    import reactor.netty.http.client.HttpClient;
    import reactor.netty.http.websocket.WebsocketInbound;
    
    import java.net.URI;
    
    /**
     * {@link WebSocketClient} implementation for use with Reactor Netty.
     *
     * <p>Local copy of the framework class of the same fully-qualified name; the
     * places that differ from the upstream source are marked {@code // Modified}.
     *
     * @author Rossen Stoyanchev
     * @since 5.0
     */
    public class ReactorNettyWebSocketClient implements WebSocketClient {

        private static final Log logger = LogFactory.getLog(ReactorNettyWebSocketClient.class);

        // Modified
        /** Max frame payload length used for new sessions unless reconfigured. */
        private static final int DEFAULT_MAX_FRAME_PAYLOAD_LENGTH = 3000000;


        private final HttpClient httpClient;

        // Modified
        private int maxFramePayloadLength = DEFAULT_MAX_FRAME_PAYLOAD_LENGTH;

        /**
         * Default constructor.
         */
        public ReactorNettyWebSocketClient() {
            this(HttpClient.create());
        }

        /**
         * Constructor that accepts an existing {@link HttpClient} builder.
         * @param httpClient the client to use, must not be {@code null}
         * @since 5.1
         */
        public ReactorNettyWebSocketClient(HttpClient httpClient) {
            Assert.notNull(httpClient, "HttpClient is required");
            this.httpClient = httpClient;
        }


        /**
         * Return the configured {@link HttpClient}.
         */
        public HttpClient getHttpClient() {
            return this.httpClient;
        }

        // Modified
        /**
         * Configure the max frame payload length passed to every
         * {@link ReactorNettyWebSocketSession} created by this client.
         * <p>By default set to 3000000.
         * @param maxFramePayloadLength the max length for frames, must be positive
         */
        public void setMaxFramePayloadLength(int maxFramePayloadLength) {
            Assert.isTrue(maxFramePayloadLength > 0, "maxFramePayloadLength must be positive");
            this.maxFramePayloadLength = maxFramePayloadLength;
        }

        // Modified
        /**
         * Return the configured max length for frames.
         */
        public int getMaxFramePayloadLength() {
            return this.maxFramePayloadLength;
        }


        @Override
        public Mono<Void> execute(URI url, WebSocketHandler handler) {
            return execute(url, new HttpHeaders(), handler);
        }

        @Override
        public Mono<Void> execute(URI url, HttpHeaders requestHeaders, WebSocketHandler handler) {
            return getHttpClient()
                    .headers(nettyHeaders -> setNettyHeaders(requestHeaders, nettyHeaders))
                    .websocket(StringUtils.collectionToCommaDelimitedString(handler.getSubProtocols()))
                    .uri(url.toString())
                    .handle((inbound, outbound) -> {
                        HttpHeaders responseHeaders = toHttpHeaders(inbound);
                        String protocol = responseHeaders.getFirst("Sec-WebSocket-Protocol");
                        HandshakeInfo info = new HandshakeInfo(url, responseHeaders, Mono.empty(), protocol);
                        NettyDataBufferFactory factory = new NettyDataBufferFactory(outbound.alloc());
                        // Modified: hand the configured frame limit to the session
                        WebSocketSession session = new ReactorNettyWebSocketSession(
                                inbound, outbound, info, factory, this.maxFramePayloadLength);
                        if (logger.isDebugEnabled()) {
                            logger.debug("Started session '" + session.getId() + "' for " + url);
                        }
                        return handler.handle(session);
                    })
                    .doOnRequest(n -> {
                        if (logger.isDebugEnabled()) {
                            logger.debug("Connecting to " + url);
                        }
                    })
                    .next();
        }

        /** Copies every header of the Spring request headers onto the Netty headers. */
        private void setNettyHeaders(HttpHeaders httpHeaders, io.netty.handler.codec.http.HttpHeaders nettyHeaders) {
            httpHeaders.forEach(nettyHeaders::set);
        }

        /** Converts the handshake response headers of the inbound side to Spring {@link HttpHeaders}. */
        private HttpHeaders toHttpHeaders(WebsocketInbound inbound) {
            HttpHeaders headers = new HttpHeaders();
            io.netty.handler.codec.http.HttpHeaders nettyHeaders = inbound.headers();
            nettyHeaders.forEach(entry -> {
                String name = entry.getKey();
                headers.put(name, nettyHeaders.getAll(name));
            });
            return headers;
        }

    }
    View Code
    ReactorNettyRequestUpgradeStrategy.java
    /*
     * Copyright 2002-2018 the original author or authors.
     *
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     *      http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    package org.springframework.web.reactive.socket.server.upgrade;
    
    import org.springframework.core.io.buffer.NettyDataBufferFactory;
    import org.springframework.http.server.reactive.AbstractServerHttpResponse;
    import org.springframework.http.server.reactive.ServerHttpResponse;
    import org.springframework.lang.Nullable;
    import org.springframework.web.reactive.socket.HandshakeInfo;
    import org.springframework.web.reactive.socket.WebSocketHandler;
    import org.springframework.web.reactive.socket.adapter.ReactorNettyWebSocketSession;
    import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy;
    import org.springframework.web.server.ServerWebExchange;
    import reactor.core.publisher.Mono;
    import reactor.netty.http.server.HttpServerResponse;
    
    import java.util.function.Supplier;
    
    /**
     * A {@link RequestUpgradeStrategy} for use with Reactor Netty.
     *
     * <p>Local copy of the framework class of the same fully-qualified name; the
     * places that differ from the upstream source are marked {@code // Modified}.
     *
     * @author Rossen Stoyanchev
     * @since 5.0
     */
    public class ReactorNettyRequestUpgradeStrategy implements RequestUpgradeStrategy {

        // Modified
        /** Max frame payload length used unless reconfigured through the setter. */
        private static final int DEFAULT_MAX_FRAME_PAYLOAD_LENGTH = 3000000;

        // Modified: the upstream source initializes this field from
        // NettyWebSocketSessionSupport.DEFAULT_FRAME_MAX_SIZE instead.
        private int maxFramePayloadLength = DEFAULT_MAX_FRAME_PAYLOAD_LENGTH;


        /**
         * Configure the maximum allowable frame payload length. Setting this value
         * to your application's requirement may reduce denial of service attacks
         * using long data frames.
         * <p>Corresponds to the argument with the same name in the constructor of
         * {@link io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory
         * WebSocketServerHandshakerFactory} in Netty.
         * <p>By default set to 3000000 in this modified copy.
         * @param maxFramePayloadLength the max length for frames, must not be {@code null}
         * @throws NullPointerException if {@code maxFramePayloadLength} is {@code null}
         * @since 5.1
         */
        public void setMaxFramePayloadLength(Integer maxFramePayloadLength) {
            // Fail with an explicit message instead of an anonymous unboxing NPE.
            this.maxFramePayloadLength = java.util.Objects.requireNonNull(
                    maxFramePayloadLength, "maxFramePayloadLength must not be null");
        }

        /**
         * Return the configured max length for frames.
         * @since 5.1
         */
        public int getMaxFramePayloadLength() {
            return this.maxFramePayloadLength;
        }


        /**
         * Upgrades the exchange to a WebSocket connection and hands the resulting
         * session to the given handler, applying the configured max frame payload
         * length both to the upgrade call and to the session.
         */
        @Override
        public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler,
                                  @Nullable String subProtocol, Supplier<HandshakeInfo> handshakeInfoFactory) {

            ServerHttpResponse response = exchange.getResponse();
            HttpServerResponse reactorResponse = ((AbstractServerHttpResponse) response).getNativeResponse();
            HandshakeInfo handshakeInfo = handshakeInfoFactory.get();
            NettyDataBufferFactory bufferFactory = (NettyDataBufferFactory) response.bufferFactory();

            return reactorResponse.sendWebsocket(subProtocol, this.maxFramePayloadLength,
                    (in, out) -> {
                        ReactorNettyWebSocketSession session =
                                new ReactorNettyWebSocketSession(
                                        in, out, handshakeInfo, bufferFactory, this.maxFramePayloadLength);
                        return handler.handle(session);
                    });
        }

    }
    View Code

    验证结果

     
  • 相关阅读:
    简易的sniffer程序
    ubuntu 13.10 Rhythmbox不能播放mp3 和中文乱码的问题
    5.单行函数,多行函数,字符函数,数字函数,日期函数,数据类型转换,数字和字符串转换,通用函数(case和decode)
    Java设计模式-单例模式
    WordPress的用户系统总结
    STM8S EEPROM 操作
    简化ui文件转换写法
    大型网站架构之应用服务器集群化
    大型网站架构之分布式缓存
    大型网站架构之JAVA中间件
  • 原文地址:https://www.cnblogs.com/imyjy/p/15722897.html
Copyright © 2011-2022 走看看