zoukankan      html  css  js  c++  java
  • Netty入门程序

    maven创建project,引入依赖:

    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.39.Final</version>
    </dependency>

    一、服务端程序

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    
    public class Server4HelloWorld {
        //监听线程组,监听客户端请求
        private EventLoopGroup acceptorGroup = null;
        //处理客户端相关操作线程组,负责处理与客户端的数据通讯
        private EventLoopGroup clientGroup = null;
        //服务启动相关配置信息
        private ServerBootstrap bootstrap = null;
        
        public Server4HelloWorld(){
            acceptorGroup = new NioEventLoopGroup();
            clientGroup = new NioEventLoopGroup();
            bootstrap = new ServerBootstrap();
            //绑定线程组
            bootstrap.group(acceptorGroup, clientGroup);
            //设置通讯模式为NIO
            bootstrap.channel(NioServerSocketChannel.class);
            //设置服务端可连接队列的大小
            bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
            //SO_SNDBUF发送缓冲区,SO_RCVBUF接收缓冲区, SO_KEEPALIVE 开启心跳监测(保证连接有效)
            bootstrap.option(ChannelOption.SO_SNDBUF, 16*1024)
                .option(ChannelOption.SO_RCVBUF, 16*1024)
                .option(ChannelOption.SO_KEEPALIVE, true);
        }
        
        public ChannelFuture doAccept(int port, final ChannelHandler... acceptHandlers) throws InterruptedException{
            /*
             * childHandler是ServerBootstrap独有的方法,是用于提供处理对象的。
             * 可以一次性增加若干个处理逻辑,类似责任链的处理方式。
             * 增加A、B两个逻辑,在处理客户端请求数据的时候,根据A->B顺序依次处理。
             * 
             * ChannelInitializer 用于提供处理器的一个模型对象
             * 其中定义了一个initChannel方法,用于处理逻辑责任链条的。
             * 可以保证ServerBootstrap只初始化依次处理器,尽量提供处理逻辑的重用,
             * 避免反复的创建处理器对象,节约资源开销
             */
            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(acceptHandlers);
                };
            });
            //bind方法 - 绑定监听端口。ServerBootstrap可以绑定多个监听端口,多次调用bind方法即可。
            //sync -- 开始监听逻辑,返回一个ChannelFuture,代表的是监听成功后一个对应的未来结果
            //可以使用ChannelFuture用于后续的处理逻辑
            ChannelFuture future = bootstrap.bind(port).sync();
            return future;
        }
        
        /*
         * shutdownGracefully 是一个安全关闭的方法,可以保证不放弃任何一个已接收的客户端请求
         */
        public void release(){
            this.acceptorGroup.shutdownGracefully();
            this.clientGroup.shutdownGracefully();
        }
        
        public static void main(String[] args) {
            ChannelFuture future = null;
            Server4HelloWorld server = null;
            try {
                server = new Server4HelloWorld();
                future = server.doAccept(9999, new Server4HelloWorldHandler());
                System.out.println("server started");
                //关闭连接
                future.channel().closeFuture().sync();
            } catch (Exception e) {
                e.printStackTrace();
            }finally {
                if(null != future){
                    try {
                        future.channel().closeFuture().sync();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                if(null != server){
                    server.release();
                }
            }
        }    
    
    }

    Handler处理逻辑:

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.ChannelHandler.Sharable;
    
    /*
     * @Sharable注解
     * 代表当前Handler是一个可以共享的处理器,也就是意味着,服务器注册此Handler后,可以给多个客户端同时使用。
     * 如果不使用注解描述类型,则每次客户端请求时,必须为客户端重新创建一个新的Handler对象(实际开发中,推荐共享一个Handler,节约内存资源)。
     * 如果Handler是一个Sharable的,一定避免定义可写的实例变量,因为这不安全。
     * bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new XxxHandler());
                };
            });
        
     */
    
    @Sharable
    public class Server4HelloWorldHandler extends ChannelInboundHandlerAdapter{
        
        /*
         * 业务处理逻辑
         * 用于处理读取数据请求的逻辑
         * ctx - 上下文对象,其中包含客户端建立连接的所有资源,如:对应的Channel
         * msg - 读取到的数据,默认类型是ByteBuf,ByteBuf是Netty自定义的,是对ByteBuffer的封装
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            //获取读取的数据,是一个缓冲
            ByteBuf readBuffer = (ByteBuf)msg;
            //创建一个字节数组,用于保存缓冲中的数据
            byte[] tempDatas = new byte[readBuffer.readableBytes()];
            //将缓存中的数据读取到字节数组中
            readBuffer.readBytes(tempDatas);
            String message = new String(tempDatas, "UTF-8");
            System.out.println("from client:" + message);
            if("exit".equals(message)){
                ctx.close();
                return;
            }
            String line = "server message to client";
            //写操作自动释放缓存,避免内存溢出问题
            ctx.writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
            //注意,如果使用的是write方法,不会刷新缓存,缓存中的数据不会发送到客户端,必须再次调用flush方法才行
            //ctx.write(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
            //ctx.flush();
        }
        
        /**
         * 异常处理逻辑,当客户端异常退出的时候,也会运行
         * ChannelHandlerContext 关闭,也代表当前与客户端连接的资源关闭
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            System.out.println(" server exceptionCaught method run...");
            ctx.close();
        }
        
    }

    二、客户端程序

    import java.util.Scanner;
    import java.util.concurrent.TimeUnit;
    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    
    /**
     * 因为客户端是请求的发起者,不需要监听。
     * 只需要定义唯一的一个线程组即可。
     */
    public class Client4HelloWorld {
        
        //处理请求和服务端响应的线程组
        private EventLoopGroup group = null;
        //服务启动相关配置信息
        private Bootstrap bootstrap;
        
        public Client4HelloWorld(){
            group = new NioEventLoopGroup();
            bootstrap = new Bootstrap();
            //绑定线程组
            bootstrap.group(group);
            //设定通讯模式为NIO
            bootstrap.channel(NioSocketChannel.class);
        }
        
        public ChannelFuture doRequest(String host, int port, final ChannelHandler... handlers) throws InterruptedException{
            /*
             * 客户端的Bootstrap没有childHandler方法,只有handler方法。
             * 等同于ServerBootstrap的childHandler方法
             * 在客户端必须绑定处理器,即必须调用handler方法
             * 在服务端必须绑定处理器,即必须调用childHandler方法
             */
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(handlers);
                };
            });
            //建立连接
            ChannelFuture future = bootstrap.connect(host, port).sync();
            return future;
        }
        
        public void release(){
            this.group.shutdownGracefully();
        }
        
        public static void main(String[] args) {
            Client4HelloWorld client = null;
            ChannelFuture future = null;
            try {
                client = new Client4HelloWorld();
                future = client.doRequest("localhost", 9999, new Client4HelloWorldHandler());
                Scanner s = null;
                while(true){
                    s = new Scanner(System.in);
                    System.out.println("enter message send to server (enter 'exit' for close client)...>>");
                    String line = s.nextLine();
                    
                    if("exit".equals(line)){
                        //addListener -- 增加监听,当某条件满足的时候,触发监听器
                        //ChannelFutureListener.CLOSE 代表ChannelFuture执行返回后,关闭连接
                        future.channel().writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")))
                        .addListener(ChannelFutureListener.CLOSE);
                        break;
                    }
                    future.channel().writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
                    TimeUnit.SECONDS.sleep(1);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }finally {
                if(future != null){
                    try {
                        future.channel().closeFuture().sync();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                if(client != null){
                    client.release();
                }
            }
        }
    }

    Handler处理逻辑:

    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.util.ReferenceCountUtil;
    
    public class Client4HelloWorldHandler extends ChannelInboundHandlerAdapter{
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            try {
                ByteBuf readBuffer = (ByteBuf)msg;
                byte[] tempDatas = new byte[readBuffer.readableBytes()];
                readBuffer.readBytes(tempDatas);
                System.out.println("from server :" + new String(tempDatas, "UTF-8"));
            } finally {
                //用于释放缓存,避免内存溢出
                ReferenceCountUtil.release(msg);
            }
        }
        
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            System.out.println("client exceptionCaught method run ...");
        }
    }
  • 相关阅读:
    Kali渗透测试工具-netcat
    信息收集工具-dimtry
    Beef xss神器
    Scapy编写ICMP扫描脚本
    全国职业技能大赛信息安全管理与评估-MySQL弱口令利用
    crawler 听课笔记 碎碎念 2 一些爬虫须知的基本常识和流程
    crawler 听课笔记 碎碎念 3 关于python的细枝末节的回顾复习
    关于互信息(Mutual Information),我有些话要说
    最让人头疼的清洗数据过程----选择合适的方式快速命中所需的数据
    利用小虫虫做一枚合格宅男,果然牡丹花下做鬼也风流
  • 原文地址:https://www.cnblogs.com/myitnews/p/11441514.html
Copyright © 2011-2022 走看看