#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "fdpass.h" using namespace std; static DEBUG_KEY D_CONPROXY ("conproxy","Conproxy main program"); enum TYPE_CONNECT {TYPE_NONE, TYPE_FD, TYPE_CONTROL}; static int lognum = 0; static int allocid=0; struct HANDLE_INFO: public ARRAY_OBJ{ TYPE_CONNECT type; int id; int talk_to; // For TYPE_FD time_t start; time_t last; long long bytes; long long blocks; string description; FILE *flog; string logto; bool is_in; // For each connection, there is two handle. // One is considered the "in" and the other the "out" // is_in is used to identify the handle, for logging purpose int fd_control; // Special handle kept open until the end of the connection // conproxy is independant from horizon, workhole and blackhole // but without talking to them, it informs them // when a connection is over, just by closing fd_control. HANDLE_INFO (){ type = TYPE_NONE; id = allocid++; talk_to = -1; start = (time_t)0; bytes = 0; blocks = 0; last = (time_t)0; flog = NULL; is_in = false; fd_control = -1; } void startlogging(const char *dir, const string &fname){ if (flog == NULL){ SSTRING tmp; tmp.setfromf ("%s/%s-%d-%s.log",dir,fname.c_str(),lognum,is_in ? "in" : "out"); flog = fopen (tmp.c_str(),"a"); if (flog != NULL){ logto = tmp.c_str(); }else{ tlmp_error ("Can't open log file %s (%s)\n",tmp.c_str(),strerror(errno)); } } } void stoplogging(){ if (flog != NULL){ fclose (flog); flog = NULL; logto.clear(); } } ~HANDLE_INFO(){ stoplogging(); } }; #include "proto/conproxy_control.protoh" struct LOGSTRING{ string str; // Substring to match in the connection description string fname; // Base name of the log file LOGSTRING(const char *_str, const char *_fname){ str = _str; fname = _fname; } }; static bool logging_needed (const vector &logstrings, const string &description, string &fname) { bool ret = false; for (vector::const_iterator it = logstrings.begin(); it != logstrings.end(); it++){ if (strstr(description.c_str(),it->str.c_str()) != NULL){ fname = it->fname; ret = true; break; } } return ret; } // Remember last connection, so it is easier to figure out the substring to use for the "log on" command struct LOGCONNECT{ time_t start; // When the connection was started string description; long long from,to; // Bytes transfered during the connection long long from_blks,to_blks; // Blocks transfered LOGCONNECT(const time_t _start, const char *_description, long long _from, long long _from_blks, long long _to, long long _to_blks){ start = _start; description = _description; from = _from; from_blks = _from_blks; to = _to; to_blks = _to_blks; } }; static bool conproxy_badchar( const char *s) { bool ret = false; while (*s != '\0'){ char car = *s++; if (car < ' ' || car > 127 || car == '/' || car == ';' || car == '<' || car == '>' || car == '|' || car == '(' || car == ')' || car == '*' || car == '?' || car == '&' || car == '!' || car == '\\' || car == '`' || car == '"' || car == '\'' || car == '$'){ ret = true; break; } } return ret; } static int conproxy_nbconnections(_F_TCPSERVER_V1 *c, bool only_inbound) { int nb = 0; void *data; int fd = c->iter_init(data); while (fd != -1){ HANDLE_INFO *n = (HANDLE_INFO*)data; // For each "connection", we have to handles. We copy from // one to the other. For presentation purpose, we count // those handle pairs as one connection. // One handle has is_in set to true and the other to false if (n->type == TYPE_FD){ if (!only_inbound || n->is_in){ nb++; } } fd = c->iter_next(data); } return nb; } /* Based on the new list of logging string we review all current connection and stop/start the loggin as needed */ static void conproxy_update_log( _F_TCPSERVER_V1 &c, const vector &logstrings, const char *logdir) { void *data; int fd = c.iter_init(data); while (fd != -1){ HANDLE_INFO *n = (HANDLE_INFO*)data; if (n->type == TYPE_FD && n->is_in){ string logname; bool needed = logging_needed (logstrings,n->description,logname); if (needed){ if (n->flog==NULL){ n->startlogging(logdir,logname); if (n->talk_to != -1){ HANDLE_INFO *nn = (HANDLE_INFO*)c.getclientdata (n->talk_to); nn->startlogging(logdir,logname); } lognum++; } }else{ if (n->flog !=NULL){ n->stoplogging(); if (n->talk_to != -1){ HANDLE_INFO *nn = (HANDLE_INFO*)c.getclientdata (n->talk_to); nn->stoplogging(); } } } } fd = c.iter_next(data); } } static void conproxy_kill (_F_TCPSERVER_V1 &c, int connect_id) { int talkid = -1; void *data; int fd = c.iter_init(data); while (fd != -1){ HANDLE_INFO *n = (HANDLE_INFO*)data; if (n->id == connect_id){ talkid = n->talk_to; c.closeclient (fd); c.sendf ("Closing handle %d\n",fd); if (n->fd_control != -1){ close (n->fd_control); c.sendf ("Closing control handle %d\n",n->fd_control); } break; } fd = c.iter_next(data); } // Now we kill the related connection fd = c.iter_init(data); while (fd != -1){ HANDLE_INFO *n = (HANDLE_INFO*)data; if (fd == talkid){ c.closeclient (fd); c.sendf ("Closing related handle %d\n",fd); if (n->fd_control != -1){ close (n->fd_control); c.sendf ("Closing control handle %d\n",n->fd_control); } break; } fd = c.iter_next(data); } } int main (int argc, char *argv[]) { glocal int ret = -1; glocal const char *logdir = "/tmp/conproxy"; glocal const char *pidfile = "/run/conproxy.pid"; glocal const char *user = "blackhole"; glocal bool daemon = false; glocal const char *port = "/var/run/blackhole/conproxy.sock"; glocal const char *statfile = "/var/log/conproxy-connect.log"; glocal int maxhandles = 10000; glocal const char *errorfile = NULL; glocal FILE *ferror = NULL; glocal.ret = (argc,argv); setproginfo ("conproxy",VERSION,"Manage TCP connections"); setarg ('p',"port","Unix socket",glocal.port,false); setgrouparg ("Daemon options"); setarg (' ',"daemon","Run in background",glocal.daemon,false); setarg (' ',"pidfile","Write the daemon pid in this file",glocal.pidfile,false); setarg (' ',"user","Run as this user",glocal.user,false); setgrouparg ("Misc. options"); setarg (' ',"logdir","Directory holding log files",glocal.logdir,false); setarg (' ',"statfile","File holding connection statistics",glocal.statfile,false); setarg (' ',"maxhandles","Maximum number of file handle supported",glocal.maxhandles,false); setarg (' ',"errorfile","Error message will be written to this file",glocal.errorfile,false); if (glocal.ferror != NULL){ fprintf (glocal.ferror,"ERR: %s",msg); fflush (glocal.ferror); }else if (glocal.daemon){ syslog (LOG_ERR,"%s",msg); }else{ fprintf (stderr,"%s",msg); } if (glocal.ferror != NULL){ fprintf (glocal.ferror,"WARN: %s",msg); fflush (glocal.ferror); }else if (glocal.daemon){ syslog (LOG_WARNING,"%s",msg); }else{ fprintf (stderr,"WARN: %s",msg); } glocal unsigned long long total_bytes_in = 0; glocal unsigned long long total_bytes_out = 0; glocal FILE *fstatfile = NULL; glocal vector logstrings; glocal vector logconnects; glocal bool quitlast = false; // End server when last connection ends glocal string removesock; // Unix socket to remove when doing quitlast glocal int maxconnect = 0; int ret = -1; SSTRING tmp; tmp.setfromf ("unix:%s",glocal.port); fdpass_checkservice (glocal.port); glocal.fstatfile = fopen (glocal.statfile,"a"); if (glocal.fstatfile == NULL){ tlmp_error ("Can't open statfile %s (%s)\n",glocal.statfile,strerror(errno)); } if (glocal.errorfile != NULL){ glocal.ferror = fopen (glocal.errorfile,"a"); if (glocal.ferror == NULL){ tlmp_error ("Can't open errorfile %s (%s), aborting\n",glocal.errorfile,strerror(errno)); exit (-1); } } (tmp.c_str(),10); HANDLE_INFO *n = new HANDLE_INFO; n->type = TYPE_CONTROL; debug_printf (D_CONPROXY,"newclient %d\n",no); info.data = n; // Control connections have to operate in blocking mode because fdpass_receivefd is use // and does a revcmsg directly without using epoll before. fcntl(no,F_SETFL,0); HANDLE_INFO *n = (HANDLE_INFO*)info.data; if (n->type == TYPE_CONTROL){ debug_printf (D_CONPROXY,"Ending control %d\n",no); }else if (n->type == TYPE_FD){ if (n->fd_control != -1) close (n->fd_control); if (n->talk_to == -1){ debug_printf (D_CONPROXY,"Delayed ending fd %d\n",no); }else{ unsigned long size; long long lastwrite; bool blocked = is_blocked (n->talk_to,size,lastwrite); debug_printf (D_CONPROXY,"Ending fd %d -> %d blocked %d\n",no,n->talk_to,blocked); HANDLE_INFO *tn = (HANDLE_INFO*)getclientdata (n->talk_to); long long tn_bytes = 0; long long tn_blocks = 0; if (tn != NULL){ if (tn->fd_control != -1) close(tn->fd_control); tn_bytes = tn->bytes; tn_blocks = tn->blocks; }else{ tlmp_error ("Ending client %d, and handle %d already ended ???\n",no,n->talk_to); } if (!blocked){ closeclient (n->talk_to); }else{ tlmp_error ("Closing blocked connection %d -> %d\n",no,n->talk_to); // We can't close this handle immediatly, because some data has not been sent yet // see the TCPSERVER_OUTFLUSHED in the event functag if (tn != NULL) tn->talk_to = -1; } glocal.logconnects.push_back(LOGCONNECT(n->start,n->description.c_str(),n->bytes,n->blocks,tn_bytes,tn_blocks)); if (glocal.logconnects.size() > 20){ glocal.logconnects.erase(glocal.logconnects.begin()); } glocal.total_bytes_in += n->bytes; glocal.total_bytes_out += tn_bytes; const char *pt = strstr(n->description.c_str(),"LOGID="); if (pt != NULL){ string logid; str_copyword(logid,pt+6); if (glocal.fstatfile != NULL){ DATEASC tstr; time_t t = time(NULL); fdpass_asctime (t,tstr); fprintf (glocal.fstatfile,"%s,%s,%Ld,%Ld\n",logid.c_str(),tstr.buf,n->bytes,tn_bytes); fflush (glocal.fstatfile); } } } if (glocal.quitlast && conproxy_nbconnections(this,false)<=1){ // <=1 because the ending connection is part of the count endserver = true; if (glocal.removesock.size() > 0) unlink(glocal.removesock.c_str()); } } // debug_printf (D_CONPROXY,"Receive %d: len=%d %s\n",no,info.linelen,line); HANDLE_INFO *n = (HANDLE_INFO*)info.data; if (n->type == TYPE_FD){ // debug_printf (D_CONPROXY,"Writing %d to %d\n",info.linelen,n->talk_to); if (n->flog != NULL) fwrite (line,1,info.linelen,n->flog); if (n->talk_to != -1) sendto (n->talk_to,line,info.linelen); n->last = time(NULL); n->bytes += info.linelen; n->blocks++; }else if (n->type == TYPE_CONTROL){ (this,line,endserver,endclient,no,n); glocal.TCPSERVER.send ("Go1\n"); int fd1 = fdpass_receivefd (no); debug_printf (D_CONPROXY,"receive fd1 %d\n",fd1); if (fd1 != -1){ glocal.TCPSERVER.send ("Go2\n"); int fd2 = fdpass_receivefd (no); debug_printf (D_CONPROXY,"receive fd2 %d\n",fd2); if (fd2 == -1){ close (fd1); }else{ /* fd_control is just a handle passed from the blackhole, horizon or wormhole used to track when the connection is ended. The conproxy does nothing with this connection, except close it */ glocal.TCPSERVER.send ("Go3\n"); int fd_control= fdpass_receivefd (no); debug_printf (D_CONPROXY,"receive fd_control %d\n",fd_control); if (fd_control == -1){ close (fd1); close (fd2); }else{ HANDLE_INFO *inf1 = new HANDLE_INFO; inf1->type = TYPE_FD; inf1->talk_to = fd2; inf1->description = extra; inf1->start = inf1->last = time(NULL); inf1->is_in = true; inf1->fd_control = fd_control; glocal.TCPSERVER.inject (fd1,inf1); HANDLE_INFO *inf2 = new HANDLE_INFO; inf2->type = TYPE_FD; inf2->talk_to = fd1; inf2->description = extra; inf2->start = inf2->last = inf1->start; inf2->is_in = false; inf2->fd_control = -1; glocal.TCPSERVER.inject (fd2,inf2); glocal.TCPSERVER.setrawmode (fd1,true); glocal.TCPSERVER.setrawmode (fd2,true); glocal.TCPSERVER.settcpnodelay (fd1,true); glocal.TCPSERVER.settcpnodelay (fd2,true); string logname; if (logging_needed (glocal.logstrings,inf1->description,logname)){ // Same description for both connections, so we log both inf1->startlogging(glocal.logdir,logname); inf2->startlogging(glocal.logdir,logname); lognum++; } int nbc = glocal.TCPSERVER.getnbclients(); if (nbc > glocal.maxconnect) glocal.maxconnect = nbc; } } } glocal.TCPSERVER.sendf ("conproxy version %s\n",VERSION); glocal.TCPSERVER.sendf ("logdir: %s\n",glocal.logdir); glocal.TCPSERVER.sendf ("maxhandles: %d\n",glocal.maxhandles); glocal.TCPSERVER.sendf ("max connections: %d\n",glocal.maxconnect); glocal.TCPSERVER.sendf ("%d active connections\n",conproxy_nbconnections(&glocal.TCPSERVER,true)); glocal.TCPSERVER.sendf ("quitlast %d removesock %s\n",glocal.quitlast,glocal.removesock.c_str()); unsigned long long total_bytes_in = glocal.total_bytes_in; unsigned long long total_bytes_out = glocal.total_bytes_out; { // Compute the total sent for ongoing connections void *data; int fd = glocal.TCPSERVER.iter_init(data); while (fd != -1){ HANDLE_INFO *n = (HANDLE_INFO*)data; if (n->type == TYPE_FD){ if (n->is_in){ total_bytes_in += n->bytes; }else{ total_bytes_out += n->bytes; } } fd = glocal.TCPSERVER.iter_next(data); } } glocal.TCPSERVER.sendf ("total_bytes_received %Lu\n",total_bytes_in); glocal.TCPSERVER.sendf ("total_bytes_sent %Lu\n",total_bytes_out); if (glocal.logstrings.size()==0){ glocal.TCPSERVER.sendf ("Logging disabled\n"); }else{ for (vector::iterator it = glocal.logstrings.begin(); it != glocal.logstrings.end(); it++){ glocal.TCPSERVER.sendf ("log on %s %s\n",it->fname.c_str(),it->str.c_str()); } glocal.TCPSERVER.sendf ("next log: %s/...-%d-in/out.log\n",glocal.logdir,lognum); } for (vector::iterator it = glocal.logconnects.begin(); it != glocal.logconnects.end(); it++){ DATEASC starttime; fdpass_asctime (it->start,starttime); glocal.TCPSERVER.sendf ("lastconnect %s %10Ld/%Ld %10Ld/%Ld %s\n",starttime.buf ,it->from,it->from_blks,it->to,it->to_blks,it->description.c_str()); } if (glocal.removesock.size() > 0) unlink(glocal.removesock.c_str()); endserver = true; glocal.quitlast = true; glocal.removesock = unixsock; if (conproxy_nbconnections(&glocal.TCPSERVER,false)==0){ endserver = true; // Ending now if (glocal.removesock.size() > 0) unlink(glocal.removesock.c_str()); } bool ison = atoi(on)==1; if (ison){ debug_seton(); }else{ debug_setoff(); } debug_setfdebug (filename); void *data; int fd = glocal.TCPSERVER.iter_init(data); while (fd != -1){ HANDLE_INFO *n = (HANDLE_INFO*)data; if (n->type == TYPE_FD){ DATEASC starttime,lasttime; fdpass_asctime (n->start,starttime); fdpass_asctime (n->last,lasttime); unsigned long size; long long lastwrite; SSTRING logto("-"); if (n->flog != NULL){ logto.setfromf ("logto:%s)",n->logto.c_str()); } glocal.TCPSERVER.sendf ("%d %d,%d: %Ld/%Ld %s %s %s %s %s :%s\n",n->id,fd,n->talk_to,n->bytes,n->blocks ,starttime.buf,lasttime.buf ,glocal.TCPSERVER.is_blocked(fd,size,lastwrite) ? "blocked" : "-" ,glocal.TCPSERVER.islistening(fd) ? "-" : "not-listening" ,logto.c_str() ,n->description.c_str()); } fd = glocal.TCPSERVER.iter_next(data); } if (extra[0] == '\0'){ // Without argument, we clear all the log strings glocal.logstrings.clear(); }else{ for(vector::iterator it =glocal.logstrings.begin(); it != glocal.logstrings.end(); it++){ if (strcmp(it->str.c_str(),extra)==0){ glocal.logstrings.erase(it); break; } } } conproxy_update_log (glocal.TCPSERVER,glocal.logstrings,glocal.logdir); // We make sure the file name has no special characters // The issue may not be with conproxy itself, but the admin (root) // will review those files. Make sure the name is somewhat clean if (*extra == '\0'){ glocal.TCPSERVER.sendf ("Invalid logon command, not substring: %s\n",line); }else if (conproxy_badchar (filename)){ glocal.TCPSERVER.sendf ("Invalid logfilename: %s\n",line); }else{ glocal.logstrings.push_back(LOGSTRING(extra,filename)); conproxy_update_log (glocal.TCPSERVER,glocal.logstrings,glocal.logdir); } void *data; int fd = glocal.TCPSERVER.iter_init(data); while (fd != -1){ HANDLE_INFO *n = (HANDLE_INFO*)data; if (n->flog != NULL) fflush (n->flog); fd = glocal.TCPSERVER.iter_next(data); } DATEASC datetime; fdpass_asctime (time(NULL),datetime); void *data; int fd = glocal.TCPSERVER.iter_init(data); while (fd != -1){ HANDLE_INFO *n = (HANDLE_INFO*)data; if (n->flog != NULL) fprintf (n->flog,"\n*** %s\n",datetime.buf); fd = glocal.TCPSERVER.iter_next(data); } glocal.maxconnect = glocal.TCPSERVER.getnbclients(); conproxy_kill (glocal.TCPSERVER,atoi(id)); int last_activity = time(NULL) - atoi(hours)*60*60; // We have to redo the loop until all old connections are killed // The connection_kill does an iteration of the TCPSERVER connection // so everytime we kill one connection, we restart the iteration while (1){ bool found_one = false; void *data; int fd = glocal.TCPSERVER.iter_init(data); while (fd != -1){ HANDLE_INFO *n = (HANDLE_INFO*)data; if (n->type == TYPE_FD && n->last < last_activity){ found_one = true; glocal.TCPSERVER.sendf ("Closing session %d\n",n->id); conproxy_kill (glocal.TCPSERVER,n->id); break; } fd = glocal.TCPSERVER.iter_next(data); } if (!found_one) break; } if (glocal.fstatfile != NULL) fclose (glocal.fstatfile); glocal.fstatfile = fopen (filename,"a"); if (glocal.fstatfile == NULL){ tlmp_error ("Can't open statfile %s (%s)\n",filename,strerror(errno)); } glocal.TCPSERVER.sendf ("Invalid command: %s\n",line); endclient = true; } HANDLE_INFO *n = (HANDLE_INFO*)data; if (ev == TCPSERVER_OUTFULL){ debug_printf (D_CONPROXY,"handle %d blocked, stop listening to %d\n",no,n->talk_to); if (n->talk_to != -1) setlisten (n->talk_to,false); }else if (ev == TCPSERVER_OUTOK){ debug_printf (D_CONPROXY,"handle %d flowing again, resume listening to %d\n",no,n->talk_to); if (n->talk_to != -1) setlisten (n->talk_to,true); }else if (ev == TCPSERVER_OUTFLUSHED){ debug_printf (D_CONPROXY,"handle %d is flushed, talk_to %d\n",no,n->talk_to); if (n->talk_to == -1){ endclient = true; if (glocal.quitlast && conproxy_nbconnections(this,false) <= 1){ endserver = true; if (glocal.removesock.size() > 0) unlink(glocal.removesock.c_str()); } } } if (!o.is_ok()){ tlmp_error ("Can't setup the unix socket %s\n",glocal.port); }else{ o.setmaxclients (glocal.maxhandles); if (glocal.user != NULL){ struct passwd *u = getpwnam(glocal.user) ; if (u == NULL){ tlmp_error ("Error retrieving %s uid/gid\n",glocal.user) ; }else{ chown(glocal.port, u->pw_uid, u->pw_gid) ; chmod(glocal.port, 0770) ; } } if (glocal.daemon){ daemon_init (glocal.pidfile,glocal.user); } signal (SIGPIPE,SIG_IGN); o.setnonblock (true,100000); o.loop(); ret = 0; } if (glocal.fstatfile != NULL) fclose (glocal.fstatfile); return ret; return glocal.ret; }