  • Getting Started with Netty

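    Introduction to Netty

    Netty is an asynchronous, event-driven network application framework for rapidly developing high-performance, highly reliable network servers and clients.

    In other words, Netty is an NIO framework that makes it quick and easy to develop network applications such as protocol servers and clients. It greatly simplifies network programming, for example TCP and UDP socket development.

    Why choose Netty?

    Compared with using raw NIO directly, implementing network communication with Netty is much simpler, and performance is good. Distributed message middleware, Storm, and Dubbo all use Netty for their underlying communication.

    Netty 5.0 requires JDK 1.6 or later.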

    http://netty.io

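    Coding steps

    1. Create two NIO thread groups: one handles events (accepting client connections), the other handles network reads and writes.

    2. Create a ServerBootstrap and configure the Netty parameters.

    3. Create the ChannelInitializer that performs the setup work, such as configuring the character set and format of incoming and outgoing data, as well as the handler that actually processes the data.

    4. Bind the port and call the synchronous blocking method to wait for the server to start.

    For details, see the Netty 5 user guide: http://ifeve.com/netty5-user-guide

    Code example: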

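    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;

    public class Server {

        public static void main(String[] args) throws Exception {
            // 1. Create two thread groups:
            //    pGroup accepts incoming client connections,
            //    cGroup handles network reads and writes on accepted connections.
            EventLoopGroup pGroup = new NioEventLoopGroup();
            EventLoopGroup cGroup = new NioEventLoopGroup();
            try {
                // 2. Create the bootstrap helper used to configure the server channel.
                ServerBootstrap b = new ServerBootstrap();
                b.group(pGroup, cGroup)                             // bind the two thread groups
                 .channel(NioServerSocketChannel.class)             // use the NIO transport
                 .option(ChannelOption.SO_BACKLOG, 1024)            // length of the pending-connection queue
                 .option(ChannelOption.SO_RCVBUF, 32 * 1024)        // receive buffer size
                 .childOption(ChannelOption.SO_SNDBUF, 32 * 1024)   // send buffer size (applies to accepted connections)
                 .childOption(ChannelOption.SO_KEEPALIVE, true)     // keep-alive (applies to accepted connections)
                 .childHandler(new ChannelInitializer<SocketChannel>() {
                     @Override
                     protected void initChannel(SocketChannel sc) throws Exception {
                         // 3. Register the handler that processes received data.
                         sc.pipeline().addLast(new ServerHandler());
                     }
                 });

                // 4. Bind the port and block until the server has started.
                ChannelFuture cf1 = b.bind(8765).sync();
                // ChannelFuture cf2 = b.bind(8764).sync();
                // 5. Block until the server channel is closed.
                cf1.channel().closeFuture().sync();
                // cf2.channel().closeFuture().sync();
            } finally {
                // Release the threads even if binding fails.
                pGroup.shutdownGracefully();
                cGroup.shutdownGracefully();
            }
        }
    }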
    
    
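    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.util.CharsetUtil;
    import io.netty.util.ReferenceCountUtil;

    // Netty 5 (alpha) API. On Netty 4.x the equivalent base class is ChannelInboundHandlerAdapter.
    public class ServerHandler extends ChannelHandlerAdapter {

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("server channel active... ");
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            try {
                // Copy the readable bytes out of the buffer and decode them as UTF-8.
                ByteBuf buf = (ByteBuf) msg;
                byte[] req = new byte[buf.readableBytes()];
                buf.readBytes(req);
                String body = new String(req, CharsetUtil.UTF_8);
                System.out.println("Server :" + body);
                String response = "Response to client: " + body;
                ctx.writeAndFlush(Unpooled.copiedBuffer(response, CharsetUtil.UTF_8));
                // .addListener(ChannelFutureListener.CLOSE); // close the channel once the write completes
            } finally {
                // The response is a new buffer, so the inbound buffer still has to be released.
                ReferenceCountUtil.release(msg);
            }
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            System.out.println("read complete");
            ctx.flush();
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable t) throws Exception {
            t.printStackTrace();
            ctx.close();
        }

    }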
    
    
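    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.util.CharsetUtil;

    public class Client {

        public static void main(String[] args) throws Exception {

            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap();
                b.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel sc) throws Exception {
                        sc.pipeline().addLast(new ClientHandler());
                    }
                });

                ChannelFuture cf1 = b.connect("127.0.0.1", 8765).sync();
                // ChannelFuture cf2 = b.connect("127.0.0.1", 8764).sync();
                // Send messages. The first two writes are issued back to back,
                // so the server may receive them together in a single read.
                Thread.sleep(1000);
                cf1.channel().writeAndFlush(Unpooled.copiedBuffer("777", CharsetUtil.UTF_8));
                cf1.channel().writeAndFlush(Unpooled.copiedBuffer("666", CharsetUtil.UTF_8));
                // cf2.channel().writeAndFlush(Unpooled.copiedBuffer("888", CharsetUtil.UTF_8));
                Thread.sleep(2000);
                cf1.channel().writeAndFlush(Unpooled.copiedBuffer("888", CharsetUtil.UTF_8));
                // cf2.channel().writeAndFlush(Unpooled.copiedBuffer("666", CharsetUtil.UTF_8));

                // Block until the connection is closed.
                cf1.channel().closeFuture().sync();
                // cf2.channel().closeFuture().sync();
            } finally {
                group.shutdownGracefully();
            }
        }
    }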
    
    
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.util.CharsetUtil;
    import io.netty.util.ReferenceCountUtil;

    public class ClientHandler extends ChannelHandlerAdapter {

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            try {
                ByteBuf buf = (ByteBuf) msg;

                byte[] req = new byte[buf.readableBytes()];
                buf.readBytes(req);

                String body = new String(req, CharsetUtil.UTF_8);
                System.out.println("Client :" + body);
            } finally {
                // Release the inbound buffer promptly. This is only unnecessary when the same
                // msg object is passed on to write(), because the write path then releases it.
                ReferenceCountUtil.release(msg);
            }
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }

    }

    TCP packet splitting and sticking in Netty
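
    TCP is a byte stream and does not preserve the boundaries of individual writes, so several messages can arrive in one read (sticking) or one message can be spread over several reads (splitting). The plain-JDK sketch below only simulates this effect and does not use Netty. The class StickyPacketDemo and its methods wire and receive are hypothetical names introduced for illustration, and the read sizes are picked by hand rather than observed on a real connection.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-JDK illustration (no Netty): the receiver sees bytes, not messages. */
final class StickyPacketDemo {

    /** Concatenates the sender's writes into the byte stream that travels over the connection. */
    static byte[] wire(String... writes) {
        return String.join("", writes).getBytes(StandardCharsets.US_ASCII);
    }

    /**
     * Simulates a receiver whose successive reads return the given numbers of bytes.
     * Each returned string is what one read callback would observe.
     */
    static List<String> receive(byte[] stream, int... readSizes) {
        List<String> reads = new ArrayList<>();
        int pos = 0;
        for (int size : readSizes) {
            int end = Math.min(pos + size, stream.length);
            reads.add(new String(Arrays.copyOfRange(stream, pos, end), StandardCharsets.US_ASCII));
            pos = end;
        }
        return reads;
    }
}
```

    For the three client writes "777", "666" and "888" from the first example, receive(wire("777", "666", "888"), 6, 3) gives [777666, 888] (two messages stuck together), while read sizes 2 and 7 give [77, 7666888] (the first message split). The decoders in the following sections restore the boundaries.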

    Fixed-length scheme, code example: FixedLengthFrameDecoder

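    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.FixedLengthFrameDecoder;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.util.CharsetUtil;

    public class Server {

        public static void main(String[] args) throws Exception {
            // 1. Create two thread groups: one accepts client connections, the other transfers data.
            EventLoopGroup pGroup = new NioEventLoopGroup();
            EventLoopGroup cGroup = new NioEventLoopGroup();
            try {
                // 2. Create the server bootstrap helper.
                ServerBootstrap b = new ServerBootstrap();
                b.group(pGroup, cGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024)
                        .option(ChannelOption.SO_RCVBUF, 32 * 1024).childOption(ChannelOption.SO_SNDBUF, 32 * 1024)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel sc) throws Exception {
                                // Cut the inbound byte stream into frames of exactly 5 bytes.
                                sc.pipeline().addLast(new FixedLengthFrameDecoder(5));
                                // Decode each frame into a String.
                                sc.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
                                sc.pipeline().addLast(new ServerHandler());
                            }
                        });

                // 3. Bind the port and wait for the server to start.
                ChannelFuture cf = b.bind(8765).sync();

                // 4. Block until the server's listening channel is closed.
                cf.channel().closeFuture().sync();
            } finally {
                pGroup.shutdownGracefully();
                cGroup.shutdownGracefully();
            }
        }
    }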
    
    
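    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.util.CharsetUtil;

    public class ServerHandler extends ChannelHandlerAdapter {

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println(" server channel active... ");
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            // StringDecoder has already turned the 5-byte frame into a String.
            String request = (String) msg;
            System.out.println("Server :" + request);
            // Echo the frame back unchanged, so the reply is also exactly 5 bytes long.
            String response = request;
            ctx.writeAndFlush(Unpooled.copiedBuffer(response, CharsetUtil.UTF_8));
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable t) throws Exception {
            // Report the error and close the connection instead of silently swallowing it.
            t.printStackTrace();
            ctx.close();
        }

    }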
    
    
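    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.FixedLengthFrameDecoder;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.util.CharsetUtil;

    public class Client {

        public static void main(String[] args) throws Exception {

            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap();
                b.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel sc) throws Exception {
                        sc.pipeline().addLast(new FixedLengthFrameDecoder(5));
                        sc.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
                        sc.pipeline().addLast(new ClientHandler());
                    }
                });

                ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();

                // The three writes total 25 bytes, so the server's 5-byte decoder emits five frames:
                // "aaaaa", "bbbbb", "ccccc", "ccddd" and "dd   ". Frames ignore write boundaries.
                cf.channel().writeAndFlush(Unpooled.copiedBuffer("aaaaabbbbb", CharsetUtil.UTF_8));
                cf.channel().writeAndFlush(Unpooled.copiedBuffer("ccccccc", CharsetUtil.UTF_8));
                cf.channel().writeAndFlush(Unpooled.copiedBuffer("ddddd   ", CharsetUtil.UTF_8));
                // Block until the client connection is closed.
                cf.channel().closeFuture().sync();
            } finally {
                group.shutdownGracefully();
            }
        }
    }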
    
    
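    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;

    public class ClientHandler extends ChannelHandlerAdapter {

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("client channel active... ");
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            // msg is a String here because StringDecoder runs before this handler.
            String response = (String) msg;
            System.out.println("Client: " + response);
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            // Report the error and close the connection instead of silently swallowing it.
            cause.printStackTrace();
            ctx.close();
        }

    }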
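
    To make the effect of fixed-length framing easier to see, here is a minimal plain-JDK sketch of the idea. It is not Netty's FixedLengthFrameDecoder implementation; the class FixedLengthFramer and its feed method are hypothetical names, and the sketch assumes ASCII input so that one character equals one byte.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Plain-JDK sketch of fixed-length framing (not Netty's implementation). */
final class FixedLengthFramer {
    private final int frameLength;
    // Bytes received so far that do not yet fill a whole frame.
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    FixedLengthFramer(int frameLength) {
        if (frameLength <= 0) {
            throw new IllegalArgumentException("frameLength must be positive");
        }
        this.frameLength = frameLength;
    }

    /** Feeds one received chunk and returns every complete frame that is now available. */
    List<String> feed(String chunk) {
        byte[] bytes = chunk.getBytes(StandardCharsets.US_ASCII);
        pending.write(bytes, 0, bytes.length);
        byte[] all = pending.toByteArray();

        List<String> frames = new ArrayList<>();
        int pos = 0;
        while (all.length - pos >= frameLength) {
            frames.add(new String(all, pos, frameLength, StandardCharsets.US_ASCII));
            pos += frameLength;
        }

        // Keep the incomplete tail for the next call.
        pending.reset();
        pending.write(all, pos, all.length - pos);
        return frames;
    }
}
```

    Feeding the three client writes in order with a frame length of 5 returns [aaaaa, bbbbb], then [ccccc], then [ccddd, dd   ]: the two leftover "c" bytes are held back until the third write completes the frame. A fixed-length protocol therefore only works when every sender pads or cuts its messages to the agreed length.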

    Delimiter scheme, code example: DelimiterBasedFrameDecoder

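    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.DelimiterBasedFrameDecoder;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.util.CharsetUtil;

    public class Server {
        public static void main(String[] args) throws Exception {
            // 1. Create two thread groups: one accepts client connections, the other transfers data.
            EventLoopGroup pGroup = new NioEventLoopGroup();
            EventLoopGroup cGroup = new NioEventLoopGroup();
            try {
                // 2. Create the server bootstrap helper.
                ServerBootstrap b = new ServerBootstrap();
                b.group(pGroup, cGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024)
                        .option(ChannelOption.SO_RCVBUF, 32 * 1024).childOption(ChannelOption.SO_SNDBUF, 32 * 1024)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel sc) throws Exception {
                                // Frames end with the delimiter "$_", which the decoder strips.
                                // More than 1024 bytes without a delimiter raises TooLongFrameException.
                                ByteBuf buf = Unpooled.copiedBuffer("$_", CharsetUtil.UTF_8);
                                sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));
                                // Decode each frame into a String.
                                sc.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
                                sc.pipeline().addLast(new ServerHandler());
                            }
                        });

                // 3. Bind the port and wait for the server to start.
                ChannelFuture cf = b.bind(8765).sync();

                // 4. Block until the server's listening channel is closed.
                cf.channel().closeFuture().sync();
            } finally {
                pGroup.shutdownGracefully();
                cGroup.shutdownGracefully();
            }
        }

    }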
    
    
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.util.CharsetUtil;

    public class ServerHandler extends ChannelHandlerAdapter {

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println(" server channel active... ");
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            String request = (String) msg;
            System.out.println("Server :" + request);
            // Append the delimiter so the client's decoder can find the end of the reply.
            String response = "Server response: " + request + "$_";
            ctx.writeAndFlush(Unpooled.copiedBuffer(response, CharsetUtil.UTF_8));
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable t) throws Exception {
            t.printStackTrace();
            ctx.close();
        }

    }
    
    
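    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.DelimiterBasedFrameDecoder;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.util.CharsetUtil;

    public class Client {

        public static void main(String[] args) throws Exception {

            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap();
                b.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel sc) throws Exception {
                        // Server replies are delimited by "$_" as well.
                        ByteBuf buf = Unpooled.copiedBuffer("$_", CharsetUtil.UTF_8);
                        sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));
                        sc.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
                        sc.pipeline().addLast(new ClientHandler());
                    }
                });

                ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();

                // Every message must end with the delimiter, otherwise the server never sees a complete frame.
                cf.channel().writeAndFlush(Unpooled.copiedBuffer("bbbb$_", CharsetUtil.UTF_8));
                cf.channel().writeAndFlush(Unpooled.copiedBuffer("cccc$_", CharsetUtil.UTF_8));

                // Block until the client connection is closed.
                cf.channel().closeFuture().sync();
            } finally {
                group.shutdownGracefully();
            }
        }
    }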
    
    
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;

    public class ClientHandler extends ChannelHandlerAdapter {

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("client channel active... ");
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            // msg is a String produced by StringDecoder. A String is not reference-counted,
            // so there is nothing to release here.
            String response = (String) msg;
            System.out.println("Client: " + response);
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }

    }
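
    The delimiter approach can be sketched in plain Java in the same way. This is only an illustration of the idea, not Netty's DelimiterBasedFrameDecoder; the class DelimiterFramer and its feed method are hypothetical names, and for simplicity the sketch works on characters rather than bytes and applies the length limit only to data that is still waiting for a delimiter.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-JDK sketch of delimiter-based framing (not Netty's implementation). */
final class DelimiterFramer {
    private final int maxFrameLength;
    private final String delimiter;
    // Characters received so far that are not yet terminated by a delimiter.
    private final StringBuilder pending = new StringBuilder();

    DelimiterFramer(int maxFrameLength, String delimiter) {
        if (delimiter.isEmpty()) {
            throw new IllegalArgumentException("delimiter must not be empty");
        }
        this.maxFrameLength = maxFrameLength;
        this.delimiter = delimiter;
    }

    /** Feeds one received chunk and returns every complete frame, with the delimiter stripped. */
    List<String> feed(String chunk) {
        pending.append(chunk);
        List<String> frames = new ArrayList<>();
        int idx;
        while ((idx = pending.indexOf(delimiter)) >= 0) {
            frames.add(pending.substring(0, idx));
            pending.delete(0, idx + delimiter.length());
        }
        // Reject input when too much data accumulates without any delimiter.
        if (pending.length() > maxFrameLength) {
            pending.setLength(0);
            throw new IllegalStateException("no delimiter within " + maxFrameLength + " characters");
        }
        return frames;
    }
}
```

    With new DelimiterFramer(1024, "$_"), feeding "bbbb$_cc", then "cc$", then "_" returns [bbbb], then an empty list, then [cccc]. The second frame is recovered even though both the message and its delimiter were split across reads.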

    Custom scheme:

    http://blog.csdn.net/zbw18297786698/article/details/53691915
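
    The linked article is not reproduced here. Purely as an illustration, and as an assumption about what a custom scheme might look like rather than a description of that article, one widely used design prefixes every message with its length. The plain-JDK sketch below uses a 4-byte big-endian length header followed by a UTF-8 body; the class LengthPrefixedCodec and its methods are hypothetical names. Netty itself ships LengthFieldBasedFrameDecoder for this style of framing.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-JDK sketch of length-prefixed framing: [4-byte big-endian length][UTF-8 body]. */
final class LengthPrefixedCodec {

    /** Encodes one message as a length header followed by the body bytes. */
    static byte[] encode(String message) {
        byte[] body = message.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + body.length).putInt(body.length).put(body).array();
    }

    /**
     * Decodes every complete frame at the start of the received bytes.
     * A trailing, incomplete frame is ignored; a real decoder would keep it for the next read.
     */
    static List<String> decode(byte[] received) {
        ByteBuffer in = ByteBuffer.wrap(received);
        List<String> messages = new ArrayList<>();
        while (in.remaining() >= 4) {
            int length = in.getInt();
            if (length < 0) {
                throw new IllegalArgumentException("negative frame length: " + length);
            }
            if (in.remaining() < length) {
                break; // body not fully received yet
            }
            byte[] body = new byte[length];
            in.get(body);
            messages.add(new String(body, StandardCharsets.UTF_8));
        }
        return messages;
    }

    /** Joins two byte arrays, simulating two frames sent back to back. */
    static byte[] concat(byte[] a, byte[] b) {
        byte[] out = Arrays.copyOf(a, a.length + b.length);
        System.arraycopy(b, 0, out, a.length, b.length);
        return out;
    }
}
```

    Unlike the fixed-length and delimiter schemes, the body can have any length and may contain any byte sequence, because the receiver learns from the header exactly how many bytes belong to the message.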

  • Original article: https://www.cnblogs.com/lostyears/p/8479283.html