zoukankan      html  css  js  c++  java
  • Netty——基本使用介绍

    参考:https://blog.csdn.net/haoyuyang/article/details/53243785

    1.为什么选择Netty

    上一篇文章我们已经了解了Socket通信(IO/NIO/AIO)编程,对于通信模型已经有了一个基本的认识。其实上一篇文章中,我们学习的仅仅是一个模型,如果想把这些真正的用于实际工作中,那么还需要不断的完善、扩展和优化。比如经典的TCP读包写包问题,或者是数据接收的大小,实际的通信处理与应答的处理逻辑等等一些细节问题需要认真的去思考,而这些都需要大量的时间和经历,以及丰富的经验。所以想学好Socket通信不是件容易事,那么接下来就来学习一下新的技术Netty,为什么会选择Netty?因为它简单!使用Netty不必编写复杂的逻辑代码去实现通信,再也不需要去考虑性能问题,不需要考虑编码问题,半包读写等问题。强大的Netty已经帮我们实现好了,我们只需要使用即可。

    Netty是最流行的NIO框架,它的健壮性、功能、性能、可定制性和可扩展性在同类框架都是首屈一指的。它已经得到成百上千的商业/商用项目验证,如Hadoop的RPC框架Avro、RocketMQ以及主流的分布式通信框架Dubbox等等。

    2.Netty简介

    Netty是基于Java NIO client-server的网络应用框架,使用Netty可以快速开发网络应用,例如服务器和客户端协议。Netty提供了一种新的方式来开发网络应用程序,这种新的方式使它很容易使用和具有很强的扩展性。Netty的内部实现是很复杂的,但是Netty提供了简单易用的API从网络处理代码中解耦业务逻辑。Netty是完全基于NIO实现的,所以整个Netty都是异步的。

    网络应用程序通常需要有较高的可扩展性,无论是Netty还是其他的基于Java Nio的框架,都会提供可扩展性的解决方案。Netty中一个关键组成部分是它的异步特性,本片文章将讨论同步(阻塞)和异步(非阻塞)的IO来说明为什么使用异步代码解决扩展性问题以及如何使用异步。
    3.Netty架构组成(借用一下网上的图片)

    Netty实现原理浅析,写的很不错,感兴趣的可以看一下。

    4.Helloworld入门

    在学习Netty之前,先来回顾一下NIO的通信步骤:

    ①创建ServerSocketChannel,为其配置非阻塞模式。

    ②绑定监听,配置TCP参数,录入backlog大小等。

    ③创建一个独立的IO线程,用于轮询多路复用器Selector。

    ④创建Selector,将之前创建的ServerSocketChannel注册到Selector上,并设置监听标识位SelectionKey.OP_ACCEPT。

    ⑤启动IO线程,在循环体中执行Selector.select()方法,轮询就绪的通道。

    ⑥当轮询到处于就绪状态的通道时,需要进行操作位判断,如果是ACCEPT状态,说明是新的客户端接入,则调用accept方法接收新的客户端。

    ⑦设置新接入客户端的一些参数,如非阻塞,并将其继续注册到Selector上,设置监听标识位等。

    ⑧如果轮询的通道标识位是READ,则进行读取,构造Buffer对象等。

    ⑨更细节的问题还有数据没发送完成继续发送的问题......

    好啦,开始学习Netty了。先去http://netty.io/上下载所有的Netty包。

    Netty通信的步骤:

    ①创建两个NIO线程组,一个专门用于网络事件处理(接受客户端的连接),另一个则进行网络通信的读写。

    ②创建一个ServerBootstrap对象,配置Netty的一系列参数,例如接受传出数据的缓存大小等。

    ③创建一个用于实际处理数据的类ChannelInitializer,进行初始化的准备工作,比如设置接受传出数据的字符集、格式以及实际处理数据的接口。

    ④绑定端口,执行同步阻塞方法等待服务器端启动即可。

    强烈推荐读一读Netty官方翻译文档。

    好了,说了那么多,下面就来HelloWorld入门吧!

    服务器端:

    package com.zh.springboot_netty.netty;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.DelimiterBasedFrameDecoder;
    import io.netty.handler.codec.string.StringDecoder;
    
    /*
    服务器端:
     */
    public class Server {
    
        private int port;
    
        public Server(int port) {
            this.port = port;
        }
    
        public void run() {
            EventLoopGroup bossGroup = new NioEventLoopGroup(); //用于处理服务器端接收客户端连接
            EventLoopGroup workerGroup = new NioEventLoopGroup(); //进行网络通信(读写)
            try {
                ServerBootstrap bootstrap = new ServerBootstrap(); //辅助工具类,用于服务器通道的一系列配置
                bootstrap.group(bossGroup, workerGroup) //绑定两个线程组
                        .channel(NioServerSocketChannel.class) //指定NIO的模式
                        .childHandler(new ChannelInitializer<SocketChannel>() { //配置具体的数据处理方式
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                System.out.println(Thread.currentThread().getName() + ",服务器初始化通道...");
                            /*    ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
                                socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
                                socketChannel.pipeline().addLast(new StringDecoder());*/
                                socketChannel.pipeline().addLast(new ServerHandler());
                            }
                        })
                        /**
                         * 对于ChannelOption.SO_BACKLOG的解释:
                         * 服务器端TCP内核维护有两个队列,我们称之为A、B队列。客户端向服务器端connect时,会发送带有SYN标志的包(第一次握手),服务器端
                         * 接收到客户端发送的SYN时,向客户端发送SYN ACK确认(第二次握手),此时TCP内核模块把客户端连接加入到A队列中,然后服务器接收到
                         * 客户端发送的ACK时(第三次握手),TCP内核模块把客户端连接从A队列移动到B队列,连接完成,应用程序的accept会返回。也就是说accept
                         * 从B队列中取出完成了三次握手的连接。
                         * A队列和B队列的长度之和就是backlog。当A、B队列的长度之和大于ChannelOption.SO_BACKLOG时,新的连接将会被TCP内核拒绝。
                         * 所以,如果backlog过小,可能会出现accept速度跟不上,A、B队列满了,导致新的客户端无法连接。要注意的是,backlog对程序支持的
                         * 连接数并无影响,backlog影响的只是还没有被accept取出的连接
                         */
                        .option(ChannelOption.SO_BACKLOG, 128) //设置TCP缓冲区
                        .option(ChannelOption.SO_SNDBUF, 32 * 1024) //设置发送数据缓冲大小
                        .option(ChannelOption.SO_RCVBUF, 32 * 1024) //设置接受数据缓冲大小
                        .childOption(ChannelOption.SO_KEEPALIVE, true); //保持连接
                ChannelFuture future = bootstrap.bind(port).sync();
                future.channel().closeFuture().sync();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                workerGroup.shutdownGracefully();
                bossGroup.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) {
            new Server(8379).run();
        }
    }

    ServerHandler类:

    package com.zh.springboot_netty.netty;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    
    public class ServerHandler  extends ChannelInboundHandlerAdapter {
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    
            //do something msg
            ByteBuf buf = (ByteBuf)msg;
            byte[] data = new byte[buf.readableBytes()];
            buf.readBytes(data);
            String request = new String(data, "utf-8");
            System.out.println("Server: " + request);
            //写给客户端
            String response = "我是反馈的信息";
            ctx.writeAndFlush(Unpooled.copiedBuffer("888".getBytes()));
            //.addListener(ChannelFutureListener.CLOSE);
    
    
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    
    }

    客户端:

    package com.zh.springboot_netty.netty;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.DelimiterBasedFrameDecoder;
    import io.netty.handler.codec.string.StringDecoder;
    
    
    /*
    客户端
     */
    public class Client {
    
        public static void main(String[] args) throws InterruptedException {
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(workerGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            System.out.println(Thread.currentThread().getName() + ",客户端初始化管道...");
                          /*  ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
                            socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter));
                            socketChannel.pipeline().addLast(new StringDecoder());*/
                            socketChannel.pipeline().addLast(new ClientHandler());
                        }
                    });
            ChannelFuture future = bootstrap.connect("127.0.0.1", 8379).sync();
            future.channel().writeAndFlush(Unpooled.copiedBuffer("777".getBytes()));
            future.channel().closeFuture().sync();
            workerGroup.shutdownGracefully();
        }
    
    }

    ClientHandler类:

    package com.zh.springboot_netty.netty;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.util.ReferenceCountUtil;
    
    public class ClientHandler extends ChannelInboundHandlerAdapter {
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            try {
                ByteBuf buf = (ByteBuf) msg;
                byte[] data = new byte[buf.readableBytes()];
                buf.readBytes(data);
                System.out.println("Client:" + new String(data).trim());
               /* String str = (String) msg;
                System.out.println(str);*/
            } finally {
                ReferenceCountUtil.release(msg);
            }
        }
    
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    
    }

    运行结果:

     

  • 相关阅读:
    整合Grafana
    Prometheus环境搭建
    RocketMQ单机部署
    记二进制搭建k8s集群完成后,部署时容器一直在创建中的问题
    接口重复提交解决方案
    记一次生产环境nginx图片上传不了的问题
    怎么进行中间件的学习
    MongoDB学习笔记之文档
    MongoDB学习笔记
    根据端口杀掉指定进程
  • 原文地址:https://www.cnblogs.com/nongzihong/p/11851369.html
Copyright © 2011-2022 走看看