zoukankan      html  css  js  c++  java
  • Netty 实现 websocket

    目前很多网站为了实现服务器推送,基本都采用轮询的方式。比较新的轮询技术是Comet,它基于Ajax,但仍然需要由客户端不断发起HTTP请求。为了解决HTTP轮询效率低下的问题,HTML5定义了WebSocket协议。

    服务端代码:

    import java.util.concurrent.TimeUnit;
    
    import org.apache.activemq.util.TimeUtils;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.http.HttpObjectAggregator;
    import io.netty.handler.codec.http.HttpServerCodec;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import io.netty.handler.stream.ChunkedWriteHandler;
    
    /**
     * Bootstraps a Netty server whose child channels are handed to
     * {@link WebSocketServerHandler} after HTTP decoding and aggregation.
     */
    public class WebSocketServer {

        /**
         * Installs the per-connection handlers, in order: HTTP codec, HTTP
         * message aggregator (limit 65536), chunked-write support and finally
         * the WebSocket handler.
         */
        private static final class WebSocketPipelineInitializer extends ChannelInitializer<SocketChannel> {

            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline()
                        .addLast("http-codec", new HttpServerCodec())
                        .addLast("aggregator", new HttpObjectAggregator(65536))
                        .addLast("http-chunked", new ChunkedWriteHandler())
                        .addLast("handler", new WebSocketServerHandler());
            }
        }

        /**
         * Binds the server to {@code port} and blocks the calling thread until
         * the server channel is closed. Both event loop groups are shut down
         * gracefully on the way out, whether the method returns or throws.
         *
         * @param port TCP port to listen on
         * @throws Exception if binding or waiting on the channel fails
         */
        public void bind(int port) throws Exception {
            EventLoopGroup acceptorGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();

            try {
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(acceptorGroup, workerGroup);
                bootstrap.channel(NioServerSocketChannel.class);
                bootstrap.handler(new LoggingHandler(LogLevel.INFO));
                bootstrap.childHandler(new WebSocketPipelineInitializer());

                // Wait for the bind to complete, then park until the server channel closes.
                Channel serverChannel = bootstrap.bind(port).sync().channel();
                serverChannel.closeFuture().sync();
            } finally {
                acceptorGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }

        /**
         * Starts the server on port 8080.
         *
         * @param args command-line arguments (unused)
         * @throws Exception if the server cannot be started
         */
        public static void main(String[] args) throws Exception {
            WebSocketServer server = new WebSocketServer();
            server.bind(8080);
        }

    }

    handler

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.handler.codec.http.DefaultFullHttpResponse;
    import io.netty.handler.codec.http.FullHttpRequest;
    import io.netty.handler.codec.http.FullHttpResponse;
    import io.netty.handler.codec.http.HttpHeaderUtil;
    import io.netty.handler.codec.http.HttpResponseStatus;
    import io.netty.handler.codec.http.HttpVersion;
    import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
    import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
    import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
    import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
    import io.netty.handler.codec.http.websocketx.WebSocketFrame;
    import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
    import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
    import io.netty.util.CharsetUtil;
    
    import java.util.Date;
    
    /**
     * Channel handler that performs the WebSocket opening handshake for incoming
     * HTTP requests and then answers WebSocket frames on the same channel.
     *
     * <p>For every text frame received, the handler pushes a fixed number of
     * timestamped replies, one per second. The replies are scheduled on the
     * channel's executor instead of sleeping inside the handler, so the event
     * loop thread is never blocked.
     */
    public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {

        /** Number of replies pushed for each received text frame. */
        private static final int REPLY_COUNT = 10;

        /** Delay between two consecutive replies, in seconds. */
        private static final long REPLY_INTERVAL_SECONDS = 1L;

        /** Handshaker created during the HTTP upgrade; used later to close the connection. */
        private WebSocketServerHandshaker handshaker;

        /**
         * Dispatches an inbound message: full HTTP requests go to the handshake
         * path, WebSocket frames go to the frame path. Other message types are ignored.
         */
        @Override
        protected void messageReceived(ChannelHandlerContext ctx, Object msg)
                throws Exception {
            if (msg instanceof FullHttpRequest) {
                handleHttpRequest(ctx, (FullHttpRequest) msg);
            } else if (msg instanceof WebSocketFrame) {
                handleWebSocketFrame(ctx, (WebSocketFrame) msg);
            }
        }

        /** Flushes anything written (but not yet flushed) while handling the current read. */
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
        }

        /** Prints the failure and closes the channel. */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                throws Exception {
            cause.printStackTrace();
            ctx.close();
        }

        /**
         * Handles the HTTP request that is expected to open the WebSocket.
         * Requests that failed decoding or do not ask for a WebSocket upgrade are
         * answered with 400 Bad Request; otherwise the handshake is performed.
         *
         * @param ctx channel context of the connection
         * @param req aggregated HTTP request
         * @throws Exception if sending the response or the handshake fails
         */
        private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception {
            System.out.println("handleHttpRequest");
            if (!req.decoderResult().isSuccess() || !isWebSocketUpgrade(req)) {
                sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
                return;
            }
            WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory("ws://localhost:8080/websocket", null, false);
            handshaker = factory.newHandshaker(req);

            if (handshaker == null) {
                WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
            } else {
                handshaker.handshake(ctx.channel(), req);
            }
        }

        /**
         * Tells whether the request carries an {@code Upgrade: websocket} header.
         * The value is compared case-insensitively, as RFC 6455 requires.
         */
        private static boolean isWebSocketUpgrade(FullHttpRequest req) {
            CharSequence upgrade = req.headers().get("Upgrade");
            return upgrade != null && "websocket".equalsIgnoreCase(upgrade.toString());
        }

        /**
         * Handles a single WebSocket frame: close frames end the connection via the
         * handshaker, ping frames are answered with a pong carrying the same
         * payload, and text frames trigger {@link #REPLY_COUNT} timestamped replies.
         *
         * @param ctx   channel context of the connection
         * @param frame frame to handle
         * @throws UnsupportedOperationException for any other frame type
         */
        private void handleWebSocketFrame(final ChannelHandlerContext ctx, WebSocketFrame frame) {
            System.out.println("handleWebSocketFrame");
            if (frame instanceof CloseWebSocketFrame) {
                // retain(): the frame is handed to the handshaker while this handler still releases its own reference.
                handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
                return;
            }
            if (frame instanceof PingWebSocketFrame) {
                ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
                return;
            }
            if (!(frame instanceof TextWebSocketFrame)) {
                throw new UnsupportedOperationException(String.format("%s frame types not support", frame.getClass().getName()));
            }
            final String req = ((TextWebSocketFrame) frame).text();

            System.out.println(String.format("%s received %s", ctx.channel(), req));

            // Schedule the replies instead of sleeping between them: sleeping here
            // would stall this channel's executor thread for the whole sequence.
            for (int i = 0; i < REPLY_COUNT; i++) {
                ctx.executor().schedule(new Runnable() {
                    @Override
                    public void run() {
                        // The peer may have disconnected before a delayed reply is due.
                        if (ctx.channel().isActive()) {
                            ctx.channel().writeAndFlush(new TextWebSocketFrame(req + ",欢迎使用Netty Websocket服务,现在时刻:" + new Date().toString()));
                        }
                    }
                }, i * REPLY_INTERVAL_SECONDS, java.util.concurrent.TimeUnit.SECONDS);
            }
        }

        /**
         * Writes {@code res} to the channel. For non-200 responses the status line
         * is copied into the body and the content length is set. The connection is
         * closed afterwards unless the request is keep-alive and the status is 200.
         *
         * @param ctx channel context of the connection
         * @param req request being answered (consulted for keep-alive)
         * @param res response to send
         * @throws Exception if writing the response fails
         */
        private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) throws Exception {
            if (res.status().code() != 200) {
                ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);
                res.content().writeBytes(buf);
                buf.release();
                HttpHeaderUtil.setContentLength(res, res.content().readableBytes());
            }

            ChannelFuture f = ctx.channel().writeAndFlush(res);
            if (!HttpHeaderUtil.isKeepAlive(req) || res.status().code() != 200) {
                f.addListener(ChannelFutureListener.CLOSE);
            }
        }

    }

    html调用:

    <html>
        <head>
            <meta charset="utf-8">
            <!-- The page title must live in a <title> element; bare text is not valid inside <head>. -->
            <title>Netty Socket时间服务器</title>
        </head>
        <body>
            <script type="text/javascript">
                // Connection to the Netty server; stays undefined when the browser has no WebSocket support.
                var socket;
                if(!window.WebSocket){
                    // Fall back to the prefixed implementation shipped by old Firefox versions.
                    window.WebSocket = window.MozWebSocket;
                }
            
                if(window.WebSocket){
                    socket = new WebSocket("ws://localhost:8080/websocket");
                    // Show the most recent server message in the textarea.
                    socket.onmessage = function(event){
                        var ta = document.getElementById("responseText");
                        ta.value = "";
                        ta.value = event.data;
                    };
                    socket.onopen = function(event){
                        var ta = document.getElementById("responseText");
                        ta.value ="打开websocket服务正常,浏览器支持websocket";                
                    };
                    socket.onclose = function(event){
                        var ta = document.getElementById("responseText");
                        ta.value = "";
                        ta.value = "websocket关闭";
                    };
                }else{
                    alert("抱歉,浏览器不支持websocket");
                }
                
                // Sends msg over the socket, or alerts when no open connection exists.
                function send(msg){
                    // Compare (===) rather than assign (=): the assignment made this
                    // condition always truthy. The socket check covers browsers where
                    // the socket was never created.
                    if(socket && socket.readyState === WebSocket.OPEN){
                        socket.send(msg);
                    }else{
                        alert("没有建立连接");
                    }
                }
            </script>
            
            <form>
                <input type="text" name="message" value="Netty最佳"/>
                <input type="button" value="发送websocket请求信息" onclick="send(this.form.message.value)"/>
                <hr color="blue">
                <textarea id="responseText" style="width:500px;height:300px"></textarea>
                
            </form>
        </body>
    </html>
  • 相关阅读:
    java将string转化为int Yannis
    vm虚拟机启动报The VMware Authorization Service is not running错误 Yannis
    [org.hibernate.util.JDBCExceptionReporter] Cannot load JDBC driver class 'net. Yannis
    前台页面分页对总页数的判断 Yannis
    事务及其特性 Yannis
    iReport报表的简单函数及部分操作 Yannis
    spring aop与事务配置 Yannis
    大数据的验证和插入数据库 Yannis
    唔哇哈哈,拉霸机
    bindebug放到别的目录后不能看?编译器参数设置一下
  • 原文地址:https://www.cnblogs.com/momofeng/p/5482827.html
Copyright © 2011-2022 走看看