Tekkotsu Homepage
Demos
Overview
Downloads
Dev. Resources
Reference
Credits

MessageQueueStatusThread.cc

Go to the documentation of this file.
00001 #ifndef PLATFORM_APERIOS
00002 #include "MessageQueueStatusThread.h"
00003 #include "MessageQueue.h"
00004 #include "Shared/debuget.h"
00005 #include <algorithm>
00006 
00007 using namespace std; 
00008 
00009 MessageQueueStatusThread::~MessageQueueStatusThread() {
00010   if(isStarted()) {
00011     stop();
00012     //join(); // join turns out to be a bad idea here -- the thread being stopped may be waiting on a lock we currently hold
00013   }
00014 }
00015 
00016 void MessageQueueStatusThread::addStatusListener(StatusListener* l) {
00017   if(l==NULL)
00018     return;
00019   if(find(statusListeners.begin(),statusListeners.end(),l)==statusListeners.end()) {
00020     //not already added
00021     statusListeners.push_back(l);
00022     if(!isStarted()) {
00023       if(queue==NULL)
00024         return;
00025       semid=queue->addReadStatusListener();
00026       if(semid==queue->getSemaphoreManager()->invalid()) {
00027         std::cerr << "ERROR: could not start MessageQueueStatusThread -- out of semaphore IDs" << std::endl;
00028         return;
00029       }
00030       //cout << "MessageQueueStatusThread added MessageQueue read listener" << endl;
00031       numRead=queue->getMessagesRead();
00032       start();
00033     }
00034   }
00035 }
00036 
00037 void MessageQueueStatusThread::removeStatusListener(StatusListener* l) {
00038   std::list<StatusListener*>::iterator it=find(statusListeners.begin(),statusListeners.end(),l);
00039   if(it!=statusListeners.end())
00040     statusListeners.erase(it);
00041   if(isStarted() && statusListeners.size()==0) {
00042     stop();
00043     //join(); // join turns out to be a bad idea here -- the thread being stopped may be waiting on a lock we currently hold
00044   }
00045 }
00046 
00047 void MessageQueueStatusThread::setMessageQueue(MessageQueueBase& mq) {
00048   if(running) {
00049     stop();
00050     //join(); // join turns out to be a bad idea here -- the thread being stopped may be waiting on a lock we currently hold
00051   }
00052   queue=&mq;
00053   if(statusListeners.size()!=0)
00054     start();
00055   /*
00056   MessageQueueBase* oldqueue=queue;
00057   SemaphoreManager::semid_t oldsem=semid;
00058   queue=&mq;
00059   semid=queue->addReadStatusListener();
00060   if(semid==queue->getSemaphoreManager()->invalid()) {
00061     std::cerr << "ERROR: could not switch MessageQueue -- new queue out of semaphores, stopping thread" << std::endl;
00062     queue=oldqueue;
00063     semid=oldsem;
00064     if(running)
00065       stop();
00066     return;
00067   }
00068   numRead=queue->getMessagesRead();
00069   if(oldqueue!=NULL && oldsem!=queue->getSemaphoreManager()->invalid()) {
00070     if(running)
00071       oldqueue->getSemaphoreManager()->raise(oldsem,1); //so run will notice the switchover
00072     oldqueue->removeReadStatusListener(oldsem);
00073   }*/
00074 }
00075 
00076 MessageQueueBase* MessageQueueStatusThread::getMessageQueue() {
00077   return queue;
00078 }
00079 
00080 bool MessageQueueStatusThread::launched() {
00081   return Thread::launched();
00082 }
00083 
00084 void * MessageQueueStatusThread::run() {
00085   for(;;) {
00086     queue->getSemaphoreManager()->lower(semid,1,true);
00087     //there might be a few reads, handle them as a group
00088     unsigned int more=queue->getSemaphoreManager()->getValue(semid);
00089     if(more>0)
00090       if(!queue->getSemaphoreManager()->lower(semid,more,false))
00091         std::cerr << "WARNING: MessageQueueStatusThread had a message notification disappear (is someone else using the semaphore?  Get your own!)" << std::endl;
00092     testCancel();
00093 #ifdef DEBUG
00094     // This part just for sanity checking -- could do away with numRead altogether otherwise
00095     unsigned int nowRead=queue->getMessagesRead();
00096     unsigned int read=nowRead-numRead;
00097     numRead=nowRead;
00098     ASSERT(read==more+1,"WARNING: MessageQueueStatusThread's semaphore count does not match queue's read count ("<< (more+1) << " vs " << read<<")");
00099 #endif
00100     //ok, notify the listeners
00101     fireMessagesRead(more+1);
00102   }
00103   return NULL; // not going to happen, just to make compiler happy
00104 }
00105 
00106 Thread& MessageQueueStatusThread::stop() {
00107   Thread::stop();
00108   if(semid!=queue->getSemaphoreManager()->invalid()) //if semid is still invalid, probably canceling before the launch got off
00109     queue->getSemaphoreManager()->raise(semid,1); //so run will notice the stop request
00110   return *this;
00111 }
00112 
00113 void MessageQueueStatusThread::cancelled() {
00114   if(queue==NULL)
00115     return;
00116   //cout << "MessageQueueStatusThread removing MessageQueue read listener" << endl;
00117   queue->removeReadStatusListener(semid);
00118   semid=queue->getSemaphoreManager()->invalid();
00119 }
00120 
00121 void MessageQueueStatusThread::fireMessagesRead(unsigned int howmany) {
00122   if(howmany==0)
00123     return;
00124   std::list<StatusListener*>::iterator it=statusListeners.begin();
00125   while(it!=statusListeners.end()) {
00126     std::list<StatusListener*>::iterator cur=it++; //increment early in case the listener changes subscription
00127     (*cur)->messagesRead(*queue,howmany);
00128   }
00129 }
00130 
00131 
00132 /*! @file
00133  * @brief 
00134  * @author Ethan Tira-Thompson (ejt) (Creator)
00135  */
00136 #endif

Tekkotsu v5.1CVS
Generated Mon May 9 04:58:45 2016 by Doxygen 1.6.3