zoukankan      html  css  js  c++  java
  • Netty入门(4)

    使用SSL/TLS创建安全的Netty程序

    Java提供了抽象的SslContext和SslEngine,实际上SslContext可以用来获取SslEngine来进行加密和解密。Netty拓展了Java的SslEngine,称SslHandler,用来对网络数据进行加密和解密。

     1、制作自签证书

    #keytool -genkey -keysize 2048 -validity 365 -keyalg RSA -dnam e "CN=gornix.com" -keypass 654321 -storepass 123456 -keystore gornix.jks

    keytool为JDK提供的生成证书工具

    • -keysize 2048 密钥长度2048位(这个长度的密钥目前可认为无法被暴力破解)
    • -validity 365 证书有效期365天
    • -keyalg RSA 使用RSA非对称加密算法
    • -dname "CN=gornix.com" 设置Common Name为gornix.com,这是我的域名
    • -keypass 654321 密钥的访问密码为654321
    • -storepass 123456 密钥库的访问密码为123456(其实这两个密码也可以设置一样,通常都设置一样,方便记)
    • -keystore gornix.jks 指定生成的密钥库文件为gornix.jks

    2、服务端程序

    public class SocketServerHelper {
        
        private static int WORKER_GROUP_SIZE = Runtime.getRuntime().availableProcessors() * 2; 
    
        private static EventLoopGroup bossGroup; 
        private static EventLoopGroup workerGroup;  
        
        private static Class<? extends ServerChannel> channelClass;
        
        public static void startSpiderServer() throws Exception {
            ServerBootstrap b = new ServerBootstrap();
            b.childOption(ChannelOption.TCP_NODELAY, true)
            .childOption(ChannelOption.SO_KEEPALIVE, true)
            .childOption(ChannelOption.SO_REUSEADDR, true)    
            .childOption(ChannelOption.ALLOCATOR, new PooledByteBufAllocator(false))
            .childOption(ChannelOption.SO_RCVBUF, 1048576)
            .childOption(ChannelOption.SO_SNDBUF, 1048576);
            
            bossGroup = new NioEventLoopGroup(1);
            workerGroup = new NioEventLoopGroup(WORKER_GROUP_SIZE);
            channelClass = NioServerSocketChannel.class;
            System.out.println("workerGroup size:" + WORKER_GROUP_SIZE);
            System.out.println("preparing to start spider server...");
            b.group(bossGroup, workerGroup);  
            b.channel(channelClass);
            KeyManagerFactory keyManagerFactory = null;
            KeyStore keyStore = KeyStore.getInstance("JKS");
            keyStore.load(new FileInputStream("G:\ssl.jks"), "123456".toCharArray());
            keyManagerFactory = KeyManagerFactory.getInstance("SunX509");
            keyManagerFactory.init(keyStore,"123456".toCharArray());
            SslContext sslContext = SslContextBuilder.forServer(keyManagerFactory).build();
            b.childHandler(new SslChannelInitializer(sslContext)); 
            b.bind(9912).sync();  
            System.out.println("spider server start sucess, listening on port " + 9912 + ".");  
        }
        
        public static void main(String[] args) throws Exception {
            SocketServerHelper.startSpiderServer();
        }
          
        public static void shutdown() {  
            System.out.println("preparing to shutdown spider server...");
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();  
            System.out.println("spider server is shutdown.");
        }
    }
    public class SslChannelInitializer extends ChannelInitializer<Channel> {
        private final SslContext context;
    
        public SslChannelInitializer(SslContext context) {
            this.context = context;
        }
    
        @Override
        protected void initChannel(Channel ch) throws Exception {
            SSLEngine engine = context.newEngine(ch.alloc());
            engine.setUseClientMode(false);
            ch.pipeline().addFirst("ssl", new SslHandler(engine));
            ChannelPipeline pipeline = ch.pipeline(); 
            pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));  
            pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));  //最大16M                
            pipeline.addLast("decoder", new StringDecoder(Charset.forName("UTF-8")));  
            pipeline.addLast("encoder", new StringEncoder(Charset.forName("UTF-8")));  
            pipeline.addLast("spiderServerBusiHandler", new SpiderServerBusiHandler());
        }
    }

    3、客户端程序

    public class SocketClientHelper {
         public static void main(String[] args) {
                Channel channel = SocketClientHelper.createChannel("localhost",9912);
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                SocketHelper.writeMessage(channel, "ssh over tcp test 1");
                SocketHelper.writeMessage(channel, "ssh over tcp test 2");
                SocketHelper.writeMessage(channel, "ssh over tcp test 3");
                SocketHelper.writeMessage(channel, "ssh over tcp test 4");
                SocketHelper.writeMessage(channel, "ssh over tcp test 5");
            }
            
            public static Channel createChannel(String host, int port) {
                Channel channel = null;  
                Bootstrap b = getBootstrap();
                try {  
                    channel = b.connect(host, port).sync().channel();
                    System.out.println(MessageFormat.format("connect to spider server ({0}:{1,number,#}) success for thread [" + Thread.currentThread().getName() + "].", host , port));
                } catch (Exception e) {
                    e.printStackTrace();
                }  
                return channel;
            }
            
            public static Bootstrap getBootstrap(){  
                EventLoopGroup group;
                Class<? extends Channel> channelClass = NioSocketChannel.class;
                group = new NioEventLoopGroup();
                Bootstrap b = new Bootstrap();  
                b.group(group).channel(channelClass);
                b.option(ChannelOption.SO_KEEPALIVE, true);
                b.option(ChannelOption.TCP_NODELAY, true);
                b.option(ChannelOption.SO_REUSEADDR, true);
                b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);
                TrustManagerFactory tf = null; 
                try {
                    KeyStore keyStore = KeyStore.getInstance("JKS");
                    keyStore.load(new FileInputStream("G:\ssl.jks"), "123456".toCharArray());
                    tf = TrustManagerFactory.getInstance("SunX509");
                    tf.init(keyStore);
                    SslContext sslContext = SslContextBuilder.forClient().trustManager(tf).build();
                    b.handler(new SslChannelInitializer(sslContext));
                    return b;
                } catch(Exception e) {
                    e.printStackTrace();
                }
                return null;
            }
    }
    public class SslChannelInitializer extends ChannelInitializer<Channel> {
        
        private final SslContext context;
    
        public SslChannelInitializer(SslContext context) {
            this.context = context;
        }
    
        @Override
        protected void initChannel(Channel ch) throws Exception {
            SSLEngine engine = context.newEngine(ch.alloc());
            engine.setUseClientMode(true);
            ch.pipeline().addFirst("ssl", new SslHandler(engine));
            ChannelPipeline pipeline = ch.pipeline(); 
            pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));  
            pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));  //最大16M                
            pipeline.addLast("decoder", new StringDecoder(Charset.forName("UTF-8")));  
            pipeline.addLast("encoder", new StringEncoder(Charset.forName("UTF-8")));  
            pipeline.addLast("spiderClientBusiHandler", new SpiderClientBusiHandler());
        }
    }

    可见SSL也没什么神秘的,就是在普通的TCP连接基础上包了一层处理而已(但如果要自己实现这层处理那可是相当复杂的),这层处理体现在Netty中就是一个SslHandler,把这个SslHandler加入到TCP连接的处理管线中即可。

    PS:我们也可以使用基于认证和报文头加密的方式实现安全性。

    处理空闲和超时

    IdleStateHandler:当一个通道没有进行读写或者运行了一段时间后发出IdleStateEvent

    ReadTimeoutHandler:在指定时间没没有接收到任何数据将抛出ReadTimeoutException

    WriteTimeoutHandler:在指定时间内没有写入数据将抛出WriteTimeoutException

    public class IdleStateHandlerInitializer extends ChannelInitializer<Channel> {
    
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast(new IdleStateHandler(0, 0, 60, TimeUnit.SECONDS));
            pipeline.addLast(new HeartbeatHandler());
        }
        
        public static final class HeartbeatHandler extends ChannelInboundHandlerAdapter {
            
            private static final ByteBuf HEARTBEAT_SEQ = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("HEARTBEAT", CharsetUtil.UTF_8));
    
            @Override
            public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                if (evt instanceof IdleStateEvent) {
                    ctx.writeAndFlush(HEARTBEAT_SEQ.duplicate()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    super.userEventTriggered(ctx, evt);
                }
            }
            
        }
    }

    分隔符协议

    DelimiterBasedFrameDecoder,解码器,接收ByteBuf由一个或者多个分隔符拆分,如NUL或者换行符。

    LineBasedFrameDecoder,解码器,接收ByteBuf以分隔符结束,如" "和" "

    public class LineBasedHandlerInitializer extends ChannelInitializer<Channel> {
    
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ch.pipeline().addLast(new LineBasedFrameDecoder(65*1024), new FrameHandler());
        }
    
        public static final class FrameHandler extends SimpleChannelInboundHandler<ByteBuf> {
    
            @Override
            protected void channelRead0(ChannelHandlerContext arg0, ByteBuf arg1) throws Exception {
                // do something
            }
            
        }
    }

    长度为基础的协议

    FixedLengthFrameDecoder:解码器,固定长度提取帧

    LengthFieldBasedFrameDecoder:解码器,读取头部长度并提取帧的长度

    public class LengthBasedInitializer extends ChannelInitializer<Channel> {
    
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65*1024, 0, 8))
                            .addLast(new FrameHandler());
        }
        
        public static final class FrameHandler extends SimpleChannelInboundHandler<ByteBuf> {
    
            @Override
            protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
                // do something
            }
            
        }
    
    }

    写大数据

    写大量的数据的一个有效的方法就是使用异步框架,如果内存和网络都处于爆满负荷状态,你需要停止写,Netty提供zero-memory-copy机制,这种方法在将文件内容写到网络堆栈空间时可以获得最大的性能:

    public class WriteBigData extends ChannelInboundHandlerAdapter {
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            File file = new File("");
            FileInputStream fis = new FileInputStream(file);
            FileRegion region = new DefaultFileRegion(fis.getChannel(), 0, file.length());
            Channel channel = ctx.channel();
            channel.writeAndFlush(region).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        Throwable cause = future.cause();
                        // do something
                    }
                }
            });
        }
    
    }

    如果只想发送指定的数据块,可以使用ChunkedFile、ChunkedNioFile、ChunkedStream、ChunkedNioStream等。

    Protobuf序列化传输

    ProtobufDecoder、ProtobufEncoder、ProtobufVarint32FrameDecoder、ProtobufVarint32LengthPrepender,使用Protobuf需要映入protobuf-java-2.5.0.jar

    1、下载编译器,将protoc.exe配置到环境变量:https://github.com/google/protobuf/releases

    2、编写.proto文件,参考:https://blog.csdn.net/hry2015/article/details/70766603

    syntax = "proto3"; // 声明可以选择protobuf的编译器版本(v2和v3)
    option java_outer_classname = "MessageProto"; // 指定生成的java类的类名
    message Message {  // 相当于c语言中的struct语句,表示定义一个信息,其实也就是类。
      string id = 1; // 要传输的字段了,子段后面需要有一个数字编号,从1开始递增
      string content = 2;
    }

    3、CMD执行编译操作

    protoc ./Message.proto --java_out=./

    4、引入maven

    <!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
    <dependency>
       <groupId>com.google.protobuf</groupId>
       <artifactId>protobuf-java</artifactId>
       <version>3.5.1</version>
    </dependency>

    5、服务端

    public class ServerPoHandlerProto extends ChannelInboundHandlerAdapter {
        
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            MessageProto.Message message = (MessageProto.Message) msg;
            if (ConnectionPool.getChannel(message.getId()) == null) {
                ConnectionPool.putChannel(message.getId(), ctx);
            }
            System.err.println("server:" + message.getId());
            ctx.writeAndFlush(message);
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close();
        }
    }
    bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() { 
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            // 实体类传输数据,protobuf序列化
                            ch.pipeline().addLast("decoder",  
                                    new ProtobufDecoder(MessageProto.Message.getDefaultInstance()));  
                            ch.pipeline().addLast("encoder",  
                                    new ProtobufEncoder());  
                            ch.pipeline().addLast(new ServerPoHandlerProto());
                            
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

    6、客户端

    public class ClientPoHandlerProto extends ChannelInboundHandlerAdapter {
        
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            MessageProto.Message message = (MessageProto.Message) msg;
            System.out.println("client:" + message.getContent());
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close();
        }
        
    }
    Bootstrap b = new Bootstrap();
                b.group(workerGroup);
                b.channel(NioSocketChannel.class);
                b.option(ChannelOption.SO_KEEPALIVE, true);
                b.handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        // 实体类传输数据,protobuf序列化
                        ch.pipeline().addLast("decoder",  
                                new ProtobufDecoder(MessageProto.Message.getDefaultInstance()));  
                        ch.pipeline().addLast("encoder",  
                                new ProtobufEncoder());  
                        ch.pipeline().addLast(new ClientPoHandlerProto());
                    
                    }
                });

    单元测试

    package com.netty.learn.demo6;
    
    import java.util.List;
    import java.util.Random;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.channel.embedded.EmbeddedChannel;
    import io.netty.handler.codec.ByteToMessageDecoder;
    
    public class EmbeddedChannelInboundTest {
        public static void main(String[] args) {
            Random r = new Random();
            ByteBuf byteBuf = Unpooled.buffer();
            for (int i = 0; i < 3; i++) {
                int one = r.nextInt();
                byteBuf.writeInt(one);
                System.out.println("generate one: " + one);
            }
    
            EmbeddedChannel embeddedChannel = new EmbeddedChannel();
    
            // 获取channelPipeLine
            ChannelPipeline channelPipeline = embeddedChannel.pipeline();
            channelPipeline.addFirst(new DecodeTest());
            channelPipeline.addLast(new SimpleChannelInBoundHandlerTest());
    
            // 写入测试数据
            embeddedChannel.writeInbound(byteBuf);
            embeddedChannel.finish();
    
            // 验证测试数据
            System.out.println("embeddedChannel readInbound:" + embeddedChannel.readInbound());
            System.out.println("embeddedChannel readInbound:" + embeddedChannel.readInbound());
            System.out.println("embeddedChannel readInbound:" + embeddedChannel.readInbound());
        }
    
    }
    
    // 解码器
    class DecodeTest extends ByteToMessageDecoder {
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            if (in.readableBytes() >= 4) {
                out.add(in.readInt());
            }
        }
    }
    
    // channelHandler
    @SuppressWarnings("rawtypes")
    class SimpleChannelInBoundHandlerTest extends SimpleChannelInboundHandler {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("Received message:" + msg);
            ctx.fireChannelRead(msg);
        }
    }
  • 相关阅读:
    MMT
    dappradar 分布式应用雷达
    dac去中心化自治公司
    如何立即手动执行BW周期性处理链
    BW之数据源 增量管理DELTA (比较详细的)
    abap问题:call transformation出来的xml字符串不能被proxy识别
    SHIFT 去掉前导0,SHIFT语法
    浅谈SAP实施过程中关键用户的作用
    什么是UAT测试?
    FI 基础财务知识
  • 原文地址:https://www.cnblogs.com/ijavanese/p/9931808.html
Copyright © 2011-2022 走看看