Tekkotsu Homepage
Demos
Overview
Downloads
Dev. Resources
Reference
Credits

FileSystemDataSource.cc

Go to the documentation of this file.
00001 #include "FileSystemDataSource.h"
00002 #include "local/DeviceDrivers/LoggedDataDriver.h"
00003 #include "Shared/get_time.h"
00004 #include "Shared/RobotInfo.h"
00005 #include "Shared/string_util.h"
00006 #include "Shared/MarkScope.h"
00007 #include <set>
00008 #include <fstream>
00009 #include <sys/types.h>
00010 #include <sys/mman.h>
00011 #include <sys/stat.h>
00012 #include <regex.h>
00013 #include <dirent.h>
00014 #include <fcntl.h>
00015 #include <sys/resource.h>
00016 #include <cmath>
00017 #include <errno.h>
00018 
00019 using namespace std; 
00020 
00021 FileSystemDataSource::~FileSystemDataSource() {
00022   MarkScope autolock(lock);
00023   if(timeScale!=NULL)
00024     leavingRealtime(false);
00025   clearFiles();
00026 }
00027 
00028 unsigned int FileSystemDataSource::nextTimestamp() {
00029   return (curfile!=files.end()) ? static_cast<unsigned int>(nextTime) : -1U;
00030 }
00031 
00032 const std::string& FileSystemDataSource::nextName() {
00033   if(curfile!=files.end())
00034     return (*curfile)->filename;
00035   else {
00036     static const std::string noneStr="(none)";
00037     return noneStr;
00038   }
00039 }
00040 
00041 bool FileSystemDataSource::advance() {
00042   MarkScope autolock(lock);
00043   Thread::testCurrentCancel();
00044   if(curfile==files.end())
00045     return false;
00046   const unsigned int timestamp = get_time();
00047   if(!frozen) {
00048     if(nextTime>timestamp)
00049       return false;
00050     if(nextTime+(*curfile)->lifetime<=timestamp) {
00051       double looptime=getLoopTime(true);
00052       if(looptime>0) {
00053         while(nextTime+looptime<=timestamp)
00054           nextTime+=looptime;
00055       }
00056       while(nextTime+(*curfile)->lifetime<=timestamp)
00057         nextFrame(0); // too late, drop frames
00058     }
00059     if(curfile==files.end())
00060       return false;
00061   }
00062   
00063   preprepare(curfile);
00064 
00065   bool sentData=false;
00066   if((*curfile)->getData()!=NULL) {
00067     if(verbose>=2) {
00068       std::cout << "Applying '" << (*curfile)->filename << "' at " << timestamp;
00069       if(timestamp!=nextTime)
00070         std::cout << " (should've been " << nextTime << ")";
00071       std::cout << std::endl;
00072     }
00073     sentData = sendData();
00074     (*curfile)->done();
00075   }
00076   
00077   double origNext=nextTime;
00078   nextFrame();
00079   if(frozen)
00080     nextTime=origNext; // hold nextTime if frozen
00081   
00082   return sentData;
00083 }
00084 
00085 void FileSystemDataSource::registerSource() {
00086   ASSERT(!registered,"Already registered?");
00087   nextTime=get_time();
00088   path.addPrimitiveListener(this);
00089   filenameFilter.addPrimitiveListener(this);
00090   loop.addPrimitiveListener(this);
00091   framerate.addPrimitiveListener(this);
00092   plistValueChanged(path);
00093   registered=true;
00094 }
00095 
00096 void FileSystemDataSource::deregisterSource() {
00097   ASSERT(registered,"Already deregistered?");
00098   registered=false;
00099   nextTime=get_time();
00100   path.removePrimitiveListener(this);
00101   filenameFilter.removePrimitiveListener(this);
00102   loop.removePrimitiveListener(this);
00103   framerate.removePrimitiveListener(this);
00104   if(timeScale!=NULL)
00105     leavingRealtime(false);
00106 }
00107 
00108 void FileSystemDataSource::enteringRealtime(const plist::Primitive<double>& simTimeScale) {
00109   timeScale = &simTimeScale;
00110   resetPoller();
00111 }
00112 void FileSystemDataSource::leavingRealtime(bool /*isFullSpeed*/) {
00113   if(poller.isStarted()) {
00114     timeScale->removePrimitiveListener(this);
00115     poller.stop().join();
00116   }
00117   timeScale = NULL;
00118 }
00119 
00120 void FileSystemDataSource::doFreeze() {
00121   freezeTime=get_time();
00122   resetPoller();
00123 }
00124 void FileSystemDataSource::doUnfreeze() {
00125   nextTime+=get_time()-freezeTime;
00126   resetPoller();
00127 }
00128 
00129 void FileSystemDataSource::resetPoller() {
00130   MarkScope autolock(lock);
00131   MarkScope sl(poller.getStartLock());
00132   if(getTimeScale()>0 && nextTimestamp()!=-1U && registered && timeScale!=NULL && !frozen) {
00133     // should be running...
00134     if(!poller.isStarted()) { // start if it isn't running
00135       timeScale->addPrimitiveListener(this);
00136       poller.start();
00137     }
00138   } else if(poller.isStarted()) { // should not be running... stop if it is
00139     timeScale->removePrimitiveListener(this);
00140     poller.stop().join();
00141   }
00142 }
00143 
00144 const std::string& FileSystemDataSource::getUsedPath() const { return (path.size()==0) ? parent.path : path; }
00145 
00146 bool FileSystemDataSource::loadFileList(bool clearCurrent/*=true*/, bool reportMissing/*=true*/) {
00147   MarkScope autolock(lock);
00148   nextTime=freezeTime=get_time();
00149   struct stat sb;
00150   if(clearCurrent)
00151     clearFiles();
00152   else if(files.size()==0)
00153     clearCurrent=true; // was empty, pretend we just cleared it, have to do the same re-initialization
00154   if(getUsedPath().size()==0)
00155     return true; //empty path means disabled
00156   if(stat(getUsedPath().c_str(),&sb)) {
00157     if(reportMissing)
00158       std::cerr << "FileSystemDataSource could not access path '" << getUsedPath() << "'" << std::endl;
00159     return false;
00160   }
00161   if(sb.st_mode&S_IFDIR) {
00162     loadFileListFromDirectory();
00163   } else {
00164     //Test to see if the file matches the filter
00165     try {
00166       if(string_util::reMatch(getUsedPath(),filenameFilter))
00167         loadSingleFile(getUsedPath().c_str());
00168       else { //if it doesn't match the image RE, assume it's an index file
00169         if(!loadFileListFromIndex())
00170           std::cerr << "Source '" << getUsedPath() << "' does not match the filename filter '" << filenameFilter << "' and is not an index list." << std::endl;
00171       }
00172     } catch(const std::string& err) {
00173       std::cerr << err << std::endl;
00174     }
00175   }
00176   if(clearCurrent) {
00177     files_t::iterator it=curfile=files.begin();
00178     for(unsigned int numPreload=2; numPreload>0 && it!=files.end(); numPreload--)
00179       preprepare(it++);
00180   }
00181   actualLoopTime=naturalLoopTime=calcLoopTime();
00182   resetPoller();
00183   return true;
00184 }
00185 
00186 void* FileSystemDataSource::DataThread::run() {
00187   for(;;) {
00188     ASSERTRETVAL(getTimeScale()>0,"FileSystemDataSource::runloop in non-realtime mode",NULL);
00189     unsigned int next = parent.nextTimestamp();
00190     if(next==-1U)
00191       return NULL; // no more frames
00192     unsigned int cur = get_time();
00193     if(cur < next) {
00194       while(usleep(static_cast<useconds_t>((next-cur)*1000/getTimeScale()))) {
00195         if(errno!=EINTR) {
00196           perror("FileSystemDataSource::runloop(): nanosleep");
00197           break;
00198         }
00199         // may have been interrupted to recompute sleep time for change in time scale
00200         testCancel();
00201         cur = get_time();
00202         next = parent.nextTimestamp();
00203       }
00204     }
00205     testCancel();
00206     parent.advance();
00207   }
00208 }
00209 
00210 void FileSystemDataSource::setFrame(unsigned int f, unsigned int numPreload/*=2*/) {
00211   MarkScope autolock(lock);
00212   for(;curfile!=files.end() && (*curfile)->isPrepared(); ++curfile) {
00213     if(files.size()>MAX_LOAD)
00214       (*curfile)->release();
00215     else
00216       (*curfile)->done();
00217   }
00218   nextTime=freezeTime=get_time();
00219   curfile=files.begin();
00220   std::advance(curfile,f);
00221   files_t::iterator it=curfile;
00222   for(; numPreload>0 && it!=files.end(); numPreload--) {
00223     preprepare(it);
00224     if(++it==files.end() && loop)
00225       it=files.begin();
00226   }
00227 }
00228 
00229 void FileSystemDataSource::nextFrame(unsigned int numPreload/*=2*/) {
00230   MarkScope autolock(lock);
00231   if(numPreload==0 && verbose>=1)
00232     cout << "Dropping " << (*curfile)->filename << " scheduled " << nextTime <<  " for duration " << (*curfile)->lifetime << endl;
00233   if(files.size()>MAX_LOAD)
00234     (*curfile)->release();
00235   else
00236     (*curfile)->done();
00237   nextTime+=(*curfile)->lifetime;
00238   if(++curfile==files.end()) {
00239     if(!loop) {
00240       if(verbose>=3)
00241         cout << "Reached end of logged data source \"" << parent.getName() << '"' << endl;
00242     } else {
00243       nextTime+=initialDelay;
00244       curfile=files.begin();
00245       if(verbose>=3)
00246         cout << "Looping file system data source at " << nextTime << " to " << (*curfile)->filename << " (loop time=" << getLoopTime() << ")" << endl;
00247     }
00248   }
00249   files_t::iterator it=curfile;
00250   for(; numPreload>0 && it!=files.end(); numPreload--) {
00251     preprepare(it);
00252     if(++it==files.end() && loop)
00253       it=files.begin();
00254   }
00255 }
00256 
00257 double FileSystemDataSource::calcLoopTime() const {
00258   if(files.size()==0)
00259     return 0;
00260   double t=initialDelay;
00261   for(files_t::const_iterator it=files.begin(); it!=files.end(); ++it)
00262     t+=(*it)->lifetime;
00263   return t;
00264 }
00265 void FileSystemDataSource::setLoopTime(double t) {
00266   if(files.size()==0)
00267     return;
00268   double remain = t - getLoopTime(true);
00269   if(remain + files.back()->lifetime < 0) {
00270     std::cerr << "FileSystemDataSource::setLoopTime(" << t << ") would result in a negative frame lifetime" << std::endl;
00271     return;
00272   }
00273   files.back()->lifetime+=remain;
00274   actualLoopTime=t;
00275 }
00276 
00277 void FileSystemDataSource::clearFiles() {
00278   MarkScope autolock(lock);
00279   // FileInfo destructor should take care of deleting data buffers...
00280   for(files_t::iterator it=files.begin(); it!=files.end(); ++it)
00281     delete *it;
00282   files.clear();
00283   curfile=files.begin();
00284   initialDelay=0;
00285   actualLoopTime=naturalLoopTime=0;
00286 }
00287 
00288 void FileSystemDataSource::plistValueTouched(const plist::PrimitiveBase& pl) {
00289   if(&pl==&path) {
00290     // reassigning the same value still triggers reloading
00291     loadFileList();
00292   }
00293 }
00294 void FileSystemDataSource::plistValueChanged(const plist::PrimitiveBase& pl) {
00295   if(&pl==&path || &pl==& filenameFilter) {
00296     loadFileList();
00297   } else if(&pl==&loop) {
00298     if(loop && nextTimestamp()==-1U)
00299       curfile=files.begin();
00300     resetPoller();
00301   } else if(&pl==&framerate) {
00302     if(!usingIndexFile()) {
00303       const double dt=1000.f/framerate;
00304       bool first = (curfile==files.begin());
00305       if(!first)
00306         nextTime-=(*--curfile)->lifetime;
00307       for(files_t::const_iterator it=files.begin(); it!=files.end(); ++it)
00308         (*it)->lifetime=dt;
00309       if(!first)
00310         nextTime+=(*curfile++)->lifetime;
00311       parent.plistValueChanged(path); // to reset the loop time if sharing a source
00312     }
00313   } else if(&pl==timeScale) {
00314     if(poller.isRunning())
00315       poller.interrupt();
00316   } else {
00317     cerr << "FileSystemDataSource didn't handle call to plistValueChanged for " << pl.get() << endl;
00318   }
00319 }
00320 
00321 void FileSystemDataSource::loadXML(xmlNode* node) {
00322   bool wasListener;
00323   if((wasListener=path.isPrimitiveListener(this))) // intentional assignment
00324      path.removePrimitiveListener(this);
00325   plist::Dictionary::loadXML(node);
00326   if(wasListener) {
00327     path.addPrimitiveListener(this);
00328     plistValueChanged(path);
00329   }
00330 }
00331 
00332 void FileSystemDataSource::loadSingleFile(const std::string& file) {
00333   MarkScope autolock(lock);
00334   indexed=false;
00335   enqueueFile(file,1000.f/framerate);
00336 }
00337 
00338 void FileSystemDataSource::loadFileListFromDirectory() {
00339   regex_t re;
00340   if(int err=regcomp(&re,filenameFilter.c_str(),REG_EXTENDED | REG_NOSUB)) {
00341     char msg[128];
00342     regerror(err,&re,msg,128);
00343     std::cerr << "Bad filter '" << filenameFilter << "': " << msg << std::endl;
00344     regfree(&re);
00345     return;
00346   }
00347   DIR * d=opendir(getUsedPath().c_str());
00348   if(d==NULL) {
00349     std::cerr << "Could not open directory " << getUsedPath() << std::endl;
00350     regfree(&re);
00351     return;
00352   }
00353   struct dirent* res;
00354   
00355 #ifdef HAVE_READDIR_R
00356   struct dirent cur;
00357   if(readdir_r(d,&cur,&res)) {
00358     std::cerr << "Error reading files from " << getUsedPath() << std::endl;
00359     closedir(d);
00360     regfree(&re);
00361     return;
00362   }
00363 #else
00364   res=readdir(d);
00365 #endif
00366 
00367   std::set<std::string> dirfiles;
00368   while(res!=NULL) {
00369     int match=regexec(&re,res->d_name,0,NULL,0);
00370     if(match==0) {
00371       dirfiles.insert(res->d_name);
00372     } else if(match!=REG_NOMATCH) {
00373       char msg[128];
00374       regerror(match,&re,msg,128);
00375       std::cerr << "Regex error on '" << res->d_name << "': " << msg << std::endl;
00376     } // else std::cout << "Skipping " << res->d_name << std::endl;
00377 #ifdef HAVE_READDIR_R
00378     if(readdir_r(d,&cur,&res)) {
00379       std::cerr << "Error reading files from " << getUsedPath() << std::endl;
00380       closedir(d);
00381       regfree(&re);
00382       return;
00383     }
00384 #else
00385     res=readdir(d);
00386 #endif
00387   }
00388   closedir(d);
00389   regfree(&re);
00390   
00391   MarkScope autolock(lock);
00392   //std::cout << "Processing " << getUsedPath() << std::endl;
00393   double tinc=1000.f/framerate;
00394   for(std::set<std::string>::const_iterator it=dirfiles.begin(); it!=dirfiles.end(); ++it) {
00395     //std::cout << "Enqueuing " << *it << std::endl;
00396     enqueueFile((getUsedPath()+"/")+(*it),tinc);
00397   }
00398   indexed=false;
00399 }
00400 
00401 bool FileSystemDataSource::loadFileListFromIndex() {
00402   MarkScope autolock(lock);
00403   indexed=(indexed || files.size()==0);
00404   regex_t re;
00405   if(int err=regcomp(&re,filenameFilter.c_str(),REG_EXTENDED | REG_NOSUB)) {
00406     char msg[128];
00407     regerror(err,&re,msg,128);
00408     std::cerr << "Bad filter '" << filenameFilter << "': " << msg << std::endl;
00409     regfree(&re);
00410     return false;
00411   }
00412   
00413   ifstream in(getUsedPath().c_str());
00414   string cur;
00415   getline(in,cur);
00416   if(cur.find("First frame ")==0) //skip the header line from the GUI, e.g. 'First frame 42898 timestamp: 1439018'
00417     getline(in,cur);
00418   
00419   double tinc=1000.f/framerate;
00420   double lasttime=-tinc;
00421   while(in) {
00422     string fn = cur.substr(0,cur.find('\t'));
00423     int match=regexec(&re,fn.c_str(),0,NULL,0);
00424     if(match==0) {
00425       double curtime=lasttime+tinc;
00426       if(fn.size()!=cur.size()) {
00427         const char * timep=cur.c_str()+cur.rfind('\t');
00428         char * endp=NULL;
00429         curtime=strtof(timep,&endp);
00430         if(timep==endp) {
00431           std::cerr << "ERROR: '" << getUsedPath() << "' does not seem to be a valid index file." << std::endl;
00432           std::cerr << "       Use output from VisionGUI, or use format 'filename <tab> time'" << std::endl;
00433           std::cerr << "       Where 'time' is the time in milliseconds at which the file should be processed, relative" << std::endl;
00434           std::cerr << "       to the time at which the index file is loaded." << std::endl;
00435           regfree(&re);
00436           return false;
00437         }
00438         if(lasttime>=0) {
00439           files.back()->lifetime=curtime-lasttime;
00440           //std::cout << "(previous frame lifetime " << files.back()->lifetime << ") ";
00441         } else if(files.size()>0) {
00442           files.back()->lifetime+=curtime;
00443           //std::cout << "(previous frame increased lifetime to " << files.back()->lifetime << ") ";
00444         } else {
00445           initialDelay=curtime;
00446           nextTime=get_time()+curtime;
00447           //std::cout << "nextTime set to " << nextTime << " ";
00448         }
00449       }
00450       if(fn[0]!='/') { // if not absolute path, tack on path to index file (*do this after previous check*!)
00451         string::size_type srcdir=getUsedPath().rfind('/');
00452         if(srcdir!=string::npos)
00453           fn=getUsedPath().substr(0,srcdir+1)+fn;
00454       }
00455       //std::cout << "Enqueuing " << fn << " at " << curtime << endl;
00456       enqueueFile(fn,tinc);
00457       lasttime=curtime;
00458     } else if(match!=REG_NOMATCH) {
00459       char msg[128];
00460       regerror(match,&re,msg,128);
00461       std::cerr << "Regex error on '" << fn << "': " << msg << std::endl;
00462     } // else std::cout << "Skipping " << res->d_name << std::endl;
00463     getline(in,cur);
00464   }
00465   regfree(&re);
00466   return true;
00467 }
00468 
00469 void FileSystemDataSource::FileInfo::prepare() {
00470   if(prepared)
00471     return;
00472   if(data==NULL) {
00473     struct stat statbuf;
00474     if(stat(filename.c_str(),&statbuf)!=0) {
00475       std::string err="FileSystemDataSource::FileInfo::prepare() failed to stat file ";
00476       err+=filename;
00477       perror(err.c_str());
00478       return;
00479     }
00480     int fd=open(filename.c_str(),O_RDONLY);
00481     if(fd<0) {
00482       std::string err="FileSystemDataSource::FileInfo::prepare() unable to open file ";
00483       err+=filename;
00484       perror(err.c_str());
00485       return;
00486     }
00487     refcount=new unsigned int;
00488     *refcount=1;
00489     size=static_cast<size_t>(statbuf.st_size);
00490     data=static_cast<char*>(mmap(NULL,size,PROT_READ,MAP_PRIVATE|MAP_FILE,fd,0));
00491     if(data==MAP_FAILED) {
00492       data=NULL;
00493       size=0;
00494       std::string err="FileSystemDataSource::FileInfo::prepare() unable to mmap file ";
00495       err+=filename;
00496       perror(err.c_str());
00497       return;
00498     }
00499     if(close(fd)!=0) {
00500       std::string err="FileSystemDataSource::FileInfo::prepare() unable to close file ";
00501       err+=filename;
00502       perror(err.c_str());
00503       return;
00504     }
00505   }
00506   if(mlock(data,size)!=0) {
00507     if(errno==ENOMEM) {
00508       static bool firsterr=true; // give a warning just the first time if mlock fails because RLIMIT_MEMLOCK is too low
00509       if(firsterr) {
00510         firsterr=false;
00511         rlimit rl;
00512 #ifndef __CYGWIN__
00513         getrlimit(RLIMIT_MEMLOCK,&rl);
00514         cerr << "Notice: mlock() failed because RLIMIT_MEMLOCK is too low, limited to " << (rl.rlim_cur/1024) << "KB\n"
00515              << "Increasing this limit can smooth logged data I/O in low memory situations. (see ulimit/limit commands)" << endl;
00516 #endif
00517       }
00518     } else {
00519       std::string err="FileSystemDataSource::FileInfo::prepare() unable to mlock file ";
00520       err+=filename;
00521       perror(err.c_str());
00522     }
00523     return;
00524   }
00525   prepared=true;
00526 }
00527 
00528 void FileSystemDataSource::FileInfo::done() {
00529   if(data==NULL || !prepared)
00530     return;
00531   prepared=false;
00532   if(munlock(data,size)!=0) {
00533     std::string err="FileSystemDataSource::FileInfo::done() unable to munlock file ";
00534     err+=filename;
00535     perror(err.c_str());
00536   }
00537 }
00538 
00539 void FileSystemDataSource::FileInfo::release() {
00540   if(data==NULL)
00541     return;
00542   done();
00543   if(--(*refcount)==0) {
00544     if(munmap(data,size)!=0) {
00545       std::string err="FileSystemDataSource::FileInfo::release() unable to munmap file ";
00546       err+=filename;
00547       perror(err.c_str());
00548     }
00549     delete refcount;
00550   }
00551   data=NULL;
00552   size=0;
00553   refcount=NULL;
00554 }
00555 
00556 void FileSystemDataSource::preprepare(const FileSystemDataSource::files_t::iterator& fi) {
00557   if(verbose>=4 && (*fi)->getData()==NULL) {
00558     if(fi==curfile) 
00559       std::cout << "Loading '" << (*fi)->filename << "' at " << get_time() << std::endl;
00560     else {
00561       std::cout << "Preloading '" << (*fi)->filename << "' at " << get_time();
00562       files_t::iterator tmp = curfile;
00563       double tgtTime=nextTime; 
00564       {
00565         tgtTime+=(*tmp)->lifetime;
00566         ++tmp;
00567         if(tmp==files.end()) {
00568           tmp=files.begin();
00569           tgtTime+=initialDelay;
00570         }
00571       } while(tmp!=curfile && tmp!=fi);
00572       std::cout << ", scheduled for " << static_cast<unsigned int>(tgtTime) << std::endl;
00573     }
00574     
00575   }
00576   (*fi)->prepare();
00577 }
00578 
00579 /*! @file
00580  * @brief 
00581  * @author Ethan Tira-Thompson (ejt) (Creator)
00582  */

Tekkotsu Hardware Abstraction Layer 5.1CVS
Generated Mon May 9 05:01:38 2016 by Doxygen 1.6.3