diff --git a/client.cpp b/client.cpp index c938c54..b72d682 100644 --- a/client.cpp +++ b/client.cpp @@ -14,80 +14,185 @@ #include #include -// Atomic variable to store the counter value -std::atomic counter(0); std::atomic heartbeat_state (true); -// Condition variable to signal the counter thread -std::condition_variable cv; -// Mutex to protect shared data -std::mutex mtx; -// Flag to indicate if the counter should reset -std::atomic shouldReset(false); +std::atomic connection_state (false); +std::atomic abort_state (false); + +bool ends_with_suffix(const std::string& str, const std::string& suffix) { + if (str.length() >= 4) { + return str.substr(str.length() - 4) == suffix; + } + return false; +} void command_thread(std::string server_pkey, std::string client_pkey, std::string client_skey) { - const size_t chunk_size = 32768; + const size_t chunk_size = 65536; zmq::context_t ctx; zmq::socket_t command_sock (ctx, zmq::socket_type::req); - command_sock.set(zmq::sockopt::sndtimeo, 10000); - command_sock.set(zmq::sockopt::rcvtimeo, 10000); + //command_sock.set(zmq::sockopt::sndtimeo, 10000); + //command_sock.set(zmq::sockopt::rcvtimeo, 10000); command_sock.set(zmq::sockopt::curve_serverkey, server_pkey); command_sock.set(zmq::sockopt::curve_publickey, client_pkey); command_sock.set(zmq::sockopt::curve_secretkey, client_skey); command_sock.set(zmq::sockopt::linger, 1); // Close immediately on disconnect command_sock.connect("tcp://localhost:5556"); - + + std::string input; while (true) { - std::string input; - std::getline(std::cin, input); - zmq::message_t msg_command (input); - command_sock.send(msg_command, zmq::send_flags::none); + if(abort_state.load()==true) { + break; + } + std::getline(std::cin, input); + std::stringstream ss(input); + std::string arg; + std::vector args; + while (ss >> arg) { + args.push_back(arg); + } + + // Sanity checks on input before sending to server + int num_args = args.size(); + std::string command; + if (num_args > 0) { + command = args[0]; + if ( command == "send") { + if(num_args == 1) { + std::cout << "Please provide a .bsz file" << std::endl; + continue; + } + if(!ends_with_suffix(args[1],"bsz")) { + std::cout << "Only .bsz files can be sent" << std::endl; + continue; + } + std::cout << "Sending:" << args[1] << std::endl; + } else if (command == "get") { + if(num_args == 1) { + std::cout << "Please provide image filename" << std::endl; + continue; + } + } else if (command == "exit") { + std::cout << "now" << std::endl; + break; + } else if (command == "render") { + std::string compoundArg; + if(num_args > 1) { + for (size_t i = 1; i < args.size(); ++i) { + compoundArg += args[i]; + if (i < args.size() - 1) { + compoundArg += " "; // Add spaces between arguments + } + } + std::cout << compoundArg << std::endl; + } + } else if (command == "hello") { + ; + } else { + std::cout << "unknown" << std::endl; + continue; + } + } + + // Sanity check input complete + // Push to server over encrypted socket + zmq::message_t server_response; + zmq::message_t msg_command(command); + //>>>ZOUT + command_sock.send(msg_command, zmq::send_flags::none); //SEND std::cout << "Sent: " << input.data() << std::endl; - zmq::message_t zmq_response; - command_sock.recv(zmq_response, zmq::recv_flags::none); - std::string response(static_cast(zmq_response.data()), zmq_response.size()-1); - std::cout << "Server Response: " << response << response.size() << std::endl; - - if(response=="ACK") { // Check server is ok to move on - std::cout << "kACKServer Response: " << response << std::endl; + //ZIN<<< + command_sock.recv(server_response, zmq::recv_flags::none); //RECV + std::string response_str(static_cast(server_response.data()), server_response.size()-1); + if(response_str=="RDY") { // Server acknowledges readiness for multi message commands + std::cout << "Server Readiness: " << response_str << std::endl; if(input == "exit") { break; - } else if(input == "send") { + // RENDER + } else if(command == "render") { + //>>>ZOUT + command_sock.send(zmq::message_t("render"), zmq::send_flags::none); + //ZIN<<< + command_sock.recv(server_response, zmq::recv_flags::none); + } else if(command == "stat") { + //>>>ZOUT + command_sock.send(zmq::message_t("stat"), zmq::send_flags::none); + //ZIN<<< + command_sock.recv(server_response, zmq::recv_flags::none); + + // GET + } else if(command == "get") { + std::ofstream output_file("orange-juice.png", std::ios::binary); // Open file in binary mode + std::cout << "getting\n"; + if (!output_file.is_open()) { + std::cerr << "Error opening file for writing" << std::endl; + std::cout << "ERR" << std::endl; + continue; // Don't bother server + } else { + while (true) { + //>>>ZOUT + command_sock.send(zmq::message_t("GO"), zmq::send_flags::none); + zmq::message_t recv_data; + //ZIN<<< + command_sock.recv(recv_data, zmq::recv_flags::none); // data transfer + + // inline messaging with data, breaks to exit loop + if (recv_data.size() < 8) { + std::string recv_string(static_cast(recv_data.data()), recv_data.size()-1); + //std::string recv_string = recv_data.to_string(); + if (recv_string == "EOF") { + std::cout << "EOF" << std::endl; + break; // End of file + } else if(recv_string == "ERR") { //LIKELY ERR\0 from client, can't find file + std::cout << "ERR client read ACK" << std::endl; + break; // Err + } else { + std::cout << "HUH" << recv_string << std::endl; + break; + } + } + // by reaching this point we assume binary data ( even 8 bytes will reach here ) + std::cout << recv_data.size() << std::endl; + output_file.write(static_cast(recv_data.data()), recv_data.size()); + } + output_file.close(); + } + // SEND + } else if(command == "send") { std::string read_file = "./orange-juice.bsz"; std::cout << "sending\n"; std::ifstream binaryInputFile; binaryInputFile.open(read_file, std::ios::binary);// for reading if (!binaryInputFile.is_open()) { std::cerr << "Error opening file for read" << std::endl; - zmq::message_t zmsg1("ERR"); - command_sock.send(zmsg1, zmq::send_flags::none); - zmq::message_t z_in; - command_sock.recv(z_in); // Wait for acknowledgment from server - std::cout << z_in << std::endl; + //>>>ZOUT + command_sock.send(zmq::message_t("ERR"), zmq::send_flags::none); + ///ZIN<<< + command_sock.recv(server_response, zmq::recv_flags::none); } else { - std::vector send_buffer(chunk_size); std::streamsize bytes_read_in_chunk; while (true) { binaryInputFile.read(send_buffer.data(), chunk_size); // read the file into the buffer bytes_read_in_chunk = binaryInputFile.gcount(); // Actual bytes read if(bytes_read_in_chunk > 0){ - //std::cout << bytes_read_in_chunk << std::endl; zmq::message_t message(send_buffer.data(), bytes_read_in_chunk); - zmq::message_t z_in; + //>>>ZOUT command_sock.send(message, zmq::send_flags::none); - command_sock.recv(z_in); // Wait for acknowledgment from server + //ZIN<<< + command_sock.recv(server_response, zmq::recv_flags::none); } else { break; } } - // Send an empty message to signal end of file - command_sock.send(zmq::message_t(), zmq::send_flags::none); - zmq::message_t z_in; - command_sock.recv(z_in); // Wait for acknowledgment from server + //<<>> + command_sock.recv(server_response, zmq::recv_flags::none); } } + } else { + std::cout << "Server response: " << response_str << std::endl; } } command_sock.close(); @@ -100,28 +205,31 @@ void heartbeat_thread(std::string server_pkey, std::string client_pkey, std::str heartbeat_sock.set(zmq::sockopt::curve_serverkey, server_pkey); heartbeat_sock.set(zmq::sockopt::curve_publickey, client_pkey); heartbeat_sock.set(zmq::sockopt::curve_secretkey, client_skey); + heartbeat_sock.set(zmq::sockopt::linger, 1); // Close immediately on disconnect heartbeat_sock.connect("tcp://localhost:5555"); int heartbeat_count = 0; std::vector items = {}; while (true) { - // Increment the counter every 10 milliseconds - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); - //std::cout << "beat" << std::endl; - std::string msg_string = "BEAT" + std::to_string(heartbeat_count++); - zmq::message_t msg_heartbeat (msg_string); - heartbeat_sock.send(msg_heartbeat, zmq::send_flags::none); + if(abort_state.load()==true) { + break; + } + std::this_thread::sleep_for(std::chrono::milliseconds(100)); - - // Wait for response (poll for ZMQ_POLLIN) - zmq::pollitem_t response_item = { heartbeat_sock, 0, ZMQ_POLLIN, 0 }; - zmq::poll(&response_item, 1, 100); // Wait for response with timeout - if (response_item.revents & ZMQ_POLLIN) { - zmq::message_t msg_response; - heartbeat_sock.recv(msg_response, zmq::recv_flags::none); - //std::cout << "Heartbeat Response: " << std::endl; - } else { - std::cout << "Bella Server is unavailable" << std::endl; - heartbeat_state = false; + if(connection_state == true) { + heartbeat_sock.send(zmq::message_t("ACK"), zmq::send_flags::none); + // Wait for response (poll for ZMQ_POLLIN) + zmq::pollitem_t response_item = { heartbeat_sock, 0, ZMQ_POLLIN, 0 }; + zmq::poll(&response_item, 1, 100); // Wait for response with timeout + if (response_item.revents & ZMQ_POLLIN) { + zmq::message_t msg_response; + heartbeat_sock.recv(msg_response, zmq::recv_flags::none); + //std::cout << "Heartbeat Response: " << std::endl; + } else { + std::cout << "Bella Server is unavailable" << std::endl; + heartbeat_state = false; + connection_state = false; + break; + } } } heartbeat_sock.close(); @@ -139,11 +247,6 @@ std::string get_pubkey_from_srv() { // 0MQ after we establish our intitial encrypted socket zmq::context_t ctx; zmq::socket_t pubkey_sock(ctx, zmq::socket_type::req); - - pubkey_sock.set(zmq::sockopt::sndtimeo, 10000); - pubkey_sock.set(zmq::sockopt::rcvtimeo, 10000); - pubkey_sock.set(zmq::sockopt::linger, 0); // Close immediately on disconnect - pubkey_sock.connect("tcp://127.0.0.1:9555"); zmq::message_t z_out(std::string("Bellarender123")); @@ -160,6 +263,7 @@ std::string get_pubkey_from_srv() { pubkey_sock.close(); ctx.close(); std::cout << "connection successful" << std::endl; + connection_state = true; return pub_key; } @@ -193,10 +297,17 @@ int main() while (true) { if (!heartbeat_state.load()) { std::cout << "Dead" << std::endl; + abort_state==true; + break; + } + if (connection_state.load() == false) { + std::cout << "Dead2" << std::endl; + abort_state==true; break; } std::this_thread::sleep_for(std::chrono::milliseconds(500)); } + abort_state==true; command_t.join(); heartbeat_t.join(); return 0; diff --git a/server.cpp b/server.cpp index d0bcf12..5a9d6a9 100644 --- a/server.cpp +++ b/server.cpp @@ -5,87 +5,159 @@ #include #include +#include // For string streams #include -#include -#include +#include -// Atomic variable to store the counter value -//std::atomic counter(0); std::atomic heartbeat_state (true); -// Condition variable to signal the counter thread -//std::condition_variable cv; -// Mutex to protect shared data -//std::mutex mtx; -// Flag to indicate if the counter should reset -std::atomic shouldReset(false); - +std::atomic client_state (false); void command_thread(std::string server_skey) { - zmq::context_t ctx; - // Create zmq rep sockets - //zmq::socket_t heartbeat_sock(ctx, zmq::socket_type::rep); + zmq::context_t ctx; zmq::socket_t command_sock(ctx, zmq::socket_type::rep); + //command_sock.set(zmq::sockopt::sndtimeo, 10000); + //command_sock.set(zmq::sockopt::rcvtimeo, 10000); command_sock.set(zmq::sockopt::curve_server, true); command_sock.set(zmq::sockopt::curve_secretkey, server_skey); - command_sock.set(zmq::sockopt::linger, 1); // Close immediately on disconnect + //command_sock.set(zmq::sockopt::linger, 100); // Close immediately on disconnect command_sock.bind("tcp://*:5556"); - while (true) { - zmq::message_t msg_command; - command_sock.recv(msg_command, zmq::recv_flags::none); - std::string client_command = msg_command.to_string(); - std::cout << "Command: " << client_command << std::endl; + zmq::message_t client_response; - if(client_command == "hello"){ - std::cout << "bye" << std::endl; - zmq::message_t zmsg1("bye"); - command_sock.send(zmsg1, zmq::send_flags::none); - } else if (client_command == "exit") { - std::cout << "ACK" << std::endl; - zmq::message_t zmsg1("ACK"); - command_sock.send(zmsg1, zmq::send_flags::none); - //heartbeat_state = false; - } else if (client_command == "send") { - std::ofstream output_file("received_file.bsz", std::ios::binary); // Open file in binary mode - if (!output_file.is_open()) { - std::cerr << "Error opening file for writing" << std::endl; - std::cout << "ERR" << std::endl; - zmq::message_t zmsg1("ERR"); - command_sock.send(zmsg1, zmq::send_flags::none); - } else { - std::cout << "file good ACK" << std::endl; - zmq::message_t zmsg1("ACK"); - command_sock.send(zmsg1, zmq::send_flags::none); - while (true) { - zmq::message_t recv_data; - command_sock.recv(recv_data, zmq::recv_flags::none); - if (recv_data.size() == 0) { - //std::cout << "chunk ACK" << std::endl; - zmq::message_t reply("ACK"); - command_sock.send(reply, zmq::send_flags::none); - break; // End of file - } else if(recv_data.size() == 4) { //LIKELY ERR\0 from client, can't find file - // [TODO] , parse lines only send valid commands - // Right now I allow any text to get through + try { + std::string write_file = "./oomer.bsz"; + std::string read_file = "./oomer.png"; + const size_t chunk_size = 65536; + std::vector sftp_buffer(chunk_size); // Buffer to hold each chunk + std::ofstream binaryOutputFile;// for writing + std::ifstream binaryInputFile;// for reading + while (true) { + zmq::message_t msg_command; + //ZIN<<< + command_sock.recv(msg_command, zmq::recv_flags::none); + std::string client_command = msg_command.to_string(); + std::cout << "Command: " << client_command << std::endl; - std::cout << "ERR client read ACK" << std::endl; - zmq::message_t reply("ACK"); - command_sock.send(reply, zmq::send_flags::none); - break; // End of file + if(client_command == "hello"){ + std::cout << "bye" << std::endl; + //>>>ZOUT + command_sock.send(zmq::message_t("bye"), zmq::send_flags::none); + } else if (client_command == "exit") { + std::cout << "exit" << std::endl; + //>>>ZOUT + command_sock.send(zmq::message_t("ACK"), zmq::send_flags::none); + // RENDER + } else if (client_command == "render") { + //engine.scene().read("./oomer.bsz"); + //engine.scene().camera()["resolution"] = Vec2 {200, 200}; + //engine.start(); + std::cout << "start render" << std::endl; + //>>>ZOUT + command_sock.send(zmq::message_t("ACK"), zmq::send_flags::none); + + //GET + } else if (client_command == "get") { //REP mode + std::string read_file = "./oomer.png"; + std::cout << "Executing get command\n"; + std::ifstream binaryInputFile; + binaryInputFile.open(read_file, std::ios::binary);// for reading + if (!binaryInputFile.is_open()) { + std::cerr << "Error opening file for read" << std::endl; + //>>>ZOUT + command_sock.send(zmq::message_t("ERR"), zmq::send_flags::none); + } else { + //>>>ZOUT + command_sock.send(zmq::message_t("RDY"), zmq::send_flags::none); + std::vector send_buffer(chunk_size); + std::streamsize bytes_read_in_chunk; + while (true) { + zmq::message_t z_in; + //ZIN + command_sock.recv(z_in); // Block until zGo, or any message + binaryInputFile.read(send_buffer.data(), chunk_size); // read the file into the buffer + bytes_read_in_chunk = binaryInputFile.gcount(); // Actual bytes read + if(bytes_read_in_chunk > 0){ + std::cout << bytes_read_in_chunk << std::endl; + zmq::message_t message(send_buffer.data(), bytes_read_in_chunk); + //ZOUT + command_sock.send(message, zmq::send_flags::none); + } else { + //ZOUT + command_sock.send(zmq::message_t("EOF"), zmq::send_flags::none); + std::cout << "EOF" << std::endl; + break; // Exit when 0 bytes read + } } - //std::cout << recv_data.size() << std::endl; - output_file.write(static_cast(recv_data.data()), recv_data.size()); - zmq::message_t reply("ACK"); - command_sock.send(reply, zmq::send_flags::none); } - output_file.close(); + + } else if (client_command == "stat") { + std::ifstream log_file("logfile.txt"); + if (log_file.is_open()) { + std::string log_line; + if (std::getline(log_file, log_line)) { // Reads the entire line, including spaces + //>>>ZOUT + command_sock.send(zmq::message_t(log_line), zmq::send_flags::none); + } else { + //>>>ZOUT + command_sock.send(zmq::message_t("ACK"), zmq::send_flags::none); + } + log_file.close(); + } + } else if (client_command == "send") { + std::ofstream output_file("oomer.bsz", std::ios::binary); // Open file in binary mode + if (!output_file.is_open()) { + std::cerr << "Error opening file for writing" << std::endl; + //>>>ZOUT + command_sock.send(zmq::message_t("ERR"), zmq::send_flags::none); + } else { // File handle open and ready + //>>>ZOUT + command_sock.send(zmq::message_t("RDY"), zmq::send_flags::none); + while (true) { + zmq::message_t recv_data; + //ZIN<<< + command_sock.recv(recv_data, zmq::recv_flags::none); + if(recv_data.size() < 8) { // data and signals sent on same socket + // Allow for signals up to 8 bytes, EOF, ERR + // messages are null terminated requiring -1 + std::string response_str(static_cast(recv_data.data()), recv_data.size()-1); + if (response_str=="EOF") { + //>>>ZOUT + command_sock.send(zmq::message_t("ACK"), zmq::send_flags::none); + break; // End of file + } else if(response_str=="ERR") { + std::cout << "ERR on client" << std::endl; + //>>>ZOUT + command_sock.send(zmq::message_t("ACK"), zmq::send_flags::none); + break; // End of file + } + } + // File write + output_file.write(static_cast(recv_data.data()), recv_data.size()); + //>>ZOUT + command_sock.send(zmq::message_t("ACK"), zmq::send_flags::none); + } + output_file.close(); + } + } else { // A unknown REQ sent, acknowledge because req-rep pattern is blocking + //>>ZOUT + command_sock.send(zmq::message_t("ACK"), zmq::send_flags::none); } - } else { - zmq::message_t zmsg1("foo"); - command_sock.send(zmsg1, zmq::send_flags::none); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); } - std::this_thread::sleep_for(std::chrono::milliseconds(10)); + + } catch (const zmq::error_t& e) { + // Handle ZMQ-specific exceptions + std::cerr << "ZMQ error: " << e.what() << std::endl; + ctx.close(); + command_sock.close(); + //Potentially close sockets, clean up etc. + } catch (const std::exception& e) { + // Handle standard exceptions (e.g., std::bad_alloc) + std::cerr << "Standard exception: " << e.what() << std::endl; + } catch (...) { + // Catch any other exceptions + std::cerr << "Unknown exception caught." << std::endl; } command_sock.close(); ctx.close(); @@ -100,23 +172,23 @@ void heartbeat_thread(std::string server_skey) { heartbeat_sock.set(zmq::sockopt::curve_secretkey, server_skey); heartbeat_sock.bind("tcp://*:5555"); while(true) { - zmq::pollitem_t response_item = { heartbeat_sock, 0, ZMQ_POLLIN, 0 }; - zmq::poll(&response_item, 1, 10000); // Wait for response with timeout - if (response_item.revents & ZMQ_POLLIN) { - zmq::message_t message; - heartbeat_sock.recv(message, zmq::recv_flags::none); - //std::cout << "heart:" << heartbeat_state.load() << std::endl; - - std::string response = "Heartbeat OK"; - zmq::message_t zmq_response (response); - heartbeat_sock.send(zmq_response, zmq::send_flags::dontwait); // No block - - } else { - std::cout << "Bella Client Lost" << std::endl; - heartbeat_state = false; - } + //Start polling heartbeats once client connects + if (client_state == true) { + zmq::pollitem_t response_item = { heartbeat_sock, 0, ZMQ_POLLIN, 0 }; + zmq::poll(&response_item, 1, 25000); // Wait for response with timeout + if (response_item.revents & ZMQ_POLLIN) { + zmq::message_t message; + //ZIN<<< + heartbeat_sock.recv(message, zmq::recv_flags::none); + //ZOUT>>> + heartbeat_sock.send(zmq::message_t("ACK"), zmq::send_flags::dontwait); // No block + } else { //timeout + std::cout << "Bella Client Lost" << std::endl; + heartbeat_state = false; + } + } std::this_thread::sleep_for(std::chrono::milliseconds(100)); } heartbeat_sock.close(); @@ -131,17 +203,27 @@ void pkey_server(const std::string& pub_key) { zmq::message_t z_in; std::cout << "Entered: Public Key Serving Mode" << std::endl; + //ZIN<<< sock.recv(z_in); if (z_in.to_string().compare("Bellarender123") == 0) { zmq::message_t z_out(pub_key); + //ZOUT>>> sock.send(z_out, zmq::send_flags::none); + client_state = true; } sock.close(); ctx.close(); } -int main() -{ +// We will use the dl_core main helper here. This gives us a helpful Args instance to use, and +// also hides the confusing details of dealing with main vs. WinMain on windows, and gives us +// utf8-encoded args when the application is unicode. +// +//#include "dl_core/dl_main.inl" +//int DL_main(Args& args) +//{ + +int main() { // Generate brand new keypair on launch // [TODO] Add client side public key fingerprinting for added security char skey[128] = { 0 }; @@ -153,10 +235,6 @@ int main() } // Multi threading - //std::thread command_t(command_thread, skey); - //std::thread heartbeat_t(heartbeat_thread, skey); - //std::thread command_t(command_thread, skey); - //std::thread heartbeat_t(heartbeat_thread, skey); std::thread command_t(command_thread, skey); std::thread heartbeat_t(heartbeat_thread, skey); @@ -164,13 +242,11 @@ int main() while(true) { // awaiting new client loop heartbeat_state = true; pkey_server(pkey); // blocking wait client to get public key on port 5555 - heartbeat_state = true; std::cout << "Client connected" << std::endl; while(true) { // inner loop - //std::cout << "inner loop" << std::endl; if (heartbeat_state.load()==false) { - std::cout << "client dead" << std::endl; + std::cout << "Client connectiono dead" << std::endl; break; } std::this_thread::sleep_for(std::chrono::milliseconds(10));