/* This file is part of Bolixo. Bolixo is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. Bolixo is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with Bolixo. If not, see . */ /* Listen on a socket */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #define INSTRUMENT_DONOTOPEN #include "instrument.h" #include "bolixo.h" #include "documentd_req.h" #include "filesystem.h" using namespace std; enum CONNECT_TYPE { TYPE_NONE, TYPE_CONTROL, TYPE_CLIENT, TYPE_WORKER}; struct HANDLE_INFO: public ARRAY_OBJ{ REQUEST_INFO req; CONNECT_TYPE type = TYPE_NONE; }; #define bo_websocket_control_rep_pause_NEEDED #define bo_websocket_control_rep_resume_NEEDED #define bo_websocket_control_rep_test_NEEDED #include "proto/bo-websocket_control.protoh" #define bo_sessiond_client_getsessioninfo_NOTNEED #define bo_sessiond_client_getsessioninfovars_NOTNEED #define bo_sessiond_client_delnotify_NOTNEED #define bo_sessiond_client_test_NOTNEED #define bo_sessiond_client_setvar_NOTNEED #define bo_sessiond_client_setnotify_NOTNEED #define bo_sessiond_client_waitevent_ASYNC #include "proto/bo-sessiond_client.protoch" #define bod_client_login_NOTNEED #define bod_client_logout_NOTNEED #define bod_client_adduser_NOTNEED #define bod_client_confirmuser_NOTNEED #define bod_client_deleteuser_NOTNEED #define bod_client_confirmdelete_NOTNEED #define bod_client_createsession_NOTNEED #define bod_client_addfile_NOTNEED #define bod_client_addfile_bob_NOTNEED #define bod_client_appendfile_NOTNEED #define bod_client_delfile_NOTNEED #define bod_client_undelete_NOTNEED #define bod_client_modifyfile_NOTNEED #define bod_client_modifyfile_bob_NOTNEED #define bod_client_rename_NOTNEED #define bod_client_copy_NOTNEED #define bod_client_readfile_NOTNEED #define bod_client_readfile_bob_NOTNEED #define bod_client_readmore_NOTNEED #define bod_client_mkdir_NOTNEED #define bod_client_rmdir_NOTNEED #define bod_client_listdir_NOTNEED #define bod_client_stat_NOTNEED #define bod_client_set_access_NOTNEED #define bod_client_markview_NOTNEED #define bod_client_create_group_list_NOTNEED #define bod_client_create_group_NOTNEED #define bod_client_set_group_NOTNEED #define bod_client_set_member_NOTNEED #define bod_client_set_members_NOTNEED #define bod_client_set_list_desc_NOTNEED #define bod_client_set_group_desc_NOTNEED #define bod_client_delete_list_NOTNEED #define bod_client_delete_group_NOTNEED #define bod_client_list_lists_NOTNEED #define bod_client_list_groups_NOTNEED #define bod_client_create_project_dir_NOTNEED #define bod_client_list_contacts_NOTNEED #define bod_client_list_inboxes_NOTNEED #define bod_client_list_msgs_NOTNEED #define bod_client_sendmsg_NOTNEED #define bod_client_sendmsg_project_NOTNEED #define bod_client_replymsg_NOTNEED #define bod_client_replymsg_project_NOTNEED #define bod_client_sendattach_NOTNEED #define bod_client_verifysign_NOTNEED #define bod_client_getpubkey_NOTNEED #define bod_client_registernode_NOTNEED #define bod_client_remotelogin_NOTNEED #define bod_client_remotepass_NOTNEED #define bod_client_remote_interest_set_NOTNEED #define bod_client_remote_interest_unset_NOTNEED #define bod_client_nodelogin_NOTNEED #define bod_client_nodepass_NOTNEED #define bod_client_sendtalk_anon_NOTNEED #define bod_client_sendtalk_NOTNEED #define bod_client_sendtalk_file_NOTNEED #define bod_client_list_talk_NOTNEED #define bod_client_contact_request_NOTNEED #define bod_client_contact_manage_NOTNEED #define bod_client_contact_list_NOTNEED #define bod_client_contact_remove_NOTNEED #define bod_client_config_read_NOTNEED #define bod_client_config_write_NOTNEED #define bod_client_set_notification_NOTNEED #define bod_client_get_notification_NOTNEED #define bod_client_public_checkuser_NOTNEED #define bod_client_public_listdir_NOTNEED #define bod_client_public_readfile_NOTNEED #define bod_client_public_list_talk_NOTNEED #define bod_client_form_savevar_NOTNEED #define bod_client_form_readvar_NOTNEED #define bod_client_form_deletevar_NOTNEED #define bod_client_form_deleteall_NOTNEED #define bod_client_interest_set_NOTNEED #define bod_client_interest_unset_NOTNEED #define bod_client_interest_list_NOTNEED #define bod_client_interest_check_NOTNEED #define bod_client_systempubkey_NOTNEED #define bod_client_systemsign_NOTNEED #define bod_client_info_read_NOTNEED #define bod_client_info_write_NOTNEED #define bod_client_waitevent_ASYNC #define bod_client_list_members_NOTNEED #include "proto/bod_client.protoch" /* Read the secret used to communicate with internal service */ string util_readsecret() { glocal string ret; ("/etc/secret",true); glocal.ret = line; return 0; return glocal.ret; } enum class WORKER_COMMAND{ REQ_NONE, REQ_END, REQ_PAUSE, // The worker disconnect from sessiond and bod and stop processing user request REQ_RESUME, // Allow processing again REQ_TEST // Check if the worker is working }; struct WORKER_REQUEST{ WORKER_COMMAND cmd; }; enum class WORKER_REPLY{ REPLY_OK }; struct WORKER_RESULT{ WORKER_REPLY ret; }; static void bo_websocket_send (_F_TCPSERVER_V1 *c, int fd, unsigned opcode, PARAM_STRING msg) { long long lenmsg = strlen(msg.ptr); size_t max_lenframe = 2+8+lenmsg; size_t lenframe = 0; char buf[max_lenframe]; buf[0] = 128+opcode; char *ptbuf=buf+2; if (lenmsg <= 125){ buf[1] = (char)lenmsg; ptbuf = buf+2; lenframe = 2 + lenmsg; }else if (lenmsg < 65536){ buf[1] = 126; buf[2] = (lenmsg >> 8) &0xff; buf[3] = lenmsg & 0xff; ptbuf = buf+4; lenframe = 2 + 2 + lenmsg; }else{ // Len is encoded using 64 bits buf[1] = 127; buf[2] = (lenmsg >> 56) &0xff; buf[3] = (lenmsg >> 48) &0xff; buf[4] = (lenmsg >> 40) &0xff; buf[5] = (lenmsg >> 32) &0xff; buf[6] = (lenmsg >> 24) &0xff; buf[7] = (lenmsg >> 16) &0xff; buf[8] = (lenmsg >> 8) &0xff; buf[9] = lenmsg & 0xff; ptbuf = buf+10; lenframe = 2 + 8 + lenmsg; } memcpy (ptbuf,msg.ptr,lenmsg); c->sendto (fd,buf,lenframe); } // Send a ping to the sessiond server so it knows this session is active static void bo_websocket_sessiond_ping (CONNECT_INFO &con, PARAM_STRING session, bool activity) { (con,session,activity); if (!success) tlmp_error ("bo_sessiond_client_ping: internal_error=%d success=false",internal_error); } #define _TLMP_bo_websocket struct _F_bo_websocket{ _F_TCPSERVER_V1 *tcp=nullptr; int fd = -1; void send (PARAM_STRING msg){ bo_websocket_send (tcp,fd,1,msg); } #define _F_bo_websocket_commands(name) void name commands(const vector &cmds, CONNECT_INFO &con_bod, CONNECT_INFO &con_sessiond, \ PARAM_STRING connectid, PARAM_STRING session, PARAM_STRING gameid, const DOC_UI_SPECS &sp, bool &endserver) virtual _F_bo_websocket_commands( )=0; #define _F_bo_websocket_waitnote(name) void name waitnote(CONNECT_INFO &con_sessiond, PARAM_STRING session, PARAM_STRING gameid, \ int &sequence, const set &project_notifications, bool gamewaiting, bool &endserver) virtual _F_bo_websocket_waitnote( )=0; #define _F_bo_websocket_waitnote_call(name) int name waitnote_call(CONNECT_INFO &con_sessiond, const char *session, int &sequence, bool &endserver) virtual _F_bo_websocket_waitnote_call( )=0; #define _F_bo_websocket_waitevent_call(name) int name waitevent_call(CONNECT_INFO &con_waitevent, PARAM_STRING connectid, PARAM_STRING session, \ PARAM_STRING gameid, const DOC_UI_SPECS &sp, int &gamesequence, bool &endserver) virtual _F_bo_websocket_waitevent_call( )=0; #define _F_bo_websocket_waitevent(name) void name waitevent(CONNECT_INFO &con_waitevent, const char *session, const char *gameid, int &gamesequence, bool &websocket, bool &endserver, bool &fail) virtual _F_bo_websocket_waitevent( )=0; }; /* Extract the list of tabs in section projects. The strings will be formatted as a notification id. */ static void webtab_init (const vector &vals, set ¬ifications) { for (auto &s:vals){ const char *pt; if (is_start_any_of(s.sname,pt,"Projects:2:")){ string note = string_f("Projects:%s",pt); //tlmp_warning ("add notifications: %s",note.c_str()); notifications.insert(note); } } } static bool debug_header = false; static unsigned idle_delay=45; // see set_idle_timeout below static void bo_websocket (_F_bo_websocket &c, int fd, int waitfd, CONNECT_INFO &con_bod, CONNECT_INFO &con_sessiond, CONNECT_INFO &con_waitevent, bool paused) { glocal string connectid = fs_makeid(); glocal int waitfd = waitfd; glocal int fd = fd; glocal c; glocal con_bod; glocal con_sessiond; glocal CONNECT_INFO con_sessiond_ping = con_sessiond; // We use a separate connection for ping // because con_sessiond is used for asynchronous waitevent glocal con_waitevent; glocal STREAMP_BUF buf; glocal bool header_seen = false; glocal string acceptkey; glocal string session; glocal vector messages; glocal unsigned nbping = 0; // number of ping sent without an answer glocal time_t lastactivity = time(nullptr); glocal string gameid; glocal DOC_UI_SPECS sp; glocal int sequence = -1; glocal int sessionfd = -1; glocal int gamesequence = -1; glocal int waiteventfd = -1; glocal bool websocket = false; // We connect to a remote websocket through bod. glocal STREAMP_BUF waiteventbuf; // In websocket mode, handling of waiteventfd glocal set project_notifications; glocal bool end_in_idle=false; // When something is broken in a connection, but we do not want to hang up // too quickly, we set this to true. // In the idle functag, the connection will end. glocal string clientip; // Client IP number, for error logs c.fd = fd; (); tlmp_warning ("endserver endclient %s",glocal.clientip.c_str()); endserver = true; if (glocal.end_in_idle){ tlmp_warning ("endserver end_in_idle: %s",glocal.clientip.c_str()); endserver = true; }else{ unsigned diff = time(nullptr) - glocal.lastactivity; debug_printf ("idle diff=%u nbping=%d\n",diff,glocal.nbping); if (diff > idle_delay){ if (glocal.nbping > 2){ tlmp_warning ("endserver nbping > 2: %s",glocal.clientip.c_str()); endserver = true; } debug_printf ("idle since=%d diff=%u ping sent nbping=%u gameid=%s\n",since,diff,glocal.nbping,glocal.gameid.c_str()); bo_websocket_send (this,glocal.fd,9,"hello"); glocal.nbping++; bo_websocket_sessiond_ping (glocal.con_sessiond_ping,glocal.session,false); } } glocal endserver; glocal line; glocal info; glocal.c.tcp = this; // tlmp_error ("receive %d no=%d fd=%d waitfd=%d sessionfd=%d waiteventfd=%d",info.linelen,no,glocal.fd,glocal.waitfd,glocal.sessionfd,glocal.waiteventfd); if (0) { ("/tmp/log",true); fprintf(fout,"receive [%d] header_seen=%d len=%u %s\n",getpid(),glocal.header_seen,glocal.info.linelen,glocal.line); return 0; } if (no == glocal.waitfd){ if (info.linelen == sizeof(WORKER_REQUEST)){ auto *req = (const WORKER_REQUEST*)line; if (req->cmd == WORKER_COMMAND::REQ_END){ endserver = true; }else{ if (req->cmd == WORKER_COMMAND::REQ_PAUSE){ glocal.con_bod.close(); if (glocal.sessionfd != -1){ glocal.con_sessiond.close(); closeclient (glocal.sessionfd); glocal.sessionfd = -1; } if (glocal.waiteventfd != -1){ glocal.con_waitevent.close(); closeclient (glocal.waiteventfd); glocal.waiteventfd = -1; } setlisten (glocal.fd,false); }else if (req->cmd == WORKER_COMMAND::REQ_RESUME){ if (glocal.sequence != -1){ glocal.c.waitnote_call(glocal.con_sessiond, glocal.session.c_str(),glocal.sequence, endserver); glocal.sessionfd = glocal.con_sessiond.fd; glocal.TCPSERVER.inject(glocal.sessionfd,nullptr); glocal.TCPSERVER.setmonitormode(glocal.sessionfd,true); } if (glocal.gamesequence != -1){ glocal.c.waitevent_call(glocal.con_waitevent, glocal.connectid, glocal.session,glocal.gameid ,glocal.sp,glocal.gamesequence, endserver); glocal.waiteventfd = glocal.con_waitevent.fd; glocal.TCPSERVER.inject(glocal.waiteventfd,nullptr); glocal.TCPSERVER.setmonitormode(glocal.waiteventfd,true); } setlisten (glocal.fd,true); } WORKER_RESULT rep; rep.ret = WORKER_REPLY::REPLY_OK; send (&rep,sizeof(rep)); } }else{ tlmp_error ("Invalid packet size for worker request: %s",glocal.clientip.c_str()); } }else if (no == glocal.sessionfd){ glocal.c.waitnote(glocal.con_sessiond, glocal.session,glocal.gameid,glocal.sequence, glocal.project_notifications ,glocal.gamesequence!=-1,endserver); // We are always calling back sessiond to wait for more events. if (!glocal.endserver) glocal.c.waitnote_call(glocal.con_sessiond, glocal.session.c_str(),glocal.sequence, endserver); }else if (no == glocal.waiteventfd){ if (glocal.websocket){ // In this mode, we receive websocket packets. We can't copy them to the caller directly // because bo-websocket (this program) is also sending its own packet (for notifications, obtained from bo-sessiond). // So our own packets may end up in the middle of a received packet. // So we must accumulate a full packet and then write it. char buf[100*1024]; int ret = read(no,buf,sizeof(buf)); if (ret <= 0){ tlmp_error ("endserver ret <= 0: %s",glocal.clientip.c_str()); endserver = true; }else{ glocal.lastactivity = time(nullptr); (glocal.waiteventbuf,buf,ret); int ret = 0; // We only check if we have a complete websocket packet. Once we do, we write it to the client. const unsigned char *line = (const unsigned char *)buf; if (len > 4){ bool fin = (line[0] &0x80) != 0; if (!fin){ tlmp_error ("fin != %d: %s",fin,glocal.clientip.c_str()); glocal.endserver = true; return 0; } unsigned opcode = line[0] &0xf; if (opcode == 8){ // Close tlmp_warning ("endserver opcode 8"); glocal.endserver = true; }else{ bool mask = (line[1]&0x80)!=0; unsigned len1 = line[1] &0x7f; unsigned data_len = len1; int frame_len = 2 + len1; if (len1 == 126){ data_len = (line[2]<<8)+line[3]; frame_len = 2 + 2 + data_len; } if (mask){ frame_len += 4; } if (frame_len <= len){ ret = frame_len; //tlmp_warning ("write websocket packet to client ret = %d",ret); write (glocal.fd,line,ret); } } } return ret; } }else{ bool fail = false; do{ glocal.c.waitevent(glocal.con_waitevent, glocal.session.c_str(),glocal.gameid.c_str(),glocal.gamesequence, glocal.websocket,endserver,fail); if (fail){ closeclient (no); glocal.waiteventfd = -1; break; }else if (!endserver && !glocal.websocket){ // This code (commented) is obsolete. A single waitevent is needed and then // we get events. bod and documentd only need one call to waitevent to get going. // sessiond (waitnote) does not work like this and should be updated. // We are always calling back bod to wait for more events. // If it failed, there is no point to call again. When the user will realise something is odd // he will request a refresh and connection will be established again. //glocal.c.waitevent_call(glocal.con_waitevent, glocal.session,glocal.gameid // ,glocal.sp,glocal.gamesequence, endserver); } }while (!endserver && !glocal.websocket && glocal.con_waitevent.has_more()); } }else if (glocal.websocket){ //tlmp_warning ("write websocket packet to server linelen=%d",info.linelen); glocal.lastactivity = time(nullptr); bo_websocket_sessiond_ping (glocal.con_sessiond_ping,glocal.session,true); glocal.nbping=0; // in websocket mode, we pass everything to the remote websocket server // through the waitevent bod connection. // So we just assume any activity is a pong answer write (glocal.waiteventfd,line,info.linelen); }else if (!glocal.header_seen){ const char *pt; if (debug_header) tlmp_warning ("header: %s",line); if (is_start_any_ofnc(line,pt,"Sec-WebSocket-Key:")){ pt = str_skip(pt); string webkey = pt; string tmp = webkey + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; unsigned char* sha_str = SHA1(reinterpret_cast(tmp.c_str()), tmp.length(), nullptr); glocal.acceptkey = base64_encode(reinterpret_cast(sha_str), 20); // tlmp_warning ("WEBKEY=%s acceptkey=%s\n",webkey.c_str(),glocal.acceptkey.c_str()); }else if (is_start_any_ofnc(line,pt,"Cookie:")){ vector cookies; str_splitline(pt,' ',cookies); if (cookies.size() > 0){ for (auto &c:cookies){ const char *val; if (c.size() > 0 && c[c.size()-1] == ';') c.resize(c.size()-1); if (is_start_any_of(c,val,"session=")){ glocal.session = val; break; } } if (glocal.session.size() == 0){ tlmp_warning ("No session in cookie"); glocal.end_in_idle = true; // This is odd. For some reason, we are receiving tons of those from the same IP // We will keep the session opened and close it in the idle functag. }else{ (glocal.con_sessiond,glocal.session,false); if (internal_error || !success){ tlmp_warning ("Invalid session"); glocal.end_in_idle = true; }else if (name[0] == '\0'){ tlmp_warning ("Valid session, anonymous"); glocal.end_in_idle = true; }else{ for (auto &var:vars){ if (strcmp(var.name,"webtabs")==0){ // We extract the list of tabs in the session. We will use this to // decide if we send notification for documents. Se below webtab_init (var.vals,glocal.project_notifications); break; } } // tlmp_warning ("Valid session"); } } }else{ tlmp_warning ("No cookie, ending"); glocal.end_in_idle = true; } }else if (is_start_any_ofnc(line,pt,"X-Forwarded-For:")){ glocal.clientip = str_skip(pt); }else if (line[0] == '\0'){ if (!glocal.end_in_idle){ if (glocal.session.size() == 0){ tlmp_warning ("No session received"); glocal.end_in_idle = true; // This is odd. For some reason, we are receiving tons of those from the same IP }else{ // tlmp_warning ("session=%s",glocal.session.c_str()); glocal.header_seen = true; sendf ("HTTP/1.1 101 Switching Protocols\r\n" "Upgrade: websocket\r\n" "Connection: Upgrade\r\n" "Sec-WebSocket-Accept: %s\r\n" "\r\n" ,glocal.acceptkey.c_str()); setrawmode(true); } } } }else if (!glocal.end_in_idle){ glocal.lastactivity = time(nullptr); (glocal.buf,line,info.linelen); int ret = 0; const unsigned char *line = (const unsigned char *)buf; if (len > 4){ bool fin = (line[0] &0x80) != 0; if (!fin){ tlmp_error ("fin != %d: %s",fin,glocal.clientip.c_str()); glocal.endserver = true; return 0; } unsigned opcode = line[0] &0xf; if (opcode == 8){ // Close glocal.endserver = true; }else{ bool mask = (line[1]&0x80)!=0; unsigned len1 = line[1] &0x7f; unsigned data_len = len1; int frame_len = 2 + len1; const unsigned char *data = line+2; const unsigned char *maskb = line+2; if (len1 == 126){ data_len = (line[2]<<8)+line[3]; frame_len = 2 + 2 + data_len; data = line + 4; maskb = line + 4; } if (mask){ frame_len += 4; data += 4; } if (frame_len <= len){ ret = frame_len; string msg; //FILE *fout = fopen ("/tmp/log","a"); //fprintf (fout,"data:"); for (unsigned i=0; i // for (auto &m:glocal.messages) tlmp_warning ("msg=%s",m.c_str()); // At the start of a session, we receive a sequence number and a gameid. // After that, we receive line like this "verb: arguments" while (glocal.messages.size() > 0){ const char *pt; auto p = glocal.messages.begin(); if (is_start_any_of(*p,pt,"gameid=")){ auto tb = str_splitlineq(pt); if (tb.size()!=7){ tlmp_warning ("Invalid gameid spec: %s",p->c_str()); bo_websocket_send (&glocal.TCPSERVER,no,1,"callrefresh();"); endserver = true; }else{ glocal.gameid = tb[0]; glocal.sp.width = atoi(tb[1].c_str()); glocal.sp.height = atoi(tb[2].c_str()); glocal.sp.content_width = atoi(tb[3].c_str()); glocal.sp.content_height = atoi(tb[4].c_str()); glocal.sp.mobile = atoi(tb[5].c_str()) != 0; glocal.sp.fontsize = atoi(tb[6].c_str()); } //tlmp_warning ("gameid:%s",glocal.gameid.c_str()); glocal.messages.erase (p); }else if (is_start_any_of(*p,pt,"sequence=")){ // We had to wait until we received the sequence number before doing the first // call to sessiond. // After that, every success completed call is followed by a new one (see above) glocal.sequence = atoi(pt); if (glocal.sessionfd == -1){ glocal.c.waitnote_call(glocal.con_sessiond, glocal.session.c_str(),glocal.sequence, endserver); glocal.sessionfd = glocal.con_sessiond.fd; glocal.TCPSERVER.inject(glocal.sessionfd,nullptr); glocal.TCPSERVER.setmonitormode(glocal.sessionfd,true); } glocal.messages.erase (p); }else if (is_start_any_of(*p,pt,"gamesequence=")){ // We had to wait until we received the sequence number before doing the first // waitevent call to bod. // After that, every success completed call is followed by a new one (see above) glocal.gamesequence = atoi(pt); if (glocal.waiteventfd == -1){ glocal.c.waitevent_call(glocal.con_waitevent, glocal.connectid,glocal.session,glocal.gameid ,glocal.sp,glocal.gamesequence, endserver); glocal.waiteventfd = glocal.con_waitevent.fd; glocal.TCPSERVER.inject(glocal.waiteventfd,nullptr); glocal.TCPSERVER.setmonitormode(glocal.waiteventfd,true); } glocal.messages.erase (p); }else if (glocal.gameid.size() > 0){ if (glocal.messages.size() > 0){ bo_websocket_sessiond_ping (glocal.con_sessiond_ping,glocal.session,true); glocal.c.commands(glocal.messages,glocal.con_bod,glocal.con_sessiond,glocal.connectid,glocal.session,glocal.gameid,glocal.sp,endserver); } break; }else{ tlmp_error ("bo_websocket_game: gameid or sequence not set, ending: messages.size()=%zu message[0]=%s ip=%s" ,glocal.messages.size(),glocal.messages[0].c_str(),glocal.clientip.c_str()); endserver = true; break; } } glocal.messages.clear(); } s.inject(fd,nullptr); if (paused) s.setlisten(fd,false); s.inject(waitfd,nullptr); s.setrawmode (waitfd,true); // Apache has a default timeout of 60 when proxying websocket connections // We have to wakeup regularly to send ping every idle_delay s.set_idle_timeout(5); s.loop(); } static void bo_websocket_game (int fd, int waitfd, CONNECT_INFO &con_bod, CONNECT_INFO &con_sessiond, CONNECT_INFO &con_waitevent, bool paused) { (fd,waitfd,con_bod,con_sessiond,con_waitevent,paused); int ret = bo_sessiond_client_waitevent_SEND(con_sessiond,session,sequence); return ret; glocal gameid; glocal endserver; glocal sequence; glocal project_notifications; glocal gamewaiting; (con_sessiond); if (!success){ tlmp_warning ("sessiond waitevent internal_error=%d success=false",internal_error); glocal.endserver = true; return; } // tlmp_warning ("waitnote content=%s gameid=%s",content,glocal.gameid.ptr); const char *pt, *pt2; if (is_start_any_of(content,pt,"Projects:") && is_start_any_of(glocal.gameid,pt2,"/projects/") && is_eq(pt,pt2)){ // Nothing to do, this is a notification about the document we are currently // viewing. glocal.sequence = sequence; return; } // Any event starting with profile: turn the 3 bars menu orange string buf; if (is_start_any_of(content,NONEED,"profile:")){ buf += string_f ("var e = document.getElementById('rectdotmenu');\n"); buf += "if (e!=null){\n"; buf += "\te.setAttribute('fill', 'orange');\n"; buf += "}\n"; } // Any event starting with talks or projects turn the main talks and projects tab orange if (is_start_any_of(content,pt,"talks","Projects")){ string tmp(content,pt-content); if (tmp != "Projects" || glocal.project_notifications.count(content) > 0){ buf += string_f ("var e = document.getElementById('tabrect%s');\n",tmp.c_str()); buf += "if (e!=null){\n"; buf += "\te.setAttribute('fill', 'orange');\n"; buf += string_f("\ttabcolor['tabrect%s'] = 'orange';\n",tmp.c_str()); buf += "}\n"; } } // Notitication /projects/.... // are used to send update scripts to all listener. This will go away // as all listener used the gamesequence system (bod_client_waitevent). bool not_project = !is_start_any_of(content,NONEED,"/projects/"); if (not_project){ // Now we turn the subtab orange buf += string_f ("var e = document.getElementById('tabrect%s');\n",content); buf += "if (e!=null){\n"; buf += "\te.setAttribute('fill', 'orange');\n"; buf += string_f("\ttabcolor['tabrect%s'] = 'orange';\n",content); buf += "}\n"; // And the X in the subtab buf += string_f ("e = document.getElementById('tabsrect%s');\n",content); buf += "if (e!=null){\n"; buf += "\te.setAttribute('fill', 'orange');\n"; buf += "}\n"; } // Turn the table entry in the webtable orange if (is_start_any_of(content,pt,"talks")){ buf += string_f ("var e = document.getElementById('talktbl%s');\n",pt); buf += "if (e!=null){\n"; buf += "\te.setAttribute('bgcolor', 'orange');\n"; buf += "}\n"; pt++; buf += string_f ("var e = document.getElementById('hid-talk=%s');\n",pt); buf += "if (e!=null){\n"; buf += "\te.style.color = 'blue';\n"; buf += string_f ("\tvar num = parseInt(e.innerHTML,10)+%u;\n",sequence-glocal.sequence); buf += "\te.innerHTML = num + e.innerHTML.replace(/[0-9]/g, '');\n"; buf += "}\n"; buf += string_f ("var e = document.getElementById('hidsvg-talk=%s');\n",pt); buf += "if (e!=null){\n"; buf += "\te.setAttribute('fill', 'orange');\n"; buf += "}\n"; } if (strcmp(content,"main")==0){ buf += "var e = document.getElementById('hid-follow');\n"; buf += "if (e!=null){\n"; buf += "\tconsole.log ('found hidfollow');\n"; buf += "\te.style.color = 'blue';\n"; buf += "\tvar num = parseInt(e.innerHTML,10)+1;\n"; buf += "\te.innerHTML = '' + num + e.innerHTML.replace(/[0-9]/g, '');\n"; buf += "}\n"; buf += "var e = document.getElementById('hidsvg-follow');\n"; buf += "if (e!=null){\n"; buf += "\te.setAttribute('fill', 'orange');\n"; buf += "}\n"; } if (not_project){ buf += "if (document.hidden && document.title[0] != '[') document.title = '[' + document.title + ']';\n"; } if (!glocal.gamewaiting){ if (script[0] != '\0') buf += script; } if (buf.size() > 0){ buf += string_f("sequence=%u;\n",sequence); glocal.bo_websocket.send (buf); } glocal.sequence = sequence; int ret = bod_client_waitevent_SEND(con_waitevent,connectid,session,gameid,sp,gamesequence); return ret; glocal gamesequence; glocal fail; fail = false; glocal websocket; (con_waitevent); if (success){ glocal.gamesequence = sequence; string buf(script); buf += string_f("gamesequence=%u;\n",sequence); glocal.bo_websocket.send (buf); //tlmp_warning ("websocket=%d",websocket); glocal.websocket = websocket; }else{ tlmp_warning ("waitevent failed: internal_error=%d %s",internal_error,msg); glocal.fail = true; } vector steps; for (auto &a:cmds){ vector tb; // Command goes like this: command:arguments ... const char *pta = a.c_str(); const char *pt = strchr(pta,':'); if (pt != nullptr){ VARVAL var; var.var = string (pta,pt-pta); var.val = pt+1; steps.push_back(var); } } if (steps.size() > 0){ //for (auto m:cmds) tlmp_warning ("m=%s",m.c_str()); // TODO unsigned docnum = 0; glocal gameid; glocal session; glocal con_bod; (con_bod,connectid,session,"",gameid,docnum,steps,sp); if (success){ glocal string script; glocal string notify; glocal bool more = more; glocal function&res)> fres; glocal.fres = [&](const vector &res){ for (auto &r:res){ if (strcmp(r.var,VAR_NOTIFY)==0){ glocal.notify += r.val; }else if (strcmp(r.var,VAR_SCRIPT)==0){ glocal.script += r.val; }else if (strcmp(r.var,VAR_REFRESH)==0){ glocal.bo_websocket.send ("callrefresh();"); }else if (strcmp(r.var,VAR_DIALOG)==0){ glocal.bo_websocket.send (string_f("calldialog(\"%s\");",r.val)); } } }; glocal.fres(res); while (glocal.more){ glocal.more = false; (glocal.con_bod,glocal.session,"",glocal.gameid,handle); if (!success){ tlmp_error ("playstep_more msg=%s",msg); }else{ glocal.more = more; glocal.fres(res); } } if (glocal.notify.size() > 0) glocal.bo_websocket.send (glocal.notify); if (glocal.script.size() > 0) glocal.bo_websocket.send (glocal.script); }else{ tlmp_warning ("success false"); } } } int main (int argc, char *argv[]) { glocal int ret = -1; glocal const char *port = "/var/run/websocket.sock"; glocal const char *control = "/var/run/websocket-control.sock"; glocal const char *user = "apache"; glocal const char *user_control = "bolixo"; glocal bool daemon = false; glocal const char *pidfile = "/var/run/bo-websocket.pid"; glocal const char *bodsock = "/dev/bod.sock"; glocal const char *sessiondsock = "/dev/sessiond.sock"; glocal.ret = (argc,argv,"bolixo"); setproginfo ("bo-websocket",VERSION,"websocet for documents/games protocol and notifications"); setarg ('c',"control","Unix socket for bo-websocket-control",glocal.control,false); setarg ('C',"port","Unix socket for bo-websocket",glocal.port,false); setgrouparg ("Misc."); setarg (' ',"sessiondsock","Port to reach the bo-sessiond server",glocal.sessiondsock,false); setarg (' ',"bodsock","Port to reach the bod server",glocal.bodsock,false); setarg (' ',"user","Run the program as this user",glocal.user,false); setarg (' ',"user-control","Owner of the websocket-control socket",glocal.user_control,false); setarg (' ',"daemon","Run in background",glocal.daemon,false); setarg (' ',"pidfile","File holding the PID of the process",glocal.pidfile,false); if (glocal.daemon){ syslog (LOG_ERR,"%s",msg); }else{ fprintf (stderr,"%s",msg); } if (glocal.daemon){ syslog (LOG_WARNING,"%s",msg); }else{ fprintf (stderr,"%s",msg); } struct REQUEST{ int caller; WORKER_COMMAND cmd; REQUEST(int _caller, WORKER_COMMAND _cmd){ caller = _caller; cmd = _cmd; } }; glocal vector requests; glocal bool paused = false; glocal unsigned expect_replies = 0; glocal vector workers; glocal CONNECT_INFO con_bod; glocal CONNECT_INFO con_sessiond; glocal CONNECT_INFO con_waitevent; glocal.con_bod.port = glocal.bodsock; glocal.con_sessiond.port = glocal.sessiondsock; glocal.con_bod.secret = util_readsecret(); glocal.con_sessiond.secret = glocal.con_bod.secret; glocal.con_waitevent.secret = glocal.con_bod.secret; glocal.con_waitevent.port = glocal.bodsock; int ret = -1; signal (SIGCHLD,SIG_IGN); (string_f("unix:%s",glocal.port),10); HANDLE_INFO *n = new HANDLE_INFO; info.data = n; const char *port = info.port; const char *pt; if (is_start_any_of (port,pt,"unix:")){ port = pt; } if (string_cmp(port,glocal.control)==0){ n->type = TYPE_CONTROL; }else if (string_cmp(port,glocal.port)==0){ int tb[2]; if (socketpair(AF_UNIX,SOCK_STREAM,PF_UNIX,tb)==-1){ }else{ pid_t pid = fork(); if (pid == 0){ close (tb[0]); bo_websocket_game(no,tb[1],glocal.con_bod,glocal.con_sessiond,glocal.con_waitevent,glocal.paused); _exit (0); }else if (pid == (pid_t)-1){ close (tb[0]); close (tb[1]); tlmp_error ("Can't fork (%s)",strerror(errno)); }else{ close (tb[1]); HANDLE_INFO *nn = new HANDLE_INFO; nn->type = TYPE_WORKER; inject (tb[0],nn); glocal.workers.push_back(tb[0]); endclient = true; } } } HANDLE_INFO *n = (HANDLE_INFO*)info.data; if (n->type == TYPE_WORKER){ for (auto p=glocal.workers.begin(); p != glocal.workers.end(); p++){ if (*p == no){ glocal.workers.erase(p); break; } } } for (auto &r:glocal.requests) if (no == r.caller) r.caller = -1; (); while (glocal.expect_replies == 0 && glocal.requests.size() > 0){ auto first = glocal.requests.begin(); if (first->caller == -1){ glocal.requests.erase(first); }else{ WORKER_REQUEST req; req.cmd = first->cmd; glocal.expect_replies = glocal.workers.size(); for (auto fd:glocal.workers){ glocal.TCPSERVER.sendto(fd,&req,sizeof(req)); } break; } } glocal OBJECTSUB *ptsub = ⊂ HANDLE_INFO *n = (HANDLE_INFO*)info.data; if (n->type == TYPE_CONTROL){ (this,n->req,line, info.linelen,endserver, endclient, no,n); vector tb; instrument_status(tb); tb.push_back(string_f("workers: %zu",glocal.workers.size())); tb.push_back(string_f("paused: %d",glocal.paused)); tb.push_back(string_f("set_debug_header: %d",debug_header)); tb.push_back(string_f("expect-replies: %u",glocal.expect_replies)); static const char *tbcmd[]={"none","end","pause","resume","test"}; const char *running = "none"; unsigned pending = 0; if (glocal.requests.size() > 0){ running = tbcmd[(unsigned)glocal.requests[0].cmd]; pending = glocal.requests.size()-1; } tb.push_back(string_f("worker-command: running=%s pending=%u",running,pending)); rep_status (tb); endserver = true; // on:b if (on){ debug_seton(); }else{ debug_setoff(); } debug_setfdebug (filename); // ok:b if (glocal.workers.size() > 0){ glocal.requests.emplace_back(no,WORKER_COMMAND::REQ_TEST); glocal.ptsub->exec(); }else{ rep_test (true,0,glocal.paused); } // on:b toggle_instrument_file(on); WORKER_REQUEST req; req.cmd = WORKER_COMMAND::REQ_END; for (auto fd:glocal.workers){ glocal.TCPSERVER.sendto(fd,&req,sizeof(req)); } // Note that new connection (new workers) will be started in paused mode glocal.paused = true; if (glocal.workers.size() == 0){ rep_pause(true); }else{ glocal.requests.emplace_back(no,WORKER_COMMAND::REQ_PAUSE); glocal.ptsub->exec(); // We do reply when receiving answers from all workers. } glocal.paused = false; if (glocal.workers.size() == 0){ rep_resume(true); }else{ glocal.requests.emplace_back(no,WORKER_COMMAND::REQ_RESUME); glocal.ptsub->exec(); // We do reply when receiving answers from all workers. } debug_header = on; tlmp_error ("Invalid command: %s\n",line); endclient = true; }else if (n->type == TYPE_CLIENT){ // Not possible }else if (n->type == TYPE_WORKER){ if (glocal.expect_replies > 0){ glocal.expect_replies--; /* We have received all replies for the current request. So we reply. At the end, we submit another request using ptsub->exec(); if the caller of the next request has quit, or if there are no workers anymore then ptsub->exec() won't do much and expect_replies will be 0. This explains the while loop. */ while (glocal.expect_replies == 0 && glocal.requests.size() > 0){ auto first = glocal.requests.begin(); if (first->caller != -1){ if (first->cmd == WORKER_COMMAND::REQ_PAUSE){ bo_websocket_control_rep_pause (first->caller,true); }else if (first->cmd == WORKER_COMMAND::REQ_RESUME){ bo_websocket_control_rep_resume (first->caller,true); }else if (first->cmd == WORKER_COMMAND::REQ_TEST){ bo_websocket_control_rep_test (first->caller,true ,glocal.workers.size(),glocal.paused); } glocal.requests.erase(first); glocal.ptsub->exec(); // Check if there is another request pending } } }else{ tlmp_error ("Unexpected reply from worker, ignored"); } } bool some_errors = false; if (fdpass_setcontrol(s,glocal.control,glocal.user_control)==-1){ some_errors = true; } if (!some_errors && s.is_ok()){ chmod (glocal.port,0666); if (glocal.daemon){ daemon_init(glocal.pidfile,glocal.user); } s.setrawmode(true); s.loop(); ret = 0; } return ret; return glocal.ret; }