/* Wrapper to allow using either epoll or select. The selection is either done at compile time or on the fly based on kernel capabilities (2.4 or 2.6). */ #include #include #include #include #include #include #include "tlmpnet.h" #include #include "has_epoll-tmp.bak" #ifdef HAS_EPOLL #include #else #endif static int allocid=0; /* Return an ID for TLMPEPOLL clients so they can tell apart their events from others when they are sharing the same TLMPEPOLL */ int tlmpepoll_allocid() { return allocid++; } struct FDDATA{ int id; void *data; }; struct TLMPEPOLL_private{ fd_set in,out; fd_set res_in,res_out; int loop; int maxfd; int epfd; FDDATA *fddata; int maxdata; TLMPEPOLL_private(){ FD_ZERO (&in); FD_ZERO (&out); FD_ZERO (&res_in); FD_ZERO (&res_out); epfd = -1; maxfd = 0; loop = 0; fddata = NULL; maxdata = 0; } ~TLMPEPOLL_private(){ free (fddata); } void grow (int fd){ if (fd >= maxdata){ int new_maxdata = fd+100; fddata = (FDDATA*)realloc(fddata,new_maxdata*sizeof(FDDATA)); for (int i=maxdata; iepfd= -1; }else{ priv->epfd = epoll_create (1024); // fprintf (stderr,"epfd = %d\n",priv->epfd); } #endif } PUBLIC TLMPEPOLL::~TLMPEPOLL() { close (priv->epfd); delete priv; } /* Special version when epoll is not available */ PRIVATE void TLMPEPOLL::ctl_select( TLMPEPOLL_CTL op, int fd, int events) { if (op == TLMPEPOLL_CTL_ADD){ if ((events & TLMPEPOLL_IN)!=0){ FD_SET (fd,&priv->in); } if ((events & TLMPEPOLL_OUT)!=0){ FD_SET (fd,&priv->out); } if (priv->maxfd <= fd) priv->maxfd = fd+1; }else if (op == TLMPEPOLL_CTL_DEL){ FD_CLR (fd,&priv->in); FD_CLR (fd,&priv->out); }else if (op == TLMPEPOLL_CTL_MOD){ if ((events & TLMPEPOLL_IN)!=0){ FD_SET (fd,&priv->in); }else{ FD_CLR (fd,&priv->in); } if ((events & TLMPEPOLL_OUT)!=0){ FD_SET (fd,&priv->out); }else{ FD_CLR (fd,&priv->out); } } } PUBLIC void TLMPEPOLL::ctl ( TLMPEPOLL_CTL op, int fd, int events) { #ifdef HAS_EPOLL if (priv->epfd != -1){ static int tbop[]={ EPOLL_CTL_ADD, EPOLL_CTL_DEL, EPOLL_CTL_MOD }; epoll_event ev; ev.events = (events & TLMPEPOLL_IN) != 0 ? EPOLLIN : 0; ev.events |= (events & TLMPEPOLL_OUT) != 0 ? EPOLLOUT : 0; ev.data.fd = fd; int ok = epoll_ctl (priv->epfd,tbop[op],fd,&ev); if (ok == -1){ fprintf (stderr,"epoll_ctl fd=%d ok=%d errno=%d\n",fd,ok,errno); } }else{ ctl_select (op,fd,events); } #else ctl_select (op,fd,events); #endif } /* Associate some data to the handle */ PUBLIC void TLMPEPOLL::setdata (int fd, void *data) { priv->setdata (fd,data); } /* Associate in ID (or owner) to the handle */ PUBLIC void TLMPEPOLL::setid (int fd, int id) { priv->setid (fd,id); } /* Special case when epoll is not available */ PRIVATE int TLMPEPOLL::wait_select( TLMPEPOLL_EVENT events[], int nbevents, int timeout) { int ret = 0; while (1){ // fprintf (stderr,"loop %d < %d ret=%d\n",priv->loop,priv->maxfd,ret); while (priv->loop < priv->maxfd && ret < nbevents){ int bits = FD_ISSET(priv->loop,&priv->res_in) ? TLMPEPOLL_IN : 0; bits |= FD_ISSET(priv->loop,&priv->res_out) ? TLMPEPOLL_OUT : 0; if (bits != 0){ events[ret].events = bits; events[ret].fd = priv->loop; FDDATA *data = priv->fddata+priv->loop; events[ret].data = data->data; events[ret].id = data->id; ret++; } priv->loop++; } if (ret == 0){ priv->res_in = priv->in; priv->res_out = priv->out; #if 0 for (int i=0; imaxfd; i++){ if (FD_ISSET(i,&priv->res_in)) printf ("in[%d]\n",i); if (FD_ISSET(i,&priv->res_out)) printf ("out[%d]\n",i); } fprintf (stderr,"select maxfd %d\n",priv->maxfd); #endif struct timeval tm; tm.tv_sec = timeout / 1000; tm.tv_usec = (timeout % 1000) * 1000; int ok = select (priv->maxfd,&priv->res_in,&priv->res_out,NULL ,timeout < 0 ? NULL : &tm); if (ok < 0){ ret = ok; break; }else if (ok == 0){ break; }else{ priv->loop = 0; } }else{ break; } } return ret; } PUBLIC int TLMPEPOLL::wait ( TLMPEPOLL_EVENT events[], int nbevents, int timeout) { #ifdef HAS_EPOLL if (priv->epfd != -1){ epoll_event tmp[nbevents]; int ret = epoll_wait (priv->epfd,tmp,nbevents,timeout); // fprintf (stderr,"epoll_wait ret=%d errno=%d timeout=%d\n",ret,errno,timeout); TLMPEPOLL_EVENT *pt = events; epoll_event *pttmp = tmp; for (int i=0; ievents; // fprintf (stderr,"TLMPEPOLL::wait x=%x\n",x); // For now, we only support IN and OUT. Any other // event will be taken as an EOF int nx = (x & EPOLLIN) != 0 ? TLMPEPOLL_IN : 0; nx |= (x & EPOLLOUT) != 0 ? TLMPEPOLL_OUT : 0; // patch // Currently, we only process IN and OUT upstream. // All other conditions are producing an EOF anyway // So all other flags are mapped to IN if (nx == 0) nx = TLMPEPOLL_IN; pt->events = nx; pt->fd = pttmp->data.fd; FDDATA *data = priv->fddata + pt->fd; pt->data = data->data; pt->id = data->id; } return ret; }else{ return wait_select (events,nbevents,timeout); } #else return wait_select (events,nbevents,timeout); #endif } #ifdef TEST #include #include #include #include #include #include #include #include #include using namespace std; static int tcp_server( const char *port) { struct sockaddr_in sin; sin.sin_family = AF_INET; struct servent *s = getservbyname(port,"tcp"); if (s != NULL){ sin.sin_port = s->s_port; }else{ sin.sin_port = htons (atoi(port)); } memset (&sin.sin_addr,0,sizeof(sin.sin_addr)); int listen_handle = -1; for (int i=0; i<5; i++){ listen_handle = socket (AF_INET, SOCK_STREAM, 0); int opt = 1; if (listen_handle == -1){ fprintf (stderr,"listen_handle %d(%s)\n" ,errno ,strerror(errno)); }else if (setsockopt(listen_handle,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt))==-1){ fprintf (stderr,"Can't set socket option SO_REUSEADDR (%s)\n" ,strerror (errno)); }else if (bind (listen_handle,(struct sockaddr *) &sin, sizeof (sin)) == -1){ fprintf (stderr,"bind %d(%s)\n",errno ,strerror(errno)); }else if (listen (listen_handle,35) == -1){ fprintf (stderr,"listen %d(%s)\n",errno ,strerror(errno)); break; }else{ fprintf (stderr,"bind ok\n"); break; } close (listen_handle); listen_handle = -1; if (i < 5){ fprintf (stderr,"Sleeping 5 seconds\n"); sleep (5); } } return listen_handle; } struct SOFAR{ int sofar; int size; SOFAR(){ sofar = size = 0; } SOFAR(int _size){ sofar = 0; size = _size; } bool send (int fd){ int len = size - sofar; char buf[len]; memset (buf,'a',len); buf[len-1] = '\n'; int rest = write (fd,buf,len); sofar += rest; return sofar == size; } }; int main (int argc, char *argv[]) { TLMPEPOLL ep; int listen_handle = tcp_server ("8001"); if (listen_handle != -1){ ep.ctl (TLMPEPOLL_CTL_ADD,listen_handle,TLMPEPOLL_IN); ep.setdata (listen_handle,(void*)"listen_handle"); map mp; while (1){ TLMPEPOLL_EVENT evs[10]; int nb = ep.wait (evs,10,15000); if (nb == 0){ fprintf (stderr,"Timeout\n"); }else{ fprintf (stderr,"nb=%d\n",nb); TLMPEPOLL_EVENT *pt = evs; for (int i=0; ifd; fprintf (stderr,"Event fd=%d data=%s\n",fd,(char*)pt->data); if (fd == listen_handle){ struct sockaddr_in sin; socklen_t addrlen=sizeof(sin); int newfd = accept (listen_handle,(struct sockaddr*)&sin,&addrlen); fprintf (stderr,"New connection %d\n",newfd); if (newfd == -1){ fprintf (stderr,"Accept failed\n"); }else{ char tmp[100]; snprintf (tmp,sizeof(tmp)-1,"handle %d",newfd); ep.ctl (TLMPEPOLL_CTL_ADD,newfd,TLMPEPOLL_IN); ep.setdata (newfd,strdup(tmp)); fcntl (newfd,F_SETFL,O_NONBLOCK); } }else{ if ((pt->events & TLMPEPOLL_IN) != 0){ char buf[101]; int len = read (fd,buf,100); fprintf (stderr,"read %d -> %d\n",fd,len); if (len <= 0){ ep.ctl (TLMPEPOLL_CTL_DEL,fd,0); close (fd); }else{ buf[len] = '\0'; int size = atoi(buf); len = snprintf (buf,sizeof(buf)-1,"Rec %d bytes\n",len); write (fd,buf,len); if (size > 0) mp[fd] = SOFAR(size); if (mp[fd].send(fd)){ ep.ctl (TLMPEPOLL_CTL_MOD,fd,TLMPEPOLL_IN); }else{ ep.ctl (TLMPEPOLL_CTL_MOD,fd,TLMPEPOLL_IN|TLMPEPOLL_OUT); } } } if ((pt->events & TLMPEPOLL_OUT) != 0){ if (mp[fd].send(fd)){ ep.ctl (TLMPEPOLL_CTL_MOD,fd,TLMPEPOLL_IN); } } } } } } } return 0; } #endif