ThreadedMessageQueue.h
Go to the documentation of this file.00001 #ifndef INCLUDED_ThreadedMessageQueue_h_
00002 #define INCLUDED_ThreadedMessageQueue_h_
00003
00004 #include "Thread.h"
00005 #include "Shared/MarkScope.h"
00006 #include <list>
00007 #include <stdexcept>
00008 #include <algorithm>
00009
00010
00011
00012 template<class T>
00013 class ThreadedMessageQueue {
00014 public:
00015
00016 ThreadedMessageQueue() : lock(), signal(), msgs(), receiver(NULL) {}
00017
00018
00019 ~ThreadedMessageQueue() { finishCallback(); }
00020
00021
00022 void send(const T& msg) { MarkScope l(lock); msgs.push_back(msg); signal.broadcast(); }
00023
00024
00025 template<class F> void for_each(const F& f) { std::for_each(msgs.begin(),msgs.end(),f); }
00026
00027
00028 template<class F> void remove(const F& f) { MarkScope l(lock); msgs.remove_if(f); }
00029
00030
00031 void remove(const T e) { MarkScope l(lock); msgs.remove(e); }
00032
00033
00034 size_t size() const { return msgs.size(); }
00035
00036
00037 void clear() { MarkScope l(lock); msgs.clear(); }
00038
00039
00040 const T& front() const {
00041 MarkScope l(lock);
00042 while(msgs.size()==0)
00043 signal.wait(lock);
00044 return msgs.front();
00045 }
00046
00047
00048 void pop() { MarkScope l(lock); if(msgs.size()>0) msgs.pop_front(); }
00049
00050
00051 template<typename F, typename C>
00052 void spawnCallback(F fn, C& cl) {
00053 stopCallback();
00054 receiver = new ReceiverThread<F,C>(*this, fn, cl);
00055 receiver->start();
00056 }
00057
00058
00059 void stopCallback() {
00060 if(receiver!=NULL) {
00061 {
00062 #ifdef USE_SIGNAL_TO_CANCEL_THREAD
00063
00064
00065 MarkScope l(lock);
00066 #endif
00067 receiver->stop();
00068 receiver->keepRunning=false;
00069 }
00070 receiver->join();
00071 delete receiver;
00072 receiver=NULL;
00073 }
00074 }
00075
00076
00077
00078
00079 void finishCallback() {
00080 if(receiver==NULL)
00081 return;
00082 {
00083 MarkScope l(lock);
00084 if(msgs.size()==0)
00085 receiver->stop();
00086 receiver->keepRunning=false;
00087 }
00088 receiver->join();
00089 delete receiver;
00090 receiver=NULL;
00091 }
00092
00093
00094
00095 void finishQueue() {
00096 if(receiver==NULL)
00097 return;
00098 {
00099 MarkScope l(lock);
00100 if(msgs.size()==0)
00101 receiver->stop();
00102 receiver->block=false;
00103 }
00104 receiver->join();
00105 delete receiver;
00106 receiver=NULL;
00107 }
00108
00109 protected:
00110 public:
00111 mutable Thread::Lock lock;
00112 Thread::Condition signal;
00113 std::list<T> msgs;
00114
00115
00116 class ReceiverThreadBase : public Thread {
00117 public:
00118 ReceiverThreadBase() : Thread(), keepRunning(true), block(true) {}
00119 bool keepRunning;
00120 bool block;
00121 };
00122
00123
00124 template<typename F, class C>
00125 class ReceiverThread : public ReceiverThreadBase {
00126 public:
00127
00128 ReceiverThread(ThreadedMessageQueue<T>& tmq, F f, C& c) : ReceiverThreadBase(), q(tmq), fn(f), cl(c) {}
00129
00130 protected:
00131 ThreadedMessageQueue& q;
00132 F fn;
00133 C& cl;
00134
00135 virtual void* run() {
00136 while(ReceiverThreadBase::keepRunning && (ReceiverThreadBase::block || q.size()>0)) {
00137
00138
00139
00140
00141
00142
00143 T item = q.front();
00144 q.pop();
00145 (cl.*fn)(item);
00146 }
00147 return NULL;
00148 }
00149 };
00150 ReceiverThreadBase* receiver;
00151
00152 private:
00153 ThreadedMessageQueue(const ThreadedMessageQueue& o);
00154 ThreadedMessageQueue& operator=(const ThreadedMessageQueue& o);
00155 };
00156
00157
00158
00159
00160
00161
00162 #endif