zoukankan      html  css  js  c++  java
  • 创建安全的 Netty 程序

    1.使用 SSL/TLS 创建安全的 Netty 程序

    SSL 和 TLS 是众所周知的标准和分层的协议,它们可以确保数据时私有的

    Netty提供了SSLHandler对网络数据进行加密

    使用Https

    public class SslChannelInitialzer extends ChannelInitializer<Channel>{
    
        private final SSLContext context;
        private final boolean client;
        private final boolean startTls;
    
        public SslChannelInitialzer(SSLContext context, boolean client, boolean startTls) {
            this.context = context;
            this.client = client;
            this.startTls = startTls;
        }
    
    
        @Override
        protected void initChannel(Channel ch) throws Exception {
            SSLEngine engine = context.createSSLEngine();
            engine.setUseClientMode(client);
            ch.pipeline().addFirst("ssl", new SslHandler(engine, startTls));
        }
    
    }

    2.使用 Netty 创建 HTTP/HTTPS 程序

    public class HttpDecoderEncodeIntializer extends ChannelInitializer<Channel>{
    
        private final boolean client;
        
        public HttpDecoderEncodeIntializer(boolean client) {
            this.client = client;
        }
    
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            
            if (client) {
                pipeline.addLast("decoder", new HttpResponseDecoder());
                pipeline.addLast("", new HttpRequestEncoder());
           pipeline.addLast("decompressor", new HttpContentDecompressor()); //添加解压缩 Handler }
    else { pipeline.addLast("decoder", new HttpRequestEncoder()); pipeline.addLast("encoder", new HttpResponseDecoder()); } } }

    如果你需要在 ChannelPipeline 中有一个解码器和编码器,还分别有一个在客户端和服务器简单的编解码器:HttpClientCodec 和 HttpServerCodec

    pipeline.addLast("aggegator", new HttpObjectAggregator(512 * 1024));  聚合消息

     WebSocket

    WebSocketServerProtocolHandler

     处理空闲连接和超时

    • IdleStateHandler,当一个通道没有进行读写或运行了一段时间后出发IdleStateEvent
    • ReadTimeoutHandler,在指定时间内没有接收到任何数据将抛出ReadTimeoutException
    • WriteTimeoutHandler,在指定时间内有写入数据将抛出WriteTimeoutException、

    最常用的是IdleStateHandler,下面代码显示了如何使用IdleStateHandler,如果60秒内没有接收数据或发送数据,操作将失败,连接将关闭

    public class IdleStateHandlerInitializer extends ChannelInitializer<Channel> {
     
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast(new IdleStateHandler(0, 0, 60, TimeUnit.SECONDS));
            pipeline.addLast(new HeartbeatHandler());
        }
     
        public static final class HeartbeatHandler extends ChannelInboundHandlerAdapter {
            private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer(
                    "HEARTBEAT", CharsetUtil.UTF_8));
     
            @Override
            public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                if (evt instanceof IdleStateEvent) {
                    ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    super.userEventTriggered(ctx, evt);
                }
            }
        }
    }

    分隔符协议  解决粘包问题

    使用LineBasedFrameDecoder提取" "分隔帧

    /**
     * 处理换行分隔符消息
     *
     */
    public class LineBasedHandlerInitializer extends ChannelInitializer<Channel> {
     
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ch.pipeline().addLast(new LineBasedFrameDecoder(65 * 1204), new FrameHandler());
        }
     
        public static final class FrameHandler extends SimpleChannelInboundHandler<ByteBuf> {
            @Override
            protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
                // do something with the frame
            }
        }
    }

      如果框架的东西除了换行符还有别的分隔符,可以使用DelimiterBasedFrameDecoder,只需要将分隔符传递到构造方法中。如果想实现自己的以分隔符为基础的协议,这些解码器是有用的。

    例如,现在有个协议,它只处理命令,这些命令由名称和参数形成,名称和参数由一个空格分隔

    public class CmdHandlerInitializer extends ChannelInitializer<Channel> {
     
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ch.pipeline().addLast(new CmdDecoder(65 * 1024), new CmdHandler());
        }
     
        public static final class Cmd {
            private final ByteBuf name;
            private final ByteBuf args;
     
            public Cmd(ByteBuf name, ByteBuf args) {
                this.name = name;
                this.args = args;
            }
     
            public ByteBuf getName() {
                return name;
            }
     
            public ByteBuf getArgs() {
                return args;
            }
        }
     
        public static final class CmdDecoder extends LineBasedFrameDecoder {
     
            public CmdDecoder(int maxLength) {
                super(maxLength);
            }
     
            @Override
            protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
                ByteBuf frame = (ByteBuf) super.decode(ctx, buffer);
                if (frame == null) {
                    return null;
                }
                int index = frame.indexOf(frame.readerIndex(), frame.writerIndex(), (byte) ' ');
                return new Cmd(frame.slice(frame.readerIndex(), index), frame.slice(index + 1, frame.writerIndex()));
            }
        }
     
        public static final class CmdHandler extends SimpleChannelInboundHandler<Cmd> {
            @Override
            protected void channelRead0(ChannelHandlerContext ctx, Cmd msg) throws Exception {
                // do something with the command
            }
        }
     
    }
    一般经常会碰到以长度为基础的协议,对于这种情况Netty有两个不同的解码器可以帮助我们来解码:
    • FixedLengthFrameDecoder
    • LengthFieldBasedFrameDecoder

    ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65*1024, 0, 8))

    读取大文件

    @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            File file = new File("test.txt");
            FileInputStream fis = new FileInputStream(file);
            FileRegion region = new DefaultFileRegion(fis.getChannel(), 0, file.length());
            Channel channel = ctx.channel();
            channel.writeAndFlush(region).addListener(new ChannelFutureListener() {
                
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if(!future.isSuccess()){
                        Throwable cause = future.cause();
                        // do something
                    }
                }
            });
        }
    public class ChunkedWriteHandlerInitializer extends ChannelInitializer<Channel> {
        private final File file;
     
        public ChunkedWriteHandlerInitializer(File file) {
            this.file = file;
        }
     
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ch.pipeline().addLast(new ChunkedWriteHandler())
                .addLast(new WriteStreamHandler());
        }
     
        public final class WriteStreamHandler extends ChannelInboundHandlerAdapter {
            @Override
            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                super.channelActive(ctx);
                ctx.writeAndFlush(new ChunkedStream(new FileInputStream(file)));
            }
        }
    }

    通过JBoss编组序列化

    使用ProtoBuf序列化

    /**
     * 使用protobuf序列化数据,进行编码解码
     * 注意:使用protobuf需要protobuf-java-2.5.0.jar
     * @author Administrator
     *
     */
    public class ProtoBufInitializer extends ChannelInitializer<Channel> {
     
        private final MessageLite lite;
     
        public ProtoBufInitializer(MessageLite lite) {
            this.lite = lite;
        }
     
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ch.pipeline().addLast(new ProtobufVarint32FrameDecoder())
                .addLast(new ProtobufEncoder())
                .addLast(new ProtobufDecoder(lite))
                .addLast(new ObjectHandler());
        }
     
        public final class ObjectHandler extends SimpleChannelInboundHandler<Serializable> {
            @Override
            protected void channelRead0(ChannelHandlerContext ctx, Serializable msg) throws Exception {
                // do something
            }
        }
    }
    也可以自己实现,参照RPC

    Bootstrap   当需要引导客户端或一些无连接协议时

     创建Bootstrap实例使用new关键字,下面是Bootstrap的方法:
    • group(...),设置EventLoopGroup,EventLoopGroup用来处理所有通道的IO事件
    • channel(...),设置通道类型
    • channelFactory(...),使用ChannelFactory来设置通道类型
    • localAddress(...),设置本地地址,也可以通过bind(...)或connect(...)
    • option(ChannelOption<T>, T),设置通道选项,若使用null,则删除上一个设置的ChannelOption
    • attr(AttributeKey<T>, T),设置属性到Channel,若值为null,则指定键的属性被删除
    • handler(ChannelHandler),设置ChannelHandler用于处理请求事件
    • clone(),深度复制Bootstrap,Bootstrap的配置相同
    • remoteAddress(...),设置连接地址
    • connect(...),连接远程通道
    • bind(...),创建一个新的Channel并绑

    ServerBootstrap  引导服务器

    从Channel引导客户端

       有时候需要从另一个Channel引导客户端,例如写一个代理或需要从其他系统检索数据。从其他系统获取数据时比较常见的,有很多Netty应用程序必须要和企业现有的系统集成,如Netty程序与内部系统进行身份验证,查询数据库等

    可以不用再创建新的引导

    public class BootstrapingFromChannel {
        
        public static void main(String[] args) throws Exception {
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                    .childHandler(new SimpleChannelInboundHandler<ByteBuf>() {
                        ChannelFuture connectFuture;
     
                        @Override
                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
                            Bootstrap b = new Bootstrap();
                            b.channel(NioSocketChannel.class).handler(
                                    new SimpleChannelInboundHandler<ByteBuf>() {
                                        @Override
                                        protected void channelRead0(ChannelHandlerContext ctx,
                                                ByteBuf msg) throws Exception {
                                            System.out.println("Received data");
                                            msg.clear();
                                        }
                                    });
                            b.group(ctx.channel().eventLoop());
                            connectFuture = b.connect(new InetSocketAddress("127.0.0.1", 2048));
                        }
     
                        @Override
                        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg)
                                throws Exception {
                            if (connectFuture.isDone()) {
                                // do something with the data
                            }
                        }
                    });
            ChannelFuture f = b.bind(2048);
            f.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()) {
                        System.out.println("Server bound");
                    } else {
                        System.err.println("bound fail");
                        future.cause().printStackTrace();
                    }
                }
            });
        }
    }
    服务端和客户端在同一环境下

    使用通道选项和属性

     使用ChannelOption和属性可以让事情变得很简单,例如Netty WebSocket服务器根据用户自动路由消息,通过使用属性,应用程序能在通道存储用户ID以确定消息应该发送到哪里。应用程序可以通过使用一个通道选项进一步自动化,给定时间内没有收到消息将自动断开连接

        public static void main(String[] args) {
            //创建属性键对象
            final AttributeKey<Integer> id = AttributeKey.valueOf("ID");
            //客户端引导对象
            Bootstrap b = new Bootstrap();
            //设置EventLoop,设置通道类型
            b.group(new NioEventLoopGroup()).channel(NioSocketChannel.class)
            //设置ChannelHandler
                .handler(new SimpleChannelInboundHandler<ByteBuf>() {
        
                    @Override
                    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
                        //通道注册后执行,获取属性值
                        Integer idValue = ctx.channel().attr(id).get();
                        System.out.println(idValue);
                        //do something with the idValue
                    }
    
                    @Override
                    protected void messageReceived(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
                        System.out.println("Reveived data");
                        msg.clear();
                    }
                });
            //设置通道选项,在通道注册后或被创建后设置
            b.option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);
            //设置通道属性
            b.attr(id, 123456);
            ChannelFuture f = b.connect("www.manning.com",80);
            f.syncUninterruptibly();
        }
  • 相关阅读:
    几个ssh和sftp的命令
    发现一个github的奇葩设定
    插耳机对orientation sensor的影响
    android中MediaPlayer类的用法
    Oracle 高性能SQL引擎剖析----执行计划
    【转】对列式数据库的一点总结和展望
    【转】大数据分析(Big Data OLAP)引擎Dremel, Tenzing 以及Impala
    TCP/IP协议详解---概述
    读取HttpWebResponse流的两种方法及注意的问题
    This project references NuGet package(s) that are missing on this computer.
  • 原文地址:https://www.cnblogs.com/mxz1994/p/9464309.html
Copyright © 2011-2022 走看看