zoukankan      html  css  js  c++  java
  • Netty框架介绍

    Netty框架介绍

    转自:http://www.toutiao.com/i6398482015196283394/

    一 初步了解Netty

    Netty是由JBoss公司推出的一个高性能事件驱动型异步非阻塞的IO(NIO)框架。用于建立TCP等底层的连接,基于Netty可以建立高性能的Http服务器。支持HTTP、WebSocket、Protobuf、Binary TCP和UDP。

    Netty提供了NIO和BIO(OIO阻塞IO)两种模式处理逻辑,其中NIO主要通过一个BOSS线程处理等待链接的接入,若干个Worker线程(从worder线程池取出一个赋给channel,因为channel持有真正的java网络对象)接过BOSS线程递交过来的Channel进行数据读写并且触发相应的事件传递给pipeline进行数据处理,而BIO(OIO)方式服务器端虽然还是通过一个BOSS线程来处理等待链接的接入,但是客户端还是由主线程直接connect,另外写数据C/S两端都是直接主线程写,而数据读取则是通过一个Worker线程BLOCK方式读取(一直等待,直到读到数据,除非channel关闭)。

    总体结构图:(摘自Netty官网)

    二 Netty组件

    为了更好的理解和进一步深入Netty,我们先总体认识一下Netty中的组件以及在整个框架中如何协调工作的。Netty应用中必不可少的组件:

    Bootstrap or ServerBootstrap

    EventLoop

    EventLoopGroup

    ChannelPipeline

    Channel

    Future or ChannelFuture

    ChannelInitializer

    ChannelHandler

    Bootstrap:一个Netty应用通常由一个Bootstrap开始,它主要作用是配置整个Netty程序,串联起各个组件,ServerBootstrap用于server端,Bootstrap用于client端。

    Handler:为了支持各种协议和处理数据的方式,便诞生了Handler组件。Handler主要用来处理各种事件,这里的事件很广泛,比如可以是连接,数据接收,异常,数据转换等。

    ChannelInboundHandler:一个最常用的Handler。这个Handler的作用就是处理接收到数据时的事件,也就是说,我们的业务逻辑一般都写在这个Handler里的,ChannelInboundHandler就是用来处理核心业务逻辑的。

    ChannelInitializer:当一个链接建立时,我们需要知道怎么来接收或发送数据,当然,我们有各种各样的Handler实现来处理它,那么ChannelInitializer便是用来配置这些Handler,它会提供一个ChannelPipeline并把Handler加入到ChannelPipeline。

    ChannelPipeline:一个Netty应用基于ChannelPipeline机制,这种机制需要依赖于EventLoop和EventLoopGroup,因为它们三个都和事件或者事件处理有关。

    EventLoop: EventLoop目的是为Channel处理IO操作,一个Channel对应一个EventLop,而一个EventLoop对应一个线程,也就是说,仅有一个线程在负责一个Channel的IO操作。EventLoopGroup会包含多个EventLoop。

    Channel: Channel代表了一个Socket链接,或者其它和IO操作相关的组件,它和EventLoop一起用来参与IO处理。

    Future:在Netty中所有的IO操作都是异步的,因此,你不能立刻得知消息是否被正确处理,但是我们可以过一会等它执行完成或者直接注册一个监听,具体的实现就是通过Future和ChannelFutures,它们可以注册一个监听,当操作执行成功或失败时监听会自动触发。总之,所有的操作都会返回一个ChannelFuture。

     Netty处理连接请求和业务逻辑

    当一个连接到达,Netty会注册一个channel,然后EventLoopGroup会分配一个EventLoop绑定到这个channel,在这个channel的整个生命周期过程中,都会由绑定的这个EventLoop来为它服务。

     Netty使用案例

    以下例子是使用Netty4.x编写的案例,不详细描述了,直接上代码:

    server端:

    package com.cn.netty;

    import io.netty.bootstrap.ServerBootstrap;

    import io.netty.channel.ChannelFuture;

    import io.netty.channel.ChannelInitializer;

    import io.netty.channel.ChannelOption;

    import io.netty.channel.nio.NioEventLoopGroup;

    import io.netty.channel.socket.SocketChannel;

    import io.netty.channel.socket.nio.NioServerSocketChannel;

    import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

    import io.netty.handler.codec.LengthFieldPrepender;

    import io.netty.handler.codec.string.StringDecoder;

    import io.netty.handler.codec.string.StringEncoder;

    import java.nio.charset.Charset;

    public class MyNettyServer {

    public static void main(String[] args) {

      new MyNettyServer().serverStart();

    }

    //用于分配处理业务线程的线程组个数

    private static final int PARENTGROUPSIZE = Runtime.getRuntime().availableProcessors();

    private static final int CHILDGROUPSIZE = PARENTGROUPSIZE*2;

    private static final String HOST = "127.0.0.1";

    private static final int PORT = 9999;

    NioEventLoopGroup parentGroup;

    NioEventLoopGroup childGroup;

    /**

    *服务端启动方法

    */

    public void serverStart(){

      try {

        final ServerBootstrap server = new ServerBootstrap();

        parentGroup = new NioEventLoopGroup(PARENTGROUPSIZE);

        childGroup = new NioEventLoopGroup(CHILDGROUPSIZE);

        server.group(parentGroup, childGroup);//设置线程组

        server.channel(NioServerSocketChannel.class);

        server.option(ChannelOption.SO_KEEPALIVE,true);//保持连接

        server.option(ChannelOption.SO_BACKLOG,1024);

        server.option(ChannelOption.TCP_NODELAY,true);

        server.childHandler(new ChannelInitializer<SocketChannel>() {

          @Override

          protected void initChannel(SocketChannel channel) throws Exception {

            channel.pipeline().addLast("frameDecoder",new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4));

            channel.pipeline().addLast("frameEncoder",new LengthFieldPrepender(4));

            channel.pipeline().addLast("decoder",new StringDecoder(Charset.forName("GBK")));

            channel.pipeline().addLast("encoder",new StringEncoder(Charset.forName("GBK")));

            channel.pipeline().addLast(new MyServerHandler());

          }

        });

        ChannelFuture future = server.bind(HOST,PORT).sync();

        if(future.isSuccess()){

          System.out.println("......Netty Server Started......");

        }else{

          System.out.println("......Netty Server Failed......");

        }

        future.channel().closeFuture().sync();

      } catch (Exception e) {

        e.printStackTrace();

      }finally{

        this.parentGroup.shutdownGracefully();

        this.childGroup.shutdownGracefully();

      }

    }

    }

    package com.cn.netty;

    import io.netty.channel.ChannelHandlerContext;

    import io.netty.channel.SimpleChannelInboundHandler;

    public class MyServerHandler extends SimpleChannelInboundHandler<String>{

    /*

    *收到消息时调用

    */

    @Override

    protected void channelRead(ChannelHandlerContext context, String msg)throws Exception {

    System.out.println("server received msg:"+msg);

    context.writeAndFlush(new String("客户端你好"));

    }

    @Override

    public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception {

    cause.printStackTrace();

    context.flush();

    context.close();

    }

    }

    Client端:

    package com.cn.netty;

    import io.netty.bootstrap.Bootstrap;

    import io.netty.channel.Channel;

    import io.netty.channel.ChannelFuture;

    import io.netty.channel.ChannelInitializer;

    import io.netty.channel.ChannelOption;

    import io.netty.channel.EventLoopGroup;

    import io.netty.channel.nio.NioEventLoopGroup;

    import io.netty.channel.socket.SocketChannel;

    import io.netty.channel.socket.nio.NioSocketChannel;

    import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

    import io.netty.handler.codec.LengthFieldPrepender;

    import io.netty.handler.codec.string.StringDecoder;

    import io.netty.handler.codec.string.StringEncoder;

    import java.nio.charset.Charset;

    public class MyNettyClient { 

    public static void main(String[] args) {

    Channel channel = new MyNettyClient().clientStart();

    channel.writeAndFlush(new String("服务器你好"));

    }

    private static final String HOST = "127.0.0.1";

    private static final int PORT = 9999;

    public Channel clientStart(){

    Channel channel = null;

    Bootstrap client = new Bootstrap();

    EventLoopGroup group = new NioEventLoopGroup();

    try {

    client.group(group);

    client.channel(NioSocketChannel.class);

    client.option(ChannelOption.SO_KEEPALIVE,true);

    client.handler(new ChannelInitializer<SocketChannel>() {

    @Override

    protected void initChannel(SocketChannel channel)throws Exception {

    channel.pipeline().addLast("frameDecoder",new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4));

    channel.pipeline().addLast("frameEncoder",new LengthFieldPrepender(4));

    //编码格式

    channel.pipeline().addLast("encoder",new StringEncoder(Charset.forName("GBK")));

    //解码格式

    channel.pipeline().addLast("decoder",new StringDecoder(Charset.forName("GBK")));

    channel.pipeline().addLast(new MyClientHandler());

    }

    });

    ChannelFuture future = client.connect(HOST, PORT).sync();

    if(future.isSuccess()){

    System.out.println("......netty client started......");

    }

    channel = future.channel();

    } catch (InterruptedException e) {

    e.printStackTrace();

    }

    return channel;

    }

    }

    packagecom.cn.netty;

    importio.netty.channel.ChannelHandlerContext;

    importio.netty.channel.SimpleChannelInboundHandler;

    public class MyClientHandler extends SimpleChannelInboundHandler<String>{

    /*

    *收到消息时调用

    */

    @Override

    protected void channelRead(ChannelHandlerContext context, String msg)throwsException {

    System.out.println("收到服务器发来的消息:"+msg);

    }

    /*

    *建立连接时调用

    */

    @Override

    public void channelActive(ChannelHandlerContext context)throwsException {

    super.channelActive(context);

    }

    }

  • 相关阅读:
    使用eclipse创建maven+动态web的项目
    关于Maven项目build时出现No compiler is provided in this environment的处理
    spark日志输出
    spark并行度加载关系数据库
    【java记录】序列化拷贝
    客户端远程访问高可用(HA)hdfs
    spark算法
    算子的分类和 宽依赖算子、窄依赖算子
    单元测试junit使用
    spark1.x和spark2.x兼容Iterable和Iterator问题【未解决】
  • 原文地址:https://www.cnblogs.com/lzhl/p/6590132.html
Copyright © 2011-2022 走看看