zoukankan      html  css  js  c++  java
  • Netty 入门-TCP服务

    简单编写一个案例实现:

    (1) 服务器端启动监听6666端口, 收到客户端信息之后打印下客户端信息且回传一段服务器端的信息

    (2) 客户端启动之后连接到6666端口,且发送一段信息,收到服务器端信息之后打印下服务器发送的信息

    1. 代码实现

    0. pom

            <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
                <version>4.1.11.Final</version>
            </dependency>

    logback.xml 注意是关闭日志

    <configuration>
        <appender name="consoleLog" class="ch.qos.logback.core.ConsoleAppender">
            <encoder>
                <pattern>
                    %msg%n
                </pattern>
            </encoder>
        </appender>
    
        <root level="error">
            <appender-ref ref="consoleLog"></appender-ref>
        </root>
    </configuration>

    1. 服务器端程序

    1. Server

    package netty;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    
    public class NettyServer {
    
        public static void main(String[] args) throws InterruptedException {
            // 1. 创建bossGrou好eworkerGroup
            // bossGroup只负责连接请求,其他交给workerGroup, 两个都是无线循环
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
    
            // 创建服务器端启动对象用于设置参数
            ServerBootstrap bootstrap = new ServerBootstrap();
            // 使用链式编程设置参数
            bootstrap.group(bossGroup, workerGroup)// 设置两个组
                    .channel(NioServerSocketChannel.class) // 设置服务器的通道
                    .option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列得到连接个数
                    .childOption(ChannelOption.SO_KEEPALIVE, true) // 设置保持活动连接状态
                    .childHandler(new ChannelInitializer<SocketChannel>() { // 设置通道测试对象(匿名对象)
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // 给pipeline添加一个handler
                            socketChannel.pipeline().addLast(new ServerHandler());
                        }
                    });
    
            System.out.println("服务端is ok。。。");
    
            // 启动服务器并绑定端口。绑定一个端口并且同步,生成一个ChannelFuture对象
            ChannelFuture channelFuture = bootstrap.bind(6666).sync();
            // 对关闭通道进行监控
            channelFuture.channel().closeFuture().sync();
        }
    }

    2. Handler

    package netty;
    
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.util.CharsetUtil;
    
    /**
     * 自定义服务器端处理handler,需要继承netty定义的ChannelInboundHandlerAdapter 类
     */
    public class ServerHandler extends ChannelInboundHandlerAdapter {
    
        /**
         * 读取事件
         *
         * @param ctx 上下文对象,含有pipeline管道;通道channel;地址address等信息
         * @param msg 客户端发送的数据(实际类型是ByteBuf - netty 封装的ByteBuffer)
         * @throws Exception
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("ctx = " + ctx);
            // 强转为netty的ByteBuffer(实际就是包装的ByteBuffer)
            ByteBuf byteBuf = (ByteBuf) msg;
            System.out.println("客户端发送的消息是:" + byteBuf.toString(CharsetUtil.UTF_8));
            System.out.println("客户端地址:" + ctx.channel().remoteAddress());
        }
    
        /**
         * 数据读取完毕事件
         *
         * @param ctx
         * @throws Exception
         */
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            // 将数据写到客户端(write + flush)
            ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端!", CharsetUtil.UTF_8));
        }
    
        /**
         * 发生异常事件
         *
         * @param ctx
         * @param cause
         * @throws Exception
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }

    2. 客户端程序

    client

    package netty;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    
    public class NettyClient {
    
        public static void main(String[] args) throws InterruptedException {
            // 创建一个事件循环组
            EventLoopGroup eventExecutors = new NioEventLoopGroup();
            try {
                // 创建一个启动Bootstrap(注意是Netty包下的)
                Bootstrap bootstrap = new Bootstrap();
                // 链式设置参数
                bootstrap.group(eventExecutors) // 设置线程组
                        .channel(NioSocketChannel.class) // 设置通道class
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                socketChannel.pipeline().addLast(new ClientHandler());
                            }
                        });
                System.out.println("客户端is ok...");
    
                // 启动客户端连接服务器(ChannelFuture 是netty的异步模型)
                ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync();
                // 监听关闭通道
                channelFuture.channel().closeFuture().sync();
            } finally {
                // 关闭
                eventExecutors.shutdownGracefully();
            }
        }
    }

    Handler

    package netty;
    
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.util.CharsetUtil;
    
    /**
     * 自定义服务器端处理handler,需要继承netty定义的ChannelInboundHandlerAdapter 类
     */
    public class ClientHandler extends ChannelInboundHandlerAdapter {
    
        /**
         * 通道就绪事件
         *
         * @param ctx
         * @throws Exception
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("ClientHandler ctx: " + ctx);
            ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 服务器!", CharsetUtil.UTF_8));
        }
    
        /**
         * 读取事件
         *
         * @param ctx 上下文对象,含有pipeline管道;通道channel;地址address等信息
         * @param msg 客户端发送的数据(实际类型是ByteBuf - netty 封装的ByteBuffer)
         * @throws Exception
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            // 强转为netty的ByteBuffer(实际就是包装的ByteBuffer)
            ByteBuf byteBuf = (ByteBuf) msg;
            System.out.println("服务器会送的消息是:" + byteBuf.toString(CharsetUtil.UTF_8));
            System.out.println("服务器地址:" + ctx.channel().remoteAddress());
        }
    
        /**
         * 发生异常事件
         *
         * @param ctx
         * @param cause
         * @throws Exception
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }

    3. 测试

    (1) 启动服务器端:

    服务端is ok。。。

    (2) 启动客户端

    客户端is ok...

    (3) 之后服务器先接到消息,服务器端日志:

    ctx = ChannelHandlerContext(ServerHandler#0, [id: 0x5028f46e, L:/127.0.0.1:6666 - R:/127.0.0.1:58692])
    客户端发送的消息是:hello, 服务器!
    客户端地址:/127.0.0.1:58692

    (4) 客户端日志

    ClientHandler ctx: ChannelHandlerContext(ClientHandler#0, [id: 0x8b3d82f9, L:/127.0.0.1:58692 - R:/127.0.0.1:6666])
    服务器会送的消息是:hello, 客户端!
    服务器地址:/127.0.0.1:6666

    2. 重要类查看

    1.  new NioEventLoopGroup(); 默认会创建当前服务器逻辑处理器数量*2个NioEventLoop (循环事件)

    最终获取数量是:

    private static final int DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

    测试如下:(我的机子是4个逻辑处理器的)

    对于服务器端的BossGroup来说,没必要用满8个,可以通过参数进行修改。

    EventLoopGroup bossGroup = new NioEventLoopGroup(1);

    再次查看:(也证实了每个NioEventLoop都有自己的Selector、executor任务处理器、taskQueue工作队列等信息,其实也就是多线程的一套信息)

     2. ServerHandler处理时打印线程名称可以看到是用了workerGroup线程池

    修改netty.ServerHandler#channelRead

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("当前线程: " + Thread.currentThread().getName());
            System.out.println("ctx = " + ctx);
            // 强转为netty的ByteBuffer(实际就是包装的ByteBuffer)
            ByteBuf byteBuf = (ByteBuf) msg;
            System.out.println("客户端发送的消息是:" + byteBuf.toString(CharsetUtil.UTF_8));
            System.out.println("客户端地址:" + ctx.channel().remoteAddress());
        }

    可以看到服务器打印的线程名如下:(直到打印到8,线程池创满8个)

    服务端is ok。。。
    当前线程: nioEventLoopGroup-3-1
    ctx = ChannelHandlerContext(ServerHandler#0, [id: 0xe1252cb5, L:/127.0.0.1:6666 - R:/127.0.0.1:59393])
    客户端发送的消息是:hello, 服务器!
    客户端地址:/127.0.0.1:59393
    当前线程: nioEventLoopGroup-3-2
    ctx = ChannelHandlerContext(ServerHandler#0, [id: 0x890f8270, L:/127.0.0.1:6666 - R:/127.0.0.1:59470])
    客户端发送的消息是:hello, 服务器!
    客户端地址:/127.0.0.1:59470

    3. ChannelHandlerContext 类

    ChannelHandlerContext 包含非常重要的信息,如下:

      可以看到包含我们创建的ServerHandler

      有next、prev属性(双向链表结构)

      有pipeline管道,管道内部有channel通道信息以及head、tail头尾等信息。channel包含的重要信息如下:

    补充: 我另一台机子的逻辑处理器是8个,查看默认创建的 bossGroup 包含16 个NioEventLoop

    (1) 通过任务管理器查看:

     如图,1个插槽表示1个CPU; 内核是4表示4核CPU; 8个逻辑处理器表示8线程(一般一个核心对应了一个线程,而intel开发出了超线程技术,1个核心能够做到2个线程计算,所以4个核心则能够做到8个线程)

    (2) 也可以通过Java程序获取可用的processors(处理器数量)

    int i = Runtime.getRuntime().availableProcessors(); // 8
    【当你用心写完每一篇博客之后,你会发现它比你用代码实现功能更有成就感!】
  • 相关阅读:
    关于在MAC上进行 LARAVEL 环境 Homestead 安装过程记录
    js 贷款计算器
    js 实现阶乘
    js 两点间距离函数
    composer Your requirements could not be resolved to an installable set of packages
    vue 项目优化记录 持续更新...
    vue 项目打包
    vue 真机调试页面出现空白
    vue 真机调试
    谈谈-Android状态栏的编辑
  • 原文地址:https://www.cnblogs.com/qlqwjy/p/14466529.html
Copyright © 2011-2022 走看看