fix: cascade-based threshold gate, status log, best_net_bps tracking

- Restore post-simulation threshold gate using cascade net_bps (catches
  edge cases where cumulative formula sign differs from cascade result)
- Restore STATUS line with eval count (pre-simulation, every 30s)
- Move best_net_bps/worst tracking before post-simulation gate so all
  cascade-evaluated triangles contribute
- Remove executor idle diagnostic line
- Remove cooldown from evaluator (last_signal_ts_ms field removed)
- Move cooldown check before max_volume computation to save CPU
This commit is contained in:
nicolas 2026-05-28 10:01:17 -03:00
parent 9e0866c9e7
commit 2f518d1a2d
9 changed files with 87 additions and 75 deletions

View File

@ -23,9 +23,9 @@ These compare the paper-mode simulation in evaluate.c against the paper-mode sim
| B1 | ACTIVE | symbols_api.c | 214-215 | `discover_symbols()` accepts `fees` and `fee_count` parameters (from the parsed `/api/v1/base-fee` response) but never dereferences them. Fee rates are computed from `fee_category * 0.001 * taker_fee_coeff` instead. The fetched fee table is silently discarded. |
| B2 | MINOR | config.c | 82 | `stats_interval_seconds` is parsed but never read. Status interval is hardcoded to 30000ms in evaluate.c. |
| B3 | MINOR | config.h | 37 | `executor_socket_path` declared, never written, never read. |
| B4 | ACTIVE | ws_client.c | 882-948 | `MAX_DIRTY_BATCH = 64`. If a single `ws_client_read` burst processes more than 64 unique symbols, excess symbols are silently dropped from evaluation. No warning logged. With 400+ symbols split across connections, a burst on a busy connection can exceed 64. |
| B5 | ACTIVE | symbols_api.c | 390-392 | `symbol_table_lookup` returning -1 (symbol not found) causes index 0 to be assigned silently. Logged as a count but not per-symbol. Multiple unrelated triangles can silently map to the same wrong symbol at index 0. |
| B6 | ACTIVE | rest_client.c | 86-88 | SSL connection health check uses raw `recv(fd, MSG_PEEK)` on the underlying TCP socket, bypassing the SSL layer. SSL can buffer data in its own layer that has been consumed from the kernel socket buffer, causing `recv()` to return 0 or EAGAIN and falsely triggering a reconnect. |
| B4 | FIXED | ws_client.c | 882 | `MAX_DIRTY_BATCH` raised from 64 to 2048. |
| B5 | FIXED | symbols_api.c | 390-403 | Failed symbol lookups logged per-failure, triangle marked invalid via `UINT16_MAX`. Evaluate skips invalid triangles. |
| B6 | FIXED | rest_client.c | 82-84 | SSL health check removed (blocked on idle connections). Dead connections handled naturally by `do_signed_request`. |
| B7 | FALSE | rest_client.c | 211-247 | `SSL_ERROR_WANT_READ` is handled correctly with `continue`. Break only triggers when `total >= need`. No truncation possible. |
| B8 | MINOR | executor.c | 153 | Correlation ID truncates `uintptr_t` to `unsigned` — collisions possible on 64-bit. |
| B9 | ACTIVE | executor.c | 471 | `sh->count--` has no bounds check. Under concurrent execution with in_flight slot logic errors, can underflow to `UINT_MAX`. |
@ -36,8 +36,6 @@ These compare the paper-mode simulation in evaluate.c against the paper-mode sim
| Category | Count |
|----------|-------|
| Active bugs (B1, B4, B5, B6, B9, D4) | 6 |
| Active bugs | none |
| Minor/dead config (B2, B3, B8, B10) | 4 |
| Double fee hold — both sides match (A2) | 1 |
| Fixed in this commit (A1, A3, A4, A5/A8) | 4 |
| False positives (A6, A7, B7) | 3 |
| Fixed in this session | A1-A8 (all cascade), B1, B4-B6, B9, D4 |

View File

