zoukankan      html  css  js  c++  java
  • Netty服务端与客户端(源码一)

    首先,整理NIO进行服务端开发的步骤:

      (1)创建ServerSocketChannel,配置它为非阻塞模式

      (2)绑定监听配置TCP参数backlog的大小

      (3)创建一个独立的I/O线程,用于轮询多路复用器Selector

      (4)创建Selector,将之前创建的ServerSocketChannel注册到Selector上监听SelectionKeyACCEPT

      (5)启动I/O线程,在循环体中执行Selector.select()方法,轮训就绪的Channel。

      (6)当轮询到了处于就绪状态的Channel时,需要对其进行判断,如果是OP_ACCEPT状态,说明是新的客户端接入,则调用ServerSocketChannel.accept()方法接受新的客户端。

      (7)设置新接入的客户端链路SocketChannel为非阻塞模式,配置其他的一些TCP参数。

      (8)将SocketChannel注册到Selector,监听OP_READ操作位

      (9)如果轮询的Channel为OP_READ,则说明SocketChannel中有新的就绪的数据包需要读取,则构造ByteBuffer对象,读取数据包。

      (10)如果轮询的Channel为OP_WRITE,则说明还有数据没有发送完成,需要继续发送。  

     Netty时间服务器服务端 TimeServer:

     1 package netty;
     2 
     3 import io.netty.bootstrap.ServerBootstrap;
     4 import io.netty.channel.ChannelFuture;
     5 import io.netty.channel.ChannelInitializer;
     6 import io.netty.channel.ChannelOption;
     7 import io.netty.channel.EventLoopGroup;
     8 import io.netty.channel.nio.NioEventLoopGroup;
     9 import io.netty.channel.socket.SocketChannel;
    10 import io.netty.channel.socket.nio.NioServerSocketChannel;
    11 
    12 
    13 public class TimeServer {
    14     
    15     public void bind(int port) throws Exception{
    16         //配置服务端的NIO线程组 一个用于服务端接收客户端的连接,另一个用于进行SocketChannel的网络读写
    17         EventLoopGroup bossGroup = new NioEventLoopGroup();
    18         EventLoopGroup workerGroup = new NioEventLoopGroup();
    19         try{
    20             ServerBootstrap b = new ServerBootstrap();
    21             b.group(bossGroup,workerGroup)
    22             .channel(NioServerSocketChannel.class)
    23             .option(ChannelOption.SO_BACKLOG, 1024)
    24             .childHandler(new ChildChannelHandler());
    25             //绑定端口,同步等待成功
    26             ChannelFuture f = b.bind(port).sync();
    27             //等待服务器监听端口关闭
    28             f.channel().closeFuture().sync();
    29         }finally{
    30             //优雅退出,释放线程池资源
    31             bossGroup.shutdownGracefully();
    32             workerGroup.shutdownGracefully();
    33         }
    34     }
    35     
    36     private class ChildChannelHandler extends ChannelInitializer<SocketChannel>{
    37         protected void initChannel(SocketChannel arg0) throws Exception{
    38             arg0.pipeline().addLast(new TimeServerHandler());
    39         }
    40     }
    41 }

    ServerBootstrap是Netty用于启动NIO服务端的辅助类,目的是降低服务端的开发难度。

    绑定childChannelHandler,其作用类似于Reactor模式中的handler类,主要用于处理网络I/O事件,例如记录日志、对消息进行编解码等。

    使用bind绑定监听端口,随后,调用它的同步阻塞方法sync等待绑定操作完成,完成后Netty会返回一个ChannelFuture,主要用于异步操作的通知回调

    Netty时间服务器服务端 TimeServerHandler:

     1 package netty;
     2 
     3 import java.io.IOException;
     4 import io.netty.buffer.ByteBuf;
     5 import io.netty.buffer.Unpooled;
     6 import io.netty.channel.ChannelHandlerAdapter;
     7 import io.netty.channel.ChannelHandlerContext;
     8 
     9 public class TimeServerHandler extends ChannelHandlerAdapter{
    10     
    11     public void channelRead(ChannelHandlerContext ctx,Object msg) throws IOException{
    12         //将msg转换成Netty的ByteBuf对象
    13         ByteBuf buf = (ByteBuf)msg;
    14         //将缓冲区中的字节数组复制到新建的byte数组中,
    15         byte[] req = new byte[buf.readableBytes()];
    16         buf.readBytes(req);
    17         //获取请求消息
    18         String body = new String(req,"UTF-8");
    19         System.out.println("The time server receive order:" + body);
    20         //如果是"QUERY TIME ORDER"则创建应答消息
    21         String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date(
    22                 System.currentTimeMillis()).toString() : "BAD ORDER";
    23                 ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
    24         //异步发送应答消息给客户端
    25         ctx.write(resp);
    26     }
    27     
    28     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception{
    29         ctx.flush();
    30     }
    31     
    32     public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){
    33         ctx.close();
    34     }
    35 }

    相比昨天原生的NIO服务端,代码量大大减少。

     Netty时间服务器客户端 TimeClient:

     1 package netty;
     2 
     3 import io.netty.bootstrap.Bootstrap;
     4 import io.netty.channel.ChannelFuture;
     5 import io.netty.channel.ChannelInitializer;
     6 import io.netty.channel.ChannelOption;
     7 import io.netty.channel.EventLoopGroup;
     8 import io.netty.channel.nio.NioEventLoopGroup;
     9 import io.netty.channel.socket.SocketChannel;
    10 import io.netty.channel.socket.nio.NioSocketChannel;
    11 
    12 public class TimeClient {
    13     
    14     public void connect(int port,String host) throws Exception{
    15         //创建客户端处理I/O读写的NioEventLoopGroup Group线程组
    16         EventLoopGroup group = new NioEventLoopGroup();
    17         try{
    18             //创建客户端辅助启动类Bootstrap
    19             Bootstrap b = new Bootstrap();
    20             b.group(group).channel(NioSocketChannel.class)
    21             .option(ChannelOption.TCP_NODELAY, true)
    22             .handler(new ChannelInitializer<SocketChannel>(){
    23                 //将ChannelHandler设置到ChannelPipleline中,用于处理网络I/O事件
    24                 @Override
    25                 protected void initChannel(SocketChannel ch) throws Exception {
    26                     ch.pipeline().addLast(new TimeClientHandler());
    27                 }
    28             });
    29             //发起异步连接操作,然后调用同步方法等待连接成功。
    30             ChannelFuture f = b.connect(host,port).sync();
    31             
    32             //等待客户端链路关闭
    33             f.channel().closeFuture().sync();
    34         }finally{
    35             //优雅退出,释放NIO线程组
    36             group.shutdownGracefully();
    37         }
    38     }
    39     
    40     public static void main(String[] args) throws Exception{
    41         int port = 8080;
    42         if(args != null && args.length > 0){
    43             try{
    44                 port = Integer.valueOf(args[0]);
    45             }catch(NumberFormatException e){
    46                 //采用默认值
    47             }
    48         }
    49         new TimeClient().connect(port, "127.0.0.1");
    50     }
    51     
    52 }

     Netty时间服务器客户端 TimeClientHandler:

     1 package netty;
     2 
     3 import io.netty.buffer.ByteBuf;
     4 import io.netty.buffer.Unpooled;
     5 import io.netty.channel.ChannelHandlerAdapter;
     6 import io.netty.channel.ChannelHandlerContext;
     7 
     8 import java.util.logging.Logger;
     9 
    10 public class TimeClientHandler extends ChannelHandlerAdapter{
    11     
    12     private static final Logger logger = Logger.getLogger(TimeClientHandler.class.getName());
    13     
    14     private final ByteBuf firstMessage;
    15     
    16     public TimeClientHandler(){
    17         byte[] req = "QUERY TIME ORDER".getBytes();
    18         firstMessage = Unpooled.buffer(req.length);
    19         firstMessage.writeBytes(req);
    20     }
    21     
    22     //当客户端与服务端TCP链路简历成功后,Netty的NIO线程会调用该方法,发送查询时间的指令给服务器
    23     public void channelActive(ChannelHandlerContext ctx){
    24         //将请求消息发送给服务端
    25         ctx.writeAndFlush(firstMessage);
    26     }
    27     
    28     //当服务器返回应答消息时,该方法被调用
    29     public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception{
    30         ByteBuf buf = (ByteBuf) msg;
    31         byte[] req = new byte[buf.readableBytes()];
    32         buf.readBytes(req);
    33         String body = new String(req,"UTF-8");
    34         System.out.println("Now is :" + body);
    35     }
    36     
    37     public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){
    38         
    39         //释放资源
    40         logger.warning("Unexpected exception from downstream :" + cause.getMessage());
    41         ctx.close();
    42     }
    43 }

     运行结果:

    Server:

    Client:

     
  • 相关阅读:
    四月⑨日
    4月⑦日
    4月6日
    四月⑧日
    第一天上课
    modelsim的do文件
    SCCB协议
    FPGA之VGA控制
    FPGA常用术语
    乘法器之六(硬件乘法器)
  • 原文地址:https://www.cnblogs.com/yangsy0915/p/6139648.html
Copyright © 2011-2022 走看看