zoukankan      html  css  js  c++  java
  • Netty搭建服务端的简单应用

    Netty简介

    Netty是由JBOSS提供的一个java开源框架,现为 Github上的独立项目。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。
    也就是说,Netty 是一个基于NIO的客户、服务器端编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端应用。Netty相当于简化和流线化了网络应用的编程开发过程,例如:基于TCP和UDP的socket服务开发。
    “快速”和“简单”并不用产生维护性或性能上的问题。Netty 是一个吸收了多种协议(包括FTP、SMTP、HTTP等各种二进制文本协议)的实现经验,并经过相当精心设计的项目。最终,Netty 成功的找到了一种方式,在保证易于开发的同时还保证了其应用的性能,稳定性和伸缩性。

    说明:本文只介绍netty框架最基本的应用,而且是每次客户端请求完毕会关闭连接,后续会写一篇客户端先与服务端建立连接,然后一条条发送数据,发送完毕主动关闭连接的博客。

    Netty搭建WebSocket服务端

    Netty服务端

    1.引入依赖

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>1.5.9.RELEASE</version> <!-- 我这里用的1.5.9 -->
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.blaze</groupId>
        <artifactId>netty-demo</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>netty-demo</name>
        <description>Demo project for Spring Boot</description>
    
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
            <java.version>1.8</java.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-thymeleaf</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-lang3</artifactId>
                <version>3.4</version>
            </dependency>
    
            <!--fastjson-->
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.50</version>
            </dependency>
    
            <!--netty依赖-->
            <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
                <version>4.1.43.Final</version>
            </dependency>
    
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                    <configuration>
                        <mainClass>com.blaze.nettydemo.server.NettyServer</mainClass>
                    </configuration>
                </plugin>
            </plugins>
            <finalName>netty-demo</finalName>
        </build>
    
    </project>

    2.Netty服务端

    NettyServer

    package com.blaze.nettydemo.server;
    
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import org.springframework.stereotype.Component;
    
    /**
     * create by zy 2019/11/15 9:14
     * TODO
     */
    @Component
    public class NettyServer {
        public static void main(String[] args) {
            int port = 9898;
            new NettyServer().bind(port);
        }
    
        public void bind(int port) {
            /**
             * interface EventLoopGroup extends EventExecutorGroup extends ScheduledExecutorService extends ExecutorService
             * 配置服务端的 NIO 线程池,用于网络事件处理,实质上他们就是 Reactor 线程组
             * bossGroup 用于服务端接受客户端连接,workerGroup 用于进行 SocketChannel 网络读写
             */
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                /**
                 * ServerBootstrap 是 Netty 用于启动 NIO 服务端的辅助启动类,用于降低开发难度
                 */
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .option(ChannelOption.SO_BACKLOG, 1024)
                        .childHandler(new ChildChannelHandler());
    
                /**服务器启动辅助类配置完成后,调用 bind 方法绑定监听端口,调用 sync 方法同步等待绑定操作完成*/
                ChannelFuture f = b.bind(port).sync();
    
                System.out.println(Thread.currentThread().getName() + ",服务器开始监听端口,等待客户端连接.........");
    
                /**下面会进行阻塞,等待服务器连接关闭之后 main 方法退出,程序结束*/
                f.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                /**优雅退出,释放线程池资源*/
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    
        /**
         * 初始化连接
         */
        @Component
        private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
            @Override
            public void initChannel(SocketChannel socketChannel) throws Exception {
                /**
                 * 设置 netty 服务端的 handler
                 */
                socketChannel.pipeline().addLast(new NettyServerHandler());
    
                /**
                 * 如果使用 netty 搭建 http 服务端,则用下面三个设置代替上面一个设置
                 */
                //socketChannel.pipeline().addLast(new HttpServerCodec());// http 编解码
                //socketChannel.pipeline().addLast("httpAggregator", new HttpObjectAggregator(512 * 1024)); // http 消息聚合器
                //socketChannel.pipeline().addLast(new HttpServerHandler());
            }
        }
    }

    NettyServerHandler

    package com.blaze.nettydemo.server;
    
    import com.alibaba.fastjson.JSON;
    import com.blaze.nettydemo.model.RequestModel;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import org.springframework.stereotype.Component;
    
    /**
     * create by zy 2019/11/15 10:06
     * TODO
     */
    @Component
    public class NettyServerHandler extends ChannelInboundHandlerAdapter {
        /**
         * 收到客户端消息,自动触发
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            /**
             * 将 msg 转为 Netty 的 ByteBuf 对象,类似 JDK 中的 java.nio.ByteBuffer,不过 ButeBuf 功能更强,更灵活
             */
            ByteBuf buf = (ByteBuf) msg;
            /**
             * readableBytes:获取缓冲区可读字节数,然后创建字节数组
             * 从而避免了像 java.nio.ByteBuffer 时,只能盲目的创建特定大小的字节数组,比如 1024
             */
            byte[] reg = new byte[buf.readableBytes()];
            /**
             * readBytes:将缓冲区字节数组复制到新建的 byte 数组中
             * 然后将字节数组转为字符串
             */
            buf.readBytes(reg);
            String body = new String(reg, "UTF-8");
            System.out.println(Thread.currentThread().getName() + ",The server receive  order : " + body);
    
            String respMsg = "I am Server, success!";
    
            /**
             * 业务处理代码 此处省略
             * ......
             */
    
    
            /**
             * 回复消息
             * copiedBuffer:创建一个新的缓冲区,内容为里面的参数
             * 通过 ChannelHandlerContext 的 write 方法将消息异步发送给客户端
             */
            ByteBuf respByteBuf = Unpooled.copiedBuffer(respMsg.getBytes());
            ctx.write(respByteBuf);
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            /**
             * flush:将消息发送队列中的消息写入到 SocketChannel 中发送给对方,为了频繁的唤醒 Selector 进行消息发送
             * Netty 的 write 方法并不直接将消息写如 SocketChannel 中,调用 write 只是把待发送的消息放到发送缓存数组中,再通过调用 flush
             * 方法,将发送缓冲区的消息全部写入到 SocketChannel 中
             */
            ctx.flush();
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            /**当发生异常时,关闭 ChannelHandlerContext,释放和它相关联的句柄等资源 */
            ctx.close();
    
        }
    }

    3.Netty客户端

    NettyClient

    package com.blaze.nettydemo.client;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import org.springframework.stereotype.Component;
    
    /**
     * create by zy 2019/11/15 10:08
     * TODO
     */
    @Component
    public class NettyClient {
        /**
         * 使用 3 个线程模拟三个客户端
         */
        public static void main(String[] args) {
            for (int i = 0; i < 3; i++) {
                new Thread(new MyThread()).start();
            }
        }
    
        static class MyThread implements Runnable {
            /**服务端 ip 及端口*/
            @Override
            public void run() {
                connect("193.168.19.25", 9898);
            }
    
            public void connect(String host, int port) {
                /**配置客户端 NIO 线程组/池*/
                EventLoopGroup group = new NioEventLoopGroup();
                try {
                    /**
                     * Bootstrap 与 ServerBootstrap 都继承(extends)于 AbstractBootstrap
                     * 创建客户端辅助启动类,并对其配置,与服务器稍微不同,这里的 Channel 设置为 NioSocketChannel
                     * 然后为其添加 Handler,这里直接使用匿名内部类,实现 initChannel 方法
                     * 作用是当创建 NioSocketChannel 成功后,在进行初始化时,将它的ChannelHandler设置到ChannelPipeline中,用于处理网络I/O事件
                     */
                    Bootstrap b = new Bootstrap();
                    b.group(group).channel(NioSocketChannel.class)
                            .option(ChannelOption.TCP_NODELAY, true)
                            .handler(new ChannelInitializer<SocketChannel>() {
                                @Override
                                public void initChannel(SocketChannel ch) throws Exception {
                                    ch.pipeline().addLast(new NettyClientHandler());
                                }
                            });
    
                    /**connect:发起异步连接操作,调用同步方法 sync 等待连接成功*/
                    ChannelFuture channelFuture = b.connect(host, port).sync();
                    System.out.println(Thread.currentThread().getName() + ",客户端发起异步连接..........");
    
                    /**等待客户端链路关闭*/
                    channelFuture.channel().closeFuture().sync();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    /**优雅退出,释放NIO线程组*/
                    group.shutdownGracefully();
                }
            }
    
        }
    }

    NettyClientHandler

    package com.blaze.nettydemo.client;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import org.springframework.stereotype.Component;
    
    import java.util.logging.Logger;
    
    
    /**
     * create by zy 2019/11/15 10:09
     * TODO
     */
    @Component
    public class NettyClientHandler extends ChannelInboundHandlerAdapter {
        private static final Logger logger = Logger.getLogger(NettyClientHandler.class.getName());
        /**
         * 当客户端和服务端 TCP 链路建立成功之后,Netty 的 NIO 线程会调用 channelActive 方法
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            String reqMsg = "客户端请求服务端发送的数据";
    
            byte[] reqMsgByte = reqMsg.getBytes("UTF-8");
            ByteBuf reqByteBuf = Unpooled.buffer(reqMsgByte.length);
            /**
             * writeBytes:将指定的源数组的数据传输到缓冲区
             * 调用 ChannelHandlerContext 的 writeAndFlush 方法将消息发送给服务器
             */
            reqByteBuf.writeBytes(reqMsgByte);
            ctx.writeAndFlush(reqByteBuf);
        }
    
        /**
         * 当服务端返回应答消息时,channelRead 方法被调用,从 Netty 的 ByteBuf 中读取并打印应答消息
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf buf = (ByteBuf) msg;
            byte[] req = new byte[buf.readableBytes()];
            buf.readBytes(req);
            String body = new String(req, "UTF-8");
            System.out.println(Thread.currentThread().getName() + ",Server return Message:" + body);
            ctx.close();
        }
    
        /**
         * 当发生异常时,打印异常 日志,释放客户端资源
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            /**释放资源*/
            logger.warning("Unexpected exception from downstream : " + cause.getMessage());
            ctx.close();
    
        }
    }

    4.Netty Http服务端

    NettyServer

    package com.blaze.nettydemo.server;
    
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.http.HttpObjectAggregator;
    import io.netty.handler.codec.http.HttpServerCodec;
    import org.springframework.stereotype.Component;
    
    /**
     * create by zy 2019/11/15 9:14
     * TODO
     */
    @Component
    public class NettyServer {
        public static void main(String[] args) {
            int port = 9898;
            new NettyServer().bind(port);
        }
    
        public void bind(int port) {
            /**
             * interface EventLoopGroup extends EventExecutorGroup extends ScheduledExecutorService extends ExecutorService
             * 配置服务端的 NIO 线程池,用于网络事件处理,实质上他们就是 Reactor 线程组
             * bossGroup 用于服务端接受客户端连接,workerGroup 用于进行 SocketChannel 网络读写
             */
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                /**
                 * ServerBootstrap 是 Netty 用于启动 NIO 服务端的辅助启动类,用于降低开发难度
                 */
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .option(ChannelOption.SO_BACKLOG, 1024)
                        .childHandler(new ChildChannelHandler());
    
                /**服务器启动辅助类配置完成后,调用 bind 方法绑定监听端口,调用 sync 方法同步等待绑定操作完成*/
                ChannelFuture f = b.bind(port).sync();
    
                System.out.println(Thread.currentThread().getName() + ",服务器开始监听端口,等待客户端连接.........");
    
                /**下面会进行阻塞,等待服务器连接关闭之后 main 方法退出,程序结束*/
                f.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                /**优雅退出,释放线程池资源*/
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    
        /**
         * 初始化连接
         */
        @Component
        private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
            @Override
            public void initChannel(SocketChannel socketChannel) throws Exception {
                /**
                 * 设置 netty 服务端的 handler
                 */
                //socketChannel.pipeline().addLast(new NettyServerHandler());
    
                /**
                 * 如果使用 netty 搭建 http 服务端,则用下面三个设置代替上面一个设置
                 */
                socketChannel.pipeline().addLast(new HttpServerCodec());// http 编解码
                socketChannel.pipeline().addLast("httpAggregator", new HttpObjectAggregator(512 * 1024)); // http 消息聚合器
                socketChannel.pipeline().addLast(new HttpServerHandler());
            }
        }
    }

    HttpServerHandler

    package com.blaze.nettydemo.server;
    
    import com.alibaba.fastjson.JSONObject;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.handler.codec.http.*;
    import io.netty.util.CharsetUtil;
    import org.springframework.stereotype.Component;
    
    
    /**
     * create by zy 2019/11/19 9:21
     * TODO
     */
    @Component
    public class HttpServerHandler extends ChannelInboundHandlerAdapter {
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    
            if (msg instanceof FullHttpRequest) {
                FullHttpRequest req = (FullHttpRequest) msg;
    
                try {
                    // 1.获取URI
                    String uri = req.uri();
                    System.out.println("uri:" + uri);
                    // 2.获取请求体
                    ByteBuf buf = req.content();
                    String content = buf.toString(CharsetUtil.UTF_8);
    
                    // 3.根据请求的方法uri不同处理不同的逻辑
                    Object rc = new Object();
                    switch (uri) {
                        case "/test1":
                            // ......
                            break;
                        case "/ltest2":
                            // ......
                            break;
                        default:
                            break;
                    }
                    // 4.返回结果
                    response(ctx, rc);
                } finally {
                    req.release();
                }
            }
        }
    
        private void response(ChannelHandlerContext ctx, Object c) {
    
            // 1.设置响应
            FullHttpResponse resp = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
                    HttpResponseStatus.OK,
                    Unpooled.copiedBuffer(JSONObject.toJSONString(c), CharsetUtil.UTF_8));
            resp.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html; charset=UTF-8");
            // 2.发送
            // 注意必须在使用完之后,close channel
            ctx.writeAndFlush(resp).addListener(ChannelFutureListener.CLOSE);
        }
    }

    Http客户端,使用postman请求服务端进行测试即可。

    本文参考:https://blog.csdn.net/wangmx1993328/article/details/83036285

  • 相关阅读:
    linux -- 基于zookeeper搭建yarn的HA高可用集群
    Linux -- 之HDFS实现自动切换HA(全新HDFS)
    Hadoop格式化 From hu-hadoop1/192.168.11.11 to hu-hadoop2:8485 failed on connection exception: java.net.
    Directory /home/hdfs/name is in an inconsistent state: storage directory does not exist or is not a
    react学习01
    单页面应用(spa)引入百度地图(Cannot read property 'dc' of undefined)
    npm 发布包
    Vue学习-01
    echarts3.0使用总结
    webpack学习--创建一个webpack打包流程
  • 原文地址:https://www.cnblogs.com/blazeZzz/p/11933389.html
Copyright © 2011-2022 走看看