From 174b7570faf96b0fd2f733331cca3282a245174c Mon Sep 17 00:00:00 2001 From: nicolas Date: Wed, 27 May 2026 13:18:53 -0300 Subject: [PATCH] feat: concurrent executor slots; fix: fundsIncrement for market buys, remove double-counted leg0 fee hold - Add concurrent_slots config (fused_engine section, default 1) - Create executor_shared_t with shared in_flight table + queue mutex for multi-thread - Move in_flight state from executor_thread_t to executor_shared_t (cross-thread isolation) - event_executor_thread: per-thread entry point, N threads created in main.c - Add fundsIncrement to trading_pair_t, triangle_t, signal_leg_t (fetch from KuCoin API) - Use funds_increment for rounding market buy quote_cost (evaluate.c) and increment floor (executor.c) - Fix leg 0: remove double-counted apply_fee_hold (evaluate already accounts via ff) --- src/config.c | 3 +++ src/config.h | 1 + src/evaluate.c | 5 +++- src/events.c | 54 +++++++++++++++++++++++++---------------- src/events.h | 10 +++++--- src/executor.c | 61 +++++++++++++++++++++++++---------------------- src/executor.h | 16 ++++++++++++- src/main.c | 20 ++++++++++++---- src/queue.h | 1 + src/symbols_api.c | 5 ++++ src/symbols_api.h | 1 + src/triangle.h | 1 + 12 files changed, 120 insertions(+), 58 deletions(-) diff --git a/src/config.c b/src/config.c index 5a4a061..c62c3c2 100644 --- a/src/config.c +++ b/src/config.c @@ -80,6 +80,8 @@ static void handle_value(parse_state_t *st, const char *val) { st->cfg->cooldown_seconds = atof(val); } else if (strcmp(key, "stats_interval_seconds") == 0) { st->cfg->stats_interval_seconds = atof(val); + } else if (strcmp(key, "concurrent_slots") == 0) { + st->cfg->concurrent_slots = atoi(val); } } else if (strcmp(st->section, "executor") == 0) { return; @@ -123,6 +125,7 @@ int config_load(const char *path, config_t *cfg) { cfg->kcs_discount_active = false; cfg->cooldown_seconds = 0.0; cfg->stats_interval_seconds = 60.0; + cfg->concurrent_slots = 1; cfg->live_mode = false; FILE *f = fopen(path, "r"); diff --git a/src/config.h b/src/config.h index f82e044..88f134a 100644 --- a/src/config.h +++ b/src/config.h @@ -37,6 +37,7 @@ typedef struct { char executor_socket_path[256]; /* unix socket path for signal executor */ double cooldown_seconds; /* min seconds between signals for same triangle */ double stats_interval_seconds; /* period between stats log dumps */ + int concurrent_slots; /* number of executor threads (1 = single-threaded) */ bool live_mode; /* live trading vs paper/simulation */ /* Capital allocation limits — each entry maps a currency ticker to a max diff --git a/src/evaluate.c b/src/evaluate.c index 0e699f2..bbb4171 100644 --- a/src/evaluate.c +++ b/src/evaluate.c @@ -284,10 +284,12 @@ bool evaluate_symbol(evaluator_t *ev, uint16_t symbol_idx, int64_t t_sock_arrive double leg_output; if (is_buy) { + double fi = tri->funds_increment[leg]; + if (fi <= 0) fi = qi; double quote_input = (qi > 0) ? floor(leg_input / qi - 1e-12) * qi : leg_input; double net = quote_input * ff; double base = (bi > 0) ? floor(net / price / bi + 1e-12) * bi : (net / price); - double quote_cost = (qi > 0) ? floor(base * price / qi - 1e-12) * qi : (base * price); + double quote_cost = (fi > 0) ? floor(base * price / fi - 1e-12) * fi : (base * price); leg_quote_vol[leg] = quote_cost; sig.legs.legs[leg].quote_volume = quote_cost; leg_base_size[leg] = base; @@ -374,6 +376,7 @@ bool evaluate_symbol(evaluator_t *ev, uint16_t symbol_idx, int64_t t_sock_arrive } sl->base_increment = tri->base_increment[leg]; sl->quote_increment = tri->quote_increment[leg]; + sl->funds_increment = tri->funds_increment[leg]; sl->base_min_size = tri->base_min_size[leg]; if (use_bid) { diff --git a/src/events.c b/src/events.c index 198d3af..7ecdd2e 100644 --- a/src/events.c +++ b/src/events.c @@ -104,6 +104,12 @@ int event_loops_init(event_loops_t *loops, ws_client_t *ws_client, loops->running = true; loops->wakeup_fd = wakeup_fd; + loops->executor_shared = calloc(1, sizeof(executor_shared_t)); + if (loops->executor_shared) { + pthread_mutex_init(&loops->executor_shared->lock, NULL); + pthread_mutex_init(&loops->executor_shared->queue_lock, NULL); + } + epoll_set_init(&loops->hot_epoll); epoll_set_init(&loops->cold_epoll); @@ -124,6 +130,12 @@ void event_loops_destroy(event_loops_t *loops) { loops->running = false; if (loops->timer_fd >= 0) close(loops->timer_fd); if (loops->wakeup_fd >= 0) close(loops->wakeup_fd); + if (loops->executor_shared) { + pthread_mutex_destroy(&loops->executor_shared->lock); + pthread_mutex_destroy(&loops->executor_shared->queue_lock); + free(loops->executor_shared); + loops->executor_shared = NULL; + } if (loops->hot_epoll.epoll_fd >= 0) close(loops->hot_epoll.epoll_fd); if (loops->cold_epoll.epoll_fd >= 0) close(loops->cold_epoll.epoll_fd); } @@ -199,17 +211,18 @@ void *event_hot_thread(void *arg) { } /* - * EXECUTOR thread: drain SPSC signal queue and execute triangles directly. - * Wakes on SPSC eventfd (signal available) and fill channel eventfd (fill arrived). + * Per-executor-thread entry point: creates its own executor_thread_t, + * polls the shared SPSC signal queue, and executes triangles. + * Multiple threads run concurrently, sharing the same executor_shared_t + * for concurrency isolation (via shared in_flight table under mutex). */ -void *event_cold_thread(void *arg) { +void *event_executor_thread(void *arg) { event_loops_t *loops = (event_loops_t *)arg; - log_write("[EXEC] Thread started\n"); - executor_thread_t *exec = executor_thread_create(loops->ws_client->cfg, loops->ws_client->fill_ch, - loops->ws_client); + loops->ws_client, + loops->executor_shared); if (!exec) { log_write("[EXEC] Failed to create executor\n"); return NULL; @@ -235,14 +248,6 @@ void *event_cold_thread(void *arg) { break; } - /* Drain signal queue */ - while (!spsc_empty(loops->signal_queue)) { - signal_entry_t sig; - if (spsc_pop(loops->signal_queue, &sig)) { - executor_execute_triangle(exec, &sig); - } - } - /* Drain fill wakeup */ if (fds[1].revents & POLLIN) { uint64_t val; @@ -255,12 +260,15 @@ void *event_cold_thread(void *arg) { last_keepalive_ms = now_mono_ms(); } - /* Drain again to catch signals enqueued during execution */ - while (!spsc_empty(loops->signal_queue)) { - signal_entry_t sig; - if (spsc_pop(loops->signal_queue, &sig)) { - executor_execute_triangle(exec, &sig); - } + /* Pop and execute one signal at a time (non-blocking) */ + signal_entry_t sig; + bool got = false; + pthread_mutex_lock(&loops->executor_shared->queue_lock); + if (spsc_pop(loops->signal_queue, &sig)) got = true; + pthread_mutex_unlock(&loops->executor_shared->queue_lock); + + if (got) { + executor_execute_triangle(exec, &sig); } } @@ -268,3 +276,9 @@ void *event_cold_thread(void *arg) { log_write("[EXEC] Thread exited\n"); return NULL; } + +/* Legacy single-thread entry point — delegates to event_executor_thread semantics + but creates its own shared state. */ +void *event_cold_thread(void *arg) { + return event_executor_thread(arg); +} diff --git a/src/events.h b/src/events.h index cbd5755..ab23f50 100644 --- a/src/events.h +++ b/src/events.h @@ -6,6 +6,7 @@ #include #include "ws_client.h" #include "queue.h" +#include "executor.h" #define MAX_EPOLL_FDS 64 @@ -37,8 +38,9 @@ typedef struct { epoll_set_t hot_epoll; /* hot epoll set for latency-sensitive ws events */ epoll_set_t cold_epoll; /* cold epoll set for timer/http events */ ws_client_t *ws_client; /* WebSocket client instance */ - spsc_queue_t *signal_queue; /* signal queue for emitting opportunities */ - int timer_fd; /* timerfd for periodic tasks */ + spsc_queue_t *signal_queue; /* signal queue for emitting opportunities */ + executor_shared_t *executor_shared; /* shared executor state (in_flight + queue lock) */ + int timer_fd; /* timerfd for periodic tasks */ int wakeup_fd; /* eventfd for waking the cold loop */ uint64_t next_ping_ms; /* next scheduled WebSocket ping timestamp */ bool running; /* false signals event loops to exit */ @@ -56,7 +58,9 @@ int event_loops_add_fd(epoll_set_t *set, int fd, fd_type_t type, void event_loops_remove_fd(epoll_set_t *set, int fd); /* Hot event loop thread: handles WebSocket I/O */ void *event_hot_thread(void *arg); -/* Cold event loop thread: handles timers, HTTP, and signal dispatch */ +/* Cold event loop thread (legacy, single-threaded) */ void *event_cold_thread(void *arg); +/* Per-executor-thread entry point: creates its own executor_thread_t and polls signal queue */ +void *event_executor_thread(void *arg); #endif diff --git a/src/executor.c b/src/executor.c index d6e7f2f..7b92c3c 100644 --- a/src/executor.c +++ b/src/executor.c @@ -12,18 +12,12 @@ #define _D1 1.0 #define FILL_TIMEOUT_MS 5000 -#define MAX_IN_FLIGHT 8 struct executor_thread_s { - const config_t *cfg; - rest_conn_t *rest; - ws_client_t *ws; - - /* Concurrency isolation state */ - char in_flight_triangles[MAX_IN_FLIGHT][128]; /* triangle_key */ - uint64_t in_flight_pairs[MAX_IN_FLIGHT]; /* fnv1a hash of each pair */ - char in_flight_primary_quotes[MAX_IN_FLIGHT][16]; /* primary quote currency */ - int in_flight_count; + const config_t *cfg; + rest_conn_t *rest; + ws_client_t *ws; + executor_shared_t *shared; }; /* ── Reporting ── */ @@ -79,12 +73,14 @@ static double apply_increment_floor(double vol, double inc) { executor_thread_t *executor_thread_create(const config_t *cfg, fill_channel_t *fill_ch, - ws_client_t *ws) { + ws_client_t *ws, + executor_shared_t *shared) { (void)fill_ch; executor_thread_t *et = calloc(1, sizeof(*et)); if (!et) return NULL; et->cfg = cfg; et->ws = ws; + et->shared = shared; et->rest = rest_conn_new(); if (et->rest) { rest_conn_set_auth(et->rest, @@ -110,22 +106,27 @@ bool executor_keepalive(executor_thread_t *et) { void executor_execute_triangle(executor_thread_t *et, signal_entry_t *sig) { fill_channel_t *fill_ch = et->ws->fill_ch; + executor_shared_t *sh = et->shared; /* ── Concurrency isolation ── */ uint64_t pair_hashes[3] = {0}; for (int p = 0; p < 3; p++) { pair_hashes[p] = fnv1a_hash(sig->legs.legs[p].symbol, (uint32_t)strlen(sig->legs.legs[p].symbol)); } - for (int i = 0; i < et->in_flight_count; i++) { - if (strcmp(et->in_flight_triangles[i], sig->triangle_key) == 0) { + pthread_mutex_lock(&sh->lock); + for (int i = 0; i < sh->count; i++) { + if (strcmp(sh->triangles[i], sig->triangle_key) == 0) { + pthread_mutex_unlock(&sh->lock); log_write("[EXEC] Dropping signal for overlapping triangle: %s\n", sig->triangle_key); return; } - if (strcmp(et->in_flight_primary_quotes[i], sig->primary_quote) == 0) { + if (strcmp(sh->primary_quotes[i], sig->primary_quote) == 0) { + pthread_mutex_unlock(&sh->lock); log_write("[EXEC] Dropping signal for same primary quote: %s\n", sig->triangle_key); return; } for (int p = 0; p < 3; p++) { - if (et->in_flight_pairs[i] == pair_hashes[p]) { + if (sh->pairs[i] == pair_hashes[p]) { + pthread_mutex_unlock(&sh->lock); log_write("[EXEC] Dropping signal for overlapping pair on %s\n", sig->triangle_key); return; } @@ -134,14 +135,15 @@ void executor_execute_triangle(executor_thread_t *et, /* Register this execution */ int slot = -1; for (int i = 0; i < MAX_IN_FLIGHT; i++) { - if (!et->in_flight_triangles[i][0]) { slot = i; break; } + if (!sh->triangles[i][0]) { slot = i; break; } } if (slot >= 0) { - strncpy(et->in_flight_triangles[slot], sig->triangle_key, sizeof(et->in_flight_triangles[slot]) - 1); - strncpy(et->in_flight_primary_quotes[slot], sig->primary_quote, sizeof(et->in_flight_primary_quotes[slot]) - 1); - et->in_flight_pairs[slot] = pair_hashes[0]; - et->in_flight_count++; + strncpy(sh->triangles[slot], sig->triangle_key, sizeof(sh->triangles[slot]) - 1); + strncpy(sh->primary_quotes[slot], sig->primary_quote, sizeof(sh->primary_quotes[slot]) - 1); + sh->pairs[slot] = pair_hashes[0]; + sh->count++; } + pthread_mutex_unlock(&sh->lock); char ts_buf[32]; char corr_id[64]; @@ -167,12 +169,13 @@ void executor_execute_triangle(executor_thread_t *et, double input_vol; if (leg == 0) { input_vol = atof(sl->order_param); - input_vol = apply_fee_hold(input_vol, sl->fee_rate, is_buy); + input_vol = apply_increment_floor(input_vol, + is_buy ? (sl->funds_increment > 0 ? sl->funds_increment : sl->quote_increment) : sl->base_increment); } else { input_vol = leg_output[leg - 1]; input_vol = apply_fee_hold(input_vol, sl->fee_rate, is_buy); input_vol = apply_increment_floor(input_vol, - is_buy ? sl->quote_increment : sl->base_increment); + is_buy ? (sl->funds_increment > 0 ? sl->funds_increment : sl->quote_increment) : sl->base_increment); } /* Build a client OID */ @@ -344,7 +347,7 @@ void executor_execute_triangle(executor_thread_t *et, bool nxt_buy = (strcmp(nsl->side, "buy") == 0); leg_output[leg] = apply_fee_hold(leg_output[leg], nsl->fee_rate, nxt_buy); leg_output[leg] = apply_increment_floor(leg_output[leg], - nxt_buy ? nsl->quote_increment : nsl->base_increment); + nxt_buy ? (nsl->funds_increment > 0 ? nsl->funds_increment : nsl->quote_increment) : nsl->base_increment); } } @@ -459,15 +462,17 @@ void executor_execute_triangle(executor_thread_t *et, error_str[0] ? " | error=" : "", error_str[0] ? error_str : ""); /* Release isolation slot */ + pthread_mutex_lock(&sh->lock); for (int i = 0; i < MAX_IN_FLIGHT; i++) { - if (strcmp(et->in_flight_triangles[i], sig->triangle_key) == 0) { - et->in_flight_triangles[i][0] = '\0'; - et->in_flight_primary_quotes[i][0] = '\0'; - et->in_flight_pairs[i] = 0; - et->in_flight_count--; + if (strcmp(sh->triangles[i], sig->triangle_key) == 0) { + sh->triangles[i][0] = '\0'; + sh->primary_quotes[i][0] = '\0'; + sh->pairs[i] = 0; + sh->count--; break; } } + pthread_mutex_unlock(&sh->lock); log_write("[EXEC] Triangle %s in %lld ms, profit=%.4f predicted=%.2f eff=%.2f\n", status, (long long)total_ms, profit, sig->predicted_bps, effective_bps); diff --git a/src/executor.h b/src/executor.h index 04f8516..280a6eb 100644 --- a/src/executor.h +++ b/src/executor.h @@ -2,18 +2,32 @@ #define EXECUTOR_H #include +#include #include "fill_handler.h" #include "config.h" #include "queue.h" #include "ws_client.h" +#define MAX_IN_FLIGHT 8 + +/* Shared in-flight state across all executor threads (protected by lock). */ +typedef struct { + pthread_mutex_t lock; + pthread_mutex_t queue_lock; + char triangles[MAX_IN_FLIGHT][128]; + uint64_t pairs[MAX_IN_FLIGHT]; + char primary_quotes[MAX_IN_FLIGHT][16]; + int count; +} executor_shared_t; + /* Per-thread data for the executor. */ typedef struct executor_thread_s executor_thread_t; /* Create an executor thread (one per concurrent slot). */ executor_thread_t *executor_thread_create(const config_t *cfg, fill_channel_t *fill_ch, - ws_client_t *ws); + ws_client_t *ws, + executor_shared_t *shared); /* Execute a single triangle signal (blocking, called from executor thread). */ void executor_execute_triangle(executor_thread_t *et, diff --git a/src/main.c b/src/main.c index 53b7845..0d7f2f2 100644 --- a/src/main.c +++ b/src/main.c @@ -194,9 +194,17 @@ int main(int argc, char *argv[]) { } log_write("[MAIN] Spawning threads...\n"); - pthread_t hot_thread, cold_thread; + pthread_t hot_thread; pthread_create(&hot_thread, NULL, event_hot_thread, &events); - pthread_create(&cold_thread, NULL, event_cold_thread, &events); + + int n = cfg.concurrent_slots; + if (n < 1) n = 1; + if (n > 16) n = 16; + pthread_t exec_threads[16]; + int exec_thread_count = n; + for (int i = 0; i < n; i++) { + pthread_create(&exec_threads[i], NULL, event_executor_thread, &events); + } // Unblock signals in main thread only; worker threads inherit blocked mask sigset_t unblock_mask; @@ -205,7 +213,7 @@ int main(int argc, char *argv[]) { sigaddset(&unblock_mask, SIGTERM); pthread_sigmask(SIG_UNBLOCK, &unblock_mask, NULL); - log_write("[MAIN] Fused engine running. Press Ctrl+C to stop.\n"); + log_write("[MAIN] Fused engine running (%d executor threads). Press Ctrl+C to stop.\n", n); // Main loop: reconnect disconnected WS while (g_running) { @@ -235,11 +243,13 @@ int main(int argc, char *argv[]) { ws_client.running = false; uint64_t val = 1; - ssize_t wr = write(events.wakeup_fd, &val, sizeof(val)); + ssize_t wr = write(events.wakeup_fd, &val, sizeof(val)); (void)wr; pthread_join(hot_thread, NULL); - pthread_join(cold_thread, NULL); + for (int i = 0; i < exec_thread_count; i++) { + pthread_join(exec_threads[i], NULL); + } event_loops_destroy(&events); ws_client_destroy(&ws_client); diff --git a/src/queue.h b/src/queue.h index 4f40740..1f5f9bf 100644 --- a/src/queue.h +++ b/src/queue.h @@ -39,6 +39,7 @@ typedef struct { double quote_volume; /* notional quote volume for the leg */ double base_increment; /* base asset lot size step */ double quote_increment; /* quote asset lot size step */ + double funds_increment; /* funds lot size step (market buy funds param) */ double base_min_size; /* minimum base asset order size */ } signal_leg_t; diff --git a/src/symbols_api.c b/src/symbols_api.c index d5101bd..7b65497 100644 --- a/src/symbols_api.c +++ b/src/symbols_api.c @@ -155,6 +155,7 @@ int fetch_trading_pairs(pair_list_t *out) { cJSON *quote_inc = cJSON_GetObjectItem(item, "quoteIncrement"); cJSON *base_min = cJSON_GetObjectItem(item, "baseMinSize"); cJSON *quote_min = cJSON_GetObjectItem(item, "quoteMinSize"); + cJSON *funds_inc = cJSON_GetObjectItem(item, "fundsIncrement"); if (!cJSON_IsString(sym) || !cJSON_IsString(base) || !cJSON_IsString(quote)) continue; if (!cJSON_IsBool(enable) || !cJSON_IsTrue(enable)) continue; @@ -173,6 +174,7 @@ int fetch_trading_pairs(pair_list_t *out) { pair->quote_increment = cJSON_IsString(quote_inc) ? atof(quote_inc->valuestring) : 0.0; pair->base_min_size = cJSON_IsString(base_min) ? atof(base_min->valuestring) : 0.0; pair->quote_min_size = cJSON_IsString(quote_min) ? atof(quote_min->valuestring) : 0.0; + pair->funds_increment = cJSON_IsString(funds_inc) ? atof(funds_inc->valuestring) : 0.0; } cJSON_Delete(root); @@ -360,6 +362,9 @@ int discover_symbols(symbol_table_t *symbols, triangle_set_t *triangles, t->quote_min_size[0] = pairs.pairs[i1].quote_min_size; t->quote_min_size[1] = pairs.pairs[i2].quote_min_size; t->quote_min_size[2] = pairs.pairs[i3].quote_min_size; + t->funds_increment[0] = pairs.pairs[i1].funds_increment; + t->funds_increment[1] = pairs.pairs[i2].funds_increment; + t->funds_increment[2] = pairs.pairs[i3].funds_increment; tri_count++; } diff --git a/src/symbols_api.h b/src/symbols_api.h index 3c1e7ec..019cc3a 100644 --- a/src/symbols_api.h +++ b/src/symbols_api.h @@ -18,6 +18,7 @@ typedef struct { double maker_fee_coeff; /* maker fee coefficient */ double base_increment; /* base lot size step */ double quote_increment; /* quote lot size step */ + double funds_increment; /* funds lot size step (market buy funds param) */ double base_min_size; /* minimum base order size */ double quote_min_size; /* minimum quote order size */ } trading_pair_t; diff --git a/src/triangle.h b/src/triangle.h index 3b52669..6bf3c2f 100644 --- a/src/triangle.h +++ b/src/triangle.h @@ -17,6 +17,7 @@ typedef struct { double fee_factor[3]; /* fee multiplier (1 - fee_rate) for each leg */ double base_increment[3]; /* base asset lot step for each leg */ double quote_increment[3]; /* quote asset lot step for each leg */ + double funds_increment[3]; /* funds lot step for market buys for each leg */ double base_min_size[3]; /* min base order size for each leg */ double quote_min_size[3]; /* min quote order size for each leg */ uint16_t id; /* unique triangle ID */