zoukankan      html  css  js  c++  java
  • netty 入门示例

      server 服务端

      入口

    package com.sxmd.gateway.server;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    
    /**
     * Server entry point: starts a Netty NIO server and blocks until its
     * listening channel is closed.
     *
     * @author cy
     * @date 2019-09-03 8:51
     * Version 1.0
     */
    public class ServerMain {

        /**
         * Starts the demo server on port 8754.
         *
         * @param args ignored
         */
        public static void main(String[] args){
            ServerMain.bind(8754);
        }

        /**
         * Binds a server socket on {@code port}, installs {@link ServerPipeLine}
         * on every accepted connection, and blocks the calling thread until the
         * server channel closes. Both event loop groups are always shut down
         * before this method returns.
         *
         * @param port TCP port to listen on
         */
        public static void bind(int port) {
            EventLoopGroup boss = new NioEventLoopGroup();
            EventLoopGroup worker = new NioEventLoopGroup();
            try {
                ServerBootstrap boot = new ServerBootstrap();
                boot.group(boss, worker)
                        .channel(NioServerSocketChannel.class)
                        .option(ChannelOption.SO_BACKLOG, 100)
                        // Logs events of the listening (parent) channel at INFO level.
                        .handler(new LoggingHandler(LogLevel.INFO))
                        // Configures the pipeline of each accepted (child) channel.
                        .childHandler(new ServerPipeLine());

                // Wait for the bind to complete before announcing readiness.
                ChannelFuture f = boot.bind(port).sync();

                System.out.println("服务端开始监听,等待客户端连接");
                // Block until the server channel is closed.
                f.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                // Restore the interrupt status so callers can still observe it.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } finally {
                boss.shutdownGracefully();
                worker.shutdownGracefully();
            }
        }

    }
    package com.sxmd.gateway.server;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.handler.codec.DelimiterBasedFrameDecoder;
    import io.netty.handler.codec.LineBasedFrameDecoder;
    import io.netty.handler.codec.string.StringDecoder;
    
    
    /**
     * Channel initializer for connections accepted by the server. Installs a
     * '$'-delimited frame decoder followed by the business handler.
     *
     * @author cy
     * @date 2019-09-03 9:10
     * Version 1.0
     */
    public class ServerPipeLine extends ChannelInitializer<SocketChannel> {

        /**
         * Builds the inbound pipeline for a newly accepted channel.
         *
         * @param channel the channel being initialized
         */
        @Override
        protected void initChannel(SocketChannel channel) throws Exception {
            // Messages on the wire are terminated by '$'; splitting on it keeps
            // coalesced or fragmented TCP reads from mixing messages together.
            ByteBuf delimiter = Unpooled.copiedBuffer("$".getBytes());
            channel.pipeline()
                    .addLast(new DelimiterBasedFrameDecoder(2018, delimiter))
                    .addLast(new ServerInHandler());
        }
    }
    package com.sxmd.gateway.server;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    
    import java.net.SocketAddress;
    
    /**
     * Server-side business handler: prints each received frame and answers with
     * a '$'-terminated acknowledgement.
     *
     * @author cy
     * @date 2019-09-03 9:14
     * Version 1.0
     */
    public class ServerInHandler extends SimpleChannelInboundHandler<Object> {

        /**
         * Decodes one inbound frame as UTF-8 text, logs it, and writes a reply.
         *
         * @param cxf the handler context used to write the reply
         * @param msg the inbound frame; cast to {@link ByteBuf}
         * @throws Exception if decoding or encoding fails
         */
        @Override
        protected void channelRead0(ChannelHandlerContext cxf, Object msg) throws Exception {
            ByteBuf buf = (ByteBuf) msg;
            byte[] reg = new byte[buf.readableBytes()];
            buf.readBytes(reg);
            String body = new String(reg, "UTF-8");
            System.out.println(Thread.currentThread().getName() + "服务端收到的消息:" + body);
            // Reply to the client; the trailing '$' is the frame delimiter.
            String respMsg = "你好," + body + ",我收到了你的消息$";
            // Encode explicitly as UTF-8 to match the decoding above; the
            // platform default charset would garble non-ASCII text elsewhere.
            ByteBuf byteBuf = Unpooled.copiedBuffer(respMsg.getBytes("UTF-8"));
            cxf.writeAndFlush(byteBuf);
        }

        /**
         * Flushes any pending outbound data once the current read batch is done.
         * write() only queues a message; flush() is what hands the queued data
         * to the socket.
         */
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
        }

        /**
         * Prints the failure and closes the channel.
         *
         * @param ctx   the handler context whose channel is closed
         * @param cause the error that was raised
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }

    }

    客户端 client

    package com.sxmd.gateway.client;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoop;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoop;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    
    /**
     * Client entry point: starts one thread that connects to the demo server
     * and stays alive until the connection is closed.
     *
     * @author cy
     * @date 2019-09-03 8:51
     * Version 1.0
     */
    public class ClientMain {

        /**
         * Launches the client thread(s). The loop bound controls how many
         * concurrent clients are started (currently one).
         *
         * @param args ignored
         */
        public static void main(String[] args) {
            for (int i = 0; i < 1; i++) {
                new Thread(new ClientThread()).start();
            }
        }

        /** Runnable that opens a single client connection. */
        static class ClientThread implements Runnable{
            /** Connects to the hard-coded server address and port. */
            @Override
            public void run() {
                connect("192.168.141.124",8754);
            }

            /**
             * Connects to {@code host:port}, installs {@link ClientPipeLine},
             * and blocks until the channel closes. The event loop group is
             * always shut down before this method returns.
             *
             * @param host server host name or IP address
             * @param port server TCP port
             */
            public void connect(String host,int port){
                EventLoopGroup group = new NioEventLoopGroup();
                try {
                    Bootstrap b = new Bootstrap();
                    b.group(group)
                            .channel(NioSocketChannel.class)
                            .option(ChannelOption.TCP_NODELAY, true)
                            .handler(new ClientPipeLine());
                    // Wait for the connect attempt to finish.
                    ChannelFuture sync = b.connect(host, port).sync();
                    System.out.println(Thread.currentThread().getName() + ",客户端发起异步连接..........");
                    // Block until the client channel is closed.
                    sync.channel().closeFuture().sync();
                } catch (InterruptedException e) {
                    // Restore the interrupt status so the owning thread can see it.
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                } finally {
                    group.shutdownGracefully();
                }
            }

        }

    }
    package com.sxmd.gateway.client;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.handler.codec.DelimiterBasedFrameDecoder;
    import io.netty.handler.codec.string.StringDecoder;
    
    /**
     * Channel initializer for the client connection. Installs a '$'-delimited
     * frame decoder followed by the business handler.
     *
     * @author cy
     * @date 2019-09-03 9:35
     * Version 1.0
     */
    public class ClientPipeLine extends ChannelInitializer<SocketChannel> {

        /**
         * Builds the inbound pipeline for the newly connected channel.
         *
         * @param channel the channel being initialized
         */
        @Override
        protected void initChannel(SocketChannel channel) throws Exception {
            // Messages on the wire are terminated by '$'; splitting on it keeps
            // coalesced or fragmented TCP reads from mixing messages together.
            ByteBuf delimiter = Unpooled.copiedBuffer("$".getBytes());
            channel.pipeline()
                    .addLast(new DelimiterBasedFrameDecoder(2018, delimiter))
                    .addLast(new ClientInHandler());
        }
    }
    package com.sxmd.gateway.client;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    
    /**
     * Client-side business handler: sends ten '$'-terminated greetings once
     * connected and prints every reply frame it receives.
     *
     * @author cy
     * @date 2019-09-03 9:39
     * Version 1.0
     */
    public class ClientInHandler extends SimpleChannelInboundHandler<Object> {


        /**
         * Invoked once the TCP connection to the server is established; sends
         * ten request messages, each flushed immediately.
         *
         * @param ctx the handler context used to write the requests
         * @throws Exception if encoding fails
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("客户连接服务端成功");
            for (int i = 0; i < 10; i++) {
                // The trailing '$' is the frame delimiter.
                String reqMsg = "我是客户端"+i+"$";
                // Encode explicitly as UTF-8 because received frames are decoded
                // as UTF-8; the platform default charset may differ.
                ByteBuf reqByteBuf = Unpooled.copiedBuffer(reqMsg.getBytes("UTF-8"));
                ctx.writeAndFlush(reqByteBuf);
            }
        }

        /** Flushes any pending outbound data after the current read batch. */
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
        }

        /**
         * Decodes one reply frame as UTF-8 text and prints it.
         *
         * @param ctx the handler context (unused)
         * @param msg the inbound frame; cast to {@link ByteBuf}
         * @throws Exception if decoding fails
         */
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf buf = (ByteBuf) msg;
            byte[] req = new byte[buf.readableBytes()];
            buf.readBytes(req);
            String body = new String(req, "UTF-8");
            System.out.println(Thread.currentThread().getName() + "服务端返回:" + body);
        }

        /**
         * Prints the failure and closes the channel.
         *
         * @param ctx   the handler context whose channel is closed
         * @param cause the error that was raised
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }

    }
  • 相关阅读:
    SpringCloud之初入江湖
    消息中间件RabbitMQ
    分布式搜索引擎ElasticSearch
    MongoDB简介
    SpringBoot和SpringCloud版本对应
    终于有人把Elasticsearch原理讲透了!
    nginx不停服,重新加载配置
    小程序自定义头部标题栏并且自适应各种手机屏幕(滚动头部渐隐渐现)
    Navicat链接数据库报错1130解决方案
    传统的小程序登录 和 云开发小程序登录
  • 原文地址:https://www.cnblogs.com/chengyangyang/p/11454286.html
Copyright © 2011-2022 走看看