diff --git a/src/events.c b/src/events.c index ceb6b9a..dc2011b 100644 --- a/src/events.c +++ b/src/events.c @@ -225,16 +225,16 @@ void *event_executor_thread(void *arg) { executor_slot_t *slot = ta->slot; executor_thread_t *exec = executor_thread_create(loops->ws_client->cfg, - loops->ws_client->fill_ch, loops->ws_client, loops->executor_shared, - slot); + slot, + ta->slot_index); if (!exec) { log_write("[EXEC] Failed to create executor\n"); return NULL; } - int fill_wake_fd = fill_channel_wake_fd(loops->ws_client->fill_ch); + int fill_wake_fd = fill_channel_wake_fd(slot->fill_ch); struct pollfd pfds[2]; memset(pfds, 0, sizeof(pfds)); pfds[0].fd = slot->eventfd; diff --git a/src/executor.c b/src/executor.c index 248377c..ba58eeb 100644 --- a/src/executor.c +++ b/src/executor.c @@ -18,6 +18,7 @@ struct executor_thread_s { ws_client_t *ws; executor_shared_t *shared; executor_slot_t *slot; + int slot_index; }; /* ── Reporting ── */ @@ -72,17 +73,17 @@ static double apply_increment_floor(double vol, double inc) { /* ── Core execution loop ── */ executor_thread_t *executor_thread_create(const config_t *cfg, - fill_channel_t *fill_ch, ws_client_t *ws, executor_shared_t *shared, - executor_slot_t *slot) { - (void)fill_ch; + executor_slot_t *slot, + int slot_index) { executor_thread_t *et = calloc(1, sizeof(*et)); if (!et) return NULL; et->cfg = cfg; et->ws = ws; et->shared = shared; et->slot = slot; + et->slot_index = slot_index; et->rest = rest_conn_new(); if (et->rest) { rest_conn_set_auth(et->rest, @@ -107,7 +108,6 @@ bool executor_keepalive(executor_thread_t *et) { void executor_execute_triangle(executor_thread_t *et, signal_entry_t *sig) { - fill_channel_t *fill_ch = et->ws->fill_ch; executor_shared_t *sh = et->shared; /* ── Stale book check: skip if book_ts is not newer than last execution ── */ @@ -208,9 +208,10 @@ void executor_execute_triangle(executor_thread_t *et, is_buy ? (sl->funds_increment > 0 ? sl->funds_increment : sl->quote_increment) : sl->base_increment); } - /* Build a client OID */ + /* Build a client OID with slot index for fill routing */ char client_oid[64]; - snprintf(client_oid, sizeof(client_oid), "c%08x%04x", + snprintf(client_oid, sizeof(client_oid), "c%02x%08x%04x", + (unsigned)et->slot_index, (unsigned)((uint64_t)now_realtime_ms() & 0xFFFFFFFF), (unsigned)leg); @@ -268,12 +269,18 @@ void executor_execute_triangle(executor_thread_t *et, break; } + /* Register orderId for fill event routing (hot thread uses this to + route terminal events without clientOid to the right slot). */ + if (ok && order_id[0]) { + ws_client_register_order_slot(et->ws, order_id, et->slot_index); + } + /* ── Wait for fill (live only) ── */ double total_size = 0, total_funds = 0, avg_price = 0, total_fee = 0; if (sig->live) { fill_result_t fr = {0}; - bool filled = fill_channel_await(fill_ch, client_oid, + bool filled = fill_channel_await(et->slot->fill_ch, client_oid, FILL_TIMEOUT_MS, &fr); if (!filled) { error_str = fr.match_count > 0 ? "partial_fill" : "fill_timeout"; @@ -474,6 +481,22 @@ void executor_execute_triangle(executor_thread_t *et, if (fills_pos >= (int)sizeof(fills_str) - 1) break; } + /* ── Build books string: top-of-book at signal time ── */ + char books_str[512] = ""; + int books_pos = 0; + for (int leg = 0; leg < 3; leg++) { + const signal_book_t *bk = &sig->books[leg]; + if (bk->symbol[0] == '\0') continue; + books_pos += snprintf(books_str + books_pos, sizeof(books_str) - books_pos, + "%s%s: bid=%.8g@%.4g ask=%.8g@%.4g", + leg > 0 ? ", " : "", bk->symbol, + bk->bid_count > 0 ? bk->bids[0].price : 0.0, + bk->bid_count > 0 ? bk->bids[0].size : 0.0, + bk->ask_count > 0 ? bk->asks[0].price : 0.0, + bk->ask_count > 0 ? bk->asks[0].size : 0.0); + if (books_pos >= (int)sizeof(books_str) - 1) break; + } + /* ── Emit report (single line) ── */ format_ts(ts_buf, sizeof(ts_buf)); const char *status = success ? "FILLED" : "FAILED"; @@ -484,11 +507,11 @@ void executor_execute_triangle(executor_thread_t *et, "%s %s | corr=%s | triangle=['%s'] | " "predicted_bps=%.2f | effective_bps=%s | " "book_ts=%lld | profit=%.4f | timings=[%s] | " - "fills=[%s]%s%s\n", + "fills=[%s] | books=[%s]%s%s\n", ts_buf, status, corr_id, sig->triangle_key, sig->predicted_bps, bps_str, (long long)sig->book_ts_ms, profit, timings_str, - fills_str, + fills_str, books_str, error_str[0] ? " | error=" : "", error_str[0] ? error_str : ""); /* Release isolation slot */ diff --git a/src/executor.h b/src/executor.h index 982273a..08616db 100644 --- a/src/executor.h +++ b/src/executor.h @@ -28,10 +28,10 @@ typedef struct executor_thread_s executor_thread_t; /* Create an executor thread (one per concurrent slot). */ executor_thread_t *executor_thread_create(const config_t *cfg, - fill_channel_t *fill_ch, ws_client_t *ws, executor_shared_t *shared, - executor_slot_t *slot); + executor_slot_t *slot, + int slot_index); /* Execute a single triangle signal (blocking, called from executor thread). */ void executor_execute_triangle(executor_thread_t *et, diff --git a/src/main.c b/src/main.c index 1147ca5..1d14b46 100644 --- a/src/main.c +++ b/src/main.c @@ -116,6 +116,13 @@ int main(int argc, char *argv[]) { return 1; } + slots[i].fill_ch = fill_channel_create(); + if (!slots[i].fill_ch) { + log_write("[MAIN] Failed to create slot fill channel\n"); + for (int j = 0; j < i; j++) { close(slots[j].eventfd); fill_channel_destroy(slots[j].fill_ch); } + free(slots); + return 1; + } } log_write("[MAIN] Created %d executor slots\n", n_slots); @@ -124,7 +131,7 @@ int main(int argc, char *argv[]) { cfg.kcs_discount_active); ws_client_t ws_client; - if (ws_client_init(&ws_client, &cfg, &symbols, books, &evaluator) != 0) { + if (ws_client_init(&ws_client, &cfg, &symbols, books, &evaluator, slots, n_slots) != 0) { log_write("[MAIN] Failed to init WS client\n"); triangle_set_free(&triangles); @@ -218,12 +225,9 @@ int main(int argc, char *argv[]) { int fl = fcntl(conn->fd, F_GETFL); fcntl(conn->fd, F_SETFL, fl | O_NONBLOCK); event_loops_add_fd(&events.hot_epoll, conn->fd, FD_TYPE_WS, i, NULL, EPOLLIN); - if (conn->symbol_count > 0) { - ws_client_subscribe(&ws_client, i, - conn->symbol_indices, conn->symbol_count); - } } - conn->last_activity_ms = now; + if (conn->state == WS_STATE_CONNECTED) + conn->last_activity_ms = now; } } } @@ -248,6 +252,7 @@ int main(int argc, char *argv[]) { ws_client_destroy(&ws_client); for (int i = 0; i < n_slots; i++) { close(slots[i].eventfd); + fill_channel_destroy(slots[i].fill_ch); } free(slots); triangle_set_free(&triangles); diff --git a/src/slot.h b/src/slot.h index abd590e..e017fb4 100644 --- a/src/slot.h +++ b/src/slot.h @@ -4,6 +4,7 @@ #include #include #include "triangle.h" +#include "fill_handler.h" #define EXECUTOR_SLOT_FREE 0 #define EXECUTOR_SLOT_CLAIMED 1 @@ -14,6 +15,7 @@ typedef struct { _Atomic int state; signal_entry_t signal; int eventfd; + fill_channel_t *fill_ch; } executor_slot_t; bool slot_deliver(executor_slot_t *slots, int n_slots, const signal_entry_t *sig); diff --git a/src/ws_client.c b/src/ws_client.c index 743afd8..2225b0d 100644 --- a/src/ws_client.c +++ b/src/ws_client.c @@ -156,13 +156,16 @@ static int setup_tls(ws_connection_t *conn) { int ws_client_init(ws_client_t *client, const config_t *cfg, symbol_table_t *symbols, order_book_t *books, - evaluator_t *evaluator) { + evaluator_t *evaluator, + executor_slot_t *slots, int n_slots) { memset(client, 0, sizeof(*client)); client->cfg = cfg; client->symbols = symbols; client->books = books; client->evaluator = evaluator; client->running = true; + client->slots = slots; + client->n_slots = n_slots; SSL_CTX *shared_ctx = create_ssl_ctx(); if (!shared_ctx) { @@ -188,6 +191,7 @@ int ws_client_init(ws_client_t *client, const config_t *cfg, } pthread_mutex_init(&client->balance_lock, NULL); + pthread_mutex_init(&client->order_slots_lock, NULL); client->balance_wake_fd = eventfd(0, EFD_NONBLOCK); if (client->balance_wake_fd < 0) { log_write("[WS] Failed to create balance wake eventfd\n"); @@ -196,6 +200,22 @@ int ws_client_init(ws_client_t *client, const config_t *cfg, return 0; } +void ws_client_register_order_slot(ws_client_t *client, + const char *order_id, int slot_index) { + if (!order_id || !order_id[0]) return; + pthread_mutex_lock(&client->order_slots_lock); + for (int i = 0; i < MAX_ORDER_SLOT_ENTRIES; i++) { + if (!atomic_load_explicit(&client->order_slots[i].active, memory_order_relaxed)) { + strncpy(client->order_slots[i].order_id, order_id, sizeof(client->order_slots[i].order_id) - 1); + client->order_slots[i].slot_index = slot_index; + atomic_store_explicit(&client->order_slots[i].active, 1, memory_order_release); + pthread_mutex_unlock(&client->order_slots_lock); + return; + } + } + pthread_mutex_unlock(&client->order_slots_lock); +} + void ws_client_destroy(ws_client_t *client) { client->running = false; SSL_CTX *ctx = NULL; @@ -383,17 +403,16 @@ int ws_client_connect(ws_client_t *client, uint32_t conn_idx) { ws_client_subscribe(client, conn_idx, conn->symbol_indices, conn->symbol_count); } - /* Subscribe to private channels */ - { + /* Subscribe to private channels — only on connection 0 to avoid + duplicate delivery of order/balance events across connections */ + if (conn_idx == 0) { char msg[256]; snprintf(msg, sizeof(msg), "{\"type\":\"subscribe\",\"topic\":\"/spotMarket/tradeOrdersV2\"," "\"response\":true,\"privateChannel\":\"true\"}"); ws_send_text(conn, msg); log_write("[WS] Subscribed to tradeOrdersV2\n"); - } - { - char msg[256]; + snprintf(msg, sizeof(msg), "{\"type\":\"subscribe\",\"topic\":\"/account/balance\"," "\"response\":true,\"privateChannel\":\"true\"}"); @@ -777,6 +796,39 @@ int16_t ws_client_process_frame(ws_client_t *client, uint32_t conn_idx) { cJSON_IsString(status) && strcmp(status->valuestring, "done") == 0); + /* Route to the correct slot's fill channel. + First try clientOid (slot encoded in bytes 1-2), + then fall back to orderId lookup. */ + fill_channel_t *target_ch = client->fill_ch; + const char *coid = oid->valuestring; + if (coid[0] == 'c' && coid[1] && coid[2]) { + unsigned long slot_idx = 0; + for (int h = 1; h <= 2; h++) { + char c = coid[h]; + slot_idx *= 16; + if (c >= '0' && c <= '9') slot_idx += (unsigned long)(c - '0'); + else if (c >= 'a' && c <= 'f') slot_idx += (unsigned long)(c - 'a' + 10); + else if (c >= 'A' && c <= 'F') slot_idx += (unsigned long)(c - 'A' + 10); + } + if (slot_idx < (unsigned long)client->n_slots && client->slots) + target_ch = client->slots[slot_idx].fill_ch; + } + /* If clientOid routing failed, try orderId mapping */ + if (target_ch == client->fill_ch && order_id && cJSON_IsString(order_id)) { + const char *oid_str = order_id->valuestring; + pthread_mutex_lock(&client->order_slots_lock); + for (int i = 0; i < MAX_ORDER_SLOT_ENTRIES; i++) { + if (atomic_load_explicit(&client->order_slots[i].active, memory_order_acquire) && + strcmp(client->order_slots[i].order_id, oid_str) == 0) { + int si = client->order_slots[i].slot_index; + if (si >= 0 && si < client->n_slots && client->slots) + target_ch = client->slots[si].fill_ch; + break; + } + } + pthread_mutex_unlock(&client->order_slots_lock); + } + if (is_match) { cJSON *ms = cJSON_GetObjectItem(data, "matchSize"); cJSON *mp = cJSON_GetObjectItem(data, "matchPrice"); @@ -787,16 +839,12 @@ int16_t ws_client_process_frame(ws_client_t *client, uint32_t conn_idx) { cJSON_IsNumber(mp) ? mp->valuedouble : 0; fe.match_fee = cJSON_IsString(mf) ? atof(mf->valuestring) : cJSON_IsNumber(mf) ? mf->valuedouble : 0; - if (!fill_channel_push(client->fill_ch, &fe) && + if (!fill_channel_push(target_ch, &fe) && ++client->fill_drop_warn <= 3) log_write("[WS] Fill ring full, dropping match\n"); } else if (is_terminal) { - /* Terminal event: push a marker that stops accumulation. - A preceding "match" event already communicated the data. - If no matches arrived, this is a cancelled+empty order. - We push a terminal marker to unblock the waiter. */ fe.is_terminal = true; - if (!fill_channel_push(client->fill_ch, &fe) && + if (!fill_channel_push(target_ch, &fe) && ++client->fill_drop_warn <= 3) log_write("[WS] Fill ring full, dropping terminal\n"); } diff --git a/src/ws_client.h b/src/ws_client.h index 9f847fd..3682a26 100644 --- a/src/ws_client.h +++ b/src/ws_client.h @@ -11,6 +11,7 @@ #include "hash.h" #include "evaluate.h" #include "fill_handler.h" +#include "slot.h" #define MAX_BALANCE_ENTRIES 64 @@ -71,6 +72,14 @@ typedef struct { uint32_t symbol_count; /* number of subscribed symbols */ } ws_connection_t; +typedef struct { + char order_id[32]; + int slot_index; + _Atomic int active; +} order_slot_entry_t; + +#define MAX_ORDER_SLOT_ENTRIES 64 + /* Top-level WebSocket client managing multiple connections */ typedef struct { ws_connection_t connections[WS_MAX_CONNECTIONS]; /* fixed-size connection pool */ @@ -80,18 +89,23 @@ typedef struct { order_book_t *books; /* pointer to shared order books */ evaluator_t *evaluator; /* pointer to evaluator */ bool running; /* false signals client to stop */ + executor_slot_t *slots; /* per-slot state for fill routing */ + int n_slots; /* number of executor slots */ fill_channel_t *fill_ch; /* fill event channel (hot→executor) */ int fill_drop_warn; /* rate-limited fill drop warning counter */ balance_entry_t balance_cache[MAX_BALANCE_ENTRIES]; /* latest available balances from WS */ int balance_count; pthread_mutex_t balance_lock; int balance_wake_fd; /* eventfd: written on every balance update */ + order_slot_entry_t order_slots[MAX_ORDER_SLOT_ENTRIES]; /* orderId→slot_index mapping */ + pthread_mutex_t order_slots_lock; } ws_client_t; -/* Initialise a WebSocket client with config, symbol table, books, and evaluator */ +/* Initialise a WebSocket client with config, symbol table, books, evaluator, and slots */ int ws_client_init(ws_client_t *client, const config_t *cfg, symbol_table_t *symbols, order_book_t *books, - evaluator_t *evaluator); + evaluator_t *evaluator, + executor_slot_t *slots, int n_slots); /* Destroy WebSocket client and close all connections */ void ws_client_destroy(ws_client_t *client); /* Initiate a WebSocket connection (non-blocking) */ @@ -125,7 +139,11 @@ bool ws_client_await_balance(ws_client_t *client, const char *currency, double min_amount, int64_t timeout_ms); /* Read the latest known available balance for a currency from cache (thread-safe). - Returns 0 if currency is not in cache. */ + Returns 0 if currency is not in cache. */ double ws_client_latest_balance(ws_client_t *client, const char *currency); +/* Register orderId→slot_index mapping for fill event routing (executor thread). */ +void ws_client_register_order_slot(ws_client_t *client, + const char *order_id, int slot_index); + #endif