zoukankan      html  css  js  c++  java
  • netty同时实现http与socket

    (1)启动类

    package test;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    
    /**
     * netty服务器启动类
     * @author songyan
     * 
     */
    public class HttpProxyServer {
    
        public static void main(String[] args) throws Exception {
            int LOCAL_PORT = (args.length > 0) ? Integer.parseInt(args[0]) : 5688;// 代理的端口号
            System.out.println("Proxying on port " + LOCAL_PORT);
    
            // 主从线程组模型
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup();
    
            try {
    
                // 创建核心类
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
    
                        // 添加助手类
                        .childHandler(new ServerInitialzer()).bind(LOCAL_PORT).sync().channel().closeFuture().sync();
    
            } finally {
    
                // 关闭主从线程
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
    
        }
    }

    (2)初始化类

    package test;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.handler.codec.http.HttpObjectAggregator;
    import io.netty.handler.codec.http.HttpServerCodec;
    import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
    import io.netty.handler.stream.ChunkedWriteHandler;
    
    /**
     * 
     * @author songyan
     * 通用的初始化类
     */
    public class ServerInitialzer extends ChannelInitializer<SocketChannel> {
    
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            
            //netty是基于http的,所以要添加http编码器
            pipeline.addLast(new HttpServerCodec());
            //对写大数据流的支持
            pipeline.addLast(new ChunkedWriteHandler());
            //设置单次请求的文件大小上限
            pipeline.addLast(new HttpObjectAggregator(1024*1024*10));
            //websocket 服务器处理的协议,用于指定给客户端连接访问的路由 : /ws
            pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
            //自定义的路由
            pipeline.addLast(new HttpHandler());
            
        }
    
    }

    注:

    new WebSocketServerProtocolHandler("/ws")只能拦截uri为ws://127.0.0.1:5688/ws的请求

    比如我想匹配请求:ws://192.168.11.3:5688/gxgd/echo?fromUser=301208

    则应该new WebSocketServerProtocolHandler("/gxgd/echo?fromUser=301208")

    显然这样写是不合理的,我们的参数是不确定的,是动态的,但是如果这样写的话,是完全匹配,一点不一样就会报404。

    看他的构造函数发现有

     public WebSocketServerProtocolHandler(String websocketPath, boolean checkStartsWith) {
            this(websocketPath, null, false, 65536, false, checkStartsWith);
        }

    第一个是参数是路径,第二个参数是是否startwith,也就是第二个参数设置成true就可以:只要请求是以第一个参数开头的就可以了

    例如:

    new WebSocketServerProtocolHandler("/gxgd/echo",true)

    可以匹配

    ws://192.168.11.3:5688/gxgd/echo?fromUser=301208

    这样就不会出现更改参数报404的错误了

    (3)自定义路由

    package test;
    
    import java.time.LocalDateTime;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.channel.group.ChannelGroup;
    import io.netty.channel.group.DefaultChannelGroup;
    import io.netty.handler.codec.http.FullHttpRequest;
    import io.netty.handler.codec.http.HttpMethod;
    import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
    import io.netty.handler.codec.http.websocketx.WebSocketFrame;
    import io.netty.util.concurrent.GlobalEventExecutor;
    
    /**
     * 自定义的路由 既可以实现http又可以实现socket
     * 
     * @author songyan
     *
     */
    public class HttpHandler extends SimpleChannelInboundHandler<Object> {
        // 用于记录和管理所有客户端的channle
        private Channel outboundChannel;
        private static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    
        /**
         * 打开链接
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("websocket::::::::::: active");
            super.channelActive(ctx);
        }
    
        /**
         * 获取客户端的channle,添加到ChannelGroup中
         */
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            System.out.println("websocket::::::::::: add");
            clients.add(ctx.channel());
        }
    
        /**
         * 从ChannelGroup中移除channel
         */
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
            System.out.println("websocket::::::::::: Removed");
        }
    
        /**
         * 销毁channel
         */
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("websocket::::::::::: destroyed");
            if (clients != null) {
                closeOnFlush(outboundChannel);
            }
        }
    
        /**
         * 关闭释放channel
         * @param ch
         */
        static void closeOnFlush(Channel ch) {
            if (ch != null && ch.isActive()) {
                ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
            }
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) {
            ctx.flush();
        }
    
        
        /**
         * 异常捕获
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            System.err.println("出错了");
            cause.printStackTrace();
            ctx.close();
        }
    
        /**
         * 路由
         * 对http,websocket单独处理
         */
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
            if (msg instanceof FullHttpRequest) {// 如果是HTTP请求,进行HTTP操作
                handleHttpRequest(ctx, (FullHttpRequest) msg);
            } else if (msg instanceof WebSocketFrame) {// 如果是Websocket请求,则进行websocket操作
                handleWebSocketFrame(ctx, (WebSocketFrame) msg);
            }
        }
        
        /**
         * 对http请求的处理
         */
        private void handleHttpRequest(ChannelHandlerContext ctx, final FullHttpRequest msg) {
            final Channel inboundChannel = ctx.channel();
            String host = msg.headers().get("Host");
            int port = 80;
    
            String pattern = "(http://|https://)?([^:]+)(:[\d]+)?";
            Pattern r = Pattern.compile(pattern);
            Matcher m = r.matcher(host);
            if (m.find()) {
                host = m.group(2);
                port = (m.group(3) == null) ? 80 : Integer.parseInt(m.group(3).substring(1));
            }
    
            Bootstrap b = new Bootstrap();
            b.group(inboundChannel.eventLoop()) // use inboundChannel thread
                    .channel(ctx.channel().getClass()).handler(new BackendHandlerInitializer(inboundChannel));
    
            ChannelFuture f = b.connect("127.0.0.1", 8015);
            outboundChannel = f.channel();
            msg.retain();
            ChannelFuture channelFuture = f.addListener(new ChannelFutureListener() {
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()) {
                        outboundChannel.writeAndFlush(msg);
                    } else {
                        inboundChannel.close();
                    }
                }
            });
        }
    
        /**
         * 对socket请求的处理
         */
        private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame msg) {
            // 获取客户端传输过来的消息
            String content = msg.toString();
            System.out.println("websocket:::  接受到的数据:" + content);
            clients.writeAndFlush(new TextWebSocketFrame("[服务器在]" + LocalDateTime.now() + "接受到消息, 消息为:" + content));
    
        }
    
    }

  • 相关阅读:
    G++与VS2015在变量作用域上的差异性
    SO_REUSEADDR与SO_REUSEPORT平台差异性与测试
    带着SMART原则重新出发
    动态语言的灵活性是把双刃剑 -- 以Python语言为例
    程序员必知的Python陷阱与缺陷列表
    MySQL添加字段和修改字段
    java poi给sheet表格中的某个单元格添加批注
    Maven入门:使用Nexus搭建Maven私服及上传下载jar包
    Linux上两种网络连接方式
    linux创建账户并自动生成主目录和主目录下的文件
  • 原文地址:https://www.cnblogs.com/excellencesy/p/11241063.html
Copyright © 2011-2022 走看看