zoukankan      html  css  js  c++  java
  • Netty(1):第一个netty程序

    为什么选择Netty

      netty是业界最流行的NIO框架之一,它的健壮型,功能,性能,可定制性和可扩展性都是首屈一指的,Hadoop的RPC框架Avro就使用了netty作为底层的通信框架,此外netty在互联网,大数据,网络游戏,企业应用,电信软件等众多行业都得到了成功的商业应用。正因为以上的一些特性,使得netty已经成为java NIO编程的首选框架。

    构建netty开发环境

      其实使用netty很简单,直接将其jar包引入到工程中即可使用。 去 http://netty.io/网站上下载最新版本的jar包(由于官网上netty5已经被废弃,但是这里仍然使用netty5进行开发, 可以考虑从csnd下载),我这里下载的为:netty-5.0.0.Alpha1.tar.bz2。这其实是一个压缩文件,解压这个文件,取里面的所有类集合到一起的那个jar包netty-all-5.0.0.Alpha1.jar即可。另外还需要注意的是,我这里使用的jdk版本是1.8。

    第一个netty程序
      这里利用netty来实现一个时钟的小程序,服务器端接收特定的指令然后将服务器时间返回给客户端,客户端按照一定的时间间隔往服务器端发送命令。

     1 package com.rampage.netty.time;
     2 
     3 import io.netty.bootstrap.ServerBootstrap;
     4 import io.netty.channel.ChannelFuture;
     5 import io.netty.channel.ChannelInitializer;
     6 import io.netty.channel.ChannelOption;
     7 import io.netty.channel.EventLoopGroup;
     8 import io.netty.channel.nio.NioEventLoopGroup;
     9 import io.netty.channel.socket.SocketChannel;
    10 import io.netty.channel.socket.nio.NioServerSocketChannel;
    11 
    12 /**
    13  * 时钟程序的服务器端
    14  * @author zyq
    15  *
    16  */
    17 public class TimeServer {
    18     
    19     public static void main(String[] args) throws Exception {
    20         new TimeServer().bind(8080);
    21     }
    22     
    23     public void bind(int port) throws Exception {
    24         // 配置服务器的NIO线程组
    25         EventLoopGroup bossGroup = new NioEventLoopGroup();
    26         EventLoopGroup workerGroup = new NioEventLoopGroup();
    27         
    28         try {
    29             ServerBootstrap bootStrap = new ServerBootstrap();
    30             
    31             // 进行链式调用(每一次调用的返回结果都是ServerBootstrap)
    32             // group带两个参数第一个表示给父(acceptor)用的EventExecutorGroup(其实就是线程池)
    33             // 第二个参数表示子(client)线程池
    34             // channel方法可以带一个ServerChannel类来创建进行IO操作的通道。
    35             // option方法给Channel定制对应的选项
    36             // childHandler方法用来处理Channel中的请求
    37             bootStrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
    38             .option(ChannelOption.SO_BACKLOG, 1024).childHandler(new ChildChannelHandler());
    39             
    40             // 绑定端口,等待同步成功
    41             // bind方法返回一个ChannelFuture类,就是相当于绑定端口并且创建一个新的channel
    42             // sync方法会等待ChannelFuture的处理结束
    43             ChannelFuture future = bootStrap.bind(port).sync();
    44             
    45             // 等待服务器监听端口关闭
    46             future.channel().closeFuture().sync();
    47         } finally {
    48             // 优雅地退出,释放线程资源
    49             bossGroup.shutdownGracefully();
    50             workerGroup.shutdownGracefully();
    51         }
    52     }
    53     
    54     private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
    55 
    56         @Override
    57         protected void initChannel(SocketChannel arg0) throws Exception {
    58             arg0.pipeline().addLast(new TimeServerHandler());
    59         }
    60         
    61     }
    62 }
    TimeServer
     1 package com.rampage.netty.time;
     2 
     3 import java.util.Date;
     4 
     5 import io.netty.buffer.ByteBuf;
     6 import io.netty.buffer.Unpooled;
     7 import io.netty.channel.ChannelHandlerAdapter;
     8 import io.netty.channel.ChannelHandlerContext;
     9 
    10 /**
    11  * 时间服务器的处理类,只有netty5中的ChannelHandlerAdapter中才有ChannelRead和ChannelReadComplete方法。
    12  * @author zyq
    13  *
    14  */
    15 public class TimeServerHandler extends ChannelHandlerAdapter {
    16     @Override 
    17     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    18         // netty中的ByteBuf类相当于jdk中的ByteBuffer类,但是功能更加强大
    19         ByteBuf buf = (ByteBuf) msg;
    20         
    21         // readableBytes返回缓冲区可读的字节数
    22         byte[] req = new byte[buf.readableBytes()];
    23         
    24         // 将缓冲区的字节数复制到新的字节数组中去
    25         buf.readBytes(req);
    26         
    27         // 根据客户端传来的信息得到应答信息
    28         String body = new String(req, "UTF-8");
    29         System.out.println("The time server receive order:" + body);
    30         String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
    31         
    32         // 给客户端的回应
    33         ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
    34         
    35         // 为了防止频繁唤醒Selector进行消息发送,Netty的write方法并不直接将消息写入到SocketChannel中,而是把消息放入到缓冲数组
    36         ctx.write(resp);
    37     }
    38 
    39     @Override
    40     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    41         // 将放到缓冲数组中的消息写入到SocketChannel中去
    42         ctx.flush();
    43     }
    44 
    45     @Override
    46     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    47         ctx.close();
    48     }
    49     
    50     
    51 }
    TimeServerHandler
     1 package com.rampage.netty.time;
     2 
     3 import io.netty.bootstrap.Bootstrap;
     4 import io.netty.channel.ChannelFuture;
     5 import io.netty.channel.ChannelInitializer;
     6 import io.netty.channel.ChannelOption;
     7 import io.netty.channel.EventLoopGroup;
     8 import io.netty.channel.nio.NioEventLoopGroup;
     9 import io.netty.channel.socket.SocketChannel;
    10 import io.netty.channel.socket.nio.NioSocketChannel;
    11 
    12 /**
    13  * 时钟程序的客户端
    14  * @author zyq
    15  *
    16  */
    17 public class TimeClient {
    18     
    19     public static void main(String[] args) throws Exception {
    20         new TimeClient().connect("127.0.0.1", 8080);
    21     }
    22     
    23     public void connect(String host, int port) throws Exception {
    24         // 配置客户端NIO线程池
    25         EventLoopGroup group = new NioEventLoopGroup();
    26         
    27         Bootstrap strap = new Bootstrap();
    28         try {
    29             // 这里用了匿名内部类,各个函数的含义同Server端
    30             strap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
    31             .handler(new ChannelInitializer<SocketChannel>() {
    32 
    33                 @Override
    34                 protected void initChannel(SocketChannel arg0) throws Exception {
    35                     arg0.pipeline().addLast(new TimeClientHandler());
    36                 }
    37                 
    38             });
    39             
    40             // 发起异步连接操作
    41             ChannelFuture future = strap.connect(host, port).sync();
    42             
    43             // 等待客户端关闭(注意调用的是closeFuture如果直接调用close会立马关闭)
    44             future.channel().closeFuture().sync();
    45         } finally {
    46             // 优雅的关闭
    47             group.shutdownGracefully();
    48         }
    49     }
    50 }
    TimeClient
     1 package com.rampage.netty.time;
     2 
     3 import java.util.logging.Logger;
     4 
     5 import io.netty.buffer.ByteBuf;
     6 import io.netty.buffer.Unpooled;
     7 import io.netty.channel.ChannelHandlerAdapter;
     8 import io.netty.channel.ChannelHandlerContext;
     9 
    10 public class TimeClientHandler extends ChannelHandlerAdapter {
    11 
    12     private static final Logger LOGGER = Logger.getLogger(TimeClientHandler.class.getName());
    13     
    14     private final ByteBuf firstMsg;
    15     
    16     public TimeClientHandler() {
    17         byte[] req = "QUERY TIME ORDER".getBytes();
    18         firstMsg = Unpooled.buffer(req.length);
    19         firstMsg.writeBytes(req);
    20     }
    21     
    22     /**
    23      * channel连通之后的处理
    24      */
    25     @Override
    26     public void channelActive(ChannelHandlerContext ctx) throws Exception {
    27         ctx.writeAndFlush(firstMsg);
    28     }
    29     
    30     @Override
    31     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    32         ByteBuf buf = (ByteBuf) msg;
    33         byte[] resp = new byte[buf.readableBytes()];
    34         buf.readBytes(resp);
    35         
    36         String body = new String(resp, "UTF-8");
    37         System.out.println("Now is:" + body);
    38         
    39         // 两秒钟后继续向服务器端发送消息
    40         Thread.sleep(2000);
    41         byte[] req = "QUERY TIME ORDER".getBytes();
    42         ByteBuf sendMsg = Unpooled.buffer(req.length);
    43         sendMsg.writeBytes(req);
    44         ctx.writeAndFlush(sendMsg);
    45     }
    46 
    47     @Override
    48     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    49         LOGGER.warning("Unexpected exception from downstream:" + cause.getMessage());
    50         ctx.close();
    51     }
    52 
    53     
    54 }
    TimeClientHandler

    服务器端的运行结果如下:

    The time server receive order:QUERY TIME ORDER
    The time server receive order:QUERY TIME ORDER
    The time server receive order:QUERY TIME ORDER
    The time server receive order:QUERY TIME ORDER

    ...

    客户端的运行结果如下:

    Now is:Wed Aug 03 05:55:30 PDT 2016
    Now is:Wed Aug 03 05:55:33 PDT 2016
    Now is:Wed Aug 03 05:55:35 PDT 2016
    Now is:Wed Aug 03 05:55:37 PDT 2016

    ...

    可以发现通过Netty框架来实现NIO极大的简化了编码和维护难度。代码调理更加清晰。

    TimeClientHandler

  • 相关阅读:
    常用模块(一)
    面向对象进阶:反射以及内置方法
    面向对象三大特性之多态、封装与装饰器
    面向对象的三大特性之继承
    python之面向对象
    python之内置函数
    python之迭代器,生成器以及列表推导式
    比较好用的linux命令
    使用redission实现分布式信号量以及遇到的一些坑
    linux一些命令
  • 原文地址:https://www.cnblogs.com/Kidezyq/p/5734491.html
Copyright © 2011-2022 走看看