zoukankan      html  css  js  c++  java
  • netty10---分包粘包

    客户端:根据 长度+数据 方式发送

    package com.server;
    
    import java.net.Socket;
    import java.nio.ByteBuffer;
    
    public class Client {
    
        public static void main(String[] args) throws Exception {
            Socket socket = new Socket("127.0.0.1", 10101);
            
            String message = "hello";
            
            byte[] bytes = message.getBytes();
            
            ByteBuffer buffer = ByteBuffer.allocate(4 + bytes.length);
            buffer.putInt(bytes.length);//netty是write,ByteBuffer是nio的,所以用put。
            buffer.put(bytes);
            
            byte[] array = buffer.array();
                
            for(int i=0; i<5; i++){
                socket.getOutputStream().write(array);
            }
                
            socket.close();
        }
    
    }

    服务端:根据 长度+数据 接收解码

    package com.server;
    
    import java.net.InetSocketAddress;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    import org.jboss.netty.bootstrap.ServerBootstrap;
    import org.jboss.netty.channel.ChannelPipeline;
    import org.jboss.netty.channel.ChannelPipelineFactory;
    import org.jboss.netty.channel.Channels;
    import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
    import org.jboss.netty.handler.codec.string.StringDecoder;
    import org.jboss.netty.handler.codec.string.StringEncoder;
    
    public class Server {
    
        public static void main(String[] args) {
            //服务类
            ServerBootstrap bootstrap = new ServerBootstrap();
            //boss线程监听端口,worker线程负责数据读写
            ExecutorService boss = Executors.newCachedThreadPool();
            ExecutorService worker = Executors.newCachedThreadPool();
            //设置niosocket工厂
            bootstrap.setFactory(new NioServerSocketChannelFactory(boss, worker));
            //设置管道的工厂
            bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
                @Override
                public ChannelPipeline getPipeline() throws Exception {
                    ChannelPipeline pipeline = Channels.pipeline();
                    pipeline.addLast("decoder", new MyDecoder());
                    pipeline.addLast("handler1", new HelloHandler());
                    return pipeline;
                }
            });
            bootstrap.bind(new InetSocketAddress(10101));
            System.out.println("start!!!");
        }
    }
    package com.server;
    
    import org.jboss.netty.buffer.ChannelBuffer;
    import org.jboss.netty.channel.Channel;
    import org.jboss.netty.channel.ChannelHandlerContext;
    import org.jboss.netty.handler.codec.frame.FrameDecoder;
    
    public class MyDecoder extends FrameDecoder {
        @Override  //FrameDecoder的decode方法
        protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
            //buffer是netty的ChannelBuffer
            if(buffer.readableBytes() > 4){ //必须大于基本的最短长度4个字节
                if(buffer.readableBytes() > 2048){ //防止socket攻击,缓存很大,所以限制数据的长度不能大于2048,
                    buffer.skipBytes(buffer.readableBytes());
                }
                //标记
                buffer.markReaderIndex();
                //长度
                int length = buffer.readInt();
                //buffer里面剩余的数据小于长度
                if(buffer.readableBytes() < length){
                    //前面做了标记,这里可以还原
                    buffer.resetReaderIndex();
                    //缓存当前剩余的buffer数据,等待剩下数据包到来
                    return null;
                }
                //大于长度,开始读数据
                byte[] bytes = new byte[length];
                buffer.readBytes(bytes);
                //往下传递对象给HelloHandler,这次的buffer处理完了,后面在来buffer的时候FrameDecoder会帮我们循环读取,
                return new String(bytes);
            }
            //缓存当前剩余的buffer数据,等待剩下数据包到来(FrameDecoder帮我们实现的),
            return null;
        }
    }
    package com.server;
    
    import org.jboss.netty.channel.ChannelHandlerContext;
    import org.jboss.netty.channel.MessageEvent;
    import org.jboss.netty.channel.SimpleChannelHandler;
    
    public class HelloHandler extends SimpleChannelHandler {
        
        private int count = 1;//单线程的没有并发问题
    
        @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
            System.out.println(e.getMessage() + "  " +count);
            count++;
        }
    }
    2、看下粘包和分包是怎么样一个情况:
    
    粘包是因为服务端和客户端没有约定好数据结构,发送100个hello,粘包就是100个hello全在一起服务端分不开。分包就是服务端把100个hello随机的分开。
    
    定义一个稳定的结构:最简单结构:length + hello
    根据长度+数据会有攻击:
    ,把长度定义的很大Intger.max,这种数据包,会导致一直在缓存,把缓存撑的很大。通常被称为socket攻击,字节流式攻击。
    所以要 包头+长度+数据,前面加包头做标识。大于2048就清除,然后读取到包头就表示数据的开始。
    
    
    
    
    心中会有连个疑惑(FrameDecoder源码分析):
    1、为什么FrameDecoder return的对象就是往下传递的对象  (还是调用了sendUpstream)
    2、buffer里面数据未被读取完怎么办?    (cumulation缓存)
    3、为什么return null就可以缓存buffer     (cumulation缓存)
    3、FrameDecoder里面的cumulation其实就是一个缓存的buffer对象
    
    public abstract class FrameDecoder extends SimpleChannelUpstreamHandler //事件进入管道以后进入第一个handler的messageReceived方法,
        protected ChannelBuffer cumulation;//缓存的buffer对象
        @Override
        public void messageReceived(
                ChannelHandlerContext ctx, MessageEvent e) throws Exception {
            Object m = e.getMessage();
            if (!(m instanceof ChannelBuffer)) {//判断是不是一个ChannelBuffer,不是就不管他了,
                ctx.sendUpstream(e);//传给下一个handler
                return;
            }
            ChannelBuffer input = (ChannelBuffer) m;//是ChannelBuffer,转成ChannelBuffer对象
            if (!input.readable()) {//没有数据读,也不管了。return
                return;
            }
            if (cumulation == null) {//如果renturn null还有数据,就缓存在buffer里面
                try {
                    // the cumulation buffer is not created yet so just pass the input to callDecode(...) method
                    callDecode(ctx, e.getChannel(), input, e.getRemoteAddress());
                } finally {
                    updateCumulation(ctx, input);
                }
            } else {
                input = appendToCumulation(input);
                try {
                    callDecode(ctx, e.getChannel(), input, e.getRemoteAddress());
                } finally {
                    updateCumulation(ctx, input);
                }
            }
        }
  • 相关阅读:
    Ubuntu系统下的实用软件推荐
    上传项目到GitHub
    eclipse+fileSyncPlugin+svn+jenkins+tomcat
    js,jq获取元素位置属性及兼容性写法
    清除浮动的几种方法
    sub,dl,dt,排版,横向滚动条,浮动元素居中,box-sizing
    a里面不能嵌套a
    随笔
    自适应屏幕轮播图详解
    tip
  • 原文地址:https://www.cnblogs.com/yaowen/p/9063227.html
Copyright © 2011-2022 走看看