zoukankan      html  css  js  c++  java
  • SpringBoot集成Netty实现websocket通讯

    基于 Netty 实现 WebSocket 通讯,并支持向所有已连接的客户端广播消息。

    添加依赖:

    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.1.Final</version>
    </dependency>
    <dependency>
        <groupId>commons-lang</groupId>
        <artifactId>commons-lang</artifactId>
        <version>${commons.lang.version}</version>
    </dependency>

    排除tomcat的依赖

    Netty Http服务端编写

    handler 处理类

    @Component
    @Slf4j
    @ChannelHandler.Sharable //@Sharable 注解用来说明ChannelHandler是否可以在多个channel直接共享使用
    @ConditionalOnProperty(  //配置文件属性是否为true
            value = {"netty.ws.enabled"},
            matchIfMissing = false
    )
    public class WsServerHandler extends ChannelInboundHandlerAdapter {
    
        @Autowired
        NettyWsProperties nettyWsProperties;
    
        public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
        private WebSocketServerHandshaker handshaker;
        //websocket握手升级绑定页面
        String wsFactoryUri = "";
    
        @Value("${netty.ws.endPoint:/ws}")
        private String wsUri;
        //static Set<Channel> channelSet = new HashSet<>();
    
        /*
         * 握手建立
         */
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            Channel incoming = ctx.channel();
            channels.add(incoming);
        }
    
        /*
         * 握手取消
         */
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
            Channel incoming = ctx.channel();
            channels.remove(incoming);
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            if (msg instanceof FullHttpRequest) {
                handleHttpRequest(ctx, (FullHttpRequest) msg);
            } else if (msg instanceof WebSocketFrame) {
                handlerWebSocketFrame(ctx, (WebSocketFrame) msg);
            }
        }
        //websocket消息处理(只支持文本)
        public void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
    
            // 关闭请求
            if (frame instanceof CloseWebSocketFrame) {
                handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
                return;
            }
            // ping请求
            if (frame instanceof PingWebSocketFrame) {
                ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
                return;
            }
            // 只支持文本格式,不支持二进制消息
            if (frame instanceof TextWebSocketFrame) {
                //接收到的消息
                String requestmsg = ((TextWebSocketFrame) frame).text();
                TextWebSocketFrame tws = new TextWebSocketFrame(requestmsg);
                channels.writeAndFlush(tws);
            }
    
        }
    
        // 第一次请求是http请求,请求头包括ws的信息
        public void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request)
                throws Exception {
            // 如果HTTP解码失败,返回HTTP异常
            if (request instanceof HttpRequest) {
                HttpMethod method = request.getMethod();
                // 如果是websocket请求就握手升级
                if (wsUri.equalsIgnoreCase(request.getUri())) {
                    System.out.println(" req instanceof HttpRequest");
                    WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
                            wsFactoryUri, null, false);
                    handshaker = wsFactory.newHandshaker(request);
                    if (handshaker == null) {
                        WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
                    } else {
                    }
                    handshaker.handshake(ctx.channel(), request);
                }
            }
        }
        
        // 异常处理,netty默认是关闭channel
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
            
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt instanceof IdleStateEvent) {
    
                IdleStateEvent event = (IdleStateEvent) evt;
                if (event.state() == IdleState.READER_IDLE) {
                    // 读数据超时
                } else if (event.state() == IdleState.WRITER_IDLE) {
                    // 写数据超时
                } else if (event.state() == IdleState.ALL_IDLE) {
                    // 通道长时间没有读写,服务端主动断开链接
                    ctx.close();
                }
            } else {
                super.userEventTriggered(ctx, evt);
            }
        }
    }

    ChannelPipeline 实现

    /**
     * Channel initializer for accepted connections: installs idle detection, the HTTP codec
     * and aggregator, chunked-write support and finally the shared {@link WsServerHandler}.
     */
    @Component
    @ConditionalOnProperty(  // bean is registered only when netty.ws.enabled is set and not "false"
            value = {"netty.ws.enabled"},
            matchIfMissing = false
    )
    public class WsPipeline extends ChannelInitializer<SocketChannel> {

        /** Minutes without inbound data before a reader-idle event fires. */
        private static final int READ_IDLE_MINUTES = 3;
        /** Minutes without outbound data before a writer-idle event fires. */
        private static final int WRITE_IDLE_MINUTES = 4;
        /** Minutes without any traffic before an all-idle event fires. */
        private static final int ALL_IDLE_MINUTES = 5;

        @Autowired
        WsServerHandler wsServerHandler;

        /**
         * Builds the handler chain for a newly accepted channel.
         *
         * @param ch the accepted channel whose pipeline is populated
         */
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            IdleStateHandler idleDetector = new IdleStateHandler(
                    READ_IDLE_MINUTES, WRITE_IDLE_MINUTES, ALL_IDLE_MINUTES, TimeUnit.MINUTES);

            ch.pipeline()
                    .addLast(idleDetector)
                    .addLast("http-codec", new HttpServerCodec())
                    // merge HTTP message parts into one full message, up to 64 KiB
                    .addLast("aggregator", new HttpObjectAggregator(65536))
                    .addLast("http-chunked", new ChunkedWriteHandler())
                    .addLast("handler", wsServerHandler);
        }

    }

    服务实现

    @Configuration
    @EnableConfigurationProperties({NettyWsProperties.class})
    @ConditionalOnProperty(  //配置文件属性是否为true
            value = {"netty.ws.enabled"},
            matchIfMissing = false
    )
    @Slf4j
    public class WsServer {
        
            @Autowired
            WsPipeline wsPipeline;
    
            @Autowired
            NettyWsProperties nettyWsProperties;
            
            @Bean("starWsServer")
            public String start() {
                // 准备配置
                // HttpConfiguration.me().setContextPath(contextPath).setWebDir(webDir).config();
                // 启动服务器
               Thread thread =  new Thread(() -> {
                    NioEventLoopGroup bossGroup = new NioEventLoopGroup(nettyWsProperties.getBossThreads());
                    NioEventLoopGroup workerGroup = new NioEventLoopGroup(nettyWsProperties.getWorkThreads());
                    try {
                        log.info("start netty [WebSocket] server ,port: " + nettyWsProperties.getPort());
                        ServerBootstrap boot = new ServerBootstrap();
                        options(boot).group(bossGroup, workerGroup)
                                .channel(NioServerSocketChannel.class)
                                .handler(new LoggingHandler(LogLevel.INFO))
                                .childHandler(wsPipeline);
                        Channel ch = null;
                        //是否绑定IP
                        if(StringUtils.isNotEmpty(nettyWsProperties.getBindIp())){
                            ch = boot.bind(nettyWsProperties.getBindIp(),nettyWsProperties.getPort()).sync().channel();
                        }else{
                            ch = boot.bind(nettyWsProperties.getPort()).sync().channel();
                        }
                        ch.closeFuture().sync();
                    } catch (InterruptedException e) {
                        log.error("启动NettyServer错误", e);
                    } finally {
                        bossGroup.shutdownGracefully();
                        workerGroup.shutdownGracefully();
                    }
                });
                thread.setName("Ws_Server");
                thread.start();
                return "ws start";
            }
            
            
            private ServerBootstrap options(ServerBootstrap boot) {
                boot.option(ChannelOption.SO_BACKLOG, 1024)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
                return boot;
            }
    
    }

    启动配置

    ---application.yml
    spring.profiles.active: ws
    
    ---application-ws.yml
    netty:
       ws:
         enabled: true
         port: 9988
         endPoint: /ws

    测试

    在浏览器打开多个http://127.0.0.1:8080/socket.html(页面中连接的 WebSocket 地址需与上面的配置一致,即 ws://127.0.0.1:9988/ws),在任一页面发送消息,其他页面都应收到广播。

  • 相关阅读:
    Note/Solution 转置原理 & 多点求值
    Note/Solution 「洛谷 P5158」「模板」多项式快速插值
    Solution 「CTS 2019」「洛谷 P5404」氪金手游
    Solution 「CEOI 2017」「洛谷 P4654」Mousetrap
    Solution Set Border Theory
    Solution Set Stirling 数相关杂题
    Solution 「CEOI 2006」「洛谷 P5974」ANTENNA
    Solution 「ZJOI 2013」「洛谷 P3337」防守战线
    Solution 「CF 923E」Perpetual Subtraction
    KVM虚拟化
  • 原文地址:https://www.cnblogs.com/47Gamer/p/13853478.html
Copyright © 2011-2022 走看看