first working Linux

This commit is contained in:
Harvey Fong 2025-02-25 10:11:15 -07:00
parent d42e017d9a
commit e4b606e49c
3 changed files with 390 additions and 1 deletions

View File

@ -1,2 +1,39 @@
# zmqprototype # zmqprototype
Prototype code for cross platform Prototype code for cross platform zmq rep-req render node
###Build
##Linux
```
apt install -y libzmq-dev
ldconfig
apt install -y libtool
apt install -y libsodium-dev
apt install -y cmake
git clone https://github.com/zeromq/libzmq
apt install libgnutls28-dev
apt install pkg-config
cd libzmq
mkdir build
cd build
cmake .. -DENABLE_CURVE=ON -DWITH_LIBSODIUM=/usr/include/sodium
git https://github.com/zeromq/cppzmq
cd cppzmq
mkdir build
cd build
cmake ..
g++ server.cpp -o server -lzmq -Wl,-rpath,.
```
##MacOS
```
g++ -std=c++11 server.cpp -o server -I../libzmq/include -I../cppzmq -L../libzmq/build/lib -lzmq -Wl,-rpath,.
g++ -std=c++11 server.cpp -o server -I../libzmq/include -I../cppzmq -L../libzmq/build/lib -lzmq -Wl,-rpath,.
```

205
client.cpp Normal file
View File

