/*
This file is part of Bolixo.
Bolixo is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
Bolixo is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with Bolixo. If not, see <https://www.gnu.org/licenses/>.
*/
/*
Publish content to other bolixo nodes
*/
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "filesystem.h"
#include "bolixo.h"
#include "bolixo.m"
#define INSTRUMENT_DONOTOPEN
#include "instrument.h"
using namespace std;
// Debug key passed to debug_printf() to trace the lines received by the server
static DEBUG_KEY D_PROTO ("proto","Protocol information");
// Kind of peer attached to a handle (stored in HANDLE_INFO::type).
// TYPE_WORKER is the tag given to the read end of the pipe of a publishing sub-process.
enum CONNECT_TYPE { TYPE_NONE, TYPE_CONTROL, TYPE_CLIENT, TYPE_WORKER};
struct HANDLE_INFO: public ARRAY_OBJ{
CONNECT_TYPE type;
REQUEST_INFO req;
HANDLE_INFO(){
type = TYPE_NONE;
}
};
#include "proto/publishd_control.protoh"
#include "proto/publishd_client.protoh"
#include "proto/bod_client.protodef"
// One webapi_<request>_NOTNEED macro per web API request, all defined before
// proto/webapi.protoch is included.
// NOTE(review): presumably each macro makes the generated header leave out the
// client code of a request publishd never issues - confirm against the proto generator.
#define webapi_test_NOTNEED
#define webapi_login_NOTNEED
#define webapi_addfile_NOTNEED
#define webapi_addfile_bob_NOTNEED
#define webapi_delfile_NOTNEED
#define webapi_undelete_NOTNEED
#define webapi_modifyfile_NOTNEED
#define webapi_modifyfile_bob_NOTNEED
#define webapi_rename_NOTNEED
#define webapi_copy_NOTNEED
#define webapi_readfile_NOTNEED
#define webapi_readfile_bob_NOTNEED
#define webapi_readmore_NOTNEED
#define webapi_mkdir_NOTNEED
#define webapi_rmdir_NOTNEED
#define webapi_listdir_NOTNEED
#define webapi_stat_NOTNEED
#define webapi_set_access_NOTNEED
#define webapi_markview_NOTNEED
#define webapi_list_inboxes_NOTNEED
#define webapi_list_msgs_NOTNEED
#define webapi_sendmsg_NOTNEED
#define webapi_sendmsg_project_NOTNEED
#define webapi_replymsg_NOTNEED
#define webapi_replymsg_project_NOTNEED
#define webapi_sendattach_NOTNEED
#define webapi_sendtalk_file_NOTNEED
#define webapi_list_talk_NOTNEED
#define webapi_public_listdir_NOTNEED
#define webapi_public_readfile_NOTNEED
#define webapi_public_list_talk_NOTNEED
#define webapi_systempubkey_NOTNEED
#define webapi_verifysign_NOTNEED
#define webapi_getpubkey_NOTNEED
#define webapi_registernode_NOTNEED
#define webapi_remote_interest_set_NOTNEED
#define webapi_remote_interest_unset_NOTNEED
#define webapi_nodelogin_NOTNEED
#define webapi_nodepass_NOTNEED
#define webapi_config_read_NOTNEED
#define webapi_config_write_NOTNEED
#define webapi_contact_manage_NOTNEED
#define webapi_contact_request_NOTNEED
#define webapi_contact_list_NOTNEED
#define webapi_list_contacts_NOTNEED
#define webapi_list_lists_NOTNEED
#define webapi_list_groups_NOTNEED
#include "proto/webapi.protoch"
// Same scheme for the bo-keysd control protocol: the *_NOTNEED macros are
// defined before proto/bo-keysd_control.protoch is included.
// NOTE(review): presumably they exclude the requests publishd does not use - confirm.
#define bo_keysd_control_genkey_NOTNEED
#define bo_keysd_control_setpassphrase_NOTNEED
#define bo_keysd_control_checkpassphrase_NOTNEED
#define bo_keysd_control_quit_NOTNEED
#define bo_keysd_control_debug_NOTNEED
#define bo_keysd_control_debugfile_NOTNEED
#define bo_keysd_control_runstatus_NOTNEED
#include "proto/bo-keysd_control.protoch"
/*
	One publication request waiting in the queue: the file (fileid + modified)
	of a given user that must be copied to the remote nodes.
	The default constructor builds an "unset" message: empty strings and
	every id set to (unsigned)-1.
*/
struct PUBLISH_MESSAGE{
	string user;		// User name (looked up from userid by the caller)
	string name;		// Name supplied with the request
	unsigned userid;
	unsigned dirid;
	unsigned fileid;
	string modified;	// Used together with fileid to locate the content
	/*
		Build a message from C strings.
		A NULL string is stored as an empty string: handing a NULL pointer
		to std::string is undefined behavior.
	*/
	PUBLISH_MESSAGE(const char *_user, unsigned _userid, unsigned _dirid, unsigned _fileid, const char *_modified, const char *_name)
		: user(_user != NULL ? _user : "")
		, name(_name != NULL ? _name : "")
		, userid(_userid)
		, dirid(_dirid)
		, fileid(_fileid)
		, modified(_modified != NULL ? _modified : "")
	{
	}
	PUBLISH_MESSAGE()
		: userid((unsigned)-1)
		, dirid((unsigned)-1)
		, fileid((unsigned)-1)
	{
	}
};
/*
	Open a session on a remote node for remote_user.
	hcon: HTTP connection to the remote node.
	remote_user: user the session is opened for.
	keysdsock: port used to reach the key server (assigned to CONNECT_INFO::port).
	userid: sent to the key server together with the session id.
	sessionid: receives the session id obtained from the first request.
	Return 0 when the last request succeeds, -1 in every other case.
	NOTE(review): the names of the three requests issued below are not visible
	in this view; the step descriptions are inferred from the arguments and the
	error message - confirm against the original file.
*/
static int publishd_remotelogin (
CONNECT_HTTP_INFO &hcon,
PARAM_STRING remote_user,
const char *keysdsock,
unsigned userid,
string &sessionid)
{
glocal int ret = -1;
glocal string *sessionid = &sessionid;
glocal CONNECT_HTTP_INFO *hcon = &hcon;
glocal const char *keysdsock = keysdsock;
glocal const char *remote_user = remote_user.ptr;
glocal unsigned userid = userid;
// Step 1: request on the remote node for remote_user
(hcon,remote_user);
if (!success){
tlmp_error ("remotelogin failed for %s: %s\n"
,glocal.remote_user,msg);
}else{
CONNECT_INFO con;
con.port = glocal.keysdsock;
glocal string sign;
// The session id is handed back to the caller even if the next steps fail
*glocal.sessionid = sessionid;
// Step 2: request on the key server with the userid and the session id;
// the signature is kept only when status is ERR_CODE_NONE
(con,glocal.userid,BOB_TYPE(sessionid,strlen(sessionid),false));
if (status == ERR_CODE_NONE) glocal.sign = sign;
// tlmp_error ("sign=%s\n",glocal.sign.c_str());
// Step 3: only with a non empty signature, second request on the remote node
// carrying the session id, the user and the signature
if (glocal.sign.size() > 0){
(*glocal.hcon,sessionid,glocal.remote_user,glocal.sign);
if (success) glocal.ret = 0;
}
}
return glocal.ret;
}
/*
	Publishing counters, all starting at 0.
	A sub-process writes its STATS as raw bytes on its pipe and main() adds the
	values to its own totals, so the layout (three unsigned) must not change.
*/
struct STATS{
	unsigned nbcopied;
	unsigned nbfailed;
	unsigned nbtried;
	STATS()
		: nbcopied(0)
		, nbfailed(0)
		, nbtried(0)
	{
	}
};
/*
	Start a sub-process publishing the first message of the queue, if any.
	c: server receiving the read end of the pipe (tagged TYPE_WORKER).
	pid: receives the result of fork() (left at (pid_t)-1 when fork fails;
		untouched when the queue is empty or the pipe can't be created).
	messages: queue of PUBLISH_MESSAGE; the front one is removed once the pipe exists.
	sq: database connection used by the sub-process.
	our_node: name of this node, used to build the remote user "user@our_node".
	keysdsock: port of the key server, passed to publishd_remotelogin().
	The sub-process writes one STATS record on the pipe and exits.
	When fork() fails, both ends of the pipe are closed (they used to leak);
	the message taken from the queue is not put back.
	NOTE(review): the names of the requests/queries issued at the bare
	"(...)" call sites and the callback boundaries around them are not visible
	in this view - confirm the descriptions below against the original file.
*/
static void publishd_doone (
	_F_TCPSERVER_V1 &c,
	pid_t &pid,
	deque &messages,
	NSQL &sq,
	const char *our_node,
	const char *keysdsock)
{
	glocal NSQL *sq = &sq;
	glocal const char *our_node = our_node;
	glocal const char *keysdsock = keysdsock;
	if (messages.size() > 0){
		int tb[2];
		if (pipe(tb)==-1){
			tlmp_error ("Can't setup pipe (%s)\n",strerror(errno));
		}else{
			glocal STATS stats;
			glocal PUBLISH_MESSAGE todo = messages.front();
			messages.pop_front();
			pid = fork();
			if (pid == (pid_t)0){
				// Do the job
				// The sub-process only writes on the pipe
				close (tb[0]);
				glocal BOB_TYPE content;
				glocal string sign;
				glocal FILE *fin = NULL;
				glocal bool data_loaded=false;
				glocal bool some_errors = false;
				// Remote nodes interested in this user
				(*glocal.sq,"select nodename from interests_remote where userid=%u",glocal.todo.userid);
				//tlmp_error ("nodename=%s\n",row[0]);
				glocal CONNECT_HTTP_INFO hcon;
				// Load the data only once since it is copied to potentially more than one node
				if (!glocal.data_loaded){
					(*glocal.sq,"select content,signature from files where id=%u and modified='%s'"
						,glocal.todo.fileid,glocal.todo.modified.c_str());
					if (row[1] != NULL) glocal.sign = row[1];
					if (row[0] != NULL){
						glocal.content.setbuffer((const void*)row[0],strlen(row[0]),true);
					}else{
						// The content is in a file
						string fname = fs_createpath(glocal.todo.fileid,glocal.todo.modified);
						glocal.fin = fopen (fname.c_str(),"r");
						if (glocal.fin == NULL){
							tlmp_error ("Can't open content file %s (%s)\n",fname.c_str(),strerror(errno));
							glocal.some_errors = true;
						}
					}
					// NOTE(review): as shown here this error path is not guarded by any
					// condition; presumably it belongs to the "no row" case of the
					// query above - confirm.
					tlmp_error ("No content for message: user=%s dirid=%u userid=%u fileid=%u modified=%s\n"
						,glocal.todo.user.c_str(),glocal.todo.dirid,glocal.todo.userid
						,glocal.todo.fileid,glocal.todo.modified.c_str());
					glocal.some_errors = true;
					glocal.data_loaded = true;
				}
				if (!glocal.some_errors && glocal.hcon.init (row[0]) != -1){
					glocal string remote_user = string_f("%s@%s",glocal.todo.user.c_str(),glocal.our_node);
					glocal string sessionid;
					glocal.stats.nbtried++;
					if (publishd_remotelogin(glocal.hcon,glocal.remote_user,glocal.keysdsock,glocal.todo.userid,glocal.sessionid)!=-1){
						glocal bool error = false;
						static vector empty;
						if (glocal.fin == NULL){
							// The content comes from the database: sent in one request
							(glocal.hcon,glocal.sessionid,"",empty,"public",glocal.remote_user
								,glocal.content,false,glocal.todo.name,glocal.sign,"");
							if (!success){
								tlmp_error ("message to %s internal=%d success=%d msg=%s\n"
									,glocal.remote_user.c_str(),internal_error,success,msg);
								glocal.error = true;
							}
						}else{
							// The content is in a file
							// It is sent by chunks of REQ_CONTENT_CHUNK bytes: the first request
							// provides a handle, the following ones append using that handle.
							glocal string handle;
							char buf[REQ_CONTENT_CHUNK];
							int n;
							bool more = false;
							// The same file is read again for every node
							rewind(glocal.fin);
							while (!glocal.error && (n=fread(buf,1,sizeof(buf),glocal.fin))>0){
								// A full buffer means there may be more data to send
								more = n==sizeof(buf);
								BOB_TYPE content((const void*)buf,n,false);
								if (glocal.handle.size() == 0){
									(glocal.hcon,glocal.sessionid,"",empty,"public",glocal.remote_user
										,content,more,glocal.todo.name,glocal.sign,"");
									glocal.handle = handle;
									if(!success){
										glocal.error = true;
										tlmp_error ("copying to %s internal=%d success=%d msg=%s\n"
											,glocal.remote_user.c_str(),internal_error,success,msg);
									}
								}else{
									(glocal.hcon,glocal.sessionid,glocal.handle,content,more);
									if(!success){
										glocal.error = true;
										tlmp_error ("appending to %s internal=%d success=%d msg=%s\n"
											,glocal.remote_user.c_str(),internal_error,success,msg);
									}
								}
							}
							if (more){
								// The last chunk sent was flagged "more": send an empty
								// chunk flagged as the last one
								BOB_TYPE content((const void*)"",0,false);
								(glocal.hcon,glocal.sessionid,glocal.handle,content,false);
								//tlmp_error ("internal=%d success=%d msg=%s\n",internal_error,success,msg);
							}
						}
						if (glocal.error){
							glocal.stats.nbfailed++;
						}else{
							glocal.stats.nbcopied++;
						}
						// Last request on the remote node for this session
						(glocal.hcon,glocal.sessionid);
					}
				}
				if (glocal.some_errors) end = true;
				if (glocal.fin != NULL) fclose (glocal.fin);
				// Report the counters to the parent as one raw STATS record
				write (tb[1],&glocal.stats,sizeof(glocal.stats));
				_exit (0);
			}else if (pid == (pid_t)-1){
				tlmp_error ("Can't fork (%s)\n",strerror(errno));
				// No sub-process will ever use the pipe: release both ends
				close (tb[0]);
				close (tb[1]);
			}else{
				// Parent: the read end is handed to the server, tagged TYPE_WORKER
				HANDLE_INFO *n = new HANDLE_INFO;
				n->type = TYPE_WORKER;
				c.inject (tb[0],n);
				c.setrawmode(tb[0],true);
				close (tb[1]);
			}
		}
	}
}
/*
	publishd entry point.
	Declares the command line options, connects to the database and runs a
	server handling control connections, client connections and the pipes of
	the publishing sub-processes (TYPE_WORKER handles).
	A client "sendmessage" request queues a PUBLISH_MESSAGE; publishd_doone()
	is called to start a sub-process when none is running.
	NOTE(review): in this view everything from the option declarations down to
	s.loop() appears as one flat sequence; presumably it is split between
	several callback sections whose delimiters and names are not visible here
	- confirm against the original file before relying on the notes below.
*/
int main (int argc, char *argv[])
{
glocal int ret = -1;
glocal const char *data_dbserv = "localhost";
glocal const char *data_dbname = "files";
glocal const char *data_dbuser = NULL;
glocal const char *control = "/var/run/publishd.sock";
glocal const char *clientsock = "/tmp/publishd_client.sock";
glocal const char *user = "bolixo";
glocal bool daemon = false;
glocal const char *admin_secretfile = "/etc/bolixo/secrets.admin";
glocal const char *pidfile = "/var/run/publishd.pid";
glocal const char *hostname = NULL;
glocal const char *keysdsock = "/dev/keysd.sock";
static const char *tbdic[]={"bolixo","tlmpsql",NULL};
glocal.ret = (argc,argv,tbdic);
// Command line options, by group
setproginfo ("publishd",VERSION,"Spread user messages to other bolixo nodes");
setgrouparg ("Networking");
setarg ('c',"control","Unix socket for publishd-control",glocal.control,false);
setarg ('C',"clientsock","Unix socket for publishd-client",glocal.clientsock,false);
setgrouparg ("Database");
setarg (' ',"dbserv","Database server",glocal.data_dbserv,false);
setarg (' ',"dbname","Database name",glocal.data_dbname,false);
setarg (' ',"dbuser","Database user",glocal.data_dbuser,true);
setgrouparg ("Misc.");
setarg (' ',"hostname",MSG_U(O_HOSTNAME,"Host name"),glocal.hostname,true);
setarg (' ',"keysdsock","Port to reach the bo-keysd server",glocal.keysdsock,false);
setarg (' ',"admin_secrets",MSG_R(O_ADMINSECRETS),glocal.admin_secretfile,false);
setarg (' ',"user","Run the program as this user",glocal.user,false);
setarg (' ',"daemon","Run in background",glocal.daemon,false);
setarg (' ',"pidfile","File holding the PID of the process",glocal.pidfile,false);
// Messages go to syslog (LOG_ERR) when running as a daemon, to stderr otherwise
if (glocal.daemon){
syslog (LOG_ERR,"%s",msg);
}else{
fprintf (stderr,"%s",msg);
}
// Same thing with LOG_WARNING
if (glocal.daemon){
syslog (LOG_WARNING,"%s",msg);
}else{
fprintf (stderr,"%s",msg);
}
int ret = -1;
glocal STATS stats;
glocal unsigned messages_sent = 0;
glocal string controlport = string_f("unix:%s",glocal.control);
glocal string clientport = string_f("unix:%s",glocal.clientsock);
glocal deque messages;
glocal map admin_secrets;
// (pid_t)-1 means no publishing sub-process is running
glocal pid_t pid = (pid_t)-1;
glocal NSQL *sub_sq = NULL;
// The database password is only accepted from the environment
const char *passwd = getenv("PUBLISHD_PWD");
if (passwd == NULL){
tlmp_error ("Can't get database password from environment, aborting\n");
exit (-1);
}
fdpass_readsecrets (glocal.admin_secretfile,glocal.admin_secrets);
query_setdefaultdb (glocal.data_dbserv,glocal.data_dbname,glocal.data_dbuser,passwd);
query_getdefaultdb()->showerrormode(true);
// Connection for the sub-process
NSQL sub_sq (glocal.data_dbserv,glocal.data_dbname,glocal.data_dbuser,passwd);
sub_sq.showerrormode(true);
glocal.sub_sq = &sub_sq;
// There is no wait() call in this file: SIGCHLD is ignored instead
signal (SIGCHLD,SIG_IGN);
(glocal.clientport,5);
// A HANDLE_INFO is attached to the connection and typed from the port it came in on;
// client connections also get the secret matching their port
HANDLE_INFO *n = new HANDLE_INFO;
info.data = n;
// tlmp_error ("port=%s control=%s client=%s\n",info.port,glocal.controlport.c_str(),glocal.clientport.c_str());
if (string_cmp(info.port,glocal.controlport)==0){
n->type = TYPE_CONTROL;
}else if (string_cmp(info.port,glocal.clientport)==0){
n->req.secret = fdpass_findsecret (glocal.admin_secrets,info.port);
n->type = TYPE_CLIENT;
}
// For a worker handle: the sub-process is flagged as not running and
// publishd_doone() is called for the next queued message.
// NOTE(review): presumably this runs when the handle is closed - confirm.
HANDLE_INFO *n = (HANDLE_INFO*)info.data;
if (n->type == TYPE_WORKER){
glocal.pid = (pid_t)-1;
publishd_doone(*this,glocal.pid,glocal.messages,*glocal.sub_sq,glocal.hostname,glocal.keysdsock);
}
// Processing of received data, according to the type of the handle
debug_printf (D_PROTO,"receive line: %s\n",line);
HANDLE_INFO *c = (HANDLE_INFO*)info.data;
static const char *tbtype[]={"none","control request","client request", "worker request"};
ERROR_PREFIX prefix ("%s: ",tbtype[c->type]);
if (c->type == TYPE_CONTROL){
(this,c->req,line, info.linelen,endserver, endclient, no,c);
// Status report: version, queue size, sub-process state, host name and counters
vector tb;
tb.push_back(string_f ("Version %s",VERSION));
tb.push_back(string_f("messages size: %lu",glocal.messages.size()));
tb.push_back(string_f("sub-process: %s",glocal.pid == (pid_t)-1 ? "not running" : "running"));
tb.push_back(string_f("hostname: %s",glocal.hostname));
tb.push_back(string_f("messages_sent: %u",glocal.messages_sent));
tb.push_back(string_f("stats: nbtried %u nbcopied %u nbfailed %u"
,glocal.stats.nbtried,glocal.stats.nbcopied,glocal.stats.nbfailed));
instrument_status(tb);
rep_status(tb);
toggle_instrument_file(on);
endserver = true;
if (on){
debug_seton();
}else{
debug_setoff();
}
debug_setfdebug (filename);
// connectto port send = lines:v
glocal const char *send = send;
glocal vector lines;
// We want to test publishd connectivity to the outside
(connectto,port,5);
sendf ("%s\n",glocal.send);
glocal.lines.push_back(line);
end = true;
glocal.lines.emplace_back(string_f("fail: %s\n",strerror(errno)));
rep_help_connect (glocal.lines);
// Test reporting two flags: database query and key server connection
glocal bool dbfiles=false;
glocal bool keysd=false;
("select count(*) from id2name");
glocal.dbfiles=true;
CONNECT_INFO con;
con.port = glocal.keysdsock;
(con);
glocal.keysd = lines.size() > 0;
rep_test (glocal.dbfiles,glocal.keysd);
tlmp_error ("Invalid command: %s\n",line);
endclient = true;
}else if (c->type == TYPE_WORKER){
// Raw data from a sub-process: only an exact STATS record is accepted
// and added to the totals
if (info.linelen == sizeof(STATS)){
const STATS *st = (const STATS *)line;
glocal.stats.nbtried += st->nbtried;
glocal.stats.nbfailed += st->nbfailed;
glocal.stats.nbcopied += st->nbcopied;
}
}else if (c->type == TYPE_CLIENT){
(this,c->req,line,info.linelen, endserver, endclient,no,c);
// Test reporting two flags: database query and file system check
glocal bool dbfiles = false;
glocal bool fsok = file_type(testfile)==0;
// fsok is cleared when unlink() succeeds.
// NOTE(review): presumably publishd must see the file without being able
// to delete it (read-only access) - confirm this is intended.
if (glocal.fsok && unlink(testfile)!= -1) glocal.fsok = false;
("select count(*) from id2name");
glocal.dbfiles=true;
rep_test (glocal.dbfiles,glocal.fsok);
// sendmessage: the message is queued only when public_view is set for the user
glocal string msg;
glocal bool public_view = false;
// tlmp_error ("sendmessage userid=%u dirid=%u fileid=%u modified=%s\n",userid,dirid,fileid,modified);
("select public_view from config where userid=%u",userid);
glocal.public_view = atoi(row[0]);
if (glocal.public_view){
glocal string user;
("select name from id2name where userid=%u",userid);
glocal.user = row[0];
if (glocal.user.size()==0){
glocal.msg = string_f("No user for userid=%u",userid);
tlmp_error ("No user for userid=%u\n",userid);
}else{
glocal.messages_sent++;
glocal.messages.emplace_back(glocal.user.c_str(),userid,dirid,fileid,modified,name);
// Start a sub-process now only if none is running
if (glocal.pid == (pid_t)-1){
publishd_doone (glocal.TCPSERVER,glocal.pid,glocal.messages
,*glocal.sub_sq,glocal.hostname,glocal.keysdsock);
}
}
}
// A non empty msg means the request failed
if (glocal.msg.size() > 0){
rep_sendmessage (false,glocal.msg);
}else{
rep_sendmessage (true,"");
}
tlmp_error ("Invalid command: %s\n",line);
endclient = true;
}
// Server setup: control socket, client socket permissions, raw mode,
// optional daemon mode, then the main loop
bool some_errors = false;
if (fdpass_setcontrol(s,glocal.control,glocal.user)==-1){
some_errors = true;
}
if (!some_errors && s.is_ok()){
chmod (glocal.clientsock,0666);
s.setrawmode(true);
if (glocal.daemon){
daemon_init(glocal.pidfile,glocal.user);
}
open_instrument_file();
s.loop();
ret = 0;
}
return ret;
return glocal.ret;
}