zoukankan      html  css  js  c++  java
  • 解决netty tcp自定义消息格式粘包/拆包问题

    项目地址:https://github.com/KouReal/netty-study-0816

    参考:https://www.cnblogs.com/sidesky/p/6913109.html

    使用到的开源库:Reflections,  根据注解找到一个包下的所有类Class

    https://stackoverflow.com/questions/13128552/how-to-scan-classes-for-annotations

    https://github.com/ronmamo/reflections

    <dependency>
        <groupId>org.reflections</groupId>
        <artifactId>reflections</artifactId>
        <version>0.9.11</version>
    </dependency>
    public class ProtocolMap {
        private static Logger logger = LoggerFactory.getLogger(ProtocolMap.class);
        private static Map<Integer, String> pmap_name = new HashMap<>();
        private static Map<Integer,Class<?>> pmap = new HashMap<>();
    
        private static void setpmap() throws ProtocolException{
            Header header = new Header();
            Class<?> header_cls = header.getClass();
            Field[] header_fs = header_cls.getDeclaredFields();
            Map<String, Integer> namemap = new HashMap<>();
            for (Field f : header_fs) {
                f.setAccessible(true);
                try {
                    namemap.put(f.getName(),(Integer)f.get(header));
                    pmap_name.put((Integer)f.get(header), f.getName());
                } catch (IllegalArgumentException e) {
                    e.printStackTrace();
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                }
            }
            
            logger.info("setpmap:{}",namemap);
            
            Reflections reflections = new Reflections("protocol");
            Set<Class<?>> msg_cls_set = reflections.getTypesAnnotatedWith(MyMessage.class);
            logger.info("class sets:{}",msg_cls_set);
            for (Class<?> msg_cls : msg_cls_set) {
                MyMessage anno = msg_cls.getAnnotation(MyMessage.class);
                String name = anno.value();
                if(!namemap.containsKey(name)){
                    throw new ProtocolException("协议class:"+msg_cls+"的MyMessage注解值:【"+name+"】对应的协议未定义");
                }
                Integer protocol_id = namemap.get(name);
                pmap.put(protocol_id, msg_cls);
            }
            for (Field f : header_fs) {
                f.setAccessible(true);
                try {
                    Integer id = (Integer)f.get(header);
                    if(!pmap.containsKey(id)){
                        pmap.put(id, null);
                    }
                } catch (IllegalArgumentException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                } catch (IllegalAccessException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }
        public static boolean checkprotocol(Integer id) throws ProtocolException{
            if(pmap_name.isEmpty()){
                setpmap();
            }
            return pmap_name.containsKey(id);
        }
        public static Class<?> getclass(Integer id) throws ProtocolException{
            if(pmap_name.isEmpty()){
                setpmap();
            }
            return pmap.get(id);
        }
    }
    public class LenPreMsgDecoder extends ByteToMessageDecoder{
        private static Logger logger = LoggerFactory.getLogger(LenPreMsgDecoder.class);
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            logger.info("decode: readinx:{},readable:{},writeidx:{}",in.readerIndex(),in.readableBytes(),in.writerIndex());
            if(in.readableBytes()<LenPreMsg.BASE_PRE_LEN){
                logger.info("readable too little");
                return ;
            }
            int beginindex;
            int protocol_id;
            while(true){
                beginindex = in.readerIndex();
                in.markReaderIndex();
                logger.info("before readInt:,readerindex:{},readable:{}",in.readerIndex(),in.readableBytes());
                protocol_id = in.readInt();
                logger.info("protocolid:{}",protocol_id);
                if(ProtocolMap.checkprotocol(protocol_id)){
                    break;
                }
                in.resetReaderIndex();
                in.readByte();
                if(in.readableBytes()<LenPreMsg.BASE_PRE_LEN){
                    return;
                }
            }
            
            int len = in.readInt();
            logger.info("read len:{}",len);
            
            if(len==0){
                //heartbeat
                out.add(new LenPreMsg(protocol_id, 0, null));
                return ;
            }
            logger.info("after read length,readablebytes:{}",in.readableBytes());
            if(in.readableBytes()<len){
                in.readerIndex(beginindex);
                return ;
            }
            byte[] data = new byte[len];
            in.readBytes(data);
            Class<?> protocol_cls = ProtocolMap.getclass(protocol_id);
            logger.info("start serialize,cls:{}",protocol_cls);
            Object obj = (Object) SerializeUtil.deserializeWithProtostuff(data, protocol_cls);
            logger.info("decoder get obj:{}",obj);
            out.add(new LenPreMsg(protocol_id, len, obj));
            
            
        }
        
    
    }
     

    @Test
        public void test1(){
            RegDiscover regDiscover = new RegDiscover("httpsdsfsafsaerver", "addsdfsafsr");
            RpcRequest rpcRequest = new RpcRequest("734209479jdfjsfdjsfip794079014",
                    "adfjashfpaoh73497498d80fh32", 
                    "asjdfaiohpgh", 
                    "djas;fja938u2893", 
                    "0f8y0409y3", 
                    "7398ujfhhhe9hf93h2f9h");
            RegService regService = new RegService("customerfdsafsadf", "xxxdsfadsfasdf");
            List<LenPreMsg> msgs = asList(new LenPreMsg(Header.reg_discover, 1, regDiscover),
                    new LenPreMsg(Header.rpc_request, 1, rpcRequest),
                    new LenPreMsg(Header.reg_addservice, 1, regService)); 
    //        List<Integer> protocols = asList(Header.reg_discover,Header.rpc_request,Header.reg_addservice);
            for(int i=0;i<4;++i){
                new Thread(new Runnable() {
                    
                    @Override
                    public void run() {
                        for(int i=0;i<10;++i){
                            try {
                                cli.sendtoserver(msgs.get(i%3));
                            } catch (Exception e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                            }
                        }
                        
                    }
                }).start();
            }
  • 相关阅读:
    ASP.NET WEB API 自定义模型校验过滤器
    使用asp.net mvc部分视图渲染html
    .Net中的并行编程-7.基于BlockingCollection实现高性能异步队列
    python爬虫技术的选择
    优雅的处理异常
    解决asp.net动态压缩
    .Net中的并行编程-6.常用优化策略
    使用快捷键提升C#开发效率
    .Net中的并行编程-5.流水线模型实战
    .Net中的并行编程-4.实现高性能异步队列
  • 原文地址:https://www.cnblogs.com/CreatorKou/p/11365378.html
Copyright © 2011-2022 走看看