Tekkotsu Homepage
Demos
Overview
Downloads
Dev. Resources
Reference
Credits

MessageQueue.h

Go to the documentation of this file.
00001 //-*-c++-*-
00002 #ifndef INCLUDED_MessageQueue_h_
00003 #define INCLUDED_MessageQueue_h_
00004 
00005 #ifdef PLATFORM_APERIOS
00006 #  warning MessageQueue is not Aperios compatable
00007 #else
00008 
00009 #include "ListMemBuf.h"
00010 #include "RCRegion.h"
00011 #include "SemaphoreManager.h"
00012 #include "MutexLock.h"
00013 #include "Shared/MarkScope.h"
00014 #include "Shared/attributes.h"
00015 #include <exception>
00016 #include <stdlib.h>
00017 #include <unistd.h> // for usleep
00018 
00019 #include "Shared/TimeET.h"
00020 
00021 //! Defines the interface for sending new shared memory regions between processes
00022 /*! This base class holds all of the template-independent code to allow general
00023  *  operations on MessageQueues.  The templated version of MessageQueue provides
00024  *  concrete implementation, which is what you would instantiate.
00025  *  
00026  *  Each message entails its own shared memory region, as compared to
00027  *  SharedQueue, where a single large buffer is maintained, and all messages are
00028  *  copied into the common buffer.  This class is better for large regions since
00029  *  it can avoid copying data around.
00030  * 
00031  *  @see MessageQueue, MessageQueueStatusListener, MessageReceiver */
00032 class MessageQueueBase {
00033 public:
00034 
00035   //! an interface for filtering (or otherwise monitoring) messages being sent through a MessageQueue, see MessageQueueBase::addMessageFilter()
00036   class MessageFilter {
00037   public:
00038     //! called immediately prior to sending a message -- return true to pass the message into the queue, false to drop it
00039     virtual bool filterSendRequest(RCRegion* rcr)=0;
00040     //! to make compiler warning happy
00041     virtual ~MessageFilter() {}
00042   };
00043   
00044   //!constructor
00045   MessageQueueBase()
00046     : lock(), overflowPolicy(THROW_BAD_ALLOC), isClosed(false), reportDroppings(false), numMessages(0),
00047       numReceivers(0), messagesRead(0)
00048   {
00049     for(unsigned int i=0; i<ProcessID::NumProcesses; ++i)
00050       filters[i]=NULL;
00051   }
00052   //!destructor
00053   virtual ~MessageQueueBase() {}
00054   
00055   
00056   //! The storage type for message entry indicies
00057   /*! This index is to be used with accessor functions, but may be recycled for
00058    *  a new message after all receivers have read the previous message.  If you
00059    *  wish to have a unique message identifier, see getMessageSN() */
00060   typedef unsigned short index_t;
00061   
00062   
00063   //! add one to the receiver reference count
00064   virtual SemaphoreManager::semid_t addReceiver() ATTR_must_check =0;
00065   //! remove one from the receiver reference count
00066   virtual void removeReceiver(SemaphoreManager::semid_t rcvr)=0;
00067   //! return the receiver reference count
00068   virtual unsigned int getNumReceivers() const { return numReceivers; }
00069 
00070   //! registers a semaphore which should be raised whenever a message is marked read
00071   /*! The number of these are limited to the MAX_SENDERS template parameter of
00072     *  MessageQueue... returns false if too many are already registered
00073     *  
00074     *  You probably don't want to call this directly, use a MessageQueueStatusThread */
00075   virtual SemaphoreManager::semid_t addReadStatusListener() ATTR_must_check =0;
00076   //! removes a semaphore from the status listener list
00077   virtual void removeReadStatusListener(SemaphoreManager::semid_t sem)=0;
00078   
00079   
00080   //! post a message into the queue -- a shared reference is added, the caller retains control current reference
00081   /*! Thus, if you are sending a region and do not intend to use it again, either pass
00082    *  true for autoDereference or call RCRegion::removeReference() after sending
00083    *  to free the sender's memory.
00084    *  
00085    *  If no one dereferences the region, you can continue to access the region,
00086    *  even as the receiver accesses it as well.  Thus if both sides retain references,
00087    *  you can use the region as a shared memory area for future communication.
00088    *  (beware of race conditions!)
00089    *
00090    *  If @a rcr is NULL, an empty message will be sent (there's still some overhead
00091    *  to this -- may want to consider a semaphore instead of a MessageQueue if all
00092    *  you're going to do is 'ping' another process with empty messages) */
00093   virtual void sendMessage(RCRegion * rcr, bool autoDereference=false)=0;
00094   //! request access to a particular message, increments read counter -- do not call more than once per receiver!
00095   /*! The message is marked read and will be popped from the queue if all
00096    *  receivers have read the message as well.  The caller inherits a reference
00097    *  to the returned region -- call removeReference when you are done with
00098    *  it */
00099   virtual RCRegion * readMessage(index_t msg, SemaphoreManager::semid_t rcvr)=0;
00100   //! request access to a particular message, does not mark message -- call as often as you like
00101   /*! The caller inherits a reference to the returned region -- call
00102    *  removeReference when you are done with it */
00103   virtual RCRegion * peekMessage(index_t msg)=0;
00104   //! increments read counter -- do not call more than once per receiver per message!
00105   virtual void markRead(index_t msg, SemaphoreManager::semid_t rcvr)=0;
00106   //! do not allow any new messages to be posted
00107   virtual void close() { AutoLock autolock(lock); isClosed=true; }
00108 
00109   //! sets #reportDroppings
00110   virtual void setReportDroppings(bool report) { reportDroppings=report; }
00111   //! gets #reportDroppings
00112   virtual bool getReportDroppings() const { return reportDroppings; }
00113   
00114   
00115   //! Each message gets a unique, monotonically increasing serial number; this function returns that number (MessageQueue::serialNumber)
00116   virtual unsigned int getMessageSN(index_t msg)=0;
00117   
00118   //! Checks to see how many messages have been processed (read by all receivers and removed from queue)
00119   virtual unsigned int getMessagesRead() { return messagesRead; }
00120   
00121   //! Returns the number of messages which have been sent
00122   virtual unsigned int getMessagesSent() { return numMessages; }
00123   
00124   //! Returns the number of messages which have been sent but not yet read
00125   virtual unsigned int getMessagesUnread() { return getMessagesSent() - getMessagesRead(); }
00126   
00127   //! a typedef to make it easier to obtain a lock on the queue for the extent of a scope
00128   typedef MarkScope AutoLock;
00129   //! returns a reference to the queue's inter-process lock
00130   MutexLock<ProcessID::NumProcesses>& getLock() const { return lock; }
00131 
00132   
00133   virtual index_t oldest() const=0;          //!< return oldest message still in the queue (may or may not have been read by this process)
00134   virtual index_t newer(index_t it) const=0; //!< return the next message in the queue (may or may not have been read by this process)
00135   virtual index_t older(index_t it) const=0; //!< return the previous message in the queue (may or may not have been read by this process)
00136   virtual index_t newest() const=0;          //!< return most recent message added to the queue (may or may not have been read by this process)
00137   virtual bool isEnd(index_t it) const=0;    //!< returns true if @a it is the one-past-the-end of the queue
00138   
00139   //! an enumerations of policies for dealing with overflow, pass to setOverflowPolicy()
00140   enum OverflowPolicy_t {
00141     DROP_OLDEST,     //!< the oldest unread message is dropped
00142     DROP_NEWEST,     //!< the most recently added message is dropped (i.e. the overflowing message is ignored)
00143     WAIT,            //!< the adding process/thread polls until space is available
00144     THROW_BAD_ALLOC  //!< throw a std::bad_alloc exception (falls through to abort() if you don't catch it)
00145   };
00146   //! allows you to pick how to handle running out of space in the queue, see OverflowPolicy_t
00147   void setOverflowPolicy(OverflowPolicy_t op) { overflowPolicy=op; }
00148   //! returns the current overflow policy, see OverflowPolicy_t
00149   OverflowPolicy_t getOverflowPolicy() const { return overflowPolicy; }
00150   
00151   //! sets #semgr
00152   static void setSemaphoreManager(SemaphoreManager* mgr) { semgr=mgr; }
00153   //! gets #semgr
00154   static SemaphoreManager* getSemaphoreManager() { return semgr; }
00155   
00156   //! once called, any messages put into the queue must pass through @a filter first (note: there can only be one filter per process!)
00157   /*! if a filter was previously registered, it is replaced with the new @a filter */
00158   void addMessageFilter(MessageFilter& filter) {
00159     filters[ProcessID::getID()]=&filter;
00160   }
00161   //! removes the current filter in place, if any
00162   void removeMessageFilter() {
00163     filters[ProcessID::getID()]=NULL;
00164   }
00165 protected:
00166   //! the global semaphore manager, needs to be set (once, globally) via setSemaphoreManager() before any receivers are added
00167   static SemaphoreManager* semgr;
00168   
00169   mutable MutexLock<ProcessID::NumProcesses> lock; //!< a lock to grant serial access to the queue
00170   volatile OverflowPolicy_t overflowPolicy; //!< the choice of how to handle message overflow -- see OverflowPolicy_t
00171   bool isClosed; //!< if true, new messages will be rejected
00172   bool reportDroppings; //!< if true, output will be sent on cerr when overflow occurs
00173   unsigned int numMessages; //!< number of messages which have been sent (serial number of next message)
00174   unsigned int numReceivers; //!< how many receivers to expect
00175   unsigned int messagesRead; //!< number of messages which have been read and removed from queue
00176   MessageFilter* filters[ProcessID::NumProcesses]; //!< provides storage of one message filter per process
00177 private:
00178   MessageQueueBase(const MessageQueueBase&); //!< this shouldn't be called...
00179   MessageQueueBase& operator=(const MessageQueueBase&); //!< this shouldn't be called...
00180 };
00181 
00182 //! An implementation of MessageQueueBase, which provides mechanisms for sending shared memory regions between processes
00183 /*! MAX_UNREAD is assigned to #CAPACITY, MAX_RECEIVERS is assigned to #RECEIVER_CAPACITY, and MAX_SENDERS is assigned to #SENDER_CAPACITY
00184  *  @see MessageQueueBase, MessageQueueStatusListener, MessageReceiver */
00185 template<unsigned int MAX_UNREAD, unsigned int MAX_RECEIVERS=10, unsigned int MAX_SENDERS=10>
00186 class MessageQueue : public MessageQueueBase {
00187 public:
00188   //! total number of messages which can be backed up in the queue
00189   static const unsigned int CAPACITY=MAX_UNREAD;
00190   //! total number of receivers which can be registered
00191   static const unsigned int RECEIVER_CAPACITY=MAX_RECEIVERS;
00192   //! total number of senders which can be registered
00193   /*! More specifically, this is the maximum number of StatusListeners -- anyone
00194    *  can call sendMessage(), but only this number can get direct notification when
00195    *  messages are received. */
00196   static const unsigned int SENDER_CAPACITY=MAX_SENDERS;
00197   
00198   //! constructor
00199   MessageQueue() : MessageQueueBase(), mq(), rcvrs(), sndrs() {}
00200   
00201   //! destructor
00202   virtual ~MessageQueue();
00203   
00204   virtual SemaphoreManager::semid_t addReadStatusListener() ATTR_must_check;
00205   virtual void removeReadStatusListener(SemaphoreManager::semid_t sem);
00206 
00207   virtual SemaphoreManager::semid_t addReceiver() ATTR_must_check;
00208   virtual void removeReceiver(SemaphoreManager::semid_t rcvr);
00209   
00210   virtual void sendMessage(RCRegion * rcr, bool autoDereference=false);
00211   virtual RCRegion * readMessage(index_t msg, SemaphoreManager::semid_t rcvr);
00212   virtual RCRegion * peekMessage(index_t msg);
00213   virtual void markRead(index_t msg, SemaphoreManager::semid_t rcvr);
00214 
00215   virtual unsigned int getMessageSN(index_t msg) { /*AutoLock autolock(lock);*/ return mq[msg].sn; }
00216   
00217   virtual index_t oldest() const { AutoLock autolock(lock); return mq.begin(); }
00218   virtual index_t newer(index_t it) const { AutoLock autolock(lock); return mq.next(it); }
00219   virtual index_t older(index_t it) const { AutoLock autolock(lock); return mq.prev(it); }
00220   virtual index_t newest() const { AutoLock autolock(lock); return mq.prev(mq.end()); }
00221   virtual bool isEnd(index_t it) const { AutoLock autolock(lock); return it==mq.end() || it>=mq_t::MAX_ENTRIES; }
00222   
00223 protected:
00224   //! data storage needed for each message
00225   struct entry {
00226     entry() : id(), sn(), numRead(0) { memset(readFlags,0,sizeof(readFlags)); } //!< constructor
00227     entry(unsigned int serialNumber, RCRegion* r)
00228     : id(r->ID()), sn(serialNumber), numRead(0) { memset(readFlags,0,sizeof(readFlags)); } //!< constructor, pass message info
00229     RCRegion::Identifier id; //!< the identifier for the shared memory region so that other regions can attach it
00230     unsigned int sn; //!< serial number for this message (not the same as its index in the queue -- indicies are reused, this id is unique to this message
00231     bool readFlags[MAX_RECEIVERS]; //!< a flag for each receiver to indicate if they have read it
00232     unsigned int numRead; //!< a count of the number of receivers which have read this message (should always equal sum(readFlags))
00233   };
00234   
00235   //! shorthand for the type of data storage of message entries
00236   typedef ListMemBuf<entry,MAX_UNREAD,index_t> mq_t;
00237   //! the data storage of message entries
00238   mq_t mq;
00239 
00240   //! shorthand for the type of data storage of message entries
00241   typedef ListMemBuf<SemaphoreManager::semid_t,MAX_RECEIVERS,index_t> rcvrs_t;
00242   //! the data storage of receiver semaphores
00243   rcvrs_t rcvrs;
00244 
00245   //! returns the index within #rcvrs of the receiver id @a rcvr
00246   typename rcvrs_t::index_t lookupReceiver(SemaphoreManager::semid_t rcvr) const;
00247   
00248   //! shorthand for the type of data storage of message entries
00249   typedef ListMemBuf<SemaphoreManager::semid_t,MAX_SENDERS,index_t> sndrs_t;
00250   //! the data storage of receiver semaphores
00251   sndrs_t sndrs;
00252 };
00253 
00254 template<unsigned int MAX_UNREAD, unsigned int MAX_RECEIVERS, unsigned int MAX_SENDERS>
00255 MessageQueue<MAX_UNREAD,MAX_RECEIVERS,MAX_SENDERS>::~MessageQueue() {
00256     //lock shouldn't be necessary -- refcount should ensure the containing
00257     //region isn't deleted until only one process has access anyway
00258     //AutoLock autolock(lock);
00259     while(!mq.empty()) {
00260       RCRegion * rcr = RCRegion::attach(mq.front().id);
00261       rcr->RemoveSharedReference();
00262       rcr->RemoveReference();
00263       mq.pop_front();
00264     }
00265 }
00266 
00267 template<unsigned int MAX_UNREAD, unsigned int MAX_RECEIVERS, unsigned int MAX_SENDERS>
00268 SemaphoreManager::semid_t MessageQueue<MAX_UNREAD,MAX_RECEIVERS,MAX_SENDERS>::addReadStatusListener() {
00269     AutoLock autolock(lock);
00270     SemaphoreManager::semid_t sem=semgr->getSemaphore();
00271     if(sem==semgr->invalid()) {
00272       std::cerr << "ERROR: unable to add read status listener to message queue because semaphore manager is out of semaphores" << std::endl;
00273       return semgr->invalid();
00274     }
00275     if(sndrs.push_back(sem)==sndrs.end()) {
00276       std::cerr << "ERROR: unable to add read status listener to message queue because message queue can't register any more senders (MAX_SENDERS)" << std::endl;
00277       semgr->releaseSemaphore(sem);
00278       return semgr->invalid();
00279     }
00280     return sem;
00281 }
00282 
00283 template<unsigned int MAX_UNREAD, unsigned int MAX_RECEIVERS, unsigned int MAX_SENDERS>
00284 void MessageQueue<MAX_UNREAD,MAX_RECEIVERS,MAX_SENDERS>::removeReadStatusListener(SemaphoreManager::semid_t sem) {
00285     AutoLock autolock(lock);
00286     for(index_t it=sndrs.begin(); it!=sndrs.end(); it=sndrs.next(it))
00287       if(sndrs[it]==sem) {
00288         sndrs.erase(it);
00289         semgr->releaseSemaphore(sem);
00290         break;
00291       }
00292 }
00293 
00294 template<unsigned int MAX_UNREAD, unsigned int MAX_RECEIVERS, unsigned int MAX_SENDERS>
00295 SemaphoreManager::semid_t MessageQueue<MAX_UNREAD,MAX_RECEIVERS,MAX_SENDERS>::addReceiver() {
00296     AutoLock autolock(lock);
00297     SemaphoreManager::semid_t sem=semgr->getSemaphore();
00298     if(sem==semgr->invalid()) {
00299       std::cerr << "ERROR: unable to add receiver to message queue because semaphore manager is out of semaphores" << std::endl;
00300       return semgr->invalid();
00301     }
00302     if(rcvrs.push_back(sem)==rcvrs.end()) {
00303       std::cerr << "ERROR: unable to add receiver to message queue because message queue can't register any more receivers (MAX_RECEIVERS)" << std::endl;
00304       semgr->releaseSemaphore(sem);
00305       return semgr->invalid();
00306     }
00307     numReceivers++;
00308     return sem;
00309 }
00310 
00311 template<unsigned int MAX_UNREAD, unsigned int MAX_RECEIVERS, unsigned int MAX_SENDERS>
00312 void MessageQueue<MAX_UNREAD,MAX_RECEIVERS,MAX_SENDERS>::removeReceiver(SemaphoreManager::semid_t rcvr) {
00313     AutoLock autolock(lock);
00314     index_t rcvr_id=rcvrs.begin();
00315     for(; rcvr_id!=rcvrs.end(); rcvr_id=rcvrs.next(rcvr_id))
00316       if(rcvrs[rcvr_id]==rcvr)
00317         break;
00318     if(rcvr_id==rcvrs.end()) {
00319       std::cerr << "WARNING: tried to remove message queue receiver " << rcvr << ", which is not registered as a receiver for this queue" << std::endl;
00320       return;
00321     }
00322     rcvrs.erase(rcvr_id);
00323     semgr->releaseSemaphore(rcvr);
00324     numReceivers--;
00325     for(index_t it=mq.begin(); it!=mq.end(); it=mq.next(it)) {
00326       if(mq[it].readFlags[rcvr_id]) {
00327         // the removed receiver had read this message, decrement the read count
00328         mq[it].readFlags[rcvr_id]=false;
00329         mq[it].numRead--;
00330       } else if(mq[it].numRead==numReceivers) {
00331         //all *remaining* processes have gotten a look, remove the neutral MessageQueue reference
00332         RCRegion * rcr = RCRegion::attach(mq[it].id);
00333         rcr->RemoveSharedReference();
00334         rcr->RemoveReference();
00335         it=mq.prev(it);
00336         mq.erase(mq.next(it));
00337         messagesRead++;
00338         for(index_t sit=sndrs.begin(); sit!=sndrs.end(); sit=sndrs.next(sit))
00339           semgr->raise(sndrs[sit],1);
00340       }
00341     }
00342 }
00343 
00344 template<unsigned int MAX_UNREAD, unsigned int MAX_RECEIVERS, unsigned int MAX_SENDERS>
00345 void MessageQueue<MAX_UNREAD,MAX_RECEIVERS,MAX_SENDERS>::sendMessage(RCRegion * rcr, bool autoDereference/*=false*/) {
00346     AutoLock autolock(lock);
00347     if(rcr==NULL) {
00348       rcr=new RCRegion(0);
00349       autoDereference=true;
00350     }
00351     if(filters[ProcessID::getID()]!=NULL && !filters[ProcessID::getID()]->filterSendRequest(rcr)) {
00352       if(autoDereference)
00353         rcr->RemoveReference();
00354       return;
00355     }
00356     if(numReceivers==0) {
00357       //if(reportDroppings)
00358       //std::cerr << "Warning: MessageQueue dropping " << rcr->ID().key << " because there are no receivers" << std::endl;
00359       messagesRead++; // counts as a read message (read by all 0 readers is still read by all readers!)
00360       for(index_t sit=sndrs.begin(); sit!=sndrs.end(); sit=sndrs.next(sit))
00361         semgr->raise(sndrs[sit],1);
00362       if(autoDereference)
00363         rcr->RemoveReference();
00364       return;
00365     }
00366     if(isClosed) {
00367       if(reportDroppings)
00368         std::cerr << "Warning: MessageQueue dropping " << rcr->ID().key << " because queue is closed" << std::endl;
00369       if(autoDereference)
00370         rcr->RemoveReference();
00371       return;
00372     }
00373     if(mq.size()==mq.getMaxCapacity()) {
00374       switch(overflowPolicy) {
00375         case DROP_OLDEST: {
00376           if(reportDroppings)
00377             std::cerr << "WARNING: MessageQueue full, dropping oldest unread message (" << mq.front().id.key << ")" << std::endl;
00378           RCRegion * eldest = RCRegion::attach(mq.front().id);
00379           eldest->RemoveSharedReference();
00380           mq.pop_front();
00381           eldest->RemoveReference();
00382         } break;
00383         case DROP_NEWEST:
00384           if(reportDroppings)
00385             std::cerr << "WARNING: MessageQueue full, dropping newest unread message (" << rcr->ID().key << ")" << std::endl;
00386           if(autoDereference)
00387             rcr->RemoveReference();
00388           return;
00389         case WAIT:
00390           if(reportDroppings)
00391             std::cerr << "WARNING: MessageQueue full, waiting for readers to catch up" << std::endl;
00392           while(mq.size()==mq.getMaxCapacity()) {
00393             //have to release locks so readers can get access
00394             unsigned int ll=lock.get_lock_level();
00395             lock.releaseAll();
00396             usleep(MutexLockBase::usleep_granularity*15);
00397             for(unsigned int i=0; i<ll; i++)
00398               lock.lock(ProcessID::getID());
00399             if(overflowPolicy!=WAIT) { //may have been changed by a different thread while we were waiting
00400               sendMessage(rcr,autoDereference); //retry with the new policy
00401               return;
00402             }
00403           }
00404           break;
00405         case THROW_BAD_ALLOC:
00406           if(reportDroppings)
00407             std::cerr << "WARNING: MessageQueue full, throwing bad_alloc exception" << std::endl;
00408           throw std::bad_alloc();
00409           break;
00410       }
00411     }
00412     rcr->AddSharedReference();
00413     if(mq.push_back(entry(numMessages++,rcr))==mq.end()) {
00414       //our overflow policy should've prevented this
00415       std::cerr << "ERROR: MessageQueue unable to add message; buggy overflow policy?" << std::endl;
00416       exit(EXIT_FAILURE);
00417     }
00418     
00419     //std::cout << Process::getName() << " sent " << (numMessages-1) << " at " << TimeET() << std::endl;
00420     //notify receivers
00421     for(index_t it=rcvrs.begin(); it!=rcvrs.end(); it=rcvrs.next(it))
00422       semgr->raise(rcvrs[it],1);
00423     
00424     if(autoDereference)
00425       rcr->RemoveReference();
00426 }
00427 
00428 template<unsigned int MAX_UNREAD, unsigned int MAX_RECEIVERS, unsigned int MAX_SENDERS>
00429 RCRegion * MessageQueue<MAX_UNREAD,MAX_RECEIVERS,MAX_SENDERS>::readMessage(index_t msg, SemaphoreManager::semid_t rcvr) {
00430     AutoLock autolock(lock);
00431     RCRegion * rcr = RCRegion::attach(mq[msg].id);
00432     index_t rcvr_id=lookupReceiver(rcvr);
00433     if(rcvr_id==rcvrs.end())
00434       return rcr;
00435     if(mq[msg].readFlags[rcvr_id]) {
00436       std::cerr << "WARNING: MessageQueue::readMessage(): Receiver re-reading message, could be recycled/invalidated any time" << std::endl;
00437       return rcr; // already read, just return it
00438     }
00439     mq[msg].readFlags[rcvr_id]=true;
00440     mq[msg].numRead++;
00441     if(mq[msg].numRead==numReceivers) {
00442       //all processes have gotten a look, remove the neutral MessageQueue reference
00443       rcr->RemoveSharedReference();
00444       mq.erase(msg);
00445       messagesRead++;
00446       for(index_t sit=sndrs.begin(); sit!=sndrs.end(); sit=sndrs.next(sit))
00447         semgr->raise(sndrs[sit],1);
00448     }
00449     return rcr;
00450 }
00451 
00452 template<unsigned int MAX_UNREAD, unsigned int MAX_RECEIVERS, unsigned int MAX_SENDERS>
00453 RCRegion * MessageQueue<MAX_UNREAD,MAX_RECEIVERS,MAX_SENDERS>::peekMessage(index_t msg) {
00454     //AutoLock autolock(lock); //I don't think a lock is necessary here
00455     return RCRegion::attach(mq[msg].id);
00456 }
00457 
00458 template<unsigned int MAX_UNREAD, unsigned int MAX_RECEIVERS, unsigned int MAX_SENDERS>
00459 void MessageQueue<MAX_UNREAD,MAX_RECEIVERS,MAX_SENDERS>::markRead(index_t msg, SemaphoreManager::semid_t rcvr) {
00460     AutoLock autolock(lock);
00461     index_t rcvr_id=lookupReceiver(rcvr);
00462     if(rcvr_id==rcvrs.end())
00463       return;
00464     if(mq[msg].readFlags[rcvr_id]) {
00465       std::cerr << "WARNING: MessageQueue::markRead(): Receiver re-reading message, could be recycled/invalidated any time" << std::endl;
00466       return; // already read, just return it
00467     }
00468     mq[msg].readFlags[rcvr_id]=true;
00469     mq[msg].numRead++;
00470     if(mq[msg].numRead==numReceivers) {
00471       //all processes have gotten a look, remove the neutral MessageQueue reference
00472       RCRegion * rcr = RCRegion::attach(mq[msg].id);
00473       rcr->RemoveSharedReference();
00474       rcr->RemoveReference();
00475       mq.erase(msg);
00476       messagesRead++;
00477       for(index_t sit=sndrs.begin(); sit!=sndrs.end(); sit=sndrs.next(sit))
00478         semgr->raise(sndrs[sit],1);
00479     }
00480 }
00481 
00482 template<unsigned int MAX_UNREAD, unsigned int MAX_RECEIVERS, unsigned int MAX_SENDERS>
00483 typename MessageQueue<MAX_UNREAD,MAX_RECEIVERS,MAX_SENDERS>::rcvrs_t::index_t
00484 MessageQueue<MAX_UNREAD,MAX_RECEIVERS,MAX_SENDERS>::lookupReceiver(SemaphoreManager::semid_t rcvr) const {
00485   for(index_t rcvr_id=rcvrs.begin(); rcvr_id!=rcvrs.end(); rcvr_id=rcvrs.next(rcvr_id))
00486     if(rcvrs[rcvr_id]==rcvr)
00487       return rcvr_id;
00488   std::cerr << "WARNING: tried to look up queue receiver " << rcvr << ", which is not registered as a receiver for this queue" << std::endl;
00489   return rcvrs.end();
00490 }
00491 
00492 /*! @file
00493  * @brief Defines MessageQueue, which provides mechanisms for sending shared memory regions between processes
00494  * @author ejt (Creator)
00495  */
00496 
00497 #endif //APERIOS check
00498 
00499 #endif //INCLUDED

Tekkotsu v5.1CVS
Generated Mon May 9 04:58:45 2016 by Doxygen 1.6.3