zoukankan      html  css  js  c++  java
  • Netty实战

    一、Netty异步和事件驱动
    1.Java网络编程回顾
    socket.accept 阻塞
    socket.setsockopt /非阻塞
    2.NIO异步非阻塞
    a).nio 非阻塞的关键是使用选择器(java.nio.channels.Selector)来实现;可以监控多个socket读写的完成状态来协调其他socket的读写,以提高资源使用率;
    b).异步事件驱动,socket请求发起,立即响应,后端执行处理请求,处理完毕通知客户端;
    c).Channel 是到某个实体(如socket)的连接,是请求和响应的载体;Netty通过回调来处理事件,事件触发->ChannelHandler->channelActive()来处理事件;
    d).ChannelFuture通过ChannelFutureListener来监听事件,并在操作完成时通知处理结果;而普通的Future需要手动去获取结果;每个I/O操作都会立即返回一个ChannelFuture,不会阻塞,I/O在后台处理,至于何时处理完成,由异步事件驱动通知;
    3.Netty的异步编程模型建立在Future和回调的基础之上,再将事件派发到ChannelHandler之上进行处理;

    二、Netty Demo
    1.EchoServerHandler

    package chap01;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.util.CharsetUtil;
    
    /**
     * Inbound handler for the echo server: prints every received buffer and
     * writes it back to the peer.
     *
     * <p>Annotated {@code @Sharable} so that a single instance may be added to
     * more than one channel pipeline.
     */
    @ChannelHandler.Sharable
    public class EchoServerHandler extends ChannelInboundHandlerAdapter {

        /**
         * Logs the incoming bytes (decoded as UTF-8) and writes the same buffer
         * back. The write is not flushed here; the flush is issued from
         * {@link #channelReadComplete(ChannelHandlerContext)}.
         *
         * @param ctx context of this handler in the pipeline
         * @param msg the inbound message; cast to {@link ByteBuf}
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            final ByteBuf received = (ByteBuf) msg;
            final String text = received.toString(CharsetUtil.UTF_8);
            System.out.println("Server received: " + text);
            ctx.write(received);
        }

        /**
         * Writes and flushes an empty buffer, then closes the channel through
         * {@link ChannelFutureListener#CLOSE} once that operation completes.
         *
         * @param ctx context of this handler in the pipeline
         */
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) {
            ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
                    .addListener(ChannelFutureListener.CLOSE);
        }

        /**
         * Prints the stack trace of the failure and closes the channel.
         *
         * @param ctx   context of this handler in the pipeline
         * @param cause the exception that was raised
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close();
        }
    }
    

    2.EchoServer

    package chap01;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    
    import java.net.InetSocketAddress;
    
    /**
     * Minimal Netty echo server. Binds to a local port and installs a shared
     * {@link EchoServerHandler} on every accepted connection.
     */
    public class EchoServer {
        /** Port used when no command-line argument is supplied. */
        private static final int DEFAULT_PORT = 9001;

        private final int port;

        /**
         * @param port local TCP port the server binds to
         */
        public EchoServer(int port) {
            this.port = port;
        }

        /**
         * Entry point. Accepts an optional single argument, the port to listen
         * on; with no arguments the server listens on port 9001.
         *
         * @param args optional {@code <port>}
         * @throws Exception if binding or waiting on the server channel fails
         */
        public static void main(String[] args) throws Exception {
            if (args.length > 1) {
                printUsage();
                return;
            }
            int port = DEFAULT_PORT;
            if (args.length == 1) {
                try {
                    port = Integer.parseInt(args[0]);
                } catch (NumberFormatException e) {
                    System.err.println("Invalid port: " + args[0]);
                    printUsage();
                    return;
                }
            }
            new EchoServer(port).start();
        }

        /** Prints the command-line usage to standard error. */
        private static void printUsage() {
            System.err.println("Usage: " + EchoServer.class.getSimpleName() + " [<port>]");
        }

        /**
         * Binds the server, then blocks until the server channel is closed.
         * The event loop group is always shut down on exit.
         *
         * @throws Exception if the bind fails or the waiting thread is interrupted
         */
        public void start() throws Exception {
            // One handler instance is shared by all child channels.
            final EchoServerHandler serverHandler = new EchoServerHandler();
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(group)
                        .channel(NioServerSocketChannel.class)
                        .localAddress(new InetSocketAddress(port))
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(serverHandler);
                            }
                        });
                // Wait for the bind to complete, then for the server channel to close.
                ChannelFuture f = b.bind().sync();
                f.channel().closeFuture().sync();
            } finally {
                group.shutdownGracefully().sync();
            }
        }
    }
    

     3.EchoClientHandler

    package chap01;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.util.CharsetUtil;
    
    /**
     * Client-side handler for the echo example: sends a greeting once the
     * channel becomes active and prints whatever bytes come back.
     */
    public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

        /**
         * Sends the UTF-8 encoded greeting as soon as the channel is active.
         *
         * @param ctx context of this handler in the pipeline
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            final ByteBuf greeting = Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8);
            ctx.writeAndFlush(greeting);
        }

        /**
         * Prints the received bytes, decoded as UTF-8, to standard output.
         *
         * @param ctx context of this handler in the pipeline
         * @param in  the buffer received from the server
         */
        @Override
        public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) {
            final String echoed = in.toString(CharsetUtil.UTF_8);
            System.out.println("Client received: " + echoed);
        }

        /**
         * Prints the stack trace of the failure and closes the channel.
         *
         * @param ctx   context of this handler in the pipeline
         * @param cause the exception that was raised
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close();
        }

    }
    

    4.EchoClient

    package chap01;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    
    import java.net.InetSocketAddress;
    
    /**
     * Minimal Netty echo client. Connects to a remote host/port and installs an
     * {@link EchoClientHandler} on the connection.
     */
    public class EchoClient {
        /** Host used when no command-line arguments are supplied. */
        private static final String DEFAULT_HOST = "127.0.0.1";
        /** Port used when no command-line arguments are supplied. */
        private static final int DEFAULT_PORT = 9001;

        private final String host;
        private final int port;

        /**
         * @param host remote host name or address to connect to
         * @param port remote TCP port to connect to
         */
        public EchoClient(String host, int port) {
            this.host = host;
            this.port = port;
        }

        /**
         * Connects to the remote address and blocks until the channel is closed.
         * The event loop group is always shut down on exit.
         *
         * @throws Exception if the connect fails or the waiting thread is interrupted
         */
        public void start() throws Exception {
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap();
                b.group(group)
                        .channel(NioSocketChannel.class)
                        .remoteAddress(new InetSocketAddress(host, port))
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(new EchoClientHandler());
                            }
                        });
                ChannelFuture f = b.connect().sync();
                // Wait until the channel is closed. Without this wait the event
                // loop group would start shutting down right after the connect
                // completes, so the echoed reply could be lost.
                f.channel().closeFuture().sync();
            } finally {
                group.shutdownGracefully().sync();
            }
        }

        /**
         * Entry point. Accepts either no arguments (connects to 127.0.0.1:9001)
         * or exactly two arguments, {@code <host> <port>}.
         *
         * @param args optional {@code <host> <port>}
         * @throws Exception if connecting or waiting on the channel fails
         */
        public static void main(String[] args) throws Exception {
            if (args.length != 0 && args.length != 2) {
                printUsage();
                return;
            }
            String host = DEFAULT_HOST;
            int port = DEFAULT_PORT;
            if (args.length == 2) {
                host = args[0];
                try {
                    port = Integer.parseInt(args[1]);
                } catch (NumberFormatException e) {
                    System.err.println("Invalid port: " + args[1]);
                    printUsage();
                    return;
                }
            }
            new EchoClient(host, port).start();
        }

        /** Prints the command-line usage to standard error. */
        private static void printUsage() {
            System.err.println("Usage: " + EchoClient.class.getSimpleName() + " [<host> <port>]");
        }
    }
    

    三、Netty组件和设计
    1.Channel、Event Loop和ChannelFuture
    a).Channel 即Socket,提供基本的I/O操作,bind(),connect(),read(),write();EventLoop负责控制流的管理和并发,一个Channel在其生命周期内只和一个EventLoop绑定,多个EventLoop组成一个EventLoopGroup;ChannelFuture用以接收回调,Netty中所有的处理都是异步的,每个请求可能不会立即返回,addListener()方法会添加一个监听器ChannelFutureListener用来回调通知;
    2.ChannelHandler、ChannelPipeLine
    a).ChannelHandler 用于接收入站事件和数据;常用ChannelInboundHandler;
    b).ChannelPipeline 提供了ChannelHandler链的容器;一个Channel被创建时,它会被分配一个专属的ChannelPipeline,ChannelHandler再(例如通过ChannelInitializer)被安装到该ChannelPipeline中;

  • 相关阅读:
    vue-fullcalendar插件
    iframe 父框架调用子框架的函数
    关于调试的一点感想
    hdfs 删除和新增节点
    hadoop yarn 实战错误汇总
    Ganglia 安装 No package 'ck' found
    storm on yarn(CDH5) 部署笔记
    spark on yarn 安装笔记
    storm on yarn安装时 提交到yarn失败 failed
    yarn storm spark
  • 原文地址:https://www.cnblogs.com/therunningfish/p/10959225.html
Copyright © 2011-2022 走看看