#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "tlmpnet.h" #include "tlmpnet.m" #include "../tlmplib/tlmplib.h" #include #include using namespace std; #define MAX_CLIENTS 1024 TCPSERVER_V1_INFO::TCPSERVER_V1_INFO() { linelen = 0; force_end = false; port = NULL; data = NULL; rest = NULL; listenhandle = -1; } struct TCPSERVER_V1_BLOCKBUF{ void *data; int len; int off; // Written so far void init (const void *_data, int _off, int _len){ off = _off; len = _len; data = malloc(len); if (data != NULL){ memcpy (data,_data,_len); }else{ len = 0; } } TCPSERVER_V1_BLOCKBUF (const void *_data, int _len){ init (_data,0,_len); } TCPSERVER_V1_BLOCKBUF (const TCPSERVER_V1_BLOCKBUF &s){ init (s.data,s.off,s.len); } ~TCPSERVER_V1_BLOCKBUF(){ free (data); } unsigned getremain() const { return len - off; } }; struct TCPSERVER_V1_CLIENT{ SSTRING s; // Input buffer int state; ARRAY_OBJ *data; bool rawmode; vector blocks; unsigned long buffered; // How many bytes in blocks bool eventfull_sent; // The application knows we have too many // bytes in blocks bool listening; struct timeval lastwrite; // Allows us to track dead (or very slow) // connection bool ending; // Discard all output to this client // it will be shut down TCPSERVER_V1_CLIENT(){ data = NULL; state = 0; listening = true; rawmode = false; lastwrite.tv_sec = lastwrite.tv_usec = 0; buffered = 0; eventfull_sent = false; ending = false; } ~TCPSERVER_V1_CLIENT(){ delete data; for (unsigned i=0; i tbc; int curclient; vector fds; vector ports; int since; int nbclients; int nbpending; bool rawmode; bool nonblock; unsigned long nonblock_bufsize; unsigned maxclient; int iter_pos; // State for iter_init and iter_next int id_data; int id_listen; bool endserver; vector client_ending; // event(OUTFULL) request endclient TLMPEPOLL *ep; TCPSERVER_V1_PRIVATE(){ time_out = 0; c= NULL; curclient = -1; since = 0; nbclients = 0; rawmode = 
false; nonblock = false; nonblock_bufsize = 0; nbpending = 0; maxclient = MAX_CLIENTS; id_data = id_listen = -1; ep = NULL; } ~TCPSERVER_V1_PRIVATE(){ for (unsigned i=0; i= 0 && (unsigned)handle < tbc.size()){ ret = tbc[handle]; if (ret == NULL){ fprintf (stderr,errmsg,handle); for (unsigned i=0; ictl (TLMPEPOLL_CTL_MOD,client,op); } cl.blocks.push_back (new TCPSERVER_V1_BLOCKBUF(data,len)); cl.buffered += len; if (cl.buffered >= nonblock_bufsize && !cl.eventfull_sent){ bool endclient = false; c->event(client,TCPSERVER_OUTFULL,endserver,endclient,cl.data); cl.eventfull_sent = true; if (endclient){ // We can't close here, because we are in some process // From now on, we discard output and the main loop // will close the connection client_ending.push_back(client); cl.ending = true; } } } }; int _F_TCPSERVER_V1::iter_next (void *&data) { data = NULL; int ret = -1; int iter_pos = priv->iter_pos; int size = priv->tbc.size(); while (iter_pos >= 0 && iter_pos < size){ int pos = iter_pos++; TCPSERVER_V1_CLIENT *c = priv->tbc[pos]; if (c != NULL){ ret = pos; data = c->data; break; } } priv->iter_pos = iter_pos; return ret; } int _F_TCPSERVER_V1::iter_init (void *&data) { priv->iter_pos = 0; return iter_next (data); } int _F_TCPSERVER_V1::iter_init () { void *data; return iter_init (data); } int _F_TCPSERVER_V1::iter_next () { void *data; return iter_next (data); } PUBLIC int TCPSERVER_V1::iter_init (void *&data) { return priv->c->iter_init(data); } PUBLIC int TCPSERVER_V1::iter_init () { void *data; return iter_init (data); } PUBLIC int TCPSERVER_V1::iter_next (void *&data) { return priv->c->iter_next (data); } PUBLIC int TCPSERVER_V1::iter_next () { void *data; return iter_next (data); } PUBLIC bool TCPSERVER_V1::is_blocked (int fd, unsigned long &size, long long &last_write) { return priv->c->is_blocked (fd,size,last_write); } /* Record the IDLE maximum time for a connection */ void _F_TCPSERVER_V1::set_timeout(int nbseconds) { //priv->cmd->set_timeout 
(priv->curclient,nbseconds); } /* Record the operation mode for a given client. mode == true, means binary mode, true = false means line mode (text) Returns the current mode. */ bool _F_TCPSERVER_V1::setrawmode (int no, bool mode) { TCPSERVER_V1_CLIENT &ptc = priv->getcli(no); bool ret = ptc.rawmode; ptc.rawmode = mode; return ret; } bool _F_TCPSERVER_V1::setrawmode (bool mode) { return setrawmode (priv->curclient,mode); } /* Set the line mode or raw (binary) mode. Return the current mode. */ PUBLIC bool TCPSERVER_V1::setrawmode (bool mode) { bool ret = priv->rawmode; priv->rawmode = mode; return ret; } PUBLIC bool TCPSERVER_V1::setrawmode (int no, bool mode) { return priv->c->setrawmode (no,mode); } /* Set the sockets to non-blocking (for writting). */ PUBLIC bool TCPSERVER_V1::setnonblock ( bool mode, int bufsize) // TCPSERVER_V1 transparently buf the send/sendto request // when they block so it is transparent to the caller // But if there is too much buffer to buf, some connection // will be terminated. 
{ bool ret = priv->nonblock; if (mode && bufsize < 1000){ tlmp_error ("TCPSERVER_V1::setnonblock() bufsize=%d, raised to 30000\n",bufsize); bufsize = 30000; } priv->nonblock = mode; priv->nonblock_bufsize = bufsize; return ret; } /* This is called when the server is idle for a given timeout */ void _F_TCPSERVER_V1::idle(int, bool &, TCPSERVER_V1_INFO &) { } void _F_TCPSERVER_V1::time_out(int, bool &, TCPSERVER_V1_INFO &) { } void _F_TCPSERVER_V1::event(int, TCPSERVER_EVENT, bool &, bool &, ARRAY_OBJ *&) { } /* Send a string to the given client */ int _F_TCPSERVER_V1::sendto(int client, const char *line) { return sendto (client,line,strlen(line)); } int _F_TCPSERVER_V1::sendto(int client,const void *buf, int len) { int ret = -1; TCPSERVER_V1_CLIENT &c = priv->getcli(client); if (!c.ending){ if (c.blocks.size() > 0){ // We can't write now, there is already some data in the buffer // waiting to be sent to the client priv->keepbuf (client,buf,len); ret = len; }else{ while (1){ int written = write (client,buf,len); // fprintf (stderr,"client %d written %d len %d errno %d(%s)\n",client,written,len,errno,strerror(errno)); if (written == -1 && errno == EAGAIN) written = 0; if (written == len){ ret = len; break; }else if (written >= 0){ gettimeofday (&c.lastwrite,NULL); priv->keepbuf (client,(void*)((char*)buf+written),len-written); ret = len; break; }else if (errno != EINTR){ // Some error, not EINTR, better to leave break; } // Loop because we were interrupted } } } return ret; } /* Send a formatted message to the given client */ int _F_TCPSERVER_V1::sendtof(int client, const char *ctl, ...) 
{ va_list list; va_start (list,ctl); char buf[10000]; int n = vsnprintf (buf,sizeof(buf)-1,ctl,list); va_end (list); int ret = sendto (client,buf,n); return ret; } /* Send a string to the current client */ int _F_TCPSERVER_V1::send(const char *line) { return send (line,strlen(line)); } int _F_TCPSERVER_V1::send(const void *buf, int len) { return sendto (priv->curclient,buf,len); } /* Send a formatted message to the current client */ int _F_TCPSERVER_V1::sendf(const char *ctl, ...) { int ret = -1; va_list list; va_start (list,ctl); va_list list2; va_copy (list2,list); char buf[10000]; int n = vsnprintf (buf,sizeof(buf)-1,ctl,list); va_end (list); if (n == -1 || n >= (int)(sizeof(buf)-1)){ // Ok, this was too long. older glibc returned -1, newer return // the expected amount char *alloc = NULL; n = vasprintf (&alloc,ctl,list2); ret = send (alloc,n); free (alloc); }else{ ret = send (buf,n); } va_end (list2); return ret; } /* Send a formatted message to all client */ int _F_TCPSERVER_V1::sendallf(const char *ctl, ...) { va_list list; va_start (list,ctl); char buf[10000]; int n = vsnprintf (buf,sizeof(buf)-1,ctl,list); va_end (list); return sendall (buf,n); } int _F_TCPSERVER_V1::sendall(const void *s, int len) { int ret = -1; for (unsigned i=0; itbc.size(); i++){ TCPSERVER_V1_CLIENT *c = priv->tbc[i]; if (c != NULL){ ret = write (i,s,len); if (ret != len) break; } } return ret; } /* Send a string to all client */ int _F_TCPSERVER_V1::sendall(const char *s) { return sendall (s,strlen(s)); } void _F_TCPSERVER_V1::endclient (int, bool &, TCPSERVER_V1_INFO &) { } void _F_TCPSERVER_V1::settcpnodelay(bool on) { int opt = on ? 1 : 0; setsockopt (priv->curclient,SOL_TCP,TCP_NODELAY,&opt,sizeof(opt)); } void _F_TCPSERVER_V1::settcpnodelay(int no, bool on) { int opt = on ? 1 : 0; setsockopt (no,SOL_TCP,TCP_NODELAY,&opt,sizeof(opt)); } /* Forget the current client. This is generally called when the application fork to handle the current client itself. 
*/ void _F_TCPSERVER_V1::forgetclient() { int cli = priv->curclient; delete priv->tbc[cli]; priv->tbc[cli] = NULL; priv->nbclients--; if (priv->ep != NULL) priv->ep->ctl (TLMPEPOLL_CTL_DEL,cli,0); } /* Close one client. */ void _F_TCPSERVER_V1::closeclient(int cli) { if (cli >= 0 && (unsigned)cli < priv->tbc.size()){ delete priv->tbc[cli]; priv->tbc[cli] = NULL; priv->nbclients--; if (priv->ep != NULL) priv->ep->ctl (TLMPEPOLL_CTL_DEL,cli,0); ::close (cli); } } PUBLIC void TCPSERVER_V1::closeclient(int cli) { priv->c->closeclient (cli); } /* Return the number of connected clients */ int _F_TCPSERVER_V1::getnbclients() { return priv->nbclients; } /* Get the application data associated with a handle */ ARRAY_OBJ *_F_TCPSERVER_V1::getclientdata(int cli) { ARRAY_OBJ *ret = NULL; if (cli >= 0 && (unsigned)cli < priv->tbc.size()){ TCPSERVER_V1_CLIENT *c = priv->tbc[cli]; if (c != NULL){ ret = c->data; } } return ret; } /* Return true if the connection output is blocked (stuff written using send/sendto() has not been completed yet). */ bool _F_TCPSERVER_V1::is_blocked(int handle, unsigned long &size, long long &lastwrite) { TCPSERVER_V1_CLIENT &c = priv->getcli(handle); int nb = c.blocks.size(); bool ret = nb > 0; size = 0; lastwrite = 0; if (nb > 0){ lastwrite = (long long)c.lastwrite.tv_sec *1000000 + c.lastwrite.tv_usec; size = c.buffered; } return ret; } /* Return true if the current (active) connection output is blocked. */ bool _F_TCPSERVER_V1::is_blocked() { unsigned long size; long long lastwrite; return is_blocked (priv->curclient,size,lastwrite); } /* Return the number of socket with data to process */ int _F_TCPSERVER_V1::getnbpending() { return priv->nbpending; } bool _F_TCPSERVER_V1::setlisten (int handle, bool on) { TCPSERVER_V1_CLIENT &c = priv->getcli(handle); bool ret = c.listening; c.listening = on; if (on != ret && priv->ep != NULL){ int op = on ? 
TLMPEPOLL_IN : 0; if (c.blocks.size() > 0) op |= TLMPEPOLL_OUT; priv->ep->ctl (TLMPEPOLL_CTL_MOD,handle,op); } return ret; } PUBLIC bool TCPSERVER_V1::setlisten (int handle, bool on) { return priv->c->setlisten (handle,on); } PUBLIC bool TCPSERVER_V1::islistening (int handle) { return priv->c->islistening(handle); } bool _F_TCPSERVER_V1::islistening (int handle) { TCPSERVER_V1_CLIENT &c = priv->getcli(handle); return c.listening; } PUBLIC int _F_TCPSERVER_V1::listenfd ( int fd, const char *port) // Mostly for documentation { if (fd != -1){ priv->fds.push_back(fd); priv->ports.push_back(port); if (priv->ep != NULL){ priv->ep->ctl (TLMPEPOLL_CTL_ADD,fd,TLMPEPOLL_IN); priv->ep->setid (fd,priv->id_listen); } } return fd; } PUBLIC int TCPSERVER_V1::listenfd ( int fd, const char *port) // Mostly for documentation { return priv->c->listenfd (fd,port); } PUBLIC int TCPSERVER_V1::listen ( const char *bindaddr, const char *port) { int fd = -1; if (strncmp(port,"unix:",5)==0){ fd = tcpserver_openunix (port+5); }else{ fd = tcpserver_opentcp (bindaddr,port,true); } if (fd != -1){ priv->fds.push_back(fd); priv->ports.push_back(port); if (priv->ep != NULL){ priv->ep->ctl (TLMPEPOLL_CTL_ADD,fd,TLMPEPOLL_IN); priv->ep->setid (fd,priv->id_listen); } } return fd; } PUBLIC int TCPSERVER_V1::listen ( const char *port) { return listen (NULL,port); } PUBLIC int TCPSERVER_V1::listen ( int port) { char tmp[20]; snprintf (tmp,sizeof(tmp)-1,"%d",port); return listen (NULL,tmp); } PRIVATE void TCPSERVER_V1::init ( _F_TCPSERVER_V1 &c, const char *bindaddr, const char *port, int time_out) { priv = new TCPSERVER_V1_PRIVATE; priv->c = &c; c.priv = priv; priv->time_out = time_out; if (port != NULL) listen (bindaddr,port); } PUBLIC TCPSERVER_V1::TCPSERVER_V1 (_F_TCPSERVER_V1 &c, const char *port, int time_out) { init (c,NULL,port,time_out); } PUBLIC TCPSERVER_V1::TCPSERVER_V1 ( _F_TCPSERVER_V1 &c, const char *bindaddr, const char *port, int time_out) { init (c,bindaddr,port,time_out); } PUBLIC 
TCPSERVER_V1::TCPSERVER_V1 (_F_TCPSERVER_V1 &c, int port, int time_out) { char tmp[20]; sprintf (tmp,"%d",port); init (c,NULL,tmp,time_out); } PUBLIC TCPSERVER_V1::TCPSERVER_V1 (_F_TCPSERVER_V1 &c) { init (c,NULL,NULL,5); } /* Return true if the listening socket has been established */ PUBLIC bool TCPSERVER_V1::is_ok() { return priv->fds.size() > 0; } PUBLIC VIRTUAL TCPSERVER_V1::~TCPSERVER_V1 () { delete priv; } PRIVATE void _F_TCPSERVER_V1::endsession(int cli) { closeclient (cli); } PRIVATE void TCPSERVER_V1::endsession(int cli) { priv->c->endsession(cli); } /* Return the number of active clients */ PUBLIC int TCPSERVER_V1::getnbclients() const { return priv->nbclients; } PUBLIC void TCPSERVER_V1::loop() { if (priv->fds.size() > 0 || priv->nbclients > 0){ fd_set empty; FD_ZERO (&empty); bool endserver = false; priv->endserver = false; priv->since = 0; TLMPEPOLL ep; setup_epoll (ep); int timeout = priv->time_out * 1000; while (!endserver && !priv->endserver){ TLMPEPOLL_EVENT events[10]; int nbev = ep.wait (events,10,timeout); if (nbev == -1) break; if (nbev == 0){ endserver = dispatch (0,empty); }else{ endserver = process_epoll (ep,events,nbev); } } forget_epoll(); } } /* Accept an handle to monitor even if this handle is unrelated to the sockets we are monitoring. 
*/ void _F_TCPSERVER_V1::inject (int client, ARRAY_OBJ *data) { if (client >= 0 && (unsigned)client < priv->maxclient){ TCPSERVER_V1_CLIENT &ptc = priv->allocclient (client); priv->nbclients++; ptc.data = data; ptc.rawmode = priv->rawmode; ptc.listening = true; if (priv->nonblock) fcntl(client,F_SETFL,O_NONBLOCK); if (priv->ep != NULL){ priv->ep->ctl (TLMPEPOLL_CTL_ADD,client,TLMPEPOLL_IN); priv->ep->setid (client,priv->id_data); } } } void _F_TCPSERVER_V1::inject_output (int handle) { if (handle >= 0 && (unsigned)handle < priv->maxclient){ inject (handle,NULL); setlisten (handle,false); } } PUBLIC void TCPSERVER_V1::inject_output (int handle) { priv->c->inject_output (handle); } PUBLIC void TCPSERVER_V1::inject (int client, ARRAY_OBJ *data) { priv->c->inject (client,data); } PUBLIC void TCPSERVER_V1::inject (int client) { priv->c->inject (client,NULL); } PRIVATE void TCPSERVER_V1::accept_con ( int i, bool &endserver, TLMPEPOLL *ep) { struct sockaddr_in sacc; socklen_t size=sizeof(sacc); int newclient = accept (priv->fds[i],(struct sockaddr *)&sacc,&size); if (newclient != -1 && (unsigned)newclient < priv->maxclient){ TCPSERVER_V1_CLIENT &ptc = priv->allocclient(newclient); // fprintf (stderr,"newclient %d\n",newclient); if (priv->nonblock) fcntl(newclient,F_SETFL,O_NONBLOCK); priv->nbclients++; ptc.rawmode = priv->rawmode; priv->curclient = newclient; unsigned long addr = ntohl(sacc.sin_addr.s_addr); #if 0 struct sockaddr_in adr; unsigned int len = sizeof(adr); if (getpeername (newclient,(struct sockaddr*)&adr,&len) != -1){ addr = ntohl(adr.sin_addr.s_addr); } #endif bool endclient = false; TCPSERVER_V1_INFO info; info.data = NULL; info.port = priv->ports[i].c_str(); info.listenhandle = priv->fds[i]; // fprintf (stderr,"accept_con curclient %d size %u\n",priv->curclient,priv->tbc.size()); if (ep != NULL){ ep->ctl (TLMPEPOLL_CTL_ADD,newclient,TLMPEPOLL_IN); ep->setid (newclient,priv->id_data); } priv->c->newclient (newclient,addr,endclient,endserver,info); ptc.data 
= info.data; if (endclient){ endsession(newclient); } }else{ ::close(newclient); } } PRIVATE void TCPSERVER_V1::process_end( int fd, bool &endserver, bool is_timeout, bool force_end) // Application request ending { TCPSERVER_V1_CLIENT &ptc = priv->getcli(fd); TCPSERVER_V1_INFO info; info.data = ptc.data; info.force_end = force_end; if (is_timeout) priv->c->time_out (fd,endserver,info); priv->c->endclient (fd,endserver,info); ptc.data = info.data; endsession(fd); } /* Close all connection as requested by the event functag in OUTFULL condition. */ PRIVATE void TCPSERVER_V1::process_ending(bool &endserver) { for (vector::iterator it=priv->client_ending.begin(); it != priv->client_ending.end(); it++){ int fd = *it; TCPSERVER_V1_CLIENT *ptc = priv->tbc[fd]; // The handle may have been closed while processing a previous // events (int the same loop) if (ptc != NULL){ process_end (fd,endserver,false,true); } } priv->client_ending.clear(); } PRIVATE void TCPSERVER_V1::process_data ( int fd, bool &endserver) { bool is_timeout = false; // Not done // fprintf (stderr,"process_data fd=%d\n",fd); TCPSERVER_V1_CLIENT &ptc = priv->getcli(fd); { char buf[32*1024+1]; int nb = read (fd,buf,sizeof(buf)-1); priv->curclient = fd; // fprintf (stderr,"TCPSERVER_V1::process_data fd=%d nb=%d errno=%d\n",fd,nb,errno); if (nb <= 0){ if (nb == 0 || errno != EINTR){ process_end (fd,endserver,is_timeout,false); } }else{ bool endclient = false; buf[nb] = '\0'; if (ptc.rawmode){ TCPSERVER_V1_INFO info; info.data = ptc.data; info.rest = NULL; info.linelen = nb; priv->c->receive (fd,buf,endclient,endserver ,ptc.state,info); ptc.data = info.data; }else{ SSTRING *s = &ptc.s; s->append (buf); const char *pt = s->get(); while (*pt != '\0'){ const char *start = pt; while (*pt != '\0' && *pt != '\n') pt++; if (*pt == '\n'){ int len = (int)(pt-start); char line[len+1]; strncpy (line,start,len); line[len] = '\0'; // Some are sending \r\n lines if (len > 0 && line[len-1] == '\r'){ len--; line[len] = '\0'; } 
pt++; TCPSERVER_V1_INFO info; info.data = ptc.data; info.rest = pt; info.linelen = len; priv->c->receive (fd,line,endclient,endserver ,ptc.state,info); ptc.data = info.data; if (endclient || endserver){ break; } }else{ pt = start; break; } } s->setfrom (pt); } if (endclient){ process_end (fd,endserver,false,true); } } } } PRIVATE bool TCPSERVER_V1::dispatch (int select_ret, fd_set &set) { bool endserver = false; if (select_ret == 0){ priv->since += priv->time_out; TCPSERVER_V1_INFO info; priv->c->idle (priv->since,endserver,info); // Even if ok == 0, we must cycle through readnext() // to detect ended client (timeout) }else{ priv->since = 0; for (unsigned i=0; ifds.size(); i++){ int fd = priv->fds[i]; if (FD_ISSET(fd,&set)){ accept_con (i,endserver,NULL); FD_CLR (fd,&set); // Make sure the next loop // does not pick this bit again } } for (unsigned i=0; !endserver && itbc.size(); i++){ if (FD_ISSET(i,&set) && priv->tbc[i] != NULL){ process_data (i,endserver); } } } process_ending(endserver); return endserver; } /* Insert the proper handle in the fd_set so a global select may be done. */ PUBLIC int TCPSERVER_V1::setup_select (fd_set &set, int max_handle) { for (unsigned i=0; ifds.size(); i++){ int fd = priv->fds[i]; if (fd > max_handle) max_handle = fd; FD_SET (fd,&set); } for (unsigned i=0; itbc.size(); i++){ TCPSERVER_V1_CLIENT *ptc = priv->tbc[i]; if (ptc != NULL){ if (i > (unsigned)max_handle) max_handle = i; FD_SET (i,&set); } } return max_handle; } /* Process the work in the fd_set. 
*/
PUBLIC void TCPSERVER_V1::process_select (
	int select_ret,
	fd_set &set,
	long timeout)
{
	// timeout is not used: the idle time is computed from the
	// time-out recorded in the server
	dispatch (select_ret,set);
}

/*
	Record the epoll object used by the server and register all the
	listening sockets and all the clients in it.
*/
PUBLIC void TCPSERVER_V1::setup_epoll (TLMPEPOLL &ep)
{
	priv->ep = &ep;
	priv->id_listen = tlmpepoll_allocid();
	priv->id_data = tlmpepoll_allocid();
	for (unsigned i=0; i<priv->fds.size(); i++){
		int fd = priv->fds[i];
		ep.ctl (TLMPEPOLL_CTL_ADD,fd,TLMPEPOLL_IN);
		ep.setid (fd,priv->id_listen);
	}
	for (unsigned i=0; i<priv->tbc.size(); i++){
		TCPSERVER_V1_CLIENT *ptc = priv->tbc[i];
		if (ptc != NULL){
			ep.ctl (TLMPEPOLL_CTL_ADD,i,TLMPEPOLL_IN);
			ep.setid (i,priv->id_data);
		}
	}
}

/*
	Stop using the epoll object recorded by setup_epoll().
*/
PUBLIC void TCPSERVER_V1::forget_epoll ()
{
	priv->ep = NULL;
}

/*
	Process the events reported by epoll.
	Events tagged with the listen id accept a new connection, events
	tagged with the data id are processed as client input and/or output.
	Returns true when the end of the server was requested.
*/
PUBLIC bool TCPSERVER_V1::process_epoll (
	TLMPEPOLL &ep,
	TLMPEPOLL_EVENT events[],
	int nbevent)
{
	bool endserver = false;
	TLMPEPOLL_EVENT *ptev = events;
	// NOTE(review): this loop header was damaged in the copy of the file
	// being reviewed. It is assumed that ptev advances together with the
	// index and that the handle is ptev->fd - confirm against upstream.
	for (int i=0; i<nbevent; i++,ptev++){
		int fd = ptev->fd;
		if (ptev->id == priv->id_listen){
			// Find which listening socket this handle is
			for (unsigned k=0; k<priv->fds.size(); k++){
				if (priv->fds[k] == fd){
					accept_con (k,endserver,&ep);
					break;
				}
			}
		}else if (ptev->id == priv->id_data){
			// The handle may have been closed while processing a previous
			// events (in the same loop). Never index tbc out of bounds.
			TCPSERVER_V1_CLIENT *ptc = NULL;
			if (fd >= 0 && (unsigned)fd < priv->tbc.size()){
				ptc = priv->tbc[fd];
			}
			if (ptc != NULL){
				if ((ptev->events & TLMPEPOLL_IN)!=0){
					// While processing other event, we may have turned off
					// listening for that connection. But the event was
					// already queued. We just ignore the event.
When we // turn listening on, epoll will resend the event if (ptc->listening){ process_data (fd,endserver); }else{ //fprintf (stderr,"Activity on fd %d, listening is off\n",fd); } } // The handle may have been closed in the previous if() ptc = priv->tbc[fd]; if (ptc != NULL && (ptev->events & TLMPEPOLL_OUT)!=0){ gettimeofday (&ptc->lastwrite,NULL); bool endclient = false; while (1){ // fprintf (stderr,"epoll out %d blocks %d\n",fd,ptc.blocks.size()); if (ptc->blocks.size()==0){ priv->ep->ctl (TLMPEPOLL_CTL_MOD,fd ,ptc->listening ? TLMPEPOLL_IN : 0); priv->c->event (fd,TCPSERVER_OUTFLUSHED,endserver,endclient,ptc->data); break; }else{ TCPSERVER_V1_BLOCKBUF *buf = ptc->blocks[0]; int len = buf->len - buf->off; int written = write (fd,(char*)buf->data+buf->off,len); // fprintf (stderr,"block written %d len %d errno=%d\n",written,len,errno); if (written == len){ delete buf; ptc->blocks.erase(ptc->blocks.begin()); ptc->buffered -= len; }else if (written >= 0){ buf->off += written; ptc->buffered -= written; break; }else if (errno == EAGAIN){ break; }else if (errno != EINTR){ process_end (fd,endserver,false,false); break; } } } if (!endclient && ptc->buffered < priv->nonblock_bufsize/2 && ptc->eventfull_sent){ priv->c->event (fd,TCPSERVER_OUTOK,endserver,endclient,ptc->data); ptc->eventfull_sent = false; } if (endclient) process_end (fd,endserver,false,true); } } } } process_ending (endserver); return endserver; } /* How many concurent clients are accepted. (not exactly, this is the largest file handle accepted). Defaults is 1024. */ PUBLIC void TCPSERVER_V1::setmaxclients(int n) { priv->maxclient = n; } PUBLIC int TCPSERVER_V1::sendallf(const char *ctl, ...) 
{
	// Format in a stack buffer first. When the message does not fit,
	// vsnprintf() reports the size it needed (older glibc: -1); sending
	// that many bytes out of buf would read past its end, so the message
	// is formatted again in a heap buffer.
	int ret = -1;
	va_list list;
	va_start (list,ctl);
	va_list list2;
	va_copy (list2,list);
	char buf[10000];
	int n = vsnprintf (buf,sizeof(buf)-1,ctl,list);
	va_end (list);
	if (n == -1 || n >= (int)(sizeof(buf)-1)){
		char *alloc = NULL;
		n = vasprintf (&alloc,ctl,list2);
		if (n >= 0){
			ret = sendall (alloc,n);
			free (alloc);
		}
	}else{
		ret = sendall (buf,n);
	}
	va_end (list2);
	return ret;
}

/*
	Send len bytes to every connected client.
	The data goes through sendto() so it is queued behind output already
	pending for a client instead of being written ahead of it.
	Clients being shut down are skipped.
	Stops at the first client for which sendto() does not return len.
	Returns the result of the last sendto(), -1 if nothing was sent.
*/
PUBLIC int TCPSERVER_V1::sendall(const void *s, int len)
{
	int ret = -1;
	for (unsigned i=0; i<priv->tbc.size(); i++){
		TCPSERVER_V1_CLIENT *c = priv->tbc[i];
		if (c != NULL && !c->ending){
			ret = sendto ((int)i,s,len);
			if (ret != len) break;
		}
	}
	return ret;
}

/*
	Send a string to all client
*/
PUBLIC int TCPSERVER_V1::sendall(const char *s)
{
	return sendall (s,strlen(s));
}

/*
	Send a string to the given client
*/
PUBLIC int TCPSERVER_V1::sendto(int client, const char *line)
{
	return sendto (client,line,strlen(line));
}

/*
	Send len bytes to the given client (forwarded to the functag object).
*/
PUBLIC int TCPSERVER_V1::sendto(int client,const void *buf, int len)
{
	return priv->c->sendto (client,buf,len);
}

/*
	Send a formatted message to the given client.
	Returns the value of sendto(), or -1 if the message could not
	be formatted.
*/
PUBLIC int TCPSERVER_V1::sendtof(int client, const char *ctl, ...)
{
	int ret = -1;
	va_list list;
	va_start (list,ctl);
	va_list list2;
	va_copy (list2,list);
	char buf[10000];
	int n = vsnprintf (buf,sizeof(buf)-1,ctl,list);
	va_end (list);
	if (n == -1 || n >= (int)(sizeof(buf)-1)){
		// Too long for buf: never send more bytes than buf holds
		char *alloc = NULL;
		n = vasprintf (&alloc,ctl,list2);
		if (n >= 0){
			ret = sendto (client,alloc,n);
			free (alloc);
		}
	}else{
		ret = sendto (client,buf,n);
	}
	va_end (list2);
	return ret;
}

/*
	Return the listening port allocated by the OS (when port==0
	in the constructor).
	Returns -1 unless the server has exactly one listening socket
	and its address can be obtained.
*/
PUBLIC int TCPSERVER_V1::getlistenport()
{
	int ret = -1;
	if (priv->fds.size()==1){
		int fd = priv->fds[0];
		struct sockaddr_in adr;
		socklen_t len = sizeof(adr);
		if (getsockname(fd,(struct sockaddr*)&adr,&len)!=-1){
			ret = ntohs(adr.sin_port);
		}
	}
	return ret;
}