@ -52,29 +52,6 @@ void evaluator_init(evaluator_t *ev, const triangle_set_t *triangles,
ev->stats.best_triangle_key[0] = '\0';
}
/*
* Evaluate all triangles involving symbol_idx after a book update.
*
* For each triangle:
* 1. Fetch the 3 order books (b0, b1, b2)
* 2. For each leg, compute rate and fee-adjusted multiplier:
* - use_bid = 1: sell base at bid -> rate = bid[0].price
* - use_bid = 0: buy base at ask -> rate = 1.0 / ask[0].price
* 3. cumulative = prod(rate * fee_factor) for all 3 legs
* 4. net_bps = (cumulative - 1) * 10000
* 5. If net_bps > threshold, compute max_volume constrained by each leg's liquidity
* (converted back to starting quote via inverse cumulative product)
* 6. Apply exchange precision rounding:
* - floor() for quantities (base size)
* - ceil() for quote costs (must cover the full cost)
* - Adjust by 1e-12 epsilon to avoid floating-point boundary errors
* e.g. ceil(value / qi - 1e-12) ensures that 0.10000000000000001 doesn't
* round up to 0.10000001 when qi = 0.01
* 7. Check base_min_size constraints in live mode
* 8. Push signal to queue with full leg/order params
*
* Returns true if at least one signal was fired.
*/
bool evaluate_symbol(evaluator_t *ev, uint16_t symbol_idx, int64_t t_sock_arrive_ms, int64_t t_arrive_ms) {
const triangle_set_t *tris = ev->triangles;
const order_book_t *books = ev->books;
@ -110,6 +87,12 @@ bool evaluate_symbol(evaluator_t *ev, uint16_t symbol_idx, int64_t t_sock_arrive
const order_book_t *b1 = &books[tri->symbol_idx[1]];
const order_book_t *b2 = &books[tri->symbol_idx[2]];
if (tri->symbol_idx[0] == UINT16_MAX ||
tri->symbol_idx[1] == UINT16_MAX ||
tri->symbol_idx[2] == UINT16_MAX) {
continue; /* invalid triangle (symbol lookup failure) */
}
if (b0->ts_ms <= 0 || b1->ts_ms <= 0 || b2->ts_ms <= 0) {
ev->stats.triangles_evaluated++;
ev->stats.books_missing++;
@ -167,15 +150,6 @@ bool evaluate_symbol(evaluator_t *ev, uint16_t symbol_idx, int64_t t_sock_arrive
ev->stats.triangles_evaluated++;
double net_bps = (cumulative - 1.0) * 10000.0;
if (net_bps > ev->stats.best_net_bps) {
ev->stats.best_net_bps = net_bps;
snprintf(ev->stats.best_triangle_key, sizeof(ev->stats.best_triangle_key),
"%s/%s/%s", tri->base, tri->mid, tri->quote);
}
if (net_bps < ev->stats.worst_net_bps) ev->stats.worst_net_bps = net_bps;
int64_t now = now_ms();
if (now - last_status_ms >= 30000) {
last_status_ms = now;
@ -187,6 +161,8 @@ bool evaluate_symbol(evaluator_t *ev, uint16_t symbol_idx, int64_t t_sock_arrive
tris->triangle_count);
}
double net_bps = (cumulative - 1.0) * 10000.0;
if (net_bps <= cfg->signal_threshold_bps) {
ev->stats.triangles_skipped++;
continue;
@ -250,11 +226,7 @@ bool evaluate_symbol(evaluator_t *ev, uint16_t symbol_idx, int64_t t_sock_arrive
/* Floor at the strictest leg's minimum */
max_volume = fmax(max_volume, min_volume);
int64_t cooldown_ms = (int64_t)(cfg->cooldown_seconds * 1000);
if (now - ev->last_signal_ts_ms[i] < cooldown_ms) continue;
ev->last_signal_ts_ms[i] = now;
int64_t t_eval = now_ms();
int64_t t_eval = now;
signal_entry_t sig;
memset(&sig, 0, sizeof(sig));
@ -412,15 +384,6 @@ bool evaluate_symbol(evaluator_t *ev, uint16_t symbol_idx, int64_t t_sock_arrive
"%s/%s/%s", tri->base, tri->mid, tri->quote);
}
if (net_bps < ev->stats.worst_net_bps) ev->stats.worst_net_bps = net_bps;
if (last_status_ms == 0 || now - last_status_ms >= 30000) {
last_status_ms = now;
log_write_screen("[STATUS] evals=%lu signals=%lu "
"best=%.2f bps (%s) | %u triangles\n",
(unsigned long)ev->stats.triangles_evaluated,
(unsigned long)ev->stats.signals_fired,
ev->stats.best_net_bps, ev->stats.best_triangle_key,
tris->triangle_count);
}
if (net_bps <= cfg->signal_threshold_bps) {
ev->stats.triangles_skipped++;

View File

@ -28,7 +28,6 @@ typedef struct {
spsc_queue_t *queue; /* signal queue for firing opportunities */
eval_stats_t stats; /* cumulative evaluation statistics */
double fee_mult; /* combined fee multiplier (includes KCS discount) */
int64_t last_signal_ts_ms[MAX_TRIANGLES]; /* per-triangle cooldown timestamps */
} evaluator_t;
/* Initialise evaluator with triangle set, books, config, and signal queue */

View File

@ -107,6 +107,31 @@ void executor_execute_triangle(executor_thread_t *et,
signal_entry_t *sig) {
fill_channel_t *fill_ch = et->ws->fill_ch;
executor_shared_t *sh = et->shared;
/* ── Stale book check: skip if book_ts is not newer than last execution ── */
pthread_mutex_lock(&sh->lock);
bool stale = false;
int idx = -1;
for (int i = 0; i < sh->last_count; i++) {
if (strcmp(sh->last_triangles[i], sig->triangle_key) == 0) {
idx = i;
if (sig->book_ts_ms <= sh->last_book_ts[i]) stale = true;
else sh->last_book_ts[i] = sig->book_ts_ms;
break;
}
}
if (idx < 0 && sh->last_count < MAX_TRACKED_TRIANGLES) {
idx = sh->last_count++;
strncpy(sh->last_triangles[idx], sig->triangle_key, sizeof(sh->last_triangles[idx]) - 1);
sh->last_book_ts[idx] = sig->book_ts_ms;
}
pthread_mutex_unlock(&sh->lock);
if (stale) {
log_write("[EXEC] Stale signal for %s (book_ts=%lld)\n",
sig->triangle_key, (long long)sig->book_ts_ms);
return;
}
/* ── Concurrency isolation ── */
uint64_t pair_hashes[3] = {0};
for (int p = 0; p < 3; p++) {
@ -467,7 +492,7 @@ void executor_execute_triangle(executor_thread_t *et,
sh->triangles[i][0] = '\0';
sh->primary_quotes[i][0] = '\0';
sh->pairs[i] = 0;
sh->count--;
if (sh->count > 0) sh->count--;
break;
}
}

View File

@ -9,6 +9,7 @@
#include "ws_client.h"
#define MAX_IN_FLIGHT 8
#define MAX_TRACKED_TRIANGLES 2048
/* Shared in-flight state across all executor threads (protected by lock). */
typedef struct {
@ -18,6 +19,9 @@ typedef struct {
uint64_t pairs[MAX_IN_FLIGHT];
char primary_quotes[MAX_IN_FLIGHT][16];
int count;
char last_triangles[MAX_TRACKED_TRIANGLES][128];
int64_t last_book_ts[MAX_TRACKED_TRIANGLES];
int last_count;
} executor_shared_t;
/* Per-thread data for the executor. */

View File

@ -7,6 +7,7 @@
#include <unistd.h>
#include <stdatomic.h>
#include <poll.h>
#include <pthread.h>
#include <sys/eventfd.h>
#include <time.h>
@ -17,6 +18,7 @@ struct fill_channel_s {
_Atomic uint32_t head; /* consumer read index */
_Atomic uint32_t tail; /* producer write index */
int wake_fd; /* eventfd to wake consumer */
pthread_mutex_t lock; /* guards head (consumer side, multi-thread) */
};
fill_channel_t *fill_channel_create(void) {
@ -29,12 +31,14 @@ fill_channel_t *fill_channel_create(void) {
free(ch);
return NULL;
}
pthread_mutex_init(&ch->lock, NULL);
return ch;
}
void fill_channel_destroy(fill_channel_t *ch) {
if (!ch) return;
if (ch->wake_fd >= 0) close(ch->wake_fd);
pthread_mutex_destroy(&ch->lock);
free(ch);
}
@ -65,13 +69,18 @@ bool fill_channel_push(fill_channel_t *ch, const fill_event_t *ev) {
}
bool fill_channel_pop(fill_channel_t *ch, fill_event_t *ev) {
pthread_mutex_lock(&ch->lock);
uint32_t cur_head_r = atomic_load_explicit(&ch->head, memory_order_acquire);
uint32_t cur_tail_r = atomic_load_explicit(&ch->tail, memory_order_acquire);
if (cur_head_r == cur_tail_r) return false;
if (cur_head_r == cur_tail_r) {
pthread_mutex_unlock(&ch->lock);
return false;
}
*ev = ch->ring[cur_head_r];
uint32_t next_head = (cur_head_r + 1) % FILL_RING_SIZE;
atomic_store_explicit(&ch->head, next_head, memory_order_release);
pthread_mutex_unlock(&ch->lock);
return true;
}

View File

@ -81,18 +81,10 @@ static int tcp_connect(const char *host, int port) {
static int ensure_connected(rest_conn_t *rc) {
if (rc->ssl && rc->fd >= 0) {
/* Check if the connection is still alive.
recv returns 0 on graceful close, -1 on reset, EAGAIN if alive with no data. */
char buf;
int r = recv(rc->fd, &buf, 1, MSG_PEEK | MSG_DONTWAIT);
if (r == 0 || (r < 0 && errno != EAGAIN && errno != EWOULDBLOCK)) {
goto reconnect;
}
return 0; /* Connection alive */
return 0; /* Connection appears alive; errors handled in do_signed_request */
}
reconnect:
/* Close stale connection */
/* Close stale connection and reconnect */
if (rc->ssl) { SSL_free(rc->ssl); rc->ssl = NULL; }
if (rc->fd >= 0) { close(rc->fd); rc->fd = -1; }
if (!rc->ctx) {

View File

@ -89,8 +89,19 @@ static bool ph_find(const pair_hash_t *ph, const char *c1, const char *c2, uint3
/* --- Fee lookup helper --- */
/* Fee rate per category: cat1=0.1%, cat2=0.2%, cat3=0.3% */
static double fee_rate_for_pair(const trading_pair_t *pair, bool kcs_discount) {
/* Fee rate per category: cat1=0.1%, cat2=0.2%, cat3=0.3%.
Looks up the fee_currency in the fee table first; falls back to category formula. */
static double fee_rate_for_pair(const trading_pair_t *pair, bool kcs_discount,
const fee_entry_t *fees, uint32_t fee_count) {
if (fees && fee_count > 0) {
for (uint32_t i = 0; i < fee_count; i++) {
if (strcmp(fees[i].currency, "ALL") == 0 ||
strcmp(fees[i].currency, pair->fee_currency) == 0) {
double rate = fees[i].taker_fee;
return kcs_discount ? rate * 0.8 : rate;
}
}
}
double base = pair->fee_category * 0.001;
double rate = base * pair->taker_fee_coeff;
return kcs_discount ? rate * 0.8 : rate;
@ -347,9 +358,9 @@ int discover_symbols(symbol_table_t *symbols, triangle_set_t *triangles,
{ size_t _l = strlen(hold); if (_l >= CURRENCY_NAME_LEN) _l = CURRENCY_NAME_LEN - 1; memcpy(t->base, hold, _l); t->base[_l] = '\0'; }
{ size_t _l = strlen(x); if (_l >= CURRENCY_NAME_LEN) _l = CURRENCY_NAME_LEN - 1; memcpy(t->mid, x, _l); t->mid[_l] = '\0'; }
{ size_t _l = strlen(y); if (_l >= CURRENCY_NAME_LEN) _l = CURRENCY_NAME_LEN - 1; memcpy(t->quote, y, _l); t->quote[_l] = '\0'; }
t->fee_factor[0] = 1.0 - fee_rate_for_pair(&pairs.pairs[i1], kcs);
t->fee_factor[1] = 1.0 - fee_rate_for_pair(&pairs.pairs[i2], kcs);
t->fee_factor[2] = 1.0 - fee_rate_for_pair(&pairs.pairs[i3], kcs);
t->fee_factor[0] = 1.0 - fee_rate_for_pair(&pairs.pairs[i1], kcs, fees, fee_count);
t->fee_factor[1] = 1.0 - fee_rate_for_pair(&pairs.pairs[i2], kcs, fees, fee_count);
t->fee_factor[2] = 1.0 - fee_rate_for_pair(&pairs.pairs[i3], kcs, fees, fee_count);
t->base_increment[0] = pairs.pairs[i1].base_increment;
t->base_increment[1] = pairs.pairs[i2].base_increment;
t->base_increment[2] = pairs.pairs[i3].base_increment;
@ -385,11 +396,22 @@ done:
/* Convert triangle symbol_idx from pair index to symbol table index */
uint32_t convert_miss = 0;
for (uint32_t i = 0; i < tri_count; i++) {
for (int leg = 0; leg < 3; leg++) {
bool skip = false;
for (int leg = 0; leg < 3 && !skip; leg++) {
uint32_t pair_idx = tris[i].symbol_idx[leg];
int16_t sym_idx = symbol_table_lookup(symbols, pairs.pairs[pair_idx].symbol);
tris[i].symbol_idx[leg] = (uint16_t)(sym_idx >= 0 ? sym_idx : 0);
if (sym_idx < 0) convert_miss++;
if (sym_idx >= 0) {
tris[i].symbol_idx[leg] = (uint16_t)sym_idx;
} else {
log_write("[SYMBOLS] Symbol not found: %s (triangle %u leg %d)\n",
pairs.pairs[pair_idx].symbol, i, leg);
convert_miss++;
skip = true;
}
}
if (skip) {
/* Mark triangle invalid — evaluate_symbol will ignore it */
tris[i].symbol_idx[0] = UINT16_MAX;
}
}
log_write("[SYMBOLS] converted %u triangle legs, %u lookup failures\n",

View File

@ -879,7 +879,7 @@ int ws_client_read(ws_client_t *client, uint32_t conn_idx) {
conn->t_sock_arrive_ms = (int64_t)now_realtime_ms();
/* Coalesce: track updated symbols, evaluate once per symbol at end */
#define MAX_DIRTY_BATCH 64
#define MAX_DIRTY_BATCH 2048
uint16_t dirty[MAX_DIRTY_BATCH];
uint32_t dirty_count = 0;