MessageQueueStatusThread.cc
Go to the documentation of this file.00001 #ifndef PLATFORM_APERIOS
00002 #include "MessageQueueStatusThread.h"
00003 #include "MessageQueue.h"
00004 #include "Shared/debuget.h"
00005 #include <algorithm>
00006
00007 using namespace std;
00008
00009 MessageQueueStatusThread::~MessageQueueStatusThread() {
00010 if(isStarted()) {
00011 stop();
00012
00013 }
00014 }
00015
00016 void MessageQueueStatusThread::addStatusListener(StatusListener* l) {
00017 if(l==NULL)
00018 return;
00019 if(find(statusListeners.begin(),statusListeners.end(),l)==statusListeners.end()) {
00020
00021 statusListeners.push_back(l);
00022 if(!isStarted()) {
00023 if(queue==NULL)
00024 return;
00025 semid=queue->addReadStatusListener();
00026 if(semid==queue->getSemaphoreManager()->invalid()) {
00027 std::cerr << "ERROR: could not start MessageQueueStatusThread -- out of semaphore IDs" << std::endl;
00028 return;
00029 }
00030
00031 numRead=queue->getMessagesRead();
00032 start();
00033 }
00034 }
00035 }
00036
00037 void MessageQueueStatusThread::removeStatusListener(StatusListener* l) {
00038 std::list<StatusListener*>::iterator it=find(statusListeners.begin(),statusListeners.end(),l);
00039 if(it!=statusListeners.end())
00040 statusListeners.erase(it);
00041 if(isStarted() && statusListeners.size()==0) {
00042 stop();
00043
00044 }
00045 }
00046
00047 void MessageQueueStatusThread::setMessageQueue(MessageQueueBase& mq) {
00048 if(running) {
00049 stop();
00050
00051 }
00052 queue=&mq;
00053 if(statusListeners.size()!=0)
00054 start();
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072
00073
00074 }
00075
00076 MessageQueueBase* MessageQueueStatusThread::getMessageQueue() {
00077 return queue;
00078 }
00079
00080 bool MessageQueueStatusThread::launched() {
00081 return Thread::launched();
00082 }
00083
00084 void * MessageQueueStatusThread::run() {
00085 for(;;) {
00086 queue->getSemaphoreManager()->lower(semid,1,true);
00087
00088 unsigned int more=queue->getSemaphoreManager()->getValue(semid);
00089 if(more>0)
00090 if(!queue->getSemaphoreManager()->lower(semid,more,false))
00091 std::cerr << "WARNING: MessageQueueStatusThread had a message notification disappear (is someone else using the semaphore? Get your own!)" << std::endl;
00092 testCancel();
00093 #ifdef DEBUG
00094
00095 unsigned int nowRead=queue->getMessagesRead();
00096 unsigned int read=nowRead-numRead;
00097 numRead=nowRead;
00098 ASSERT(read==more+1,"WARNING: MessageQueueStatusThread's semaphore count does not match queue's read count ("<< (more+1) << " vs " << read<<")");
00099 #endif
00100
00101 fireMessagesRead(more+1);
00102 }
00103 return NULL;
00104 }
00105
00106 Thread& MessageQueueStatusThread::stop() {
00107 Thread::stop();
00108 if(semid!=queue->getSemaphoreManager()->invalid())
00109 queue->getSemaphoreManager()->raise(semid,1);
00110 return *this;
00111 }
00112
00113 void MessageQueueStatusThread::cancelled() {
00114 if(queue==NULL)
00115 return;
00116
00117 queue->removeReadStatusListener(semid);
00118 semid=queue->getSemaphoreManager()->invalid();
00119 }
00120
00121 void MessageQueueStatusThread::fireMessagesRead(unsigned int howmany) {
00122 if(howmany==0)
00123 return;
00124 std::list<StatusListener*>::iterator it=statusListeners.begin();
00125 while(it!=statusListeners.end()) {
00126 std::list<StatusListener*>::iterator cur=it++;
00127 (*cur)->messagesRead(*queue,howmany);
00128 }
00129 }
00130
00131
00132
00133
00134
00135
00136 #endif