/* Experiment implementing POPEN with coroutines. Ideally, we get a generator that yields stdout and stderr lines.
   NOTE(review): in this copy of the file the #include directives carry no header names and several template
   argument lists look empty (e.g. "std::tuple;", "template struct COROITER"), as if the text between angle
   brackets had been lost, and many statements share one physical line. Restore from the original before building. */ using namespace std; #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include using boost::asio::ip::tcp; using boost::asio::awaitable; using boost::asio::co_spawn; using boost::asio::detached; using boost::asio::use_awaitable; using boost::asio::as_tuple; using boost::asio::experimental::generator; using namespace boost::asio::experimental::awaitable_operators; namespace this_coro = boost::asio::this_coro; #if 0 awaitable echo(tcp::socket socket) { try { char data[1024]; for (;;) { std::size_t n = co_await socket.async_read_some(boost::asio::buffer(data), use_awaitable); co_await async_write(socket, boost::asio::buffer(data, n), use_awaitable); } } catch (std::exception& e) { cout << format ("echo Exception: {}\n", e.what()); } } awaitable listener() { auto executor = co_await this_coro::executor; tcp::acceptor acceptor(executor, {tcp::v4(), 55555}); for (;;) { tcp::socket socket = co_await acceptor.async_accept(use_awaitable); } } #endif /* HANDLES: four file descriptors (in, out, err, ctrl), each defaulting to -1; the destructor calls close() on every one that is not -1. */ struct HANDLES{ int in=-1; int out = -1; int err = -1; int ctrl = -1; ~HANDLES(){ for (auto fd:{in,out,err,ctrl}){ if (fd != -1) close (fd); } } }; /* execcmd: creates four pipes and forks. The child dup2()s the in/out/err pipe ends onto fds 0/1/2, runs cmd through system(), writes the int status to the ctrl pipe and calls _exit(0). The parent closes the ends it does not use and stores the other ends in handles. Returns 0 once the parent has filled handles, -1 if pipe() or fork() failed. NOTE(review): when a pipe() call fails, the pipes created by earlier iterations are not closed here. */ static int execcmd (string_view cmd, HANDLES &handles) { int ret = -1; int tbin[2],tbout[2],tberr[2],tbctrl[2]; bool one_error = false; for (auto &tb:{tbin,tbout,tberr,tbctrl}){ if (pipe(tb)==-1){ cerr << format ("Can set pipe for command {}, {}\n",cmd,strerror(errno)); one_error = true; break; } } if (!one_error){ cerr << "exec apres pipe\n"; pid_t pid = fork(); if (pid == (pid_t)-1){ cerr << format ("Can't fork for command {}, {}\n",cmd,strerror(errno)); }else if (pid == (pid_t)0){ /* child process: wire the pipes to stdin/stdout/stderr, then drop the now-duplicated descriptors */ dup2(tbin[0],0); dup2(tbout[1],1); dup2(tberr[1],2); for (auto &tb:{tbin,tbout,tberr}){ close (tb[0]); close (tb[1]); } close (tbctrl[0]); string tmp(cmd); /* SIGCHLD goes back to its default disposition before system(); co_main sets it to SIG_IGN - presumably needed so system() can collect the shell's status, TODO confirm */ signal (SIGCHLD,SIG_DFL); int 
