/*
This file is part of Bolixo.
Bolixo is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
Bolixo is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with Bolixo. If not, see <https://www.gnu.org/licenses/>.
*/
/*
Listen on a socket
*/
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#define INSTRUMENT_DONOTOPEN
#include "instrument.h"
#include "bolixo.h"
#include "documentd_req.h"
#include "filesystem.h"
using namespace std;
// Kind of connection attached to a handle of the main server loop.
// HANDLE_INFO starts as TYPE_NONE; main() assigns TYPE_CONTROL and
// TYPE_WORKER.
enum CONNECT_TYPE {
	TYPE_NONE = 0,
	TYPE_CONTROL = 1,
	TYPE_CLIENT = 2,
	TYPE_WORKER = 3
};
// Per-handle data attached to every connection known to the main server.
struct HANDLE_INFO : public ARRAY_OBJ
{
	// Request state handed to the control-protocol call in main()
	REQUEST_INFO req;
	// Role of the connection; stays TYPE_NONE until classified
	CONNECT_TYPE type{TYPE_NONE};
};
#define bo_websocket_control_rep_pause_NEEDED
#define bo_websocket_control_rep_resume_NEEDED
#define bo_websocket_control_rep_test_NEEDED
#include "proto/bo-websocket_control.protoh"
#define bo_sessiond_client_getsessioninfo_NOTNEED
#define bo_sessiond_client_getsessioninfovars_NOTNEED
#define bo_sessiond_client_delnotify_NOTNEED
#define bo_sessiond_client_test_NOTNEED
#define bo_sessiond_client_setvar_NOTNEED
#define bo_sessiond_client_setnotify_NOTNEED
#define bo_sessiond_client_waitevent_ASYNC
#include "proto/bo-sessiond_client.protoch"
#define bod_client_login_NOTNEED
#define bod_client_logout_NOTNEED
#define bod_client_adduser_NOTNEED
#define bod_client_confirmuser_NOTNEED
#define bod_client_deleteuser_NOTNEED
#define bod_client_confirmdelete_NOTNEED
#define bod_client_createsession_NOTNEED
#define bod_client_addfile_NOTNEED
#define bod_client_addfile_bob_NOTNEED
#define bod_client_appendfile_NOTNEED
#define bod_client_delfile_NOTNEED
#define bod_client_undelete_NOTNEED
#define bod_client_modifyfile_NOTNEED
#define bod_client_modifyfile_bob_NOTNEED
#define bod_client_rename_NOTNEED
#define bod_client_copy_NOTNEED
#define bod_client_readfile_NOTNEED
#define bod_client_readfile_bob_NOTNEED
#define bod_client_readmore_NOTNEED
#define bod_client_mkdir_NOTNEED
#define bod_client_rmdir_NOTNEED
#define bod_client_listdir_NOTNEED
#define bod_client_stat_NOTNEED
#define bod_client_set_access_NOTNEED
#define bod_client_markview_NOTNEED
#define bod_client_create_group_list_NOTNEED
#define bod_client_create_group_NOTNEED
#define bod_client_set_group_NOTNEED
#define bod_client_set_member_NOTNEED
#define bod_client_set_members_NOTNEED
#define bod_client_set_list_desc_NOTNEED
#define bod_client_set_group_desc_NOTNEED
#define bod_client_delete_list_NOTNEED
#define bod_client_delete_group_NOTNEED
#define bod_client_list_lists_NOTNEED
#define bod_client_list_groups_NOTNEED
#define bod_client_create_project_dir_NOTNEED
#define bod_client_list_contacts_NOTNEED
#define bod_client_list_inboxes_NOTNEED
#define bod_client_list_msgs_NOTNEED
#define bod_client_sendmsg_NOTNEED
#define bod_client_sendmsg_project_NOTNEED
#define bod_client_replymsg_NOTNEED
#define bod_client_replymsg_project_NOTNEED
#define bod_client_sendattach_NOTNEED
#define bod_client_verifysign_NOTNEED
#define bod_client_getpubkey_NOTNEED
#define bod_client_registernode_NOTNEED
#define bod_client_remotelogin_NOTNEED
#define bod_client_remotepass_NOTNEED
#define bod_client_remote_interest_set_NOTNEED
#define bod_client_remote_interest_unset_NOTNEED
#define bod_client_nodelogin_NOTNEED
#define bod_client_nodepass_NOTNEED
#define bod_client_sendtalk_anon_NOTNEED
#define bod_client_sendtalk_NOTNEED
#define bod_client_sendtalk_file_NOTNEED
#define bod_client_list_talk_NOTNEED
#define bod_client_contact_request_NOTNEED
#define bod_client_contact_manage_NOTNEED
#define bod_client_contact_list_NOTNEED
#define bod_client_contact_remove_NOTNEED
#define bod_client_config_read_NOTNEED
#define bod_client_config_write_NOTNEED
#define bod_client_set_notification_NOTNEED
#define bod_client_get_notification_NOTNEED
#define bod_client_public_checkuser_NOTNEED
#define bod_client_public_listdir_NOTNEED
#define bod_client_public_readfile_NOTNEED
#define bod_client_public_list_talk_NOTNEED
#define bod_client_form_savevar_NOTNEED
#define bod_client_form_readvar_NOTNEED
#define bod_client_form_deletevar_NOTNEED
#define bod_client_form_deleteall_NOTNEED
#define bod_client_interest_set_NOTNEED
#define bod_client_interest_unset_NOTNEED
#define bod_client_interest_list_NOTNEED
#define bod_client_interest_check_NOTNEED
#define bod_client_systempubkey_NOTNEED
#define bod_client_systemsign_NOTNEED
#define bod_client_info_read_NOTNEED
#define bod_client_info_write_NOTNEED
#define bod_client_waitevent_ASYNC
#define bod_client_list_members_NOTNEED
#include "proto/bod_client.protoch"
/*
Read the secret used to communicate with internal service.
Returns the string accumulated in glocal.ret.
NOTE(review): the name of the TLMP construct in front of
("/etc/secret",true) seems to have been lost when this file was extracted;
presumably it reads /etc/secret and runs the two statements that follow
(glocal.ret = line; return 0;) for a line of the file -- confirm against
the original .tlcc source.
*/
string util_readsecret()
{
glocal string ret;
("/etc/secret",true);
// Keep the line that was read as the secret
glocal.ret = line;
return 0;
return glocal.ret;
}
// Commands the main process sends to the worker processes.
// The numeric values index the tbcmd[] name table used by the status
// command in main(), so the order must not change.
enum class WORKER_COMMAND
{
	REQ_NONE = 0,
	REQ_END = 1,
	// The worker disconnect from sessiond and bod and stop processing user request
	REQ_PAUSE = 2,
	// Allow processing again
	REQ_RESUME = 3,
	// Check if the worker is working
	REQ_TEST = 4
};
// Packet sent as raw bytes from the main process to a worker.
// The worker accepts it only when the received length equals
// sizeof(WORKER_REQUEST).
struct WORKER_REQUEST
{
	WORKER_COMMAND cmd;	// What the worker must do
};
// Status a worker returns after handling a WORKER_REQUEST.
enum class WORKER_REPLY
{
	REPLY_OK = 0
};
// Packet sent as raw bytes from a worker back to the main process.
struct WORKER_RESULT
{
	WORKER_REPLY ret;	// Outcome of the request
};
/*
	Send one unfragmented, unmasked websocket frame (FIN bit set).
	c:      server object used to write to the connection.
	fd:     handle of the connection receiving the frame.
	opcode: websocket opcode (1=text, 9=ping, ...). Only the low 4 bits are
	        used so that an invalid value can't clobber the FIN/RSV bits.
	msg:    payload, a NUL terminated string.

	The frame is assembled in a heap buffer. The previous implementation
	used a variable length array on the stack (a compiler extension) sized
	by the message, so a large script could overflow the stack.
*/
static void bo_websocket_send (_F_TCPSERVER_V1 *c, int fd, unsigned opcode, PARAM_STRING msg)
{
	const size_t lenmsg = strlen(msg.ptr);
	const unsigned long long len64 = lenmsg;
	// Largest header: 2 fixed bytes + 8 bytes of extended length
	std::vector<char> frame (2+8+lenmsg);
	size_t lenhead = 0;
	frame[0] = (char)(0x80 | (opcode & 0x0f));
	if (len64 <= 125){
		// Length fits in the second byte
		frame[1] = (char)len64;
		lenhead = 2;
	}else if (len64 < 65536){
		// Len is encoded using 16 bits, big endian
		frame[1] = 126;
		frame[2] = (char)((len64 >> 8) & 0xff);
		frame[3] = (char)(len64 & 0xff);
		lenhead = 2 + 2;
	}else{
		// Len is encoded using 64 bits, big endian
		frame[1] = 127;
		for (int i=0; i<8; i++){
			frame[2+i] = (char)((len64 >> (56 - 8*i)) & 0xff);
		}
		lenhead = 2 + 8;
	}
	memcpy (frame.data()+lenhead,msg.ptr,lenmsg);
	c->sendto (fd,frame.data(),lenhead+lenmsg);
}
// Send a ping to the sessiond server so it knows this session is active
// con: connection used for the ping. session: session id.
// activity: forwarded as is to the request (callers pass true when the user
// did something, false from the idle handler).
// A failure is only logged.
// NOTE(review): the call name in front of (con,session,activity) was lost in
// extraction; the log message suggests bo_sessiond_client_ping, which
// presumably provides success and internal_error -- confirm.
static void bo_websocket_sessiond_ping (CONNECT_INFO &con, PARAM_STRING session, bool activity)
{
(con,session,activity);
if (!success) tlmp_error ("bo_sessiond_client_ping: internal_error=%d success=false",internal_error);
}
#define _TLMP_bo_websocket
// Callback object handed to bo_websocket().
// It carries the server pointer and client handle needed to write to the
// client, and declares the five callbacks bo_websocket() invokes. Each
// callback signature is produced by a _F_bo_websocket_xxx(name) macro, so
// the same macro can declare the pure virtual here (empty name) and,
// presumably, the implementation elsewhere.
// NOTE(review): the template arguments of vector and set in the macros
// below appear to have been lost in extraction.
struct _F_bo_websocket{
// Server object; bo_websocket() sets it when data is received
_F_TCPSERVER_V1 *tcp=nullptr;
// Handle of the websocket client; set by bo_websocket()
int fd = -1;
// Send msg to the client as one websocket frame with opcode 1 (text)
void send (PARAM_STRING msg){
bo_websocket_send (tcp,fd,1,msg);
}
// commands: process the messages received from the client
#define _F_bo_websocket_commands(name) void name commands(const vector &cmds, CONNECT_INFO &con_bod, CONNECT_INFO &con_sessiond, \
PARAM_STRING connectid, PARAM_STRING session, PARAM_STRING gameid, const DOC_UI_SPECS &sp, bool &endserver)
virtual _F_bo_websocket_commands( )=0;
// waitnote: called when data arrives on the sessiond connection
#define _F_bo_websocket_waitnote(name) void name waitnote(CONNECT_INFO &con_sessiond, PARAM_STRING session, PARAM_STRING gameid, \
int &sequence, const set &project_notifications, bool gamewaiting, bool &endserver)
virtual _F_bo_websocket_waitnote( )=0;
// waitnote_call: issue the request whose answer is handled by waitnote
#define _F_bo_websocket_waitnote_call(name) int name waitnote_call(CONNECT_INFO &con_sessiond, const char *session, int &sequence, bool &endserver)
virtual _F_bo_websocket_waitnote_call( )=0;
// waitevent_call: issue the request whose answer is handled by waitevent
#define _F_bo_websocket_waitevent_call(name) int name waitevent_call(CONNECT_INFO &con_waitevent, PARAM_STRING connectid, PARAM_STRING session, \
PARAM_STRING gameid, const DOC_UI_SPECS &sp, int &gamesequence, bool &endserver)
virtual _F_bo_websocket_waitevent_call( )=0;
// waitevent: called when data arrives on the waitevent connection
#define _F_bo_websocket_waitevent(name) void name waitevent(CONNECT_INFO &con_waitevent, const char *session, const char *gameid, int &gamesequence, bool &websocket, bool &endserver, bool &fail)
virtual _F_bo_websocket_waitevent( )=0;
};
/*
Extract the list of tabs in section projects.
The strings will be formatted as a notification id.
For every entry of vals whose sname starts with "Projects:2:", the text
"Projects:" followed by the rest of sname is inserted in the set.
(is_start_any_of presumably leaves pt just after the matched prefix.)
NOTE(review): the parameter list looks damaged by extraction (template
arguments missing, "&notifications" turned into a single character);
confirm against the original source.
*/
static void webtab_init (const vector &vals, set ¬ifications)
{
for (auto &s:vals){
const char *pt;
if (is_start_any_of(s.sname,pt,"Projects:2:")){
string note = string_f("Projects:%s",pt);
//tlmp_warning ("add notifications: %s",note.c_str());
notifications.insert(note);
}
}
}
// When true, every HTTP header line received during the websocket handshake
// is logged. Changed at run time by a control command in main().
static bool debug_header = false;
// Seconds without client activity before the idle handler sends a ping.
static unsigned idle_delay=45; // see set_idle_timeout below
/*
Serve one websocket client until the connection ends.
c:             callback object (commands, waitnote, waitevent, ...)
fd:            handle of the websocket client
waitfd:        handle on which WORKER_REQUEST packets are received
con_bod:       connection used by the commands callback
con_sessiond:  connection used for the asynchronous notification wait
con_waitevent: connection used for the asynchronous event wait
paused:        if true, listening on fd is disabled at start

The function handles the HTTP upgrade handshake, decodes the websocket
frames sent by the client, relays notifications and events, and sends
pings when the connection is idle.
NOTE(review): this is TLMP code and several tags (<call ...>, <f ...>)
appear to have been lost in extraction. The bare "();" below and the
"(buffer,data,len);" statements are what remains of them. The comments
about handler boundaries are therefore assumptions to confirm against the
original .tlcc source.
*/
static void bo_websocket (_F_bo_websocket &c, int fd, int waitfd, CONNECT_INFO &con_bod, CONNECT_INFO &con_sessiond, CONNECT_INFO &con_waitevent, bool paused)
{
// Id generated for this connection; passed to waitevent_call() and commands()
glocal string connectid = fs_makeid();
glocal int waitfd = waitfd;
glocal int fd = fd;
glocal c;
glocal con_bod;
glocal con_sessiond;
glocal CONNECT_INFO con_sessiond_ping = con_sessiond; // We use a separate connection for ping
// because con_sessiond is used for asynchronous waitevent
glocal con_waitevent;
// Accumulates the bytes received from the client in websocket frame mode
glocal STREAMP_BUF buf;
// Becomes true once the HTTP upgrade handshake has been answered
glocal bool header_seen = false;
// Value sent back in the Sec-WebSocket-Accept header
glocal string acceptkey;
// Session id taken from the "session=" cookie
glocal string session;
// Messages waiting to be processed (see the loop at the end of the frame decoder)
glocal vector messages;
glocal unsigned nbping = 0; // number of ping sent without an answer
glocal time_t lastactivity = time(nullptr);
glocal string gameid;
glocal DOC_UI_SPECS sp;
// Last notification sequence; -1 until the client sends "sequence="
glocal int sequence = -1;
// Handle of the sessiond connection once monitored, -1 otherwise
glocal int sessionfd = -1;
// Last event sequence; -1 until the client sends "gamesequence="
glocal int gamesequence = -1;
// Handle of the waitevent connection once monitored, -1 otherwise
glocal int waiteventfd = -1;
glocal bool websocket = false; // We connect to a remote websocket through bod.
glocal STREAMP_BUF waiteventbuf; // In websocket mode, handling of waiteventfd
glocal set project_notifications;
glocal bool end_in_idle=false; // When something is broken in a connection, but we do not want to hang up
// too quickly, we set this to true.
// In the idle functag, the connection will end.
glocal string clientip; // Client IP number, for error logs
c.fd = fd;
// NOTE(review): presumably the server call (referenced later as
// glocal.TCPSERVER); its tag name is missing.
();
// Presumably the end-of-client handler: log and stop the loop.
tlmp_warning ("endserver endclient %s",glocal.clientip.c_str());
endserver = true;
// Presumably the idle handler (see set_idle_timeout at the end).
if (glocal.end_in_idle){
// A problem was detected earlier; hang up now
tlmp_warning ("endserver end_in_idle: %s",glocal.clientip.c_str());
endserver = true;
}else{
unsigned diff = time(nullptr) - glocal.lastactivity;
debug_printf ("idle diff=%u nbping=%d\n",diff,glocal.nbping);
if (diff > idle_delay){
// More than 2 pings are pending: request the end of the loop.
// Note that a ping is still sent below in that case.
if (glocal.nbping > 2){
tlmp_warning ("endserver nbping > 2: %s",glocal.clientip.c_str());
endserver = true;
}
debug_printf ("idle since=%d diff=%u ping sent nbping=%u gameid=%s\n",since,diff,glocal.nbping,glocal.gameid.c_str());
// Opcode 9 is a websocket ping
bo_websocket_send (this,glocal.fd,9,"hello");
glocal.nbping++;
// Tell sessiond the session still exists, without user activity
bo_websocket_sessiond_ping (glocal.con_sessiond_ping,glocal.session,false);
}
}
// Presumably the receive handler: "no" is the handle that has data, "line"
// and "info.linelen" the data received.
glocal endserver;
glocal line;
glocal info;
glocal.c.tcp = this;
// tlmp_error ("receive %d no=%d fd=%d waitfd=%d sessionfd=%d waiteventfd=%d",info.linelen,no,glocal.fd,glocal.waitfd,glocal.sessionfd,glocal.waiteventfd);
// Disabled debugging aid: log everything received to /tmp/log
if (0) {
("/tmp/log",true);
fprintf(fout,"receive [%d] header_seen=%d len=%u %s\n",getpid(),glocal.header_seen,glocal.info.linelen,glocal.line);
return 0;
}
if (no == glocal.waitfd){
// Request coming from the main process
if (info.linelen == sizeof(WORKER_REQUEST)){
auto *req = (const WORKER_REQUEST*)line;
if (req->cmd == WORKER_COMMAND::REQ_END){
// No reply is sent for REQ_END
endserver = true;
}else{
if (req->cmd == WORKER_COMMAND::REQ_PAUSE){
// Drop the connections to the internal services and stop
// listening to the client
glocal.con_bod.close();
if (glocal.sessionfd != -1){
glocal.con_sessiond.close();
closeclient (glocal.sessionfd);
glocal.sessionfd = -1;
}
if (glocal.waiteventfd != -1){
glocal.con_waitevent.close();
closeclient (glocal.waiteventfd);
glocal.waiteventfd = -1;
}
setlisten (glocal.fd,false);
}else if (req->cmd == WORKER_COMMAND::REQ_RESUME){
// Start again the asynchronous waits that were active (a sequence
// was received) and listen to the client again
if (glocal.sequence != -1){
glocal.c.waitnote_call(glocal.con_sessiond, glocal.session.c_str(),glocal.sequence, endserver);
glocal.sessionfd = glocal.con_sessiond.fd;
glocal.TCPSERVER.inject(glocal.sessionfd,nullptr);
glocal.TCPSERVER.setmonitormode(glocal.sessionfd,true);
}
if (glocal.gamesequence != -1){
glocal.c.waitevent_call(glocal.con_waitevent, glocal.connectid, glocal.session,glocal.gameid
,glocal.sp,glocal.gamesequence, endserver);
glocal.waiteventfd = glocal.con_waitevent.fd;
glocal.TCPSERVER.inject(glocal.waiteventfd,nullptr);
glocal.TCPSERVER.setmonitormode(glocal.waiteventfd,true);
}
setlisten (glocal.fd,true);
}
// Every command except REQ_END is acknowledged (REQ_TEST does nothing else)
WORKER_RESULT rep;
rep.ret = WORKER_REPLY::REPLY_OK;
send (&rep,sizeof(rep));
}
}else{
tlmp_error ("Invalid packet size for worker request: %s",glocal.clientip.c_str());
}
}else if (no == glocal.sessionfd){
// Data is available on the sessiond connection: process the notification
glocal.c.waitnote(glocal.con_sessiond, glocal.session,glocal.gameid,glocal.sequence, glocal.project_notifications
,glocal.gamesequence!=-1,endserver);
// We are always calling back sessiond to wait for more events.
if (!glocal.endserver) glocal.c.waitnote_call(glocal.con_sessiond, glocal.session.c_str(),glocal.sequence, endserver);
}else if (no == glocal.waiteventfd){
if (glocal.websocket){
// In this mode, we receive websocket packets. We can't copy them to the caller directly
// because bo-websocket (this program) is also sending its own packet (for notifications, obtained from bo-sessiond).
// So our own packets may end up in the middle of a received packet.
// So we must accumulate a full packet and then write it.
char buf[100*1024];
int ret = read(no,buf,sizeof(buf));
if (ret <= 0){
tlmp_error ("endserver ret <= 0: %s",glocal.clientip.c_str());
endserver = true;
}else{
glocal.lastactivity = time(nullptr);
// NOTE(review): presumably a buffering call whose tag is missing; the
// statements up to "return ret" look like its body, which returns the
// number of bytes it used (0 when the frame is incomplete).
(glocal.waiteventbuf,buf,ret);
int ret = 0;
// We only check if we have a complete websocket packet. Once we do, we write it to the client.
const unsigned char *line = (const unsigned char *)buf;
if (len > 4){
// Fragmented frames (FIN bit clear) are not supported
bool fin = (line[0] &0x80) != 0;
if (!fin){
tlmp_error ("fin != %d: %s",fin,glocal.clientip.c_str());
glocal.endserver = true;
return 0;
}
unsigned opcode = line[0] &0xf;
if (opcode == 8){ // Close
tlmp_warning ("endserver opcode 8");
glocal.endserver = true;
}else{
// Compute the total frame length: 2 bytes, optional 16 bits
// extended length, optional 4 bytes mask, then the payload.
// NOTE(review): the 64 bits length form (len1 == 127) is not decoded here.
bool mask = (line[1]&0x80)!=0;
unsigned len1 = line[1] &0x7f;
unsigned data_len = len1;
int frame_len = 2 + len1;
if (len1 == 126){
data_len = (line[2]<<8)+line[3];
frame_len = 2 + 2 + data_len;
}
if (mask){
frame_len += 4;
}
if (frame_len <= len){
// Complete frame: copy it as is to the client
ret = frame_len;
//tlmp_warning ("write websocket packet to client ret = %d",ret);
write (glocal.fd,line,ret);
}
}
}
return ret;
}
}else{
// Normal mode: let the callback process the events available
bool fail = false;
do{
glocal.c.waitevent(glocal.con_waitevent, glocal.session.c_str(),glocal.gameid.c_str(),glocal.gamesequence, glocal.websocket,endserver,fail);
if (fail){
// Stop monitoring this connection
closeclient (no);
glocal.waiteventfd = -1;
break;
}else if (!endserver && !glocal.websocket){
// This code (commented) is obsolete. A single waitevent is needed and then
// we get events. bod and documentd only need one call to waitevent to get going.
// sessiond (waitnote) does not work like this and should be updated.
// We are always calling back bod to wait for more events.
// If it failed, there is no point to call again. When the user will realise something is odd
// he will request a refresh and connection will be established again.
//glocal.c.waitevent_call(glocal.con_waitevent, glocal.session,glocal.gameid
// ,glocal.sp,glocal.gamesequence, endserver);
}
}while (!endserver && !glocal.websocket && glocal.con_waitevent.has_more());
}
}else if (glocal.websocket){
// Data from the client while relaying to a remote websocket server
//tlmp_warning ("write websocket packet to server linelen=%d",info.linelen);
glocal.lastactivity = time(nullptr);
bo_websocket_sessiond_ping (glocal.con_sessiond_ping,glocal.session,true);
glocal.nbping=0; // in websocket mode, we pass everything to the remote websocket server
// through the waitevent bod connection.
// So we just assume any activity is a pong answer
write (glocal.waiteventfd,line,info.linelen);
}else if (!glocal.header_seen){
// HTTP upgrade handshake: one header line at a time
const char *pt;
if (debug_header) tlmp_warning ("header: %s",line);
if (is_start_any_ofnc(line,pt,"Sec-WebSocket-Key:")){
// Accept key = base64 of the SHA1 of the client key followed by the fixed GUID
// NOTE(review): the target types of the reinterpret_cast below appear
// to have been lost in extraction.
pt = str_skip(pt);
string webkey = pt;
string tmp = webkey + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
unsigned char* sha_str = SHA1(reinterpret_cast(tmp.c_str()), tmp.length(), nullptr);
glocal.acceptkey = base64_encode(reinterpret_cast(sha_str), 20);
// tlmp_warning ("WEBKEY=%s acceptkey=%s\n",webkey.c_str(),glocal.acceptkey.c_str());
}else if (is_start_any_ofnc(line,pt,"Cookie:")){
// Look for the session=xxx cookie
vector cookies;
str_splitline(pt,' ',cookies);
if (cookies.size() > 0){
for (auto &c:cookies){
const char *val;
// Remove the ';' separator at the end of the cookie
if (c.size() > 0 && c[c.size()-1] == ';') c.resize(c.size()-1);
if (is_start_any_of(c,val,"session=")){
glocal.session = val;
break;
}
}
if (glocal.session.size() == 0){
tlmp_warning ("No session in cookie");
glocal.end_in_idle = true;
// This is odd. For some reason, we are receiving tons of those from the same IP
// We will keep the session opened and close it in the idle functag.
}else{
// NOTE(review): presumably a sessiond request validating the session
// (tag missing); it provides success, internal_error, name and vars.
(glocal.con_sessiond,glocal.session,false);
if (internal_error || !success){
tlmp_warning ("Invalid session");
glocal.end_in_idle = true;
}else if (name[0] == '\0'){
tlmp_warning ("Valid session, anonymous");
glocal.end_in_idle = true;
}else{
for (auto &var:vars){
if (strcmp(var.name,"webtabs")==0){
// We extract the list of tabs in the session. We will use this to
// decide if we send notification for documents. See below
webtab_init (var.vals,glocal.project_notifications);
break;
}
}
// tlmp_warning ("Valid session");
}
}
}else{
tlmp_warning ("No cookie, ending");
glocal.end_in_idle = true;
}
}else if (is_start_any_ofnc(line,pt,"X-Forwarded-For:")){
glocal.clientip = str_skip(pt);
}else if (line[0] == '\0'){
// Empty line: end of the HTTP headers
if (!glocal.end_in_idle){
if (glocal.session.size() == 0){
tlmp_warning ("No session received");
glocal.end_in_idle = true;
// This is odd. For some reason, we are receiving tons of those from the same IP
}else{
// tlmp_warning ("session=%s",glocal.session.c_str());
// Accept the upgrade and switch the connection to raw mode to
// receive websocket frames
glocal.header_seen = true;
sendf ("HTTP/1.1 101 Switching Protocols\r\n"
"Upgrade: websocket\r\n"
"Connection: Upgrade\r\n"
"Sec-WebSocket-Accept: %s\r\n"
"\r\n"
,glocal.acceptkey.c_str());
setrawmode(true);
}
}
}
}else if (!glocal.end_in_idle){
// Websocket frames sent by the client
glocal.lastactivity = time(nullptr);
// NOTE(review): presumably the same buffering call as above (tag missing);
// the body below returns the number of bytes used.
(glocal.buf,line,info.linelen);
int ret = 0;
const unsigned char *line = (const unsigned char *)buf;
if (len > 4){
// Fragmented frames (FIN bit clear) are not supported
bool fin = (line[0] &0x80) != 0;
if (!fin){
tlmp_error ("fin != %d: %s",fin,glocal.clientip.c_str());
glocal.endserver = true;
return 0;
}
unsigned opcode = line[0] &0xf;
if (opcode == 8){ // Close
glocal.endserver = true;
}else{
// Locate the mask and the payload.
// NOTE(review): the 64 bits length form (len1 == 127) is not decoded here.
bool mask = (line[1]&0x80)!=0;
unsigned len1 = line[1] &0x7f;
unsigned data_len = len1;
int frame_len = 2 + len1;
const unsigned char *data = line+2;
const unsigned char *maskb = line+2;
if (len1 == 126){
data_len = (line[2]<<8)+line[3];
frame_len = 2 + 2 + data_len;
data = line + 4;
maskb = line + 4;
}
if (mask){
frame_len += 4;
data += 4;
}
if (frame_len <= len){
ret = frame_len;
string msg;
//FILE *fout = fopen ("/tmp/log","a");
//fprintf (fout,"data:");
// NOTE(review): the next line is truncated in this copy of the file.
// The missing part presumably unmasked the payload into msg and stored
// the result in glocal.messages -- confirm against the original.
for (unsigned i=0; i
// for (auto &m:glocal.messages) tlmp_warning ("msg=%s",m.c_str());
// At the start of a session, we receive a sequence number and a gameid.
// After that, we receive line like this "verb: arguments"
while (glocal.messages.size() > 0){
const char *pt;
auto p = glocal.messages.begin();
if (is_start_any_of(*p,pt,"gameid=")){
// 7 fields expected: gameid width height content_width
// content_height mobile fontsize
auto tb = str_splitlineq(pt);
if (tb.size()!=7){
tlmp_warning ("Invalid gameid spec: %s",p->c_str());
// Ask the client to refresh, then end
bo_websocket_send (&glocal.TCPSERVER,no,1,"callrefresh();");
endserver = true;
}else{
glocal.gameid = tb[0];
glocal.sp.width = atoi(tb[1].c_str());
glocal.sp.height = atoi(tb[2].c_str());
glocal.sp.content_width = atoi(tb[3].c_str());
glocal.sp.content_height = atoi(tb[4].c_str());
glocal.sp.mobile = atoi(tb[5].c_str()) != 0;
glocal.sp.fontsize = atoi(tb[6].c_str());
}
//tlmp_warning ("gameid:%s",glocal.gameid.c_str());
glocal.messages.erase (p);
}else if (is_start_any_of(*p,pt,"sequence=")){
// We had to wait until we received the sequence number before doing the first
// call to sessiond.
// After that, every success completed call is followed by a new one (see above)
glocal.sequence = atoi(pt);
if (glocal.sessionfd == -1){
glocal.c.waitnote_call(glocal.con_sessiond, glocal.session.c_str(),glocal.sequence, endserver);
glocal.sessionfd = glocal.con_sessiond.fd;
glocal.TCPSERVER.inject(glocal.sessionfd,nullptr);
glocal.TCPSERVER.setmonitormode(glocal.sessionfd,true);
}
glocal.messages.erase (p);
}else if (is_start_any_of(*p,pt,"gamesequence=")){
// We had to wait until we received the sequence number before doing the first
// waitevent call to bod.
// After that, every success completed call is followed by a new one (see above)
glocal.gamesequence = atoi(pt);
if (glocal.waiteventfd == -1){
glocal.c.waitevent_call(glocal.con_waitevent, glocal.connectid,glocal.session,glocal.gameid
,glocal.sp,glocal.gamesequence, endserver);
glocal.waiteventfd = glocal.con_waitevent.fd;
glocal.TCPSERVER.inject(glocal.waiteventfd,nullptr);
glocal.TCPSERVER.setmonitormode(glocal.waiteventfd,true);
}
glocal.messages.erase (p);
}else if (glocal.gameid.size() > 0){
// Anything else is a command: report the activity to sessiond and
// hand all remaining messages to the commands callback
if (glocal.messages.size() > 0){
bo_websocket_sessiond_ping (glocal.con_sessiond_ping,glocal.session,true);
glocal.c.commands(glocal.messages,glocal.con_bod,glocal.con_sessiond,glocal.connectid,glocal.session,glocal.gameid,glocal.sp,endserver);
}
break;
}else{
tlmp_error ("bo_websocket_game: gameid or sequence not set, ending: messages.size()=%zu message[0]=%s ip=%s"
,glocal.messages.size(),glocal.messages[0].c_str(),glocal.clientip.c_str());
endserver = true;
break;
}
}
glocal.messages.clear();
}
// Setup of the server object "s" (its declaration is not visible here):
// monitor the client and the request socket, then run the event loop.
s.inject(fd,nullptr);
if (paused) s.setlisten(fd,false);
s.inject(waitfd,nullptr);
s.setrawmode (waitfd,true);
// Apache has a default timeout of 60 when proxying websocket connections
// We have to wakeup regularly to send ping every idle_delay
s.set_idle_timeout(5);
s.loop();
}
/*
Run the websocket service for one client, supplying the callbacks that talk
to sessiond (notifications) and bod (events and commands).
The parameters are passed unchanged to the first statement.
NOTE(review): the TLMP tags are missing in this copy. The first statement
is presumably a call to the bo_websocket component and the following
sections the bodies of its callbacks, in this order: waitnote_call,
waitnote, waitevent_call, waitevent, commands. The variables used (session,
sequence, content, script, cmds, ...) match the parameters declared in
_F_bo_websocket or are presumably provided by the missing calls.
Confirm against the original .tlcc source.
*/
static void bo_websocket_game (int fd, int waitfd, CONNECT_INFO &con_bod, CONNECT_INFO &con_sessiond, CONNECT_INFO &con_waitevent, bool paused)
{
(fd,waitfd,con_bod,con_sessiond,con_waitevent,paused);
// --- waitnote_call (presumably): send the asynchronous wait request to sessiond
int ret = bo_sessiond_client_waitevent_SEND(con_sessiond,session,sequence);
return ret;
// --- waitnote (presumably): handle one notification received from sessiond
glocal gameid;
glocal endserver;
glocal sequence;
glocal project_notifications;
glocal gamewaiting;
// Presumably reads the answer to the request sent by waitnote_call;
// provides success, internal_error, content, script and sequence
(con_sessiond);
if (!success){
tlmp_warning ("sessiond waitevent internal_error=%d success=false",internal_error);
glocal.endserver = true;
return;
}
// tlmp_warning ("waitnote content=%s gameid=%s",content,glocal.gameid.ptr);
const char *pt, *pt2;
if (is_start_any_of(content,pt,"Projects:")
&& is_start_any_of(glocal.gameid,pt2,"/projects/")
&& is_eq(pt,pt2)){
// Nothing to do, this is a notification about the document we are currently
// viewing.
glocal.sequence = sequence;
return;
}
// From here, buf accumulates the javascript sent to the browser
// Any event starting with profile: turn the 3 bars menu orange
string buf;
if (is_start_any_of(content,NONEED,"profile:")){
buf += string_f ("var e = document.getElementById('rectdotmenu');\n");
buf += "if (e!=null){\n";
buf += "\te.setAttribute('fill', 'orange');\n";
buf += "}\n";
}
// Any event starting with talks or projects turn the main talks and projects tab orange
if (is_start_any_of(content,pt,"talks","Projects")){
// tmp is the prefix that matched
string tmp(content,pt-content);
// For Projects, only if the notification is in the tabs of the session
if (tmp != "Projects" || glocal.project_notifications.count(content) > 0){
buf += string_f ("var e = document.getElementById('tabrect%s');\n",tmp.c_str());
buf += "if (e!=null){\n";
buf += "\te.setAttribute('fill', 'orange');\n";
buf += string_f("\ttabcolor['tabrect%s'] = 'orange';\n",tmp.c_str());
buf += "}\n";
}
}
// Notification /projects/....
// are used to send update scripts to all listener. This will go away
// as all listener used the gamesequence system (bod_client_waitevent).
bool not_project = !is_start_any_of(content,NONEED,"/projects/");
if (not_project){
// Now we turn the subtab orange
buf += string_f ("var e = document.getElementById('tabrect%s');\n",content);
buf += "if (e!=null){\n";
buf += "\te.setAttribute('fill', 'orange');\n";
buf += string_f("\ttabcolor['tabrect%s'] = 'orange';\n",content);
buf += "}\n";
// And the X in the subtab
buf += string_f ("e = document.getElementById('tabsrect%s');\n",content);
buf += "if (e!=null){\n";
buf += "\te.setAttribute('fill', 'orange');\n";
buf += "}\n";
}
// Turn the table entry in the webtable orange
if (is_start_any_of(content,pt,"talks")){
buf += string_f ("var e = document.getElementById('talktbl%s');\n",pt);
buf += "if (e!=null){\n";
buf += "\te.setAttribute('bgcolor', 'orange');\n";
buf += "}\n";
// Skip one character of the suffix for the next two element ids
pt++;
buf += string_f ("var e = document.getElementById('hid-talk=%s');\n",pt);
buf += "if (e!=null){\n";
buf += "\te.style.color = 'blue';\n";
// The counter displayed grows by the difference between the new
// sequence and the last one seen
buf += string_f ("\tvar num = parseInt(e.innerHTML,10)+%u;\n",sequence-glocal.sequence);
buf += "\te.innerHTML = num + e.innerHTML.replace(/[0-9]/g, '');\n";
buf += "}\n";
buf += string_f ("var e = document.getElementById('hidsvg-talk=%s');\n",pt);
buf += "if (e!=null){\n";
buf += "\te.setAttribute('fill', 'orange');\n";
buf += "}\n";
}
// Notification "main": increment the hid-follow counter and turn its icon orange
if (strcmp(content,"main")==0){
buf += "var e = document.getElementById('hid-follow');\n";
buf += "if (e!=null){\n";
buf += "\tconsole.log ('found hidfollow');\n";
buf += "\te.style.color = 'blue';\n";
buf += "\tvar num = parseInt(e.innerHTML,10)+1;\n";
buf += "\te.innerHTML = '' + num + e.innerHTML.replace(/[0-9]/g, '');\n";
buf += "}\n";
buf += "var e = document.getElementById('hidsvg-follow');\n";
buf += "if (e!=null){\n";
buf += "\te.setAttribute('fill', 'orange');\n";
buf += "}\n";
}
// Put the title between [] when the page is hidden
if (not_project){
buf += "if (document.hidden && document.title[0] != '[') document.title = '[' + document.title + ']';\n";
}
// The script of the notification is only sent when no event wait is active
if (!glocal.gamewaiting){
if (script[0] != '\0') buf += script;
}
if (buf.size() > 0){
// Tell the browser the new sequence number
buf += string_f("sequence=%u;\n",sequence);
glocal.bo_websocket.send (buf);
}
glocal.sequence = sequence;
// --- waitevent_call (presumably): send the asynchronous wait request to bod
int ret = bod_client_waitevent_SEND(con_waitevent,connectid,session,gameid,sp,gamesequence);
return ret;
// --- waitevent (presumably): handle one event received from bod
glocal gamesequence;
glocal fail;
fail = false;
glocal websocket;
// Presumably reads the answer to the request sent by waitevent_call;
// provides success, sequence, script, websocket, internal_error and msg
(con_waitevent);
if (success){
// Send the script to the browser followed by the new gamesequence
glocal.gamesequence = sequence;
string buf(script);
buf += string_f("gamesequence=%u;\n",sequence);
glocal.bo_websocket.send (buf);
//tlmp_warning ("websocket=%d",websocket);
glocal.websocket = websocket;
}else{
tlmp_warning ("waitevent failed: internal_error=%d %s",internal_error,msg);
glocal.fail = true;
}
// --- commands (presumably): execute the commands sent by the browser
vector steps;
for (auto &a:cmds){
vector tb;
// Command goes like this: command:arguments ...
// Messages without ':' are ignored
const char *pta = a.c_str();
const char *pt = strchr(pta,':');
if (pt != nullptr){
VARVAL var;
var.var = string (pta,pt-pta);
var.val = pt+1;
steps.push_back(var);
}
}
if (steps.size() > 0){
//for (auto m:cmds) tlmp_warning ("m=%s",m.c_str());
// TODO
unsigned docnum = 0;
glocal gameid;
glocal session;
glocal con_bod;
// Presumably the bod request executing the steps (tag missing);
// provides success, res, more and handle
(con_bod,connectid,session,"",gameid,docnum,steps,sp);
if (success){
glocal string script;
glocal string notify;
glocal bool more = more;
// fres dispatches the results: notifications and scripts are
// accumulated, refresh and dialog requests are sent immediately.
// NOTE(review): the type on the next line is damaged by extraction.
glocal function&res)> fres;
glocal.fres = [&](const vector &res){
for (auto &r:res){
if (strcmp(r.var,VAR_NOTIFY)==0){
glocal.notify += r.val;
}else if (strcmp(r.var,VAR_SCRIPT)==0){
glocal.script += r.val;
}else if (strcmp(r.var,VAR_REFRESH)==0){
glocal.bo_websocket.send ("callrefresh();");
}else if (strcmp(r.var,VAR_DIALOG)==0){
glocal.bo_websocket.send (string_f("calldialog(\"%s\");",r.val));
}
}
};
glocal.fres(res);
// Fetch the rest of the results as long as the server says there is more
while (glocal.more){
glocal.more = false;
(glocal.con_bod,glocal.session,"",glocal.gameid,handle);
if (!success){
tlmp_error ("playstep_more msg=%s",msg);
}else{
glocal.more = more;
glocal.fres(res);
}
}
// Notifications are sent before scripts
if (glocal.notify.size() > 0) glocal.bo_websocket.send (glocal.notify);
if (glocal.script.size() > 0) glocal.bo_websocket.send (glocal.script);
}else{
tlmp_warning ("success false");
}
}
}
/*
Main process of bo-websocket.
It listens on two unix sockets: the websocket port, where every new
connection is handed to a forked worker running bo_websocket_game(), and
the control port, where administrative commands are received. Commands
which concern the workers (pause, resume, test) are queued in
glocal.requests, sent to all workers, and answered once every worker has
replied.
NOTE(review): the TLMP tags are missing in this copy; the bare
"(...);" statements are what remains of the calls, and the comments about
handler boundaries below are assumptions to confirm against the original
.tlcc source.
*/
int main (int argc, char *argv[])
{
glocal int ret = -1;
// Default values of the command line options
glocal const char *port = "/var/run/websocket.sock";
glocal const char *control = "/var/run/websocket-control.sock";
glocal const char *user = "apache";
glocal const char *user_control = "bolixo";
glocal bool daemon = false;
glocal const char *pidfile = "/var/run/bo-websocket.pid";
glocal const char *bodsock = "/dev/bod.sock";
glocal const char *sessiondsock = "/dev/sessiond.sock";
// Presumably the command line parsing call (tag missing) and its option table
glocal.ret = (argc,argv,"bolixo");
setproginfo ("bo-websocket",VERSION,"websocet for documents/games protocol and notifications");
setarg ('c',"control","Unix socket for bo-websocket-control",glocal.control,false);
setarg ('C',"port","Unix socket for bo-websocket",glocal.port,false);
setgrouparg ("Misc.");
setarg (' ',"sessiondsock","Port to reach the bo-sessiond server",glocal.sessiondsock,false);
setarg (' ',"bodsock","Port to reach the bod server",glocal.bodsock,false);
setarg (' ',"user","Run the program as this user",glocal.user,false);
setarg (' ',"user-control","Owner of the websocket-control socket",glocal.user_control,false);
setarg (' ',"daemon","Run in background",glocal.daemon,false);
setarg (' ',"pidfile","File holding the PID of the process",glocal.pidfile,false);
// Presumably the error message handler: syslog in daemon mode, stderr otherwise
if (glocal.daemon){
syslog (LOG_ERR,"%s",msg);
}else{
fprintf (stderr,"%s",msg);
}
// Presumably the warning message handler: same rule with LOG_WARNING
if (glocal.daemon){
syslog (LOG_WARNING,"%s",msg);
}else{
fprintf (stderr,"%s",msg);
}
// One pending command for the workers
struct REQUEST{
int caller;	// Control connection waiting for the reply, -1 if it is gone
WORKER_COMMAND cmd;
REQUEST(int _caller, WORKER_COMMAND _cmd){
caller = _caller;
cmd = _cmd;
}
};
// Pending commands; the first one is the command being executed
glocal vector requests;
// New workers are started paused while this is true
glocal bool paused = false;
// Number of worker replies still expected for the current command
glocal unsigned expect_replies = 0;
// Handles of the sockets connected to the workers
glocal vector workers;
// Connection templates given to the workers, all using the same secret
glocal CONNECT_INFO con_bod;
glocal CONNECT_INFO con_sessiond;
glocal CONNECT_INFO con_waitevent;
glocal.con_bod.port = glocal.bodsock;
glocal.con_sessiond.port = glocal.sessiondsock;
glocal.con_bod.secret = util_readsecret();
glocal.con_sessiond.secret = glocal.con_bod.secret;
glocal.con_waitevent.secret = glocal.con_bod.secret;
glocal.con_waitevent.port = glocal.bodsock;
int ret = -1;
// Terminated workers are not waited for
signal (SIGCHLD,SIG_IGN);
// Presumably the server call listening on the websocket unix socket (tag missing)
(string_f("unix:%s",glocal.port),10);
// Presumably the new connection handler: find out on which port the
// connection was received
HANDLE_INFO *n = new HANDLE_INFO;
info.data = n;
const char *port = info.port;
const char *pt;
if (is_start_any_of (port,pt,"unix:")){
port = pt;
}
if (string_cmp(port,glocal.control)==0){
n->type = TYPE_CONTROL;
}else if (string_cmp(port,glocal.port)==0){
// Websocket client: create a socket pair to talk to the worker, then fork
int tb[2];
if (socketpair(AF_UNIX,SOCK_STREAM,PF_UNIX,tb)==-1){
// NOTE(review): a socketpair failure is silently ignored here
}else{
pid_t pid = fork();
if (pid == 0){
// Worker: serve the client, receiving requests on tb[1]
close (tb[0]);
bo_websocket_game(no,tb[1],glocal.con_bod,glocal.con_sessiond,glocal.con_waitevent,glocal.paused);
_exit (0);
}else if (pid == (pid_t)-1){
close (tb[0]);
close (tb[1]);
tlmp_error ("Can't fork (%s)",strerror(errno));
}else{
// Main process: keep tb[0] to talk to the worker and request the
// end of the client connection on this side (endclient)
close (tb[1]);
HANDLE_INFO *nn = new HANDLE_INFO;
nn->type = TYPE_WORKER;
inject (tb[0],nn);
glocal.workers.push_back(tb[0]);
endclient = true;
}
}
}
// Presumably the end of connection handler: forget the worker and mark
// the requests issued by this connection as orphan
HANDLE_INFO *n = (HANDLE_INFO*)info.data;
if (n->type == TYPE_WORKER){
for (auto p=glocal.workers.begin(); p != glocal.workers.end(); p++){
if (*p == no){
glocal.workers.erase(p);
break;
}
}
}
for (auto &r:glocal.requests) if (no == r.caller) r.caller = -1;
// Presumably a sub-function (run with glocal.ptsub->exec() below): when no
// command is running, send the first pending command to all workers.
// Commands whose caller is gone are dropped.
();
while (glocal.expect_replies == 0 && glocal.requests.size() > 0){
auto first = glocal.requests.begin();
if (first->caller == -1){
glocal.requests.erase(first);
}else{
WORKER_REQUEST req;
req.cmd = first->cmd;
glocal.expect_replies = glocal.workers.size();
for (auto fd:glocal.workers){
glocal.TCPSERVER.sendto(fd,&req,sizeof(req));
}
break;
}
}
// NOTE(review): the initializer on the next line is damaged by extraction
glocal OBJECTSUB *ptsub = ⊂
// Presumably the receive handler
HANDLE_INFO *n = (HANDLE_INFO*)info.data;
if (n->type == TYPE_CONTROL){
// Presumably the control protocol decoder (tag missing); the sections
// below look like the handlers of the control commands
(this,n->req,line, info.linelen,endserver, endclient, no,n);
// Status: instrumentation, workers and pending commands
vector tb;
instrument_status(tb);
tb.push_back(string_f("workers: %zu",glocal.workers.size()));
tb.push_back(string_f("paused: %d",glocal.paused));
tb.push_back(string_f("set_debug_header: %d",debug_header));
tb.push_back(string_f("expect-replies: %u",glocal.expect_replies));
// Names indexed by the WORKER_COMMAND values
static const char *tbcmd[]={"none","end","pause","resume","test"};
const char *running = "none";
unsigned pending = 0;
if (glocal.requests.size() > 0){
running = tbcmd[(unsigned)glocal.requests[0].cmd];
pending = glocal.requests.size()-1;
}
tb.push_back(string_f("worker-command: running=%s pending=%u",running,pending));
rep_status (tb);
// End the server
endserver = true;
// on:b
// Debug messages on or off
if (on){
debug_seton();
}else{
debug_setoff();
}
// File receiving the debug messages
debug_setfdebug (filename);
// ok:b
// Test: ask every worker, or reply immediately when there are none
if (glocal.workers.size() > 0){
glocal.requests.emplace_back(no,WORKER_COMMAND::REQ_TEST);
glocal.ptsub->exec();
}else{
rep_test (true,0,glocal.paused);
}
// on:b
toggle_instrument_file(on);
// Tell all workers to end; no reply is expected
WORKER_REQUEST req;
req.cmd = WORKER_COMMAND::REQ_END;
for (auto fd:glocal.workers){
glocal.TCPSERVER.sendto(fd,&req,sizeof(req));
}
// Pause
// Note that new connection (new workers) will be started in paused mode
glocal.paused = true;
if (glocal.workers.size() == 0){
rep_pause(true);
}else{
glocal.requests.emplace_back(no,WORKER_COMMAND::REQ_PAUSE);
glocal.ptsub->exec();
// We do reply when receiving answers from all workers.
}
// Resume
glocal.paused = false;
if (glocal.workers.size() == 0){
rep_resume(true);
}else{
glocal.requests.emplace_back(no,WORKER_COMMAND::REQ_RESUME);
glocal.ptsub->exec();
// We do reply when receiving answers from all workers.
}
// Logging of the handshake headers in the workers started from now on
debug_header = on;
// Unknown command
tlmp_error ("Invalid command: %s\n",line);
endclient = true;
}else if (n->type == TYPE_CLIENT){
// Not possible
}else if (n->type == TYPE_WORKER){
// Reply from a worker to the command being executed
if (glocal.expect_replies > 0){
glocal.expect_replies--;
/*
We have received all replies for the current request.
So we reply.
At the end, we submit another request using ptsub->exec();
if the caller of the next request has quit, or if there are no workers anymore
then ptsub->exec() won't do much and expect_replies will be 0.
This explains the while loop.
*/
// NOTE(review): when the first request has caller == -1, nothing in this
// loop removes it or changes the loop condition -- confirm it can't spin.
while (glocal.expect_replies == 0 && glocal.requests.size() > 0){
auto first = glocal.requests.begin();
if (first->caller != -1){
if (first->cmd == WORKER_COMMAND::REQ_PAUSE){
bo_websocket_control_rep_pause (first->caller,true);
}else if (first->cmd == WORKER_COMMAND::REQ_RESUME){
bo_websocket_control_rep_resume (first->caller,true);
}else if (first->cmd == WORKER_COMMAND::REQ_TEST){
bo_websocket_control_rep_test (first->caller,true
,glocal.workers.size(),glocal.paused);
}
glocal.requests.erase(first);
glocal.ptsub->exec(); // Check if there is another request pending
}
}
}else{
tlmp_error ("Unexpected reply from worker, ignored");
}
}
// Setup of the server object "s" (its declaration is not visible here):
// add the control socket, then run the event loop
bool some_errors = false;
if (fdpass_setcontrol(s,glocal.control,glocal.user_control)==-1){
some_errors = true;
}
if (!some_errors && s.is_ok()){
// Make the websocket unix socket accessible to everyone
chmod (glocal.port,0666);
if (glocal.daemon){
daemon_init(glocal.pidfile,glocal.user);
}
s.setrawmode(true);
s.loop();
ret = 0;
}
return ret;
return glocal.ret;
}