From e4b606e49ca655ebaa4a47b1a98ac6f3a324a8e9 Mon Sep 17 00:00:00 2001 From: Harvey Fong Date: Tue, 25 Feb 2025 10:11:15 -0700 Subject: [PATCH] first working Linux --- README.md | 39 +++++++++- client.cpp | 205 +++++++++++++++++++++++++++++++++++++++++++++++++++++ server.cpp | 147 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 390 insertions(+), 1 deletion(-) create mode 100644 client.cpp create mode 100644 server.cpp diff --git a/README.md b/README.md index 4ce1d8a..fcd552e 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,39 @@ # zmqprototype -Prototype code for cross platform +Prototype code for cross platform zmq rep-req render node + +###Build + +##Linux +``` +apt install -y libzmq-dev +ldconfig +apt install -y libtool +apt install -y libsodium-dev +apt install -y cmake + +git clone https://github.com/zeromq/libzmq +apt install libgnutls28-dev +apt install pkg-config +cd libzmq +mkdir build +cd build +cmake .. -DENABLE_CURVE=ON -DWITH_LIBSODIUM=/usr/include/sodium + + +git https://github.com/zeromq/cppzmq +cd cppzmq +mkdir build +cd build +cmake .. + +g++ server.cpp -o server -lzmq -Wl,-rpath,. +``` + +##MacOS +``` +g++ -std=c++11 server.cpp -o server -I../libzmq/include -I../cppzmq -L../libzmq/build/lib -lzmq -Wl,-rpath,. +g++ -std=c++11 server.cpp -o server -I../libzmq/include -I../cppzmq -L../libzmq/build/lib -lzmq -Wl,-rpath,. +``` + + + diff --git a/client.cpp b/client.cpp new file mode 100644 index 0000000..096c44b --- /dev/null +++ b/client.cpp @@ -0,0 +1,205 @@ +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include // For STDIN_FILENO + +// dynamic char +#include + +std::string get_pubkey_from_srv() { + // No authentication is used, server will give out pubkey to anybody + // Could use a unique message but since socket is unencrypted this provides + // no protection. In main loop we establish an encrypted connection with the server + // now that we have the pubkey and in combo with the client_secret_key we can + // be secure. 0MQ uses PFS perfect forward security, because this initial + // back and forth is extended with behind the scenes new keypairs taken care of by + // 0MQ after we establish our intitial encrypted socket + zmq::context_t ctx; + zmq::socket_t pubkey_sock(ctx, zmq::socket_type::req); + + pubkey_sock.set(zmq::sockopt::sndtimeo, 10000); + pubkey_sock.set(zmq::sockopt::rcvtimeo, 10000); + pubkey_sock.set(zmq::sockopt::linger, 0); // Close immediately on disconnect + + pubkey_sock.connect("tcp://127.0.0.1:9555"); + zmq::message_t z_out(std::string("Bellarender123")); + + try { + zmq::send_result_t send_result = pubkey_sock.send(z_out, zmq::send_flags::none); + std::cout << "TRY" << std::endl; + } catch (const zmq::error_t& e) { + std::cout << "ERROR" << std::endl; + } + + std::cout << "RECEVIE" << std::endl; + + zmq::message_t z_in; + pubkey_sock.recv(z_in); + std::string pub_key = z_in.to_string(); + pubkey_sock.close(); + ctx.close(); + return pub_key; +} + +int main() +{ + try + { + const size_t chunk_size = 32768; + // Dynamically create keypair, every run is bespoke + // [TODO] send pubkey to server, mkdir, render to that dir + char client_skey[128] = { 0 }; + char client_pkey[128] = { 0 }; + if ( zmq_curve_keypair(&client_pkey[0], &client_skey[0])) { + // 1 is fail + std::cout << "\ncurve keypair gen failed."; + exit(EXIT_FAILURE); + } + + // Get server pubkey, set client keypair + std::string server_pkey = get_pubkey_from_srv(); + if(server_pkey.empty()) { + std::cout << "Server is Down" << std::endl; + return 1; + } + + zmq::context_t ctx(1); + + // Create zmq sockets + zmq::socket_t heartbeat_sock (ctx, zmq::socket_type::req); + zmq::socket_t command_sock (ctx, zmq::socket_type::req); + + // Encrypt heartbeat socket + heartbeat_sock.set(zmq::sockopt::curve_serverkey, server_pkey); + heartbeat_sock.set(zmq::sockopt::curve_publickey, client_pkey); + heartbeat_sock.set(zmq::sockopt::curve_secretkey, client_skey); + + // Encrypt command socket + command_sock.set(zmq::sockopt::curve_serverkey, server_pkey); + command_sock.set(zmq::sockopt::curve_publickey, client_pkey); + command_sock.set(zmq::sockopt::curve_secretkey, client_skey); + + std::cout << "keypair" << std::endl; + + // Set receive timeout to 1000 milliseconds + //command_sock.set(zmq::sockopt::sndtimeo, 100000); + //command_sock.set(zmq::sockopt::rcvtimeo, 1000); + command_sock.set(zmq::sockopt::linger, 1); // Close immediately on disconnect + + //zmq::context_t ctx(1); + //zmq::socket_t heartbeat_sock (ctx, zmq::socket_type::req); + //zmq::socket_t command_sock (ctx, zmq::socket_type::req); + //sock.set(zmq::sockopt::sndtimeo, 10000); + //sock.set(zmq::sockopt::rcvtimeo, 10000); + //command_sock.set(zmq::sockopt::linger, 1); // Close immediately on disconnect + // + heartbeat_sock.connect("tcp://localhost:5555"); + command_sock.connect("tcp://localhost:5556"); + + std::vector items = { + { heartbeat_sock, 0, ZMQ_POLLOUT, 0 }, // Monitor sender1 for send readiness + { 0, STDIN_FILENO, ZMQ_POLLIN, 0 } // Monitor std::cin + }; + + int heartbeat_count = 0; + while (true) { + zmq::poll(items, 100); + if (items[0].revents & ZMQ_POLLOUT) { + std::string msg_string = "BEAT" + std::to_string(heartbeat_count++); + zmq::message_t msg_heartbeat (msg_string); + heartbeat_sock.send(msg_heartbeat, zmq::send_flags::none); + + // Wait for response (poll for ZMQ_POLLIN) + zmq::pollitem_t response_item = { heartbeat_sock, 0, ZMQ_POLLIN, 0 }; + zmq::poll(&response_item, 1, 100); // Wait for response with timeout + + if (response_item.revents & ZMQ_POLLIN) { + zmq::message_t msg_response; + heartbeat_sock.recv(msg_response, zmq::recv_flags::none); + //std::cout << "Heartbeat Response: " << std::endl; + } else { + std::cout << "Bella Server is unavailable" << std::endl; + break; + } + } + + if (items[1].revents & ZMQ_POLLIN) { + // Gather input from console + std::string input; + std::getline(std::cin, input); + // Parse the line + + zmq::message_t msg_command (input); + command_sock.send(msg_command, zmq::send_flags::none); + std::cout << "Sent: " << input.data() << std::endl; + + // Wait for response (poll for ZMQ_POLLIN) + zmq::pollitem_t response_item = { command_sock, 0, ZMQ_POLLIN, 0 }; + zmq::poll(&response_item, 1, 100); // Wait for response with timeout + + /*if (response_item.revents & ZMQ_POLLIN) { + zmq::message_t zmq_response; + command_sock.recv(zmq_response, zmq::recv_flags::none); + std::string response(static_cast(zmq_response.data()), zmq_response.size()); + std::cout << "Server Response: " << response << std::endl; + } else { + std::cout << "Server Timeout" << std::endl; + break; + }*/ + zmq::message_t zmq_response; + command_sock.recv(zmq_response, zmq::recv_flags::none); + std::string response(static_cast(zmq_response.data()), zmq_response.size()); + std::cout << "Server Response: " << response << std::endl; + + if(input == "exit") { + break; + } else if(input == "send") { + std::string read_file = "./orange-juice.bsz"; + std::cout << "sending\n"; + std::ifstream binaryInputFile; + binaryInputFile.open(read_file, std::ios::binary);// for reading + //std::ifstream binaryInputFile(read_file, std::ios::binary); + std::vector send_buffer(chunk_size); + std::streamsize bytes_read_in_chunk; + while (true) { + binaryInputFile.read(send_buffer.data(), chunk_size); // read the file into the buffer + bytes_read_in_chunk = binaryInputFile.gcount(); // Actual bytes read + if(bytes_read_in_chunk > 0){ + //std::cout << bytes_read_in_chunk << std::endl; + zmq::message_t message(send_buffer.data(), bytes_read_in_chunk); + zmq::message_t z_in; + command_sock.send(message, zmq::send_flags::none); + command_sock.recv(z_in); // Wait for acknowledgment from server + } else { + //zmq::message_t message(""); + //zmq::message_t z_in; + //command_sock.send(message, zmq::send_flags::none); + //command_sock.recv(z_in); // Wait for acknowledgment from server + break; + } + } + // Send an empty message to signal end of file + command_sock.send(zmq::message_t(), zmq::send_flags::none); + zmq::message_t z_in; + command_sock.recv(z_in); // Wait for acknowledgment from server + } + } + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + } + heartbeat_sock.close(); + command_sock.close(); + ctx.close(); + return 0; + } catch (const zmq::error_t& e) { + std::cerr << "ZeroMQ error: " << e.what() << std::endl; + return 1; + } +} + diff --git a/server.cpp b/server.cpp new file mode 100644 index 0000000..6720cf4 --- /dev/null +++ b/server.cpp @@ -0,0 +1,147 @@ +#include +#include +#include +#include +#include +#include +#include + +// Blocking zmq rep socket to pass server_public_key +void pkey_server(const std::string& pub_key) { + zmq::context_t ctx; + zmq::socket_t sock(ctx, zmq::socket_type::rep); + sock.bind("tcp://*:9555"); //[TODO] args to set port + + zmq::message_t z_in; + std::cout << "Entered: Public Key Serving Mode" << std::endl; + sock.recv(z_in); + if (z_in.to_string().compare("Bellarender123") == 0) { + zmq::message_t z_out(pub_key); + sock.send(z_out, zmq::send_flags::none); + } + sock.close(); + ctx.close(); +} + +int main() +{ + zmq::context_t ctx(1); + + // Create zmq rep sockets + zmq::socket_t heartbeat_sock(ctx, zmq::socket_type::rep); + zmq::socket_t command_sock(ctx, zmq::socket_type::rep); + + // Generate brand new keypair on launch + // [TODO] Add client side public key fingerprinting for added security + char skey[128] = { 0 }; + char pkey[128] = { 0 }; + if ( zmq_curve_keypair(&pkey[0], &skey[0])) { + // 1 is fail + std::cout << "\ncurve keypair gen failed."; + exit(EXIT_FAILURE); + } + + heartbeat_sock.set(zmq::sockopt::curve_server, true); + heartbeat_sock.set(zmq::sockopt::curve_secretkey, skey); + + command_sock.set(zmq::sockopt::curve_server, true); + command_sock.set(zmq::sockopt::curve_secretkey, skey); + + //heartbeat_sock.set(zmq::sockopt::rcvtimeo, 10000); + //heartbeat_sock.set(zmq::sockopt::sndtimeo, 10000); + command_sock.set(zmq::sockopt::linger, 1); // Close immediately on disconnect + + // Binding to transport + heartbeat_sock.bind("tcp://*:5555"); + command_sock.bind("tcp://*:5556"); + + // Create poll items + std::vector items = { + { heartbeat_sock, 0, ZMQ_POLLIN, 0 }, + { command_sock, 0, ZMQ_POLLIN, 0 } + }; + + // + while(true) { // awaiting client loop + pkey_server(pkey); // blocking wait client to get public key on port 5555 + std::cout << "Client connected" << std::endl; + int heartbeat_miss = 0; + while (true) { //loop forever accepting encrypted messages, limit to one client + zmq::poll(items, 100); + //std::cout << "heart:" << heartbeat_miss << std::endl; + + // Check if heartbeat_socket has data + if (items[0].revents & ZMQ_POLLIN) { + zmq::message_t message; + heartbeat_sock.recv(message, zmq::recv_flags::none); + //std::cout << "heart:" << heartbeat_miss << std::endl; + + std::string response = "Heartbeat OK"; + zmq::message_t zmq_response (response); + heartbeat_sock.send(zmq_response, zmq::send_flags::dontwait); // No block + heartbeat_miss = 0; // Reset heartbeat misses + } else { //No heartbeat detected during poll + heartbeat_miss++; + if (heartbeat_miss>25) { //This many misses means client is AWOL + break; //Exit inner loop to outer loop handling pubkey serving + } + } + // Check if command_socket has data + if (items[1].revents & ZMQ_POLLIN) { + zmq::message_t msg_command; + command_sock.recv(msg_command, zmq::recv_flags::none); + std::string client_command = msg_command.to_string(); + //std::string received_message (static_cast(msg_command.data()), msg_command.size()); + std::cout << "Command: " << client_command << std::endl; + + // Send a response + //zmq::message_t zmq_response("ACK"); + //command_sock.send(zmq_response, zmq::send_flags::none); + + //std::string client_command = msg_command.to_string(); + + // 2. Check if the string is empty + //if (client_command.empty()) { + // std::cerr << "Invalid message received: " << std::endl; + // break; // exit loop to await new client + //} + + if(client_command == "hello"){ + zmq::message_t zmsg1("bye"); + command_sock.send(zmsg1, zmq::send_flags::none); + } else if (client_command == "exit") { + zmq::message_t zmsg1("exit"); + command_sock.send(zmsg1, zmq::send_flags::none); + break; + } else if (client_command == "send") { + std::ofstream output_file("received_file.bsz", std::ios::binary); // Open file in binary mode + if (!output_file.is_open()) { + std::cerr << "Error opening file for writing" << std::endl; + return 1; + } + zmq::message_t zmsg1("ACK1"); + command_sock.send(zmsg1, zmq::send_flags::none); + while (true) { + zmq::message_t recv_data; + command_sock.recv(recv_data, zmq::recv_flags::none); + if (recv_data.size() == 0) { + zmq::message_t reply("ACK2"); + command_sock.send(reply, zmq::send_flags::none); + break; // End of file + } + //std::cout << recv_data.size() << std::endl; + output_file.write(static_cast(recv_data.data()), recv_data.size()); + zmq::message_t reply("ACK2"); + command_sock.send(reply, zmq::send_flags::none); + } + output_file.close(); + } else { + zmq::message_t zmsg1("ACK"); + command_sock.send(zmsg1, zmq::send_flags::none); + } + } + // Simulate doing other work (optional) + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + } +}