  • zico source code analysis (1): data reception and storage

    Source code for zorka and zico: https://github.com/jitlogic

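    zico is the collector side of zorka, so before looking at zico it helps to describe zorka and how it structures data for storage and transport. When zorka captures data it wraps it in a TraceRecord, which holds the class, method, call chain and other information for that trace. The TraceRecord stores only IDs for these items; the details behind each ID live in Symbol entries, so the set of Symbols acts as a data dictionary. Before transmission and storage each item is tagged to mark whether it is a Symbol entry or a TraceRecord entry.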
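    The ID-plus-dictionary idea can be illustrated with a minimal sketch. All class and method names below (SymbolDictionary, CompactRecord, idOf, nameOf) are hypothetical and are not zorka's actual API; the sketch only shows how a record that carries integer IDs is resolved back to readable names.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical names throughout; these are not zorka's actual classes.
final class SymbolDictionary {
    private final Map<Integer, String> names = new HashMap<>();
    private final Map<String, Integer> ids = new HashMap<>();
    private int nextId = 1;

    /** Returns the ID for a name, assigning a new one on first use. */
    int idOf(String name) {
        Integer id = ids.get(name);
        if (id == null) {
            id = nextId++;
            ids.put(name, id);
            names.put(id, name);
        }
        return id;
    }

    /** Returns the name for an ID, or null if the ID is unknown. */
    String nameOf(int id) {
        return names.get(id);
    }
}

/** A record that stores only IDs, in the spirit of TraceRecord. */
final class CompactRecord {
    final int classId;
    final int methodId;

    CompactRecord(int classId, int methodId) {
        this.classId = classId;
        this.methodId = methodId;
    }

    /** Resolves the IDs through the dictionary to build a readable label. */
    String describe(SymbolDictionary dict) {
        return dict.nameOf(classId) + "." + dict.nameOf(methodId) + "()";
    }
}

final class SymbolDictionaryDemo {
    public static void main(String[] args) {
        SymbolDictionary dict = new SymbolDictionary();
        CompactRecord rec = new CompactRecord(
                dict.idOf("com.example.OrderService"), dict.idOf("placeOrder"));
        System.out.println(rec.describe(dict));
    }
}
```

    Storing IDs keeps each record small, because a long class or method name is sent once as a dictionary entry and then referenced by number.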

    First, an outline of the main steps in how Zico works:

    ZicoServer accepts connections over a socket. Its run() method creates a ZicoServerConnector (a subclass of ZicoConnector), whose core is the runCycle() function.

    // This method is called from run()
        private void runCycle() throws IOException {
            ZicoPacket pkt = recv();
            switch (pkt.getStatus()) {
                case ZICO_PING: {
                    send(ZICO_PONG);
                    break;
                }
                case ZICO_HELLO: {
                    List<Object> lst = ZicoCommonUtil.unpack(pkt.getData());
                    log.debug("Encountered ZICO HELLO packet: " + lst + "(addr=" + saddr + ")");
                    if (lst.size() > 0 && lst.get(0) instanceof HelloRequest) {
                        context = factory.get(socket, (HelloRequest) lst.get(0));
                        send(ZICO_OK);
                    } else {
                        log.error("ZICO_HELLO packet with invalid content: " + lst + "(addr=" + addr + ")");
                        send(ZICO_BAD_REQUEST);
                    }
                    break;
                }
                case ZICO_DATA: {
                    //log.debug("Received ZICO data packet from " + saddr + ": status=" + pkt.getStatus()
                    //        + ", dlen=" + pkt.getData().length);
                    if (context != null) {
                        for (Object o : ZicoCommonUtil.unpack(pkt.getData())) {
                            context.process(o);
                        }
                        context.commit();
                        send(ZICO_OK);
                    } else {
                        log.error("Client " + saddr + " not authorized.");
                        send(ZICO_AUTH_ERROR);
                    }
                    break;
                }
                default:
                    log.error("ZICO packet from " + saddr + " with invalid status code: " + pkt.getStatus());
                    send(ZICO_BAD_REQUEST);
                    break;
            }
        }

    recv() wraps the data read from the socket in a ZicoPacket object and returns it. Early in the connection, zico and zorka exchange ping-pong and hello packets to test the link. ZicoPacket.getData() returns a byte array, and ZicoCommonUtil.unpack() decodes it (unpack list of fressian-encoded objects from byte array). unpack() returns a List<Object>; the ZicoDataProcessor then calls process() on each element and commit() once per packet.
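    The control flow of runCycle() can be reduced to a small state machine: data is only accepted after a successful hello. The sketch below is a simplification under stated assumptions. The enum and class names are hypothetical, the numeric status codes are not modelled, and "a non-empty hostname" stands in for the real HelloRequest validation.

```java
// Status names mirror the constants seen in runCycle(); numeric codes are not modelled.
enum SketchStatus { PING, PONG, HELLO, DATA, OK, AUTH_ERROR, BAD_REQUEST }

final class HandshakeSketch {
    private boolean authorized = false; // stands in for "context != null"
    private int processed = 0;          // number of accepted data packets

    /** Returns the reply status for one incoming packet. */
    SketchStatus handle(SketchStatus incoming, String hostname) {
        switch (incoming) {
            case PING:
                return SketchStatus.PONG;
            case HELLO:
                // Assumption: a non-empty hostname is enough to create a context.
                if (hostname != null && !hostname.isEmpty()) {
                    authorized = true;
                    return SketchStatus.OK;
                }
                return SketchStatus.BAD_REQUEST;
            case DATA:
                if (!authorized) {
                    return SketchStatus.AUTH_ERROR; // data before hello is rejected
                }
                processed++;
                return SketchStatus.OK;
            default:
                return SketchStatus.BAD_REQUEST;
        }
    }

    int processedCount() {
        return processed;
    }
}
```

    A client that sends DATA before HELLO gets AUTH_ERROR in this sketch, which matches the "not authorized" branch of runCycle().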

    ReceiverContext is the only class that implements the ZicoDataProcessor interface, so let us look at it next.

     @Override
        public synchronized void process(Object obj) throws IOException {
    
            if (hostStore.hasFlag(HostInfo.DELETED)) {
                log.info("Resetting connection for " + hostStore.getName() + " due to dirty SID map.");
                throw new ZicoException(ZicoPacket.ZICO_EOD,
                        "Host has been deleted. Connection needs to be reset. Try again.");
            }
    
            if (hostStore.hasFlag(HostInfo.DISABLED)) {
                // Host store is disabled. Ignore incoming packets.
                return;
            }
    
            if (dirtySidMap) {
                log.info("Resetting connection for " + hostStore.getName() + " due to dirty SID map.");
                throw new ZicoException(ZicoPacket.ZICO_EOD,
                    "Host was disabled, then enabled and SID map is dirty. Resetting connection.");
            }
    
            try {
                if (obj instanceof Symbol) {
                    processSymbol((Symbol) obj);
                } else if (obj instanceof TraceRecord) {
                    processTraceRecord((TraceRecord) obj);
                } else {
                    if (obj != null) {
                        log.warn("Unsupported object type:" + obj.getClass());
                    } else {
                        log.warn("Attempted processing NULL object (?)");
                    }
                }
            } catch (Exception e) {
                log.error("Error processing trace record: ", e);
            }
        }

    ReceiverContext holds a HostStore object (Represents performance data store for a single agent).

    Next, the processTraceRecord() method of ReceiverContext.

    private void processTraceRecord(TraceRecord rec) throws IOException {
            if (!hostStore.hasFlag(HostInfo.DISABLED)) {
                rec.traverse(this);
                visitedObjects.clear();
                hostStore.processTraceRecord(rec);
            } else {
                log.debug("Dropping trace for inactive host: " + hostStore.getName());
            }
        } 

    This method calls the traverse() method of TraceRecord.

       public void traverse(MetadataChecker checker) throws IOException {
            classId = checker.checkSymbol(classId, this);
            methodId = checker.checkSymbol(methodId, this);
            signatureId = checker.checkSymbol(signatureId, this);
    
            if (exception instanceof SymbolicException) {
                ((SymbolicException) exception).traverse(checker);
            }
    
            if (attrs != null) {
                Map<Integer, Object> newAttrs = new LinkedHashMap<Integer, Object>();
                for (Map.Entry<Integer, Object> e : attrs.entrySet()) {
                    newAttrs.put(checker.checkSymbol(e.getKey(), this), e.getValue());
                }
                attrs = newAttrs;
            }
    
            if (children != null) {
                for (TraceRecord child : children) {
                    child.traverse(checker);
                }
            }
    
            if (marker != null && 0 != (flags & TRACE_BEGIN)) {
                marker.traverse(checker);
            }
        }

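    traverse() passes three IDs (classId, methodId and signatureId) through checker.checkSymbol(), then does the same for the keys of attrs (Attributes grabbed at this method execution (by spy instrumentation engine)) on the TraceRecord and, recursively, on its children.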
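    One plausible reading of checkSymbol(), given the "dirty SID map" messages in process(), is that it translates agent-side symbol IDs into collector-side IDs. The sketch below assumes exactly that; SidRemapSketch, learn() and the "unknown maps to 0" rule are illustrative assumptions, not the confirmed behaviour of ReceiverContext.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

final class SidRemapSketch {
    // Agent-side symbol ID -> collector-side symbol ID (hypothetical mapping table).
    private final Map<Integer, Integer> sidMap = new HashMap<>();

    /** Records that an agent-side ID corresponds to a collector-side ID. */
    void learn(int remoteId, int localId) {
        sidMap.put(remoteId, localId);
    }

    /** Translates an agent-side ID; unknown IDs map to 0 in this sketch. */
    int checkSymbol(int remoteId) {
        Integer local = sidMap.get(remoteId);
        return local != null ? local : 0;
    }

    /** Rebuilds an attribute map with translated keys, keeping insertion order. */
    Map<Integer, Object> remapAttrs(Map<Integer, Object> attrs) {
        Map<Integer, Object> out = new LinkedHashMap<>();
        for (Map.Entry<Integer, Object> e : attrs.entrySet()) {
            out.put(checkSymbol(e.getKey()), e.getValue());
        }
        return out;
    }
}
```

    Building a new LinkedHashMap, as traverse() does, avoids mutating the map while iterating over it and preserves the original attribute order.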

      processTraceRecord() calls HostStore.processTraceRecord(TraceRecord obj), which wraps the TraceRecord (Represents trace information about single method execution, May contain references to information about calls from this method) in a TraceInfoRecord. Compared with TraceRecord, TraceInfoRecord has four extra fields: dataOffs, dataLen, indexOffs and indexLen. My guess is that they are used to locate the record's chunks in the data and index stores.

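      HostStore.processTraceRecord() breaks the TraceRecord down further: it writes the record to the data store (traceDataStore), the index store (traceIndexStore) and the infos map, each of which is persisted to its own file.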

        public void processTraceRecord(TraceRecord rec) throws IOException {
    
            TraceRecordStore traceDataStore = getTraceDataStore();
            TraceRecordStore traceIndexStore = getTraceIndexStore();
            BTreeMap<Long,TraceInfoRecord> infos = getInfos();
            Map<Integer,String> tids = getTids();
    
            if (traceDataStore == null || traceIndexStore == null || infos == null || tids == null
                    || hasFlag(HostInfo.DISABLED|HostInfo.DELETED)) {
                throw new ZicoRuntimeException("Store " + getName() + " is closed and cannot accept records.");
            }
    
            TraceRecordStore.ChunkInfo dchunk = traceDataStore.write(rec);
    
            List<TraceRecord> tmp = rec.getChildren();
    
            int numRecords = ZicoUtil.numRecords(rec);
    
            rec.setChildren(null);
            TraceRecordStore.ChunkInfo ichunk = traceIndexStore.write(rec);
            rec.setChildren(tmp);
    
    
            TraceInfoRecord tir = new TraceInfoRecord(rec,numRecords,
                    dchunk.getOffset(), dchunk.getLength(),
                    ichunk.getOffset(), ichunk.getLength(),
                    rec.getAttrs() != null ? ZicoUtil.toIntArray(rec.getAttrs().keySet()) : null);
    
            checkAttrs(tir);
    
            infos.put(tir.getDataOffs(), tir);
    
            int traceId = tir.getTraceId();
    
            if (!tids.containsKey(traceId)) {
                tids.put(traceId, symbolRegistry.symbolName(traceId));
            }
    
        }

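      To summarize the path from socket to file: ZicoServerConnector obtains an InputStream from the socket and reads data from it; at this point the data is a fressian-encoded byte array. ZicoCommonUtil.unpack() decodes the byte array into a List<Object>, ReceiverContext sorts those objects into Symbol and TraceRecord, and HostStore breaks each TraceRecord down further into the pieces stored in the directory described below.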
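    The role of the offset and length fields becomes clearer with a toy append-only store. This is a sketch under assumptions: ChunkStoreSketch is a hypothetical class, an in-memory buffer stands in for the data file, and plain strings stand in for encoded trace records.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.TreeMap;

final class ChunkStoreSketch {
    /** Offset and length of one written record, like the ChunkInfo returned by write(). */
    static final class Chunk {
        final long offset;
        final int length;

        Chunk(long offset, int length) {
            this.offset = offset;
            this.length = length;
        }
    }

    private final ByteArrayOutputStream data = new ByteArrayOutputStream(); // stands in for a file
    private final TreeMap<Long, Chunk> index = new TreeMap<>();             // keyed by data offset

    /** Appends a record and remembers where it landed. */
    Chunk write(String record) {
        byte[] bytes = record.getBytes(StandardCharsets.UTF_8);
        Chunk c = new Chunk(data.size(), bytes.length);
        data.write(bytes, 0, bytes.length);
        index.put(c.offset, c);
        return c;
    }

    /** Reads back the record that starts at the given offset, or null if none does. */
    String read(long offset) {
        Chunk c = index.get(offset);
        if (c == null) {
            return null;
        }
        byte[] all = data.toByteArray();
        int from = (int) c.offset;
        return new String(Arrays.copyOfRange(all, from, from + c.length), StandardCharsets.UTF_8);
    }
}
```

    Keying the index by data offset, as infos.put(tir.getDataOffs(), tir) does, also orders records by arrival, since offsets only grow in an append-only file.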

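    zico stores the data received from agents under the zico/data directory. The directory layout (shown in a figure in the original post) contains the following entries: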

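    tdat: judging from open() below, the trace data store, holding the complete trace records.

    tidx: judging from open() and processTraceRecord(), the trace index store, holding each top-level record without its children.

    host.properties: information about the host (name, IP, group and so on).

    symbols.dat: the file backing the PersistentSymbolRegistry.

    traces.db: the database opened by dbf.openDB(), holding the infos, tids and attrs maps.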

    The HostStore class represents the performance data store of one agent (Represents performance data store for a single agent). Its constructor calls open(), and the directory structure can be read off the following code:

        public synchronized void open() {
            try {
                load();
                if (symbolRegistry == null) {
                    symbolRegistry = new PersistentSymbolRegistry(
                        new File(ZicoUtil.ensureDir(rootPath), "symbols.dat"));
                }
    
                if (traceDataStore == null) {
                    traceDataStore = new TraceRecordStore(config, this, "tdat", 1, this);
                }
    
                if (traceIndexStore == null) {
                    traceIndexStore = new TraceRecordStore(config, this, "tidx", 4);
                }
    
                db = dbf.openDB(ZorkaUtil.path(rootPath, "traces.db"));

                 //private BTreeMap<Long, TraceInfoRecord> infos;
                 infos = db.getTreeMap(DB_INFO_MAP);
                 //private Map<Integer, String> tids;
                 tids = db.getTreeMap(DB_TIDS_MAP);
                 //private BTreeMap<Integer,Set<Integer>> attrs;
                 attrs = db.getTreeMap(DB_ATTR_MAP);

            } catch (IOException e) {
                log.error("Cannot open host store " + name, e);
            }
        }

          PersistentSymbolRegistry extends SymbolRegistry, which holds two maps (symbolIds and symbolNames). The open() method of PersistentSymbolRegistry reads symbols from the symbols.dat file and puts them into symbolIds and symbolNames.

          TraceRecordStore creates an RDSStore object (Raw Data Store (RDS) builds upon RAGZ stream classes) in its constructor. RDSStore.open() sets up a RAGZInputStream and a RAGZOutputStream to read and write RAGZSegment objects (represents a single in-memory segment). RAGZSegment has an unpack() method that decompresses the data using the java.util.zip package.
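    For readers unfamiliar with java.util.zip, here is a minimal compress/decompress round trip using the standard GZIP streams. It assumes gzip-style compression of a whole in-memory segment; the actual RAGZ segment format is not described in this post, and SegmentZipSketch is a hypothetical class.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

final class SegmentZipSketch {
    /** Compresses a whole in-memory segment with gzip. */
    static byte[] pack(byte[] raw) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(raw);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    /** Decompresses a gzip-compressed segment back into its raw bytes. */
    static byte[] unpack(byte[] packed) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPInputStream gz = new GZIPInputStream(new ByteArrayInputStream(packed))) {
            byte[] buf = new byte[4096];
            int n;
            while ((n = gz.read(buf)) != -1) {
                bos.write(buf, 0, n);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }
}
```

    Compressing per segment rather than per file is what would make random access feasible: a reader only has to decompress the segment that contains the requested offset.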

      The pom file of the zico-core package contains the following:

            <dependency>
                <groupId>org.mapdb</groupId>
                <artifactId>mapdb</artifactId>
                <version>${mapdb.version}</version>
            </dependency>
           

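      This adds MapDB, a lightweight embedded NoSQL database engine, to the Maven project; traces.db is the database opened through it (see dbf.openDB() and db.getTreeMap() in open() above). org.parboiled.BaseParser (http://parboiled.org) also appears in the code base, but parboiled is a parser library rather than a database, so it is not what backs traces.db.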

     Part 2

    zico uses Google Guice for IoC. ProdZicoModule contains the following:

    @Override
        public void configure(Binder binder) {
            super.configure(binder);
            binder.bind(UserManager.class).asEagerSingleton();
            binder.bind(UserContext.class).to(UserHttpContext.class);
            binder.bind(DBFactory.class).to(FileDBFactory.class);
            binder.bind(ZicoDataProcessorFactory.class).to(HostStoreManager.class);
        }

    Consider the last binding first. The ZicoDataProcessorFactory interface has a single get() method, which the HostStoreManager class implements as follows:

      @Override
        public ZicoDataProcessor get(Socket socket, HelloRequest hello) throws IOException {
    
            if (hello.getHostname() == null) {
                log.error("Received HELLO packet with null hostname.");
                throw new ZicoException(ZicoPacket.ZICO_BAD_REQUEST, "Null hostname.");
            }
    
            HostStore store = getHost(hello.getHostname(), !enableSecurity);
    
            if (store == null) {
                throw new ZicoException(ZicoPacket.ZICO_AUTH_ERROR, "Unauthorized.");
            }
    
            if (store.getAddr() == null || store.getAddr().length() == 0) {
                store.setAddr(socket.getInetAddress().getHostAddress());
                store.save();
            }
    
            if (enableSecurity) {
                if (store.getAddr() != null && !store.getAddr().equals(socket.getInetAddress().getHostAddress())) {
                    throw new ZicoException(ZicoPacket.ZICO_AUTH_ERROR, "Unauthorized.");
                }
    
                if (store.getPass() != null && !store.getPass().equals(hello.getAuth())) {
                    throw new ZicoException(ZicoPacket.ZICO_AUTH_ERROR, "Unauthorized.");
                }
            }
    
            return new ReceiverContext(store);
        }

    The function mainly looks up the HostStore and returns a ReceiverContext bound to it. ReceiverContext implements the process() and commit() methods of ZicoDataProcessor.
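    The admission checks in get() follow a "pin the address on first contact, then compare" pattern. The sketch below isolates that logic. HostAuthSketch and admit() are hypothetical names, and the boolean result replaces the ZicoException that the real code throws.

```java
import java.util.Objects;

final class HostAuthSketch {
    String addr; // address pinned on first connection; null until then
    String pass; // optional shared secret; null means no password is required

    /** Returns true when the connection may proceed, following the checks in get(). */
    boolean admit(String remoteAddr, String auth, boolean enableSecurity) {
        if (addr == null || addr.isEmpty()) {
            addr = remoteAddr; // first contact: remember where the agent came from
        }
        if (enableSecurity) {
            if (!Objects.equals(addr, remoteAddr)) {
                return false; // agent connects from a different address than the pinned one
            }
            if (pass != null && !pass.equals(auth)) {
                return false; // password configured but not matched
            }
        }
        return true;
    }
}
```

    With security disabled, both the address and the password checks are skipped, which matches the structure of the enableSecurity branch in get().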

     Part 3: TraceDataService

    ***

    HostStore has a search() function:

     public TraceInfoSearchResult search(TraceInfoSearchQuery query) throws IOException {
    
            SymbolRegistry symbolRegistry = getSymbolRegistry();
            BTreeMap<Long,TraceInfoRecord> infos = getInfos();
            TraceRecordStore traceDataStore = getTraceDataStore();
            TraceRecordStore traceIndexStore = getTraceIndexStore();
    
            if (symbolRegistry == null || infos == null || traceDataStore == null || traceIndexStore == null) {
                throw new ZicoRuntimeException("Host store " + getName() + " is closed.");
            }
    
            List<TraceInfo> lst = new ArrayList<TraceInfo>(query.getLimit());
    
            TraceInfoSearchResult result = new TraceInfoSearchResult();
            result.setSeq(query.getSeq());
            result.setResults(lst);
    
            TraceRecordMatcher matcher = null;
    
            int traceId = query.getTraceName() != null ? symbolRegistry.symbolId(query.getTraceName()) : 0;
    
            if (query.getSearchExpr() != null) {
                if (query.hasFlag(TraceInfoSearchQuery.EQL_QUERY)) {
                    matcher = new EqlTraceRecordMatcher(symbolRegistry,
                            Parser.expr(query.getSearchExpr()),
                            0, 0, getName());
                } else if (query.getSearchExpr().length() > 0 && query.getSearchExpr().startsWith("~")) {
                    matcher = new FullTextTraceRecordMatcher(symbolRegistry,
                            TraceRecordSearchQuery.SEARCH_ALL, Pattern.compile(query.getSearchExpr().substring(1)));
                } else {
                    matcher = new FullTextTraceRecordMatcher(symbolRegistry,
                            TraceRecordSearchQuery.SEARCH_ALL, query.getSearchExpr());
                }
            }
    
            // TODO implement query execution time limit
    
            int searchFlags = query.getFlags();
    
            boolean asc = 0 == (searchFlags & TraceInfoSearchQuery.ORDER_DESC);
    
            Long initialKey = asc
                    ? infos.higherKey(query.getOffset() != 0 ? query.getOffset() : Long.MIN_VALUE)
                    : infos.lowerKey(query.getOffset() != 0 ? query.getOffset() : Long.MAX_VALUE);
    
            long tstart = System.nanoTime();
    
            for (Long key = initialKey; key != null; key = asc ? infos.higherKey(key) : infos.lowerKey(key)) {
    
                long t = System.nanoTime()-tstart;
    
                if ((lst.size() >= query.getLimit()) || (t > MAX_SEARCH_T1 && lst.size() > 0) || (t > MAX_SEARCH_T2)) {
                    result.markFlag(TraceInfoSearchResult.MORE_RESULTS);
                    return result;
                }
    
                TraceInfoRecord tir = infos.get(key);
    
                result.setLastOffs(key);
    
                if (query.hasFlag(TraceInfoSearchQuery.ERRORS_ONLY) && 0 == (tir.getTflags() & TraceMarker.ERROR_MARK)) {
                    continue;
                }
    
                if (query.getStartDate() != 0 && tir.getClock() < query.getStartDate()) {
                    continue;
                }
    
                if (query.getEndDate() != 0 && tir.getClock() > query.getEndDate()) {
                    continue;
                }
    
                if (traceId != 0 && tir.getTraceId() != traceId) {
                    continue;
                }
    
                if (tir.getDuration() < query.getMinMethodTime()) {
                    continue;
                }
    
                TraceRecord idxtr = (query.hasFlag(TraceInfoSearchQuery.DEEP_SEARCH) && matcher != null)
                        ? traceDataStore.read(tir.getDataChunk())
                        : traceIndexStore.read(tir.getIndexChunk());
    
                if (idxtr != null) {
                    if (matcher instanceof EqlTraceRecordMatcher) {
                        ((EqlTraceRecordMatcher) matcher).setTotalTime(tir.getDuration());
                    }
                    if (matcher == null || recursiveMatch(matcher, idxtr)) {
                        lst.add(toTraceInfo(tir, idxtr));
                    }
                }
    
            }
    
            return result;
        }
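    The loop in search() is a cursor-style scan over a sorted map: it starts strictly after (or before) the caller's offset, stops at the limit, and reports the last key examined so that the next query can resume there. A minimal sketch of that pattern with java.util.TreeMap follows. OffsetPagingSketch and Page are hypothetical names, the filters and time limits of the real method are left out, and strings stand in for TraceInfoRecord.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

final class OffsetPagingSketch {
    static final class Page {
        final List<String> results = new ArrayList<>();
        long lastOffs;       // last key examined; the next query resumes from here
        boolean moreResults; // set when the scan stopped because the limit was reached
    }

    /** Walks keys in the requested direction, starting strictly past the given offset. */
    static Page search(TreeMap<Long, String> infos, long offset, int limit, boolean asc) {
        Page page = new Page();
        // As in search(), offset 0 means "start from the very beginning (or end)".
        Long key = asc
                ? infos.higherKey(offset != 0 ? offset : Long.MIN_VALUE)
                : infos.lowerKey(offset != 0 ? offset : Long.MAX_VALUE);
        for (; key != null; key = asc ? infos.higherKey(key) : infos.lowerKey(key)) {
            if (page.results.size() >= limit) {
                page.moreResults = true;
                return page;
            }
            page.lastOffs = key;
            page.results.add(infos.get(key));
        }
        return page;
    }
}
```

    A caller would pass the returned lastOffs as the offset of the next query, which is presumably why search() calls result.setLastOffs(key) before applying any filter.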

    // DataReceptionUnitTest shows that HostStore contains a SymbolRegistry.

     // zico uses a RESTful style when interacting with the agent side.

  • Original article: https://www.cnblogs.com/shuaiwang/p/4522905.html