@ -0,0 +1,205 @@
#include <iostream>
#include <fstream>
#include <filesystem>
#include <random>
#include <thread>
#include <zmq.hpp>
#include <string>
#include <vector>
#include <chrono>
#include <unistd.h> // For STDIN_FILENO
// dynamic char
#include <vector>
std::string get_pubkey_from_srv() {
// No authentication is used, server will give out pubkey to anybody
// Could use a unique message but since socket is unencrypted this provides
// no protection. In main loop we establish an encrypted connection with the server
// now that we have the pubkey and in combo with the client_secret_key we can
// be secure. 0MQ uses PFS perfect forward security, because this initial
// back and forth is extended with behind the scenes new keypairs taken care of by
// 0MQ after we establish our intitial encrypted socket
zmq::context_t ctx;
zmq::socket_t pubkey_sock(ctx, zmq::socket_type::req);
pubkey_sock.set(zmq::sockopt::sndtimeo, 10000);
pubkey_sock.set(zmq::sockopt::rcvtimeo, 10000);
pubkey_sock.set(zmq::sockopt::linger, 0); // Close immediately on disconnect
pubkey_sock.connect("tcp://127.0.0.1:9555");
zmq::message_t z_out(std::string("Bellarender123"));
try {
zmq::send_result_t send_result = pubkey_sock.send(z_out, zmq::send_flags::none);
std::cout << "TRY" << std::endl;
} catch (const zmq::error_t& e) {
std::cout << "ERROR" << std::endl;
}
std::cout << "RECEVIE" << std::endl;
zmq::message_t z_in;
pubkey_sock.recv(z_in);
std::string pub_key = z_in.to_string();
pubkey_sock.close();
ctx.close();
return pub_key;
}
int main()
{
try
{
const size_t chunk_size = 32768;
// Dynamically create keypair, every run is bespoke
// [TODO] send pubkey to server, mkdir, render to that dir
char client_skey[128] = { 0 };
char client_pkey[128] = { 0 };
if ( zmq_curve_keypair(&client_pkey[0], &client_skey[0])) {
// 1 is fail
std::cout << "\ncurve keypair gen failed.";
exit(EXIT_FAILURE);
}
// Get server pubkey, set client keypair
std::string server_pkey = get_pubkey_from_srv();
if(server_pkey.empty()) {
std::cout << "Server is Down" << std::endl;
return 1;
}
zmq::context_t ctx(1);
// Create zmq sockets
zmq::socket_t heartbeat_sock (ctx, zmq::socket_type::req);
zmq::socket_t command_sock (ctx, zmq::socket_type::req);
// Encrypt heartbeat socket
heartbeat_sock.set(zmq::sockopt::curve_serverkey, server_pkey);
heartbeat_sock.set(zmq::sockopt::curve_publickey, client_pkey);
heartbeat_sock.set(zmq::sockopt::curve_secretkey, client_skey);
// Encrypt command socket
command_sock.set(zmq::sockopt::curve_serverkey, server_pkey);
command_sock.set(zmq::sockopt::curve_publickey, client_pkey);
command_sock.set(zmq::sockopt::curve_secretkey, client_skey);
std::cout << "keypair" << std::endl;
// Set receive timeout to 1000 milliseconds
//command_sock.set(zmq::sockopt::sndtimeo, 100000);
//command_sock.set(zmq::sockopt::rcvtimeo, 1000);
command_sock.set(zmq::sockopt::linger, 1); // Close immediately on disconnect
//zmq::context_t ctx(1);
//zmq::socket_t heartbeat_sock (ctx, zmq::socket_type::req);
//zmq::socket_t command_sock (ctx, zmq::socket_type::req);
//sock.set(zmq::sockopt::sndtimeo, 10000);
//sock.set(zmq::sockopt::rcvtimeo, 10000);
//command_sock.set(zmq::sockopt::linger, 1); // Close immediately on disconnect
//
heartbeat_sock.connect("tcp://localhost:5555");
command_sock.connect("tcp://localhost:5556");
std::vector<zmq::pollitem_t> items = {
{ heartbeat_sock, 0, ZMQ_POLLOUT, 0 }, // Monitor sender1 for send readiness
{ 0, STDIN_FILENO, ZMQ_POLLIN, 0 } // Monitor std::cin
};
int heartbeat_count = 0;
while (true) {
zmq::poll(items, 100);
if (items[0].revents & ZMQ_POLLOUT) {
std::string msg_string = "BEAT" + std::to_string(heartbeat_count++);
zmq::message_t msg_heartbeat (msg_string);
heartbeat_sock.send(msg_heartbeat, zmq::send_flags::none);
// Wait for response (poll for ZMQ_POLLIN)
zmq::pollitem_t response_item = { heartbeat_sock, 0, ZMQ_POLLIN, 0 };
zmq::poll(&response_item, 1, 100); // Wait for response with timeout
if (response_item.revents & ZMQ_POLLIN) {
zmq::message_t msg_response;
heartbeat_sock.recv(msg_response, zmq::recv_flags::none);
//std::cout << "Heartbeat Response: " << std::endl;
} else {
std::cout << "Bella Server is unavailable" << std::endl;
break;
}
}
if (items[1].revents & ZMQ_POLLIN) {
// Gather input from console
std::string input;
std::getline(std::cin, input);
// Parse the line
zmq::message_t msg_command (input);
command_sock.send(msg_command, zmq::send_flags::none);
std::cout << "Sent: " << input.data() << std::endl;
// Wait for response (poll for ZMQ_POLLIN)
zmq::pollitem_t response_item = { command_sock, 0, ZMQ_POLLIN, 0 };
zmq::poll(&response_item, 1, 100); // Wait for response with timeout
/*if (response_item.revents & ZMQ_POLLIN) {
zmq::message_t zmq_response;
command_sock.recv(zmq_response, zmq::recv_flags::none);
std::string response(static_cast<char*>(zmq_response.data()), zmq_response.size());
std::cout << "Server Response: " << response << std::endl;
} else {
std::cout << "Server Timeout" << std::endl;
break;
}*/
zmq::message_t zmq_response;
command_sock.recv(zmq_response, zmq::recv_flags::none);
std::string response(static_cast<char*>(zmq_response.data()), zmq_response.size());
std::cout << "Server Response: " << response << std::endl;
if(input == "exit") {
break;
} else if(input == "send") {
std::string read_file = "./orange-juice.bsz";
std::cout << "sending\n";
std::ifstream binaryInputFile;
binaryInputFile.open(read_file, std::ios::binary);// for reading
//std::ifstream binaryInputFile(read_file, std::ios::binary);
std::vector<char> send_buffer(chunk_size);
std::streamsize bytes_read_in_chunk;
while (true) {
binaryInputFile.read(send_buffer.data(), chunk_size); // read the file into the buffer
bytes_read_in_chunk = binaryInputFile.gcount(); // Actual bytes read
if(bytes_read_in_chunk > 0){
//std::cout << bytes_read_in_chunk << std::endl;
zmq::message_t message(send_buffer.data(), bytes_read_in_chunk);
zmq::message_t z_in;
command_sock.send(message, zmq::send_flags::none);
command_sock.recv(z_in); // Wait for acknowledgment from server
} else {
//zmq::message_t message("");
//zmq::message_t z_in;
//command_sock.send(message, zmq::send_flags::none);
//command_sock.recv(z_in); // Wait for acknowledgment from server
break;
}
}
// Send an empty message to signal end of file
command_sock.send(zmq::message_t(), zmq::send_flags::none);
zmq::message_t z_in;
command_sock.recv(z_in); // Wait for acknowledgment from server
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
heartbeat_sock.close();
command_sock.close();
ctx.close();
return 0;
} catch (const zmq::error_t& e) {
std::cerr << "ZeroMQ error: " << e.what() << std::endl;
return 1;
}
}

147
server.cpp Normal file
View File

@ -0,0 +1,147 @@
#include <iostream>
#include <fstream>
#include <thread>
#include <zmq.hpp>
#include <unistd.h>
#include <vector>
#include <chrono>
// Blocking zmq rep socket to pass server_public_key
void pkey_server(const std::string& pub_key) {
zmq::context_t ctx;
zmq::socket_t sock(ctx, zmq::socket_type::rep);
sock.bind("tcp://*:9555"); //[TODO] args to set port
zmq::message_t z_in;
std::cout << "Entered: Public Key Serving Mode" << std::endl;
sock.recv(z_in);
if (z_in.to_string().compare("Bellarender123") == 0) {
zmq::message_t z_out(pub_key);
sock.send(z_out, zmq::send_flags::none);
}
sock.close();
ctx.close();
}
int main()
{
zmq::context_t ctx(1);
// Create zmq rep sockets
zmq::socket_t heartbeat_sock(ctx, zmq::socket_type::rep);
zmq::socket_t command_sock(ctx, zmq::socket_type::rep);
// Generate brand new keypair on launch
// [TODO] Add client side public key fingerprinting for added security
char skey[128] = { 0 };
char pkey[128] = { 0 };
if ( zmq_curve_keypair(&pkey[0], &skey[0])) {
// 1 is fail
std::cout << "\ncurve keypair gen failed.";
exit(EXIT_FAILURE);
}
heartbeat_sock.set(zmq::sockopt::curve_server, true);
heartbeat_sock.set(zmq::sockopt::curve_secretkey, skey);
command_sock.set(zmq::sockopt::curve_server, true);
command_sock.set(zmq::sockopt::curve_secretkey, skey);
//heartbeat_sock.set(zmq::sockopt::rcvtimeo, 10000);
//heartbeat_sock.set(zmq::sockopt::sndtimeo, 10000);
command_sock.set(zmq::sockopt::linger, 1); // Close immediately on disconnect
// Binding to transport
heartbeat_sock.bind("tcp://*:5555");
command_sock.bind("tcp://*:5556");
// Create poll items
std::vector<zmq::pollitem_t> items = {
{ heartbeat_sock, 0, ZMQ_POLLIN, 0 },
{ command_sock, 0, ZMQ_POLLIN, 0 }
};
//
while(true) { // awaiting client loop
pkey_server(pkey); // blocking wait client to get public key on port 5555
std::cout << "Client connected" << std::endl;
int heartbeat_miss = 0;
while (true) { //loop forever accepting encrypted messages, limit to one client
zmq::poll(items, 100);
//std::cout << "heart:" << heartbeat_miss << std::endl;
// Check if heartbeat_socket has data
if (items[0].revents & ZMQ_POLLIN) {
zmq::message_t message;
heartbeat_sock.recv(message, zmq::recv_flags::none);
//std::cout << "heart:" << heartbeat_miss << std::endl;
std::string response = "Heartbeat OK";
zmq::message_t zmq_response (response);
heartbeat_sock.send(zmq_response, zmq::send_flags::dontwait); // No block
heartbeat_miss = 0; // Reset heartbeat misses
} else { //No heartbeat detected during poll
heartbeat_miss++;
if (heartbeat_miss>25) { //This many misses means client is AWOL
break; //Exit inner loop to outer loop handling pubkey serving
}
}
// Check if command_socket has data
if (items[1].revents & ZMQ_POLLIN) {
zmq::message_t msg_command;
command_sock.recv(msg_command, zmq::recv_flags::none);
std::string client_command = msg_command.to_string();
//std::string received_message (static_cast<char*>(msg_command.data()), msg_command.size());
std::cout << "Command: " << client_command << std::endl;
// Send a response
//zmq::message_t zmq_response("ACK");
//command_sock.send(zmq_response, zmq::send_flags::none);
//std::string client_command = msg_command.to_string();
// 2. Check if the string is empty
//if (client_command.empty()) {
// std::cerr << "Invalid message received: " << std::endl;
// break; // exit loop to await new client
//}
if(client_command == "hello"){
zmq::message_t zmsg1("bye");
command_sock.send(zmsg1, zmq::send_flags::none);
} else if (client_command == "exit") {
zmq::message_t zmsg1("exit");
command_sock.send(zmsg1, zmq::send_flags::none);
break;
} else if (client_command == "send") {
std::ofstream output_file("received_file.bsz", std::ios::binary); // Open file in binary mode
if (!output_file.is_open()) {
std::cerr << "Error opening file for writing" << std::endl;
return 1;
}
zmq::message_t zmsg1("ACK1");
command_sock.send(zmsg1, zmq::send_flags::none);
while (true) {
zmq::message_t recv_data;
command_sock.recv(recv_data, zmq::recv_flags::none);
if (recv_data.size() == 0) {
zmq::message_t reply("ACK2");
command_sock.send(reply, zmq::send_flags::none);
break; // End of file
}
//std::cout << recv_data.size() << std::endl;
output_file.write(static_cast<char*>(recv_data.data()), recv_data.size());
zmq::message_t reply("ACK2");
command_sock.send(reply, zmq::send_flags::none);
}
output_file.close();
} else {
zmq::message_t zmsg1("ACK");
command_sock.send(zmsg1, zmq::send_flags::none);
}
}
// Simulate doing other work (optional)
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
}