status = system(tmp.c_str()); /* send the value returned by system() to the parent through the ctrl pipe */ write (tbctrl[1],&status,sizeof(status)); _exit (0); }else{ /* parent process: close the ends used by the child, keep the others in handles */ close (tbin[0]); close (tbout[1]); close (tberr[1]); close (tbctrl[1]); handles.in = tbin[1]; handles.out = tbout[0]; handles.err = tberr[0]; handles.ctrl = tbctrl[0]; ret = 0; } } cerr << format ("execcmd ret={}\n",ret); return ret; } #if 0 static void attend() { cerr << "enter "; char tb[100]; fgets(tb,99,stdin); cerr << "go\n"; } #endif // Use to split input buffer into lines /* INPUT_BUF: buf holds the accumulated bytes, pos is an offset into buf (clear_start() erases the first pos bytes and resets pos to 0), nblines is described below as the count of complete lines. commandexec also calls append() and has_lines() on this type. NOTE(review): the text from set_nblines() onward looks truncated in this copy, so those members are not fully visible here. */ struct INPUT_BUF{ string buf; size_t pos=0; unsigned nblines=0; // Number of complete line (with \n) in the buffer void clear(){ buf.clear(); pos = 0; nblines = 0; } void clear_start(){ if (pos > 0){ buf.erase(0,pos); pos = 0; } } // Find the number of complete line in the buffer void set_nblines(){ nblines = 0; for (auto c:buf|views::drop(pos)){ //size_t i = pos; i 0 || (eof && pos < buf.size()); } }; /* Read one complete line or up to size byte in "line". If there is no complete line, nothing is read */ /* readline: clears line first; commandexec passes eof=true after a read on the corresponding pipe returned an error code. NOTE(review): the loop body and the return expression look truncated in this copy. */ int readline (string &line, INPUT_BUF &buf, bool eof) { line.clear(); bool eol = false; for (size_t i=buf.pos; i 0 ? 0 : -1; } #include "../trunk/c++script.h" #include //boost::asio::experimental::generator commandexec ( /* commandexec: cobalt generator coroutine. Starts cmd with execcmd(), wraps the out/err/ctrl descriptors in asio readable pipes, then loops while nb_pipe_ok > 0: it races one async_read_some per pipe and handles whichever completes. Data from out/err is appended to an INPUT_BUF and every available line is co_yield-ed as a cpps::COMMANDEXEC_line. A read error marks that pipe finished (nb_pipe_ok--) and its slot is pointed at pipe_forever, to which nothing is written. The final co_return value is ctrl: its line stays "no exec" unless a status was read from the ctrl pipe, in which case it is "retcod=... signal=...". NOTE(review): the descriptors are held both by HANDLES (closed in its destructor) and by the readable_pipe objects, and forever[0] is both given to pipe_forever and passed to close() - confirm which owner is meant to close them. */ boost::cobalt::generator commandexec ( //boost::asio::io_context &io_context, string_view cmd) { HANDLES handles; cerr << format("command {}\n",cmd); cpps::COMMANDEXEC_line ctrl; ctrl.line = "no exec"; if (execcmd (cmd,handles) != -1){ int forever[2]; if (pipe(forever) == -1){ cerr << format("Can't set forever pipe for command: {}\n",cmd); co_return cpps::COMMANDEXEC_line(); } auto context = co_await boost::cobalt::this_coro::executor; boost::asio::basic_readable_pipe pipe_out(context,handles.out); boost::asio::basic_readable_pipe pipe_err(context,handles.err); boost::asio::basic_readable_pipe pipe_ctrl(context,handles.ctrl); // This readable_pipe is connected to an unused pipe. 
Noone writes to forever[1] // When one pipe is ended (!pipe_xxx_ok), we co_await on the forever pipe // See the co_await statement below where p_xxx is either pipe_xxx or pipe_forever boost::asio::basic_readable_pipe pipe_forever(context,forever[0]); auto *p_out = &pipe_out; auto *p_err = &pipe_err; auto *p_ctrl = &pipe_ctrl; int nb_pipe_ok = 3; cerr << format("attend {} fds={} {} {} {}\n",cmd,handles.in,handles.out,handles.err,handles.ctrl); INPUT_BUF bufout,buferr; while (nb_pipe_ok > 0){ cerr << format("nb_pipe_ok={}\n",nb_pipe_ok); char data_out[1000],data_err[1000],data_ctrl[sizeof(int)]; //co_await p_out->async_read_some(boost::asio::buffer(data_out), boost::cobalt::use_op); #if 1 using tuple_res = std::tuple; #if 0 std::variant res = co_await ( p_out->async_read_some(boost::asio::buffer(data_out), as_tuple(boost::cobalt::use_op)) || p_err->async_read_some(boost::asio::buffer(data_err), as_tuple(boost::cobalt::use_op)) || p_ctrl->async_read_some(boost::asio::buffer(data_ctrl), as_tuple(boost::cobalt::use_op))); #else boost::variant2::variant res = co_await boost::cobalt::race ( p_out->async_read_some(boost::asio::buffer(data_out), as_tuple(boost::cobalt::use_op)) , p_err->async_read_some(boost::asio::buffer(data_err), as_tuple(boost::cobalt::use_op)) , p_ctrl->async_read_some(boost::asio::buffer(data_ctrl), as_tuple(boost::cobalt::use_op))); #endif //cerr << format ("read_some index={}\n",res.index()); /* res.index() follows the order of the race arguments: 0 = stdout pipe, 1 = stderr pipe, 2 = ctrl pipe */ if (res.index()==0){ auto [ec,size] = boost::variant2::get<0>(res); bool eof = false; if (ec){ nb_pipe_ok--; p_out = &pipe_forever; cerr << "pipe_out exception\n"; eof = true; }else{ cerr << format ("pipe_out size={}\n",size); bufout.append(data_out,size); } while (bufout.has_lines(eof)){ cpps::COMMANDEXEC_line out; readline (out.line,bufout,eof); co_yield out; } }else if (res.index()==1){ auto [ec,size] = boost::variant2::get<1>(res); bool eof = false; if (ec){ nb_pipe_ok--; p_err = &pipe_forever; cerr << "pipe_err exception\n"; eof = true; }else{ cerr << 
format ("pipe_err size={}\n",size); //err.is_err = true; buferr.append(data_err,size); } while (buferr.has_lines(eof)){ cpps::COMMANDEXEC_line out; readline (out.line,buferr,eof); co_yield out; } }else if (res.index()==2){ auto [ec,size] = boost::variant2::get<2>(res); if (ec){ nb_pipe_ok--; p_ctrl = &pipe_forever; cerr << "pipe_ctrl exception\n"; }else if (size != sizeof(data_ctrl)){ cerr << format ("Invalid size for data_ctrl: {} != {}\n",size,sizeof(data_ctrl)); break; }else{ /* data_ctrl holds the int written by the child in execcmd; it is reported as status>>8 (retcod) and status&0xff (signal) */ int status = *((int*)data_ctrl); cerr << format ("pipe_ctrl size={} retcod={} signal={}\n",size,status>>8,status&0xff); //ctrl.is_err = true; ctrl.line = format("retcod={} signal={}",status>>8,status&0xff); } } #endif } close (forever[0]); close (forever[1]); } cerr << format ("fin command {}\n",cmd); co_return ctrl; } /* disabled (#if 0) declarations of an iterator-style COMMANDEXEC interface */ #if 0 struct COMMANDEXEC_end{}; class COMMANDEXEC_iterator{ private: void fill_line(); int readnext() const; class POPENUSER *pop = nullptr; COMMANDEXEC_line content; public: using value_type = COMMANDEXEC_line; using difference_type = int; COMMANDEXEC_iterator(POPENUSER *_pop); COMMANDEXEC_iterator(const COMMANDEXEC_iterator &n); COMMANDEXEC_iterator & operator = (const COMMANDEXEC_iterator &n); ~COMMANDEXEC_iterator(); COMMANDEXEC_iterator & operator ++(); COMMANDEXEC_iterator operator ++(int); COMMANDEXEC_line & operator *() const; bool operator == (const COMMANDEXEC_iterator &d) const; bool operator == (const COMMANDEXEC_end &d) const; }; class COMMANDEXEC{ std::shared_ptr pop; std::string command; public: using iterator = COMMANDEXEC_iterator; using const_iterator = COMMANDEXEC_iterator; using difference_type = size_t; COMMANDEXEC(std::string_view command); COMMANDEXEC(const COMMANDEXEC &n); ~COMMANDEXEC(); iterator begin() noexcept; const_iterator begin() const noexcept; COMMANDEXEC_end end(); }; #endif //static boost::asio::io_context io_context(1); /* COMMAND_test: stores a command string; run() returns commandexec(command) and get() returns the stored string. store_error() builds a std::views::filter whose predicate appends line.line to tb when line.is_error() is true and tb holds fewer than maxlines entries, and keeps only the elements for which line.is_output() is true. */ class COMMAND_test{ std::string command; // Shell command executed with system(). 
public: COMMAND_test (std::string_view _command): command(_command){} //boost::asio::experimental::generator run() const {return commandexec (io_context,command); } boost::cobalt::generator run() const {return commandexec (command); } const std::string &get() const { return command; } static auto store_error (std::vector &tb, unsigned maxlines=1000){ return std::views::filter([&tb,maxlines](auto &&line) mutable{ if (line.is_error() && tb.size() < maxlines) tb.push_back(line.line); return line.is_output(); }); } }; /* loopint: generator that yields the single value 1; only referenced from commented-out code in co_main */ std::generator loopint() { co_yield 1; } /* endnow: coroutine returning a cobalt promise; prints "endnow1", co_awaits g once and stores the result in val, prints "endnow2", then co_returns 0. */ template boost::cobalt::promise endnow(auto &g, RETVAL &val) { cout << format("endnow1\n"); val = co_await g; cout << format("endnow2\n"); co_return 0; } /* COROITER: wraps a reference to a coroutine object so it can be used in a range-for. begin() builds a CORO_iter and increments it once; CORO_iter::operator++ calls endnow(coro,val) when coro converts to true; comparison with CORO_end returns !coro; operator* returns a copy of val. NOTE(review): operator++ discards the promise returned by endnow() - confirm val is already assigned when operator* is called. */ template struct COROITER{ CORO &coro; COROITER(CORO &_coro): coro(_coro){} struct CORO_end{ }; struct CORO_iter{ CORO &coro; RETVAL val; CORO_iter(CORO &_coro): coro(_coro) {} bool operator == (const CORO_end &){ return !(coro); } CORO_iter operator ++(){ //cout << format("iter has_data1={} {}\n",coro->has_data(),coro->prom->has_value); if (coro){ //CORO_iter::endnow(*coro); endnow (coro,val); #if 0 cout << format ("waitnow coro->get={}\n",coro->get()); if (g){ cout << "endnow running\n"; }else{ cout << "endnow stopped\n"; } #endif } //cout << format("iter has_data2={} {}\n",coro->has_data(),coro->prom->has_value); return *this; } RETVAL operator * () const { return val; } }; CORO_iter begin() { auto ret = CORO_iter(coro); ++ret; return ret; } CORO_end end() const { return CORO_end(); } }; /* co_main: program entry (boost::cobalt::main). Declares the program description and the 'c' ("command") and 'v' ("verbose") options through cpps, sets SIGCHLD to SIG_IGN, creates the commandexec(cmd.val) generator and walks it through COROITER with an empty loop body. std::exception is caught and its what() text printed to cout. */ boost::cobalt::main co_main(int argc, char * argv[]) { cpps::progdesc ("popen avec coroutine"); cpps::option cmd ('c',"command","Commande à exécuter","/usr/bin/ls -l /tmp",false); cpps::option verbose ('v',"verbose","Plus d'information",false,false); cpps::endoptions(argc,argv); try { auto context = co_await boost::cobalt::this_coro::executor; boost::asio::signal_set signals(context, SIGINT, SIGTERM); signal (SIGCHLD,SIG_IGN); //signals.async_wait([&](auto, auto){ 
context.stop(); }); //auto gg = loopint(); //auto pp = gg.promise(); //auto pp = gg(); auto g = commandexec(cmd.val); #if 0 while (g){ auto val = co_await g; cout << format ("line type={}: {}\n",val.is_output(),val.line); } #else auto coiter = COROITER (g); for (auto l:coiter); #endif #if 0 //auto handle = g.promise(); auto aw = g.receiver_.get_awaitable(); while (g){ cpps::COMMANDEXEC_line val; if (aw.await_ready()){ val = g.get(); cout << "ready "; cout << format ("line type={}: {}\n",val.is_output(),val.line); cerr << "await_suspend() called\n"; aw.await_suspend(std::noop_coroutine()); cerr << "await_suspend(handle) done\n"; }else{ cerr << "avait_resume called\n"; aw.await_resume(); cerr << "avait_resume done\n"; //val = co_await g; cout << " "; } } #endif //io_context.run(); } catch (std::exception& e) { cout << format ("Exception: {}\n", e.what()); } }