DatabasePager加载数据的过程:
多线程 使用DatabasePager加载数据的流程:
左侧的图框表示数据的检索和输入, 中间的白色框表示用于数据存储的内存空间,而右边的图框表示存储数据的输出。此外,蓝色图框表示可以在DatabaseThread线程中完成的工作, 而橙色图框表示由线程之外的函数完成的工作。
1 void DatabasePager::DatabaseThread::run() 2 { 3 OSG_INFO<<_name<<": DatabasePager::DatabaseThread::run"<<std::endl; 4 5 6 bool firstTime = true; 7 8 osg::ref_ptr<DatabasePager::ReadQueue> read_queue; 9 osg::ref_ptr<DatabasePager::ReadQueue> out_queue; 10 11 switch(_mode) 12 { 13 case(HANDLE_ALL_REQUESTS): 14 read_queue = _pager->_fileRequestQueue; 15 break; 16 case(HANDLE_NON_HTTP): 17 read_queue = _pager->_fileRequestQueue; 18 out_queue = _pager->_httpRequestQueue; 19 break; 20 case(HANDLE_ONLY_HTTP): 21 read_queue = _pager->_httpRequestQueue; 22 break; 23 } 24 25 26 do 27 { 28 _active = false; 29 30 read_queue->block(); 31 32 if (_done) 33 { 34 break; 35 } 36 37 _active = true; 38 39 OSG_INFO<<_name<<": _pager->size()= "<<read_queue->size()<<" to delete = "<<read_queue->_childrenToDeleteList.size()<<std::endl; 40 41 42 43 // 44 // delete any children if required. 45 // 46 if (_pager->_deleteRemovedSubgraphsInDatabaseThread/* && !(read_queue->_childrenToDeleteList.empty())*/) 47 { 48 ObjectList deleteList; 49 { 50 // Don't hold lock during destruction of deleteList 51 OpenThreads::ScopedLock<OpenThreads::Mutex> lock(read_queue->_requestMutex); 52 if (!read_queue->_childrenToDeleteList.empty()) 53 { 54 deleteList.swap(read_queue->_childrenToDeleteList); 55 read_queue->updateBlock(); 56 } 57 } 58 } 59 60 // 61 // load any subgraphs that are required. 62 // 63 osg::ref_ptr<DatabaseRequest> databaseRequest; 64 read_queue->takeFirst(databaseRequest); 65 66 bool readFromFileCache = false; 67 68 osg::ref_ptr<FileCache> fileCache = osgDB::Registry::instance()->getFileCache(); 69 osg::ref_ptr<FileLocationCallback> fileLocationCallback = osgDB::Registry::instance()->getFileLocationCallback(); 70 osg::ref_ptr<Options> dr_loadOptions; 71 std::string fileName; 72 int frameNumberLastRequest = 0; 73 bool cacheNodes = false; 74 if (databaseRequest.valid()) 75 { 76 { 77 OpenThreads::ScopedLock<OpenThreads::Mutex> drLock(_pager->_dr_mutex); 78 dr_loadOptions = databaseRequest->_loadOptions.valid() ? databaseRequest->_loadOptions->cloneOptions() : new osgDB::Options; 79 dr_loadOptions->setTerrain(databaseRequest->_terrain); 80 dr_loadOptions->setParentGroup(databaseRequest->_group); 81 fileName = databaseRequest->_fileName; 82 frameNumberLastRequest = databaseRequest->_frameNumberLastRequest; 83 } 84 85 86 if (dr_loadOptions->getFileCache()) fileCache = dr_loadOptions->getFileCache(); 87 if (dr_loadOptions->getFileLocationCallback()) fileLocationCallback = dr_loadOptions->getFileLocationCallback(); 88 89 // disable the FileCache if the fileLocationCallback tells us that it isn't required for this request. 90 if (fileLocationCallback.valid() && !fileLocationCallback->useFileCache()) fileCache = 0; 91 92 93 cacheNodes = (dr_loadOptions->getObjectCacheHint() & osgDB::Options::CACHE_NODES)!=0; 94 if (cacheNodes) 95 { 96 //OSG_NOTICE<<"Checking main ObjectCache"<<std::endl; 97 // check the object cache to see if the file we want has already been loaded. 98 osg::ref_ptr<osg::Object> objectFromCache = osgDB::Registry::instance()->getRefFromObjectCache(fileName); 99 100 // if no object with fileName in ObjectCache then try the filename appropriate for fileCache 101 if (!objectFromCache && (fileCache.valid() && fileCache->isFileAppropriateForFileCache(fileName))) 102 { 103 if (fileCache->existsInCache(fileName)) 104 { 105 objectFromCache = osgDB::Registry::instance()->getRefFromObjectCache(fileCache->createCacheFileName(fileName)); 106 } 107 } 108 109 110 osg::Node* modelFromCache = dynamic_cast<osg::Node*>(objectFromCache.get()); 111 if (modelFromCache) 112 { 113 //OSG_NOTICE<<"Found object in cache "<<fileName<<std::endl; 114 115 // assign the cached model to the request 116 { 117 OpenThreads::ScopedLock<OpenThreads::Mutex> drLock(_pager->_dr_mutex); 118 databaseRequest->_loadedModel = modelFromCache; 119 } 120 121 // move the request to the dataToMerge list so it can be merged during the update phase of the frame. 122 { 123 OpenThreads::ScopedLock<OpenThreads::Mutex> listLock( _pager->_dataToMergeList->_requestMutex); 124 _pager->_dataToMergeList->addNoLock(databaseRequest.get()); 125 databaseRequest = 0; 126 } 127 128 // skip the rest of the do/while loop as we have done all the processing we need to do. 129 continue; 130 } 131 else 132 { 133 //OSG_NOTICE<<"Not Found object in cache "<<fileName<<std::endl; 134 } 135 136 // need to disable any attempt to use the cache when loading as we're handle this ourselves to avoid threading conflicts 137 { 138 OpenThreads::ScopedLock<OpenThreads::Mutex> drLock(_pager->_dr_mutex); 139 databaseRequest->_objectCache = new ObjectCache; 140 dr_loadOptions->setObjectCache(databaseRequest->_objectCache.get()); 141 } 142 } 143 144 145 // check if databaseRequest is still relevant 146 if ((_pager->_frameNumber-frameNumberLastRequest)<=1) 147 { 148 149 // now check to see if this request is appropriate for this thread 150 switch(_mode) 151 { 152 case(HANDLE_ALL_REQUESTS): 153 { 154 // do nothing as this thread can handle the load 155 if (fileCache.valid() && fileCache->isFileAppropriateForFileCache(fileName)) 156 { 157 if (fileCache->existsInCache(fileName)) 158 { 159 readFromFileCache = true; 160 } 161 } 162 break; 163 } 164 case(HANDLE_NON_HTTP): 165 { 166 // check the cache first 167 bool isHighLatencyFileRequest = false; 168 169 if (fileLocationCallback.valid()) 170 { 171 isHighLatencyFileRequest = fileLocationCallback->fileLocation(fileName, dr_loadOptions.get()) == FileLocationCallback::REMOTE_FILE; 172 } 173 else if (fileCache.valid() && fileCache->isFileAppropriateForFileCache(fileName)) 174 { 175 isHighLatencyFileRequest = true; 176 } 177 178 if (isHighLatencyFileRequest) 179 { 180 if (fileCache.valid() && fileCache->existsInCache(fileName)) 181 { 182 readFromFileCache = true; 183 } 184 else 185 { 186 OSG_INFO<<_name<<": Passing http requests over "<<fileName<<std::endl; 187 out_queue->add(databaseRequest.get()); 188 databaseRequest = 0; 189 } 190 } 191 break; 192 } 193 case(HANDLE_ONLY_HTTP): 194 { 195 // accept all requests, as we'll assume only high latency requests will have got here. 196 break; 197 } 198 } 199 } 200 else 201 { 202 databaseRequest = 0; 203 } 204 } 205 206 207 if (databaseRequest.valid()) 208 { 209 210 // load the data, note safe to write to the databaseRequest since once 211 // it is created this thread is the only one to write to the _loadedModel pointer. 212 //OSG_NOTICE<<"In DatabasePager thread readNodeFile("<<databaseRequest->_fileName<<")"<<std::endl; 213 //osg::Timer_t before = osg::Timer::instance()->tick(); 214 215 216 // assume that readNode is thread safe... 217 ReaderWriter::ReadResult rr = readFromFileCache ? 218 fileCache->readNode(fileName, dr_loadOptions.get(), false) : 219 Registry::instance()->readNode(fileName, dr_loadOptions.get(), false); 220 221 osg::ref_ptr<osg::Node> loadedModel; 222 if (rr.validNode()) loadedModel = rr.getNode(); 223 if (rr.error()) OSG_WARN<<"Error in reading file "<<fileName<<" : "<<rr.message() << std::endl; 224 if (rr.notEnoughMemory()) OSG_INFO<<"Not enought memory to load file "<<fileName << std::endl; 225 226 if (loadedModel.valid() && 227 fileCache.valid() && 228 fileCache->isFileAppropriateForFileCache(fileName) && 229 !readFromFileCache) 230 { 231 fileCache->writeNode(*(loadedModel), fileName, dr_loadOptions.get()); 232 } 233 234 { 235 OpenThreads::ScopedLock<OpenThreads::Mutex> drLock(_pager->_dr_mutex); 236 if ((_pager->_frameNumber-databaseRequest->_frameNumberLastRequest)>1) 237 { 238 OSG_INFO<<_name<<": Warning DatabaseRquest no longer required."<<std::endl; 239 loadedModel = 0; 240 } 241 } 242 243 //OSG_NOTICE<<" node read in "<<osg::Timer::instance()->delta_m(before,osg::Timer::instance()->tick())<<" ms"<<std::endl; 244 245 if (loadedModel.valid()) 246 { 247 loadedModel->getBound(); 248 249 bool loadedObjectsNeedToBeCompiled = false; 250 osg::ref_ptr<osgUtil::IncrementalCompileOperation::CompileSet> compileSet = 0; 251 if (!rr.loadedFromCache()) 252 { 253 // find all the compileable rendering objects 254 DatabasePager::FindCompileableGLObjectsVisitor stateToCompile(_pager, _pager->getMarkerObject()); 255 loadedModel->accept(stateToCompile); 256 257 loadedObjectsNeedToBeCompiled = _pager->_doPreCompile && 258 _pager->_incrementalCompileOperation.valid() && 259 _pager->_incrementalCompileOperation->requiresCompile(stateToCompile); 260 261 // move the databaseRequest from the front of the fileRequest to the end of 262 // dataToCompile or dataToMerge lists. 263 if (loadedObjectsNeedToBeCompiled) 264 { 265 // OSG_NOTICE<<"Using IncrementalCompileOperation"<<std::endl; 266 267 compileSet = new osgUtil::IncrementalCompileOperation::CompileSet(loadedModel.get()); 268 compileSet->buildCompileMap(_pager->_incrementalCompileOperation->getContextSet(), stateToCompile); 269 compileSet->_compileCompletedCallback = new DatabasePagerCompileCompletedCallback(_pager, databaseRequest.get()); 270 _pager->_incrementalCompileOperation->add(compileSet.get(), false); 271 } 272 } 273 else 274 { 275 OSG_NOTICE<<"Loaded from ObjectCache"<<std::endl; 276 } 277 278 279 { 280 OpenThreads::ScopedLock<OpenThreads::Mutex> drLock(_pager->_dr_mutex); 281 databaseRequest->_loadedModel = loadedModel; 282 databaseRequest->_compileSet = compileSet; 283 } 284 // Dereference the databaseRequest while the queue is 285 // locked. This prevents the request from being 286 // deleted at an unpredictable time within 287 // addLoadedDataToSceneGraph. 288 if (loadedObjectsNeedToBeCompiled) 289 { 290 OpenThreads::ScopedLock<OpenThreads::Mutex> listLock( 291 _pager->_dataToCompileList->_requestMutex); 292 _pager->_dataToCompileList->addNoLock(databaseRequest.get()); 293 databaseRequest = 0; 294 } 295 else 296 { 297 OpenThreads::ScopedLock<OpenThreads::Mutex> listLock( 298 _pager->_dataToMergeList->_requestMutex); 299 _pager->_dataToMergeList->addNoLock(databaseRequest.get()); 300 databaseRequest = 0; 301 } 302 303 } 304 305 // _pager->_dataToCompileList->pruneOldRequestsAndCheckIfEmpty(); 306 } 307 else 308 { 309 OpenThreads::Thread::YieldCurrentThread(); 310 } 311 312 313 // go to sleep till our the next time our thread gets scheduled. 314 315 if (firstTime) 316 { 317 // do a yield to get round a peculiar thread hang when testCancel() is called 318 // in certain circumstances - of which there is no particular pattern. 319 YieldCurrentThread(); 320 firstTime = false; 321 } 322 323 } while (!testCancel() && !_done); 324 }