Tekkotsu Homepage
Demos
Overview
Downloads
Dev. Resources
Reference
Credits

netstream.h

Go to the documentation of this file.
00001 #ifndef INCLUDED_netstream_h_
00002 #define INCLUDED_netstream_h_
00003 
00004 #ifdef PLATFORM_APERIOS
00005 #error netstream not yet supported on AIBO/Aperios
00006 #endif
00007 
00008 #include <cstdio>
00009 #include <cstring>
00010 #include <iostream>
00011 #include <iosfwd>
00012 #include <stdio.h>
00013 #include <stdlib.h>
00014 #include <ctype.h>
00015 #include <unistd.h>
00016 #include <stdexcept>
00017 
00018 #include <errno.h>
00019 #include <signal.h>
00020 //#include <sys/time.h>
00021 #include <sys/types.h>
00022 #include <sys/socket.h>
00023 #include <netinet/in.h>
00024 #include <arpa/inet.h>
00025 #include <netdb.h>
00026 #include <cstring>
00027 #include <string>
00028 
00029 #if defined(__FreeBSD__) || defined(__APPLE__)
00030 #  include <sys/event.h>
00031 #else
00032 #  include <poll.h>
00033 #endif
00034 
//! Soft assertion: if condition @a b is false, prints "ERR file(line): str" to std::cerr and keeps running
/*! Both macro arguments are parenthesized so that compound expressions (e.g. `a==b`, `x||y`,
 *  or a conditional expression for @a str) are evaluated as a unit instead of being split by
 *  operator precedence.  The expansion still carries its own trailing semicolon, so call sites
 *  written with or without a following ';' keep compiling; because of that, do not use it as
 *  the unbraced body of an if that has an else. */
#define OTAssert(str, b) if(!(b)) std::cerr << "ERR " << __FILE__ << '(' << __LINE__ << "): " << (str) << std::endl;
00036 
//! Wraps a sockaddr_in together with a host-name string and a port number
/*! Numeric addresses and ports cross this interface in host byte order; the conversions to and
 *  from network byte order happen in the inline accessors below.  Members without a body here
 *  are defined outside this header section. */
class IPaddr {
public:
  static const IPaddr ANY;       //!< predefined address constant (value defined elsewhere)
  static const IPaddr BROADCAST; //!< predefined address constant (value defined elsewhere)

  typedef std::string ipname_t;    //!< textual host name or address
  typedef unsigned int ipnum_t;    //!< numeric IPv4 address
  typedef unsigned short ipport_t; //!< port number
  const static unsigned int maxHostNameLen;

  IPaddr();
  explicit IPaddr(const ipnum_t& num);
  explicit IPaddr(const ipname_t& name);
  IPaddr(const ipnum_t& num, const ipport_t& port);
  IPaddr(const ipname_t& name, const ipport_t& port);
  virtual ~IPaddr() {}

  virtual bool set_num(const ipnum_t& num);
  virtual bool set_name(const ipname_t& name);

  //! sets numeric address then port; the port is left untouched if the address is rejected
  virtual bool set_addr(const ipnum_t& num, const ipport_t& port) {
    if(!set_num(num))
      return false;
    return set_port(port);
  }

  //! sets host name then port; the port is left untouched if the name is rejected
  virtual bool set_addr(const ipname_t& name, const ipport_t& port) {
    if(!set_name(name))
      return false;
    return set_port(port);
  }

  //! records @a port both as given and, converted with htons(), inside the sockaddr_in; always returns true
  virtual bool set_port(const ipport_t& port) {
    server.sin_port = htons(port);
    ipport = port;
    return true;
  }

  //! returns the stored address converted with ntohl()
  virtual ipnum_t get_num() const {
    return ntohl(server.sin_addr.s_addr);
  }

  //! returns the stored name string
  virtual const ipname_t& get_name() const {
    return ipname;
  }

  virtual ipname_t get_display_num() const;
  virtual ipname_t get_rname() const;

  //! returns the port as last passed to set_port()
  virtual ipport_t get_port() const {
    return ipport;
  }

  //! true unless the stored address equals INADDR_NONE
  virtual bool is_valid() {
    return !(server.sin_addr.s_addr == INADDR_NONE);
  }

  //! gives read access to the underlying socket address structure
  virtual const sockaddr_in& get_addr() const {
    return server;
  }

  //! two addresses are equal when both the raw address and the raw port match (the name is not compared)
  bool operator==(const IPaddr& a) const {
    if(server.sin_port != a.server.sin_port)
      return false;
    return server.sin_addr.s_addr == a.server.sin_addr.s_addr;
  }

protected:
  void Init();

