zoukankan      html  css  js  c++  java
  • Netty入门实例

    通过代码和代码中的详细注释,可以对Netty有个入门的了解

    TimeServer(初始化服务,绑定端口,指定IO handler)

     1 import io.netty.bootstrap.ServerBootstrap;
     2 import io.netty.channel.ChannelFuture;
     3 import io.netty.channel.ChannelInitializer;
     4 import io.netty.channel.ChannelOption;
     5 import io.netty.channel.EventLoopGroup;
     6 import io.netty.channel.nio.NioEventLoopGroup;
     7 import io.netty.channel.socket.SocketChannel;
     8 import io.netty.channel.socket.nio.NioServerSocketChannel;
     9 
    10 /**
    11  * 服务端
    12  *
    13  * @author zhya
    14  * @date 2018/9/14
    15  **/
    16 public class TimeServer {
    17 
    18     public static void main(String[] args) {
    19         try {
    20             // 启动服务端,绑定8080端口
    21             new TimeServer().bind(8080);
    22         } catch (InterruptedException e) {
    23             e.printStackTrace();
    24         }
    25     }
    26 
    27     /**
    28      * 初始化服务端
    29      *
    30      * @param port
    31      * @throws InterruptedException
    32      */
    33     public void bind(int port) throws InterruptedException {
    34         // group 是NIO线程组,用于网络事件,实际就是Reactor线程组
    35         // bossGroup 用于处理连接
    36         EventLoopGroup bossGroup = new NioEventLoopGroup();
    37         // bossGroup 用于处理Socket读写操作
    38         EventLoopGroup workerGroup = new NioEventLoopGroup();
    39 
    40         try {
    41             // 服务初始化工具,封装初始化服务的复杂代码
    42             ServerBootstrap serverBootstrap = new ServerBootstrap();
    43             // 指定group
    44             serverBootstrap.group(bossGroup, workerGroup)
    45                     // 设置channel类型
    46                     .channel(NioServerSocketChannel.class)
    47                     // 设置缓存
    48                     .option(ChannelOption.SO_BACKLOG, 1024)
    49                     // 在创建NioSocketChannel成功后,将他的ChannelHandler(TimeClientHandler)设置到ChannelPipeline中,用于处理网络IO事件
    50                     .childHandler(new ChildChannelHandler());
    51 
    52             // 绑定端口(同步,阻塞,等待绑定完成)
    53             ChannelFuture future = serverBootstrap.bind(port).sync();
    54 
    55             // future接收异步通知回调,阻塞,等待链路关闭后退出
    56             future.channel().closeFuture().sync();
    57         } finally {
    58             // 优雅的关闭线程组
    59             bossGroup.shutdownGracefully();
    60             workerGroup.shutdownGracefully();
    61         }
    62     }
    63 
    64     /**
    65      * IO事件处理器
    66      *
    67      * @author zhya
    68      * @date 2018/9/14
    69      **/
    70     private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
    71         @Override
    72         protected void initChannel(SocketChannel socketChannel) throws Exception {
    73             // 设置处理IO事件的处理器
    74             socketChannel.pipeline().addLast(new TimeServerHandler());
    75         }
    76     }
    77 }

    TimeServerHandler(处理服务端Channel中的读写事件)

     1 import io.netty.buffer.ByteBuf;
     2 import io.netty.buffer.Unpooled;
     3 import io.netty.channel.ChannelHandlerAdapter;
     4 import io.netty.channel.ChannelHandlerContext;
     5 
     6 import java.time.LocalDateTime;
     7 
     8 /**
     9  * 对服务端网络事件进行读写操作的处理器
    10  *
    11  * @author zhya
    12  * @date 2018/9/14
    13  **/
    14 public class TimeServerHandler extends ChannelHandlerAdapter {
    15 
    16     /**
    17      * 读到部分客户端数据后回调
    18      *
    19      * @param ctx
    20      * @param msg
    21      * @throws Exception
    22      */
    23     @Override
    24     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    25         // 将客户端数据转成ByteBuf
    26         ByteBuf byteBuf = (ByteBuf) msg;
    27         byte[] req = new byte[byteBuf.readableBytes()];
    28 
    29         // 从ByteBuf中读取客户端发送的数据
    30         byteBuf.readBytes(req);
    31 
    32         // 展示客户端发送来的数据
    33         String body = new String(req, "UTF-8");
    34         System.out.println("request body : " + body);
    35 
    36         String currentTime = LocalDateTime.now().toString();
    37         // 回写数据给客户端(write只是把待发送消息放到发送缓存数组中)
    38         ctx.write(Unpooled.copiedBuffer(currentTime.getBytes()));
    39     }
    40 
    41     /**
    42      * 完成读客户端数据后回调
    43      *
    44      * @param ctx
    45      * @throws Exception
    46      */
    47     @Override
    48     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    49         System.out.println("channel read complete");
    50         // 刷新,将消息队列中的消息发送给SocketChannel
    51         ctx.flush();
    52     }
    53 
    54     /**
    55      * 遇到错误后回调
    56      *
    57      * @param ctx
    58      * @param cause
    59      * @throws Exception
    60      */
    61     @Override
    62     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    63         System.out.println("encounter exception");
    64         System.out.println(cause.getMessage());
    65         // 发送异常关闭ChannelHandlerContext,释放资源
    66         ctx.close();
    67     }
    68 }

    TimeClient(初始化客户端SocketChannel,连接ServerSocket,指定IO handler)

     1 import io.netty.bootstrap.Bootstrap;
     2 import io.netty.channel.ChannelFuture;
     3 import io.netty.channel.ChannelInitializer;
     4 import io.netty.channel.ChannelOption;
     5 import io.netty.channel.EventLoopGroup;
     6 import io.netty.channel.nio.NioEventLoopGroup;
     7 import io.netty.channel.socket.SocketChannel;
     8 import io.netty.channel.socket.nio.NioSocketChannel;
     9 
    10 /**
    11  * 客户端
    12  *
    13  * @author zhya
    14  * @date 2018/9/14
    15  **/
    16 public class TimeClient {
    17     public static void main(String[] args) {
    18         try {
    19             // 构造100个线程,开启客户端连接服务器
    20             for (int i = 0; i < 100; i++) {
    21                 new Thread(() -> {
    22                     try {
    23                         // 开启客户端连接服务器
    24                         new TimeClient().connect("127.0.0.1", 8080);
    25                     } catch (Exception e) {
    26                         e.printStackTrace();
    27                     }
    28                 }).start();
    29             }
    30         } catch (Exception e) {
    31             e.printStackTrace();
    32         }
    33     }
    34 
    35     /**
    36      * 初始化客户端
    37      *
    38      * @param host
    39      * @param port
    40      * @throws Exception
    41      */
    42     public void connect(String host, int port) throws Exception {
    43         // 负责处理客户端IO事件的NIO线程组
    44         EventLoopGroup group = new NioEventLoopGroup();
    45         try {
    46             // 客户端初始化工具类,封装初始化客户端的复杂代码
    47             Bootstrap bootstrap = new Bootstrap();
    48             // 指定group
    49             bootstrap.group(group)
    50                     // 设置channel类型为NioSocketChannel(socket client)
    51                     .channel(NioSocketChannel.class)
    52                     // 设置TCP参数,无延时
    53                     .option(ChannelOption.TCP_NODELAY, true)
    54                     // 在创建NioSocketChannel成功后,将他的ChannelHandler(TimeClientHandler)设置到ChannelPipeline中,用于处理网络IO事件
    55                     .handler(new ChannelInitializer<SocketChannel>() {
    56                         @Override
    57                         protected void initChannel(SocketChannel socketChannel) throws Exception {
    58                             // 设置处理IO事件的处理器
    59                             socketChannel.pipeline().addLast(new TimeClientHandler());
    60                         }
    61                     });
    62             // 阻塞,指导成功建立连接
    63             ChannelFuture future = bootstrap.connect(host, port).sync();
    64 
    65             // 阻塞,等待异步回调通知
    66             future.channel().closeFuture().sync();
    67         } finally {
    68             // 优雅的关闭线程组,释放资源
    69             group.shutdownGracefully();
    70         }
    71     }
    72 }

    TimeClientHandler(处理客户端channel连接、读写事件)

     1 import io.netty.buffer.ByteBuf;
     2 import io.netty.buffer.Unpooled;
     3 import io.netty.channel.ChannelHandlerAdapter;
     4 import io.netty.channel.ChannelHandlerContext;
     5 
     6 /**
     7  * 处理客户端IO事件的处理器
     8  *
     9  * @author zhya
    10  * @date 2018/9/14
    11  **/
    12 public class TimeClientHandler extends ChannelHandlerAdapter {
    13 
    14     /**
    15      * 定义发送数据的载体/容器
    16      */
    17     private final ByteBuf firstMsg;
    18 
    19     /**
    20      * 初始化要发送的数据和ByteBuf
    21      */
    22     public TimeClientHandler() {
    23         byte[] req = "get time".getBytes();
    24         firstMsg = Unpooled.buffer(req.length);
    25         firstMsg.writeBytes(req);
    26     }
    27 
    28     /**
    29      * 当channel可用的时候回调
    30      *
    31      * @param ctx
    32      * @throws Exception
    33      */
    34     @Override
    35     public void channelActive(ChannelHandlerContext ctx) throws Exception {
    36         System.out.println("channel active");
    37         // 向服务端写入并刷新数据
    38         ctx.writeAndFlush(firstMsg);
    39     }
    40 
    41     /**
    42      * 当读取到数据时回调(并非读取到所有的数据)
    43      *
    44      * @param ctx
    45      * @param msg
    46      * @throws Exception
    47      */
    48     @Override
    49     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    50         // 转换
    51         ByteBuf byteBuf = (ByteBuf) msg;
    52         byte[] rep = new byte[byteBuf.readableBytes()];
    53         byteBuf.readBytes(rep);
    54 
    55         // 输出
    56         String body = new String(rep, "UTF-8");
    57         System.out.println("reponse body : " + body);
    58     }
    59 
    60     /**
    61      * 当遇到异常时回调
    62      *
    63      * @param ctx
    64      * @param cause
    65      * @throws Exception
    66      */
    67     @Override
    68     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    69         System.out.println("encounter exception");
    70         System.out.println(cause.getMessage());
    71 
    72         // 停止ChannelHandlerContext,释放资源
    73         ctx.close();
    74     }
    75 }
  • 相关阅读:
    20210603
    20210602
    20210601
    20210531-已编辑
    2021053101
    操作系统笔记(一)
    尘埃落定,扬帆起航
    RTL级低功耗设计
    关于毛刺
    电路级拾珍
  • 原文地址:https://www.cnblogs.com/zhya/p/9645862.html
Copyright © 2011-2022 走看看