/* triangular_arbitrage_bot/src/main.c (260 lines, 8.5 KiB, C) */

/*
* main.c - Fused triangular arbitrage engine entry point
*
* Orchestrates: config loading -> fee table fetch -> symbol discovery ->
* WebSocket connections -> epoll event loops (hot/cold threads) -> HTTP status server.
* Signal delivery to external executor via Unix domain socket.
*/
#include "log.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/eventfd.h>
#include <fcntl.h>
#include <openssl/ssl.h>
#include <openssl/err.h>
#include "config.h"
#include "hash.h"
#include "book.h"
#include "triangle.h"
#include "http_client.h"
#include "symbols_api.h"
#include "ws_client.h"
#include "evaluate.h"
#include "slot.h"
#include "events.h"
static volatile sig_atomic_t g_running = 1;
/* Set the global running flag to zero on SIGINT/SIGTERM. Thread-safe
via sig_atomic_t. The main loop checks g_running to exit cleanly. */
static void signal_handler(int sig) {
(void)sig;
g_running = 0;
}
int main(int argc, char *argv[]) {
const char *config_path = "config.yaml";
if (argc > 1) config_path = argv[1];
struct sigaction sa = {0};
sa.sa_handler = signal_handler;
sa.sa_flags = 0;
sigaction(SIGINT, &sa, NULL);
sigaction(SIGTERM, &sa, NULL);
sigset_t block_mask;
sigemptyset(&block_mask);
sigaddset(&block_mask, SIGINT);
sigaddset(&block_mask, SIGTERM);
sigaddset(&block_mask, SIGPIPE);
pthread_sigmask(SIG_BLOCK, &block_mask, NULL);
/* Early OpenSSL init to avoid provider thread issues */
SSL_library_init();
OpenSSL_add_all_algorithms();
SSL_load_error_strings();
log_init();
log_set_file("/tmp/engine.log");
log_write("[MAIN] Loading config from '%s'...\n", config_path);
config_t cfg;
if (config_load(config_path, &cfg) != 0) {
log_write("[MAIN] Failed to load config\n");
return 1;
}
log_write("[MAIN] Config: threshold=%.1f bps, hold=%u currencies\n",
cfg.signal_threshold_bps, cfg.hold_currency_count);
// Discover symbols from KuCoin: fetch pairs, enumerate triangles, populate table
log_write("[MAIN] >>> Initializing symbol table\n");
symbol_table_t symbols;
symbol_table_init(&symbols);
for (uint32_t i = 0; i < cfg.symbol_count; i++) {
symbol_table_add(&symbols, cfg.symbols[i]);
}
log_write("[MAIN] >>> Calling discover_symbols\n");
triangle_set_t triangles;
if (discover_symbols(&symbols, &triangles, &cfg) != 0) {
log_write("[MAIN] Symbol discovery failed\n");
return 1;
}
log_write("[MAIN] <<< discover_symbols done: %u symbols, %u triangles\n", symbols.count, triangles.triangle_count);
log_write("[MAIN] >>> Allocating books array\n");
order_book_t *books = calloc(MAX_SYMBOLS, sizeof(order_book_t));
log_write("[MAIN] books=%p, size=%zu\n", (void*)books, sizeof(order_book_t));
if (!books) {
log_write("[MAIN] Failed to allocate books\n");
return 1;
}
int n_slots = cfg.concurrent_slots;
if (n_slots < 1) n_slots = 1;
if (n_slots > 16) n_slots = 16;
executor_slot_t *slots = calloc((size_t)n_slots, sizeof(executor_slot_t));
if (!slots) {
log_write("[MAIN] Failed to allocate slots\n");
return 1;
}
for (int i = 0; i < n_slots; i++) {
slots[i].eventfd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
if (slots[i].eventfd < 0) {
log_write("[MAIN] Failed to create slot eventfd\n");
for (int j = 0; j < i; j++) close(slots[j].eventfd);
free(slots);
return 1;
}
}
log_write("[MAIN] Created %d executor slots\n", n_slots);
evaluator_t evaluator;
evaluator_init(&evaluator, &triangles, books, &cfg, slots, n_slots,
cfg.kcs_discount_active);
ws_client_t ws_client;
if (ws_client_init(&ws_client, &cfg, &symbols, books, &evaluator) != 0) {
log_write("[MAIN] Failed to init WS client\n");
triangle_set_free(&triangles);
return 1;
}
event_loops_t events;
if (event_loops_init(&events, &ws_client, &cfg, slots, n_slots) != 0) {
log_write("[MAIN] Failed to init event loops\n");
ws_client_destroy(&ws_client);
triangle_set_free(&triangles);
return 1;
}
if (symbols.count > 0) {
uint16_t all_indices[MAX_SYMBOLS];
uint32_t n = symbols.count > MAX_SYMBOLS ? MAX_SYMBOLS : symbols.count;
for (uint32_t i = 0; i < n; i++) {
all_indices[i] = (uint16_t)i;
}
uint32_t conns_needed = (n + 399) / 400;
if (conns_needed < 1) conns_needed = 1;
if (conns_needed > WS_MAX_CONNECTIONS) conns_needed = WS_MAX_CONNECTIONS;
ws_client.connection_count = conns_needed;
log_write("[MAIN] Connecting %u WS connections...\n", conns_needed);
for (uint32_t i = 0; i < conns_needed; i++) {
if (ws_client_connect(&ws_client, i) != 0) {
log_write("[MAIN] WS connection %u failed\n", i);
}
}
// Batch-subscribe up to 400 symbols per WS connection
uint32_t batch_start = 0;
for (uint32_t conn_idx = 0; conn_idx < conns_needed; conn_idx++) {
uint32_t batch_end = batch_start + 400;
if (batch_end > n) batch_end = n;
uint32_t batch_count = batch_end - batch_start;
if (batch_count > 0) {
ws_client_subscribe(&ws_client, conn_idx,
all_indices + batch_start, batch_count);
}
batch_start = batch_end;
}
log_write("[MAIN] Subscribed to %u symbols across %u WS connections\n",
n, ws_client.connection_count);
}
log_write("[MAIN] Spawning threads...\n");
pthread_t hot_thread;
pthread_create(&hot_thread, NULL, event_hot_thread, &events);
int n = cfg.concurrent_slots;
if (n < 1) n = 1;
if (n > 16) n = 16;
pthread_t exec_threads[16];
int exec_thread_count = n;
executor_thread_arg_t exec_args[16];
for (int i = 0; i < n; i++) {
exec_args[i].loops = &events;
exec_args[i].slot = &slots[i];
exec_args[i].slot_index = i;
pthread_create(&exec_threads[i], NULL, event_executor_thread, &exec_args[i]);
}
// Unblock signals in main thread only; worker threads inherit blocked mask
sigset_t unblock_mask;
sigemptyset(&unblock_mask);
sigaddset(&unblock_mask, SIGINT);
sigaddset(&unblock_mask, SIGTERM);
pthread_sigmask(SIG_UNBLOCK, &unblock_mask, NULL);
log_write("[MAIN] Fused engine running (%d executor threads). Press Ctrl+C to stop.\n", n);
// Main loop: reconnect disconnected WS
while (g_running) {
struct timespec ts = {0, 100000000};
nanosleep(&ts, NULL);
for (uint32_t i = 0; i < ws_client.connection_count; i++) {
ws_connection_t *conn = &ws_client.connections[i];
if (conn->state == WS_STATE_DISCONNECTED && g_running) {
uint64_t now = ws_client_now_ms();
if (now - conn->last_activity_ms > 5000) {
log_write("[MAIN] Reconnecting WS %u...\n", i);
if (conn->fd >= 0)
event_loops_remove_fd(&events.hot_epoll, conn->fd);
if (ws_client_connect(&ws_client, i) == 0) {
int fl = fcntl(conn->fd, F_GETFL);
fcntl(conn->fd, F_SETFL, fl | O_NONBLOCK);
event_loops_add_fd(&events.hot_epoll, conn->fd, FD_TYPE_WS, i, NULL, EPOLLIN);
if (conn->symbol_count > 0) {
ws_client_subscribe(&ws_client, i,
conn->symbol_indices, conn->symbol_count);
}
}
conn->last_activity_ms = now;
}
}
}
}
log_write("[MAIN] Shutting down...\n");
events.running = false;
ws_client.running = false;
/* Wake all executor threads */
uint64_t val = 1;
for (int i = 0; i < n_slots; i++) {
if (write(slots[i].eventfd, &val, sizeof(val)) < 0) {}
}
pthread_join(hot_thread, NULL);
for (int i = 0; i < exec_thread_count; i++) {
pthread_join(exec_threads[i], NULL);
}
event_loops_destroy(&events);
ws_client_destroy(&ws_client);
for (int i = 0; i < n_slots; i++) {
close(slots[i].eventfd);
}
free(slots);
triangle_set_free(&triangles);
free(books);
log_write("[MAIN] Shutdown complete.\n");
log_shutdown();
return 0;
}