  struct sockaddr_in server; //!< address and port in network byte order
  ipname_t ipname;           //!< name string returned by get_name()
  ipport_t ipport;           //!< port as given to set_port()
};
00077 
00078 
00079 class netstream_server {
00080 public:
00081   netstream_server() : tgtAddress(), datagram(false), opsock(-1) {}
00082   netstream_server(unsigned int aPort, bool useDatagram=false) : tgtAddress(), datagram(useDatagram), opsock(-1) { serve(aPort,datagram); }
00083   netstream_server(const IPaddr& addr, bool useDatagram=false) : tgtAddress(), datagram(useDatagram), opsock(-1) { serve(addr,datagram); }
00084   ~netstream_server() { close(); }
00085   
00086   bool serve(unsigned int aPort, bool useDatagram=false) { return serve(IPaddr(ntohl(INADDR_ANY),aPort),useDatagram); }
00087   bool serve(const IPaddr& addr, bool useDatagram=false);
00088   
00089   template<class T> bool accept(T& stream) const; //!< blocks until a connection is available, then transfers it to the specified stream
00090   void close() { ::close(opsock); opsock=-1; }
00091   
00092   bool isServing() const { return opsock>=0; }
00093   
00094 protected:
00095   static const int BACKLOG=5;
00096   IPaddr tgtAddress;
00097   bool datagram;
00098   int opsock;
00099 };
00100 
00101 
00102 template <class charT, class traits=std::char_traits<charT> >
00103 class basic_netbuf : public std::basic_streambuf<charT,traits> {
00104   friend class netstream_server;
00105 public:
00106 //  Types:
00107   typedef charT                     char_type;
00108   typedef typename traits::int_type int_type;
00109   typedef typename traits::pos_type pos_type;
00110   typedef typename traits::off_type off_type;
00111   typedef traits                    traits_type;
00112   
00113 //Constructors/Destructors:
00114   basic_netbuf();
00115   basic_netbuf(const IPaddr& addr, bool datagram=false);
00116   basic_netbuf(const IPaddr::ipname_t& host, const IPaddr::ipport_t port, bool datagram=false);
00117   basic_netbuf(size_t buf_in_size, size_t buf_out_size);
00118   virtual ~basic_netbuf();
00119   
00120   
00121 //basic_netbuf specific Functions:
00122 public:
00123   virtual bool      open(const IPaddr& addr, bool datagram=false);
00124   virtual bool      open(const IPaddr::ipname_t& str, bool datagram=false);
00125   virtual bool      open(const IPaddr::ipname_t& ahost, unsigned int aPort, bool datagram=false) { return open(IPaddr(ahost,aPort),datagram); }
00126   virtual bool      listen(unsigned int aPort, bool datagram=false) { return listen(IPaddr(ntohl(INADDR_ANY),aPort),datagram); }
00127   virtual bool      listen(const IPaddr& addr, bool datagram=false);
00128   virtual bool      is_connecting() const { return openInProgress; }
00129   virtual bool      is_open() const { return (sock!=INVALID_SOCKET); }
00130   virtual void      update_status();
00131   virtual void      close();
00132   virtual void      reset() { close(); reconnect(); }
00133   
00134   virtual int     adoptFD(int fd) { int old=sock; sock=fd; update_status(); return old; }
00135 
00136   virtual void      setReconnect(bool doReconnect) { autoReconnect = doReconnect; }
00137   virtual bool      getReconnect() const { return autoReconnect; }
00138   
00139   virtual void      setEcho(bool val = true) { is_echoing = val; }
00140   virtual bool      getEcho() { return is_echoing; }
00141   
00142   virtual const IPaddr&   getPeerAddress() const { return peerAddress; }
00143   virtual const IPaddr&   getLocalAddress() const { return localAddress; }
00144   
00145 protected:
00146   virtual void      reconnect();
00147   static void     printBuffer(const char* buf, int buflen, const char* header, const char* prefix);
00148   void          Init() { Init(def_buf_in_size, def_buf_out_size); }
00149   void          Init(size_t insize, size_t outsize);
00150   basic_netbuf* rdbuf() { return *this; } // for generic use with netstream_server
00151   
00152 
00153 //Inherited Functions:
00154 public:
00155   virtual void      in_sync(); //users shouldn't need to call this directly... but can if want to
00156   virtual void      out_flush();
00157 
00158 protected:
00159   using std::basic_streambuf<charT,traits>::eback;
00160   using std::basic_streambuf<charT,traits>::gptr;
00161   using std::basic_streambuf<charT,traits>::egptr;
00162   using std::basic_streambuf<charT,traits>::gbump;
00163   using std::basic_streambuf<charT,traits>::pbase;
00164   using std::basic_streambuf<charT,traits>::pptr;
00165   using std::basic_streambuf<charT,traits>::epptr;
00166   using std::basic_streambuf<charT,traits>::pbump;
00167   
00168   //  lib.streambuf.virt.get Get area:
00169   virtual std::streamsize showmanyc();
00170   //  virtual streamsize xsgetn(char_type* s, streamsize n);
00171   virtual int_type    underflow();
00172   virtual int_type    uflow();
00173   
00174   //  lib.streambuf.virt.pback Putback:
00175   //  virtual int_type pbackfail(int_type c = traits::eof() );
00176   //  lib.streambuf.virt.put Put area:
00177   //  virtual streamsize xsputn(const char_type* s, streamsize n);
00178   virtual int_type    overflow(int_type c  = traits::eof());
00179   
00180   //  lib.streambuf.virt.buffer Buffer management and positioning:
00181   //  virtual _Myt basic_netbuf<char_type, traits_type>* setbuf(char_type* s, streamsize n);
00182   virtual pos_type seekoff(off_type off, std::ios_base::seekdir way, std::ios_base::openmode which = std::ios_base::in | std::ios_base::out);
00183   virtual pos_type seekpos(pos_type sp, std::ios_base::openmode which = std::ios_base::in | std::ios_base::out) {  return seekoff(sp,std::ios_base::beg,which); }
00184   virtual int     sync();
00185   //  lib.streambuf.virt.locales Locales:
00186   //  virtual void imbue(const locale &loc);
00187   
00188 //Data:
00189 protected:
00190   //! this allows us to reset status variables like openInProgress if a thread cancel occurs
00191   template<typename T> struct SetScope {
00192     SetScope(T& target, const T& enter, const T& leave) : tgt(target), dtr(leave) { target=enter; }
00193     ~SetScope() { tgt=dtr; }
00194     T& tgt;
00195     T dtr;
00196   };
00197   static const int INVALID_SOCKET=-1;
00198   charT *buf_in, *buf_out;
00199   bool using_buf_in, using_buf_out;
00200   static const size_t def_buf_in_size;
00201   static const size_t def_buf_out_size;
00202   int sock;
00203   bool openInProgress;
00204   bool canRead;
00205   bool canWrite;
00206   bool is_echoing;
00207   bool autoReconnect;
00208   bool sockFromServer;
00209   
00210   IPaddr peerAddress;
00211   IPaddr localAddress;
00212   IPaddr tgtAddress;
00213   bool isDatagram;
00214 
00215 private:
00216   basic_netbuf(const basic_netbuf&); // copy constructor, don't call
00217   basic_netbuf& operator=(const basic_netbuf&); //!< assignment, don't call
00218 };
00219 template <class charT, class traits>
00220 const size_t basic_netbuf<charT,traits>::def_buf_in_size=(1<<14)-28;
00221 template <class charT, class traits>
00222 const size_t basic_netbuf<charT,traits>::def_buf_out_size=(1<<14)-28;
00223 
00224 
00225 template <class charT, class traits=std::char_traits<charT> >
00226 class basic_netbuf_interface {
00227 public:
00228   basic_netbuf_interface() : nb() {}
00229   basic_netbuf_interface(const IPaddr::ipname_t& host, const IPaddr::ipport_t port, bool datagram) : nb(host,port,datagram) {}
00230   basic_netbuf_interface(const IPaddr& addr, bool datagram) : nb(addr,datagram) {}
00231   basic_netbuf_interface(size_t buf_in_size, size_t buf_out_size) : nb(buf_in_size,buf_out_size) {}
00232   
00233   inline bool     open(const IPaddr& addr, bool datagram=false) { return nb.open(addr,datagram); }
00234   inline bool     open(const IPaddr::ipname_t& str, bool datagram=false) { return nb.open(str,datagram); }
00235   inline bool     open(const IPaddr::ipname_t& ahost, unsigned int aPort, bool datagram=false) { return nb.open(ahost,aPort,datagram); }
00236   inline bool     listen(unsigned int aPort, bool datagram=false) { return nb.listen(aPort,datagram); }
00237   inline bool     listen(const IPaddr& addr, bool datagram=false) { return nb.listen(addr,datagram); }
00238   
00239   inline bool     is_connecting() const { return nb.is_connecting(); }
00240   inline bool     is_open() const { return nb.is_open(); }
00241   inline void     update_status() { nb.update_status(); }
00242   
00243   inline void     close() { nb.close(); }
00244   inline void     reset() { nb.close(); nb.reconnect(); }
00245   
00246   inline void     setReconnect(bool reconnect) { nb.setReconnect(reconnect); }
00247   inline bool     getReconnect() const { return nb.getReconnect(); }
00248   
00249   inline void     setEcho(bool val=true) { nb.setEcho(val); }
00250   inline bool     getEcho() { return nb.getEcho(); }
00251   
00252   inline const IPaddr&    getPeerAddress() const { return nb.getPeerAddress(); }
00253   inline const IPaddr&    getLocalAddress() const { return nb.getLocalAddress(); }
00254   
00255   inline basic_netbuf<charT, traits>* rdbuf() const { return const_cast<basic_netbuf<charT, traits>*>(&nb); }
00256 
00257 protected:
00258   virtual ~basic_netbuf_interface() {}
00259   basic_netbuf<charT, traits> nb;
00260 };
00261 
00262 template <class charT, class traits=std::char_traits<charT> >
00263 class basic_inetstream : public virtual basic_netbuf_interface<charT,traits>, public std::basic_istream<charT,traits>
00264 {
00265 public:
00266   typedef charT                     char_type;
00267   typedef typename traits::int_type int_type;
00268   typedef typename traits::pos_type pos_type;
00269   typedef typename traits::off_type off_type;
00270   typedef traits                    traits_type;
00271   //  lib.ifstream.cons Constructors:
00272   basic_inetstream() : basic_netbuf_interface<charT,traits>(), std::basic_istream<charT,traits>(basic_netbuf_interface<charT,traits>::rdbuf()) {}
00273   basic_inetstream(const IPaddr& addr, bool datagram=false) : basic_netbuf_interface<charT,traits>(addr,datagram), std::basic_istream<charT,traits>(basic_netbuf_interface<charT,traits>::rdbuf()) {}
00274   basic_inetstream(const IPaddr::ipname_t& host, const IPaddr::ipport_t port, bool datagram=false) : basic_netbuf_interface<charT,traits>(host,port,datagram), std::basic_istream<charT,traits>(basic_netbuf_interface<charT,traits>::rdbuf()) {}
00275   basic_inetstream(size_t buf_in_size, size_t buf_out_size) : basic_netbuf_interface<charT,traits>(buf_in_size,buf_out_size), std::basic_istream<charT,traits>(basic_netbuf_interface<charT,traits>::rdbuf()) {}
00276   using basic_netbuf_interface<charT,traits>::rdbuf;
00277 };
00278 
00279 
00280 template <class charT, class traits=std::char_traits<charT> >
00281 class basic_onetstream : public virtual basic_netbuf_interface<charT,traits>, public std::basic_ostream<charT,traits>
00282 {
00283 public:
00284   typedef charT                     char_type;
00285   typedef typename traits::int_type int_type;
00286   typedef typename traits::pos_type pos_type;
00287   typedef typename traits::off_type off_type;
00288   typedef traits                    traits_type;
00289   //  lib.ifstream.cons Constructors:
00290   basic_onetstream() : basic_netbuf_interface<charT,traits>(), std::basic_ostream<charT,traits>(basic_netbuf_interface<charT,traits>::rdbuf()) {}
00291   basic_onetstream(const IPaddr& addr, bool datagram=false) : basic_netbuf_interface<charT,traits>(addr,datagram), std::basic_ostream<charT,traits>(basic_netbuf_interface<charT,traits>::rdbuf()) {}
00292   basic_onetstream(const IPaddr::ipname_t& host, const IPaddr::ipport_t port, bool datagram=false) : basic_netbuf_interface<charT,traits>(host,port,datagram), std::basic_ostream<charT,traits>(basic_netbuf_interface<charT,traits>::rdbuf()) {}
00293   basic_onetstream(size_t buf_in_size, size_t buf_out_size) : basic_netbuf_interface<charT,traits>(buf_in_size,buf_out_size) , std::basic_ostream<charT,traits>(basic_netbuf_interface<charT,traits>::rdbuf()){}
00294   using basic_netbuf_interface<charT,traits>::rdbuf;
00295 };
00296 
00297 
00298 template <class charT, class traits=std::char_traits<charT> >
00299 class basic_ionetstream : public virtual basic_netbuf_interface<charT,traits>, public std::basic_iostream<charT,traits>
00300 {
00301 public:
00302   typedef charT                     char_type;
00303   typedef typename traits::int_type int_type;
00304   typedef typename traits::pos_type pos_type;
00305   typedef typename traits::off_type off_type;
00306   typedef traits                    traits_type;
00307   //  lib.ifstream.cons Constructors:
00308   basic_ionetstream() : basic_netbuf_interface<charT,traits>(), std::basic_iostream<charT,traits>(basic_netbuf_interface<charT,traits>::rdbuf()) {}
00309   basic_ionetstream(const IPaddr& addr, bool datagram=false) : basic_netbuf_interface<charT,traits>(addr,datagram), std::basic_iostream<charT,traits>(basic_netbuf_interface<charT,traits>::rdbuf()) {}
00310   basic_ionetstream(const IPaddr::ipname_t& host, const IPaddr::ipport_t port, bool datagram=false) : basic_netbuf_interface<charT,traits>(host,port,datagram), std::basic_iostream<charT,traits>(basic_netbuf_interface<charT,traits>::rdbuf()) {}
00311   basic_ionetstream(size_t buf_in_size, size_t buf_out_size) : basic_netbuf_interface<charT,traits>(buf_in_size,buf_out_size) , std::basic_iostream<charT,traits>(basic_netbuf_interface<charT,traits>::rdbuf()){}
00312   using basic_netbuf_interface<charT,traits>::rdbuf;
00313 };
00314 
00315 /*
00316 template<class T>
00317 class char_traits {
00318 public:
00319   typedef T char_type;
00320   typedef int int_type;
00321   typedef T* pos_type;
00322   typedef unsigned int off_type;
00323   static void copy(pos_type dst, pos_type src, off_type size) {
00324     memcpy(dst,src,size);
00325   }
00326   static void move(pos_type dst, pos_type src, off_type size) {
00327     memmove(dst,src,size);
00328   }
00329   static int to_int_type(T c) { return c; }
00330   static int eof() { return EOF; }
00331 };*/
00332 
00333 typedef basic_netbuf<char, std::char_traits<char> > netbuf;
00334 typedef basic_inetstream<char, std::char_traits<char> > inetstream;
00335 typedef basic_onetstream<char, std::char_traits<char> > onetstream;
00336 typedef basic_ionetstream<char, std::char_traits<char> > ionetstream;
00337 
00338 
00339 template<class T> bool netstream_server::accept(T& stream) const {
00340   if(opsock<0)
00341     return false;
00342   stream.close();
00343   
00344   while(!stream.is_open()) {
00345     // block until connection
00346     sockaddr tmp;
00347     socklen_t tmplen=sizeof(tmp);
00348     int sock2 = ::accept(opsock, &tmp, &tmplen);
00349     if(sock2 < 0) {
00350       switch(errno) {
00351         default: // generally report errors
00352           perror("netstream accept"); 
00353         // except for cancellation or socket was closed by another thread
00354         case EINTR: case EBADF: case ECONNABORTED:
00355           return false;
00356       }
00357     } else {
00358       //replace with accepted socket
00359       stream.rdbuf()->sock=sock2;
00360       stream.rdbuf()->sockFromServer=true;
00361       stream.rdbuf()->isDatagram=datagram;
00362       stream.rdbuf()->tgtAddress = tgtAddress;
00363       sockaddr_in addr;
00364       socklen_t addr_size=sizeof(addr);
00365       int err = ::getpeername(sock2, reinterpret_cast<sockaddr *>(&addr), &addr_size);
00366       if(err<0)
00367         perror("netstream getpeername");
00368         else {
00369           stream.rdbuf()->peerAddress.set_addr(ntohl(addr.sin_addr.s_addr), ntohs(addr.sin_port));
00370         }
00371       err = ::getsockname(sock2, reinterpret_cast<sockaddr *>(&addr), &addr_size);
00372       if(err<0)
00373         perror("netstream getsockname");
00374         else {
00375           stream.rdbuf()->localAddress.set_addr(ntohl(addr.sin_addr.s_addr), ntohs(addr.sin_port));
00376         }
00377       //cout << " netstream connected to " << getPeerAddress().get_display_num() << ":" << getPeerAddress().get_port()
00378       //  << " from " << getLocalAddress().get_display_num() << ":" << getLocalAddress().get_port() << endl;
00379     }
00380   }
00381   
00382   if(datagram) {
00383     const int sndbuf=(stream.rdbuf()->epptr() - stream.rdbuf()->pbase());
00384     if ( ::setsockopt ( stream.rdbuf()->sock, SOL_SOCKET, SO_SNDBUF, ( const char* ) &sndbuf, sizeof ( sndbuf ) ) == -1 ) {
00385       perror("netstream SO_SNDBUF setsockopt");
00386     }
00387   }
00388   
00389   return true;
00390 }
00391 
00392 
00393 template <class charT, class traits>
00394 basic_netbuf<charT,traits>::basic_netbuf() 
00395   : std::basic_streambuf<charT,traits>(), buf_in(NULL), buf_out(NULL), using_buf_in(false), using_buf_out(false),
00396   sock(INVALID_SOCKET), openInProgress(false), canRead(false), canWrite(false), is_echoing(false), autoReconnect(false), sockFromServer(),
00397   peerAddress(), localAddress(), tgtAddress(), isDatagram() {
00398   Init();
00399 }
00400 template <class charT, class traits>
00401 basic_netbuf<charT,traits>::basic_netbuf(const IPaddr& addr, bool datagram)
00402   : std::basic_streambuf<charT,traits>(), buf_in(NULL), buf_out(NULL), using_buf_in(false), using_buf_out(false),
00403   sock(INVALID_SOCKET), openInProgress(false), canRead(false), canWrite(false), is_echoing(false), autoReconnect(false), sockFromServer(),
00404   peerAddress(), localAddress(), tgtAddress(), isDatagram()  {
00405   Init();
00406   open(addr,datagram);
00407 }
00408 
00409 template <class charT, class traits>
00410 basic_netbuf<charT,traits>::basic_netbuf(const IPaddr::ipname_t& host, const IPaddr::ipport_t port, bool datagram)
00411   : std::basic_streambuf<charT,traits>(), buf_in(NULL), buf_out(NULL), using_buf_in(false), using_buf_out(false),
00412   sock(INVALID_SOCKET), openInProgress(false), canRead(false), canWrite(false), is_echoing(false), autoReconnect(false), sockFromServer(),
00413   peerAddress(), localAddress(), tgtAddress(), isDatagram()  {
00414   Init();
00415   open(host,port,datagram);
00416 }
00417 
00418 template <class charT, class traits>
00419 basic_netbuf<charT,traits>::basic_netbuf(size_t buf_in_size, size_t buf_out_size) 
00420   : std::basic_streambuf<charT,traits>(), buf_in(NULL), buf_out(NULL), using_buf_in(false), using_buf_out(false),
00421   sock(INVALID_SOCKET), openInProgress(false), canRead(false), canWrite(false), is_echoing(false), autoReconnect(false), sockFromServer(),
00422   peerAddress(), localAddress(), tgtAddress(), isDatagram()  {
00423   Init();
00424 }
00425 
00426 template <class charT, class traits>
00427 basic_netbuf<charT,traits>::~basic_netbuf() {
00428   autoReconnect=false;
00429   if(is_open()) {
00430     out_flush();
00431     close();
00432   }
00433   if(using_buf_in)
00434     delete [] buf_in;
00435   if(using_buf_out)
00436     delete [] buf_out;
00437 }
00438 
00439 template<class charT, class traits>
00440 void
00441 basic_netbuf<charT,traits>::Init(size_t insize, size_t outsize) {
00442   buf_in = new charT[insize];
00443   buf_out = new charT[outsize];
00444   using_buf_in = using_buf_out = true;
00445   this->setg(buf_in,buf_in+insize,buf_in+insize);
00446   this->setp(buf_out,buf_out+outsize);
00447   //  cout << "Buffer is:" << endl;
00448   //  cout << "Input buffer: " << egptr() << " to " << egptr() << " at " << gptr() << endl;
00449   //  cout << "Output buffer: " << pbase() << " to " << epptr() << " at " << pptr() << endl;
00450 }
00451 
00452 template<class charT, class traits>
00453 void
00454 basic_netbuf<charT,traits>::update_status() {
00455   if(sock==INVALID_SOCKET) {
00456     canRead=canWrite=false;
00457     return;
00458   }
00459 #if defined(__FreeBSD__) || defined(__APPLE__)
00460   int kq = kqueue();
00461   struct kevent ke[2];
00462   EV_SET(&ke[0],sock,EVFILT_READ,EV_ADD,0,0,0);
00463   EV_SET(&ke[1],sock,EVFILT_WRITE,EV_ADD,0,0,0);
00464   struct kevent ko[10];
00465   timespec tv = {0,0};
00466   int res = kevent(kq,ke,2,ko,10,&tv);
00467   if(res<0) {
00468     perror("basic_netbuf kevent");
00469   }
00470   bool eof=false;
00471   canRead=canWrite=false;
00472   for(int i=0; i<res; ++i) {
00473     //std::cout << ko[i].ident << ' ' << ko[i].filter << ' ' << ko[i].flags << ' ' << ko[i].fflags << ' ' << ko[i].data << std::endl;
00474     u_short errorflag = (ko[i].flags&EV_ERROR);
00475     if(errorflag && ko[i].data==ENOTSUP) {
00476       errorflag=0;
00477     } else if(ko[i].filter==EVFILT_READ)
00478       canRead = ko[i].data>0;
00479     else if(ko[i].filter==EVFILT_WRITE)
00480       canWrite = true;
00481     eof |= (ko[i].flags & (EV_EOF|errorflag));
00482   }
00483 	::close(kq);
00484   int type;
00485   socklen_t optlen = sizeof(type);
00486   res = getsockopt(sock, SOL_SOCKET, SO_TYPE, &type, &optlen);
00487   bool notsock = (res<0 && errno==ENOTSOCK);
00488   if(res<0 && errno!=ENOTSOCK)
00489     perror("basic_netbuf getsockopt");
00490   if(!canRead && ((notsock && !canWrite && eof) || (!notsock && (!canWrite || eof)))) {
00491     close();
00492     if(autoReconnect)
00493       reconnect();
00494     if(is_open())
00495       update_status();
00496   }
00497 #else
00498   pollfd pfd;
00499   pfd.fd=sock;
00500   pfd.events = POLLIN | POLLOUT;
00501   if(poll(&pfd,1,0)==-1)
00502     perror("basic_netbuf poll");
00503   if(pfd.revents&(POLLERR|POLLHUP|POLLNVAL)) {
00504     close();
00505     if(autoReconnect)
00506       reconnect();
00507     if(is_open())
00508       update_status();
00509   } else {
00510     canRead = (pfd.revents&POLLIN);
00511     canWrite = (pfd.revents&POLLOUT);
00512   }
00513 #endif
00514 }
00515 
00516 template <class charT, class traits>
00517 bool
00518 basic_netbuf<charT,traits>::open(const IPaddr& addr, bool datagram) {
00519   if(is_open())
00520     close();
00521   tgtAddress=addr;
00522   SetScope<bool> progress(openInProgress,true,false);
00523   
00524   while(!is_open()) {
00525     //cout << "netstream opening " << addr.get_display_num() << ":" << addr.get_port() << endl;
00526     // create socket
00527     int opsock = ::socket(AF_INET, datagram ? (int)SOCK_DGRAM : (int)SOCK_STREAM, 0);
00528     if(opsock < 0) {
00529       perror("netstream socket()");
00530       //std::cerr << "netstream error: socket failed to create stream socket" << std::endl;
00531       return false;
00532     }
00533     int on=1;
00534     if ( ::setsockopt ( opsock, SOL_SOCKET, SO_REUSEADDR, ( const char* ) &on, sizeof ( on ) ) == -1 ) {
00535       perror("netstream SO_REUSEADDR setsockopt");
00536     }
00537 #ifdef __APPLE__
00538     if ( ::setsockopt ( opsock, SOL_SOCKET, SO_NOSIGPIPE, ( const char* ) &on, sizeof ( on ) ) == -1 ) {   
00539       perror("netstream SO_NOSIGPIPE setsockopt");   
00540     }
00541 #endif
00542     if(datagram) {
00543       if ( ::setsockopt ( opsock, SOL_SOCKET, SO_BROADCAST, ( const char* ) &on, sizeof ( on ) ) == -1 ) {
00544         perror("netstream SO_BROADCAST setsockopt");
00545       }
00546       const int sndbuf=(epptr()-pbase());
00547       if ( ::setsockopt ( opsock, SOL_SOCKET, SO_SNDBUF, ( const char* ) &sndbuf, sizeof ( sndbuf ) ) == -1 ) {
00548         perror("netstream SO_SNDBUF setsockopt");
00549       }
00550     }
00551     
00552     if(datagram && addr.get_num()==IPaddr::BROADCAST.get_num()) {
00553       // don't connect, use sendto so we can support broadcast
00554       sock=opsock;
00555       sockFromServer=false;
00556       isDatagram=datagram;
00557       peerAddress=addr;
00558       sockaddr_in server;
00559       socklen_t server_size=sizeof(server);
00560       int err = ::getsockname(sock, reinterpret_cast<sockaddr *>(&server), &server_size);
00561       if(err<0)
00562         perror("netstream getsockname");
00563       else {
00564         localAddress.set_addr(ntohl(server.sin_addr.s_addr), ntohs(server.sin_port));
00565       }
00566       return true;
00567     }
00568     
00569     // connect to server.
00570     sockaddr_in server = addr.get_addr();
00571     int err = ::connect(opsock, (const sockaddr *) &server, sizeof(server));
00572     if(err < 0) {
00573       //perror("netstream connect()");
00574       //cout << "netstream error: connect failed to connect to requested address" << endl;
00575 			::close(opsock);
00576       if(!autoReconnect) {
00577         return false;
00578       }
00579       usleep(750000); // don't try to reconnect too fast
00580     } else {
00581       sock=opsock;
00582       sockFromServer=false;
00583       isDatagram=datagram;
00584       socklen_t server_size=sizeof(server);
00585       err = ::getpeername(sock, reinterpret_cast<sockaddr *>(&server), &server_size);
00586       if(err<0)
00587         perror("netstream getpeername");
00588       else {
00589         peerAddress.set_addr(ntohl(server.sin_addr.s_addr), ntohs(server.sin_port));
00590       }
00591       err = ::getsockname(sock, reinterpret_cast<sockaddr *>(&server), &server_size);
00592       if(err<0)
00593         perror("netstream getsockname");
00594       else {
00595         localAddress.set_addr(ntohl(server.sin_addr.s_addr), ntohs(server.sin_port));
00596       }
00597       //cout << " netstream connected to " << getPeerAddress().get_display_num() << ":" << getPeerAddress().get_port()
00598       //  << " from " << getLocalAddress().get_display_num() << ":" << getLocalAddress().get_port() << endl;
00599     }
00600   }
00601   
00602   return true;
00603 }
00604 
00605 template <class charT, class traits>
00606 bool
00607 basic_netbuf<charT,traits>::listen(const IPaddr& addr, bool datagram) {
00608   if(is_open())
00609     close();
00610   tgtAddress=addr;
00611   SetScope<bool> progress(openInProgress,true,false);
00612   
00613   // create socket
00614   int opsock = ::socket(AF_INET, datagram ? (int)SOCK_DGRAM : (int)SOCK_STREAM, 0);
00615   if(opsock < 0) {
00616     perror("netstream socket");
00617     //cout << "netstream error: socket failed to create stream socket" << endl;
00618     return false;
00619   }
00620   int on=1;
00621   if ( ::setsockopt ( opsock, SOL_SOCKET, SO_REUSEADDR, ( const char* ) &on, sizeof ( on ) ) == -1 ) {
00622     perror("netstream SO_REUSEADDR setsockopt");
00623   }
00624 #ifdef __APPLE__
00625   if ( ::setsockopt ( opsock, SOL_SOCKET, SO_REUSEPORT, ( const char* ) &on, sizeof ( on ) ) == -1 ) {   
00626     perror("netstream SO_NOSIGPIPE setsockopt");   
00627   }
00628   if ( ::setsockopt ( opsock, SOL_SOCKET, SO_NOSIGPIPE, ( const char* ) &on, sizeof ( on ) ) == -1 ) {   
00629     perror("netstream SO_NOSIGPIPE setsockopt");   
00630   }
00631 #endif
00632   if(datagram) {
00633     if ( ::setsockopt ( opsock, SOL_SOCKET, SO_BROADCAST, ( const char* ) &on, sizeof ( on ) ) == -1 ) {
00634       perror("netstream SO_BROADCAST setsockopt");
00635     }
00636     const int sndbuf=(epptr()-pbase());
00637     if ( ::setsockopt ( opsock, SOL_SOCKET, SO_SNDBUF, ( const char* ) &sndbuf, sizeof ( sndbuf ) ) == -1 ) {
00638       perror("netstream SO_SNDBUF setsockopt");
00639     }
00640   }
00641   
00642   sockaddr_in server = addr.get_addr();
00643   
00644   // bind socket to specified address
00645   if(::bind(opsock, (const sockaddr *) &server, sizeof(server)) != 0) {
00646     //perror("netstream bind");
00647 		::close(opsock);
00648     return false;
00649   }
00650   
00651   if(datagram) {
00652     // datagram is not connection based, so we are now 'connected'
00653     sock = opsock;
00654     sockFromServer = true;
00655     isDatagram = datagram;
00656     socklen_t server_size=sizeof(server);
00657     int err = ::getsockname(sock, reinterpret_cast<sockaddr *>(&server), &server_size);
00658     if(err<0)
00659       perror("netstream getsockname");
00660     else {
00661       localAddress.set_addr(ntohl(server.sin_addr.s_addr), ntohs(server.sin_port));
00662     }
00663     return true;
00664   }
00665   
00666   // tell OS to start listening
00667   if(::listen(opsock, 0) != 0) {
00668     perror("netstream listen");
00669 		::close(opsock);
00670     return false;
00671   }
00672   
00673   while(!is_open()) {
00674     // block until connection
00675     sockaddr tmp;
00676     socklen_t tmplen=sizeof(tmp);
00677     int sock2 = ::accept(opsock, &tmp, &tmplen);
00678     if(sock2 < 0) {
00679       if(errno!=EINTR)
00680         perror("netstream accept");
00681       if(!autoReconnect) {
00682 				::close(opsock);
00683         return false;
00684       }
00685     } else {
00686       // close server socket
00687 			::close(opsock);
00688       //replace with accepted socket
00689       sock=sock2;
00690       sockFromServer=true;
00691       isDatagram=datagram;
00692       socklen_t server_size=sizeof(server);
00693       int err = ::getpeername(sock, reinterpret_cast<sockaddr *>(&server), &server_size);
00694       if(err<0)
00695         perror("netstream getpeername");
00696       else {
00697         peerAddress.set_addr(ntohl(server.sin_addr.s_addr), ntohs(server.sin_port));
00698       }
00699       err = ::getsockname(sock, reinterpret_cast<sockaddr *>(&server), &server_size);
00700       if(err<0)
00701         perror("netstream getsockname");
00702       else {
00703         localAddress.set_addr(ntohl(server.sin_addr.s_addr), ntohs(server.sin_port));
00704       }
00705       //cout << " netstream connected to " << getPeerAddress().get_display_num() << ":" << getPeerAddress().get_port()
00706       //  << " from " << getLocalAddress().get_display_num() << ":" << getLocalAddress().get_port() << endl;
00707     }
00708   }
00709   
00710   return true;
00711 }
00712 
00713 template <class charT, class traits>
00714 bool
00715 basic_netbuf<charT,traits>::open(const IPaddr::ipname_t& str, bool datagram) {
00716   std::string::size_type colon = str.rfind(':');
00717   if(colon==std::string::npos)
00718     return false;
00719   IPaddr::ipport_t port = atoi(str.substr(colon+1).c_str());
00720   bool res = open(str.substr(0,colon),port,datagram);
00721   return res;
00722 }
00723 
00724 template <class charT, class traits>
00725 void
00726 basic_netbuf<charT,traits>::close() {
00727   //cout << "close called" << endl;
00728   if(!is_open())
00729     return;
00730   //cout << "closing" << endl;
00731 	::close(sock);
00732   //cout << "closed" << endl;
00733   sock = INVALID_SOCKET;
00734 }
00735 
00736 template <class charT, class traits>
00737 void
00738 basic_netbuf<charT,traits>::reconnect() {
00739   if(sockFromServer) {
00740     listen(tgtAddress,isDatagram);
00741   } else {
00742     open(tgtAddress,isDatagram);
00743   }
00744 }
00745 
00746 template <class charT, class traits>
00747 void
00748 basic_netbuf<charT,traits>::printBuffer(const char *buf, int buflen, const char *header, const char *prefix) {
00749   fputs(header, stderr);
00750   int line_begin = 0;
00751   for(int i = 0; i < buflen; i++) {
00752     if(buf[i] == '\n') {
00753       line_begin = 1;
00754       fputc('\n', stderr);
00755     } else {
00756       if(line_begin) fputs(prefix, stderr);
00757       line_begin = 0;
00758       fputc(buf[i], stderr);
00759     }
00760   }
00761 }
00762 
00763 template <class charT, class traits>
00764 void
00765 basic_netbuf<charT,traits>::in_sync() {
00766   if(!is_open()) {
00767     //cout << "netstream error: must open connection before reading from it" << endl;
00768     return;
00769   }
00770   update_status();
00771   if(!is_open())
00772     return; // just discovered close, don't complain
00773   if(gptr()>egptr())
00774     gbump( egptr()-gptr() );
00775   if(gptr()==eback()) {
00776     std::cerr << "netstream error: in_sync without room in input buffer" << std::endl;
00777     return;
00778   }
00779   unsigned long n = gptr()-eback()-1;
00780   charT temp_buf[n];
00781   ssize_t used;
00782   if(isDatagram) {
00783     sockaddr_in peer = IPaddr::ANY.get_addr();
00784     socklen_t peer_size=sizeof(peer);
00785     used = recvfrom(sock, temp_buf, n*sizeof(charT), 0, reinterpret_cast<sockaddr *>(&peer), &peer_size);
00786     peerAddress.set_addr(ntohl(peer.sin_addr.s_addr), ntohs(peer.sin_port));
00787   } else {
00788     used = read(sock, temp_buf, n*sizeof(charT));
00789   }
00790   /*if(used==(ssize_t)-1)
00791     perror("netstream Read error");
00792   else
00793     std::cout << "Read '" << std::string(temp_buf,used) << "'" << std::endl;*/
00794   while(used==0 || used==(ssize_t)-1) {
00795     if(errno==EINTR) {
00796       return;
00797     }
00798     //std::cout << "netstream error: connection dropped" << std::endl;
00799     close();
00800     if(autoReconnect)
00801       reconnect();
00802     if(!is_open()) {
00803       return;
00804     }
00805     if(isDatagram) {
00806       sockaddr_in peer = IPaddr::ANY.get_addr();
00807       socklen_t peer_size=sizeof(peer);
00808       used = recvfrom(sock, temp_buf, n*sizeof(charT), 0, reinterpret_cast<sockaddr *>(&peer), &peer_size);
00809       peerAddress.set_addr(ntohl(peer.sin_addr.s_addr), ntohs(peer.sin_port));
00810       std::cout << "setting " << peerAddress.get_display_num() << ":" << peerAddress.get_port() << std::endl;
00811     } else {
00812       used = read(sock, temp_buf, n*sizeof(charT));
00813     }
00814   }
00815   if(is_echoing)
00816     printBuffer(temp_buf, used, "netstream receiving: ", "» ");
00817   //TODO - what if sizeof(charT)>1?  We might need to worry about getting/storing partial char
00818   OTAssert("Partial char was dropped!",(ssize_t)((used/sizeof(charT))*sizeof(charT))==used);
00819   used/=sizeof(charT);
00820   size_t remain = egptr()-eback()-used;
00821   traits::move(eback(),egptr()-remain,remain);
00822   traits::copy(egptr()-used,temp_buf,used);
00823   gbump( -used );
00824 }
00825 
00826 template <class charT, class traits>
00827 void
00828 basic_netbuf<charT,traits>::out_flush() {
00829   if(!is_open()) {
00830     //std::cout << "netstream error: must open connection before writing to it" << std::endl;
00831     return;
00832   }
00833   update_status();
00834   if(!is_open())
00835     return; // just discovered close, don't complain
00836   size_t n = (pptr()-pbase())*sizeof(charT);
00837   //std::cout << "Writing '" << std::string(pbase(),n) << "'" << std::endl;
00838   if(n==0)
00839     return;
00840   while(n>0) {
00841     ssize_t sent;
00842     if(isDatagram && peerAddress.get_num()==IPaddr::BROADCAST.get_num()) {
00843       sockaddr_in peer = peerAddress.get_addr();
00844       sent = sendto(sock, pbase(), n, 0, reinterpret_cast<sockaddr *>(&peer), sizeof(peer));
00845     } else {
00846       sent = write(sock, pbase(), n);
00847     }
00848     if(sent < 0) {
00849       perror("netstream flush");
00850       if(errno==EINTR)
00851         return;
00852       //cout << "netstream error: write error" << endl;
00853       close();
00854       if(autoReconnect)
00855         reconnect();
00856       if(!is_open())
00857         return;
00858       continue;
00859     }
00860     if(is_echoing)
00861       printBuffer(pbase(), sent, "netstream sending: ", "« ");
00862     n -= sent;
00863     // could be trouble if partial character sent:
00864     if(n!=0)
00865       traits::move(pbase(),pbase()+sent/sizeof(charT),n/sizeof(charT));
00866     pbump( -sent/sizeof(charT) );
00867   }
00868 }
00869 
00870 
00871 template <class charT, class traits>
00872 inline std::streamsize
00873 basic_netbuf<charT, traits>::showmanyc() {
00874   //caller is supposed to check this before calling...
00875   //if(gptr()<egptr())
00876   //  return egptr()-gptr();
00877   update_status();
00878   return (is_open() && canRead) ? 1 : 0;
00879 }
00880 
00881 template <class charT, class traits>
00882 inline typename basic_netbuf<charT, traits>::int_type
00883 basic_netbuf<charT, traits>::underflow() {
00884   in_sync();
00885   if(gptr()<egptr())
00886     return traits::to_int_type(*gptr());
00887 //  cout << "UNDERFLOW" << endl;
00888   return traits::eof();
00889 }
00890 
00891 template <class charT, class traits>
00892 inline typename basic_netbuf<charT, traits>::int_type
00893 basic_netbuf<charT, traits>::uflow() {
00894   in_sync();
00895   if(gptr()<egptr()) {
00896     int_type ans = traits::to_int_type(*gptr());
00897     gbump(1);
00898     return ans;
00899   }
00900 //  cout << "UNDERFLOW" << endl;
00901   return traits::eof();
00902 }
00903 
00904 template <class charT, class traits>
00905 inline typename basic_netbuf<charT, traits>::int_type
00906 basic_netbuf<charT, traits>::overflow(int_type c) { 
00907   out_flush();
00908   if(!is_open())
00909     return traits::eof();
00910   if(!traits::eq_int_type(c, traits::eof())) {
00911     *pptr() = c;
00912     pbump(1);
00913   }
00914   return traits::not_eof(c);
00915 }
00916 
00917 //template <class charT, class traits>
00918 //inline basic_netbuf<charT, traits>::_Myt //not supported - don't know details of expected implementation
00919 //basic_netbuf<charT, traits>::setbuf(char_type* /*s*/, streamsize /*n*/) {
00920 //  return this;
00921 //}
00922 
00923 template <class charT, class traits>
00924 typename basic_netbuf<charT, traits>::pos_type
00925 basic_netbuf<charT, traits>::seekoff(off_type off, std::ios_base::seekdir way, std::ios_base::openmode which /*= ios_base::in | ios_base::out*/) {
00926   bool dog = (which & std::ios_base::in);
00927   bool dop = (which & std::ios_base::out);
00928   charT* newg, *newp;
00929   int gb,pb;
00930   switch(way) {
00931     case std::ios_base::beg: {
00932       newg = eback()+off;
00933       gb = newg-gptr();
00934       newp = pbase()+off;
00935       pb = newp-pptr();
00936     } break;
00937     case std::ios_base::cur: {
00938       newg = gptr()+off;
00939       newp = pptr()+off;
00940       gb=pb=static_cast<int>(off);
00941     } break;
00942     case std::ios_base::end: {
00943       newg = egptr()+off;
00944       gb = newg-gptr();
00945       newp = epptr()+off;
00946       pb = newp-pptr();
00947     } break;
00948     default:
00949       return pos_type(off_type(-1));
00950   }
00951   if((dog && (newg<eback() || egptr()<newg)) || (dop && (newp<pbase() || epptr()<newp)))
00952     return pos_type(off_type(-1));
00953   if(dog)
00954     gbump(gb);
00955   if(dop) {
00956     pbump(pb);
00957     return pptr()-pbase();
00958   } else {
00959     return gptr()-eback();
00960   }
00961 }
00962 
00963 template <class charT, class traits>
00964 inline int
00965 basic_netbuf<charT, traits>::sync() {
00966   out_flush();
00967   //in_sync();
00968   return is_open()?0:-1;
00969 }
00970 
00971 #undef OTAssert
00972 
00973 #endif

Tekkotsu v5.1CVS
Generated Mon May 9 04:58:45 2016 by Doxygen 1.6.3