00001
00002 #ifndef INCLUDED_MessageQueue_h_
00003 #define INCLUDED_MessageQueue_h_
00004
00005 #ifdef PLATFORM_APERIOS
00006 # warning MessageQueue is not Aperios compatable
00007 #else
00008
00009 #include "ListMemBuf.h"
00010 #include "RCRegion.h"
00011 #include "SemaphoreManager.h"
00012 #include "MutexLock.h"
00013 #include "Shared/MarkScope.h"
00014 #include "Shared/attributes.h"
00015 #include <exception>
00016 #include <stdlib.h>
00017 #include <unistd.h>
00018
00019 #include "Shared/TimeET.h"
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032 class MessageQueueBase {
00033 public:
00034
00035
00036 class MessageFilter {
00037 public:
00038
00039 virtual bool filterSendRequest(RCRegion* rcr)=0;
00040
00041 virtual ~MessageFilter() {}
00042 };
00043
00044
00045 MessageQueueBase()
00046 : lock(), overflowPolicy(THROW_BAD_ALLOC), isClosed(false), reportDroppings(false), numMessages(0),
00047 numReceivers(0), messagesRead(0)
00048 {
00049 for(unsigned int i=0; i<ProcessID::NumProcesses; ++i)
00050 filters[i]=NULL;
00051 }
00052
00053 virtual ~MessageQueueBase() {}
00054
00055
00056
00057
00058
00059
00060 typedef unsigned short index_t;
00061
00062
00063
00064 virtual SemaphoreManager::semid_t addReceiver() ATTR_must_check =0;
00065
00066 virtual void removeReceiver(SemaphoreManager::semid_t rcvr)=0;
00067
00068 virtual unsigned int getNumReceivers() const { return numReceivers; }
00069
00070
00071
00072
00073
00074
00075 virtual SemaphoreManager::semid_t addReadStatusListener() ATTR_must_check =0;
00076
00077 virtual void removeReadStatusListener(SemaphoreManager::semid_t sem)=0;
00078
00079
00080
00081
00082
00083
00084
00085
00086
00087
00088
00089
00090
00091
00092
00093 virtual void sendMessage(RCRegion * rcr, bool autoDereference=false)=0;
00094
00095
00096
00097
00098
00099 virtual RCRegion * readMessage(index_t msg, SemaphoreManager::semid_t rcvr)=0;
00100
00101
00102
00103 virtual RCRegion * peekMessage(index_t msg)=0;
00104
00105 virtual void markRead(index_t msg, SemaphoreManager::semid_t rcvr)=0;
00106
00107 virtual void close() { AutoLock autolock(lock); isClosed=true; }
00108
00109
00110 virtual void setReportDroppings(bool report) { reportDroppings=report; }
00111
00112 virtual bool getReportDroppings() const { return reportDroppings; }
00113
00114
00115
00116 virtual unsigned int getMessageSN(index_t msg)=0;
00117
00118
00119 virtual unsigned int getMessagesRead() { return messagesRead; }
00120
00121
00122 virtual unsigned int getMessagesSent() { return numMessages; }
00123
00124
00125 virtual unsigned int getMessagesUnread() { return getMessagesSent() - getMessagesRead(); }
00126
00127
00128 typedef MarkScope AutoLock;
00129
00130 MutexLock<ProcessID::NumProcesses>& getLock() const { return lock; }
00131
00132
00133 virtual index_t oldest() const=0;
00134 virtual index_t newer(index_t it) const=0;
00135 virtual index_t older(index_t it) const=0;
00136 virtual index_t newest() const=0;
00137 virtual bool isEnd(index_t it) const=0;
00138
00139
00140 enum OverflowPolicy_t {
00141 DROP_OLDEST,
00142 DROP_NEWEST,
00143 WAIT,
00144 THROW_BAD_ALLOC
00145 };
00146
00147 void setOverflowPolicy(OverflowPolicy_t op) { overflowPolicy=op; }
00148
00149 OverflowPolicy_t getOverflowPolicy() const { return overflowPolicy; }
00150
00151
00152 static void setSemaphoreManager(SemaphoreManager* mgr) { semgr=mgr; }
00153
00154 static SemaphoreManager* getSemaphoreManager() { return semgr; }
00155
00156
00157
00158 void addMessageFilter(MessageFilter& filter) {
00159 filters[ProcessID::getID()]=&filter;
00160 }
00161
00162 void removeMessageFilter() {
00163 filters[ProcessID::getID()]=NULL;
00164 }
00165 protected:
00166
00167 static SemaphoreManager* semgr;
00168
00169 mutable MutexLock<ProcessID::NumProcesses> lock;
00170 volatile OverflowPolicy_t overflowPolicy;
00171 bool isClosed;
00172 bool reportDroppings;
00173 unsigned int numMessages;
00174 unsigned int numReceivers;
00175 unsigned int messagesRead;
00176 MessageFilter* filters[ProcessID::NumProcesses];
00177 private:
00178 MessageQueueBase(const MessageQueueBase&);
00179 MessageQueueBase& operator=(const MessageQueueBase&);
00180 };
00181
00182
00183
00184
00185 template<unsigned int MAX_UNREAD, unsigned int MAX_RECEIVERS=10, unsigned int MAX_SENDERS=10>
00186 class MessageQueue : public MessageQueueBase {
00187 public:
00188
00189 static const unsigned int CAPACITY=MAX_UNREAD;
00190
00191 static const unsigned int RECEIVER_CAPACITY=MAX_RECEIVERS;
00192
00193
00194
00195
00196 static const unsigned int SENDER_CAPACITY=MAX_SENDERS;
00197
00198
00199 MessageQueue() : MessageQueueBase(), mq(), rcvrs(), sndrs() {}
00200
00201
00202 virtual ~MessageQueue();
00203
00204 virtual SemaphoreManager::semid_t addReadStatusListener() ATTR_must_check;
00205 virtual void removeReadStatusListener(SemaphoreManager::semid_t sem);
00206
00207 virtual SemaphoreManager::semid_t addReceiver() ATTR_must_check;
00208 virtual void removeReceiver(SemaphoreManager::semid_t rcvr);
00209
00210 virtual void sendMessage(RCRegion * rcr, bool autoDereference=false);
00211 virtual RCRegion * readMessage(index_t msg, SemaphoreManager::semid_t rcvr);
00212 virtual RCRegion * peekMessage(index_t msg);
00213 virtual void markRead(index_t msg, SemaphoreManager::semid_t rcvr);
00214
00215 virtual unsigned int getMessageSN(index_t msg) { return mq[msg].sn; }
00216
00217 virtual index_t oldest() const { AutoLock autolock(lock); return mq.begin(); }
00218 virtual index_t newer(index_t it) const { AutoLock autolock(lock); return mq.next(it); }
00219 virtual index_t older(index_t it) const { AutoLock autolock(lock); return mq.prev(it); }
00220 virtual index_t newest() const { AutoLock autolock(lock); return mq.prev(mq.end()); }
00221 virtual bool isEnd(index_t it) const { AutoLock autolock(lock); return it==mq.end() || it>=mq_t::MAX_ENTRIES; }
00222
00223 protected:
00224
00225 struct entry {
00226 entry() : id(), sn(), numRead(0) { memset(readFlags,0,sizeof(readFlags)); }
00227 entry(unsigned int serialNumber, RCRegion* r)
00228 : id(r->ID()), sn(serialNumber), numRead(0) { memset(readFlags,0,sizeof(readFlags)); }
00229 RCRegion::Identifier id;
00230 unsigned int sn;
00231 bool readFlags[MAX_RECEIVERS];
00232 unsigned int numRead;
00233 };
00234
00235
00236 typedef ListMemBuf<entry,MAX_UNREAD,index_t> mq_t;
00237
00238 mq_t mq;
00239
00240
00241 typedef ListMemBuf<SemaphoreManager::semid_t,MAX_RECEIVERS,index_t> rcvrs_t;
00242
00243 rcvrs_t rcvrs;
00244
00245
00246 typename rcvrs_t::index_t lookupReceiver(SemaphoreManager::semid_t rcvr) const;
00247
00248
00249 typedef ListMemBuf<SemaphoreManager::semid_t,MAX_SENDERS,index_t> sndrs_t;
00250
00251 sndrs_t sndrs;
00252 };
00253
00254 template<unsigned int MAX_UNREAD, unsigned int MAX_RECEIVERS, unsigned int MAX_SENDERS>
00255 MessageQueue<MAX_UNREAD,MAX_RECEIVERS,MAX_SENDERS>::~MessageQueue() {
00256
00257
00258
00259 while(!mq.empty()) {
00260 RCRegion * rcr = RCRegion::attach(mq.front().id);
00261 rcr->RemoveSharedReference();
00262 rcr->RemoveReference();
00263 mq.pop_front();
00264 }
00265 }
00266
00267 template<unsigned int MAX_UNREAD, unsigned int MAX_RECEIVERS, unsigned int MAX_SENDERS>
00268 SemaphoreManager::semid_t MessageQueue<MAX_UNREAD,MAX_RECEIVERS,MAX_SENDERS>::addReadStatusListener() {
00269 AutoLock autolock(lock);
00270 SemaphoreManager::semid_t sem=semgr->getSemaphore();
00271 if(sem==semgr->invalid()) {
00272 std::cerr << "ERROR: unable to add read status listener to message queue because semaphore manager is out of semaphores" << std::endl;
00273 return semgr->invalid();
00274 }
00275 if(sndrs.push_back(sem)==sndrs.end()) {
00276 std::cerr << "ERROR: unable to add read status listener to message queue because message queue can't register any more senders (MAX_SENDERS)" << std::endl;
00277 semgr->releaseSemaphore(sem);
00278 return semgr->invalid();
00279 }
00280 return sem;
00281 }
00282
00283 template<unsigned int MAX_UNREAD, unsigned int MAX_RECEIVERS, unsigned int MAX_SENDERS>
00284 void MessageQueue<MAX_UNREAD,MAX_RECEIVERS,MAX_SENDERS>::removeReadStatusListener(SemaphoreManager::semid_t sem) {
00285 AutoLock autolock(lock);
00286 for(index_t it=sndrs.begin(); it!=sndrs.end(); it=sndrs.next(it))
00287 if(sndrs[it]==sem) {
00288 sndrs.erase(it);
00289 semgr->releaseSemaphore(sem);
00290 break;
00291 }
00292 }
00293
00294 template<unsigned int MAX_UNREAD, unsigned int MAX_RECEIVERS, unsigned int MAX_SENDERS>
00295 SemaphoreManager::semid_t MessageQueue<MAX_UNREAD,MAX_RECEIVERS,MAX_SENDERS>::addReceiver() {
00296 AutoLock autolock(lock);
00297 SemaphoreManager::semid_t sem=semgr->getSemaphore();
00298 if(sem==semgr->invalid()) {
00299 std::cerr << "ERROR: unable to add receiver to message queue because semaphore manager is out of semaphores" << std::endl;
00300 return semgr->invalid();
00301 }
00302 if(rcvrs.push_back(sem)==rcvrs.end()) {
00303 std::cerr << "ERROR: unable to add receiver to message queue because message queue can't register any more receivers (MAX_RECEIVERS)" << std::endl;
00304 semgr->releaseSemaphore(sem);
00305 return semgr->invalid();
00306 }
00307 numReceivers++;
00308 return sem;
00309 }
00310
00311 template<unsigned int MAX_UNREAD, unsigned int MAX_RECEIVERS, unsigned int MAX_SENDERS>
00312 void MessageQueue<MAX_UNREAD,MAX_RECEIVERS,MAX_SENDERS>::removeReceiver(SemaphoreManager::semid_t rcvr) {
00313 AutoLock autolock(lock);
00314 index_t rcvr_id=rcvrs.begin();
00315 for(; rcvr_id!=rcvrs.end(); rcvr_id=rcvrs.next(rcvr_id))
00316 if(rcvrs[rcvr_id]==rcvr)
00317 break;
00318 if(rcvr_id==rcvrs.end()) {
00319 std::cerr << "WARNING: tried to remove message queue receiver " << rcvr << ", which is not registered as a receiver for this queue" << std::endl;
00320 return;
00321 }
00322 rcvrs.erase(rcvr_id);
00323 semgr->releaseSemaphore(rcvr);
00324 numReceivers--;
00325 for(index_t it=mq.begin(); it!=mq.end(); it=mq.next(it)) {
00326 if(mq[it].readFlags[rcvr_id]) {
00327
00328 mq[it].readFlags[rcvr_id]=false;
00329 mq[it].numRead--;
00330 } else if(mq[it].numRead==numReceivers) {
00331
00332 RCRegion * rcr = RCRegion::attach(mq[it].id);
00333 rcr->RemoveSharedReference();
00334 rcr->RemoveReference();
00335 it=mq.prev(it);
00336 mq.erase(mq.next(it));
00337 messagesRead++;
00338 for(index_t sit=sndrs.begin(); sit!=sndrs.end(); sit=sndrs.next(sit))
00339 semgr->raise(sndrs[sit],1);
00340 }
00341 }
00342 }
00343
00344 template<unsigned int MAX_UNREAD, unsigned int MAX_RECEIVERS, unsigned int MAX_SENDERS>
00345 void MessageQueue<MAX_UNREAD,MAX_RECEIVERS,MAX_SENDERS>::sendMessage(RCRegion * rcr, bool autoDereference) {
00346 AutoLock autolock(lock);
00347 if(rcr==NULL) {
00348 rcr=new RCRegion(0);
00349 autoDereference=true;
00350 }
00351 if(filters[ProcessID::getID()]!=NULL && !filters[ProcessID::getID()]->filterSendRequest(rcr)) {
00352 if(autoDereference)
00353 rcr->RemoveReference();
00354 return;
00355 }
00356 if(numReceivers==0) {
00357
00358
00359 messagesRead++;
00360 for(index_t sit=sndrs.begin(); sit!=sndrs.end(); sit=sndrs.next(sit))
00361 semgr->raise(sndrs[sit],1);
00362 if(autoDereference)
00363 rcr->RemoveReference();
00364 return;
00365 }
00366 if(isClosed) {
00367 if(reportDroppings)
00368 std::cerr << "Warning: MessageQueue dropping " << rcr->ID().key << " because queue is closed" << std::endl;
00369 if(autoDereference)
00370 rcr->RemoveReference();
00371 return;
00372 }
00373 if(mq.size()==mq.getMaxCapacity()) {
00374 switch(overflowPolicy) {
00375 case DROP_OLDEST: {
00376 if(reportDroppings)
00377 std::cerr << "WARNING: MessageQueue full, dropping oldest unread message (" << mq.front().id.key << ")" << std::endl;
00378 RCRegion * eldest = RCRegion::attach(mq.front().id);
00379 eldest->RemoveSharedReference();
00380 mq.pop_front();
00381 eldest->RemoveReference();
00382 } break;
00383 case DROP_NEWEST:
00384 if(reportDroppings)
00385 std::cerr << "WARNING: MessageQueue full, dropping newest unread message (" << rcr->ID().key << ")" << std::endl;
00386 if(autoDereference)
00387 rcr->RemoveReference();
00388 return;
00389 case WAIT:
00390 if(reportDroppings)
00391 std::cerr << "WARNING: MessageQueue full, waiting for readers to catch up" << std::endl;
00392 while(mq.size()==mq.getMaxCapacity()) {
00393
00394 unsigned int ll=lock.get_lock_level();
00395 lock.releaseAll();
00396 usleep(MutexLockBase::usleep_granularity*15);
00397 for(unsigned int i=0; i<ll; i++)
00398 lock.lock(ProcessID::getID());
00399 if(overflowPolicy!=WAIT) {
00400 sendMessage(rcr,autoDereference);
00401 return;
00402 }
00403 }
00404 break;
00405 case THROW_BAD_ALLOC:
00406 if(reportDroppings)
00407 std::cerr << "WARNING: MessageQueue full, throwing bad_alloc exception" << std::endl;
00408 throw std::bad_alloc();
00409 break;
00410 }
00411 }
00412 rcr->AddSharedReference();
00413 if(mq.push_back(entry(numMessages++,rcr))==mq.end()) {
00414
00415 std::cerr << "ERROR: MessageQueue unable to add message; buggy overflow policy?" << std::endl;
00416 exit(EXIT_FAILURE);
00417 }
00418
00419
00420
00421 for(index_t it=rcvrs.begin(); it!=rcvrs.end(); it=rcvrs.next(it))
00422 semgr->raise(rcvrs[it],1);
00423
00424 if(autoDereference)
00425 rcr->RemoveReference();
00426 }
00427
00428 template<unsigned int MAX_UNREAD, unsigned int MAX_RECEIVERS, unsigned int MAX_SENDERS>
00429 RCRegion * MessageQueue<MAX_UNREAD,MAX_RECEIVERS,MAX_SENDERS>::readMessage(index_t msg, SemaphoreManager::semid_t rcvr) {
00430 AutoLock autolock(lock);
00431 RCRegion * rcr = RCRegion::attach(mq[msg].id);
00432 index_t rcvr_id=lookupReceiver(rcvr);
00433 if(rcvr_id==rcvrs.end())
00434 return rcr;
00435 if(mq[msg].readFlags[rcvr_id]) {
00436 std::cerr << "WARNING: MessageQueue::readMessage(): Receiver re-reading message, could be recycled/invalidated any time" << std::endl;
00437 return rcr;
00438 }
00439 mq[msg].readFlags[rcvr_id]=true;
00440 mq[msg].numRead++;
00441 if(mq[msg].numRead==numReceivers) {
00442
00443 rcr->RemoveSharedReference();
00444 mq.erase(msg);
00445 messagesRead++;
00446 for(index_t sit=sndrs.begin(); sit!=sndrs.end(); sit=sndrs.next(sit))
00447 semgr->raise(sndrs[sit],1);
00448 }
00449 return rcr;
00450 }
00451
00452 template<unsigned int MAX_UNREAD, unsigned int MAX_RECEIVERS, unsigned int MAX_SENDERS>
00453 RCRegion * MessageQueue<MAX_UNREAD,MAX_RECEIVERS,MAX_SENDERS>::peekMessage(index_t msg) {
00454
00455 return RCRegion::attach(mq[msg].id);
00456 }
00457
00458 template<unsigned int MAX_UNREAD, unsigned int MAX_RECEIVERS, unsigned int MAX_SENDERS>
00459 void MessageQueue<MAX_UNREAD,MAX_RECEIVERS,MAX_SENDERS>::markRead(index_t msg, SemaphoreManager::semid_t rcvr) {
00460 AutoLock autolock(lock);
00461 index_t rcvr_id=lookupReceiver(rcvr);
00462 if(rcvr_id==rcvrs.end())
00463 return;
00464 if(mq[msg].readFlags[rcvr_id]) {
00465 std::cerr << "WARNING: MessageQueue::markRead(): Receiver re-reading message, could be recycled/invalidated any time" << std::endl;
00466 return;
00467 }
00468 mq[msg].readFlags[rcvr_id]=true;
00469 mq[msg].numRead++;
00470 if(mq[msg].numRead==numReceivers) {
00471
00472 RCRegion * rcr = RCRegion::attach(mq[msg].id);
00473 rcr->RemoveSharedReference();
00474 rcr->RemoveReference();
00475 mq.erase(msg);
00476 messagesRead++;
00477 for(index_t sit=sndrs.begin(); sit!=sndrs.end(); sit=sndrs.next(sit))
00478 semgr->raise(sndrs[sit],1);
00479 }
00480 }
00481
00482 template<unsigned int MAX_UNREAD, unsigned int MAX_RECEIVERS, unsigned int MAX_SENDERS>
00483 typename MessageQueue<MAX_UNREAD,MAX_RECEIVERS,MAX_SENDERS>::rcvrs_t::index_t
00484 MessageQueue<MAX_UNREAD,MAX_RECEIVERS,MAX_SENDERS>::lookupReceiver(SemaphoreManager::semid_t rcvr) const {
00485 for(index_t rcvr_id=rcvrs.begin(); rcvr_id!=rcvrs.end(); rcvr_id=rcvrs.next(rcvr_id))
00486 if(rcvrs[rcvr_id]==rcvr)
00487 return rcvr_id;
00488 std::cerr << "WARNING: tried to look up queue receiver " << rcvr << ", which is not registered as a receiver for this queue" << std::endl;
00489 return rcvrs.end();
00490 }
00491
00492
00493
00494
00495
00496
00497 #endif //APERIOS check
00498
00499 #endif //INCLUDED