zoukankan      html  css  js  c++  java
  • Netty

    Netty - 入门

    1. Netty开发环境的搭建

    使用maven构建项目:

    <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty</artifactId>
      <version>3.9.4.Final</version>
    </dependency>
    

    2. Time程序的开发

    2.1. TimeServer开发

    TimeServer:

    public class TimeServer {
    
        public static void main(String[] args) {
            int port = 8080;
            if (args != null && args.length > 0) {
                try {
                    port = Integer.valueOf(args[0]);
                } catch (NumberFormatException e) {
                    //
                }
            }
            new TimeServer().bind(port);
        }
    
        private void bind(int port) {
            //配置服务端的NIO线程组
            /*
            Reactor线程组
            bossGroup: 服务端接受客户端连接
            workerGroup: 进行SocketChannel的网络读写
             */
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                //ServerBootstarp: 用于启动NIO服务端的辅助启动类
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup);
                /*
                创建的Channel伪NioServerSocketChannel
                对应JDK NIO中的ServerSocketChannel
                 */
                b.channel(NioServerSocketChannel.class);
                b.option(ChannelOption.SO_BACKLOG, 1024);
                //IO事件处理类
                b.childHandler(new ChildChannelHandler());
                //绑定端口,同步等待成功
                ChannelFuture f = b.bind(port).sync();
                //等待服务端监听端口关闭
                f.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                bossGroup.shutdownGracefully();
            }
        }
    
        private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
    
            @Override
            public void initChannel(SocketChannel socketChannel) throws Exception {
                socketChannel.pipeline().addLast(new TimeServerHandler());
            }
        }
    
    }
    

    TimeServerHandler:

    public class TimeServerHandler extends ChannelInboundHandlerAdapter {
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            //将消息发送队列中的消息写入到SocketChannel中发送给对方
            /**
             * 调用write()方法只是把待发送的消息放到发送缓冲数组中,
             * 调用flush()方法,将发送缓冲区中的消息全部写到SocketChannel中
             */
            ctx.flush();
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            /*
            ByteBuf类似JDK中的java.nio.ByteBuffer
            ByteBuf.readableBytes: 获取缓冲区可读的字节数
            ByteBuf.readBytes: 将缓冲区的字节数组复制到新建的byte数组中
             */
            //将msg转换为Netty的ByteBuf对象
            ByteBuf buf = (ByteBuf) msg;
            //创建一个字节数组
            byte[] req = new byte[buf.readableBytes()];
            //将buf中的内容复制到字节数组
            buf.readBytes(req);
            //转换为字符串,获取到请求消息
            String body = new String(req, "UTF-8");
            System.out.println("The time server receive order: " + body);
            String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
            //创建应答消息
            ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
            //异步发送应答消息给客户端
            ctx.write(resp);
        }
    }
    

    2.2. TimeClient开发

    TimeClient:

    public class TimeClient {
    
        public void connect(int port, String host) throws Exception{
            //配置客户端NIO线程组
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap();
                b.group(group);
                b.channel(NioSocketChannel.class);
                b.option(ChannelOption.TCP_NODELAY, true);
                b.handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new TimeClientHandler());
                    }
                });
                //发起异步连接请求
                ChannelFuture f = b.connect(host, port).sync();
                //等待客户端链路关闭
                f.channel().closeFuture().sync();
            } finally {
                group.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) throws Exception {
            int port = 8080;
            if (args != null && args.length > 0){
                try {
                    port = Integer.valueOf(args[0]);
                }catch (NumberFormatException e){
                    // 采用默认值
                }
            }
            new TimeClient().connect(port, "127.0.0.1");
        }
    
    }
    

    TimeClientHandler:

    public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    
        private final ByteBuf firstMessage;
    
        public TimeClientHandler() {
            byte[] req = "QUERY TIME ORDER".getBytes();
            firstMessage = Unpooled.buffer(req.length);
            firstMessage.writeBytes(req);
        }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            ctx.writeAndFlush(firstMessage);
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf buf = (ByteBuf) msg;
            byte[] req = new byte[buf.readableBytes()];
            buf.readBytes(req);
            String body = new String(req, "UTF-8");
            System.out.println("Now is: " + body);
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            //释放资源
            System.out.println("Unexpected exception from downstram: " + cause.getMessage());
            ctx.close();
        }
    }
    

    3. 总结

    • Server端

      1. 创建一个Reactor线程组

        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        
      2. 创建一个用于启动NIO服务的辅助启动类

        ServerBootstrap b = new ServerBootstrap();
        
      3. 设置参数

        // 设置主从“线程池”
        b.group(bossGroup, workerGroup);
        // 指定Channel通道的类型
        b.channel(NioServerSocketChannel.class);
        // 设置一些参数
        b.option(ChannelOption.SO_BACKLOG, 1024);
        // 设置子通道SocketChannel的处理类
        b.childHandler(new ChildChannelHandler());
        
      4. 绑定并监听某个端口

        ChannelFuture f = b.bind(port).sync();
        
      5. 端口关闭,服务停止

        f.channel().closeFuture().sync();
        
    • Client端

      1. 创建一个Reactor线程组

        EventLoopGroup group = new NioEventLoopGroup();
        
      2. 创建一个辅助启动类实例

        Bootstrap b = new Bootstrap();
        
      3. 设置参数

        //初始化线程池
        b.group(group);
        //指定Channel通道的类型
        b.channel(NioSocketChannel.class);
        //设置一些参数
        b.option(ChannelOption.TCP_NODELAY, true);
        //设置SocketChannel的处理器
        b.handler(new ChannelInitializer<SocketChannel>() {
        	@Override
        	protected void initChannel(SocketChannel ch) throws Exception {
        		ch.pipeline().addLast(new TimeClientHandler());
        	}
        });
        
      4. 发起一个异步连接请求,连接指定的服务地址

        ChannelFuture f = b.connect(host, port).sync();
        
      5. 端口关闭,服务停止

        f.channel().closeFuture().sync();
        

    从上面的一些分析可以看出,服务端和客户端最基础的配置都是5个步骤。先创建一个线程组EventLoopGroup和辅助启动类(服务端为ServerBootstrap,客户端为Bootstrap),接着设置一系列的参数:线程池、通道类型、通道参数、处理器等,最后启动服务,服务端为bind()监听,客户端为connect()连接。

    具体的处理类需要继承ChannelHandlerAdapter,这里存在一个问题,《Netty权威指南》第2版使用的Netty版本为5.x,但现在已经废弃,所以使用的是4.x的版本。书中原来继承的是ChannelHandlerAdapter,在4.x的版本中这个接口并没有channelRead方法,所以用channelInboundHandlerAdapter代替。

    TimeClientHandler中,当客户端和服务端TCP链路建立成功后,NIO线程会调用channelActive方法,发送内容给服务端。当服务端返回应答消息时,会调用channelRead方法。

    channelRead中,用到了一个很重要的Netty中的类:ByteBuf。ByteBuf是一个很好的经过优化的数据容器。ByteBuf有两部分:一个用于读,一个用于写。我们可以按顺序读取数据,并且可以跳到开始再读一遍。具体的内容可以参考这个博文:Netty 缓存buffer介绍及使用

  • 相关阅读:
    【Azure 应用服务】在Azure App Service多实例的情况下,如何在应用中通过代码获取到实例名(Instance ID)呢?
    【Azure 应用服务】App Service For Windows 中如何设置代理实现前端静态文件和后端Java Spring Boot Jar包
    【Azure Developer】使用Azure Key Vault 的Key签名后,离线验证的一些参考资料
    【Azure Function】调试 VS Code Javascript Function本地不能运行,报错 Value cannot be null. (Parameter 'provider')问题
    【Azure 应用服务】App Service 使用Tomcat运行Java应用,如何设置前端网页缓存的相应参数呢(Xms512m Xmx1204m)?
    【Azure API 管理】APIM添加Logtoeventhub的策略后,一些相关APIM与Event Hub的问题
    【Azure API 管理】为调用APIM的请求启用Trace 调试APIM Policy的利器
    【Azure 事件中心】China Azure上是否有Kafka服务简答
    【Azure 应用服务】探索在Azure上设置禁止任何人访问App Service的默认域名(Default URL)
    【Azure 微服务】记一次错误的更新Service Fabric 证书而引发的集群崩溃而只能重建
  • 原文地址:https://www.cnblogs.com/yisany/p/10141956.html
Copyright © 2011-2022 走看看