00001 #include "FileSystemDataSource.h"
00002 #include "local/DeviceDrivers/LoggedDataDriver.h"
00003 #include "Shared/get_time.h"
00004 #include "Shared/RobotInfo.h"
00005 #include "Shared/string_util.h"
00006 #include "Shared/MarkScope.h"
00007 #include <set>
00008 #include <fstream>
00009 #include <sys/types.h>
00010 #include <sys/mman.h>
00011 #include <sys/stat.h>
00012 #include <regex.h>
00013 #include <dirent.h>
00014 #include <fcntl.h>
00015 #include <sys/resource.h>
00016 #include <cmath>
00017 #include <errno.h>
00018
00019 using namespace std;
00020
00021 FileSystemDataSource::~FileSystemDataSource() {
00022 MarkScope autolock(lock);
00023 if(timeScale!=NULL)
00024 leavingRealtime(false);
00025 clearFiles();
00026 }
00027
00028 unsigned int FileSystemDataSource::nextTimestamp() {
00029 return (curfile!=files.end()) ? static_cast<unsigned int>(nextTime) : -1U;
00030 }
00031
00032 const std::string& FileSystemDataSource::nextName() {
00033 if(curfile!=files.end())
00034 return (*curfile)->filename;
00035 else {
00036 static const std::string noneStr="(none)";
00037 return noneStr;
00038 }
00039 }
00040
00041 bool FileSystemDataSource::advance() {
00042 MarkScope autolock(lock);
00043 Thread::testCurrentCancel();
00044 if(curfile==files.end())
00045 return false;
00046 const unsigned int timestamp = get_time();
00047 if(!frozen) {
00048 if(nextTime>timestamp)
00049 return false;
00050 if(nextTime+(*curfile)->lifetime<=timestamp) {
00051 double looptime=getLoopTime(true);
00052 if(looptime>0) {
00053 while(nextTime+looptime<=timestamp)
00054 nextTime+=looptime;
00055 }
00056 while(nextTime+(*curfile)->lifetime<=timestamp)
00057 nextFrame(0);
00058 }
00059 if(curfile==files.end())
00060 return false;
00061 }
00062
00063 preprepare(curfile);
00064
00065 bool sentData=false;
00066 if((*curfile)->getData()!=NULL) {
00067 if(verbose>=2) {
00068 std::cout << "Applying '" << (*curfile)->filename << "' at " << timestamp;
00069 if(timestamp!=nextTime)
00070 std::cout << " (should've been " << nextTime << ")";
00071 std::cout << std::endl;
00072 }
00073 sentData = sendData();
00074 (*curfile)->done();
00075 }
00076
00077 double origNext=nextTime;
00078 nextFrame();
00079 if(frozen)
00080 nextTime=origNext;
00081
00082 return sentData;
00083 }
00084
00085 void FileSystemDataSource::registerSource() {
00086 ASSERT(!registered,"Already registered?");
00087 nextTime=get_time();
00088 path.addPrimitiveListener(this);
00089 filenameFilter.addPrimitiveListener(this);
00090 loop.addPrimitiveListener(this);
00091 framerate.addPrimitiveListener(this);
00092 plistValueChanged(path);
00093 registered=true;
00094 }
00095
00096 void FileSystemDataSource::deregisterSource() {
00097 ASSERT(registered,"Already deregistered?");
00098 registered=false;
00099 nextTime=get_time();
00100 path.removePrimitiveListener(this);
00101 filenameFilter.removePrimitiveListener(this);
00102 loop.removePrimitiveListener(this);
00103 framerate.removePrimitiveListener(this);
00104 if(timeScale!=NULL)
00105 leavingRealtime(false);
00106 }
00107
00108 void FileSystemDataSource::enteringRealtime(const plist::Primitive<double>& simTimeScale) {
00109 timeScale = &simTimeScale;
00110 resetPoller();
00111 }
00112 void FileSystemDataSource::leavingRealtime(bool ) {
00113 if(poller.isStarted()) {
00114 timeScale->removePrimitiveListener(this);
00115 poller.stop().join();
00116 }
00117 timeScale = NULL;
00118 }
00119
00120 void FileSystemDataSource::doFreeze() {
00121 freezeTime=get_time();
00122 resetPoller();
00123 }
00124 void FileSystemDataSource::doUnfreeze() {
00125 nextTime+=get_time()-freezeTime;
00126 resetPoller();
00127 }
00128
00129 void FileSystemDataSource::resetPoller() {
00130 MarkScope autolock(lock);
00131 MarkScope sl(poller.getStartLock());
00132 if(getTimeScale()>0 && nextTimestamp()!=-1U && registered && timeScale!=NULL && !frozen) {
00133
00134 if(!poller.isStarted()) {
00135 timeScale->addPrimitiveListener(this);
00136 poller.start();
00137 }
00138 } else if(poller.isStarted()) {
00139 timeScale->removePrimitiveListener(this);
00140 poller.stop().join();
00141 }
00142 }
00143
00144 const std::string& FileSystemDataSource::getUsedPath() const { return (path.size()==0) ? parent.path : path; }
00145
00146 bool FileSystemDataSource::loadFileList(bool clearCurrent, bool reportMissing) {
00147 MarkScope autolock(lock);
00148 nextTime=freezeTime=get_time();
00149 struct stat sb;
00150 if(clearCurrent)
00151 clearFiles();
00152 else if(files.size()==0)
00153 clearCurrent=true;
00154 if(getUsedPath().size()==0)
00155 return true;
00156 if(stat(getUsedPath().c_str(),&sb)) {
00157 if(reportMissing)
00158 std::cerr << "FileSystemDataSource could not access path '" << getUsedPath() << "'" << std::endl;
00159 return false;
00160 }
00161 if(sb.st_mode&S_IFDIR) {
00162 loadFileListFromDirectory();
00163 } else {
00164
00165 try {
00166 if(string_util::reMatch(getUsedPath(),filenameFilter))
00167 loadSingleFile(getUsedPath().c_str());
00168 else {
00169 if(!loadFileListFromIndex())
00170 std::cerr << "Source '" << getUsedPath() << "' does not match the filename filter '" << filenameFilter << "' and is not an index list." << std::endl;
00171 }
00172 } catch(const std::string& err) {
00173 std::cerr << err << std::endl;
00174 }
00175 }
00176 if(clearCurrent) {
00177 files_t::iterator it=curfile=files.begin();
00178 for(unsigned int numPreload=2; numPreload>0 && it!=files.end(); numPreload--)
00179 preprepare(it++);
00180 }
00181 actualLoopTime=naturalLoopTime=calcLoopTime();
00182 resetPoller();
00183 return true;
00184 }
00185
00186 void* FileSystemDataSource::DataThread::run() {
00187 for(;;) {
00188 ASSERTRETVAL(getTimeScale()>0,"FileSystemDataSource::runloop in non-realtime mode",NULL);
00189 unsigned int next = parent.nextTimestamp();
00190 if(next==-1U)
00191 return NULL;
00192 unsigned int cur = get_time();
00193 if(cur < next) {
00194 while(usleep(static_cast<useconds_t>((next-cur)*1000/getTimeScale()))) {
00195 if(errno!=EINTR) {
00196 perror("FileSystemDataSource::runloop(): nanosleep");
00197 break;
00198 }
00199
00200 testCancel();
00201 cur = get_time();
00202 next = parent.nextTimestamp();
00203 }
00204 }
00205 testCancel();
00206 parent.advance();
00207 }
00208 }
00209
00210 void FileSystemDataSource::setFrame(unsigned int f, unsigned int numPreload) {
00211 MarkScope autolock(lock);
00212 for(;curfile!=files.end() && (*curfile)->isPrepared(); ++curfile) {
00213 if(files.size()>MAX_LOAD)
00214 (*curfile)->release();
00215 else
00216 (*curfile)->done();
00217 }
00218 nextTime=freezeTime=get_time();
00219 curfile=files.begin();
00220 std::advance(curfile,f);
00221 files_t::iterator it=curfile;
00222 for(; numPreload>0 && it!=files.end(); numPreload--) {
00223 preprepare(it);
00224 if(++it==files.end() && loop)
00225 it=files.begin();
00226 }
00227 }
00228
00229 void FileSystemDataSource::nextFrame(unsigned int numPreload) {
00230 MarkScope autolock(lock);
00231 if(numPreload==0 && verbose>=1)
00232 cout << "Dropping " << (*curfile)->filename << " scheduled " << nextTime << " for duration " << (*curfile)->lifetime << endl;
00233 if(files.size()>MAX_LOAD)
00234 (*curfile)->release();
00235 else
00236 (*curfile)->done();
00237 nextTime+=(*curfile)->lifetime;
00238 if(++curfile==files.end()) {
00239 if(!loop) {
00240 if(verbose>=3)
00241 cout << "Reached end of logged data source \"" << parent.getName() << '"' << endl;
00242 } else {
00243 nextTime+=initialDelay;
00244 curfile=files.begin();
00245 if(verbose>=3)
00246 cout << "Looping file system data source at " << nextTime << " to " << (*curfile)->filename << " (loop time=" << getLoopTime() << ")" << endl;
00247 }
00248 }
00249 files_t::iterator it=curfile;
00250 for(; numPreload>0 && it!=files.end(); numPreload--) {
00251 preprepare(it);
00252 if(++it==files.end() && loop)
00253 it=files.begin();
00254 }
00255 }
00256
00257 double FileSystemDataSource::calcLoopTime() const {
00258 if(files.size()==0)
00259 return 0;
00260 double t=initialDelay;
00261 for(files_t::const_iterator it=files.begin(); it!=files.end(); ++it)
00262 t+=(*it)->lifetime;
00263 return t;
00264 }
00265 void FileSystemDataSource::setLoopTime(double t) {
00266 if(files.size()==0)
00267 return;
00268 double remain = t - getLoopTime(true);
00269 if(remain + files.back()->lifetime < 0) {
00270 std::cerr << "FileSystemDataSource::setLoopTime(" << t << ") would result in a negative frame lifetime" << std::endl;
00271 return;
00272 }
00273 files.back()->lifetime+=remain;
00274 actualLoopTime=t;
00275 }
00276
00277 void FileSystemDataSource::clearFiles() {
00278 MarkScope autolock(lock);
00279
00280 for(files_t::iterator it=files.begin(); it!=files.end(); ++it)
00281 delete *it;
00282 files.clear();
00283 curfile=files.begin();
00284 initialDelay=0;
00285 actualLoopTime=naturalLoopTime=0;
00286 }
00287
00288 void FileSystemDataSource::plistValueTouched(const plist::PrimitiveBase& pl) {
00289 if(&pl==&path) {
00290
00291 loadFileList();
00292 }
00293 }
00294 void FileSystemDataSource::plistValueChanged(const plist::PrimitiveBase& pl) {
00295 if(&pl==&path || &pl==& filenameFilter) {
00296 loadFileList();
00297 } else if(&pl==&loop) {
00298 if(loop && nextTimestamp()==-1U)
00299 curfile=files.begin();
00300 resetPoller();
00301 } else if(&pl==&framerate) {
00302 if(!usingIndexFile()) {
00303 const double dt=1000.f/framerate;
00304 bool first = (curfile==files.begin());
00305 if(!first)
00306 nextTime-=(*--curfile)->lifetime;
00307 for(files_t::const_iterator it=files.begin(); it!=files.end(); ++it)
00308 (*it)->lifetime=dt;
00309 if(!first)
00310 nextTime+=(*curfile++)->lifetime;
00311 parent.plistValueChanged(path);
00312 }
00313 } else if(&pl==timeScale) {
00314 if(poller.isRunning())
00315 poller.interrupt();
00316 } else {
00317 cerr << "FileSystemDataSource didn't handle call to plistValueChanged for " << pl.get() << endl;
00318 }
00319 }
00320
00321 void FileSystemDataSource::loadXML(xmlNode* node) {
00322 bool wasListener;
00323 if((wasListener=path.isPrimitiveListener(this)))
00324 path.removePrimitiveListener(this);
00325 plist::Dictionary::loadXML(node);
00326 if(wasListener) {
00327 path.addPrimitiveListener(this);
00328 plistValueChanged(path);
00329 }
00330 }
00331
00332 void FileSystemDataSource::loadSingleFile(const std::string& file) {
00333 MarkScope autolock(lock);
00334 indexed=false;
00335 enqueueFile(file,1000.f/framerate);
00336 }
00337
00338 void FileSystemDataSource::loadFileListFromDirectory() {
00339 regex_t re;
00340 if(int err=regcomp(&re,filenameFilter.c_str(),REG_EXTENDED | REG_NOSUB)) {
00341 char msg[128];
00342 regerror(err,&re,msg,128);
00343 std::cerr << "Bad filter '" << filenameFilter << "': " << msg << std::endl;
00344 regfree(&re);
00345 return;
00346 }
00347 DIR * d=opendir(getUsedPath().c_str());
00348 if(d==NULL) {
00349 std::cerr << "Could not open directory " << getUsedPath() << std::endl;
00350 regfree(&re);
00351 return;
00352 }
00353 struct dirent* res;
00354
00355 #ifdef HAVE_READDIR_R
00356 struct dirent cur;
00357 if(readdir_r(d,&cur,&res)) {
00358 std::cerr << "Error reading files from " << getUsedPath() << std::endl;
00359 closedir(d);
00360 regfree(&re);
00361 return;
00362 }
00363 #else
00364 res=readdir(d);
00365 #endif
00366
00367 std::set<std::string> dirfiles;
00368 while(res!=NULL) {
00369 int match=regexec(&re,res->d_name,0,NULL,0);
00370 if(match==0) {
00371 dirfiles.insert(res->d_name);
00372 } else if(match!=REG_NOMATCH) {
00373 char msg[128];
00374 regerror(match,&re,msg,128);
00375 std::cerr << "Regex error on '" << res->d_name << "': " << msg << std::endl;
00376 }
00377 #ifdef HAVE_READDIR_R
00378 if(readdir_r(d,&cur,&res)) {
00379 std::cerr << "Error reading files from " << getUsedPath() << std::endl;
00380 closedir(d);
00381 regfree(&re);
00382 return;
00383 }
00384 #else
00385 res=readdir(d);
00386 #endif
00387 }
00388 closedir(d);
00389 regfree(&re);
00390
00391 MarkScope autolock(lock);
00392
00393 double tinc=1000.f/framerate;
00394 for(std::set<std::string>::const_iterator it=dirfiles.begin(); it!=dirfiles.end(); ++it) {
00395
00396 enqueueFile((getUsedPath()+"/")+(*it),tinc);
00397 }
00398 indexed=false;
00399 }
00400
00401 bool FileSystemDataSource::loadFileListFromIndex() {
00402 MarkScope autolock(lock);
00403 indexed=(indexed || files.size()==0);
00404 regex_t re;
00405 if(int err=regcomp(&re,filenameFilter.c_str(),REG_EXTENDED | REG_NOSUB)) {
00406 char msg[128];
00407 regerror(err,&re,msg,128);
00408 std::cerr << "Bad filter '" << filenameFilter << "': " << msg << std::endl;
00409 regfree(&re);
00410 return false;
00411 }
00412
00413 ifstream in(getUsedPath().c_str());
00414 string cur;
00415 getline(in,cur);
00416 if(cur.find("First frame ")==0)
00417 getline(in,cur);
00418
00419 double tinc=1000.f/framerate;
00420 double lasttime=-tinc;
00421 while(in) {
00422 string fn = cur.substr(0,cur.find('\t'));
00423 int match=regexec(&re,fn.c_str(),0,NULL,0);
00424 if(match==0) {
00425 double curtime=lasttime+tinc;
00426 if(fn.size()!=cur.size()) {
00427 const char * timep=cur.c_str()+cur.rfind('\t');
00428 char * endp=NULL;
00429 curtime=strtof(timep,&endp);
00430 if(timep==endp) {
00431 std::cerr << "ERROR: '" << getUsedPath() << "' does not seem to be a valid index file." << std::endl;
00432 std::cerr << " Use output from VisionGUI, or use format 'filename <tab> time'" << std::endl;
00433 std::cerr << " Where 'time' is the time in milliseconds at which the file should be processed, relative" << std::endl;
00434 std::cerr << " to the time at which the index file is loaded." << std::endl;
00435 regfree(&re);
00436 return false;
00437 }
00438 if(lasttime>=0) {
00439 files.back()->lifetime=curtime-lasttime;
00440
00441 } else if(files.size()>0) {
00442 files.back()->lifetime+=curtime;
00443
00444 } else {
00445 initialDelay=curtime;
00446 nextTime=get_time()+curtime;
00447
00448 }
00449 }
00450 if(fn[0]!='/') {
00451 string::size_type srcdir=getUsedPath().rfind('/');
00452 if(srcdir!=string::npos)
00453 fn=getUsedPath().substr(0,srcdir+1)+fn;
00454 }
00455
00456 enqueueFile(fn,tinc);
00457 lasttime=curtime;
00458 } else if(match!=REG_NOMATCH) {
00459 char msg[128];
00460 regerror(match,&re,msg,128);
00461 std::cerr << "Regex error on '" << fn << "': " << msg << std::endl;
00462 }
00463 getline(in,cur);
00464 }
00465 regfree(&re);
00466 return true;
00467 }
00468
00469 void FileSystemDataSource::FileInfo::prepare() {
00470 if(prepared)
00471 return;
00472 if(data==NULL) {
00473 struct stat statbuf;
00474 if(stat(filename.c_str(),&statbuf)!=0) {
00475 std::string err="FileSystemDataSource::FileInfo::prepare() failed to stat file ";
00476 err+=filename;
00477 perror(err.c_str());
00478 return;
00479 }
00480 int fd=open(filename.c_str(),O_RDONLY);
00481 if(fd<0) {
00482 std::string err="FileSystemDataSource::FileInfo::prepare() unable to open file ";
00483 err+=filename;
00484 perror(err.c_str());
00485 return;
00486 }
00487 refcount=new unsigned int;
00488 *refcount=1;
00489 size=static_cast<size_t>(statbuf.st_size);
00490 data=static_cast<char*>(mmap(NULL,size,PROT_READ,MAP_PRIVATE|MAP_FILE,fd,0));
00491 if(data==MAP_FAILED) {
00492 data=NULL;
00493 size=0;
00494 std::string err="FileSystemDataSource::FileInfo::prepare() unable to mmap file ";
00495 err+=filename;
00496 perror(err.c_str());
00497 return;
00498 }
00499 if(close(fd)!=0) {
00500 std::string err="FileSystemDataSource::FileInfo::prepare() unable to close file ";
00501 err+=filename;
00502 perror(err.c_str());
00503 return;
00504 }
00505 }
00506 if(mlock(data,size)!=0) {
00507 if(errno==ENOMEM) {
00508 static bool firsterr=true;
00509 if(firsterr) {
00510 firsterr=false;
00511 rlimit rl;
00512 #ifndef __CYGWIN__
00513 getrlimit(RLIMIT_MEMLOCK,&rl);
00514 cerr << "Notice: mlock() failed because RLIMIT_MEMLOCK is too low, limited to " << (rl.rlim_cur/1024) << "KB\n"
00515 << "Increasing this limit can smooth logged data I/O in low memory situations. (see ulimit/limit commands)" << endl;
00516 #endif
00517 }
00518 } else {
00519 std::string err="FileSystemDataSource::FileInfo::prepare() unable to mlock file ";
00520 err+=filename;
00521 perror(err.c_str());
00522 }
00523 return;
00524 }
00525 prepared=true;
00526 }
00527
00528 void FileSystemDataSource::FileInfo::done() {
00529 if(data==NULL || !prepared)
00530 return;
00531 prepared=false;
00532 if(munlock(data,size)!=0) {
00533 std::string err="FileSystemDataSource::FileInfo::done() unable to munlock file ";
00534 err+=filename;
00535 perror(err.c_str());
00536 }
00537 }
00538
00539 void FileSystemDataSource::FileInfo::release() {
00540 if(data==NULL)
00541 return;
00542 done();
00543 if(--(*refcount)==0) {
00544 if(munmap(data,size)!=0) {
00545 std::string err="FileSystemDataSource::FileInfo::release() unable to munmap file ";
00546 err+=filename;
00547 perror(err.c_str());
00548 }
00549 delete refcount;
00550 }
00551 data=NULL;
00552 size=0;
00553 refcount=NULL;
00554 }
00555
00556 void FileSystemDataSource::preprepare(const FileSystemDataSource::files_t::iterator& fi) {
00557 if(verbose>=4 && (*fi)->getData()==NULL) {
00558 if(fi==curfile)
00559 std::cout << "Loading '" << (*fi)->filename << "' at " << get_time() << std::endl;
00560 else {
00561 std::cout << "Preloading '" << (*fi)->filename << "' at " << get_time();
00562 files_t::iterator tmp = curfile;
00563 double tgtTime=nextTime;
00564 {
00565 tgtTime+=(*tmp)->lifetime;
00566 ++tmp;
00567 if(tmp==files.end()) {
00568 tmp=files.begin();
00569 tgtTime+=initialDelay;
00570 }
00571 } while(tmp!=curfile && tmp!=fi);
00572 std::cout << ", scheduled for " << static_cast<unsigned int>(tgtTime) << std::endl;
00573 }
00574
00575 }
00576 (*fi)->prepare();
00577 }
00578
00579
00580
00581
00582