zoukankan      html  css  js  c++  java
  • netty 转发服务

    NettyServer

    package com.youxiong.netty.server;
    
    import com.youxiong.netty.handler.MyChannelHandler;
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.http.HttpObjectAggregator;
    import io.netty.handler.codec.http.HttpServerCodec;
    import io.netty.handler.stream.ChunkedWriteHandler;
    import io.netty.handler.timeout.IdleStateHandler;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.PostConstruct;
    import java.util.concurrent.TimeUnit;
    
    @Component
    public class NettyServer {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(NettyServer.class);
    
        @Value("${netty.server.port}")
        public Integer port;
    
    
        public Integer getPort() {
            return port;
        }
    
        public void setPort(Integer port) {
            this.port = port;
        }
    
        private void startServer(){
            //服务端需要2个线程组  boss处理客户端连接  work进行客服端连接之后的处理
            EventLoopGroup boss = new NioEventLoopGroup();
            EventLoopGroup work = new NioEventLoopGroup();
            try {
                ServerBootstrap bootstrap = new ServerBootstrap();
                //服务器 配置
                bootstrap.group(boss,work).channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        // HttpServerCodec:将请求和应答消息解码为HTTP消息
                        socketChannel.pipeline().addLast("http-codec",new HttpServerCodec());
                        // HttpObjectAggregator:将HTTP消息的多个部分合成一条完整的HTTP消息
                        socketChannel.pipeline().addLast("aggregator",new HttpObjectAggregator(65536));
                        // ChunkedWriteHandler:向客户端发送HTML5文件
                        socketChannel.pipeline().addLast("http-chunked",new ChunkedWriteHandler());
                        // 进行设置心跳检测
                        socketChannel.pipeline().addLast(new IdleStateHandler(60,30,60*30, TimeUnit.SECONDS));
                        // 配置通道处理  来进行业务处理
                        socketChannel.pipeline().addLast(new MyChannelHandler());
                    }
                }).option(ChannelOption.SO_BACKLOG,1024).childOption(ChannelOption.SO_KEEPALIVE,true);
                //绑定端口  开启事件驱动
                LOGGER.info("【服务器启动成功========端口:"+port+"");
                Channel channel = bootstrap.bind(port).sync().channel();
                channel.closeFuture().sync();
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                //关闭资源
                boss.shutdownGracefully();
                work.shutdownGracefully();
            }
        }
    
        @PostConstruct()
        public void init(){
            //需要开启一个新的线程来执行netty server 服务器
            new Thread(new Runnable() {
                public void run() {
                    startServer();
                }
            }).start();
        }
    }


    handler

    package com.youxiong.netty.handler;
    
    import com.youxiong.netty.util.GlobalUserUtil;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.*;
    import io.netty.handler.codec.http.*;
    import io.netty.handler.codec.http.websocketx.*;
    import io.netty.handler.timeout.IdleStateEvent;
    import io.netty.util.AttributeKey;
    import io.netty.util.CharsetUtil;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class MyChannelHandler extends SimpleChannelInboundHandler<Object> {
    
    
        private static final Logger LOGGER = LoggerFactory.getLogger(MyChannelHandler.class);
    
        private static final String URI = "websocket";
    
        private WebSocketServerHandshaker handshaker ;
    
        /**
         * 连接上服务器
         * @param ctx
         * @throws Exception
         */
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            LOGGER.info("【handlerAdded】====>"+ctx.channel().id());
            GlobalUserUtil.channels.add(ctx.channel());
        }
    
        /**
         * 断开连接
         * @param ctx
         * @throws Exception
         */
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
            LOGGER.info("【handlerRemoved】====>"+ctx.channel().id());
            GlobalUserUtil.channels.remove(ctx);
        }
    
        /**
         * 连接异常   需要关闭相关资源
         * @param ctx
         * @param cause
         * @throws Exception
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            LOGGER.error("【系统异常】======>"+cause.toString());
            ctx.close();
            ctx.channel().close();
        }
    
        /**
         * 活跃的通道  也可以当作用户连接上客户端进行使用
         * @param ctx
         * @throws Exception
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            LOGGER.info("【channelActive】=====>"+ctx.channel());
        }
    
        /**
         * 不活跃的通道  就说明用户失去连接
         * @param ctx
         * @throws Exception
         */
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        }
    
        /**
         * 这里只要完成 flush
         * @param ctx
         * @throws Exception
         */
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
        }
    
        /**
         * 这里是保持服务器与客户端长连接  进行心跳检测 避免连接断开
         * @param ctx
         * @param evt
         * @throws Exception
         */
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if(evt instanceof IdleStateEvent){
                IdleStateEvent stateEvent = (IdleStateEvent) evt;
                PingWebSocketFrame ping = new PingWebSocketFrame();
                switch (stateEvent.state()){
                    //读空闲(服务器端)
                    case READER_IDLE:
                        LOGGER.info(""+ctx.channel().remoteAddress()+"】读空闲(服务器端)");
                        ctx.writeAndFlush(ping);
                        break;
                        //写空闲(客户端)
                    case WRITER_IDLE:
                        LOGGER.info(""+ctx.channel().remoteAddress()+"】写空闲(客户端)");
                        ctx.writeAndFlush(ping);
                        break;
                    case ALL_IDLE:
                        LOGGER.info(""+ctx.channel().remoteAddress()+"】读写空闲");
                        break;
                }
            }
        }
    
        /**
         * 收发消息处理
         * @param ctx
         * @param msg
         * @throws Exception
         */
        protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
            if(msg instanceof HttpRequest){
                doHandlerHttpRequest(ctx,(HttpRequest) msg);
            }else if(msg instanceof WebSocketFrame){
                doHandlerWebSocketFrame(ctx,(WebSocketFrame) msg);
            }
        }
    
        /**
         * websocket消息处理
         * @param ctx
         * @param msg
         */
        private void doHandlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame msg) {
            //判断msg 是哪一种类型  分别做出不同的反应
            if(msg instanceof CloseWebSocketFrame){
                LOGGER.info("【关闭】");
                handshaker.close(ctx.channel(), (CloseWebSocketFrame) msg);
                return ;
            }
            if(msg instanceof PingWebSocketFrame){
                LOGGER.info("【ping】");
                PongWebSocketFrame pong = new PongWebSocketFrame(msg.content().retain());
                ctx.channel().writeAndFlush(pong);
                return ;
            }
            if(msg instanceof PongWebSocketFrame){
                LOGGER.info("【pong】");
                PingWebSocketFrame ping = new PingWebSocketFrame(msg.content().retain());
                ctx.channel().writeAndFlush(ping);
                return ;
            }
            if(!(msg instanceof TextWebSocketFrame)){
                LOGGER.info("【不支持二进制】");
                throw new UnsupportedOperationException("不支持二进制");
            }
            //可以对消息进行处理
            //群发
            for (Channel channel : GlobalUserUtil.channels) {
                channel.writeAndFlush(new TextWebSocketFrame(((TextWebSocketFrame) msg).text()));
            }
    
        }
    
    
        /**
         * wetsocket第一次连接握手
         * @param ctx
         * @param msg
         */
        private void doHandlerHttpRequest(ChannelHandlerContext ctx, HttpRequest msg) {
            // http 解码失败
            if(!msg.getDecoderResult().isSuccess() || (!"websocket".equals(msg.headers().get("Upgrade")))){
                sendHttpResponse(ctx, (FullHttpRequest) msg,new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,HttpResponseStatus.BAD_REQUEST));
            }
            //可以获取msg的uri来判断
            String uri = msg.getUri();
            if(!uri.substring(1).equals(URI)){
                ctx.close();
            }
            ctx.attr(AttributeKey.valueOf("type")).set(uri);
            //可以通过url获取其他参数
            WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory(
                    "ws://"+msg.headers().get("Host")+"/"+URI+"",null,false
            );
            handshaker = factory.newHandshaker(msg);
            if(handshaker == null){
                WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
            }
            //进行连接
            handshaker.handshake(ctx.channel(), (FullHttpRequest) msg);
            //可以做其他处理
        }
    
        private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res) {
            // 返回应答给客户端
            if (res.getStatus().code() != 200) {
                ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8);
                res.content().writeBytes(buf);
                buf.release();
            }
            // 如果是非Keep-Alive,关闭连接
            ChannelFuture f = ctx.channel().writeAndFlush(res);
            if (!HttpHeaders.isKeepAlive(req) || res.getStatus().code() != 200) {
                f.addListener(ChannelFutureListener.CLOSE);
            }
        }
    }
    package com.youxiong.netty.util;
    
    import io.netty.channel.group.ChannelGroup;
    import io.netty.channel.group.DefaultChannelGroup;
    import io.netty.util.concurrent.GlobalEventExecutor;
    
    /**
     * Holder for the process-wide group of client channels connected to the server.
     */
    public class GlobalUserUtil {

        /**
         * All channels currently connected to the server. Declared {@code final} so the shared
         * group cannot be swapped out from under the handlers that use it.
         */
        public static final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

        /** Static holder only; not meant to be instantiated. */
        private GlobalUserUtil() {
        }
    }

    NettyServer
    package com.youxiong.netty.server;
    import com.youxiong.netty.handler.MyChannelHandler;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.http.HttpObjectAggregator;import io.netty.handler.codec.http.HttpServerCodec;import io.netty.handler.stream.ChunkedWriteHandler;import io.netty.handler.timeout.IdleStateHandler;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Value;import org.springframework.stereotype.Component;
    import javax.annotation.PostConstruct;import java.util.concurrent.TimeUnit;
    @Componentpublic class NettyServer {
        private static final Logger LOGGER = LoggerFactory.getLogger(NettyServer.class);
        @Value("${netty.server.port}")    public Integer port;

        public Integer getPort() {        return port;    }
        public void setPort(Integer port) {        this.port = port;    }
        private void startServer(){        //服务端需要2个线程组  boss处理客户端连接  work进行客服端连接之后的处理        EventLoopGroup boss = new NioEventLoopGroup();        EventLoopGroup work = new NioEventLoopGroup();        try {            ServerBootstrap bootstrap = new ServerBootstrap();            //服务器 配置            bootstrap.group(boss,work).channel(NioServerSocketChannel.class)            .childHandler(new ChannelInitializer<SocketChannel>() {                protected void initChannel(SocketChannel socketChannel) throws Exception {                    // HttpServerCodec:将请求和应答消息解码为HTTP消息                    socketChannel.pipeline().addLast("http-codec",new HttpServerCodec());                    // HttpObjectAggregator:将HTTP消息的多个部分合成一条完整的HTTP消息                    socketChannel.pipeline().addLast("aggregator",new HttpObjectAggregator(65536));                    // ChunkedWriteHandler:向客户端发送HTML5文件                    socketChannel.pipeline().addLast("http-chunked",new ChunkedWriteHandler());                    // 进行设置心跳检测                    socketChannel.pipeline().addLast(new IdleStateHandler(60,30,60*30, TimeUnit.SECONDS));                    // 配置通道处理  来进行业务处理                    socketChannel.pipeline().addLast(new MyChannelHandler());                }            }).option(ChannelOption.SO_BACKLOG,1024).childOption(ChannelOption.SO_KEEPALIVE,true);            //绑定端口  开启事件驱动            LOGGER.info("【服务器启动成功========端口:"+port+"】");            Channel channel = bootstrap.bind(port).sync().channel();            channel.closeFuture().sync();        }catch (Exception e){            e.printStackTrace();        }finally {            //关闭资源            boss.shutdownGracefully();            work.shutdownGracefully();        }    }
        @PostConstruct()    public void init(){        //需要开启一个新的线程来执行netty server 服务器        new Thread(new Runnable() {            public void run() {                startServer();            }        }).start();    }}handler
    package com.youxiong.netty.handler;
    import com.youxiong.netty.util.GlobalUserUtil;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.*;import io.netty.handler.codec.http.*;import io.netty.handler.codec.http.websocketx.*;import io.netty.handler.timeout.IdleStateEvent;import io.netty.util.AttributeKey;import io.netty.util.CharsetUtil;import org.slf4j.Logger;import org.slf4j.LoggerFactory;
    public class MyChannelHandler extends SimpleChannelInboundHandler<Object> {

        private static final Logger LOGGER = LoggerFactory.getLogger(MyChannelHandler.class);
        private static final String URI = "websocket";
        private WebSocketServerHandshaker handshaker ;
        /**     * 连接上服务器     * @param ctx     * @throws Exception     */    @Override    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {        LOGGER.info("【handlerAdded】====>"+ctx.channel().id());        GlobalUserUtil.channels.add(ctx.channel());    }
        /**     * 断开连接     * @param ctx     * @throws Exception     */    @Override    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {        LOGGER.info("【handlerRemoved】====>"+ctx.channel().id());        GlobalUserUtil.channels.remove(ctx);    }
        /**     * 连接异常   需要关闭相关资源     * @param ctx     * @param cause     * @throws Exception     */    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {        LOGGER.error("【系统异常】======>"+cause.toString());        ctx.close();        ctx.channel().close();    }
        /**     * 活跃的通道  也可以当作用户连接上客户端进行使用     * @param ctx     * @throws Exception     */    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {        LOGGER.info("【channelActive】=====>"+ctx.channel());    }
        /**     * 不活跃的通道  就说明用户失去连接     * @param ctx     * @throws Exception     */    @Override    public void channelInactive(ChannelHandlerContext ctx) throws Exception {    }
        /**     * 这里只要完成 flush     * @param ctx     * @throws Exception     */    @Override    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {        ctx.flush();    }
        /**     * 这里是保持服务器与客户端长连接  进行心跳检测 避免连接断开     * @param ctx     * @param evt     * @throws Exception     */    @Override    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {        if(evt instanceof IdleStateEvent){            IdleStateEvent stateEvent = (IdleStateEvent) evt;            PingWebSocketFrame ping = new PingWebSocketFrame();            switch (stateEvent.state()){                //读空闲(服务器端)                case READER_IDLE:                    LOGGER.info("【"+ctx.channel().remoteAddress()+"】读空闲(服务器端)");                    ctx.writeAndFlush(ping);                    break;                    //写空闲(客户端)                case WRITER_IDLE:                    LOGGER.info("【"+ctx.channel().remoteAddress()+"】写空闲(客户端)");                    ctx.writeAndFlush(ping);                    break;                case ALL_IDLE:                    LOGGER.info("【"+ctx.channel().remoteAddress()+"】读写空闲");                    break;            }        }    }
        /**     * 收发消息处理     * @param ctx     * @param msg     * @throws Exception     */    protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {        if(msg instanceof HttpRequest){            doHandlerHttpRequest(ctx,(HttpRequest) msg);        }else if(msg instanceof WebSocketFrame){            doHandlerWebSocketFrame(ctx,(WebSocketFrame) msg);        }    }
        /**     * websocket消息处理     * @param ctx     * @param msg     */    private void doHandlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame msg) {        //判断msg 是哪一种类型  分别做出不同的反应        if(msg instanceof CloseWebSocketFrame){            LOGGER.info("【关闭】");            handshaker.close(ctx.channel(), (CloseWebSocketFrame) msg);            return ;        }        if(msg instanceof PingWebSocketFrame){            LOGGER.info("【ping】");            PongWebSocketFrame pong = new PongWebSocketFrame(msg.content().retain());            ctx.channel().writeAndFlush(pong);            return ;        }        if(msg instanceof PongWebSocketFrame){            LOGGER.info("【pong】");            PingWebSocketFrame ping = new PingWebSocketFrame(msg.content().retain());            ctx.channel().writeAndFlush(ping);            return ;        }        if(!(msg instanceof TextWebSocketFrame)){            LOGGER.info("【不支持二进制】");            throw new UnsupportedOperationException("不支持二进制");        }        //可以对消息进行处理        //群发        for (Channel channel : GlobalUserUtil.channels) {            channel.writeAndFlush(new TextWebSocketFrame(((TextWebSocketFrame) msg).text()));        }
        }

        /**     * wetsocket第一次连接握手     * @param ctx     * @param msg     */    private void doHandlerHttpRequest(ChannelHandlerContext ctx, HttpRequest msg) {        // http 解码失败        if(!msg.getDecoderResult().isSuccess() || (!"websocket".equals(msg.headers().get("Upgrade")))){            sendHttpResponse(ctx, (FullHttpRequest) msg,new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,HttpResponseStatus.BAD_REQUEST));        }        //可以获取msg的uri来判断        String uri = msg.getUri();        if(!uri.substring(1).equals(URI)){            ctx.close();        }        ctx.attr(AttributeKey.valueOf("type")).set(uri);        //可以通过url获取其他参数        WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory(                "ws://"+msg.headers().get("Host")+"/"+URI+"",null,false        );        handshaker = factory.newHandshaker(msg);        if(handshaker == null){            WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());        }        //进行连接        handshaker.handshake(ctx.channel(), (FullHttpRequest) msg);        //可以做其他处理    }
        private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res) {        // 返回应答给客户端        if (res.getStatus().code() != 200) {            ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8);            res.content().writeBytes(buf);            buf.release();        }        // 如果是非Keep-Alive,关闭连接        ChannelFuture f = ctx.channel().writeAndFlush(res);        if (!HttpHeaders.isKeepAlive(req) || res.getStatus().code() != 200) {            f.addListener(ChannelFutureListener.CLOSE);        }    }}package com.youxiong.netty.util;
    import io.netty.channel.group.ChannelGroup;import io.netty.channel.group.DefaultChannelGroup;import io.netty.util.concurrent.GlobalEventExecutor;
    public class GlobalUserUtil {
        //保存全局的  连接上服务器的客户    public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor            .INSTANCE);}--------------------- 作者:yx726843014 来源:CSDN 原文:https://blog.csdn.net/xieliaowa9231/article/details/80151446 版权声明:本文为博主原创文章,转载请附上博文链接!

  • 相关阅读:
    常用和实用的git命令,让你快速入门git
    如何获取电脑的IP和mac地址
    关于vue插件的使用和修改
    BullsEye游戏优化布局
    BullsEye游戏总结
    Android游戏小demo
    算法及相应算法应用之令牌桶算法
    php IDE之phpStorm使用小记
    php中openssl_encrypt方法
    mysql界面工具
  • 原文地址:https://www.cnblogs.com/hejunnuo/p/10333432.html
Copyright © 2011-2022 走看看