diff --git a/Makefile b/Makefile index 4eb85f1..3bbb130 100644 --- a/Makefile +++ b/Makefile @@ -60,7 +60,7 @@ endif # Library flags LIB_PATHS = -L$(DPP_BUILD_DIR) -LIBRARIES = -ldpp +LIBRARIES = -ldpp -lsqlite3 # Build type specific flags ifeq ($(BUILD_TYPE), debug) diff --git a/poomer-discord.cpp b/poomer-discord.cpp index 2e2e89e..ca0932f 100644 --- a/poomer-discord.cpp +++ b/poomer-discord.cpp @@ -19,8 +19,11 @@ // Dynamic arrays - for std::vector to hold image byte data #include -// Threading support - for std::thread to send images asynchronously +// Threading support - for worker thread and synchronization #include +#include +#include +#include // Time utilities - for std::chrono::seconds() delays #include @@ -28,6 +31,9 @@ // Algorithm functions - for std::transform (string case conversion) #include +// SQLite3 database - for persistent work queue storage +#include + // STB Image library - single header for loading images (JPEG, PNG, etc.) #define STB_IMAGE_IMPLEMENTATION #include "stb_image.h" @@ -39,6 +45,318 @@ // Embedded image data - contains the JPEG as a byte array (our generated header) #include "embedded_image.h" +/** + * Structure representing a work item in the processing queue + */ +struct WorkItem { + int64_t id; // Unique database ID + std::string attachment_url; // Discord attachment URL to download + std::string original_filename; // Original filename from Discord + uint64_t channel_id; // Discord channel ID for response + uint64_t user_id; // Discord user ID for mentions + int64_t created_at; // Unix timestamp when job was created + int retry_count; // Number of times this job has been retried + + WorkItem() : id(0), channel_id(0), user_id(0), created_at(0), retry_count(0) {} +}; + +/** + * SQLite-backed FIFO work queue for managing JPEG processing jobs + * Provides persistence across system crashes and sequential processing + */ +class WorkQueue { +private: + sqlite3* db; + std::mutex queue_mutex; + std::condition_variable queue_condition; + std::atomic shutdown_requested{false}; + +public: + WorkQueue() : db(nullptr) {} + + ~WorkQueue() { + if (db) { + sqlite3_close(db); + } + } + + /** + * Initialize SQLite database and create work queue table + */ + bool initialize(const std::string& db_path = "work_queue.db") { + std::lock_guard lock(queue_mutex); + + // Open SQLite database (creates file if it doesn't exist) + int rc = sqlite3_open(db_path.c_str(), &db); + if (rc != SQLITE_OK) { + std::cerr << "āŒ Failed to open SQLite database: " << sqlite3_errmsg(db) << std::endl; + return false; + } + + // Create work queue table if it doesn't exist + const char* create_table_sql = R"( + CREATE TABLE IF NOT EXISTS work_queue ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + attachment_url TEXT NOT NULL, + original_filename TEXT NOT NULL, + channel_id INTEGER NOT NULL, + user_id INTEGER NOT NULL, + created_at INTEGER NOT NULL, + retry_count INTEGER DEFAULT 0, + status TEXT DEFAULT 'pending' + ); + + CREATE INDEX IF NOT EXISTS idx_status_created + ON work_queue(status, created_at); + )"; + + char* error_msg = nullptr; + rc = sqlite3_exec(db, create_table_sql, nullptr, nullptr, &error_msg); + if (rc != SQLITE_OK) { + std::cerr << "āŒ Failed to create work queue table: " << error_msg << std::endl; + sqlite3_free(error_msg); + return false; + } + + std::cout << "āœ… Work queue database initialized: " << db_path << std::endl; + + // Log any existing pending jobs on startup + int pending_count = getPendingJobCount(); + if (pending_count > 0) { + std::cout << "šŸ“‹ Found " << pending_count << " pending jobs from previous session" << std::endl; + } + + return true; + } + + /** + * Add a new work item to the queue + */ + bool enqueue(const WorkItem& item) { + std::lock_guard lock(queue_mutex); + + const char* insert_sql = R"( + INSERT INTO work_queue + (attachment_url, original_filename, channel_id, user_id, created_at, retry_count) + VALUES (?, ?, ?, ?, ?, ?); + )"; + + sqlite3_stmt* stmt; + int rc = sqlite3_prepare_v2(db, insert_sql, -1, &stmt, nullptr); + if (rc != SQLITE_OK) { + std::cerr << "āŒ Failed to prepare insert statement: " << sqlite3_errmsg(db) << std::endl; + return false; + } + + // Bind parameters + sqlite3_bind_text(stmt, 1, item.attachment_url.c_str(), -1, SQLITE_STATIC); + sqlite3_bind_text(stmt, 2, item.original_filename.c_str(), -1, SQLITE_STATIC); + sqlite3_bind_int64(stmt, 3, item.channel_id); + sqlite3_bind_int64(stmt, 4, item.user_id); + sqlite3_bind_int64(stmt, 5, item.created_at); + sqlite3_bind_int(stmt, 6, item.retry_count); + + rc = sqlite3_step(stmt); + sqlite3_finalize(stmt); + + if (rc != SQLITE_DONE) { + std::cerr << "āŒ Failed to insert work item: " << sqlite3_errmsg(db) << std::endl; + return false; + } + + std::cout << "šŸ“„ Enqueued job: " << item.original_filename << " (ID: " << sqlite3_last_insert_rowid(db) << ")" << std::endl; + + // Notify worker thread that new work is available + queue_condition.notify_one(); + return true; + } + + /** + * Get the next work item from the queue (FIFO order) + * Blocks until work is available or shutdown is requested + */ + bool dequeue(WorkItem& item) { + std::unique_lock lock(queue_mutex); + + while (!shutdown_requested) { + const char* select_sql = R"( + SELECT id, attachment_url, original_filename, channel_id, user_id, created_at, retry_count + FROM work_queue + WHERE status = 'pending' + ORDER BY created_at ASC + LIMIT 1; + )"; + + sqlite3_stmt* stmt; + int rc = sqlite3_prepare_v2(db, select_sql, -1, &stmt, nullptr); + if (rc != SQLITE_OK) { + std::cerr << "āŒ Failed to prepare select statement: " << sqlite3_errmsg(db) << std::endl; + return false; + } + + rc = sqlite3_step(stmt); + if (rc == SQLITE_ROW) { + // Found a work item + item.id = sqlite3_column_int64(stmt, 0); + item.attachment_url = (const char*)sqlite3_column_text(stmt, 1); + item.original_filename = (const char*)sqlite3_column_text(stmt, 2); + item.channel_id = sqlite3_column_int64(stmt, 3); + item.user_id = sqlite3_column_int64(stmt, 4); + item.created_at = sqlite3_column_int64(stmt, 5); + item.retry_count = sqlite3_column_int(stmt, 6); + + sqlite3_finalize(stmt); + + // Mark as processing + markProcessing(item.id); + + std::cout << "šŸ“¤ Dequeued job " << item.id << ": " << item.original_filename << std::endl; + return true; + + } else if (rc == SQLITE_DONE) { + // No work available, wait for notification + sqlite3_finalize(stmt); + queue_condition.wait(lock); + + } else { + // Error occurred + std::cerr << "āŒ Failed to select work item: " << sqlite3_errmsg(db) << std::endl; + sqlite3_finalize(stmt); + return false; + } + } + + return false; // Shutdown requested + } + + /** + * Mark a work item as completed and remove it from the queue + */ + bool markCompleted(int64_t item_id) { + std::lock_guard lock(queue_mutex); + + const char* delete_sql = "DELETE FROM work_queue WHERE id = ?;"; + + sqlite3_stmt* stmt; + int rc = sqlite3_prepare_v2(db, delete_sql, -1, &stmt, nullptr); + if (rc != SQLITE_OK) { + std::cerr << "āŒ Failed to prepare delete statement: " << sqlite3_errmsg(db) << std::endl; + return false; + } + + sqlite3_bind_int64(stmt, 1, item_id); + rc = sqlite3_step(stmt); + sqlite3_finalize(stmt); + + if (rc != SQLITE_DONE) { + std::cerr << "āŒ Failed to delete completed work item: " << sqlite3_errmsg(db) << std::endl; + return false; + } + + std::cout << "āœ… Completed job " << item_id << std::endl; + return true; + } + + /** + * Mark a work item as failed and update retry count + */ + bool markFailed(int64_t item_id, int max_retries = 3) { + std::lock_guard lock(queue_mutex); + + // First, get current retry count + const char* select_sql = "SELECT retry_count FROM work_queue WHERE id = ?;"; + sqlite3_stmt* stmt; + int rc = sqlite3_prepare_v2(db, select_sql, -1, &stmt, nullptr); + if (rc != SQLITE_OK) { + std::cerr << "āŒ Failed to prepare retry count select: " << sqlite3_errmsg(db) << std::endl; + return false; + } + + sqlite3_bind_int64(stmt, 1, item_id); + rc = sqlite3_step(stmt); + + if (rc != SQLITE_ROW) { + sqlite3_finalize(stmt); + std::cerr << "āŒ Work item " << item_id << " not found for retry update" << std::endl; + return false; + } + + int current_retries = sqlite3_column_int(stmt, 0); + sqlite3_finalize(stmt); + + if (current_retries >= max_retries) { + // Too many retries, remove from queue + std::cout << "šŸ’€ Job " << item_id << " failed permanently after " << current_retries << " retries" << std::endl; + const char* delete_sql = "DELETE FROM work_queue WHERE id = ?;"; + sqlite3_prepare_v2(db, delete_sql, -1, &stmt, nullptr); + sqlite3_bind_int64(stmt, 1, item_id); + sqlite3_step(stmt); + sqlite3_finalize(stmt); + } else { + // Increment retry count and mark as pending for retry + std::cout << "šŸ”„ Job " << item_id << " failed, retry " << (current_retries + 1) << "/" << max_retries << std::endl; + const char* update_sql = "UPDATE work_queue SET retry_count = ?, status = 'pending' WHERE id = ?;"; + sqlite3_prepare_v2(db, update_sql, -1, &stmt, nullptr); + sqlite3_bind_int(stmt, 1, current_retries + 1); + sqlite3_bind_int64(stmt, 2, item_id); + sqlite3_step(stmt); + sqlite3_finalize(stmt); + + // Notify worker thread to try again + queue_condition.notify_one(); + } + + return true; + } + + /** + * Request shutdown of the work queue + */ + void requestShutdown() { + shutdown_requested = true; + queue_condition.notify_all(); + } + +private: + /** + * Mark a work item as being processed + */ + bool markProcessing(int64_t item_id) { + const char* update_sql = "UPDATE work_queue SET status = 'processing' WHERE id = ?;"; + + sqlite3_stmt* stmt; + int rc = sqlite3_prepare_v2(db, update_sql, -1, &stmt, nullptr); + if (rc != SQLITE_OK) { + return false; + } + + sqlite3_bind_int64(stmt, 1, item_id); + rc = sqlite3_step(stmt); + sqlite3_finalize(stmt); + + return rc == SQLITE_DONE; + } + + /** + * Get count of pending jobs in the queue + */ + int getPendingJobCount() { + const char* count_sql = "SELECT COUNT(*) FROM work_queue WHERE status = 'pending';"; + + sqlite3_stmt* stmt; + int rc = sqlite3_prepare_v2(db, count_sql, -1, &stmt, nullptr); + if (rc != SQLITE_OK) { + return 0; + } + + rc = sqlite3_step(stmt); + int count = (rc == SQLITE_ROW) ? sqlite3_column_int(stmt, 0) : 0; + sqlite3_finalize(stmt); + + return count; + } +}; + /** * Function to securely input text without displaying it on screen (like password input) * This is used for Discord bot tokens to keep them private in the terminal @@ -151,13 +469,128 @@ std::vector flipImageVertically(const std::vector& image_data) return output_jpeg; } +/** + * Worker thread function that processes the work queue sequentially + * This replaces the previous approach of creating one thread per image + */ +void workerThread(dpp::cluster* bot, WorkQueue* work_queue) { + std::cout << "šŸ”§ Worker thread started" << std::endl; + + WorkItem item; + while (work_queue->dequeue(item)) { + std::cout << "\n--- PROCESSING JPEG IMAGE (Job " << item.id << ") ---" << std::endl; + std::cout << "Downloading: " << item.original_filename << std::endl; + std::cout << "From URL: " << item.attachment_url << std::endl; + + // Download the image using DPP's HTTP client + std::cout << "🌐 Starting image download..." << std::endl; + + // Use a promise/future to make the async download synchronous for this worker + std::mutex download_mutex; + std::condition_variable download_cv; + bool download_complete = false; + bool download_success = false; + std::string download_data; + + bot->request(item.attachment_url, dpp::m_get, [&](const dpp::http_request_completion_t& response) { + std::lock_guard lock(download_mutex); + + if (response.status == 200) { + std::cout << "āœ… Downloaded image (" << response.body.size() << " bytes)" << std::endl; + download_data = response.body; + download_success = true; + } else { + std::cout << "āŒ Failed to download image. Status: " << response.status << std::endl; + download_success = false; + } + + download_complete = true; + download_cv.notify_one(); + }); + + // Wait for download to complete + { + std::unique_lock lock(download_mutex); + download_cv.wait(lock, [&]{ return download_complete; }); + } + + if (download_success) { + // Convert response body to vector for processing + std::vector original_data(download_data.begin(), download_data.end()); + + // Flip the image vertically (upside-down) + std::vector flipped_data = flipImageVertically(original_data); + + // Create new filename for flipped image + std::string flipped_filename = "upside_down_" + item.original_filename; + + // Create message with flipped image + std::string message_text = "šŸ”„ Here's your upside-down image! <@" + std::to_string(item.user_id) + ">"; + dpp::message msg(item.channel_id, message_text); + msg.add_file(flipped_filename, std::string(flipped_data.begin(), flipped_data.end())); + + // Send the flipped image + std::mutex send_mutex; + std::condition_variable send_cv; + bool send_complete = false; + bool send_success = false; + + bot->message_create(msg, [&](const dpp::confirmation_callback_t& callback) { + std::lock_guard lock(send_mutex); + + if (callback.is_error()) { + std::cout << "āŒ Failed to send flipped image: " << callback.get_error().message << std::endl; + send_success = false; + } else { + std::cout << "āœ… Successfully sent flipped " << item.original_filename << "!" << std::endl; + send_success = true; + } + + send_complete = true; + send_cv.notify_one(); + }); + + // Wait for send to complete + { + std::unique_lock lock(send_mutex); + send_cv.wait(lock, [&]{ return send_complete; }); + } + + if (send_success) { + work_queue->markCompleted(item.id); + } else { + work_queue->markFailed(item.id); + } + + } else { + // Download failed + bot->message_create(dpp::message(item.channel_id, "āŒ Failed to download image for processing.")); + work_queue->markFailed(item.id); + } + + // Small delay between processing jobs to prevent overwhelming the system + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + } + + std::cout << "šŸ”§ Worker thread shutting down" << std::endl; +} + /** * MAIN FUNCTION - Entry point of the Discord bot program * Sets up the bot, registers commands, and starts the event loop */ int main() { - // Step 1: Get Discord bot token securely from user + // Step 1: Initialize work queue database std::cout << "=== Discord Bot Startup ===" << std::endl; + std::cout << "šŸ—„ļø Initializing work queue database..." << std::endl; + + WorkQueue work_queue; + if (!work_queue.initialize()) { + std::cerr << "āŒ Failed to initialize work queue database" << std::endl; + return 1; + } + + // Step 2: Get Discord bot token securely from user std::string BOT_TOKEN = getHiddenInput("Enter Discord Bot Token: "); // Validate that a token was provided @@ -166,17 +599,21 @@ int main() { return 1; // Exit with error code } - // Step 2: Create Discord bot instance + // Step 3: Create Discord bot instance // dpp::cluster is the main bot class that handles all Discord connections // i_default_intents gives basic permissions, i_message_content allows reading message text dpp::cluster bot(BOT_TOKEN, dpp::i_default_intents | dpp::i_message_content); - // Step 3: Enable logging to see what the bot is doing + // Step 4: Enable logging to see what the bot is doing bot.on_log(dpp::utility::cout_logger()); + + // Step 5: Start worker thread for processing the queue + std::cout << "šŸ”§ Starting worker thread..." << std::endl; + std::thread worker(workerThread, &bot, &work_queue); - // Step 4: Set up event handler for file uploads + // Step 6: Set up event handler for file uploads // Lambda function that gets called whenever a message with attachments is posted - bot.on_message_create([&bot](const dpp::message_create_t& event) { + bot.on_message_create([&work_queue](const dpp::message_create_t& event) { // Ignore messages sent by bots (including our own) to prevent loops if (event.msg.author.is_bot()) { return; // Early exit - don't process bot messages @@ -217,68 +654,28 @@ int main() { } } - // If we found JPEG files, process them + // If we found JPEG files, enqueue them for processing if (found_jpeg) { - std::cout << "\nšŸŽÆ ACTION: Processing JPEG file upload" << std::endl; + std::cout << "\nšŸŽÆ ACTION: Enqueueing JPEG files for processing" << std::endl; // Add emoji reaction to the message (like clicking a reaction in Discord) - bot.message_add_reaction(event.msg.id, event.msg.channel_id, "šŸ”„"); + event.reply("šŸ“ø JPEG detected! Adding to processing queue..."); - // Send a text response in the same channel - bot.message_create(dpp::message(event.msg.channel_id, "šŸ“ø JPEG detected! Flipping image upside-down...")); - - // Process each JPEG in a separate thread (after 5 second delay) - // We use threads so the bot doesn't freeze while processing images + // Enqueue each JPEG for sequential processing for (const auto& jpeg_attachment : jpeg_attachments) { - std::thread([&bot, // Capture bot by reference (shared) - channel_id = event.msg.channel_id, // Copy these values into the thread - user_id = event.msg.author.id, // so they don't get lost when event ends - attachment_url = jpeg_attachment.url, - original_filename = jpeg_attachment.filename - ]() { - // Wait 5 seconds before processing - std::this_thread::sleep_for(std::chrono::seconds(5)); - - std::cout << "\n--- PROCESSING JPEG IMAGE ---" << std::endl; - std::cout << "Downloading: " << original_filename << std::endl; - std::cout << "From URL: " << attachment_url << std::endl; - - // Download the image using DPP's HTTP client - std::cout << "🌐 Starting image download..." << std::endl; - bot.request(attachment_url, dpp::m_get, [&bot, channel_id, user_id, original_filename](const dpp::http_request_completion_t& response) { - if (response.status == 200) { - std::cout << "āœ… Downloaded image (" << response.body.size() << " bytes)" << std::endl; - - // Convert response body to vector for processing - std::vector original_data(response.body.begin(), response.body.end()); - - // Flip the image vertically (upside-down) - std::vector flipped_data = flipImageVertically(original_data); - - // Create new filename for flipped image - std::string flipped_filename = "upside_down_" + original_filename; - - // Create message with flipped image - std::string message_text = "šŸ”„ Here's your upside-down image! <@" + std::to_string(user_id) + ">"; - dpp::message msg(channel_id, message_text); - msg.add_file(flipped_filename, std::string(flipped_data.begin(), flipped_data.end())); - - // Send the flipped image - bot.message_create(msg, [original_filename](const dpp::confirmation_callback_t& callback) { - if (callback.is_error()) { - std::cout << "āŒ Failed to send flipped image: " << callback.get_error().message << std::endl; - } else { - std::cout << "āœ… Successfully sent flipped " << original_filename << "!" << std::endl; - } - }); - - } else { - std::cout << "āŒ Failed to download image. Status: " << response.status << std::endl; - bot.message_create(dpp::message(channel_id, "āŒ Failed to download image for processing.")); - } - }); - - }).detach(); // detach() = thread runs independently, we don't wait for it to finish + WorkItem item; + item.attachment_url = jpeg_attachment.url; + item.original_filename = jpeg_attachment.filename; + item.channel_id = event.msg.channel_id; + item.user_id = event.msg.author.id; + item.created_at = std::time(nullptr); + item.retry_count = 0; + + if (work_queue.enqueue(item)) { + std::cout << "āœ… Enqueued: " << jpeg_attachment.filename << std::endl; + } else { + std::cout << "āŒ Failed to enqueue: " << jpeg_attachment.filename << std::endl; + } } } @@ -286,7 +683,7 @@ int main() { } }); - // Step 5: Set up slash command handler (commands that start with /) + // Step 7: Set up slash command handler (commands that start with /) // [&bot] captures the bot variable by reference so we can use it inside the lambda bot.on_slashcommand([&bot](const dpp::slashcommand_t& event) { // Generate unique ID for this command execution (for debugging) @@ -374,7 +771,7 @@ int main() { } }); - // Step 6: Set up bot ready event (called when bot successfully connects to Discord) + // Step 8: Set up bot ready event (called when bot successfully connects to Discord) bot.on_ready([&bot](const dpp::ready_t& event) { // run_once ensures this only happens on first connection, not reconnections if (dpp::run_once()) { @@ -418,10 +815,21 @@ int main() { } }); - // Step 7: Start the bot and run forever - // st_wait means the program will not exit, it will keep running until interrupted + // Step 9: Set up graceful shutdown handler std::cout << "Starting bot event loop..." << std::endl; - bot.start(dpp::st_wait); + + // Start the bot in a separate thread so we can handle shutdown gracefully + std::thread bot_thread([&bot]() { + bot.start(dpp::st_wait); + }); + + // Wait for the bot thread to finish (which should be never unless there's an error) + bot_thread.join(); + + // If we get here, the bot has shut down, so clean up the worker thread + std::cout << "Bot shutting down, stopping worker thread..." << std::endl; + work_queue.requestShutdown(); + worker.join(); // This line should never be reached unless the bot shuts down return 0;