/* This file is part of Bolixo. Bolixo is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. Bolixo is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with Bolixo. If not, see . */ /* Publish content to other bolixo nodes */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "filesystem.h" #include "bolixo.h" #include "bolixo.m" #define INSTRUMENT_DONOTOPEN #include "instrument.h" using namespace std; static DEBUG_KEY D_PROTO ("proto","Protocol information"); enum CONNECT_TYPE { TYPE_NONE, TYPE_CONTROL, TYPE_CLIENT, TYPE_WORKER}; struct HANDLE_INFO: public ARRAY_OBJ{ CONNECT_TYPE type; REQUEST_INFO req; HANDLE_INFO(){ type = TYPE_NONE; } }; #include "proto/publishd_control.protoh" #include "proto/publishd_client.protoh" #include "proto/bod_client.protodef" #define webapi_test_NOTNEED #define webapi_login_NOTNEED #define webapi_addfile_NOTNEED #define webapi_addfile_bob_NOTNEED #define webapi_delfile_NOTNEED #define webapi_undelete_NOTNEED #define webapi_modifyfile_NOTNEED #define webapi_modifyfile_bob_NOTNEED #define webapi_rename_NOTNEED #define webapi_copy_NOTNEED #define webapi_readfile_NOTNEED #define webapi_readfile_bob_NOTNEED #define webapi_readmore_NOTNEED #define webapi_mkdir_NOTNEED #define webapi_rmdir_NOTNEED #define webapi_listdir_NOTNEED #define webapi_stat_NOTNEED #define webapi_set_access_NOTNEED #define webapi_markview_NOTNEED #define webapi_list_inboxes_NOTNEED #define webapi_list_msgs_NOTNEED #define webapi_sendmsg_NOTNEED #define webapi_sendmsg_project_NOTNEED #define webapi_replymsg_NOTNEED #define webapi_replymsg_project_NOTNEED #define webapi_sendattach_NOTNEED #define webapi_sendtalk_file_NOTNEED #define webapi_list_talk_NOTNEED #define webapi_public_listdir_NOTNEED #define webapi_public_readfile_NOTNEED #define webapi_public_list_talk_NOTNEED #define webapi_systempubkey_NOTNEED #define webapi_verifysign_NOTNEED #define webapi_getpubkey_NOTNEED #define webapi_registernode_NOTNEED #define webapi_remote_interest_set_NOTNEED #define webapi_remote_interest_unset_NOTNEED #define webapi_nodelogin_NOTNEED #define webapi_nodepass_NOTNEED #define webapi_config_read_NOTNEED #define webapi_config_write_NOTNEED #define webapi_contact_manage_NOTNEED #define webapi_contact_request_NOTNEED #define webapi_contact_list_NOTNEED #define webapi_list_contacts_NOTNEED #define webapi_list_lists_NOTNEED #define webapi_list_groups_NOTNEED #include "proto/webapi.protoch" #define bo_keysd_control_genkey_NOTNEED #define bo_keysd_control_setpassphrase_NOTNEED #define bo_keysd_control_checkpassphrase_NOTNEED #define bo_keysd_control_quit_NOTNEED #define bo_keysd_control_debug_NOTNEED #define bo_keysd_control_debugfile_NOTNEED #define bo_keysd_control_runstatus_NOTNEED #include "proto/bo-keysd_control.protoch" struct PUBLISH_MESSAGE{ string user; string name; unsigned userid; unsigned dirid; unsigned fileid; string modified; PUBLISH_MESSAGE(const char *_user, unsigned _userid, unsigned _dirid, unsigned _fileid, const char *_modified, const char *_name){ user = _user; userid = _userid; dirid = _dirid; fileid = _fileid; modified = _modified; name = _name; } PUBLISH_MESSAGE(){ userid = (unsigned)-1; dirid = (unsigned)-1; fileid = (unsigned)-1; } }; static int publishd_remotelogin ( CONNECT_HTTP_INFO &hcon, PARAM_STRING remote_user, const char *keysdsock, unsigned userid, string &sessionid) { glocal int ret = -1; glocal string *sessionid = &sessionid; glocal CONNECT_HTTP_INFO *hcon = &hcon; glocal const char *keysdsock = keysdsock; glocal const char *remote_user = remote_user.ptr; glocal unsigned userid = userid; (hcon,remote_user); if (!success){ tlmp_error ("remotelogin failed for %s: %s\n" ,glocal.remote_user,msg); }else{ CONNECT_INFO con; con.port = glocal.keysdsock; glocal string sign; *glocal.sessionid = sessionid; (con,glocal.userid,BOB_TYPE(sessionid,strlen(sessionid),false)); if (status == ERR_CODE_NONE) glocal.sign = sign; // tlmp_error ("sign=%s\n",glocal.sign.c_str()); if (glocal.sign.size() > 0){ (*glocal.hcon,sessionid,glocal.remote_user,glocal.sign); if (success) glocal.ret = 0; } } return glocal.ret; } struct STATS{ unsigned nbcopied; unsigned nbfailed; unsigned nbtried; STATS(){ nbcopied = nbfailed = nbtried = 0; } }; static void publishd_doone ( _F_TCPSERVER_V1 &c, pid_t &pid, deque &messages, NSQL &sq, const char *our_node, const char *keysdsock) { glocal NSQL *sq = &sq; glocal const char *our_node = our_node; glocal const char *keysdsock = keysdsock; if (messages.size() > 0){ int tb[2]; if (pipe(tb)==-1){ tlmp_error ("Can't setup pipe (%s)\n",strerror(errno)); }else{ glocal STATS stats; glocal PUBLISH_MESSAGE todo = messages.front(); messages.pop_front(); pid = fork(); if (pid == (pid_t)0){ // Do the job close (tb[0]); glocal BOB_TYPE content; glocal string sign; glocal FILE *fin = NULL; glocal bool data_loaded=false; glocal bool some_errors = false; (*glocal.sq,"select nodename from interests_remote where userid=%u",glocal.todo.userid); //tlmp_error ("nodename=%s\n",row[0]); glocal CONNECT_HTTP_INFO hcon; // Load the data only once since it is copied to potentially more than one node if (!glocal.data_loaded){ (*glocal.sq,"select content,signature from files where id=%u and modified='%s'" ,glocal.todo.fileid,glocal.todo.modified.c_str()); if (row[1] != NULL) glocal.sign = row[1]; if (row[0] != NULL){ glocal.content.setbuffer((const void*)row[0],strlen(row[0]),true); }else{ // The content is in a file string fname = fs_createpath(glocal.todo.fileid,glocal.todo.modified); glocal.fin = fopen (fname.c_str(),"r"); if (glocal.fin == NULL){ tlmp_error ("Can't open content file %s (%s)\n",fname.c_str(),strerror(errno)); glocal.some_errors = true; } } tlmp_error ("No content for message: user=%s dirid=%u userid=%u fileid=%u modified=%s\n" ,glocal.todo.user.c_str(),glocal.todo.dirid,glocal.todo.userid ,glocal.todo.fileid,glocal.todo.modified.c_str()); glocal.some_errors = true; glocal.data_loaded = true; } if (!glocal.some_errors && glocal.hcon.init (row[0]) != -1){ glocal string remote_user = string_f("%s@%s",glocal.todo.user.c_str(),glocal.our_node); glocal string sessionid; glocal.stats.nbtried++; if (publishd_remotelogin(glocal.hcon,glocal.remote_user,glocal.keysdsock,glocal.todo.userid,glocal.sessionid)!=-1){ glocal bool error = false; static vector empty; if (glocal.fin == NULL){ (glocal.hcon,glocal.sessionid,"",empty,"public",glocal.remote_user ,glocal.content,false,glocal.todo.name,glocal.sign,""); if (!success){ tlmp_error ("message to %s internal=%d success=%d msg=%s\n" ,glocal.remote_user.c_str(),internal_error,success,msg); glocal.error = true; } }else{ // The content is in a file glocal string handle; char buf[REQ_CONTENT_CHUNK]; int n; bool more = false; rewind(glocal.fin); while (!glocal.error && (n=fread(buf,1,sizeof(buf),glocal.fin))>0){ more = n==sizeof(buf); BOB_TYPE content((const void*)buf,n,false); if (glocal.handle.size() == 0){ (glocal.hcon,glocal.sessionid,"",empty,"public",glocal.remote_user ,content,more,glocal.todo.name,glocal.sign,""); glocal.handle = handle; if(!success){ glocal.error = true; tlmp_error ("copying to %s internal=%d success=%d msg=%s\n" ,glocal.remote_user.c_str(),internal_error,success,msg); } }else{ (glocal.hcon,glocal.sessionid,glocal.handle,content,more); if(!success){ glocal.error = true; tlmp_error ("appending to %s internal=%d success=%d msg=%s\n" ,glocal.remote_user.c_str(),internal_error,success,msg); } } } if (more){ BOB_TYPE content((const void*)"",0,false); (glocal.hcon,glocal.sessionid,glocal.handle,content,false); //tlmp_error ("internal=%d success=%d msg=%s\n",internal_error,success,msg); } } if (glocal.error){ glocal.stats.nbfailed++; }else{ glocal.stats.nbcopied++; } (glocal.hcon,glocal.sessionid); } } if (glocal.some_errors) end = true; if (glocal.fin != NULL) fclose (glocal.fin); write (tb[1],&glocal.stats,sizeof(glocal.stats)); _exit (0); }else if (pid == (pid_t)-1){ tlmp_error ("Can't fork (%s)\n",strerror(errno)); }else{ HANDLE_INFO *n = new HANDLE_INFO; n->type = TYPE_WORKER; c.inject (tb[0],n); c.setrawmode(tb[0],true); close (tb[1]); } } } } int main (int argc, char *argv[]) { glocal int ret = -1; glocal const char *data_dbserv = "localhost"; glocal const char *data_dbname = "files"; glocal const char *data_dbuser = NULL; glocal const char *control = "/var/run/publishd.sock"; glocal const char *clientsock = "/tmp/publishd_client.sock"; glocal const char *user = "bolixo"; glocal bool daemon = false; glocal const char *admin_secretfile = "/etc/bolixo/secrets.admin"; glocal const char *pidfile = "/var/run/publishd.pid"; glocal const char *hostname = NULL; glocal const char *keysdsock = "/dev/keysd.sock"; static const char *tbdic[]={"bolixo","tlmpsql",NULL}; glocal.ret = (argc,argv,tbdic); setproginfo ("publishd",VERSION,"Spread user messages to other bolixo nodes"); setgrouparg ("Networking"); setarg ('c',"control","Unix socket for publishd-control",glocal.control,false); setarg ('C',"clientsock","Unix socket for publishd-client",glocal.clientsock,false); setgrouparg ("Database"); setarg (' ',"dbserv","Database server",glocal.data_dbserv,false); setarg (' ',"dbname","Database name",glocal.data_dbname,false); setarg (' ',"dbuser","Database user",glocal.data_dbuser,true); setgrouparg ("Misc."); setarg (' ',"hostname",MSG_U(O_HOSTNAME,"Host name"),glocal.hostname,true); setarg (' ',"keysdsock","Port to reach the bo-keysd server",glocal.keysdsock,false); setarg (' ',"admin_secrets",MSG_R(O_ADMINSECRETS),glocal.admin_secretfile,false); setarg (' ',"user","Run the program as this user",glocal.user,false); setarg (' ',"daemon","Run in background",glocal.daemon,false); setarg (' ',"pidfile","File holding the PID of the process",glocal.pidfile,false); if (glocal.daemon){ syslog (LOG_ERR,"%s",msg); }else{ fprintf (stderr,"%s",msg); } if (glocal.daemon){ syslog (LOG_WARNING,"%s",msg); }else{ fprintf (stderr,"%s",msg); } int ret = -1; glocal STATS stats; glocal unsigned messages_sent = 0; glocal string controlport = string_f("unix:%s",glocal.control); glocal string clientport = string_f("unix:%s",glocal.clientsock); glocal deque messages; glocal map admin_secrets; glocal pid_t pid = (pid_t)-1; glocal NSQL *sub_sq = NULL; const char *passwd = getenv("PUBLISHD_PWD"); if (passwd == NULL){ tlmp_error ("Can't get database password from environment, aborting\n"); exit (-1); } fdpass_readsecrets (glocal.admin_secretfile,glocal.admin_secrets); query_setdefaultdb (glocal.data_dbserv,glocal.data_dbname,glocal.data_dbuser,passwd); query_getdefaultdb()->showerrormode(true); // Connection for the sub-process NSQL sub_sq (glocal.data_dbserv,glocal.data_dbname,glocal.data_dbuser,passwd); sub_sq.showerrormode(true); glocal.sub_sq = &sub_sq; signal (SIGCHLD,SIG_IGN); (glocal.clientport,5); HANDLE_INFO *n = new HANDLE_INFO; info.data = n; // tlmp_error ("port=%s control=%s client=%s\n",info.port,glocal.controlport.c_str(),glocal.clientport.c_str()); if (string_cmp(info.port,glocal.controlport)==0){ n->type = TYPE_CONTROL; }else if (string_cmp(info.port,glocal.clientport)==0){ n->req.secret = fdpass_findsecret (glocal.admin_secrets,info.port); n->type = TYPE_CLIENT; } HANDLE_INFO *n = (HANDLE_INFO*)info.data; if (n->type == TYPE_WORKER){ glocal.pid = (pid_t)-1; publishd_doone(*this,glocal.pid,glocal.messages,*glocal.sub_sq,glocal.hostname,glocal.keysdsock); } debug_printf (D_PROTO,"receive line: %s\n",line); HANDLE_INFO *c = (HANDLE_INFO*)info.data; static const char *tbtype[]={"none","control request","client request", "worker request"}; ERROR_PREFIX prefix ("%s: ",tbtype[c->type]); if (c->type == TYPE_CONTROL){ (this,c->req,line, info.linelen,endserver, endclient, no,c); vector tb; tb.push_back(string_f ("Version %s",VERSION)); tb.push_back(string_f("messages size: %lu",glocal.messages.size())); tb.push_back(string_f("sub-process: %s",glocal.pid == (pid_t)-1 ? "not running" : "running")); tb.push_back(string_f("hostname: %s",glocal.hostname)); tb.push_back(string_f("messages_sent: %u",glocal.messages_sent)); tb.push_back(string_f("stats: nbtried %u nbcopied %u nbfailed %u" ,glocal.stats.nbtried,glocal.stats.nbcopied,glocal.stats.nbfailed)); instrument_status(tb); rep_status(tb); toggle_instrument_file(on); endserver = true; if (on){ debug_seton(); }else{ debug_setoff(); } debug_setfdebug (filename); // connectto port send = lines:v glocal const char *send = send; glocal vector lines; // We want to test publishd connectivity to the outside (connectto,port,5); sendf ("%s\n",glocal.send); glocal.lines.push_back(line); end = true; glocal.lines.emplace_back(string_f("fail: %s\n",strerror(errno))); rep_help_connect (glocal.lines); glocal bool dbfiles=false; glocal bool keysd=false; ("select count(*) from id2name"); glocal.dbfiles=true; CONNECT_INFO con; con.port = glocal.keysdsock; (con); glocal.keysd = lines.size() > 0; rep_test (glocal.dbfiles,glocal.keysd); tlmp_error ("Invalid command: %s\n",line); endclient = true; }else if (c->type == TYPE_WORKER){ if (info.linelen == sizeof(STATS)){ const STATS *st = (const STATS *)line; glocal.stats.nbtried += st->nbtried; glocal.stats.nbfailed += st->nbfailed; glocal.stats.nbcopied += st->nbcopied; } }else if (c->type == TYPE_CLIENT){ (this,c->req,line,info.linelen, endserver, endclient,no,c); glocal bool dbfiles = false; glocal bool fsok = file_type(testfile)==0; if (glocal.fsok && unlink(testfile)!= -1) glocal.fsok = false; ("select count(*) from id2name"); glocal.dbfiles=true; rep_test (glocal.dbfiles,glocal.fsok); glocal string msg; glocal bool public_view = false; // tlmp_error ("sendmessage userid=%u dirid=%u fileid=%u modified=%s\n",userid,dirid,fileid,modified); ("select public_view from config where userid=%u",userid); glocal.public_view = atoi(row[0]); if (glocal.public_view){ glocal string user; ("select name from id2name where userid=%u",userid); glocal.user = row[0]; if (glocal.user.size()==0){ glocal.msg = string_f("No user for userid=%u",userid); tlmp_error ("No user for userid=%u\n",userid); }else{ glocal.messages_sent++; glocal.messages.emplace_back(glocal.user.c_str(),userid,dirid,fileid,modified,name); if (glocal.pid == (pid_t)-1){ publishd_doone (glocal.TCPSERVER,glocal.pid,glocal.messages ,*glocal.sub_sq,glocal.hostname,glocal.keysdsock); } } } if (glocal.msg.size() > 0){ rep_sendmessage (false,glocal.msg); }else{ rep_sendmessage (true,""); } tlmp_error ("Invalid command: %s\n",line); endclient = true; } bool some_errors = false; if (fdpass_setcontrol(s,glocal.control,glocal.user)==-1){ some_errors = true; } if (!some_errors && s.is_ok()){ chmod (glocal.clientsock,0666); s.setrawmode(true); if (glocal.daemon){ daemon_init(glocal.pidfile,glocal.user); } open_instrument_file(); s.loop(); ret = 0; } return ret; return glocal.ret; }