zoukankan      html  css  js  c++  java
  • netty入门

    一 Netty核心组件介绍

    1.1、 channel

    channel 是一个通道,我们通常说其是一个NIO的构造

    1.2、回调

    回调本质是一个方法,方法中的参数指向另一个方法的引用;

    1.3 、Futrure

    通知机制,当方法执行结束时会发一个通知消息;

    1.4ChannelHandler

    通道处理事件,即一般就是我们的处理业务逻辑的地方;常用的通道处理类 ChannelInboundHandler,SimpleChannelInboundHandler,ChannelHandlerAdapter;不同的通道处理类适配不同的适配器;如下图的处理类或者适配器就是我们常用的类;

    二 入门应用

    首先需要引入 netty依赖

           <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
                <version>4.1.55.Final</version>
            </dependency>
    

    2.1、 服务端

    1. 首选 我们 需要创建一个reactor模型的线程组,这里我们选择的是NIO异步线程
    2. 其次我们创建一个服务引导类,即服务端辅助启动器ServerBootstrap;在 ServerBootstrap 我们添加线程组NioEventLoopGroup和事件处理ChildChannelHandler;然后将 ServerBootstrap 绑定方法传入的参数 端口,执行 sync同步阻塞;
    3. 等待同步阻塞完成后,我们调用通道的closeFuture方法和sync将 线程阻塞,直到 处理器执行完成;
    4. 最后调用线程组的shutdownGracefully 方法释放资源;
    public void bind(int port) throws Exception{
    
            // 配置线程组 实质是 reactor线程组
            NioEventLoopGroup parentGroup = new NioEventLoopGroup();
            // 启动 NIO
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            // 启动类
            serverBootstrap.group(parentGroup)
                    .channel(NioServerSocketChannel.class)// 相当于 ServerSocketChannel
                    .option(ChannelOption.SO_BACKLOG,1024)//TCP参数
                    .childHandler(new ChildChannelHandler());// 处理事件
            // 绑定端口 同步阻塞等待同步成功 channelFuture 异步操作通知回调
            ChannelFuture channelFuture = serverBootstrap
                    .bind(port)
                    .sync();
            // 同步阻塞等待服务监听端口关闭
            channelFuture
                    .channel()
                    .closeFuture()
                    .sync();
            // 关闭资源
            parentGroup.shutdownGracefully();
    
        }
    

    其中的关键就是 new ChildChannelHandler(), 当来一个端口时就会新建一个处理类,保证了监听多个端口的可能性;

        /**
         * @Author lsc
         * <p>通道初始化 </p>
         * @Param
         * @Return
         */
        private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
    
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                //管道(Pipeline)持有某个通道的全部处理器
                ChannelPipeline pipeline = socketChannel.pipeline();
                // 添加处理器
                pipeline.addLast(new NettyServerHandler());
            }
        }
    

    然后我们看下处理器 ChildChannelHandler,核心方法有三个

    1. channelRead,读取通道的消息;
    2. channelReadComplete 读取消息完毕后,执行的回调;
    3. exceptionCaught 异常出现执行的回调;
    @Slf4j
    public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            // 转为字节缓冲区
            ByteBuf buf = (ByteBuf)msg;
            // 字节数组
            byte[] bytes = new byte[buf.readableBytes()];
            // 缓冲区数据读入字节数组
            buf.readBytes(bytes);
            // 编码转为字符串
            String body = (new String(bytes, "UTF-8"));
            System.out.println(" get the data from client : " + body);
            // 构造响应数据
            String responseData = "那天刚刚好遇见你";
            //  数据写入缓冲区
            ByteBuf resp = Unpooled.copiedBuffer(responseData.getBytes());
            // 写入数据响应
            ChannelFuture channelFuture = ctx.writeAndFlush(resp);
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            // 写入 seocketChannel
            ctx.flush();
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            // 异常关闭资源句柄
            ctx.close();
        }
    }
    

    2.2 、客户端

    1. 客户端我们也是创建 NIO线程组 NioEventLoopGroup;
    2. 使用 Bootstrap 进行辅助启动,在通道初始化的时候传入处理器 NettyClientHandler;
    3. bootstrap 连接时不止绑定了端口,还要绑定ip;
    4. 最后阻塞处理器执行完成后关闭资源
        public void connect(int port, String host) throws InterruptedException {
    
            // 创建线程组
            NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
            // netty启动辅助类
            Bootstrap bootstrap = new Bootstrap();
            //
            bootstrap.group(nioEventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // 处理IO事件
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            //
                            pipeline.addLast(new NettyClientHandler());
                        }
                    });
            // 异步操作
            ChannelFuture connect = bootstrap.connect(host, port).sync();
            // 关闭客户端
            connect.channel().closeFuture().sync();
            // 退出线程组
            nioEventLoopGroup.shutdownGracefully();
        }
    

    我们再来看下处理器NettyServerHandler

    处理器的构造都是大同小异

    1. exceptionCaught 异常回调;
    2. channelActive 当与服务端连接成功后被调用的回调;
    3. channelRead: 通道读取消息的回调;
    @Slf4j
    public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    public NettyClientHandler() {
            super();
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            log.warn("Unexpected exception from downstream : [{}]" ,cause.getMessage());
        }
    
        /* *
         * @Author lsc
         * <p>触发回调 </p>
         * @Param [ctx]
         * @Return void
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            byte[] bytes = "关注公众号知识追寻者回复netty获取本教程源码".getBytes();
            // 创建节字缓冲区
            ByteBuf message = Unpooled.buffer(bytes.length);
            // 将数据写入缓冲区
            message.writeBytes(bytes);
            // 写入数据
            ctx.writeAndFlush(message);
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            // 消息转为 字节缓冲区
            ByteBuf buf = (ByteBuf)msg;
            // 创建字节数组
            byte[] bytes = new byte[buf.readableBytes()];
            // 获得响应的数据写入字节数组
            buf.readBytes(bytes);
            // 字节数组转为字符串a
            String body = new String(bytes, "UTF-8");
            // 打印
            System.out.println("get the data from server: "+body);
        }
    
    }
    

    2.3 测试

    启动服务端,监听8080端口

        public static void main(String[] args) throws Exception {
            NettyServer nettyServer = new NettyServer();
            // 连接的ip d端口
            nettyServer.bind(8080);
        }
    

    服务端启动成功

    启动客户端,连接服务端,绑定监听端口

        public static void main(String[] args) throws Exception {
            NettyClient nettyClient = new NettyClient();
            // 连接的ip d端口
            nettyClient.connect(8080,"127.0.0.1");
        }
    

    客户端启动成功,与服务端连接后会收到服务端发的消息

    由于服务端与客户端连接成功后,客户端会激活channelActive 方法,故 服务端也收到一条消息;

    想获取本套教程源码和后续内容 关注 公众号 知识追寻者 , 后台回复 netty 获取源码

  • 相关阅读:
    Python3爬虫系列:理论+实验+爬取妹子图实战
    虚机安装后无网卡、网卡驱动
    Linux运维工程师面试题整理
    睡眠或者重启windows,无法ssh连接或者pingVMware的虚机
    W10: Warning: Changing a readonly file使用vi/vim报错问题解决
    keyboard-interactive authentication with the ssh2 server failed 的SecureCRT报错解决
    公网访问内网实现(内网穿透)
    Linux内网时钟同步问题(ntp和chrony)
    xshell的快捷复制粘贴设置
    Linux中shell去除空行的几种方法
  • 原文地址:https://www.cnblogs.com/zszxz/p/14416425.html
Copyright © 2011-2022 走看看