zoukankan      html  css  js  c++  java
  • JAVA Netty入门Demo实例代码(自写测试可用)实现客户端服务器端互传数据

    首先创建MAVEN项目

    pom.xml 写入

    1         <!-- netty -->
    2         <dependency>
    3             <groupId>io.netty</groupId>
    4             <artifactId>netty-all</artifactId>
    5             <version>4.1.36.Final</version>
    6         </dependency>

    服务器端

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.DelimiterBasedFrameDecoder;
    
    public class EchoServer {
    
        private final int port;
    
                  public EchoServer(int port) {
                      this.port = port;
                  }
        public void start() throws Exception {
            EventLoopGroup group = new NioEventLoopGroup();
                     try {
                             ServerBootstrap sb = new ServerBootstrap();
                             sb.group(group) // 绑定线程池
                                    .channel(NioServerSocketChannel.class) // 指定使用的channel
                                    .localAddress(this.port)// 绑定监听端口
                                     .childHandler(new ChannelInitializer<SocketChannel>() { // 绑定客户端连接时候触发操作
    
                                               @Override
                                      protected void initChannel(SocketChannel ch) throws Exception {
                                                              System.out.println("connected...; Client:" + ch.remoteAddress());
    //                                               ByteBuf byteBuf= Unpooled.copiedBuffer("$".getBytes());//防止粘包处理在消息末尾使用换行符对消息进行分割,或者使用其他特殊字符来对消息进行分割;
    //                                               ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,byteBuf));//防止粘包处理在消息末尾使用换行符对消息进行分割,或者使用其他特殊字符来对消息进行分割;
                                                              ch.pipeline().addLast(new EchoServerHandler()); // 客户端触发操作
                                                          }
                                  });
                              ChannelFuture cf = sb.bind().sync(); // 服务器异步创建绑定
                              System.out.println(EchoServer.class + " started and listen on " + cf.channel().localAddress());
                              cf.channel().closeFuture().sync(); // 关闭服务器通道
                          } finally {
                              group.shutdownGracefully().sync(); // 释放线程池资源
                          }
                  }
    
                  public static void main(String[] args) throws Exception {
                      new EchoServer(65535).start(); // 启动
                  }
        }
    

      服务器端操作

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.util.CharsetUtil;
    
    import java.util.Date;
    
    public class EchoServerHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            //        数据读取事件
    //        System.out.println("我是读取事件" + msg.toString());
    //        ctx.write(msg);
            try {
                ByteBuf buf = (ByteBuf)msg;
    //创建目标大小的数组
                byte[] barray = new byte[buf.readableBytes()];
    //把数据从bytebuf转移到byte[]
                buf.getBytes(0,barray);
                //将byte[]转成字符串用于打印
                String str=new String(barray);
    
                if (str.length()>0)
                {
                    System.out.println(str);
                    System.out.println("收到消息回复一条消息给客户端");
                    System.out.println("client channelActive..");
                    ctx.writeAndFlush(Unpooled.copiedBuffer("服务器端发一条数据给客户端"+new Date().toString(), CharsetUtil.UTF_8)); // 必须有flush
                    System.out.flush();
    
                }
                else
                {
                    System.out.println("不能读啊");
                }
                buf.release();
            }finally {
    //buf.release();
            }
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            System.out.println("server channelReadComplete..");
            // 第一种方法:写一个空的buf,并刷新写出区域。完成后关闭sock channel连接。
            ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
            //ctx.flush(); // 第二种方法:在client端关闭channel连接,这样的话,会触发两次channelReadComplete方法。
            //ctx.flush().close().sync(); // 第三种:改成这种写法也可以,但是这中写法,没有第一种方法的好。
        }
    //    @Override
    //    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    //        System.out.println("client channelActive..");
    //        ctx.writeAndFlush(Unpooled.copiedBuffer("服务器端发一条数据给客户端", CharsetUtil.UTF_8)); // 必须有flush
    //
    //    }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            System.out.println("server occur exception:" + cause.getMessage());
            cause.printStackTrace();
            ctx.close(); // 关闭发生异常的连接
        }
    }
    

      

    客户端

    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.DelimiterBasedFrameDecoder;
    
    import java.net.InetSocketAddress;
    
    public class EchoClient {
        private final String host;
        private final int port;
    
        public EchoClient() {
            this(0);
        }
    
        public EchoClient(int port) {
            this("localhost", port);
        }
    
        public EchoClient(String host, int port) {
            this.host = host;
            this.port = port;
        }
    
        public void start() throws Exception {
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap();
                b.group(group) // 注册线程池
                        .channel(NioSocketChannel.class) // 使用NioSocketChannel来作为连接用的channel类
                        .remoteAddress(new InetSocketAddress(this.host, this.port)) // 绑定连接端口和host信息
                        .handler(new ChannelInitializer<SocketChannel>() { // 绑定连接初始化器
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                System.out.println("连接connected...");
    //                            ByteBuf byteBuf= Unpooled.copiedBuffer("$".getBytes());//防止粘包处理在消息末尾使用换行符对消息进行分割,或者使用其他特殊字符来对消息进行分割;
    //                            ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,byteBuf));//防止粘包处理在消息末尾使用换行符对消息进行分割,或者使用其他特殊字符来对消息进行分割;
                                ch.pipeline().addLast(new EchoClientHandler());
                            }
                        });
                System.out.println("created..");
    
                ChannelFuture cf = b.connect().sync(); // 异步连接服务器
                System.out.println("connected..."); // 连接完成
    
                cf.channel().closeFuture().sync(); // 异步等待关闭连接channel
                System.out.println("closed.."); // 关闭完成
            } finally {
                group.shutdownGracefully().sync(); // 释放线程池资源
            }
        }
    
        public static void main(String[] args) throws Exception {
         new EchoClient("127.0.0.1", 65535).start(); // 连接127.0.0.1/65535,并启动
            System.out.println("===================================");
    //        new EchoClient("127.0.0.1", 65535).start();
        }
    }
    

      客户端操作

    import java.nio.charset.Charset;
    import java.util.Date;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.ByteBufUtil;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.util.CharsetUtil;
    
    public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("client channelActive..");
            ctx.writeAndFlush(Unpooled.copiedBuffer("客户端发送一条新数据给你"+new Date().toString(), CharsetUtil.UTF_8)); // 必须有flush
    
            // 必须存在flush
            // ctx.write(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8));
    //         ctx.flush();
        }
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
            System.out.println("读取数据..");
            ByteBuf buf = msg.readBytes(msg.readableBytes());
            System.out.println("Client received:" +buf.toString(Charset.forName("utf-8")));
            //ctx.channel().close().sync();// client关闭channel连接
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    
    }

    服务器接收发送数据:


    客户端接收发送数据
    
    
  • 相关阅读:
    几种任务调度的 Java 实现方法与比较
    nginx配置
    生产消费_lock和阻塞队列
    阻塞队列
    countdownlatch+cyclicbarrier+semphore
    01背包
    skiplist
    lru
    按序打印_lock和condition
    按序打印_volatile 无法保证顺序
  • 原文地址:https://www.cnblogs.com/wyq-study/p/14637952.html
Copyright © 2011-2022 走看看