zoukankan      html  css  js  c++  java
  • Netty简单使用

    丢弃服务器

    详细代码:test-netty4-discard

    丢弃服务器,就是将收到的所有数据都丢掉,不做任何处理

    DiscardServerHandler

    package org.zln.test.netty4.discard;
    
    @Slf4j
    public class DiscardServerHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            //丢弃收到的数据
            ((ByteBuf) msg).release();
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
    //        出现异常时关闭连接
            ctx.close();
        }
    }
    
    

    DiscardServerHandler继承自ChannelInboundHandlerAdapter,

    ChannelInboundHandlerAdapter实现了ChannelInboundHandler接口。

    ChannelInboundHandler提供了很多事件处理方法

    • channelRead

    当服务端收到新数据的时候,channelRead方法被调用

    收到的消息的类型是 ByteBuf,它是一个引用计数对象,必须显示调用release方法来释放。

    一般在channelRead中处理的代码形如

    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
    	try {
    	// Do something with msg
    	} finally { 
           ReferenceCountUtil.release(msg);
    	} 
    }
    
    • exceptionCaught

    当服务端产生Throwable对象后,就会调用exceptionCaught

    在大部分情况下,捕获的异常应该被记录下来并且把关联的 channel 给关闭掉。

    然而这个方法的处理方式会在遇到不同异常的情况下有不同的实现,

    比如你可能想在关闭连接之前发送一个错误码的响应消息。

    DiscardServer

    package org.zln.test.netty4.discard;
    
    public class DiscardServer {
        private int port;
    
        public DiscardServer(int port) {
            this.port = port;
        }
    
        public static void main(String[] args) {
            int port = 8080;
            if (args.length > 0) {
                port = Integer.parseInt(args[0]);
            }
            new DiscardServer(port).run();
        }
    
        private void run() {
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel socketChannel) 
                                throws Exception {
                                socketChannel.pipeline()
                                    .addLast(new DiscardServerHandler());
                            }
                        })
                        .option(ChannelOption.SO_BACKLOG, 128)
                        .childOption(ChannelOption.SO_KEEPALIVE, true);
                ChannelFuture f = b.bind(port).sync();
                f.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                workerGroup.shutdownGracefully();
                bossGroup.shutdownGracefully();
            }
        }
    }
    
    

    NioEventLoopGroup:用于处理I/O操作的多线程事件循环器

    bossGroup:接收客户端连接

    workerGroup:处理已经接收到的连接

    ​ 一旦boss接收到连接,就会把连接信息注册到worker中

    ServerBootstrap:用于启动NIO服务的辅助类

    测试

    telnet localhost 8080

    目前在DiscardServerHandler上,是直接丢弃的,没有打印出来,所以我们现在加一个打印

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf in = (ByteBuf) msg;
            System.out.println(in.toString(CharsetUtil.UTF_8));
        }
    
    

    在toString中已经做了release动作,所以不需要再次手工释放资源

    详细代码:test-netty4-discard-demo2

    应答服务器

    详细代码:test-netty4-ask

    将客户端的请求消息原样返回

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ctx.write(msg);//写入后,内部已经实现了消息资源的释放
            ctx.flush();
        }
    

    或者

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ctx.writeAndFlush(msg);//写入后,内部已经实现了消息资源的释放
        }
    
    

    时间服务器

    时间服务器

    在与客户端建立连接时,就发送时间消息

    TimeServerHandler

    public class TimeServerHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            final ByteBuf time = ctx.alloc().buffer(4);
            time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
            final ChannelFuture f = ctx.writeAndFlush(time);
            f.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) {
                    assert f == future;
                    ctx.close();
                }
            });
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
            throws Exception {
            cause.printStackTrace();
    //        出现异常时关闭连接
            ctx.close();
        }
    }
    

    final ByteBuf time = ctx.alloc().buffer(4);

    分配指定大小的缓冲

    因为要写入32位蒸熟,所以分配4个字节大小的缓冲区

    为什么不需要flip操作?

    传统NIO缓冲区,因为只有一个位置索引,所以在写完后,如果想要读区,需要执行一次flip操作,将位置指针设置到头部。

    Netty提供的ByteBuf缓冲区对象,有读写两个指针,执行写的时候只是写的指针索引增加,读指针位置索引不变,所以不需要flip操作。

    ChannelFuture

    表示一个还未发送的I/O事件

    对ChannelFuture添加监听,可以得知当前I/O操作的具体执行情况

    addListener

    添加一个事件监听。

    如果直接ctx.close();的话,由于writeAndFlush是异步的,会出现还没写完连接就被关闭的情况。

    所以要在监听到写完成的事件后再执行close操作

    ctx.close()

    连接关闭也不是立马生效的,其也是返回一个ChannelFuture对象

    另一种简单的监听写法为

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            final ByteBuf time = ctx.alloc().buffer(4);
            time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
            final ChannelFuture f = ctx.writeAndFlush(time);
            f.addListener(ChannelFutureListener.CLOSE);
        }
    
    

    内置的ChannelFutureListener.CLOSE其实和我们自己对ChannelFutureListener内容是完全一样的

    TimeClient

    public class TimeClient {
    
        private static final String HOST = "localhost";
        private static final int PORT = 8080;
    
        public static void main(String[] args) {
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap();
                b.group(workerGroup);
                b.channel(NioSocketChannel.class);
                b.option(ChannelOption.SO_KEEPALIVE, true);
                b.handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new TimeClientHandler());
                    }
                });
                ChannelFuture f = b.connect(HOST, PORT).sync();
                f.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                workerGroup.shutdownGracefully();
            }
        }
    }
    
    • 比较客户端与服务的启动类的若干不同

    1、只有worker没有boss

    2、使用NioSocketChannel,不是NioServerSocketChannel

    3、不需要childOption。因为客户端的SocketChannel没有父类

    4、使用connect,不是bind

    TimeClientHandler

    public class TimeClientHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf m = (ByteBuf) msg;
            try {
                long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
                System.out.println(new Date(currentTimeMillis));
                ctx.close();
            } finally {
                m.release();
            }
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
    //        出现异常时关闭连接
            ctx.close();
        }
    }
    

    客户端的Handler就比较简单了,就不细说了

    整个流程是这样子的

    1、客户端发起连接

    2、服务端channelActive监听到连接,发送时间数据

    3、客户端channelRead接收到服务端请求,打印数据

    漏洞

    这里其实是有一个漏洞的,就是我们其实是希望一条完整的消息大小为4个字节,可实际情况是不一定的。

    也就是会函数TCP/IP协议的粘包与拆包的问题

    使用对象传递日期

    完整代码

    我们需要处理的数据,往往是有一个结构的,一般会封装到一个对象中,

    而数据在网络中实际传输的时候,肯定是以字节的形式的。

    如果我们在代码上想要直接处理对象,那么就需要编写解码器和编码器。

    编码器:将发送的对象转化为字节

    解码器:将收到的字节转化为对象

    • UnixTime
    public class UnixTime {
        private final long value;
    
        public UnixTime() {
            this(System.currentTimeMillis() / 1000L + 2208988800L);
        }
    
        public UnixTime(long value) {
            this.value = value;
        }
    
        public long value() {
            return value;
        }
    
        @Override
        public String toString() {
            return new Date((value() - 2208988800L) * 1000L).toString();
        }
    
    }
    

    我们把时间戳封装在UnixTime对象中

    • TimeDecoder:解码器
    public class TimeDecoder extends ByteToMessageDecoder {
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            if (in.readableBytes() < 4) {
                return;
            }
            out.add(new UnixTime(in.readUnsignedInt()));
        }
    }
    
    • TimeEncoder:编码器
    public class TimeEncoder extends MessageToByteEncoder<UnixTime> {
    
        @Override
        protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) 
            throws Exception {
            out.writeInt((int) msg.value());
        }
    }
    
    • TimeServerHandler

    使用了编码器后,我们可以直接发送对象

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            ChannelFuture f = ctx.writeAndFlush(new UnixTime());
            f.addListener(ChannelFutureListener.CLOSE);
        }
    
    
    • TimeClientHandler

    使用了解码器后,直接转化成对象

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            UnixTime m = (UnixTime) msg;
            System.out.println("收到:" + m);
            ctx.close();
        }
    
    
    • 配置

    编码器和解码器编写完后,需要配置到启动类中

    服务端配置编码器:socketChannel.pipeline().addLast(new TimeEncoder(),new TimeServerHandler());

    客户端配置解码器:ch.pipeline().addLast(new TimeDecoder(),new TimeClientHandler());

    聊天应用

    详细代码

    ChatServerHandler

    @Slf4j
    public class ChatServerHandler extends SimpleChannelInboundHandler<String> {
    
        public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            Channel incoming = ctx.channel();
            for (Channel channel : channels) {
                channel.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 加入
    ");
            }
            channels.add(ctx.channel());
        }
    
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
            Channel incoming = ctx.channel();
            for (Channel channel : channels) {
                channel.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 离开
    ");
            }
            channels.remove(ctx.channel());
        }
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
            Channel incoming = ctx.channel();
            for (Channel channel : channels) {
                if (channel != incoming) {
                    channel.writeAndFlush("[" + incoming.remoteAddress() + "]" + msg + "
    ");
                } else {
                    channel.writeAndFlush("[响应]" + msg + "
    ");
                }
            }
        }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            Channel incoming = ctx.channel();
            System.out.println("ChatClient:" + incoming.remoteAddress() + "在线");
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            Channel incoming = ctx.channel();
            System.out.println("ChatClient:" + incoming.remoteAddress() + "掉线");
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            Channel incoming = ctx.channel();
            System.out.println("ChatClient:" + incoming.remoteAddress() + "异常"); // 当出现异常就关闭连接
            cause.printStackTrace();
            ctx.close();
        }
    
    
    }
    

    ChatServerInitializer

    public class ChatServerInitializer extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
            pipeline.addLast("decoder", new StringDecoder());
            pipeline.addLast("encoder", new StringEncoder());
            pipeline.addLast("handler", new ChatServerHandler());
            System.out.println("ChatClient:" + ch.remoteAddress() + "连接上");
        }
    }
    

    ChatServer

    public class ChatServer {
        private int port;
    
        public ChatServer(int port) {
            this.port = port;
        }
    
        public static void main(String[] args) {
            int port = 8080;
            if (args.length > 0) {
                port = Integer.parseInt(args[0]);
            }
            new ChatServer(port).run();
        }
    
        private void run() {
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChatServerInitializer())
                        .option(ChannelOption.SO_BACKLOG, 128)
                        .childOption(ChannelOption.SO_KEEPALIVE, true);
    
                System.out.println("服务端启动完成");
    
                ChannelFuture f = b.bind(port).sync();
                f.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                workerGroup.shutdownGracefully();
                bossGroup.shutdownGracefully();
            }
        }
    }
    

    ChatClientHandler

    public class ChatClientHandler extends SimpleChannelInboundHandler<String> {
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
            System.out.println(msg);
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
    //        出现异常时关闭连接
            ctx.close();
        }
    }
    

    ChatClientInitializer

    public class ChatClientInitializer extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
            pipeline.addLast("decoder", new StringDecoder());
            pipeline.addLast("encoder", new StringEncoder());
            pipeline.addLast("handler", new ChatClientHandler());
        }
    }
    

    ChatClient

    public class ChatClient {
    
        private static final String HOST = "localhost";
        private static final int PORT = 8080;
    
        public static void main(String[] args) {
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap();
                b.group(workerGroup);
                b.channel(NioSocketChannel.class);
                b.option(ChannelOption.SO_KEEPALIVE, true);
                b.handler(new ChatClientInitializer());
                ChannelFuture f = b.connect(HOST, PORT).sync();
                Channel channel = f.channel();
                BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
                while (true) {
                    System.out.println("请输入:");
                    String line = bufferedReader.readLine();
                    if (line.startsWith("exit")) {
                        System.out.println("结束聊天");
                        break;
                    }
                    channel.writeAndFlush(line + "
    ");
                }
                f.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                workerGroup.shutdownGracefully();
            }
        }
    }
    
  • 相关阅读:
    【每天一道PAT】1001 A+B Format
    C++ STL总结
    开篇
    happen-before原则
    java多线程的状态转换以及基本操作
    集合初始容量
    fail-fast机制
    Stack
    Iterator
    Vector
  • 原文地址:https://www.cnblogs.com/sherrykid/p/8709456.html
Copyright © 2011-2022 走看看