zoukankan      html  css  js  c++  java
  • netty使用(4)一个简单Http服务器

    package club.test;
    import java.util.HashMap;
    import java.util.Map;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.http.DefaultFullHttpResponse;
    import io.netty.handler.codec.http.FullHttpResponse;
    import io.netty.handler.codec.http.HttpHeaderNames;
    import io.netty.handler.codec.http.HttpHeaderValues;
    import io.netty.handler.codec.http.HttpRequest;
    import io.netty.handler.codec.http.HttpRequestDecoder;
    import io.netty.handler.codec.http.HttpResponseStatus;
    import io.netty.handler.codec.http.HttpServerCodec;
    import io.netty.handler.codec.http.HttpUtil;
    import io.netty.handler.codec.http.HttpVersion;
    
    /**
     * Minimal Netty HTTP server: every accepted connection gets an
     * {@link HttpServerCodec} followed by an {@code HttpServerHandler}.
     *
     * Created by apple on 17/10/21.
     */
    public class HttpServer {

        /**
         * Binds the server to {@code port} and blocks the calling thread until the
         * server channel is closed. Both event loop groups are shut down on the way out.
         *
         * @param port TCP port to listen on
         * @throws Exception if binding or waiting on the channel fails
         */
        public static void start(final int port) throws Exception {
            EventLoopGroup acceptors = new NioEventLoopGroup();
            EventLoopGroup workers = new NioEventLoopGroup();
            ServerBootstrap bootstrap = new ServerBootstrap();

            try {
                // Per-connection pipeline: HTTP codec first, then the application handler.
                ChannelInitializer<SocketChannel> pipelineSetup = new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline()
                                .addLast("http-decoder", new HttpServerCodec())
                                .addLast(new HttpServerHandler());
                    }
                };

                bootstrap.group(acceptors, workers);
                bootstrap.channel(NioServerSocketChannel.class);
                bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
                bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
                bootstrap.childHandler(pipelineSetup);

                ChannelFuture bound = bootstrap.bind(port).sync();
                // Park here until the listening channel goes away.
                bound.channel().closeFuture().sync();
            } finally {
                acceptors.shutdownGracefully();
                workers.shutdownGracefully();
            }
        }

        /** Starts the server on port 8080. */
        public static void main(String[] args) throws Exception {
            start(8080);
        }
    }
    /**
     * Answers every HTTP request with a small text body.
     *
     * <p>The request URI, with all {@code '/'} characters removed, is looked up in a
     * fixed table ({@code 302}, {@code 404}, {@code 400}, {@code 500}); a hit selects
     * the response status and uses the status text as the body, a miss yields
     * {@code 200 OK} with a greeting. A {@code 302} response additionally carries a
     * {@code Location} header.
     */
    class HttpServerHandler extends ChannelInboundHandlerAdapter {
        /** Charset announced in the Content-Type header; bodies are encoded with it as well. */
        private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

        private final String content = "hello world,This is the Legion....";
        private final static String LOC = "302";
        private final static String NOT_FOND = "404";
        private final static String BAD_REQUEST = "400";
        private final static String INTERNAL_SERVER_ERROR = "500";
        /** URI (slashes stripped) to response status; filled once in the static initializer and only read afterwards. */
        private static final Map<String, HttpResponseStatus> mapStatus = new HashMap<String, HttpResponseStatus>();

        static {
            mapStatus.put(LOC, HttpResponseStatus.FOUND);
            mapStatus.put(NOT_FOND, HttpResponseStatus.NOT_FOUND);
            mapStatus.put(BAD_REQUEST, HttpResponseStatus.BAD_REQUEST);
            mapStatus.put(INTERNAL_SERVER_ERROR, HttpResponseStatus.INTERNAL_SERVER_ERROR);
        }

        /**
         * Builds and writes the response for an {@link HttpRequest}.
         *
         * @param ctx handler context used to write the response
         * @param msg inbound message; only {@link HttpRequest} instances are answered here
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            if (!(msg instanceof HttpRequest)) {
                // Body parts emitted by the codec are not consumed here. Forward them instead of
                // dropping them so they get released further down the pipeline rather than leaked.
                ctx.fireChannelRead(msg);
                return;
            }

            HttpRequest request = (HttpRequest) msg;
            boolean keepaLive = HttpUtil.isKeepAlive(request);
            System.out.println("method" + request.method());
            System.out.println("uri" + request.uri());
            String uri = request.uri().replace("/", "").trim();
            System.out.println("uri after trim is: "+uri);

            // Single lookup; null means "no special status configured for this URI".
            HttpResponseStatus mapped = mapStatus.get(uri);
            FullHttpResponse httpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
            if (mapped != null) {
                System.out.println("yes");
                httpResponse.setStatus(mapped);
                httpResponse.content().writeBytes(mapped.toString().getBytes(UTF_8));
            } else {
                System.out.println("no");
                httpResponse.content().writeBytes(content.getBytes(UTF_8));
            }
            // Redirect handling
            if (httpResponse.status().equals(HttpResponseStatus.FOUND)) {
                httpResponse.headers().set(HttpHeaderNames.LOCATION, "https://www.baidu.com/");
            }
            httpResponse.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html;charset=UTF-8");
            httpResponse.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, httpResponse.content().readableBytes());
            if (keepaLive) {
                httpResponse.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
                ctx.writeAndFlush(httpResponse);
            } else {
                // Tell the peer the connection is going away, then close once the write completes.
                httpResponse.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
                ctx.writeAndFlush(httpResponse).addListener(ChannelFutureListener.CLOSE);
            }
        }

        /**
         * Reports the failure on standard error and closes the connection, since the
         * state of the HTTP exchange is unknown after an exception.
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            System.err.println("closing connection after error: " + cause);
            ctx.close();
        }

    }

    netty channelRead和channelReadComplete

    如果执行后者没有执行前者,可能原因

    你是不是在添加这个handler之前还添加了消息处理的handler,如LineBasedFrameDecoder或者FixedLengthFrameDecoder等,这样的话当消息没有到结束标志时,会进到complete方法里,到达消息的结束标志,才会调用read方法。

    https://www.zhihu.com/question/52908974/answer/132831480

    Although shim_ claims to have determined that the problem is not sending "\n" with each message, I see the need to provide a clarification here.

    In ChatClientInitializer.java you see a handler named framer which is a DelimiterBasedFrameDecoder object with a line delimiter in that specific example (More info on DelimiterBasedFrameDecoder can be found here).

    So, that means the client is expecting messages that end with "\n" on that specific channel. If a message not ending with "\n" is received, then the channelRead() method will not be invoked.

    https://stackoverflow.com/questions/21334559/netty-channelread-never-called/21342263#21342263

    channelReadComplete is NOT called after each channelRead. The netty event loop will read from NIO socket and fire multiple channelRead until no more data to read or it should give up, then channelReadComplete is fired.

    另外一种Http方式

    Server

    package club.test;
    import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
    import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_LENGTH;
    import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
    import static io.netty.handler.codec.http.HttpResponseStatus.OK;
    import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.http.DefaultFullHttpResponse;
    import io.netty.handler.codec.http.FullHttpResponse;
    import io.netty.handler.codec.http.HttpContent;
    import io.netty.handler.codec.http.HttpHeaders;
    import io.netty.handler.codec.http.HttpHeaders.Values;
    import io.netty.handler.codec.http.HttpRequest;
    import io.netty.handler.codec.http.HttpRequestDecoder;
    import io.netty.handler.codec.http.HttpResponseEncoder;
    
    /**
     * HTTP server variant that wires a separate response encoder and request decoder
     * into each connection instead of a combined codec.
     */
    public class HttpServer {
        /**
         * Binds to {@code port} and blocks until the server channel is closed; the
         * event loop groups are shut down afterwards.
         *
         * @param port TCP port to listen on
         * @throws Exception if binding or waiting on the channel fails
         */
        public void start(int port) throws Exception {
            EventLoopGroup acceptGroup = new NioEventLoopGroup();
            EventLoopGroup ioGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(acceptGroup, ioGroup);
                bootstrap.channel(NioServerSocketChannel.class);
                bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(
                                // The server sends HttpResponse objects, so they are encoded with HttpResponseEncoder.
                                new HttpResponseEncoder(),
                                // The server receives HttpRequest objects, so they are decoded with HttpRequestDecoder.
                                new HttpRequestDecoder(),
                                new HttpServerInboundHandler());
                    }
                });
                bootstrap.option(ChannelOption.SO_BACKLOG, 128);
                bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);

                ChannelFuture bound = bootstrap.bind(port).sync();

                // Wait here until the listening channel is closed.
                bound.channel().closeFuture().sync();
            } finally {
                ioGroup.shutdownGracefully();
                acceptGroup.shutdownGracefully();
            }
        }

        /** Starts the server on port 8000. */
        public static void main(String[] args) throws Exception {
            new HttpServer().start(8000);
        }
    }
    /**
     * Collects the body of each request according to its Content-Length header,
     * prints it, and answers with a fixed {@code "I am ok"} text response.
     *
     * <p>A request without a Content-Length header is treated as having an empty
     * body: it is answered right away and any body parts that follow are released
     * and ignored.
     */
    class HttpServerInboundHandler extends ChannelInboundHandlerAdapter {
        /** Charset used to decode the request body and encode the reply. */
        private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

        /** Collector for the request currently in flight; {@code null} when no body is being collected. */
        private ByteBufToBytes reader;

        /**
         * Handles the request head and its body parts.
         *
         * @param ctx handler context used to write the response
         * @param msg inbound message; {@link HttpRequest} and {@link HttpContent} are handled
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            if (msg instanceof HttpRequest) {
                HttpRequest request = (HttpRequest) msg;
                System.out.println("messageType:" + request.headers().get("messageType"));
                System.out.println("businessType:" + request.headers().get("businessType"));
                if (HttpHeaders.isContentLengthSet(request)) {
                    reader = new ByteBufToBytes((int) HttpHeaders.getContentLength(request));
                } else {
                    // Drop any collector left over from an earlier request (its buffer may already
                    // be released) and answer immediately, since there is no declared body to wait for.
                    reader = null;
                    reply(ctx, new byte[0]);
                }
            }

            if (msg instanceof HttpContent) {
                ByteBuf content = ((HttpContent) msg).content();
                try {
                    if (reader == null) {
                        // No body is being collected for this part; nothing to do but release it.
                        return;
                    }
                    reader.reading(content);
                    if (reader.isEnd()) {
                        byte[] body = reader.readFull();
                        // readFull() releases the collector's buffer, so it must not be reused.
                        reader = null;
                        reply(ctx, body);
                    }
                } finally {
                    // Always release, even when collecting fails, so the buffer is not leaked.
                    content.release();
                }
            }
        }

        /** Flushes any pending writes once the current read batch has been processed. */
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
        }

        /** Prints the received body and writes the fixed keep-alive text response. */
        private void reply(ChannelHandlerContext ctx, byte[] requestBody) {
            System.out.println("Client said:" + new String(requestBody, UTF_8));

            FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK,
                    Unpooled.wrappedBuffer("I am ok".getBytes(UTF_8)));
            response.headers().set(CONTENT_TYPE, "text/plain");
            response.headers().set(CONTENT_LENGTH, response.content().readableBytes());
            response.headers().set(CONNECTION, Values.KEEP_ALIVE);
            ctx.writeAndFlush(response);
        }

    }
    /**
     * Accumulates the bytes of several {@link ByteBuf} chunks into one buffer of a
     * fixed, pre-declared length and hands them out as a byte array once full.
     */
    class ByteBufToBytes {
        /** Target buffer, allocated with the expected total length. */
        private final ByteBuf    temp;

        /** {@code true} once {@code temp} has no writable space left. */
        private boolean    end;

        /**
         * @param length total number of bytes expected
         */
        public ByteBufToBytes(int length) {
            temp = Unpooled.buffer(length);
            // Only a zero-length collector is complete before anything has been read.
            end = temp.writableBytes() == 0;
        }

        /**
         * Copies bytes from {@code datas} into the internal buffer. At most the
         * remaining capacity is consumed; surplus bytes are left unread in
         * {@code datas} instead of triggering an out-of-bounds failure.
         *
         * @param datas chunk to read from
         */
        public void reading(ByteBuf datas) {
            int count = Math.min(datas.readableBytes(), temp.writableBytes());
            datas.readBytes(temp, count);
            end = temp.writableBytes() == 0;
        }

        /**
         * @return {@code true} when the expected number of bytes has been collected
         */
        public boolean isEnd() {
            return end;
        }

        /**
         * Returns the collected bytes and releases the internal buffer, so this
         * should be called at most once per instance.
         *
         * @return the collected bytes, or {@code null} if collection is not complete
         */
        public byte[] readFull() {
            if (!end) {
                return null;
            }
            byte[] contentByte = new byte[this.temp.readableBytes()];
            this.temp.readBytes(contentByte);
            this.temp.release();
            return contentByte;
        }

        /**
         * Reads all readable bytes of {@code datas} into a new array.
         *
         * @param datas buffer to drain
         * @return the bytes that were readable in {@code datas}
         */
        public byte[] read(ByteBuf datas) {
            byte[] bytes = new byte[datas.readableBytes()];
            datas.readBytes(bytes);
            return bytes;
        }
    }

    Client代码

    package club.test;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.http.DefaultFullHttpRequest;
    import io.netty.handler.codec.http.HttpContent;
    import io.netty.handler.codec.http.HttpHeaders;
    import io.netty.handler.codec.http.HttpMethod;
    import io.netty.handler.codec.http.HttpRequestEncoder;
    import io.netty.handler.codec.http.HttpResponse;
    import io.netty.handler.codec.http.HttpResponseDecoder;
    import io.netty.handler.codec.http.HttpVersion;
    
    import java.net.URI;
    
    /**
     * One-shot HTTP client: connects, POSTs a short text body with two custom
     * headers, and blocks until the connection is closed.
     */
    public class HttpClient {
        /**
         * Connects to {@code host:port}, sends the request and waits for the channel
         * to close. The event loop group is shut down afterwards.
         *
         * @param host remote host name or address
         * @param port remote TCP port
         * @throws Exception if connecting, building the URI, or waiting fails
         */
        public void connect(String host, int port) throws Exception {
            EventLoopGroup workerGroup = new NioEventLoopGroup();

            try {
                Bootstrap b = new Bootstrap();
                b.group(workerGroup);
                b.channel(NioSocketChannel.class);
                b.option(ChannelOption.SO_KEEPALIVE, true);
                b.handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        // The client receives HttpResponse objects, so they are decoded with HttpResponseDecoder.
                        ch.pipeline().addLast(new HttpResponseDecoder());
                        // The client sends HttpRequest objects, so they are encoded with HttpRequestEncoder.
                        ch.pipeline().addLast(new HttpRequestEncoder());
                        ch.pipeline().addLast(new HttpClientInboundHandler());
                    }
                });

                // Start the client.
                ChannelFuture f = b.connect(host, port).sync();

                // Derive the target from the arguments rather than a fixed address, so the
                // request always names the server that was actually connected to.
                URI uri = new URI("http", null, host, port, null, null, null);
                String msg = "Are you ok?";
                DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST,
                        uri.toASCIIString(),
                        Unpooled.wrappedBuffer(msg.getBytes(java.nio.charset.StandardCharsets.UTF_8)));

                // Build the HTTP request headers.
                // Host carries host and port, as the port is not the HTTP default.
                request.headers().set(HttpHeaders.Names.HOST, uri.getRawAuthority());
                request.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
                request.headers().set(HttpHeaders.Names.CONTENT_LENGTH, request.content().readableBytes());
                request.headers().set("messageType", "normal");
                request.headers().set("businessType", "testServerState");
                // Send the HTTP request.
                f.channel().writeAndFlush(request);
                f.channel().closeFuture().sync();
            } finally {
                workerGroup.shutdownGracefully();
            }

        }

        /** Sends one request to 127.0.0.1:8000. */
        public static void main(String[] args) throws Exception {
            HttpClient client = new HttpClient();
            client.connect("127.0.0.1", 8000);
        }
    }
    /**
     * Collects the response body according to its Content-Length header, prints it,
     * and closes the connection.
     *
     * <p>A response without a Content-Length header cannot be collected by this
     * handler; the connection is closed and any body parts are released unread.
     */
    class HttpClientInboundHandler extends ChannelInboundHandlerAdapter {
        /** Charset used to decode the response body. */
        private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

        /** Collector for the response in flight; {@code null} when no body is being collected. */
        private ByteBufToBytes reader;

        /**
         * Handles the response head and its body parts.
         *
         * @param ctx handler context used to close the connection
         * @param msg inbound message; {@link HttpResponse} and {@link HttpContent} are handled
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            if (msg instanceof HttpResponse) {
                HttpResponse response = (HttpResponse) msg;
                System.out.println("CONTENT_TYPE:" + response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
                if (HttpHeaders.isContentLengthSet(response)) {
                    reader = new ByteBufToBytes((int) HttpHeaders.getContentLength(response));
                } else {
                    // Without a declared length there is nothing to size the collector with;
                    // give up on this exchange instead of failing on a missing collector later.
                    reader = null;
                    ctx.close();
                }
            }

            if (msg instanceof HttpContent) {
                ByteBuf content = ((HttpContent) msg).content();
                try {
                    if (reader != null) {
                        reader.reading(content);
                        if (reader.isEnd()) {
                            byte[] body = reader.readFull();
                            // readFull() releases the collector's buffer, so it must not be reused.
                            reader = null;
                            System.out.println("Server said:" + new String(body, UTF_8));

                            ctx.close();
                        }
                    }
                } finally {
                    // Always release, even when collecting fails, so the buffer is not leaked.
                    content.release();
                }
            }
        }

    }
    /**
     * Accumulates the bytes of several {@link ByteBuf} chunks into one buffer of a
     * fixed, pre-declared length and hands them out as a byte array once full.
     */
    class ByteBufToBytes {
        /** Target buffer, allocated with the expected total length. */
        private final ByteBuf    temp;

        /** {@code true} once {@code temp} has no writable space left. */
        private boolean    end;

        /**
         * @param length total number of bytes expected
         */
        public ByteBufToBytes(int length) {
            temp = Unpooled.buffer(length);
            // Only a zero-length collector is complete before anything has been read.
            end = temp.writableBytes() == 0;
        }

        /**
         * Copies bytes from {@code datas} into the internal buffer. At most the
         * remaining capacity is consumed; surplus bytes are left unread in
         * {@code datas} instead of triggering an out-of-bounds failure.
         *
         * @param datas chunk to read from
         */
        public void reading(ByteBuf datas) {
            int count = Math.min(datas.readableBytes(), temp.writableBytes());
            datas.readBytes(temp, count);
            end = temp.writableBytes() == 0;
        }

        /**
         * @return {@code true} when the expected number of bytes has been collected
         */
        public boolean isEnd() {
            return end;
        }

        /**
         * Returns the collected bytes and releases the internal buffer, so this
         * should be called at most once per instance.
         *
         * @return the collected bytes, or {@code null} if collection is not complete
         */
        public byte[] readFull() {
            if (!end) {
                return null;
            }
            byte[] contentByte = new byte[this.temp.readableBytes()];
            this.temp.readBytes(contentByte);
            this.temp.release();
            return contentByte;
        }

        /**
         * Reads all readable bytes of {@code datas} into a new array.
         *
         * @param datas buffer to drain
         * @return the bytes that were readable in {@code datas}
         */
        public byte[] read(ByteBuf datas) {
            byte[] bytes = new byte[datas.readableBytes()];
            datas.readBytes(bytes);
            return bytes;
        }
    }
  • 相关阅读:
    Dockerfile 构建前端node应用并用shell脚本实现jenkins自动构建
    Nginx自定义404页面
    shell脚本逐个杀死k8s中某个应用的pod
    阿里云容器镜像加速器配置
    jenkins shell脚本自动化构建阿里云k8s上应用
    在容器服务kubernetes上配置https
    docker build 指定dockerfile
    centos 7 系统启动不了 出现报错dependency failed for /mnt , dependency failed for local file systems
    向Kubernetes集群删除Node
    SharePoint 2013 关于自定义显示列表表单的bug
  • 原文地址:https://www.cnblogs.com/legion/p/8670850.html
Copyright © 2011-2022 走看看