MessageReceiver.cc
Go to the documentation of this file.00001 #ifndef PLATFORM_APERIOS
00002
00003 #include "MessageReceiver.h"
00004 #include "Shared/debuget.h"
00005
00006
00007
00008
00009 MessageReceiver::MessageReceiver(MessageQueueBase& mq, bool (*callback) (RCRegion*), bool startThread, bool subscribe)
00010 : Thread(), queue(mq), semid(mq.getSemaphoreManager()->invalid()),
00011 nextMessage(0), lastProcessedMessage(-1U), process(callback), curit((index_t)-1)
00012 {
00013 if(startThread)
00014 start();
00015 else if(subscribe) {
00016 ASSERTRET(semid==queue.getSemaphoreManager()->invalid(),"semid is already set?");
00017 semid=queue.addReceiver();
00018 if(semid==queue.getSemaphoreManager()->invalid())
00019 std::cerr << "ERROR: could not start MessageReceiver -- out of semaphore IDs" << std::endl;
00020 }
00021 }
00022
00023 MessageReceiver::~MessageReceiver() {
00024 if(!isStarted())
00025 return;
00026 stop();
00027 join();
00028 queue.removeReceiver(semid);
00029 semid=queue.getSemaphoreManager()->invalid();
00030 }
00031
00032 RCRegion * MessageReceiver::peekNextMessage() {
00033 MessageQueueBase::AutoLock autolock(queue.getLock());
00034 findCurrentMessage();
00035 if(queue.isEnd(curit))
00036 return NULL;
00037 return queue.peekMessage(curit);
00038 }
00039
00040 RCRegion * MessageReceiver::getNextMessage() {
00041 MessageQueueBase::AutoLock autolock(queue.getLock());
00042 findCurrentMessage();
00043 if(queue.isEnd(curit))
00044 return NULL;
00045 nextMessage=queue.getMessageSN(curit)+1;
00046 curit=queue.newer(curit);
00047 return queue.readMessage(curit,semid);
00048 }
00049
00050 Thread& MessageReceiver::stop() {
00051 Thread::stop();
00052 queue.getSemaphoreManager()->raise(semid,1);
00053 return *this;
00054 }
00055
00056 void MessageReceiver::findCurrentMessage() {
00057 if(queue.isEnd(curit)) {
00058 curit=queue.newest();
00059 while(!queue.isEnd(curit) && queue.getMessageSN(curit)>=nextMessage)
00060 curit=queue.older(curit);
00061 curit=queue.newer(curit);
00062 } else {
00063 while(!queue.isEnd(curit) && queue.getMessageSN(curit)<nextMessage)
00064 curit=queue.newer(curit);
00065 }
00066 }
00067
00068 void MessageReceiver::finish() {
00069 if(isStarted()) {
00070 stop();
00071 join();
00072 }
00073 if(semid!=queue.getSemaphoreManager()->invalid()) {
00074
00075 while(processNextMessage()) {}
00076 queue.removeReceiver(semid);
00077 semid=queue.getSemaphoreManager()->invalid();
00078 }
00079 }
00080
00081 bool MessageReceiver::launched() {
00082 if(semid==queue.getSemaphoreManager()->invalid())
00083 semid=queue.addReceiver();
00084 if(semid==queue.getSemaphoreManager()->invalid()) {
00085 std::cerr << "ERROR: could not start MessageReceiver -- out of semaphore IDs" << std::endl;
00086 return false;
00087 }
00088 return Thread::launched();
00089 }
00090
00091 unsigned int MessageReceiver::runloop() {
00092
00093 pushNoCancel();
00094 waitNextMessage();
00095 while(processNextMessage()) {
00096 queue.getSemaphoreManager()->lower(semid,1,false);
00097 }
00098 popNoCancel();
00099 return 0;
00100 }
00101
00102 bool MessageReceiver::waitNextMessage() {
00103 return queue.getSemaphoreManager()->lower(semid,1,true);
00104 }
00105
00106 bool MessageReceiver::processNextMessage() {
00107 RCRegion * msg=peekNextMessage();
00108 if(msg==NULL)
00109 return false;
00110
00111 bool used=false;
00112 if(lastProcessedMessage!=queue.getMessageSN(curit)) {
00113 lastProcessedMessage=queue.getMessageSN(curit);
00114
00115 used=process(msg);
00116 if(used)
00117 markRead(false);
00118
00119
00120
00121
00122 }
00123 msg->RemoveReference();
00124 return used;
00125 }
00126
00127 void MessageReceiver::markRead(bool checkNext) {
00128 findCurrentMessage();
00129 if(queue.isEnd(curit))
00130 return;
00131 nextMessage=queue.getMessageSN(curit)+1;
00132 queue.markRead(curit,semid);
00133 curit=queue.newer(curit);
00134 if(checkNext && !queue.isEnd(curit))
00135 queue.getSemaphoreManager()->raise(semid,1);
00136 }
00137
00138
00139
00140
00141
00142
00143
00144
00145 #endif //PLATFORM_APERIOS check (aperios doesn't support pthreads...)