/* * BellaTUI - A Client-Server Rendering Application * * This application provides a command-line interface for a rendering system called Bella. * It consists of two main parts: * 1. A server component that handles rendering operations * 2. A client component that sends commands and files to the server * * The application uses ZeroMQ (ZMQ) for secure network communication between client and server. * Key features: * - Secure communication using CURVE encryption * - File transfer capabilities (.bsz files) * - Real-time rendering status updates * - Heartbeat monitoring to check connection status */ #include #include #include #include #include #include #include #include // For sprintf #include #include // For string streams #include #include // Add this line for std::mutex and std::lock_guard #include // Add this line for std::map #include // For std::system #include // For std::runtime_error #ifdef _WIN32 #include // For ShellExecuteW #include // For ShellExecuteW #include // For wstring_convert #elif defined(__APPLE__) || defined(__linux__) #include // For fork, exec #include // For waitpid #endif #include // For file watching #include // For file watching #include // For file watching #include #include #include "../bella_engine_sdk/src/bella_sdk/bella_engine.h" // For rendering #include "../bella_engine_sdk/src/dl_core/dl_fs.h" // For rendering using namespace dl; using namespace dl::bella_sdk; //#include "../oom/oom_bella_long.h" //#include "../oom/oom_bella_scene.h" //#include "../oom/oom_misc.h" // common misc code #include "../oom/oom_license.h" // common license //#include "../oom/oom_voxel_vmax.h" // common vmax voxel code and structures //#include "../oom/oom_voxel_ogt.h" // common opengametools voxel conversion wrappers //Forward declarations std::string bellaSliderPreviewsHTML(); //dl::Mat4 oomer_orbit(dl::Mat4 beginCamXform, int currentFrame, int totalFrames); /// A class that manages a queue of files to render with both FIFO order and fast lookups class RenderQueue { public: // Default constructor RenderQueue() = default; // Move constructor RenderQueue(RenderQueue&& other) noexcept { std::lock_guard lock(other.mutex); pathVector = std::move(other.pathVector); pathMap = std::move(other.pathMap); } // Move assignment operator RenderQueue& operator=(RenderQueue&& other) noexcept { if (this != &other) { std::lock_guard lock1(mutex); std::lock_guard lock2(other.mutex); pathVector = std::move(other.pathVector); pathMap = std::move(other.pathMap); } return *this; } // Delete copy operations since mutexes can't be copied RenderQueue(const RenderQueue&) = delete; RenderQueue& operator=(const RenderQueue&) = delete; // Add a file to the queue if it's not already there bool push(const dl::String& path) { std::lock_guard lock(mutex); if (pathMap.find(path) == pathMap.end()) { pathVector.push_back(path); pathMap[path] = true; return true; } return false; } // Get the next file to render (FIFO order) bool pop(dl::String& outPath) { std::lock_guard lock(mutex); if (!pathVector.empty()) { outPath = pathVector.front(); pathVector.erase(pathVector.begin()); pathMap.erase(outPath); return true; } return false; } // Remove a specific file by name bool remove(const dl::String& path) { std::lock_guard lock(mutex); if (pathMap.find(path) != pathMap.end()) { // Remove from vector using erase-remove idiom pathVector.erase( std::remove(pathVector.begin(), pathVector.end(), path), pathVector.end() ); // Remove from map pathMap.erase(path); return true; } return false; } // Check if a file exists in the queue bool contains(const dl::String& path) const { std::lock_guard lock(mutex); return pathMap.find(path) != pathMap.end(); } // Get the number of files in the queue size_t size() const { std::lock_guard lock(mutex); return pathVector.size(); } // Check if the queue is empty bool empty() const { std::lock_guard lock(mutex); return pathVector.empty(); } // Clear all files from the queue void clear() { std::lock_guard lock(mutex); pathVector.clear(); pathMap.clear(); } private: std::vector pathVector; // Maintains FIFO order std::map pathMap; // Enables fast lookups mutable std::mutex mutex; // Thread safety }; std::atomic active_render(false); //bool active_render(false); //RenderQueue renderQueue; // Replace the old vector and map with our new class //std::mutex renderQueueMutex; // Add mutex for thread safety //std::vector renderDelete; // This is the efsw queue for when we delete a file //std::mutex renderDeleteMutex; // Add mutex for thread safety dl::String currentRender; std::mutex currentRenderMutex; // Add mutex for thread safety // Queues for incoming files from the efsw watcher RenderQueue incomingDeleteQueue; RenderQueue incomingRenderQueue; std::mutex incomingDeleteQueueMutex; // Add mutex for thread safety std::mutex incomingRenderQueueMutex; // Add mutex for thread safety /// Processes a file action libefsw class UpdateListener : public efsw::FileWatchListener { public: UpdateListener() : should_stop_(false) {} void stop() { should_stop_ = true; } std::string getActionName( efsw::Action action ) { switch ( action ) { case efsw::Actions::Add: return "Add"; case efsw::Actions::Modified: return "Modified"; case efsw::Actions::Delete: return "Delete"; case efsw::Actions::Moved: return "Moved"; default: return "Bad Action"; } } void handleFileAction( efsw::WatchID watchid, const std::string& dir, const std::string& filename, efsw::Action action, std::string oldFilename = "" ) override { if (should_stop_) return; // Early exit if we're stopping std::string actionName = getActionName( action ); /*std::cout << "Watch ID " << watchid << " DIR (" << dir + ") FILE (" + ( oldFilename.empty() ? "" : "from file " + oldFilename + " to " ) + filename + ") has event " << actionName << std::endl;*/ if (actionName == "Delete") { dl::String belPath = (dir + filename).c_str(); if (belPath.endsWith(".bsz") || belPath.endsWith(".zip")) { { std::lock_guard lock(incomingDeleteQueueMutex); if (!incomingDeleteQueue.contains(belPath)) { incomingDeleteQueue.push(belPath); std::cout << "\n==" << "STOP RENDER: " << belPath.buf() << "\n==" << std::endl; } } } } if (actionName == "Add" || actionName == "Modified") { dl::String belPath = (dir + filename).c_str(); dl::String parentPath = dir.c_str(); //std::cout << "parentPath: " << parentPath.buf() << std::endl; if (should_stop_) return; // Check again before starting render if (parentPath.endsWith("download")) { std::this_thread::sleep_for(std::chrono::milliseconds(1000)); return; } if (belPath.endsWith(".bsz") || belPath.endsWith(".zip") && !parentPath.endsWith("download/")) { { std::lock_guard lock(incomingRenderQueueMutex); if (!incomingRenderQueue.contains(belPath)) { incomingRenderQueue.push(belPath); std::cout << "\n==" << "RENDER QUEUED: " << belPath.buf() << "\n==" << std::endl; } } } } } private: std::atomic should_stop_; // ctrl-c was not working, so we use this to stop the thread }; // Global state variables //std::string initializeGlobalLicense(); // Function to return license text //std::string initializeGlobalThirdPartyLicences(); // Function to return third-party licenses std::atomic connection_state (false); // Tracks if client/server are connected std::atomic abort_state (false); // Used to signal program termination std::atomic server (true); // Indicates if running in server mode UpdateListener* global_ul = nullptr; // Global pointer to UpdateListener // Function declarations std::string get_pubkey_from_srv(std::string server_address, uint16_t publickey_port); // Gets server's public key for encryption bool STOP = false; void sigend( int ) { std::cout << std::endl << "Bye bye" << std::endl; STOP = true; if (global_ul) { // Use the global pointer global_ul->stop(); } // Give a short time for cleanup std::this_thread::sleep_for(std::chrono::milliseconds(100)); exit(0); // Force exit after cleanup } efsw::WatchID handleWatchID( efsw::WatchID watchid ) { switch ( watchid ) { case efsw::Errors::FileNotFound: case efsw::Errors::FileRepeated: case efsw::Errors::FileOutOfScope: case efsw::Errors::FileRemote: case efsw::Errors::WatcherFailed: case efsw::Errors::Unspecified: { std::cout << efsw::Errors::Log::getLastErrorLog().c_str() << std::endl; break; } default: { std::cout << "Added WatchID: " << watchid << std::endl; } } return watchid; } static int s_logCtx = 0; static void log(void* /*ctx*/, LogType type, const char* msg) { switch (type) { case LogType_Info: DL_PRINT("[INFO] %s\n", msg); break; case LogType_Warning: DL_PRINT("[WARN] %s\n", msg); break; case LogType_Error: DL_PRINT("[ERROR] %s\n", msg); break; case LogType_Custom: DL_PRINT("%s\n", msg); break; } } // Main client communication thread void client_thread( std::string server_pkey, std::string client_pkey, std::string client_skey, std::string server_address, uint16_t command_port); // Utility function to open files with system default program void openFileWithDefaultProgram(const std::string& filePath); // Helper function to check file extensions bool ends_with_suffix(const std::string& str, const std::string& suffix); // Server function to handle initial key exchange void pkey_server(const std::string& pub_key, uint16_t publickey_port); /* * MyEngineObserver Class * This class receives callbacks from the Bella rendering engine to track rendering progress. * It implements the EngineObserver interface and provides methods to: * - Handle render start/stop events * - Track rendering progress * - Handle error conditions * - Store and retrieve the current progress state */ struct MyEngineObserver : public EngineObserver { public: // Called when a rendering pass starts void onStarted(String pass) override { std::cout << "Started pass " << pass.buf() << std::endl; logInfo("Started pass %s", pass.buf()); } // Called to update the current status of rendering //void onStatus(String pass, String status) override //{ // logInfo("%s [%s]", status.buf(), pass.buf()); //} // Called to update rendering progress (percentage, time remaining, etc) void onProgress(String pass, Progress progress) override { std::cout << progress.toString().buf() << std::endl; setString(new std::string(progress.toString().buf())); logInfo("%s [%s]", progress.toString().buf(), pass.buf()); } //void onImage(String pass, Image image) override //{ // logInfo("We got an image %d x %d.", (int)image.width(), (int)image.height()); //} // Called when an error occurs during rendering void onError(String pass, String msg) override { logError("%s [%s]", msg.buf(), pass.buf()); } // Called when a rendering pass completes void onStopped(String pass) override { logInfo("Stopped %s", pass.buf()); active_render = false; } // Returns the current progress as a string std::string getProgress() const { std::string* currentProgress = progressPtr.load(); if (currentProgress) { return *currentProgress; } else { return ""; } } // Cleanup resources in destructor ~MyEngineObserver() { setString(nullptr); } private: // Thread-safe pointer to current progress string std::atomic progressPtr{nullptr}; // Helper function to safely update the progress string void setString(std::string* newStatus) { std::string* oldStatus = progressPtr.exchange(newStatus); delete oldStatus; // Clean up old string if it exists } }; // Main server thread that handles client requests void server_thread( std::string server_skey, uint16_t command_port, bool test_render, Engine engine, MyEngineObserver& engineObserver); void render_thread( Engine engine, MyEngineObserver& engineObserver); /* * Heartbeat Monitoring System * * This function implements a heartbeat mechanism to monitor the connection between client and server. * It runs in a separate thread and: * - For server: listens for periodic messages from client * - For client: sends periodic messages to server * If either side stops receiving messages, it marks the connection as dead. * * Parameters: * - server_pkey: Server's public key (used by client) * - server_skey: Server's secret key (used by server) * - client_pkey: Client's public key (used by client) * - client_skey: Client's secret key (used by client) * - is_server: Boolean indicating if running in server mode * - server_address: Address of the server (used by client) * - heartbeat_port: Port number for heartbeat communication */ void heartbeat_thread( std::string server_pkey, std::string server_skey, std::string client_pkey, std::string client_skey, bool is_server, std::string server_address, uint16_t heartbeat_port ) { zmq::context_t ctx; // Create ZMQ context zmq::socket_t heartbeat_sock; // Socket for heartbeat messages if(is_server) { // Server mode: Listen for client heartbeats heartbeat_sock = zmq::socket_t(ctx, zmq::socket_type::rep); heartbeat_sock.set(zmq::sockopt::curve_server, true); heartbeat_sock.set(zmq::sockopt::curve_secretkey, server_skey); std::string url = "tcp://*:" + std::to_string(heartbeat_port); heartbeat_sock.bind(url); while(true) { // Only check heartbeats when client is connected if (connection_state == true) { // Wait up to 5 seconds for client heartbeat zmq::pollitem_t response_item = { heartbeat_sock, 0, ZMQ_POLLIN, 0 }; zmq::poll(&response_item, 1, 5000); if (response_item.revents & ZMQ_POLLIN) { // Received heartbeat from client zmq::message_t message; heartbeat_sock.recv(message, zmq::recv_flags::none); heartbeat_sock.send(zmq::message_t("ACK"), zmq::send_flags::dontwait); } else { // No heartbeat received - mark connection as dead std::cout << "Bella Client Lost" << std::endl; connection_state = false; } } std::this_thread::sleep_for(std::chrono::milliseconds(100)); } } else { // Client mode: Send heartbeats to server zmq::socket_t heartbeat_sock (ctx, zmq::socket_type::req); // Set up encryption keys heartbeat_sock.set(zmq::sockopt::curve_serverkey, server_pkey); heartbeat_sock.set(zmq::sockopt::curve_publickey, client_pkey); heartbeat_sock.set(zmq::sockopt::curve_secretkey, client_skey); heartbeat_sock.set(zmq::sockopt::linger, 1); std::string url = "tcp://" + server_address + ":" + std::to_string(heartbeat_port); heartbeat_sock.connect(url); while (true) { // Check if we should stop if(abort_state.load()==true) { break; } std::this_thread::sleep_for(std::chrono::milliseconds(100)); if(connection_state == true) { // Send heartbeat to server heartbeat_sock.send(zmq::message_t("ACK"), zmq::send_flags::none); // Wait for server response zmq::pollitem_t response_item = { heartbeat_sock, 0, ZMQ_POLLIN, 0 }; zmq::poll(&response_item, 1, 5000); if (response_item.revents & ZMQ_POLLIN) { // Got response from server zmq::message_t msg_response; heartbeat_sock.recv(msg_response, zmq::recv_flags::none); } else { // No response - mark connection as dead std::cout << "Bella Server is unavailable" << std::endl; connection_state = false; break; } } } } // Clean up resources heartbeat_sock.close(); ctx.close(); } void file_watcher_thread(const std::string& watch_path = "") { bool commonTest = true; bool useGeneric = false; global_ul = new UpdateListener(); efsw::FileWatcher fileWatcher(useGeneric); fileWatcher.followSymlinks(false); fileWatcher.allowOutOfScopeLinks(false); if (!watch_path.empty() && dl::fs::exists(watch_path.data())) { commonTest = false; if (fileWatcher.addWatch(watch_path, global_ul, true) > 0) { fileWatcher.watch(); std::cout << "Watching directory: " << watch_path << std::endl; } else { std::cout << "Error trying to watch directory: " << watch_path << std::endl; std::cout << efsw::Errors::Log::getLastErrorLog().c_str() << std::endl; return; } } else if (commonTest) { std::string CurPath(efsw::System::getProcessPath()); std::cout << "CurPath: " << CurPath.c_str() << std::endl; fileWatcher.watch(); handleWatchID(fileWatcher.addWatch(CurPath + "test", global_ul, true)); } while(STOP == false) { efsw::System::sleep(500); } delete global_ul; global_ul = nullptr; } /* * Main Program Entry Point * * This function initializes the application and handles command-line arguments. * It can run in either server or client mode: * - Server mode: Starts rendering engine and waits for client connections * - Client mode: Connects to server and sends commands * * Command-line arguments: * --server : Run in server mode * --serverAddress : IP address of server (for client mode) * --commandPort : Port for main command communication * --heartbeatPort : Port for connection monitoring * --publickeyPort : Port for initial key exchange * --testRender : Use small resolution for testing * --thirdparty : Show third-party licenses * --licenseinfo : Show license information */ #include "dl_core/dl_main.inl" #include "dl_core/dl_args.h" int DL_main(Args& args) { // Default configuration values const size_t chunk_size = 65536; std::string server_address = "localhost"; uint16_t command_port = 5797; uint16_t heartbeat_port = 5798; uint16_t publickey_port = 5799; bool test_render = false; Engine engine; engine.scene().loadDefs(); MyEngineObserver engineObserver; engine.subscribe(&engineObserver); // Very early on, we will subscribe to the global bella logging callback, and ask to flush // any messages that may have accumulated prior to this point. // subscribeLog(&s_logCtx, log); flushStartupMessages(); // Register command-line arguments args.add("sa", "serverAddress", "", "Bella render server ip address"); args.add("cp", "commandPort", "", "tcp port for zmq server socket for commands"); args.add("hp", "heartbeatPort", "", "tcp port for zmq server socket for heartbeats"); args.add("pp", "publickeyPort", "", "tcp port for zmq server socket for server pubkey"); args.add("c", "client", "", "turn on client mode"); args.add("tr", "testRender", "", "force res to 100x100"); args.add("tp", "thirdparty", "", "prints third party licenses"); args.add("li", "licenseinfo", "", "prints license info"); args.add("ef", "efsw", "", "mode efsw"); args.add("wd", "watchdir", "", "mode file warch"); args.add("j", "jason", "", "print hello jason"); // Handle special command-line options if (args.versionReqested()) { printf("%s", bellaSdkVersion().toString().buf()); return 0; } if (args.helpRequested()) { printf("%s", args.help("SDK Test", fs::exePath(), bellaSdkVersion().toString()).buf()); return 0; } // print hello jason if (args.have("--jason")) { std::cout << "Hello Jason!" << std::endl; return 0; } std::string path="."; if (args.have("--watchdir")) { path = args.value("--watchdir").buf(); } //EFSW mode alwys on // Create the file watcher thread std::thread watcher_thread(file_watcher_thread, path); // Don't wait for the thread to finish here, let it run in background watcher_thread.detach(); // Show license information if requested if (args.have("--licenseinfo")) { std::cout << "bellatui Copyright (c) Harvey Fong 2025" << std::endl; std::cout << oom::license::printLicense() << std::endl; return 0; } // Show third-party licenses if requested if (args.have("--thirdparty")) { std::cout << oom::license::printBellaSDK() << "\n====\n" << std::endl; std::cout << oom::license::printLibSodium() << "\n====\n" << std::endl; std::cout << oom::license::printLibZMQ() << "\n====\n" << std::endl; std::cout << oom::license::printCppZMQ() << "\n====\n" << std::endl; std::cout << oom::license::printEFSW() << "\n====\n" << std::endl; return 0; } // Check if running in server mode // Check if running in server mode if (args.have("--client")) { server=false; } // Enable test rendering if requested if (args.have("--testRender")) { test_render=true; } // Parse server address (for client mode) if (args.have("--serverAddress")) { server_address = args.value("--serverAddress").buf(); server=false; } // Parse port numbers if provided if (args.have("--heartbeatPort")) { String argString = args.value("--heartbeatPort"); uint16_t u16; if (argString.parse(u16)) { heartbeat_port = u16; } else { std::cerr << "invalid --heartbeatPort" << argString << std::endl; } } if (args.have("--commandPort")) { String argString = args.value("--commandPort"); uint16_t u16; if (argString.parse(u16)) { command_port = u16; } else { std::cerr << "invalid --commandPort" << argString << std::endl; } } if (args.have("--publickeyPort")) { String argString = args.value("--publickeyPort"); uint16_t u16; if (argString.parse(u16)) { publickey_port = u16; } else { std::cerr << "invalid --commandPort" << argString << std::endl; } } // Generate brand new keypair on launch // [TODO] Add client side public key fingerprinting for added security if(server.load()) { std::cout << "BellaTUI server started ..." << std::endl; char server_skey[41] = { 0 }; char server_pkey[41] = { 0 }; if ( zmq_curve_keypair(&server_pkey[0], &server_skey[0])) { // 1 is fail std::cout << "\ncurve keypair gen failed."; exit(EXIT_FAILURE); } std::thread server_t(server_thread, server_skey, command_port, test_render, engine, std::ref(engineObserver)); std::thread render_t(render_thread, engine, std::ref(engineObserver)); std::thread heartbeat_t(heartbeat_thread, //function "", //NA Public server key server_skey, //Secret servery key "", //NA Public client key "", //NA Secret client key true, //is server "", //FQDN or ip address of server heartbeat_port); //bind port // while(true) { // awaiting new client loop std::cout << "Awaiting new client ..." << std::endl; pkey_server(server_pkey, publickey_port); // blocking wait client to get public key std::cout << "Client connected" << std::endl; connection_state = true; while(true) { // inner loop if (connection_state.load()==false) { std::cout << "Client connection dead" << std::endl; break; // Go back to awaiting client } std::this_thread::sleep_for(std::chrono::milliseconds(10)); } } //abort_state==true; server_t.join(); heartbeat_t.join(); return 0; } else { //Client char client_skey[41] = { 0 }; char client_pkey[41] = { 0 }; if ( zmq_curve_keypair(&client_pkey[0], &client_skey[0])) { // 1 is fail std::cout << "\ncurve keypair gen failed."; exit(EXIT_FAILURE); } std::string server_pkey = get_pubkey_from_srv(server_address, publickey_port); std::string client_pkey_str(client_pkey); std::string client_skey_str(client_skey); // Multithreaded std::thread command_t( client_thread, server_pkey, client_pkey_str, client_skey_str, server_address, command_port); //std::thread heartbeat_t(heartbeat_thread, server_pkey, client_pkey_str, client_skey_str); std::thread heartbeat_t(heartbeat_thread, //function server_pkey, //Public server key "", //NA Secret server key client_pkey_str, //Public client key client_skey_str, //Secret client key false, //is server server_address, //Server FQDN or ip address heartbeat_port); //connect port while (true) { /*if (!heartbeat_state.load()) { std::cout << "Dead" << std::endl; abort_state==true; break; }*/ if (connection_state.load() == false) { std::cout << "Dead2" << std::endl; abort_state==true; break; } std::this_thread::sleep_for(std::chrono::milliseconds(500)); } } } std::string get_pubkey_from_srv(std::string server_address, uint16_t publickey_port) { // No authentication is used, server will give out pubkey to anybody // Could use a unique message but since socket is unencrypted this provides // no protection. In main loop we establish an encrypted connection with the server // now that we have the pubkey and in combo with the client_secret_key we can // be secure. 0MQ uses PFS perfect forward security, because this initial // back and forth is extended with behind the scenes new keypairs taken care of by // 0MQ after we establish our intitial encrypted socket zmq::context_t ctx; zmq::socket_t pubkey_sock(ctx, zmq::socket_type::req); std::string url = "tcp://" + server_address + ":" + std::to_string(publickey_port); pubkey_sock.connect(url); zmq::message_t z_out(std::string("Bellarender123")); try { zmq::send_result_t send_result = pubkey_sock.send(z_out, zmq::send_flags::none); } catch (const zmq::error_t& e) { std::cout << "ERROR" << std::endl; } std::cout << "\nbellatui connecting to " << server_address << " ..." << std::endl; zmq::message_t z_in; pubkey_sock.recv(z_in); std::string pub_key = z_in.to_string(); pubkey_sock.close(); ctx.close(); std::cout << "Connection to " << server_address << " successful" << std::endl; connection_state = true; return pub_key; } void client_thread( std::string server_pkey, std::string client_pkey, std::string client_skey, std::string server_address, uint16_t command_port ) { //std::cout << "client thread: " << server_address << " " << command_port << std::endl; const size_t chunk_size = 65536; zmq::context_t ctx; zmq::socket_t command_sock (ctx, zmq::socket_type::req); //command_sock.set(zmq::sockopt::sndtimeo, 10000); //command_sock.set(zmq::sockopt::rcvtimeo, 10000); command_sock.set(zmq::sockopt::curve_serverkey, server_pkey); command_sock.set(zmq::sockopt::curve_publickey, client_pkey); command_sock.set(zmq::sockopt::curve_secretkey, client_skey); command_sock.set(zmq::sockopt::linger, 1); // Close immediately on disconnect //std::string url = "tcp://" + server_address+ ":" + std::to_string(command_port); std::string url = "tcp://" + server_address + ":" + std::to_string(command_port); //std::cout << "client thread " << url << std::endl; command_sock.connect(url); std::string input; while (true) { if(abort_state.load()==true) { break; } std::getline(std::cin, input); std::stringstream ss(input); std::string arg; std::vector args; while (ss >> arg) { args.push_back(arg); } // Sanity checks on input before sending to server int num_args = args.size(); std::string command; if (num_args > 0) { command = args[0]; if ( command == "send") { if(num_args == 1) { std::cout << "Please provide a .bsz file" << std::endl; continue; } if(!ends_with_suffix(args[1],".bsz")) { std::cout << "Only .bsz files can be sent" << std::endl; continue; } if(!dl::fs::exists(args[1].data())) { std::cout << args[1] << " cannot be found" << std::endl; continue; } std::cout << "Sending:" << args[1] << std::endl; } else if (command == "get") { if(num_args == 1) { std::cout << "Please provide image filename" << std::endl; continue; } } else if (command == "exit") { ; } else if (command == "render") { std::string compoundArg; if(num_args > 1) { for (size_t i = 1; i < args.size(); ++i) { compoundArg += args[i]; if (i < args.size() - 1) { compoundArg += " "; // Add spaces between arguments } } std::cout << compoundArg << std::endl; } } else if (command == "hello") { ; } else if (command == "stat") { ; } else if (command == "help") { std::cout << "\033[32msend file.bsz\033[0m upload bella scene to server\n"; std::cout << "\033[32mrender\033[0m start render on server\n"; std::cout << "\033[32mget file.png\033[0m download png from server\n"; std::cout << "\033[32mstop\033[0m stop render on server\n"; std::cout << "\033[32mstat\033[0m display progress\n"; continue; } else if (command == "stop") { ; } else { std::cout << "unknown" << std::endl; continue; } } // Sanity check input complete // Push to server over encrypted socket zmq::message_t server_response; zmq::message_t msg_command(command); //>>>ZOUT command_sock.send(msg_command, zmq::send_flags::none); //SEND //std::cout << "Sent: " << input.data() << std::endl; //ZIN<<< command_sock.recv(server_response, zmq::recv_flags::none); //RECV std::string response_str(static_cast(server_response.data()), server_response.size()-1); std::string response_str2(static_cast(server_response.data()), server_response.size()); if(response_str=="RDY") { // Server acknowledges readiness for multi message commands std::cout << "Server Readiness: " << response_str << std::endl; if(command == "exit") { exit(0); // RENDER } else if(command == "render") { std::string compoundArg; if(num_args > 1) { for (size_t i = 1; i < args.size(); ++i) { compoundArg += args[i]; if (i < args.size() - 1) { compoundArg += " "; // Add spaces between arguments } } std::cout << compoundArg << std::endl; } //>>>ZOUT command_sock.send(zmq::message_t("render"), zmq::send_flags::none); //ZIN<<< command_sock.recv(server_response, zmq::recv_flags::none); } else if(command == "stat") { //>>>ZOUT command_sock.send(zmq::message_t("stat"), zmq::send_flags::none); //ZIN<<< command_sock.recv(server_response, zmq::recv_flags::none); // GET } else if(command == "get") { std::ofstream output_file(args[1], std::ios::binary); // Open file in binary mode if (!output_file.is_open()) { std::cerr << "Error opening file for writing" << std::endl; std::cout << "ERR" << std::endl; continue; // Don't bother server } else { while (true) { //>>>ZOUT command_sock.send(zmq::message_t("GO"), zmq::send_flags::none); zmq::message_t recv_data; //ZIN<<< command_sock.recv(recv_data, zmq::recv_flags::none); // data transfer // inline messaging with data, breaks to exit loop if (recv_data.size() < 8) { std::string recv_string(static_cast(recv_data.data()), recv_data.size()-1); //std::string recv_string = recv_data.to_string(); if (recv_string == "EOF") { std::cout << "EOF" << std::endl; break; // End of file } else if(recv_string == "ERR") { //LIKELY ERR\0 from client, can't find file std::cout << "ERR client read ACK" << std::endl; break; // Err } else { std::cout << "HUH" << recv_string << std::endl; break; } } // by reaching this point we assume binary data ( even 8 bytes will reach here ) std::cout << "\033[32m.\033[0m"; output_file.write(static_cast(recv_data.data()), recv_data.size()); } output_file.close(); try { openFileWithDefaultProgram(args[1]); // Replace with your file path std::cout << "File opened successfully." << std::endl; } catch (const std::runtime_error& e) { std::cerr << "Error: " << e.what() << std::endl; } } // SEND } else if(command == "send") { std::string read_file = args[1]; std::cout << "sending\n"; std::ifstream binaryInputFile; binaryInputFile.open(read_file, std::ios::binary);// for reading if (!binaryInputFile.is_open()) { std::cerr << "Error opening file for read" << std::endl; //>>>ZOUT command_sock.send(zmq::message_t("ERR"), zmq::send_flags::none); ///ZIN<<< command_sock.recv(server_response, zmq::recv_flags::none); } else { std::vector send_buffer(chunk_size); std::streamsize bytes_read_in_chunk; while (true) { binaryInputFile.read(send_buffer.data(), chunk_size); // read the file into the buffer bytes_read_in_chunk = binaryInputFile.gcount(); // Actual bytes read if(bytes_read_in_chunk > 0){ std::cout << "\033[32m.\033[0m"; zmq::message_t message(send_buffer.data(), bytes_read_in_chunk); //>>>ZOUT command_sock.send(message, zmq::send_flags::none); //ZIN<<< command_sock.recv(server_response, zmq::recv_flags::none); } else { std::cout << "\n"; break; } } //<<>> command_sock.recv(server_response, zmq::recv_flags::none); } } } else { std::cout << "Server response: \033[32m" << response_str2 << "\033[0m" << std::endl; } } command_sock.close(); ctx.close(); } void server_thread( std::string server_skey, uint16_t command_port, bool test_render, Engine engine, MyEngineObserver& engineObserver) { //MyEngineObserver engineObserver; //engine.subscribe(&engineObserver); zmq::context_t ctx; zmq::socket_t command_sock(ctx, zmq::socket_type::rep); //command_sock.set(zmq::sockopt::sndtimeo, 10000); //command_sock.set(zmq::sockopt::rcvtimeo, 10000); command_sock.set(zmq::sockopt::curve_server, true); command_sock.set(zmq::sockopt::curve_secretkey, server_skey); //command_sock.set(zmq::sockopt::linger, 100); // Close immediately on disconnect std::string url = "tcp://*:" + std::to_string(command_port); command_sock.bind(url); zmq::message_t client_response; try { std::string write_file = "./oomer.bsz"; std::string read_file = "./oomer.png"; const size_t chunk_size = 65536; std::vector sftp_buffer(chunk_size); // Buffer to hold each chunk std::ofstream binaryOutputFile;// for writing std::ifstream binaryInputFile;// for reading while (true) { zmq::message_t msg_command; //ZIN<<< command_sock.recv(msg_command, zmq::recv_flags::none); std::string client_command = msg_command.to_string(); std::cout << "\033[32mCommand: " << client_command << "\033[0m"<< std::endl; if(client_command == "hello"){ std::cout << "bye" << std::endl; //>>>ZOUT command_sock.send(zmq::message_t("bye"), zmq::send_flags::none); } else if (client_command == "exit") { std::cout << "Client disconnecting..." << std::endl; //>>>ZOUT command_sock.send(zmq::message_t("RDY"), zmq::send_flags::none); connection_state = false; //<< // RENDER } else if (client_command == "xxxxrender") { std::cout << "start render" << std::endl; //>>>ZOUT command_sock.send(zmq::message_t("render started...type stat to get progress"), zmq::send_flags::none); } else if (client_command == "stop") { std::cout << "stop render" << std::endl; engine.stop(); //>>>ZOUT command_sock.send(zmq::message_t("render stopped"), zmq::send_flags::none); //GET } else if (client_command == "get") { //REP mode std::string read_file = "./oomer.png"; std::cout << "Executing get command\n"; std::ifstream binaryInputFile; binaryInputFile.open(read_file, std::ios::binary);// for reading if (!binaryInputFile.is_open()) { std::cerr << "Error opening file for read" << std::endl; //>>>ZOUT command_sock.send(zmq::message_t("ERR"), zmq::send_flags::none); } else { //>>>ZOUT command_sock.send(zmq::message_t("RDY"), zmq::send_flags::none); std::vector send_buffer(chunk_size); std::streamsize bytes_read_in_chunk; while (true) { zmq::message_t z_in; //ZIN command_sock.recv(z_in); // Block until zGo, or any message binaryInputFile.read(send_buffer.data(), chunk_size); // read the file into the buffer bytes_read_in_chunk = binaryInputFile.gcount(); // Actual bytes read if(bytes_read_in_chunk > 0){ std::cout << bytes_read_in_chunk << std::endl; zmq::message_t message(send_buffer.data(), bytes_read_in_chunk); //ZOUT command_sock.send(message, zmq::send_flags::none); } else { //ZOUT command_sock.send(zmq::message_t("EOF"), zmq::send_flags::none); std::cout << "EOF" << std::endl; break; // Exit when 0 bytes read } } } } else if (client_command == "stat") { //std::string currentProgress = engineObserver.getProgress(); std::string currentProgress = "100%"; if (!currentProgress.empty()) { std::cout << "Current Progress: " << currentProgress << std::endl; command_sock.send(zmq::message_t(currentProgress), zmq::send_flags::none); } else { command_sock.send(zmq::message_t("ACK"), zmq::send_flags::none); } } else if (client_command == "send") { std::ofstream output_file("oomer.bsz", std::ios::binary); // Open file in binary mode if (!output_file.is_open()) { std::cerr << "Error opening file for writing" << std::endl; //>>>ZOUT command_sock.send(zmq::message_t("ERR"), zmq::send_flags::none); } else { // File handle open and ready //>>>ZOUT command_sock.send(zmq::message_t("RDY"), zmq::send_flags::none); while (true) { zmq::message_t recv_data; //ZIN<<< command_sock.recv(recv_data, zmq::recv_flags::none); if(recv_data.size() < 8) { // data and signals sent on same socket // Allow for signals up to 8 bytes, EOF, ERR // messages are null terminated requiring -1 std::string response_str(static_cast(recv_data.data()), recv_data.size()-1); if (response_str=="EOF") { //>>>ZOUT command_sock.send(zmq::message_t("ACK"), zmq::send_flags::none); break; // End of file } else if(response_str=="ERR") { std::cout << "ERR on client" << std::endl; //>>>ZOUT command_sock.send(zmq::message_t("ACK"), zmq::send_flags::none); break; // End of file } } // File write output_file.write(static_cast(recv_data.data()), recv_data.size()); //>>ZOUT command_sock.send(zmq::message_t("ACK"), zmq::send_flags::none); } output_file.close(); std::cout << "\033[32mClient uploaded .bsz successfully saved\033[0m" << std::endl; engine.scene().read("oomer.bsz"); engine.scene().beautyPass()["outputExt"] = ".png"; engine.scene().beautyPass()["outputName"] = ""; engine.scene().beautyPass()["overridePath"] = bella_sdk::Node(); } } else { // A unknown REQ sent, acknowledge because req-rep pattern is blocking //>>ZOUT command_sock.send(zmq::message_t("ACK"), zmq::send_flags::none); } std::this_thread::sleep_for(std::chrono::milliseconds(100)); } } catch (const zmq::error_t& e) { // Handle ZMQ-specific exceptions std::cerr << "ZMQ error: " << e.what() << std::endl; ctx.close(); command_sock.close(); //Potentially close sockets, clean up etc. } catch (const std::exception& e) { // Handle standard exceptions (e.g., std::bad_alloc) std::cerr << "Standard exception: " << e.what() << std::endl; } catch (...) { // Catch any other exceptions std::cerr << "Unknown exception caught." << std::endl; } command_sock.close(); ctx.close(); } void render_thread( Engine engine, MyEngineObserver& engineObserver) { // Create persistent instances outside the loop RenderQueue renderThreadQueue; RenderQueue renderThreadDeleteQueue; while (true) { // Append items from incoming queues to our persistent queues for thread safety { std::lock_guard lock(incomingRenderQueueMutex); dl::String path; while (incomingRenderQueue.pop(path)) { renderThreadQueue.push(path); } incomingRenderQueue.clear(); } { std::lock_guard lock(incomingDeleteQueueMutex); dl::String path; while (incomingDeleteQueue.pop(path)) { renderThreadDeleteQueue.push(path); } incomingDeleteQueue.clear(); } // Process the files without holding the mutex bool expected = false; // This is an atomic operation that does two things at once: // 1. Checks if active_render equals expected (false) // 2. If they are equal, sets active_render to true // // The operation is atomic, meaning no other thread can interfere // between the check and the set. This prevents two threads from // both thinking they can start rendering at the same time. // // Returns true if the exchange was successful (we got the render slot) // Returns false if active_render was already true (someone else is rendering) dl::String belPath; if (active_render.compare_exchange_strong(expected, true)) { // We successfully got the render slot - no one else is rendering if (renderThreadQueue.pop(belPath)) { std::cout << "\n==" << "RENDERING: " << belPath.buf() << "\n==" << std::endl; if (belPath.endsWith(".bsz")) { engine.loadScene(belPath); //engine.scene().camera()["resolution"]= Vec2 {100, 100}; engine.start(); { std::lock_guard lock(currentRenderMutex); currentRender = belPath; } } else if (belPath.endsWith(".zip")) { dl::Path fooPath = dl::Path(belPath); dl::String justDir = fooPath.dir(); dl::String justName = fooPath.file(false); //auto previewPath = args.iPath().canonical();; dl::String previewPath = bella_sdk::previewPath(); engine.loadScene(previewPath); auto oomerOutputPath = engine.scene().createNode("outputImagePath", "oomerOutputPath"); oomerOutputPath["ext"] = ".jpg"; auto oomerPBR = engine.scene().createNode("pbr", "oomerPBR"); oomerPBR["file"] = justName; oomerPBR["ext"] = ".zip"; oomerPBR["dir"] = justDir; auto oomerPreview = engine.scene().findNode("__preview__"); auto oomerCamXform = engine.scene().findNode("__camera__"); oomerPreview["material"] = oomerPBR; std::filesystem::path dirPath = std::string(justDir.buf())+"/"+std::string(justName.buf()); bool success = std::filesystem::create_directory(dirPath); oomerOutputPath["dir"] = dirPath.string().c_str(); //engine.scene().beautyPass()["outputExt"] = ".jpg"; engine.scene().beautyPass()["targetNoise"] = dl::Int(10); engine.scene().beautyPass()["saveImage"] = dl::Int(0); engine.scene().beautyPass()["overridePath"] = oomerOutputPath; engine.scene().camera()["resolution"]= Vec2 {320, 320}; std::ofstream outFile(dirPath.string()+"/bella.js"); outFile << std::string("bellaScene=\"") << justName.buf() << std::string("\";") << std::endl; outFile << std::string("bellaNodeType=\"") << "foo1" << std::string("\";") << std::endl; outFile << std::string("bellaNode=\"") << "foo2" << std::string("\";") << std::endl; outFile << std::string("bellaNodeAttribute=\"") << "hhh" << std::string("\";") << std::endl; outFile << std::string("bellaSteps=[];") << std::endl; outFile << std::string("bellaSteps[1]=\"") << -1.00000f << std::string("\";") << std::endl; outFile << std::string("bellaSteps[2]=\"") << -0.93104f << std::string("\";") << std::endl; outFile << std::string("bellaSteps[3]=\"") << -0.86208f << std::string("\";") << std::endl; outFile << std::string("bellaSteps[4]=\"") << -0.79312f << std::string("\";") << std::endl; outFile << std::string("bellaSteps[5]=\"") << -0.72414f << std::string("\";") << std::endl; outFile << std::string("bellaSteps[6]=\"") << -0.65518f << std::string("\";") << std::endl; outFile << std::string("bellaSteps[7]=\"") << -0.58622f << std::string("\";") << std::endl; outFile << std::string("bellaSteps[8]=\"") << -0.51726f << std::string("\";") << std::endl; outFile << std::string("bellaSteps[9]=\"") << -0.44828f << std::string("\";") << std::endl; outFile << std::string("bellaSteps[10]=\"") << -0.37932f << std::string("\";") << std::endl; outFile << std::string("bellaSteps[11]=\"") << -0.31036f << std::string("\";") << std::endl; outFile << std::string("bellaSteps[12]=\"") << -0.24138f << std::string("\";") << std::endl; outFile << std::string("bellaSteps[13]=\"") << -0.17242f << std::string("\";") << std::endl; outFile << std::string("bellaSteps[14]=\"") << -0.10346f << std::string("\";") << std::endl; outFile << std::string("bellaSteps[15]=\"") << -0.03450f << std::string("\";") << std::endl; outFile << std::string("bellaSteps[16]=\"") << 0.03448f << std::string("\";") << std::endl; outFile << std::string("bellaSteps[17]=\"") << 0.10344f << std::string("\";") << std::endl; outFile << std::string("bellaSteps[18]=\"") << 0.17240f << std::string("\";") << std::endl; outFile << std::string("bellaSteps[19]=\"") << 0.24136f << std::string("\";") << std::endl; outFile << std::string("bellaSteps[20]=\"") << 0.31034f << std::string("\";") << std::endl; outFile << std::string("bellaSteps[21]=\"") << 0.37930f << std::string("\";") << std::endl; outFile << std::string("bellaSteps[22]=\"") << 0.44826f << std::string("\";") << std::endl; outFile << std::string("bellaSteps[23]=\"") << 0.51724f << std::string("\";") << std::endl; outFile << std::string("bellaSteps[24]=\"") << 0.58620f << std::string("\";") << std::endl; outFile << std::string("bellaSteps[25]=\"") << 0.65516f << std::string("\";") << std::endl; outFile << std::string("bellaSteps[26]=\"") << 0.72412f << std::string("\";") << std::endl; outFile << std::string("bellaSteps[27]=\"") << 0.79310f << std::string("\";") << std::endl; outFile << std::string("bellaSteps[28]=\"") << 0.86206f << std::string("\";") << std::endl; outFile << std::string("bellaSteps[29]=\"") << 0.93102f << std::string("\";") << std::endl; outFile << std::string("bellaSteps[30]=\"") << 1.00000f << std::string("\";") << std::endl; outFile << std::string("bellaQueue=[];") << std::endl; outFile.close(); std::ofstream outFile2(dirPath.string()+"/index.html"); outFile2 << bellaSliderPreviewsHTML(); outFile2.close(); { std::lock_guard lock(currentRenderMutex); currentRender = belPath; active_render = true; } bool stop_loop = false; for (int i = 1; i <= 30; i++) { auto offset = dl::Vec2 {i*0.1, 0.0}; dl::bella_sdk::orbitCamera(engine.scene().cameraPath(),offset); oomerOutputPath["file"] = dl::String::format("bella%04d",i); engine.start(); while (engine.rendering()) { //blocking so we only do one frame at a time // since we are looping here we don't check for file dequeing { std::lock_guard lock(incomingDeleteQueueMutex); dl::String path; while (incomingDeleteQueue.pop(path)) { // transfer to this thread's delete queue renderThreadDeleteQueue.push(path); } incomingDeleteQueue.clear(); // empty file queue if (renderThreadDeleteQueue.contains(belPath)) { renderThreadDeleteQueue.remove(belPath); active_render = false; engine.stop(); stop_loop = true; break; } } std::this_thread::sleep_for(std::chrono::milliseconds(100)); } if (stop_loop) { break; } } } else { active_render = false; // Release the render slot } } else { active_render = false; // Release the render slot } std::this_thread::sleep_for(std::chrono::milliseconds(1000)); } else { // someone else is rendering //std::cout << "Waiting for render slot" << std::endl; //std::cout << "Render Queue size: " << renderThreadQueue.size() << std::endl; //std::cout << "Delete Queue size: " << renderThreadDeleteQueue.size() << std::endl; while (renderThreadDeleteQueue.pop(belPath)) { // pop all the deletes //std::cout << "renderThreadDeleteQueue contains " << belPath.buf() << " " << renderThreadDeleteQueue.contains(belPath) << std::endl; if (belPath == currentRender) { std::cout << "/n==/nStopping render" << belPath.buf() << std::endl; engine.stop(); active_render = false; } else if (renderThreadQueue.contains(belPath)) { // dequeue deletes renderThreadQueue.remove(belPath); } } std::this_thread::sleep_for(std::chrono::milliseconds(100)); } std::this_thread::sleep_for(std::chrono::milliseconds(100)); } } void openFileWithDefaultProgram(const std::string& filePath) { #ifdef _WIN32 std::wstring_convert> converter; std::wstring wideFilePath = converter.from_bytes(filePath); HINSTANCE result = ShellExecuteW(nullptr, nullptr, wideFilePath.c_str(), nullptr, nullptr, SW_SHOW); if ((intptr_t)result <= 32) { throw std::runtime_error("Failed to open file: ShellExecuteW returned " + std::to_string((intptr_t)result)); } #elif defined(__APPLE__) pid_t pid = fork(); if (pid == 0) { // Child process execl("/usr/bin/open", "open", filePath.c_str(), nullptr); // If execl fails: std::cerr << "Failed to open file: execl failed" << std::endl; exit(1); // Exit child process on error } else if (pid > 0) { // Parent process int status; waitpid(pid, &status, 0); if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) { throw std::runtime_error("Failed to open file: open command failed"); } } else { throw std::runtime_error("Failed to open file: fork failed"); } #elif defined(__linux__) pid_t pid = fork(); if (pid == 0) { // Child process execl("/usr/bin/xdg-open", "xdg-open", filePath.c_str(), nullptr); // If execl fails: std::cerr << "Failed to open file: execl failed" << std::endl; exit(1); // Exit child process on error } else if (pid > 0) { // Parent process int status; waitpid(pid, &status, 0); if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) { throw std::runtime_error("Failed to open file: xdg-open command failed"); } } else { throw std::runtime_error("Failed to open file: fork failed"); } #else // Fallback: Use system, but this is less reliable and secure. std::string command = "open \"" + filePath + "\""; // May need to adapt quoting int result = std::system(command.c_str()); if (result != 0) { throw std::runtime_error("Failed to open file: system command failed"); } #endif } bool ends_with_suffix(const std::string& str, const std::string& suffix) { if (str.length() >= 4) { return str.substr(str.length() - 4) == suffix; } return false; } // Blocking zmq rep socket to pass server_public_key void pkey_server(const std::string& pub_key, uint16_t publickey_port) { zmq::context_t ctx; zmq::socket_t sock(ctx, zmq::socket_type::rep); std::string url = "tcp://*:" + std::to_string(publickey_port); sock.bind(url); zmq::message_t z_in; //ZIN<<< sock.recv(z_in); if (z_in.to_string().compare("Bellarender123") == 0) { zmq::message_t z_out(pub_key); //ZOUT>>> sock.send(z_out, zmq::send_flags::none); connection_state = true; } sock.close(); ctx.close(); } std::string bellaSliderPreviewsHTML() { return R"HTML(
  >     >     >  

      
)HTML"; }