zoukankan      html  css  js  c++  java
  • Netty 入门-TCP服务

    简单编写一个案例实现:

    (1) 服务器端启动监听6666端口, 收到客户端信息之后打印下客户端信息且回传一段服务器端的信息

    (2) 客户端启动之后连接到6666端口,且发送一段信息,收到服务器端信息之后打印下服务器发送的信息

    1. 代码实现

    0. pom

            <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
                <version>4.1.11.Final</version>
            </dependency>

    logback.xml 注意是关闭日志

    <configuration>
        <appender name="consoleLog" class="ch.qos.logback.core.ConsoleAppender">
            <encoder>
                <pattern>
                    %msg%n
                </pattern>
            </encoder>
        </appender>
    
        <root level="error">
            <appender-ref ref="consoleLog"></appender-ref>
        </root>
    </configuration>

    1. 服务器端程序

    1. Server

    package netty;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    
    public class NettyServer {
    
        public static void main(String[] args) throws InterruptedException {
            // 1. 创建bossGrou好eworkerGroup
            // bossGroup只负责连接请求,其他交给workerGroup, 两个都是无线循环
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
    
            // 创建服务器端启动对象用于设置参数
            ServerBootstrap bootstrap = new ServerBootstrap();
            // 使用链式编程设置参数
            bootstrap.group(bossGroup, workerGroup)// 设置两个组
                    .channel(NioServerSocketChannel.class) // 设置服务器的通道
                    .option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列得到连接个数
                    .childOption(ChannelOption.SO_KEEPALIVE, true) // 设置保持活动连接状态
                    .childHandler(new ChannelInitializer<SocketChannel>() { // 设置通道测试对象(匿名对象)
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // 给pipeline添加一个handler
                            socketChannel.pipeline().addLast(new ServerHandler());
                        }
                    });
    
            System.out.println("服务端is ok。。。");
    
            // 启动服务器并绑定端口。绑定一个端口并且同步,生成一个ChannelFuture对象
            ChannelFuture channelFuture = bootstrap.bind(6666).sync();
            // 对关闭通道进行监控
            channelFuture.channel().closeFuture().sync();
        }
    }

    2. Handler

    package netty;
    
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.util.CharsetUtil;
    
    /**
     * 自定义服务器端处理handler,需要继承netty定义的ChannelInboundHandlerAdapter 类
     */
    public class ServerHandler extends ChannelInboundHandlerAdapter {
    
        /**
         * 读取事件
         *
         * @param ctx 上下文对象,含有pipeline管道;通道channel;地址address等信息
         * @param msg 客户端发送的数据(实际类型是ByteBuf - netty 封装的ByteBuffer)
         * @throws Exception
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("ctx = " + ctx);
            // 强转为netty的ByteBuffer(实际就是包装的ByteBuffer)
            ByteBuf byteBuf = (ByteBuf) msg;
            System.out.println("客户端发送的消息是:" + byteBuf.toString(CharsetUtil.UTF_8));
            System.out.println("客户端地址:" + ctx.channel().remoteAddress());
        }
    
        /**
         * 数据读取完毕事件
         *
         * @param ctx
         * @throws Exception
         */
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            // 将数据写到客户端(write + flush)
            ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端!", CharsetUtil.UTF_8));
        }
    
        /**
         * 发生异常事件
         *
         * @param ctx
         * @param cause
         * @throws Exception
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }

    2. 客户端程序

    client

    package netty;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    
    public class NettyClient {
    
        public static void main(String[] args) throws InterruptedException {
            // 创建一个事件循环组
            EventLoopGroup eventExecutors = new NioEventLoopGroup();
            try {
                // 创建一个启动Bootstrap(注意是Netty包下的)
                Bootstrap bootstrap = new Bootstrap();
                // 链式设置参数
                bootstrap.group(eventExecutors) // 设置线程组
                        .channel(NioSocketChannel.class) // 设置通道class
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                socketChannel.pipeline().addLast(new ClientHandler());
                            }
                        });
                System.out.println("客户端is ok...");
    
                // 启动客户端连接服务器(ChannelFuture 是netty的异步模型)
                ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync();
                // 监听关闭通道
                channelFuture.channel().closeFuture().sync();
            } finally {
                // 关闭
                eventExecutors.shutdownGracefully();
            }
        }
    }

    Handler

    package netty;
    
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.util.CharsetUtil;
    
    /**
     * 自定义服务器端处理handler,需要继承netty定义的ChannelInboundHandlerAdapter 类
     */
    public class ClientHandler extends ChannelInboundHandlerAdapter {
    
        /**
         * 通道就绪事件
         *
         * @param ctx
         * @throws Exception
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("ClientHandler ctx: " + ctx);
            ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 服务器!", CharsetUtil.UTF_8));
        }
    
        /**
         * 读取事件
         *
         * @param ctx 上下文对象,含有pipeline管道;通道channel;地址address等信息
         * @param msg 客户端发送的数据(实际类型是ByteBuf - netty 封装的ByteBuffer)
         * @throws Exception
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            // 强转为netty的ByteBuffer(实际就是包装的ByteBuffer)
            ByteBuf byteBuf = (ByteBuf) msg;
            System.out.println("服务器会送的消息是:" + byteBuf.toString(CharsetUtil.UTF_8));
            System.out.println("服务器地址:" + ctx.channel().remoteAddress());
        }
    
        /**
         * 发生异常事件
         *
         * @param ctx
         * @param cause
         * @throws Exception
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }

    3. 测试

    (1) 启动服务器端:

    服务端is ok。。。

    (2) 启动客户端

    客户端is ok...

    (3) 之后服务器先接到消息,服务器端日志:

    ctx = ChannelHandlerContext(ServerHandler#0, [id: 0x5028f46e, L:/127.0.0.1:6666 - R:/127.0.0.1:58692])
    客户端发送的消息是:hello, 服务器!
    客户端地址:/127.0.0.1:58692

    (4) 客户端日志

    ClientHandler ctx: ChannelHandlerContext(ClientHandler#0, [id: 0x8b3d82f9, L:/127.0.0.1:58692 - R:/127.0.0.1:6666])
    服务器会送的消息是:hello, 客户端!
    服务器地址:/127.0.0.1:6666

    2. 重要类查看

    1.  new NioEventLoopGroup(); 默认会创建当前服务器逻辑处理器数量*2个NioEventLoop (循环事件)

    最终获取数量是:

    private static final int DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

    测试如下:(我的机子是4个逻辑处理器的)

    对于服务器端的BossGroup来说,没必要用满8个,可以通过参数进行修改。

    EventLoopGroup bossGroup = new NioEventLoopGroup(1);

    再次查看:(也证实了每个NioEventLoop都有自己的Selector、executor任务处理器、taskQueue工作队列等信息,其实也就是多线程的一套信息)

     2. ServerHandler处理时打印线程名称可以看到是用了workerGroup线程池

    修改netty.ServerHandler#channelRead

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("当前线程: " + Thread.currentThread().getName());
            System.out.println("ctx = " + ctx);
            // 强转为netty的ByteBuffer(实际就是包装的ByteBuffer)
            ByteBuf byteBuf = (ByteBuf) msg;
            System.out.println("客户端发送的消息是:" + byteBuf.toString(CharsetUtil.UTF_8));
            System.out.println("客户端地址:" + ctx.channel().remoteAddress());
        }

    可以看到服务器打印的线程名如下:(直到打印到8,线程池创满8个)

    服务端is ok。。。
    当前线程: nioEventLoopGroup-3-1
    ctx = ChannelHandlerContext(ServerHandler#0, [id: 0xe1252cb5, L:/127.0.0.1:6666 - R:/127.0.0.1:59393])
    客户端发送的消息是:hello, 服务器!
    客户端地址:/127.0.0.1:59393
    当前线程: nioEventLoopGroup-3-2
    ctx = ChannelHandlerContext(ServerHandler#0, [id: 0x890f8270, L:/127.0.0.1:6666 - R:/127.0.0.1:59470])
    客户端发送的消息是:hello, 服务器!
    客户端地址:/127.0.0.1:59470

    3. ChannelHandlerContext 类

    ChannelHandlerContext 包含非常重要的信息,如下:

      可以看到包含我们创建的ServerHandler

      有next、prev属性(双向链表结构)

      有pipeline管道,管道内部有channel通道信息以及head、tail头尾等信息。channel包含的重要信息如下:

    补充: 我另一台机子的逻辑处理器是8个,查看默认创建的 bossGroup 包含16 个NioEventLoop

    (1) 通过任务管理器查看:

     如图,1个插槽表示1个CPU; 内核是4表示4核CPU; 8个逻辑处理器表示8线程(一般一个核心对应了一个线程,而intel开发出了超线程技术,1个核心能够做到2个线程计算,所以4个核心则能够做到8个线程)

    (2) 也可以通过Java程序获取可用的processors(处理器数量)

    int i = Runtime.getRuntime().availableProcessors(); // 8
    【当你用心写完每一篇博客之后,你会发现它比你用代码实现功能更有成就感!】
  • 相关阅读:
    docker
    SAML(Security assertion markUp language) 安全断言标记语言
    kafka消息系统
    OBS 对象存储技术学习
    AOP之AspectJ
    sql查漏补缺
    todolist
    springboot 注解整理
    前端之jQuery
    前端之BOM和DOM
  • 原文地址:https://www.cnblogs.com/qlqwjy/p/14466529.html
Copyright © 2011-2022 走看看