From c26a1bf65370dc435e286a70bc3d11e7ad9fc39e Mon Sep 17 00:00:00 2001 From: Harvey Fong Date: Sat, 1 Mar 2025 09:17:31 -0700 Subject: [PATCH] tweaking timeout, double checked connection reset --- bellatui.cpp | 112 +++++++++++++++++++++++++++++++++------------------ 1 file changed, 72 insertions(+), 40 deletions(-) diff --git a/bellatui.cpp b/bellatui.cpp index 7d3a8bb..a9ecaa0 100644 --- a/bellatui.cpp +++ b/bellatui.cpp @@ -32,7 +32,7 @@ std::atomic abort_state (false); std::atomic server (false); -std::string get_pubkey_from_srv(std::string server_address, uint16_t pubkey_port); +std::string get_pubkey_from_srv(std::string server_address, uint16_t publickey_port); void client_thread( std::string server_pkey, std::string client_pkey, @@ -41,13 +41,14 @@ void client_thread( std::string server_pkey, uint16_t command_port); void server_thread( std::string server_skey, uint16_t command_port, + bool test_render, Engine engine); void openFileWithDefaultProgram(const std::string& filePath); bool ends_with_suffix(const std::string& str, const std::string& suffix); -void pkey_server(const std::string& pub_key, uint16_t heartbeat_port); +void pkey_server(const std::string& pub_key, uint16_t publickey_port); struct MyEngineObserver : public EngineObserver { @@ -113,13 +114,12 @@ void heartbeat_thread( std::string server_pkey, //CLIENT heartbeat_sock.set(zmq::sockopt::curve_server, true); heartbeat_sock.set(zmq::sockopt::curve_secretkey, server_skey); std::string url = "tcp://*:" + std::to_string(heartbeat_port); - std::cout << "heart url " << url << std::endl; heartbeat_sock.bind(url); while(true) { //Start polling heartbeats once client connects if (connection_state == true) { zmq::pollitem_t response_item = { heartbeat_sock, 0, ZMQ_POLLIN, 0 }; - zmq::poll(&response_item, 1, 25000); // Wait for response with timeout + zmq::poll(&response_item, 1, 5000); // Wait for response with timeout if (response_item.revents & ZMQ_POLLIN) { zmq::message_t message; @@ -182,17 +182,19 @@ int DL_main(Args& args) std::string server_address = "localhost"; uint16_t command_port = 5797; uint16_t heartbeat_port = 5798; - uint16_t pubkey_port = 5799; - + uint16_t publickey_port = 5799; + bool test_render = false; /*logBanner("Bella Engine SDK (version: %s, build date: %llu)", bellaSdkVersion().toString().buf(), bellaSdkBuildDate() );*/ args.add("ip", "serverAddress", "localhost", "Bella render server ip address"); - args.add("cp", "commandPort", "5556", "tcp port for zmq server socket for commands"); - args.add("hp", "heartbeatPort", "5555", "tcp port for zmq server socket for heartbeats"); + args.add("cp", "commandPort", "", "tcp port for zmq server socket for commands"); + args.add("hp", "heartbeatPort", "", "tcp port for zmq server socket for heartbeats"); + args.add("pp", "publickeyPort", "", "tcp port for zmq server socket for server pubkey"); args.add("s", "server", "", "turn on server mode"); + args.add("tr", "testRender", "", "force res to 100x100"); //args.add("e", "ext", "", "set render extension, default png"); if (args.versionReqested()) @@ -212,6 +214,11 @@ int DL_main(Args& args) { server=true; } + + if (args.have("--testRender")) + { + test_render=true; + } if (args.have("--serverAddress")) { @@ -228,6 +235,7 @@ int DL_main(Args& args) std::cerr << "invalid --heartbeatPort" << argString << std::endl; } } + if (args.have("--commandPort")) { String argString = args.value("--commandPort"); @@ -239,12 +247,25 @@ int DL_main(Args& args) } } + if (args.have("--publickeyPort")) + { + String argString = args.value("--publickeyPort"); + uint16_t u16; + if (argString.parse(u16)) { + publickey_port = u16; + } else { + std::cerr << "invalid --commandPort" << argString << std::endl; + } + } + + Engine engine; engine.scene().loadDefs(); // Generate brand new keypair on launch // [TODO] Add client side public key fingerprinting for added security if(server.load()) { + std::cout << "BellaTUI server started ..." << std::endl; char server_skey[41] = { 0 }; char server_pkey[41] = { 0 }; if ( zmq_curve_keypair(&server_pkey[0], &server_skey[0])) { @@ -252,7 +273,7 @@ int DL_main(Args& args) std::cout << "\ncurve keypair gen failed."; exit(EXIT_FAILURE); } - std::thread server_t(server_thread, server_skey, command_port, engine); + std::thread server_t(server_thread, server_skey, command_port, test_render, engine); ///std::thread heartbeat_t(heartbeat_thread, server_skey, server.load(), 5555); std::thread heartbeat_t(heartbeat_thread, //function "", //NA Public server key @@ -265,13 +286,14 @@ int DL_main(Args& args) // while(true) { // awaiting new client loop heartbeat_state = true; - pkey_server(server_pkey, pubkey_port); // blocking wait client to get public key + std::cout << "Awaiting new client ..." << std::endl; + pkey_server(server_pkey, publickey_port); // blocking wait client to get public key std::cout << "Client connected" << std::endl; while(true) { // inner loop if (heartbeat_state.load()==false) { std::cout << "Client connectiono dead" << std::endl; - break; + break; // Go back to awaiting client } std::this_thread::sleep_for(std::chrono::milliseconds(10)); } @@ -282,7 +304,6 @@ int DL_main(Args& args) return 0; } else { - std::string ip_address = "localhost"; char client_skey[41] = { 0 }; char client_pkey[41] = { 0 }; if ( zmq_curve_keypair(&client_pkey[0], &client_skey[0])) { @@ -291,12 +312,10 @@ int DL_main(Args& args) exit(EXIT_FAILURE); } - std::string server_pkey = get_pubkey_from_srv(server_address, pubkey_port); + std::string server_pkey = get_pubkey_from_srv(server_address, publickey_port); std::string client_pkey_str(client_pkey); std::string client_skey_str(client_skey); - std::cout << server_address << " " << command_port << std::endl; - // Multithreaded std::thread command_t( client_thread, server_pkey, @@ -330,7 +349,7 @@ int DL_main(Args& args) } } -std::string get_pubkey_from_srv(std::string server_address, uint16_t pubkey_port) { +std::string get_pubkey_from_srv(std::string server_address, uint16_t publickey_port) { // No authentication is used, server will give out pubkey to anybody // Could use a unique message but since socket is unencrypted this provides // no protection. In main loop we establish an encrypted connection with the server @@ -340,7 +359,7 @@ std::string get_pubkey_from_srv(std::string server_address, uint16_t pubkey_port // 0MQ after we establish our intitial encrypted socket zmq::context_t ctx; zmq::socket_t pubkey_sock(ctx, zmq::socket_type::req); - std::string url = "tcp://" + server_address + ":" + std::to_string(pubkey_port); + std::string url = "tcp://" + server_address + ":" + std::to_string(publickey_port); pubkey_sock.connect(url); zmq::message_t z_out(std::string("Bellarender123")); @@ -350,13 +369,13 @@ std::string get_pubkey_from_srv(std::string server_address, uint16_t pubkey_port std::cout << "ERROR" << std::endl; } - std::cout << "bellazmq connecting to server..." << std::endl; + std::cout << "\nbellatui connecting to " << server_address << " ..." << std::endl; zmq::message_t z_in; pubkey_sock.recv(z_in); std::string pub_key = z_in.to_string(); pubkey_sock.close(); ctx.close(); - std::cout << "connection successful" << std::endl; + std::cout << "Connection to " << server_address << " successful" << std::endl; connection_state = true; return pub_key; } @@ -366,7 +385,7 @@ void client_thread( std::string server_pkey, std::string client_skey, std::string server_address, uint16_t command_port ) { - std::cout << "client thread: " << server_address << " " << command_port << std::endl; + //std::cout << "client thread: " << server_address << " " << command_port << std::endl; const size_t chunk_size = 65536; zmq::context_t ctx; zmq::socket_t command_sock (ctx, zmq::socket_type::req); @@ -379,7 +398,7 @@ void client_thread( std::string server_pkey, //std::string url = "tcp://" + server_address+ ":" + std::to_string(command_port); std::string url = "tcp://" + server_address + ":" + std::to_string(command_port); - std::cout << "client thread " << url << std::endl; + //std::cout << "client thread " << url << std::endl; command_sock.connect(url); std::string input; @@ -421,8 +440,7 @@ void client_thread( std::string server_pkey, continue; } } else if (command == "exit") { - std::cout << "now" << std::endl; - break; + ; } else if (command == "render") { std::string compoundArg; if(num_args > 1) { @@ -438,6 +456,15 @@ void client_thread( std::string server_pkey, ; } else if (command == "stat") { ; + } else if (command == "help") { + std::cout << "\033[32msend file.bsz\033[0m upload bella scene to server\n"; + std::cout << "\033[32mrender\033[0m start render on server\n"; + std::cout << "\033[32mget file.png\033[0m download png from server\n"; + std::cout << "\033[32mstop\033[0m stop render on server\n"; + std::cout << "\033[32mstat\033[0m display progress\n"; + continue; + } else if (command == "stop") { + ; } else { std::cout << "unknown" << std::endl; continue; @@ -455,12 +482,12 @@ void client_thread( std::string server_pkey, //ZIN<<< command_sock.recv(server_response, zmq::recv_flags::none); //RECV std::string response_str(static_cast(server_response.data()), server_response.size()-1); - //std::string response_str(static_cast(server_response.data()), server_response.size()); + std::string response_str2(static_cast(server_response.data()), server_response.size()); if(response_str=="RDY") { // Server acknowledges readiness for multi message commands std::cout << "Server Readiness: " << response_str << std::endl; - if(input == "exit") { - break; + if(command == "exit") { + exit(0); // RENDER } else if(command == "render") { //>>>ZOUT @@ -552,7 +579,7 @@ void client_thread( std::string server_pkey, } } } else { - std::cout << "Server response: \033[32m" << response_str << "\033[0m" << std::endl; + std::cout << "Server response: \033[32m" << response_str2 << "\033[0m" << std::endl; } } command_sock.close(); @@ -561,6 +588,7 @@ void client_thread( std::string server_pkey, void server_thread( std::string server_skey, uint16_t command_port, + bool test_render, Engine engine) { MyEngineObserver engineObserver; engine.subscribe(&engineObserver); @@ -573,7 +601,6 @@ void server_thread( std::string server_skey, command_sock.set(zmq::sockopt::curve_secretkey, server_skey); //command_sock.set(zmq::sockopt::linger, 100); // Close immediately on disconnect std::string url = "tcp://*:" + std::to_string(command_port); - std::cout << "server thread " << url << std::endl; command_sock.bind(url); zmq::message_t client_response; @@ -596,19 +623,27 @@ void server_thread( std::string server_skey, //>>>ZOUT command_sock.send(zmq::message_t("bye"), zmq::send_flags::none); } else if (client_command == "exit") { - std::cout << "exit" << std::endl; + std::cout << "Client disconnecting..." << std::endl; //>>>ZOUT - command_sock.send(zmq::message_t("ACK"), zmq::send_flags::none); + command_sock.send(zmq::message_t("RDY"), zmq::send_flags::none); + heartbeat_state = false; + connection_state = false; + abort_state = true; + break; // RENDER } else if (client_command == "render") { - //engine.scene().read("./oomer.bsz"); - //engine.scene().camera()["resolution"] = Vec2 {200, 200}; - //engine.start(); std::cout << "start render" << std::endl; - engine.scene().camera()["resolution"]= Vec2 {100, 100}; + if(test_render) { + engine.scene().camera()["resolution"]= Vec2 {100, 100}; + } engine.start(); //>>>ZOUT - command_sock.send(zmq::message_t("ACK"), zmq::send_flags::none); + command_sock.send(zmq::message_t("render started...type stat to get progress"), zmq::send_flags::none); + } else if (client_command == "stop") { + std::cout << "stop render" << std::endl; + engine.stop(); + //>>>ZOUT + command_sock.send(zmq::message_t("render stopped"), zmq::send_flags::none); //GET } else if (client_command == "get") { //REP mode @@ -785,16 +820,13 @@ bool ends_with_suffix(const std::string& str, const std::string& suffix) { } // Blocking zmq rep socket to pass server_public_key -void pkey_server(const std::string& pub_key, uint16_t heartbeat_port) { +void pkey_server(const std::string& pub_key, uint16_t publickey_port) { zmq::context_t ctx; zmq::socket_t sock(ctx, zmq::socket_type::rep); - // reuse heartbeat_port because 2 is enough - std::string url = "tcp://*:" + std::to_string(heartbeat_port); - std::cout << "pkey url " << url << std::endl; + std::string url = "tcp://*:" + std::to_string(publickey_port); sock.bind(url); zmq::message_t z_in; - std::cout << "Entered: Public Key Serving Mode" << std::endl; //ZIN<<< sock.recv(z_in); if (z_in.to_string().compare("Bellarender123") == 0) {