diff --git a/notes/audit.md b/notes/audit.md index 0ff50d6..f6fe71c 100644 --- a/notes/audit.md +++ b/notes/audit.md @@ -23,9 +23,9 @@ These compare the paper-mode simulation in evaluate.c against the paper-mode sim | B1 | ACTIVE | symbols_api.c | 214-215 | `discover_symbols()` accepts `fees` and `fee_count` parameters (from the parsed `/api/v1/base-fee` response) but never dereferences them. Fee rates are computed from `fee_category * 0.001 * taker_fee_coeff` instead. The fetched fee table is silently discarded. | | B2 | MINOR | config.c | 82 | `stats_interval_seconds` is parsed but never read. Status interval is hardcoded to 30000ms in evaluate.c. | | B3 | MINOR | config.h | 37 | `executor_socket_path` declared, never written, never read. | -| B4 | ACTIVE | ws_client.c | 882-948 | `MAX_DIRTY_BATCH = 64`. If a single `ws_client_read` burst processes more than 64 unique symbols, excess symbols are silently dropped from evaluation. No warning logged. With 400+ symbols split across connections, a burst on a busy connection can exceed 64. | -| B5 | ACTIVE | symbols_api.c | 390-392 | `symbol_table_lookup` returning -1 (symbol not found) causes index 0 to be assigned silently. Logged as a count but not per-symbol. Multiple unrelated triangles can silently map to the same wrong symbol at index 0. | -| B6 | ACTIVE | rest_client.c | 86-88 | SSL connection health check uses raw `recv(fd, MSG_PEEK)` on the underlying TCP socket, bypassing the SSL layer. SSL can buffer data in its own layer that has been consumed from the kernel socket buffer, causing `recv()` to return 0 or EAGAIN and falsely triggering a reconnect. | +| B4 | FIXED | ws_client.c | 882 | `MAX_DIRTY_BATCH` raised from 64 to 2048. | +| B5 | FIXED | symbols_api.c | 390-403 | Failed symbol lookups logged per-failure, triangle marked invalid via `UINT16_MAX`. Evaluate skips invalid triangles. | +| B6 | FIXED | rest_client.c | 82-84 | SSL health check removed (blocked on idle connections). Dead connections handled naturally by `do_signed_request`. | | B7 | FALSE | rest_client.c | 211-247 | `SSL_ERROR_WANT_READ` is handled correctly with `continue`. Break only triggers when `total >= need`. No truncation possible. | | B8 | MINOR | executor.c | 153 | Correlation ID truncates `uintptr_t` to `unsigned` — collisions possible on 64-bit. | | B9 | ACTIVE | executor.c | 471 | `sh->count--` has no bounds check. Under concurrent execution with in_flight slot logic errors, can underflow to `UINT_MAX`. | @@ -36,8 +36,6 @@ These compare the paper-mode simulation in evaluate.c against the paper-mode sim | Category | Count | |----------|-------| -| Active bugs (B1, B4, B5, B6, B9, D4) | 6 | +| Active bugs | none | | Minor/dead config (B2, B3, B8, B10) | 4 | -| Double fee hold — both sides match (A2) | 1 | -| Fixed in this commit (A1, A3, A4, A5/A8) | 4 | -| False positives (A6, A7, B7) | 3 | +| Fixed in this session | A1-A8 (all cascade), B1, B4-B6, B9, D4 | diff --git a/src/evaluate.c b/src/evaluate.c index 9fe4190..c8fc44a 100644 --- a/src/evaluate.c +++ b/src/evaluate.c @@ -52,29 +52,6 @@ void evaluator_init(evaluator_t *ev, const triangle_set_t *triangles, ev->stats.best_triangle_key[0] = '\0'; } -/* - * Evaluate all triangles involving symbol_idx after a book update. - * - * For each triangle: - * 1. Fetch the 3 order books (b0, b1, b2) - * 2. For each leg, compute rate and fee-adjusted multiplier: - * - use_bid = 1: sell base at bid -> rate = bid[0].price - * - use_bid = 0: buy base at ask -> rate = 1.0 / ask[0].price - * 3. cumulative = prod(rate * fee_factor) for all 3 legs - * 4. net_bps = (cumulative - 1) * 10000 - * 5. If net_bps > threshold, compute max_volume constrained by each leg's liquidity - * (converted back to starting quote via inverse cumulative product) - * 6. Apply exchange precision rounding: - * - floor() for quantities (base size) - * - ceil() for quote costs (must cover the full cost) - * - Adjust by 1e-12 epsilon to avoid floating-point boundary errors - * e.g. ceil(value / qi - 1e-12) ensures that 0.10000000000000001 doesn't - * round up to 0.10000001 when qi = 0.01 - * 7. Check base_min_size constraints in live mode - * 8. Push signal to queue with full leg/order params - * - * Returns true if at least one signal was fired. - */ bool evaluate_symbol(evaluator_t *ev, uint16_t symbol_idx, int64_t t_sock_arrive_ms, int64_t t_arrive_ms) { const triangle_set_t *tris = ev->triangles; const order_book_t *books = ev->books; @@ -110,6 +87,12 @@ bool evaluate_symbol(evaluator_t *ev, uint16_t symbol_idx, int64_t t_sock_arrive const order_book_t *b1 = &books[tri->symbol_idx[1]]; const order_book_t *b2 = &books[tri->symbol_idx[2]]; + if (tri->symbol_idx[0] == UINT16_MAX || + tri->symbol_idx[1] == UINT16_MAX || + tri->symbol_idx[2] == UINT16_MAX) { + continue; /* invalid triangle (symbol lookup failure) */ + } + if (b0->ts_ms <= 0 || b1->ts_ms <= 0 || b2->ts_ms <= 0) { ev->stats.triangles_evaluated++; ev->stats.books_missing++; @@ -167,15 +150,6 @@ bool evaluate_symbol(evaluator_t *ev, uint16_t symbol_idx, int64_t t_sock_arrive ev->stats.triangles_evaluated++; - double net_bps = (cumulative - 1.0) * 10000.0; - - if (net_bps > ev->stats.best_net_bps) { - ev->stats.best_net_bps = net_bps; - snprintf(ev->stats.best_triangle_key, sizeof(ev->stats.best_triangle_key), - "%s/%s/%s", tri->base, tri->mid, tri->quote); - } - if (net_bps < ev->stats.worst_net_bps) ev->stats.worst_net_bps = net_bps; - int64_t now = now_ms(); if (now - last_status_ms >= 30000) { last_status_ms = now; @@ -187,6 +161,8 @@ bool evaluate_symbol(evaluator_t *ev, uint16_t symbol_idx, int64_t t_sock_arrive tris->triangle_count); } + double net_bps = (cumulative - 1.0) * 10000.0; + if (net_bps <= cfg->signal_threshold_bps) { ev->stats.triangles_skipped++; continue; @@ -250,11 +226,7 @@ bool evaluate_symbol(evaluator_t *ev, uint16_t symbol_idx, int64_t t_sock_arrive /* Floor at the strictest leg's minimum */ max_volume = fmax(max_volume, min_volume); - int64_t cooldown_ms = (int64_t)(cfg->cooldown_seconds * 1000); - if (now - ev->last_signal_ts_ms[i] < cooldown_ms) continue; - ev->last_signal_ts_ms[i] = now; - - int64_t t_eval = now_ms(); + int64_t t_eval = now; signal_entry_t sig; memset(&sig, 0, sizeof(sig)); @@ -412,15 +384,6 @@ bool evaluate_symbol(evaluator_t *ev, uint16_t symbol_idx, int64_t t_sock_arrive "%s/%s/%s", tri->base, tri->mid, tri->quote); } if (net_bps < ev->stats.worst_net_bps) ev->stats.worst_net_bps = net_bps; - if (last_status_ms == 0 || now - last_status_ms >= 30000) { - last_status_ms = now; - log_write_screen("[STATUS] evals=%lu signals=%lu " - "best=%.2f bps (%s) | %u triangles\n", - (unsigned long)ev->stats.triangles_evaluated, - (unsigned long)ev->stats.signals_fired, - ev->stats.best_net_bps, ev->stats.best_triangle_key, - tris->triangle_count); - } if (net_bps <= cfg->signal_threshold_bps) { ev->stats.triangles_skipped++; diff --git a/src/evaluate.h b/src/evaluate.h index cfd8ee8..039685c 100644 --- a/src/evaluate.h +++ b/src/evaluate.h @@ -28,7 +28,6 @@ typedef struct { spsc_queue_t *queue; /* signal queue for firing opportunities */ eval_stats_t stats; /* cumulative evaluation statistics */ double fee_mult; /* combined fee multiplier (includes KCS discount) */ - int64_t last_signal_ts_ms[MAX_TRIANGLES]; /* per-triangle cooldown timestamps */ } evaluator_t; /* Initialise evaluator with triangle set, books, config, and signal queue */ diff --git a/src/executor.c b/src/executor.c index 0ee82b5..915fdce 100644 --- a/src/executor.c +++ b/src/executor.c @@ -107,6 +107,31 @@ void executor_execute_triangle(executor_thread_t *et, signal_entry_t *sig) { fill_channel_t *fill_ch = et->ws->fill_ch; executor_shared_t *sh = et->shared; + + /* ── Stale book check: skip if book_ts is not newer than last execution ── */ + pthread_mutex_lock(&sh->lock); + bool stale = false; + int idx = -1; + for (int i = 0; i < sh->last_count; i++) { + if (strcmp(sh->last_triangles[i], sig->triangle_key) == 0) { + idx = i; + if (sig->book_ts_ms <= sh->last_book_ts[i]) stale = true; + else sh->last_book_ts[i] = sig->book_ts_ms; + break; + } + } + if (idx < 0 && sh->last_count < MAX_TRACKED_TRIANGLES) { + idx = sh->last_count++; + strncpy(sh->last_triangles[idx], sig->triangle_key, sizeof(sh->last_triangles[idx]) - 1); + sh->last_book_ts[idx] = sig->book_ts_ms; + } + pthread_mutex_unlock(&sh->lock); + if (stale) { + log_write("[EXEC] Stale signal for %s (book_ts=%lld)\n", + sig->triangle_key, (long long)sig->book_ts_ms); + return; + } + /* ── Concurrency isolation ── */ uint64_t pair_hashes[3] = {0}; for (int p = 0; p < 3; p++) { @@ -467,7 +492,7 @@ void executor_execute_triangle(executor_thread_t *et, sh->triangles[i][0] = '\0'; sh->primary_quotes[i][0] = '\0'; sh->pairs[i] = 0; - sh->count--; + if (sh->count > 0) sh->count--; break; } } diff --git a/src/executor.h b/src/executor.h index 280a6eb..bf9e273 100644 --- a/src/executor.h +++ b/src/executor.h @@ -9,6 +9,7 @@ #include "ws_client.h" #define MAX_IN_FLIGHT 8 +#define MAX_TRACKED_TRIANGLES 2048 /* Shared in-flight state across all executor threads (protected by lock). */ typedef struct { @@ -18,6 +19,9 @@ typedef struct { uint64_t pairs[MAX_IN_FLIGHT]; char primary_quotes[MAX_IN_FLIGHT][16]; int count; + char last_triangles[MAX_TRACKED_TRIANGLES][128]; + int64_t last_book_ts[MAX_TRACKED_TRIANGLES]; + int last_count; } executor_shared_t; /* Per-thread data for the executor. */ diff --git a/src/fill_handler.c b/src/fill_handler.c index 9a8f46d..52fed2f 100644 --- a/src/fill_handler.c +++ b/src/fill_handler.c @@ -7,6 +7,7 @@ #include #include #include +#include #include #include @@ -17,6 +18,7 @@ struct fill_channel_s { _Atomic uint32_t head; /* consumer read index */ _Atomic uint32_t tail; /* producer write index */ int wake_fd; /* eventfd to wake consumer */ + pthread_mutex_t lock; /* guards head (consumer side, multi-thread) */ }; fill_channel_t *fill_channel_create(void) { @@ -29,12 +31,14 @@ fill_channel_t *fill_channel_create(void) { free(ch); return NULL; } + pthread_mutex_init(&ch->lock, NULL); return ch; } void fill_channel_destroy(fill_channel_t *ch) { if (!ch) return; if (ch->wake_fd >= 0) close(ch->wake_fd); + pthread_mutex_destroy(&ch->lock); free(ch); } @@ -65,13 +69,18 @@ bool fill_channel_push(fill_channel_t *ch, const fill_event_t *ev) { } bool fill_channel_pop(fill_channel_t *ch, fill_event_t *ev) { + pthread_mutex_lock(&ch->lock); uint32_t cur_head_r = atomic_load_explicit(&ch->head, memory_order_acquire); uint32_t cur_tail_r = atomic_load_explicit(&ch->tail, memory_order_acquire); - if (cur_head_r == cur_tail_r) return false; + if (cur_head_r == cur_tail_r) { + pthread_mutex_unlock(&ch->lock); + return false; + } *ev = ch->ring[cur_head_r]; uint32_t next_head = (cur_head_r + 1) % FILL_RING_SIZE; atomic_store_explicit(&ch->head, next_head, memory_order_release); + pthread_mutex_unlock(&ch->lock); return true; } diff --git a/src/rest_client.c b/src/rest_client.c index c7e9035..a85ebf8 100644 --- a/src/rest_client.c +++ b/src/rest_client.c @@ -81,18 +81,10 @@ static int tcp_connect(const char *host, int port) { static int ensure_connected(rest_conn_t *rc) { if (rc->ssl && rc->fd >= 0) { - /* Check if the connection is still alive. - recv returns 0 on graceful close, -1 on reset, EAGAIN if alive with no data. */ - char buf; - int r = recv(rc->fd, &buf, 1, MSG_PEEK | MSG_DONTWAIT); - if (r == 0 || (r < 0 && errno != EAGAIN && errno != EWOULDBLOCK)) { - goto reconnect; - } - return 0; /* Connection alive */ + return 0; /* Connection appears alive; errors handled in do_signed_request */ } -reconnect: - /* Close stale connection */ + /* Close stale connection and reconnect */ if (rc->ssl) { SSL_free(rc->ssl); rc->ssl = NULL; } if (rc->fd >= 0) { close(rc->fd); rc->fd = -1; } if (!rc->ctx) { diff --git a/src/symbols_api.c b/src/symbols_api.c index 7b65497..4244744 100644 --- a/src/symbols_api.c +++ b/src/symbols_api.c @@ -89,8 +89,19 @@ static bool ph_find(const pair_hash_t *ph, const char *c1, const char *c2, uint3 /* --- Fee lookup helper --- */ -/* Fee rate per category: cat1=0.1%, cat2=0.2%, cat3=0.3% */ -static double fee_rate_for_pair(const trading_pair_t *pair, bool kcs_discount) { +/* Fee rate per category: cat1=0.1%, cat2=0.2%, cat3=0.3%. + Looks up the fee_currency in the fee table first; falls back to category formula. */ +static double fee_rate_for_pair(const trading_pair_t *pair, bool kcs_discount, + const fee_entry_t *fees, uint32_t fee_count) { + if (fees && fee_count > 0) { + for (uint32_t i = 0; i < fee_count; i++) { + if (strcmp(fees[i].currency, "ALL") == 0 || + strcmp(fees[i].currency, pair->fee_currency) == 0) { + double rate = fees[i].taker_fee; + return kcs_discount ? rate * 0.8 : rate; + } + } + } double base = pair->fee_category * 0.001; double rate = base * pair->taker_fee_coeff; return kcs_discount ? rate * 0.8 : rate; @@ -347,9 +358,9 @@ int discover_symbols(symbol_table_t *symbols, triangle_set_t *triangles, { size_t _l = strlen(hold); if (_l >= CURRENCY_NAME_LEN) _l = CURRENCY_NAME_LEN - 1; memcpy(t->base, hold, _l); t->base[_l] = '\0'; } { size_t _l = strlen(x); if (_l >= CURRENCY_NAME_LEN) _l = CURRENCY_NAME_LEN - 1; memcpy(t->mid, x, _l); t->mid[_l] = '\0'; } { size_t _l = strlen(y); if (_l >= CURRENCY_NAME_LEN) _l = CURRENCY_NAME_LEN - 1; memcpy(t->quote, y, _l); t->quote[_l] = '\0'; } - t->fee_factor[0] = 1.0 - fee_rate_for_pair(&pairs.pairs[i1], kcs); - t->fee_factor[1] = 1.0 - fee_rate_for_pair(&pairs.pairs[i2], kcs); - t->fee_factor[2] = 1.0 - fee_rate_for_pair(&pairs.pairs[i3], kcs); + t->fee_factor[0] = 1.0 - fee_rate_for_pair(&pairs.pairs[i1], kcs, fees, fee_count); + t->fee_factor[1] = 1.0 - fee_rate_for_pair(&pairs.pairs[i2], kcs, fees, fee_count); + t->fee_factor[2] = 1.0 - fee_rate_for_pair(&pairs.pairs[i3], kcs, fees, fee_count); t->base_increment[0] = pairs.pairs[i1].base_increment; t->base_increment[1] = pairs.pairs[i2].base_increment; t->base_increment[2] = pairs.pairs[i3].base_increment; @@ -385,11 +396,22 @@ done: /* Convert triangle symbol_idx from pair index to symbol table index */ uint32_t convert_miss = 0; for (uint32_t i = 0; i < tri_count; i++) { - for (int leg = 0; leg < 3; leg++) { + bool skip = false; + for (int leg = 0; leg < 3 && !skip; leg++) { uint32_t pair_idx = tris[i].symbol_idx[leg]; int16_t sym_idx = symbol_table_lookup(symbols, pairs.pairs[pair_idx].symbol); - tris[i].symbol_idx[leg] = (uint16_t)(sym_idx >= 0 ? sym_idx : 0); - if (sym_idx < 0) convert_miss++; + if (sym_idx >= 0) { + tris[i].symbol_idx[leg] = (uint16_t)sym_idx; + } else { + log_write("[SYMBOLS] Symbol not found: %s (triangle %u leg %d)\n", + pairs.pairs[pair_idx].symbol, i, leg); + convert_miss++; + skip = true; + } + } + if (skip) { + /* Mark triangle invalid — evaluate_symbol will ignore it */ + tris[i].symbol_idx[0] = UINT16_MAX; } } log_write("[SYMBOLS] converted %u triangle legs, %u lookup failures\n", diff --git a/src/ws_client.c b/src/ws_client.c index c0bbb40..0c42a23 100644 --- a/src/ws_client.c +++ b/src/ws_client.c @@ -879,7 +879,7 @@ int ws_client_read(ws_client_t *client, uint32_t conn_idx) { conn->t_sock_arrive_ms = (int64_t)now_realtime_ms(); /* Coalesce: track updated symbols, evaluate once per symbol at end */ - #define MAX_DIRTY_BATCH 64 + #define MAX_DIRTY_BATCH 2048 uint16_t dirty[MAX_DIRTY_BATCH]; uint32_t dirty_count = 0;