zoukankan      html  css  js  c++  java
  • Netty入门(二)时间服务器及客户端

      在这个例子中,我在服务器和客户端连接被创立时发送一个消息,然后在客户端解析收到的消息并输出。并且,在这个项目中我使用 POJO 代替 ByteBuf 来作为传输对象。

    一、服务器实现

    1.  首先我们自定义传输数据对象

     1 package com.coder.client;
     2 
     3 import java.util.Date;
     4 
     5 /**
     6  * 自定义时间数据类
     7  * @author Coder
     8  *
     9  */
    10 public class Time {
    11     private final long value;
    12 
    13     public Time() {
    14         // 除以1000是为了使时间精确到秒
    15         this(System.currentTimeMillis() / 1000L);
    16     }
    17 
    18     public Time(long value) {
    19         this.value = value;
    20     }
    21 
    22     public long value() {
    23         return value;
    24     }
    25 
    26     @Override
    27     public String toString() {
    28         return new Date((value()) * 1000L).toString();
    29     }
    30 }

    2.  然后我们需要自定义服务器数据编码类

     1 package com.coder.server;
     2 
     3 import com.coder.client.Time;
     4 
     5 import io.netty.buffer.ByteBuf;
     6 import io.netty.channel.ChannelHandlerContext;
     7 import io.netty.handler.codec.MessageToByteEncoder;
     8 
     9 /**
    10  * 服务器数据编码类
    11  * @author Coder
    12  *
    13  */
    14 public class TimeEncoderPOJO extends MessageToByteEncoder<Time> {
    15 
    16     // 发送数据时调用
    17     @Override
    18     protected void encode(ChannelHandlerContext ctx, Time msg, ByteBuf out) throws Exception {
    19         // 只传输当前时间,精确到秒
    20         out.writeInt((int)msg.value());
    21     }
    22 
    23 }

    3. 也需要自定义服务器的业务逻辑类,如下:

     1 package com.coder.server;
     2 
     3 import com.coder.client.Time;
     4 
     5 import io.netty.channel.ChannelFuture;
     6 import io.netty.channel.ChannelFutureListener;
     7 import io.netty.channel.ChannelHandlerContext;
     8 import io.netty.channel.ChannelInboundHandlerAdapter;
     9 
    10 /**
    11  * 服务器解码器
    12  * 连接建立时发送当前时间
    13  * @author Coder
    14  *
    15  */
    16 public class TimeServerHandlerPOJO extends ChannelInboundHandlerAdapter {
    17     /**
    18      * 连接建立的时候并且准备进行通信时被调用
    19      */
    20     @Override
    21     public void channelActive(final ChannelHandlerContext ctx) throws Exception {
    22         // 发送当前时间信息
    23         ChannelFuture f = ctx.writeAndFlush(new Time());
    24         // 发送完毕之后关闭 Channel
    25         f.addListener(ChannelFutureListener.CLOSE);
    26     }
    27     
    28     @Override
    29     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    30         cause.printStackTrace();
    31         ctx.close();
    32     }
    33 }

    4. 有了上面的代码,我们就可以实现服务器程序了,如下:

     1 package com.coder.server;
     2 
     3 import io.netty.bootstrap.ServerBootstrap;
     4 import io.netty.channel.ChannelFuture;
     5 import io.netty.channel.ChannelInitializer;
     6 import io.netty.channel.ChannelOption;
     7 import io.netty.channel.EventLoopGroup;
     8 import io.netty.channel.nio.NioEventLoopGroup;
     9 import io.netty.channel.socket.SocketChannel;
    10 import io.netty.channel.socket.nio.NioServerSocketChannel;
    11 
    12 public class TimeServerPOJO {
    13 private int port;
    14     
    15     public TimeServerPOJO(int port) {
    16         this.port = port;
    17     }
    18     
    19     public void run() throws Exception {
    20         EventLoopGroup bossGroup = new NioEventLoopGroup();        // 用来接收进来的连接
    21         EventLoopGroup workerGroup = new NioEventLoopGroup();    // 用来处理已经被接收的连接
    22         System.out.println("准备运行端口:" + port);
    23         
    24         try {
    25             ServerBootstrap b = new ServerBootstrap();        // 启动NIO服务的辅助启动类
    26             b.group(bossGroup, workerGroup)
    27             .channel(NioServerSocketChannel.class)            // 这里告诉Channel如何接收新的连接
    28             .childHandler( new ChannelInitializer<SocketChannel>() {
    29                 @Override
    30                 protected void initChannel(SocketChannel ch) throws Exception {
    31                     // 自定义处理类
    32                     // 注意添加顺序
    33                     ch.pipeline().addLast(new TimeEncoderPOJO(),new TimeServerHandlerPOJO());
    34                 }
    35             })
    36             .option(ChannelOption.SO_BACKLOG, 128)
    37             .childOption(ChannelOption.SO_KEEPALIVE, true);
    38             
    39             // 绑定端口,开始接收进来的连接
    40             ChannelFuture f = b.bind(port).sync();
    41             
    42             // 等待服务器socket关闭
    43             f.channel().closeFuture().sync();
    44         } catch (Exception e) {
    45             workerGroup.shutdownGracefully();
    46             bossGroup.shutdownGracefully();
    47         }
    48     }
    49     
    50     public static void main(String[] args) throws Exception {
    51         int port = 8080;
    52         new TimeServer(port).run();
    53     }
    54 }

      执行代码后如下:

      

      这时候服务器在等待客户端的连接(非阻塞)。

    二、客户端实现

       客户端的实现与服务器类似。

    1. 自定义客户端数据解码类

     1 package com.coder.client;
     2 
     3 import java.util.List;
     4 
     5 import io.netty.buffer.ByteBuf;
     6 import io.netty.channel.ChannelHandlerContext;
     7 import io.netty.handler.codec.ByteToMessageDecoder;
     8 
     9 public class TimeDecoderPOJO extends ByteToMessageDecoder {
    10     /**
    11      * 有新数据接收时调用
    12      * 为防止分包现象,先将数据存入内部缓存,到达满足条件之后再进行解码
    13      */
    14     @Override
    15     protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    16         if(in.readableBytes() < 4) {
    17             return;
    18         }
    19         
    20         // out添加对象则表示解码成功
    21         out.add(new Time(in.readUnsignedInt()));
    22     }
    23 }

    2. 自定义客户端业务逻辑类

     1 package com.coder.client;
     2 
     3 import io.netty.channel.ChannelHandlerContext;
     4 import io.netty.channel.ChannelInboundHandlerAdapter;
     5 
     6 /**
     7  * 客户端数据处理类
     8  * @author Coder
     9  *
    10  */
    11 public class TimeClientHandlerPOJO extends ChannelInboundHandlerAdapter {
    12     @Override
    13     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    14         // 直接将信息转换成Time类型输出即可
    15         Time time = (Time)msg;
    16         System.out.println(time);
    17         ctx.close();
    18     }
    19     
    20     @Override
    21     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    22         cause.printStackTrace();
    23         ctx.close();
    24     }
    25 }

    3. 客户端程序实现

       Netty 客户端的通信步骤大致为:

    1.  创建一个 NIO 线程组,用于处理服务器与客户端的连接,客户端不需要用到 boss worker。
    2.  创建一个 Bootstrap 对象,配置 Netty 的一系列参数,由于客户端 SocketChannel 没有父亲,所以不需要使用 childoption。
    3.  创建一个用于实际处理数据的类ChannelInitializer,进行初始化的准备工作,比如设置接受传出数据的字符集、格式以及实际处理数据的接口。
    4.  配置服务器 IP 和端口号,建立与服务器的连接。
     1 package com.coder.client;
     2 
     3 import io.netty.bootstrap.Bootstrap;
     4 import io.netty.channel.ChannelFuture;
     5 import io.netty.channel.ChannelInitializer;
     6 import io.netty.channel.ChannelOption;
     7 import io.netty.channel.EventLoopGroup;
     8 import io.netty.channel.nio.NioEventLoopGroup;
     9 import io.netty.channel.socket.SocketChannel;
    10 import io.netty.channel.socket.nio.NioSocketChannel;
    11 
    12 public class TimeClientPOJO {
    13     public static void main(String[] args) throws Exception{
    14         String host = "127.0.0.1";            // ip
    15         int port = 8080;                    // 端口
    16         EventLoopGroup workerGroup = new NioEventLoopGroup();
    17         
    18         try {
    19             Bootstrap b = new Bootstrap();            // 与ServerBootstrap类似
    20             b.group(workerGroup);                    // 客户端不需要boss worker
    21             b.channel(NioSocketChannel.class);
    22             b.option(ChannelOption.SO_KEEPALIVE, true);    // 客户端的socketChannel没有父亲
    23             b.handler(new ChannelInitializer<SocketChannel>() {
    24                 @Override
    25                 protected void initChannel(SocketChannel ch) throws Exception {
    26                     // POJO
    27                     ch.pipeline().addLast(new TimeDecoderPOJO() ,new TimeClientHandlerPOJO());
    28                 }
    29             });
    30             
    31             // 启动客户端,客户端用connect连接
    32             ChannelFuture f = b.connect(host, port).sync();
    33             
    34             // 等待连接关闭
    35             f.channel().closeFuture().sync();
    36         } finally {
    37             workerGroup.shutdownGracefully();
    38         }
    39     }
    40 }

     三、测试

       先运行服务器程序,运行结果如下图:

      

      然后运行客户端程序,运行结果如下图:

      

      需要注意的是,Eclipse 是可以同时运行多个 Java 程序的,可以通过点击

      

      来切换不同程序的控制台输出窗口。

  • 相关阅读:
    Codeforces Round #526 (Div. 2) E. The Fair Nut and Strings
    Codeforces Round #526 (Div. 2) D. The Fair Nut and the Best Path
    S2SH项目实现分页功能
    S2SH项目实现分页功能
    HibernateDaoSupport类的使用
    object references an unsaved transient instance
    object references an unsaved transient instance
    启动Tomcat报WEB-INFlibj2ee.jar jar not loaded异常的解决办法
    启动Tomcat报WEB-INFlibj2ee.jar jar not loaded异常的解决办法
    解决eclipse中出现Resource is out of sync with the file system问题
  • 原文地址:https://www.cnblogs.com/coderJiebao/p/Netty02.html
Copyright © 2011-2022 走看看