zoukankan      html  css  js  c++  java
  • Netty实现简单WebSocket服务器

    本文参考《Netty权威指南》
    ├── WebSocketServerHandler.java
    ├── WebSocketServer.java
    └── wsclient.html

    package com.xh.netty.test11;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.http.HttpObjectAggregator;
    import io.netty.handler.codec.http.HttpServerCodec;
    import io.netty.handler.stream.ChunkedWriteHandler;
    
    /**
     * Created by root on 1/8/18.
     */
    public class WebSocketServer {
        public void run(int port) {
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                ChannelPipeline pipeline = socketChannel.pipeline();
                                pipeline.addLast("http-codec", new HttpServerCodec());
                                pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
                                pipeline.addLast("http-chunked", new ChunkedWriteHandler());
                                pipeline.addLast("handler", new WebSocketServerHandler());
                            }
                        });
                Channel ch = b.bind(port).sync().channel();
                System.out.println("websocketserver start port at " + port);
                ch.closeFuture().sync();
    
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) {
            int port = 8080;
            if (args.length > 0) {
                port = Integer.valueOf(args[0]);
    
            }
            new WebSocketServer().run(port);
    
        }
    }
    
    package com.xh.netty.test11;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.handler.codec.http.*;
    import io.netty.handler.codec.http.websocketx.*;
    import io.netty.util.CharsetUtil;
    
    
    import java.util.Date;
    import java.util.logging.Logger;
    
    import static io.netty.handler.codec.http.HttpHeaderUtil.isKeepAlive;
    import static io.netty.handler.codec.http.HttpHeaderUtil.setContentLength;
    
    /**
     * Created by root on 1/8/18.
     */
    public class WebSocketServerHandler extends SimpleChannelInboundHandler {
        private WebSocketServerHandshaker handshaker;
    
        @Override
        protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println(msg.toString());
            //http
            if (msg instanceof FullHttpRequest) {
                handleHttpRequest(ctx, (FullHttpRequest) msg);
            } else if (msg instanceof WebSocketFrame) {//websocket
                handleWebsocketFrame(ctx, (WebSocketFrame) msg);
            }
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    
        private void handleWebsocketFrame(ChannelHandlerContext ctx, WebSocketFrame msg) {
            //关闭链路指令
            if (msg instanceof CloseWebSocketFrame) {
                handshaker.close(ctx.channel(), (CloseWebSocketFrame) msg.retain());
                return;
            }
    
            //PING 消息
            if (msg instanceof PingWebSocketFrame) {
                ctx.write(new PongWebSocketFrame(msg.content().retain()));
                return;
            }
    
            //非文本
            if (!(msg instanceof TextWebSocketFrame)) {
                throw new UnsupportedOperationException(String.format("%s frame type not support", msg.getClass().getName()));
    
            }
    
            //应答消息
            String requset = ((TextWebSocketFrame) msg).text();
            ctx.channel().write(new TextWebSocketFrame(requset + " >>>>Now is " + new Date().toString()));
    
        }
    
        private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest msg) {
    
            //HTTP 请异常
            if (!msg.decoderResult().isSuccess() || !"websocket".equals(msg.headers().get("Upgrade"))) {
                System.out.println(msg.decoderResult().isSuccess());
                System.out.println(msg.headers().get("Upgrade"));
                sendHttpResponse(ctx, msg, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
                return;
            }
    
            //握手
            WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws://localhost:8080/websocket", null, false);
            handshaker = wsFactory.newHandshaker(msg);
            if (handshaker == null) {
                WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
    
            } else {
                handshaker.handshake(ctx.channel(), msg);
            }
        }
    
        private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest msg, FullHttpResponse resp) {
    
            //响应
            if (resp.status().code() != 200) {
                ByteBuf buf = Unpooled.copiedBuffer(resp.status().toString(), CharsetUtil.UTF_8);
                resp.content().writeBytes(buf);
                buf.release();
                setContentLength(resp, resp.content().readableBytes());
            }
    
            //非Keep-Alive,关闭链接
            ChannelFuture future = ctx.channel().writeAndFlush(resp);
            if (!isKeepAlive(resp) || resp.status().code() != 200) {
                future.addListener(ChannelFutureListener.CLOSE);
            }
    
    
        }
    
    
    }
    
    <!DOCTYPE html>
    <html lang="en">
    <head>
        <meta charset="UTF-8">
        <title>Title</title>
    </head>
    <body>
    <form onsubmit="return false;">
        <input type="text" name="msg" value="NETTY">
        <button onclick="send(this.form.msg.value)">send</button>
        <br>
        <textarea id="resText">
    
        </textarea>
    
    </form>
    </body>
    <script>
        var socket;
        if (!window.WebSocket) {
            window.WebSocket = window.MozWebSocket;
        }
        if (window.WebSocket) {
            socket = new WebSocket("ws://127.0.0.1:8080/websocket");
            socket.onmessage = function (event) {
                var ta = document.getElementById("resText");
                ta.value = "";
                ta.value = event.data;
    
            };
    
            socket.onopen = function (event) {
                alert("浏览器支持WebSocket");
                var ta = document.getElementById("resText");
                ta.value = "";
                ta.value = "浏览器支持WebSocket";
            };
    
            socket.onclose = function (event) {
                var ta = document.getElementById("resText");
                ta.value = "";
                ta.value = "关闭WebSocket";
            }
        } else {
    
            alert("浏览器不支持WebSocket");
        }
    
        function send(msg) {
            if (!window.WebSocket) {
                return;
    
            }
            if (socket.readyState == WebSocket.OPEN) {
                socket.send(msg);
            } else {
                alert("建立连接失败")
            }
        }
    </script>
    </html>
    
  • 相关阅读:
    【转】双口RAM
    Beep使用
    fcntl函数
    ioctl() 参数
    线程属性:pthread_attr_t
    GPIO
    Linux CGI编程基础
    看门狗watchdog
    Linux库知识大全
    linux进程间通讯
  • 原文地址:https://www.cnblogs.com/lanqie/p/8250302.html
Copyright © 2011-2022 走看看