From a9d76f5a5c288a0a78aa3eb7c75d21a747a69162 Mon Sep 17 00:00:00 2001 From: Harvey Fong Date: Tue, 25 Feb 2025 20:57:38 -0700 Subject: [PATCH] first day code compiles under Win/Mac and Linux, took out polling which required POSIX, used std::thread instead, less complicated than I thought --- README.md | 14 ++- client.cpp | 311 ++++++++++++++++++++++++++--------------------------- server.cpp | 234 +++++++++++++++++++++++----------------- 3 files changed, 301 insertions(+), 258 deletions(-) diff --git a/README.md b/README.md index fcd552e..96f4e82 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,12 @@ # zmqprototype -Prototype code for cross platform zmq rep-req render node +[ALPHA CODE] My learning project to prototype a cross platform zmq rep-req render node + +- This is the code before integration with bella_engine_sdk + +server starts in pubkey server mode, allowing one client to grab the pubkey +- keypairs are generated on every start +- requires port 5555, 5556, 5557 +client ###Build @@ -33,6 +40,11 @@ g++ server.cpp -o server -lzmq -Wl,-rpath,. ``` g++ -std=c++11 server.cpp -o server -I../libzmq/include -I../cppzmq -L../libzmq/build/lib -lzmq -Wl,-rpath,. g++ -std=c++11 server.cpp -o server -I../libzmq/include -I../cppzmq -L../libzmq/build/lib -lzmq -Wl,-rpath,. + +cl /std:c++17 client.cpp -Fe:client.exe -Ic:\Users\cupcake\github\vcpkg\installed\x64-windows\include\ /link c:\Users\cupcake\github\vcpkg\installed\x64-windows\lib\libzmq-mt-4_3_5.lib + +cl /std:c++17 server.cpp -Fe:server.exe -Ic:\Users\cupcake\github\vcpkg\installed\x64-windows\include\ /link c:\Users\cupcake\github\vcpkg\installed\x64-windows\lib\libzmq-mt-4_3_5.lib + ``` diff --git a/client.cpp b/client.cpp index 096c44b..c938c54 100644 --- a/client.cpp +++ b/client.cpp @@ -8,11 +8,127 @@ #include #include #include -#include // For STDIN_FILENO - -// dynamic char #include +#include +#include +#include + +// Atomic variable to store the counter value +std::atomic counter(0); +std::atomic heartbeat_state (true); +// Condition variable to signal the counter thread +std::condition_variable cv; +// Mutex to protect shared data +std::mutex mtx; +// Flag to indicate if the counter should reset +std::atomic shouldReset(false); + +void command_thread(std::string server_pkey, std::string client_pkey, std::string client_skey) { + const size_t chunk_size = 32768; + zmq::context_t ctx; + zmq::socket_t command_sock (ctx, zmq::socket_type::req); + command_sock.set(zmq::sockopt::sndtimeo, 10000); + command_sock.set(zmq::sockopt::rcvtimeo, 10000); + command_sock.set(zmq::sockopt::curve_serverkey, server_pkey); + command_sock.set(zmq::sockopt::curve_publickey, client_pkey); + command_sock.set(zmq::sockopt::curve_secretkey, client_skey); + command_sock.set(zmq::sockopt::linger, 1); // Close immediately on disconnect + command_sock.connect("tcp://localhost:5556"); + + while (true) { + std::string input; + std::getline(std::cin, input); + zmq::message_t msg_command (input); + command_sock.send(msg_command, zmq::send_flags::none); + std::cout << "Sent: " << input.data() << std::endl; + + zmq::message_t zmq_response; + command_sock.recv(zmq_response, zmq::recv_flags::none); + std::string response(static_cast(zmq_response.data()), zmq_response.size()-1); + std::cout << "Server Response: " << response << response.size() << std::endl; + + if(response=="ACK") { // Check server is ok to move on + std::cout << "kACKServer Response: " << response << std::endl; + + if(input == "exit") { + break; + } else if(input == "send") { + std::string read_file = "./orange-juice.bsz"; + std::cout << "sending\n"; + std::ifstream binaryInputFile; + binaryInputFile.open(read_file, std::ios::binary);// for reading + if (!binaryInputFile.is_open()) { + std::cerr << "Error opening file for read" << std::endl; + zmq::message_t zmsg1("ERR"); + command_sock.send(zmsg1, zmq::send_flags::none); + zmq::message_t z_in; + command_sock.recv(z_in); // Wait for acknowledgment from server + std::cout << z_in << std::endl; + } else { + + std::vector send_buffer(chunk_size); + std::streamsize bytes_read_in_chunk; + while (true) { + binaryInputFile.read(send_buffer.data(), chunk_size); // read the file into the buffer + bytes_read_in_chunk = binaryInputFile.gcount(); // Actual bytes read + if(bytes_read_in_chunk > 0){ + //std::cout << bytes_read_in_chunk << std::endl; + zmq::message_t message(send_buffer.data(), bytes_read_in_chunk); + zmq::message_t z_in; + command_sock.send(message, zmq::send_flags::none); + command_sock.recv(z_in); // Wait for acknowledgment from server + } else { + break; + } + } + // Send an empty message to signal end of file + command_sock.send(zmq::message_t(), zmq::send_flags::none); + zmq::message_t z_in; + command_sock.recv(z_in); // Wait for acknowledgment from server + } + } + } + } + command_sock.close(); + ctx.close(); +} + +void heartbeat_thread(std::string server_pkey, std::string client_pkey, std::string client_skey) { + zmq::context_t ctx; + zmq::socket_t heartbeat_sock (ctx, zmq::socket_type::req); + heartbeat_sock.set(zmq::sockopt::curve_serverkey, server_pkey); + heartbeat_sock.set(zmq::sockopt::curve_publickey, client_pkey); + heartbeat_sock.set(zmq::sockopt::curve_secretkey, client_skey); + heartbeat_sock.connect("tcp://localhost:5555"); + int heartbeat_count = 0; + std::vector items = {}; + while (true) { + // Increment the counter every 10 milliseconds + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + //std::cout << "beat" << std::endl; + std::string msg_string = "BEAT" + std::to_string(heartbeat_count++); + zmq::message_t msg_heartbeat (msg_string); + heartbeat_sock.send(msg_heartbeat, zmq::send_flags::none); + + + // Wait for response (poll for ZMQ_POLLIN) + zmq::pollitem_t response_item = { heartbeat_sock, 0, ZMQ_POLLIN, 0 }; + zmq::poll(&response_item, 1, 100); // Wait for response with timeout + if (response_item.revents & ZMQ_POLLIN) { + zmq::message_t msg_response; + heartbeat_sock.recv(msg_response, zmq::recv_flags::none); + //std::cout << "Heartbeat Response: " << std::endl; + } else { + std::cout << "Bella Server is unavailable" << std::endl; + heartbeat_state = false; + } + } + heartbeat_sock.close(); + ctx.close(); +} + + std::string get_pubkey_from_srv() { // No authentication is used, server will give out pubkey to anybody // Could use a unique message but since socket is unencrypted this provides @@ -33,173 +149,56 @@ std::string get_pubkey_from_srv() { try { zmq::send_result_t send_result = pubkey_sock.send(z_out, zmq::send_flags::none); - std::cout << "TRY" << std::endl; } catch (const zmq::error_t& e) { std::cout << "ERROR" << std::endl; } - std::cout << "RECEVIE" << std::endl; - + std::cout << "bellazmq connecting to server..." << std::endl; zmq::message_t z_in; pubkey_sock.recv(z_in); std::string pub_key = z_in.to_string(); pubkey_sock.close(); ctx.close(); + std::cout << "connection successful" << std::endl; return pub_key; } int main() { - try - { - const size_t chunk_size = 32768; - // Dynamically create keypair, every run is bespoke - // [TODO] send pubkey to server, mkdir, render to that dir - char client_skey[128] = { 0 }; - char client_pkey[128] = { 0 }; - if ( zmq_curve_keypair(&client_pkey[0], &client_skey[0])) { - // 1 is fail - std::cout << "\ncurve keypair gen failed."; - exit(EXIT_FAILURE); - } - - // Get server pubkey, set client keypair - std::string server_pkey = get_pubkey_from_srv(); - if(server_pkey.empty()) { - std::cout << "Server is Down" << std::endl; - return 1; - } - - zmq::context_t ctx(1); - - // Create zmq sockets - zmq::socket_t heartbeat_sock (ctx, zmq::socket_type::req); - zmq::socket_t command_sock (ctx, zmq::socket_type::req); - - // Encrypt heartbeat socket - heartbeat_sock.set(zmq::sockopt::curve_serverkey, server_pkey); - heartbeat_sock.set(zmq::sockopt::curve_publickey, client_pkey); - heartbeat_sock.set(zmq::sockopt::curve_secretkey, client_skey); - - // Encrypt command socket - command_sock.set(zmq::sockopt::curve_serverkey, server_pkey); - command_sock.set(zmq::sockopt::curve_publickey, client_pkey); - command_sock.set(zmq::sockopt::curve_secretkey, client_skey); - - std::cout << "keypair" << std::endl; - - // Set receive timeout to 1000 milliseconds - //command_sock.set(zmq::sockopt::sndtimeo, 100000); - //command_sock.set(zmq::sockopt::rcvtimeo, 1000); - command_sock.set(zmq::sockopt::linger, 1); // Close immediately on disconnect - - //zmq::context_t ctx(1); - //zmq::socket_t heartbeat_sock (ctx, zmq::socket_type::req); - //zmq::socket_t command_sock (ctx, zmq::socket_type::req); - //sock.set(zmq::sockopt::sndtimeo, 10000); - //sock.set(zmq::sockopt::rcvtimeo, 10000); - //command_sock.set(zmq::sockopt::linger, 1); // Close immediately on disconnect - // - heartbeat_sock.connect("tcp://localhost:5555"); - command_sock.connect("tcp://localhost:5556"); - - std::vector items = { - { heartbeat_sock, 0, ZMQ_POLLOUT, 0 }, // Monitor sender1 for send readiness - { 0, STDIN_FILENO, ZMQ_POLLIN, 0 } // Monitor std::cin - }; - - int heartbeat_count = 0; - while (true) { - zmq::poll(items, 100); - if (items[0].revents & ZMQ_POLLOUT) { - std::string msg_string = "BEAT" + std::to_string(heartbeat_count++); - zmq::message_t msg_heartbeat (msg_string); - heartbeat_sock.send(msg_heartbeat, zmq::send_flags::none); - - // Wait for response (poll for ZMQ_POLLIN) - zmq::pollitem_t response_item = { heartbeat_sock, 0, ZMQ_POLLIN, 0 }; - zmq::poll(&response_item, 1, 100); // Wait for response with timeout - - if (response_item.revents & ZMQ_POLLIN) { - zmq::message_t msg_response; - heartbeat_sock.recv(msg_response, zmq::recv_flags::none); - //std::cout << "Heartbeat Response: " << std::endl; - } else { - std::cout << "Bella Server is unavailable" << std::endl; - break; - } - } - - if (items[1].revents & ZMQ_POLLIN) { - // Gather input from console - std::string input; - std::getline(std::cin, input); - // Parse the line - - zmq::message_t msg_command (input); - command_sock.send(msg_command, zmq::send_flags::none); - std::cout << "Sent: " << input.data() << std::endl; - - // Wait for response (poll for ZMQ_POLLIN) - zmq::pollitem_t response_item = { command_sock, 0, ZMQ_POLLIN, 0 }; - zmq::poll(&response_item, 1, 100); // Wait for response with timeout - - /*if (response_item.revents & ZMQ_POLLIN) { - zmq::message_t zmq_response; - command_sock.recv(zmq_response, zmq::recv_flags::none); - std::string response(static_cast(zmq_response.data()), zmq_response.size()); - std::cout << "Server Response: " << response << std::endl; - } else { - std::cout << "Server Timeout" << std::endl; - break; - }*/ - zmq::message_t zmq_response; - command_sock.recv(zmq_response, zmq::recv_flags::none); - std::string response(static_cast(zmq_response.data()), zmq_response.size()); - std::cout << "Server Response: " << response << std::endl; - - if(input == "exit") { - break; - } else if(input == "send") { - std::string read_file = "./orange-juice.bsz"; - std::cout << "sending\n"; - std::ifstream binaryInputFile; - binaryInputFile.open(read_file, std::ios::binary);// for reading - //std::ifstream binaryInputFile(read_file, std::ios::binary); - std::vector send_buffer(chunk_size); - std::streamsize bytes_read_in_chunk; - while (true) { - binaryInputFile.read(send_buffer.data(), chunk_size); // read the file into the buffer - bytes_read_in_chunk = binaryInputFile.gcount(); // Actual bytes read - if(bytes_read_in_chunk > 0){ - //std::cout << bytes_read_in_chunk << std::endl; - zmq::message_t message(send_buffer.data(), bytes_read_in_chunk); - zmq::message_t z_in; - command_sock.send(message, zmq::send_flags::none); - command_sock.recv(z_in); // Wait for acknowledgment from server - } else { - //zmq::message_t message(""); - //zmq::message_t z_in; - //command_sock.send(message, zmq::send_flags::none); - //command_sock.recv(z_in); // Wait for acknowledgment from server - break; - } - } - // Send an empty message to signal end of file - command_sock.send(zmq::message_t(), zmq::send_flags::none); - zmq::message_t z_in; - command_sock.recv(z_in); // Wait for acknowledgment from server - } - } - std::this_thread::sleep_for(std::chrono::milliseconds(500)); - } - heartbeat_sock.close(); - command_sock.close(); - ctx.close(); - return 0; - } catch (const zmq::error_t& e) { - std::cerr << "ZeroMQ error: " << e.what() << std::endl; - return 1; + const size_t chunk_size = 32768; + // Dynamically create keypair, every run is bespoke + // [TODO] send pubkey to server, mkdir, render to that dir + char client_skey[41] = { 0 }; + char client_pkey[41] = { 0 }; + if ( zmq_curve_keypair(&client_pkey[0], &client_skey[0])) { + // 1 is fail + std::cout << "\ncurve keypair gen failed."; + exit(EXIT_FAILURE); } + + // Get server pubkey, set client keypair + std::string server_pkey = get_pubkey_from_srv(); + /*if(server_pkey.empty()) { + std::cout << "Server is Down" << std::endl; + heartbeat_state = false; + }*/ + + std::string client_pkey_str(client_pkey); + std::string client_skey_str(client_skey); + + // Multithreaded + std::thread command_t(command_thread, server_pkey, client_pkey_str, client_skey_str); + std::thread heartbeat_t(heartbeat_thread, server_pkey, client_pkey_str, client_skey_str); + + while (true) { + if (!heartbeat_state.load()) { + std::cout << "Dead" << std::endl; + break; + } + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + } + command_t.join(); + heartbeat_t.join(); + return 0; } diff --git a/server.cpp b/server.cpp index 6720cf4..d0bcf12 100644 --- a/server.cpp +++ b/server.cpp @@ -2,10 +2,127 @@ #include #include #include -#include #include #include +#include +#include +#include + + +// Atomic variable to store the counter value +//std::atomic counter(0); +std::atomic heartbeat_state (true); +// Condition variable to signal the counter thread +//std::condition_variable cv; +// Mutex to protect shared data +//std::mutex mtx; +// Flag to indicate if the counter should reset +std::atomic shouldReset(false); + + +void command_thread(std::string server_skey) { + zmq::context_t ctx; + + // Create zmq rep sockets + //zmq::socket_t heartbeat_sock(ctx, zmq::socket_type::rep); + zmq::socket_t command_sock(ctx, zmq::socket_type::rep); + command_sock.set(zmq::sockopt::curve_server, true); + command_sock.set(zmq::sockopt::curve_secretkey, server_skey); + command_sock.set(zmq::sockopt::linger, 1); // Close immediately on disconnect + command_sock.bind("tcp://*:5556"); + while (true) { + zmq::message_t msg_command; + command_sock.recv(msg_command, zmq::recv_flags::none); + std::string client_command = msg_command.to_string(); + std::cout << "Command: " << client_command << std::endl; + + if(client_command == "hello"){ + std::cout << "bye" << std::endl; + zmq::message_t zmsg1("bye"); + command_sock.send(zmsg1, zmq::send_flags::none); + } else if (client_command == "exit") { + std::cout << "ACK" << std::endl; + zmq::message_t zmsg1("ACK"); + command_sock.send(zmsg1, zmq::send_flags::none); + //heartbeat_state = false; + } else if (client_command == "send") { + std::ofstream output_file("received_file.bsz", std::ios::binary); // Open file in binary mode + if (!output_file.is_open()) { + std::cerr << "Error opening file for writing" << std::endl; + std::cout << "ERR" << std::endl; + zmq::message_t zmsg1("ERR"); + command_sock.send(zmsg1, zmq::send_flags::none); + } else { + std::cout << "file good ACK" << std::endl; + zmq::message_t zmsg1("ACK"); + command_sock.send(zmsg1, zmq::send_flags::none); + while (true) { + zmq::message_t recv_data; + command_sock.recv(recv_data, zmq::recv_flags::none); + if (recv_data.size() == 0) { + //std::cout << "chunk ACK" << std::endl; + zmq::message_t reply("ACK"); + command_sock.send(reply, zmq::send_flags::none); + break; // End of file + } else if(recv_data.size() == 4) { //LIKELY ERR\0 from client, can't find file + // [TODO] , parse lines only send valid commands + // Right now I allow any text to get through + + std::cout << "ERR client read ACK" << std::endl; + zmq::message_t reply("ACK"); + command_sock.send(reply, zmq::send_flags::none); + break; // End of file + } + //std::cout << recv_data.size() << std::endl; + output_file.write(static_cast(recv_data.data()), recv_data.size()); + zmq::message_t reply("ACK"); + command_sock.send(reply, zmq::send_flags::none); + } + output_file.close(); + } + } else { + zmq::message_t zmsg1("foo"); + command_sock.send(zmsg1, zmq::send_flags::none); + } + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + command_sock.close(); + ctx.close(); +} + +void heartbeat_thread(std::string server_skey) { + heartbeat_state = true; + std::cout << "new heartbeat_thread" << std::endl; + zmq::context_t ctx; + zmq::socket_t heartbeat_sock (ctx, zmq::socket_type::rep); + heartbeat_sock.set(zmq::sockopt::curve_server, true); + heartbeat_sock.set(zmq::sockopt::curve_secretkey, server_skey); + heartbeat_sock.bind("tcp://*:5555"); + while(true) { + zmq::pollitem_t response_item = { heartbeat_sock, 0, ZMQ_POLLIN, 0 }; + zmq::poll(&response_item, 1, 10000); // Wait for response with timeout + + if (response_item.revents & ZMQ_POLLIN) { + zmq::message_t message; + heartbeat_sock.recv(message, zmq::recv_flags::none); + //std::cout << "heart:" << heartbeat_state.load() << std::endl; + + std::string response = "Heartbeat OK"; + zmq::message_t zmq_response (response); + heartbeat_sock.send(zmq_response, zmq::send_flags::dontwait); // No block + + } else { + std::cout << "Bella Client Lost" << std::endl; + heartbeat_state = false; + } + + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + heartbeat_sock.close(); + ctx.close(); +} + // Blocking zmq rep socket to pass server_public_key void pkey_server(const std::string& pub_key) { zmq::context_t ctx; @@ -25,12 +142,6 @@ void pkey_server(const std::string& pub_key) { int main() { - zmq::context_t ctx(1); - - // Create zmq rep sockets - zmq::socket_t heartbeat_sock(ctx, zmq::socket_type::rep); - zmq::socket_t command_sock(ctx, zmq::socket_type::rep); - // Generate brand new keypair on launch // [TODO] Add client side public key fingerprinting for added security char skey[128] = { 0 }; @@ -41,106 +152,27 @@ int main() exit(EXIT_FAILURE); } - heartbeat_sock.set(zmq::sockopt::curve_server, true); - heartbeat_sock.set(zmq::sockopt::curve_secretkey, skey); - - command_sock.set(zmq::sockopt::curve_server, true); - command_sock.set(zmq::sockopt::curve_secretkey, skey); - - //heartbeat_sock.set(zmq::sockopt::rcvtimeo, 10000); - //heartbeat_sock.set(zmq::sockopt::sndtimeo, 10000); - command_sock.set(zmq::sockopt::linger, 1); // Close immediately on disconnect - - // Binding to transport - heartbeat_sock.bind("tcp://*:5555"); - command_sock.bind("tcp://*:5556"); - - // Create poll items - std::vector items = { - { heartbeat_sock, 0, ZMQ_POLLIN, 0 }, - { command_sock, 0, ZMQ_POLLIN, 0 } - }; + // Multi threading + //std::thread command_t(command_thread, skey); + //std::thread heartbeat_t(heartbeat_thread, skey); + //std::thread command_t(command_thread, skey); + //std::thread heartbeat_t(heartbeat_thread, skey); + std::thread command_t(command_thread, skey); + std::thread heartbeat_t(heartbeat_thread, skey); // - while(true) { // awaiting client loop + while(true) { // awaiting new client loop + heartbeat_state = true; pkey_server(pkey); // blocking wait client to get public key on port 5555 + heartbeat_state = true; std::cout << "Client connected" << std::endl; - int heartbeat_miss = 0; - while (true) { //loop forever accepting encrypted messages, limit to one client - zmq::poll(items, 100); - //std::cout << "heart:" << heartbeat_miss << std::endl; - // Check if heartbeat_socket has data - if (items[0].revents & ZMQ_POLLIN) { - zmq::message_t message; - heartbeat_sock.recv(message, zmq::recv_flags::none); - //std::cout << "heart:" << heartbeat_miss << std::endl; - - std::string response = "Heartbeat OK"; - zmq::message_t zmq_response (response); - heartbeat_sock.send(zmq_response, zmq::send_flags::dontwait); // No block - heartbeat_miss = 0; // Reset heartbeat misses - } else { //No heartbeat detected during poll - heartbeat_miss++; - if (heartbeat_miss>25) { //This many misses means client is AWOL - break; //Exit inner loop to outer loop handling pubkey serving - } - } - // Check if command_socket has data - if (items[1].revents & ZMQ_POLLIN) { - zmq::message_t msg_command; - command_sock.recv(msg_command, zmq::recv_flags::none); - std::string client_command = msg_command.to_string(); - //std::string received_message (static_cast(msg_command.data()), msg_command.size()); - std::cout << "Command: " << client_command << std::endl; - - // Send a response - //zmq::message_t zmq_response("ACK"); - //command_sock.send(zmq_response, zmq::send_flags::none); - - //std::string client_command = msg_command.to_string(); - - // 2. Check if the string is empty - //if (client_command.empty()) { - // std::cerr << "Invalid message received: " << std::endl; - // break; // exit loop to await new client - //} - - if(client_command == "hello"){ - zmq::message_t zmsg1("bye"); - command_sock.send(zmsg1, zmq::send_flags::none); - } else if (client_command == "exit") { - zmq::message_t zmsg1("exit"); - command_sock.send(zmsg1, zmq::send_flags::none); - break; - } else if (client_command == "send") { - std::ofstream output_file("received_file.bsz", std::ios::binary); // Open file in binary mode - if (!output_file.is_open()) { - std::cerr << "Error opening file for writing" << std::endl; - return 1; - } - zmq::message_t zmsg1("ACK1"); - command_sock.send(zmsg1, zmq::send_flags::none); - while (true) { - zmq::message_t recv_data; - command_sock.recv(recv_data, zmq::recv_flags::none); - if (recv_data.size() == 0) { - zmq::message_t reply("ACK2"); - command_sock.send(reply, zmq::send_flags::none); - break; // End of file - } - //std::cout << recv_data.size() << std::endl; - output_file.write(static_cast(recv_data.data()), recv_data.size()); - zmq::message_t reply("ACK2"); - command_sock.send(reply, zmq::send_flags::none); - } - output_file.close(); - } else { - zmq::message_t zmsg1("ACK"); - command_sock.send(zmsg1, zmq::send_flags::none); - } + while(true) { // inner loop + //std::cout << "inner loop" << std::endl; + if (heartbeat_state.load()==false) { + std::cout << "client dead" << std::endl; + break; } - // Simulate doing other work (optional) std::this_thread::sleep_for(std::chrono::milliseconds(10)); } }