zoukankan      html  css  js  c++  java
  • 第一个netty程序--时间服务

    依赖版本

          <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
                <version>4.1.68.Final</version>
            </dependency>
    

    服务端

    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    
    import java.net.InetSocketAddress;
    import java.util.Date;
    
    
    public class TimeServer {
        public static void main(String... args) throws InterruptedException {
            // 只包含一个serverchannel,代表服务器自身的已绑定到某个本地端口的正在监听的套接字
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            // 包含所有已创建的用来处理传入客户端连接的channel
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup) //可以只使用一个EventLoopGroup
                    .channel(NioServerSocketChannel.class)  //指定使用NIO传输Channel
                    .localAddress(new InetSocketAddress(8080))
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChannelInitializer() {
                        @Override
                        protected void initChannel(Channel channel) throws Exception {
                            channel.pipeline().addLast(new TimeChannelHandlerAdapter());
                        }
                    });
    
            // 异步操作的结果的占位符
            ChannelFuture f = null;
            try {
                f = b.bind().sync(); // 异步绑定服务器,调用sync()方法阻塞等待直到绑定完成
                f.channel().closeFuture().sync(); // 获取channel的closeFuture,并且阻塞当前线程直到完成
            } finally {
                // 释放所有的资源
                bossGroup.shutdownGracefully().sync();
                workerGroup.shutdownGracefully().sync();
            }
        }
    
        public static class TimeChannelHandlerAdapter extends ChannelInboundHandlerAdapter {
            /**
             * 对于每个传入的消息都要调用
             */
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                ByteBuf buf = (ByteBuf) msg;
                byte[] req = new byte[buf.readableBytes()];
                buf.readBytes(req);
                String body = new String(req);
                System.out.println(body);
                ByteBuf resp = Unpooled.copiedBuffer(new Date(System.currentTimeMillis()).toString().getBytes());
                ctx.write(resp); // 将消息写给发送者,而不冲刷出站消息
            }
    
            /**
             * 通知ChannelInboundHandler最后一次对channelRead()的调用是当前批量读取中的额最后一条消息
             */
            @Override
            public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
                // 将未决消息冲刷到远程节点,并且关闭该Channel
                ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
            }
    
            /**
             * 读取操作期间有异常抛出时会调用
             */
            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                cause.printStackTrace();
                ctx.close();
            }
        }
    }
    
    

    客户端

    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.util.CharsetUtil;
    
    import java.net.InetSocketAddress;
    
    
    public class TimeClient {
        public static void main(String...args) throws InterruptedException {
            EventLoopGroup group = new NioEventLoopGroup();
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .remoteAddress(new InetSocketAddress("127.0.0.1",8080))
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new TimeClientHandler());
                        }
                    });
            try {
                // 异步连接到远程节点,阻塞等待直到连接完成
                ChannelFuture f = b.connect().sync();
                f.channel().closeFuture().sync();
            }finally {
                group.shutdownGracefully().sync();
            }
    
        }
    
        @ChannelHandler.Sharable    // 标记该类的实例可以被多个Channel共享
        private static class TimeClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
            /**
             * 当一个新的连接已经被建立时,会被调用
             */
            @Override
            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                byte[] req = "query time order".getBytes();
                ByteBuf resp = Unpooled.buffer(req.length);
                resp.writeBytes(req);
                ctx.writeAndFlush(resp);
            }
    
    
            /**
             * 每当接收数据时,都会调用这个方法,服务器发送的消息可能被分块接收
             */
            @Override
            protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
                // 记录已接收消息的转储
                System.out.println("client received: " + byteBuf.toString(CharsetUtil.UTF_8));
            }
    
            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                cause.printStackTrace();
                ctx.close();
            }
        }
    }
    
  • 相关阅读:
    Field client in com.rachel.web.ConsumerController required a bean of type 'org.springframework.cloud.netflix.ribbon.RibbonLoadBalancerClient' that could not be found.
    MySQl创建用户和授权
    MySQL之索引原理与慢查询优化
    MySQL之视图、触发器、事务、存储过程、函数
    Navicat工具、pymysql模块
    MySQL之多表查询
    MySQL之单表查询
    MySQL行(记录)的详细操作
    MySQL的库表详细操作
    MySQL数据库初识
  • 原文地址:https://www.cnblogs.com/fly-book/p/15328927.html
Copyright © 2011-2022 走看看