/* This file is part of Bolixo. Bolixo is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. Bolixo is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with Bolixo. If not, see . */ /* Publish content to other bolixo nodes */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "filesystem.h" #include "bolixo.h" #include "bolixo.m" #define INSTRUMENT_DONOTOPEN #include "instrument.h" using namespace std; static DEBUG_KEY D_PROTO ("proto","Protocol information"); enum CONNECT_TYPE { TYPE_NONE, TYPE_CONTROL, TYPE_CLIENT, TYPE_WORKER}; struct HANDLE_INFO: public ARRAY_OBJ{ CONNECT_TYPE type; REQUEST_INFO req; HANDLE_INFO(){ type = TYPE_NONE; } }; #include "proto/publishd_control.protoh" #include "proto/publishd_client.protoh" #include "proto/bod_client.protodef" #define webapi_test_NOTNEED #define webapi_login_NOTNEED #define webapi_addfile_NOTNEED #define webapi_addfile_bob_NOTNEED #define webapi_delfile_NOTNEED #define webapi_undelete_NOTNEED #define webapi_modifyfile_NOTNEED #define webapi_modifyfile_bob_NOTNEED #define webapi_rename_NOTNEED #define webapi_copy_NOTNEED #define webapi_readfile_NOTNEED #define webapi_readfile_bob_NOTNEED #define webapi_readmore_NOTNEED #define webapi_mkdir_NOTNEED #define webapi_rmdir_NOTNEED #define webapi_listdir_NOTNEED #define webapi_stat_NOTNEED #define webapi_set_access_NOTNEED #define webapi_markview_NOTNEED #define webapi_list_inboxes_NOTNEED #define webapi_list_msgs_NOTNEED #define webapi_sendmsg_NOTNEED #define webapi_sendmsg_project_NOTNEED #define webapi_replymsg_NOTNEED #define webapi_replymsg_project_NOTNEED #define webapi_sendattach_NOTNEED #define webapi_sendtalk_file_NOTNEED #define webapi_list_talk_NOTNEED #define webapi_public_listdir_NOTNEED #define webapi_public_readfile_NOTNEED #define webapi_public_list_talk_NOTNEED #define webapi_systempubkey_NOTNEED #define webapi_verifysign_NOTNEED #define webapi_getpubkey_NOTNEED #define webapi_registernode_NOTNEED #define webapi_remote_interest_set_NOTNEED #define webapi_remote_interest_unset_NOTNEED #define webapi_nodelogin_NOTNEED #define webapi_nodepass_NOTNEED #define webapi_config_read_NOTNEED #define webapi_config_write_NOTNEED #define webapi_contact_manage_NOTNEED #define webapi_contact_request_NOTNEED #define webapi_contact_list_NOTNEED #define webapi_list_contacts_NOTNEED #define webapi_list_lists_NOTNEED #define webapi_list_groups_NOTNEED #define webapi_create_group_NOTNEED #define webapi_delete_group_NOTNEED #define webapi_delete_list_NOTNEED #define webapi_set_member_NOTNEED #define webapi_contact_remove_NOTNEED #define webapi_list_members_NOTNEED #define webapi_create_group_list_NOTNEED #define webapi_set_group_NOTNEED #define webapi_playstep_NOTNEED #define webapi_playstep_more_NOTNEED #include "proto/webapi.protoch" #define bo_keysd_control_genkey_NOTNEED #define bo_keysd_control_setpassphrase_NOTNEED #define bo_keysd_control_checkpassphrase_NOTNEED #define bo_keysd_control_quit_NOTNEED #define bo_keysd_control_debug_NOTNEED #define bo_keysd_control_debugfile_NOTNEED #define bo_keysd_control_runstatus_NOTNEED #include "proto/bo-keysd_control.protoch" static vector usehttp; struct PUBLISH_MESSAGE{ string user; string name; string groupname; unsigned userid; unsigned dirid; unsigned fileid; string modified; string sentfrom; PUBLISH_MESSAGE( const char *_user, unsigned _userid, unsigned _dirid, unsigned _fileid, const char *_modified, const char *_name, const char *_groupname, const char *_sentfrom) :user(_user),name(_name),groupname(_groupname),modified(_modified),sentfrom(_sentfrom){ userid = _userid; dirid = _dirid; fileid = _fileid; } PUBLISH_MESSAGE(){ userid = (unsigned)-1; dirid = (unsigned)-1; fileid = (unsigned)-1; } }; static int publishd_remotelogin ( CONNECT_HTTP_INFO &hcon, PARAM_STRING remote_user, const char *keysdsock, unsigned userid, string &sessionid) { glocal int ret = -1; glocal string *sessionid = &sessionid; glocal CONNECT_HTTP_INFO *hcon = &hcon; glocal const char *keysdsock = keysdsock; glocal const char *remote_user = remote_user.ptr; glocal unsigned userid = userid; (hcon,remote_user); if (!success){ tlmp_error ("remotelogin failed for %s: %s\n" ,glocal.remote_user,msg); }else{ CONNECT_INFO con; con.port = glocal.keysdsock; glocal string sign; *glocal.sessionid = sessionid; (con,glocal.userid,BOB_TYPE(sessionid,strlen(sessionid),false)); if (status == ERR_CODE_NONE) glocal.sign = sign; // tlmp_error ("sign=%s\n",glocal.sign.c_str()); if (glocal.sign.size() > 0){ (*glocal.hcon,sessionid,glocal.remote_user,glocal.sign); if (success) glocal.ret = 0; } } return glocal.ret; } struct STATS{ unsigned nbcopied; unsigned nbfailed; unsigned nbtried; STATS(){ nbcopied = nbfailed = nbtried = 0; } }; static bool nonstrict = false; static void publishd_doone ( _F_TCPSERVER_V1 &c, pid_t &pid, deque &messages, NSQL &sq, const char *our_node, const char *keysdsock) { glocal sq; glocal const char *our_node = our_node; glocal const char *keysdsock = keysdsock; if (messages.size() > 0){ int tb[2]; if (pipe(tb)==-1){ tlmp_error ("Can't setup pipe (%s)\n",strerror(errno)); }else{ glocal STATS stats; glocal PUBLISH_MESSAGE todo = messages.front(); messages.pop_front(); pid = fork(); if (pid == (pid_t)0){ // Do the job close (tb[0]); glocal set nodes; if (glocal.todo.groupname == "public"){ // tlmp_error ("sendmessage userid=%u dirid=%u fileid=%u modified=%s\n",userid,dirid,fileid,modified); (glocal.sq,"select public_view from config where userid=%u",glocal.todo.userid); bool public_view = atoi(row[0]); if (public_view){ // Check the nodes interested in this user public messages (glocal.sq,"select nodename from interests_remote where userid=%u",glocal.todo.userid); glocal.nodes.insert(row[0]); } } // Now check if the group has remote members (glocal.sq,"select id2name.name from groups" " join group_members on group_members.groupid = groups.id" " join id2name on id2name.userid=group_members.userid" " where groups.ownerid=%u and groups.name='%s'" ,glocal.todo.userid,glocal.todo.groupname.c_str()); const char *pt = strchr(row[0],'@'); if (pt != nullptr){ const char *htt = "https"; pt++; if(find(usehttp.begin(),usehttp.end(),pt)!=usehttp.end()) htt = "http"; glocal.nodes.insert(string_f("%s://%s",htt,pt)); } if (glocal.todo.sentfrom.size() > 0){ /* sentfrom is the address of the sender. From this address, we extract the server. No need to resend the message to the server which generate it. So we remove it from glocal.nodes here. */ const char *pt = strchr(glocal.todo.sentfrom.c_str(),'@'); if (pt != nullptr){ pt++; auto f = glocal.nodes.find(string_f("https://%s",pt)); if (f != glocal.nodes.end()){ glocal.nodes.erase(f); }else{ f = glocal.nodes.find(string_f("http://%s",pt)); if (f != glocal.nodes.end()){ glocal.nodes.erase(f); } } } } if (glocal.nodes.size() > 0){ glocal BOB_TYPE content; glocal string sign; glocal string from; glocal FILE *fin = nullptr; glocal bool some_errors = false; (glocal.sq,"select content,signature,id2name.name from files" " join ids on files.id = ids.id" " join id2name on ids.ownerid=id2name.userid" " where files.id=%u and files.modified='%s'" ,glocal.todo.fileid,glocal.todo.modified.c_str()); if (glocal.todo.user != row[2]){ if (strchr(row[2],'@')!=nullptr){ glocal.from = row[2]; }else{ glocal.from = string_f("%s@%s",row[2],glocal.our_node); } } //tlmp_warning ("todo.user=%s row[2]=%s from=%s",glocal.todo.user.c_str(),row[2],glocal.from.c_str()); if (row[1] != NULL) glocal.sign = row[1]; if (row[0] != NULL){ glocal.content.setbuffer((const void*)row[0],strlen(row[0]),true); }else{ // The content is in a file string fname = fs_createpath(glocal.todo.fileid,glocal.todo.modified); glocal.fin = fopen (fname.c_str(),"r"); if (glocal.fin == NULL){ tlmp_error ("Can't open content file %s (%s)\n",fname.c_str(),strerror(errno)); glocal.some_errors = true; } } tlmp_error ("No content for message: user=%s dirid=%u userid=%u fileid=%u modified=%s\n" ,glocal.todo.user.c_str(),glocal.todo.dirid,glocal.todo.userid ,glocal.todo.fileid,glocal.todo.modified.c_str()); glocal.some_errors = true; for (auto &node:glocal.nodes){ //tlmp_error ("nodename=%s\n",row[0]); glocal CONNECT_HTTP_INFO hcon; // Load the data only once since it is copied to potentially more than one node if (!glocal.some_errors && glocal.hcon.init (node) != -1){ glocal string remote_user = string_f("%s@%s",glocal.todo.user.c_str(),glocal.our_node); glocal string sessionid; glocal.stats.nbtried++; if (nonstrict) glocal.hcon.setnonstrictmode(); if (publishd_remotelogin(glocal.hcon,glocal.remote_user,glocal.keysdsock,glocal.todo.userid,glocal.sessionid)!=-1){ glocal bool error = false; static vector empty; if (glocal.fin == NULL){ (glocal.hcon,glocal.sessionid,"",empty,glocal.todo.groupname,glocal.remote_user ,glocal.content,false,glocal.todo.name,glocal.sign,glocal.from); if (!success){ tlmp_error ("message to %s internal=%d success=%d msg=%s\n" ,glocal.remote_user.c_str(),internal_error,success,msg); glocal.error = true; } }else{ // The content is in a file glocal string handle; char buf[REQ_CONTENT_CHUNK]; int n; bool more = false; rewind(glocal.fin); while (!glocal.error && (n=fread(buf,1,sizeof(buf),glocal.fin))>0){ more = n==sizeof(buf); BOB_TYPE content((const void*)buf,n,false); if (glocal.handle.size() == 0){ (glocal.hcon,glocal.sessionid,"",empty,glocal.todo.groupname,glocal.remote_user ,content,more,glocal.todo.name,glocal.sign,glocal.from); glocal.handle = handle; if(!success){ glocal.error = true; tlmp_error ("copying to %s internal=%d success=%d msg=%s\n" ,glocal.remote_user.c_str(),internal_error,success,msg); } }else{ (glocal.hcon,glocal.sessionid,glocal.handle,content,more); if(!success){ glocal.error = true; tlmp_error ("appending to %s internal=%d success=%d msg=%s\n" ,glocal.remote_user.c_str(),internal_error,success,msg); } } } if (more){ BOB_TYPE content((const void*)"",0,false); (glocal.hcon,glocal.sessionid,glocal.handle,content,false); //tlmp_error ("internal=%d success=%d msg=%s\n",internal_error,success,msg); } } if (glocal.error){ glocal.stats.nbfailed++; }else{ glocal.stats.nbcopied++; } (glocal.hcon,glocal.sessionid); } } if (glocal.some_errors) break; } if (glocal.fin != nullptr) fclose (glocal.fin); } write (tb[1],&glocal.stats,sizeof(glocal.stats)); _exit (0); }else if (pid == (pid_t)-1){ tlmp_error ("Can't fork (%s)\n",strerror(errno)); }else{ HANDLE_INFO *n = new HANDLE_INFO; n->type = TYPE_WORKER; c.inject (tb[0],n); c.setrawmode(tb[0],true); close (tb[1]); } } } } int main (int argc, char *argv[]) { glocal int ret = -1; glocal const char *data_dbserv = "localhost"; glocal const char *data_dbname = "files"; glocal const char *data_dbuser = NULL; glocal const char *control = "/var/run/publishd.sock"; glocal const char *clientsock = "/tmp/publishd_client.sock"; glocal const char *user = "bolixo"; glocal bool daemon = false; glocal const char *client_secretfile = "/etc/bolixo/secrets.client"; glocal const char *pidfile = "/var/run/publishd.pid"; glocal const char *hostname = NULL; glocal const char *keysdsock = "/dev/keysd.sock"; static const char *tbdic[]={"bolixo","tlmpsql",NULL}; glocal.ret = (argc,argv,tbdic); setproginfo ("publishd",VERSION,"Spread user messages to other bolixo nodes"); setgrouparg ("Networking"); setarg ('c',"control","Unix socket for publishd-control",glocal.control,false); setarg ('C',"clientsock","Unix socket for publishd-client",glocal.clientsock,false); setgrouparg ("Database"); setarg (' ',"dbserv","Database server",glocal.data_dbserv,false); setarg (' ',"dbname","Database name",glocal.data_dbname,false); setarg (' ',"dbuser","Database user",glocal.data_dbuser,true); setgrouparg ("Misc."); setarg (' ',"hostname",MSG_U(O_HOSTNAME,"Host name"),glocal.hostname,true); setarg (' ',"keysdsock","Port to reach the bo-keysd server",glocal.keysdsock,false); setarg (' ',"client_secrets",MSG_R(O_CLIENTSECRETS),glocal.client_secretfile,false); setarg (' ',"user","Run the program as this user",glocal.user,false); setarg (' ',"daemon","Run in background",glocal.daemon,false); setarg (' ',"pidfile","File holding the PID of the process",glocal.pidfile,false); setarg (' ',"nonstrict",MSG_R(O_NONSTRICT),nonstrict,false); setarg (' ',"usehttp",MSG_R(O_USEHTTP),usehttp,false); if (glocal.daemon){ syslog (LOG_ERR,"%s",msg); }else{ fprintf (stderr,"%s",msg); } if (glocal.daemon){ syslog (LOG_WARNING,"%s",msg); }else{ fprintf (stderr,"%s",msg); } int ret = -1; glocal STATS stats; glocal unsigned messages_sent = 0; glocal string controlport = string_f("unix:%s",glocal.control); glocal string clientport = string_f("unix:%s",glocal.clientsock); glocal deque messages; glocal map client_secrets; glocal pid_t pid = (pid_t)-1; glocal NSQL *sub_sq = NULL; const char *passwd = getenv("PUBLISHD_PWD"); if (passwd == NULL){ tlmp_error ("Can't get database password from environment, aborting\n"); exit (-1); } fdpass_readsecrets (glocal.client_secretfile,glocal.client_secrets); query_setdefaultdb (glocal.data_dbserv,glocal.data_dbname,glocal.data_dbuser,passwd); query_getdefaultdb()->showerrormode(true); // Connection for the sub-process // This NSQL object will only connect at first use, in publisd_doone, after the fork. // So all children will have their own database connection. NSQL sub_sq (glocal.data_dbserv,glocal.data_dbname,glocal.data_dbuser,passwd); sub_sq.showerrormode(true); glocal.sub_sq = &sub_sq; signal (SIGCHLD,SIG_IGN); (glocal.clientport,5); HANDLE_INFO *n = new HANDLE_INFO; info.data = n; // tlmp_error ("port=%s control=%s client=%s\n",info.port,glocal.controlport.c_str(),glocal.clientport.c_str()); if (string_cmp(info.port,glocal.controlport)==0){ n->type = TYPE_CONTROL; }else if (string_cmp(info.port,glocal.clientport)==0){ n->req.secret = fdpass_findsecret (glocal.client_secrets,info.port); n->type = TYPE_CLIENT; } HANDLE_INFO *n = (HANDLE_INFO*)info.data; if (n->type == TYPE_WORKER){ glocal.pid = (pid_t)-1; publishd_doone(*this,glocal.pid,glocal.messages,*glocal.sub_sq,glocal.hostname,glocal.keysdsock); } HANDLE_INFO *c = (HANDLE_INFO*)info.data; static const char *tbtype[]={"none","control request","client request", "worker request"}; debug_printf (D_PROTO,"receive line-%s: %s\n",tbtype[c->type],line); ERROR_PREFIX prefix ("%s: ",tbtype[c->type]); if (c->type == TYPE_CONTROL){ (this,c->req,line, info.linelen,endserver, endclient, no,c); vector tb; tb.push_back(string_f ("Version %s",VERSION)); tb.push_back(string_f("messages size: %zu",glocal.messages.size())); tb.push_back(string_f("sub-process: %s",glocal.pid == (pid_t)-1 ? "not running" : "running")); tb.push_back(string_f("hostname: %s",glocal.hostname)); tb.push_back(string_f("messages_sent: %u",glocal.messages_sent)); tb.push_back(string_f("stats: nbtried %u nbcopied %u nbfailed %u" ,glocal.stats.nbtried,glocal.stats.nbcopied,glocal.stats.nbfailed)); tb.push_back(string_f("nonstrict: %d",nonstrict)); instrument_status(tb); rep_status(tb); toggle_instrument_file(on); endserver = true; if (on){ debug_seton(); }else{ debug_setoff(); } debug_setfdebug (filename); // connectto port send = lines:v glocal const char *send = send; glocal vector lines; // We want to test publishd connectivity to the outside (connectto,port,5); sendf ("%s\n",glocal.send); glocal.lines.push_back(line); end = true; glocal.lines.emplace_back(string_f("fail: %s\n",strerror(errno))); rep_help_connect (glocal.lines); glocal bool dbfiles=false; glocal bool keysd=false; ("select count(*) from id2name"); glocal.dbfiles=true; CONNECT_INFO con; con.port = glocal.keysdsock; (con); glocal.keysd = lines.size() > 0; rep_test (glocal.dbfiles,glocal.keysd); tlmp_error ("Invalid command: %s\n",line); endclient = true; }else if (c->type == TYPE_WORKER){ if (info.linelen == sizeof(STATS)){ const STATS *st = (const STATS *)line; glocal.stats.nbtried += st->nbtried; glocal.stats.nbfailed += st->nbfailed; glocal.stats.nbcopied += st->nbcopied; } }else if (c->type == TYPE_CLIENT){ (this,c->req,line,info.linelen, endserver, endclient,no,c); glocal bool dbfiles = false; glocal bool fsok = file_type(testfile)==0; if (glocal.fsok && unlink(testfile)!= -1) glocal.fsok = false; ("select count(*) from id2name"); glocal.dbfiles=true; rep_test (glocal.dbfiles,glocal.fsok); glocal string msg; glocal string user; ("select name from id2name where userid=%u",userid); glocal.user = row[0]; if (glocal.user.size()==0){ glocal.msg = string_f("No user for userid=%u",userid); tlmp_error ("No user for userid=%u\n",userid); }else{ glocal.messages_sent++; glocal.messages.emplace_back(glocal.user.c_str(),userid,dirid,fileid,modified,name,groupname,sentfrom); if (glocal.pid == (pid_t)-1){ publishd_doone (glocal.TCPSERVER,glocal.pid,glocal.messages ,*glocal.sub_sq,glocal.hostname,glocal.keysdsock); } } if (glocal.msg.size() > 0){ rep_sendmessage (false,glocal.msg); }else{ rep_sendmessage (true,""); } tlmp_error ("Invalid command: %s\n",line); endclient = true; } bool some_errors = false; if (fdpass_setcontrol(s,glocal.control,glocal.user)==-1){ some_errors = true; } if (!some_errors && s.is_ok()){ chmod (glocal.clientsock,0666); s.setrawmode(true); if (glocal.daemon){ daemon_init(glocal.pidfile,glocal.user); } s.loop(); ret = 0; } return ret; return glocal.ret; }