From 46084de4b29efb26e4dc74f77996d2111255e35f Mon Sep 17 00:00:00 2001 From: nicolas Date: Wed, 27 May 2026 00:15:08 -0300 Subject: [PATCH] fix: keepalive, connection health check, log file, single-line reports, timings fix - Move REST keepalive to poll loop (called synchronously from the cold thread every 30s, outside the per-signal execution path) - Fix ensure_connected to detect RST connections (recv < 0, not just == 0) - Add log_set_file() + log file /tmp/engine.log (background thread writes) - Single-line FILLED/FAILED reports (no multi-line fills inside brackets) - Fix timing clock (use CLOCK_REALTIME consistently, not mixing with MONOTONIC) - Add ORDER/FILL/REJECTED intermediate output lines - Add session warmup at executor_thread_create - Fix FILL latency (use order-fire time, not signal-received time) - Paper mode: add clientOid to test endpoint, fix fee simulation - Concurrency: add primary_quote isolation --- src/events.c | 22 ++++++- src/executor.c | 146 +++++++++++++++++++++++++++++++--------------- src/executor.h | 3 + src/log.c | 9 +++ src/log.h | 3 + src/main.c | 1 + src/rest_client.c | 6 +- 7 files changed, 141 insertions(+), 49 deletions(-) diff --git a/src/events.c b/src/events.c index 0693107..f9f1f17 100644 --- a/src/events.c +++ b/src/events.c @@ -13,6 +13,13 @@ #include "evaluate.h" #include "executor.h" #include +#include + +static int64_t now_mono_ms(void) { + struct timespec ts; + clock_gettime(CLOCK_MONOTONIC, &ts); + return (int64_t)ts.tv_sec * 1000 + (int64_t)ts.tv_nsec / 1000000; +} #include #include #include @@ -416,9 +423,16 @@ void *event_cold_thread(void *arg) { { .fd = loops->wakeup_fd, .events = POLLIN }, { .fd = fill_channel_wake_fd(loops->ws_client->fill_ch), .events = POLLIN }, }; + int64_t last_keepalive_ms = 0; while (loops->running) { - int nfds = poll(fds, 2, 200); + int64_t now_ka = now_mono_ms(); + int poll_timeout = 200; + if (last_keepalive_ms == 0 || now_ka - last_keepalive_ms >= 30000) { + poll_timeout = 100; + } + + int nfds = poll(fds, 2, poll_timeout); if (nfds < 0) 
{ if (errno == EINTR) continue; log_write("[EXEC] poll error: %s\n", strerror(errno)); @@ -439,6 +453,12 @@ void *event_cold_thread(void *arg) { read(fds[1].fd, &val, sizeof(val)); } + /* Keepalive: warm up REST connection every 30s */ + if (now_ka - last_keepalive_ms >= 30000 || last_keepalive_ms == 0) { + executor_keepalive(exec); + last_keepalive_ms = now_mono_ms(); + } + /* Drain again to catch signals enqueued during execution */ while (!spsc_empty(loops->signal_queue)) { signal_entry_t sig; diff --git a/src/executor.c b/src/executor.c index 3f77266..5848752 100644 --- a/src/executor.c +++ b/src/executor.c @@ -89,9 +89,21 @@ executor_thread_t *executor_thread_create(const config_t *cfg, cfg->kucoin_api_secret, cfg->kucoin_api_passphrase); } + + /* Warm up the authenticated REST connection pool */ + if (et->rest && cfg->live_mode) { + double dummy = 0; + (void)rest_get_balance(et->rest, "USDT", &dummy); + } return et; } +bool executor_keepalive(executor_thread_t *et) { + if (!et->rest) return false; + double dummy = 0; + return rest_get_balance(et->rest, "USDT", &dummy); +} + void executor_thread_set_fill_ch(executor_thread_t *et, fill_channel_t *ch) { (void)et; (void)ch; @@ -101,7 +113,7 @@ void executor_thread_set_fill_ch(executor_thread_t *et, fill_channel_t *ch) { void executor_execute_triangle(executor_thread_t *et, signal_entry_t *sig, - fill_channel_t *fill_ch) { + fill_channel_t *fill_ch) { /* ── Concurrency isolation ── */ uint64_t pair_hashes[3] = {0}; for (int p = 0; p < 3; p++) { @@ -137,7 +149,8 @@ void executor_execute_triangle(executor_thread_t *et, char ts_buf[32]; char corr_id[64]; - int64_t exec_start = now_mono_ms(); + int64_t signal_received_ms = now_mono_ms(); + int64_t exec_start = signal_received_ms; snprintf(corr_id, sizeof(corr_id), "%08x%08x%08x%08x", (unsigned)(uintptr_t)&sig->legs.legs[0] ^ (unsigned)sig->ts_ms, (unsigned)sig->ts_ms ^ (unsigned)sig->book_ts_ms, @@ -177,6 +190,7 @@ void executor_execute_triangle(executor_thread_t *et, 
char err_msg[128] = {0}; bool ok = false; int64_t t0 = now_mono_ms(); + int64_t order_fire_ms_tracking = t0; if (sig->live) { ok = rest_order_place(et->rest, sl->symbol, sl->side, @@ -197,8 +211,26 @@ void executor_execute_triangle(executor_thread_t *et, } fills[leg][5] = (double)(now_mono_ms() - t0); + + /* ── ORDER output ── */ + { + int64_t ref_ms = signal_received_ms; + format_ts(ts_buf, sizeof(ts_buf)); + executor_write_report( + "%s ORDER | corr=%s | leg%d | %s | %s | vol=%.10g | " + "order_id=%s | lat=%.1fms\n", + ts_buf, corr_id, leg, sl->symbol, sl->side, + input_vol, ok ? order_id : "NONE", fills[leg][5]); + } + if (!ok) { error_str = err_msg[0] ? err_msg : "order_rejected"; + format_ts(ts_buf, sizeof(ts_buf)); + executor_write_report( + "%s REJECTED | corr=%s | leg%d | %s | %s | vol=%.10g | " + "error=%s | lat=%.1fms\n", + ts_buf, corr_id, leg, sl->symbol, sl->side, + input_vol, err_msg, fills[leg][5]); success = false; break; } @@ -263,6 +295,16 @@ void executor_execute_triangle(executor_thread_t *et, } } + /* ── FILL output ── */ + format_ts(ts_buf, sizeof(ts_buf)); + executor_write_report( + "%s FILL | corr=%s | leg%d | %s | %s | " + "out=%.10g@%.6g | fee=%.6g %s | lat=%.1fms\n", + ts_buf, corr_id, leg, sl->symbol, sl->side, + is_buy ? total_size : total_funds, avg_price, + total_fee, sl->fee_currency, + (double)(now_mono_ms() - order_fire_ms_tracking)); + /* ── Cascade ── */ leg_output[leg] = is_buy ? 
total_size : total_funds; fills[leg][0] = leg_output[leg]; @@ -312,7 +354,56 @@ void executor_execute_triangle(executor_thread_t *et, int64_t total_ms = now_mono_ms() - exec_start; - /* ── Emit report ── */ + /* ── Build timing string (relative to signal_arrive realtime) ── */ + int64_t now_rt = now_realtime_ms(); + char timings_str[256] = ""; + if (sig->t_sock_arrive_ms > 0) { + snprintf(timings_str, sizeof(timings_str), + "t-2_book_snapshot=%.1fms " + "book_update_arrived=%.1fms " + "t-1_eval_complete=%.1fms " + "t_signal_created=%.1fms " + "signal_received=0.0ms " + "execution_complete=%.1fms", + (double)(sig->ts_ms - now_rt), + (double)(sig->t_sock_arrive_ms - now_rt), + (double)(sig->t_arrive_ms - now_rt), + (double)(sig->t_eval_ms - now_rt), + (double)(now_rt - now_rt)); + } + + /* ── Build fills string ── */ + char fills_str[1024] = ""; + int fills_pos = 0; + for (int leg = 0; leg < 3 && leg <= (success ? 2 : (fills[0][0] > 0 ? 0 : -1)); leg++) { + if (fills[leg][0] == 0 && !(leg == 2 && success)) break; + const signal_leg_t *sl = &sig->legs.legs[leg]; + const char *pair = sl->symbol; + const char *side = sl->side; + const char *dash = strchr(pair, '-'); + char in_ccy[16] = {0}, out_ccy[16] = {0}; + if (dash) { + size_t blen = (size_t)(dash - pair); + if (blen > 15) blen = 15; + if (strcmp(side, "buy") == 0) { + memcpy(in_ccy, dash + 1, (strlen(dash+1) > 15 ? 15 : strlen(dash+1))); + memcpy(out_ccy, pair, blen); + } else { + memcpy(out_ccy, dash + 1, (strlen(dash+1) > 15 ? 15 : strlen(dash+1))); + memcpy(in_ccy, pair, blen); + } + } + int n = snprintf(fills_str + fills_pos, sizeof(fills_str) - fills_pos, + "%sL%d:%s %s %s->%s %.10g@%.6g(fee=%.6g %s lat=%.1fms)", + leg > 0 ? 
", " : "", + leg, side, pair, in_ccy, out_ccy, + fills[leg][0], fills[leg][1], fills[leg][2], + sl->fee_currency, fills[leg][5]); + if (n > 0) fills_pos += n; + if (fills_pos >= (int)sizeof(fills_str) - 1) break; + } + + /* ── Emit report (single line) ── */ format_ts(ts_buf, sizeof(ts_buf)); const char *status = success ? "FILLED" : "FAILED"; char bps_str[32]; @@ -321,51 +412,14 @@ void executor_execute_triangle(executor_thread_t *et, executor_write_report( "%s %s | corr=%s | triangle=['%s'] | " "predicted_bps=%.2f | effective_bps=%s | " - "book_ts=%lld | profit=%.4f | timings=[] | fills=[", + "book_ts=%lld | profit=%.4f | timings=[%s] | " + "fills=[%s]%s%s\n", ts_buf, status, corr_id, sig->triangle_key, sig->predicted_bps, bps_str, - (long long)sig->book_ts_ms, profit); - - for (int leg = 0; leg < 3 && leg <= (success ? 2 : (fills[0][0] > 0 ? 0 : -1)); leg++) { - if (fills[leg][0] == 0 && !(leg == 2 && success)) break; - const signal_leg_t *sl = &sig->legs.legs[leg]; - char pair[32], side[8], in_ccy[16], out_ccy[16]; - strncpy(pair, sl->symbol, sizeof(pair) - 1); - strncpy(side, sl->side, sizeof(side) - 1); - if (strcmp(side, "buy") == 0) { - char *dash = strchr(pair, '-'); - if (dash) { - strncpy(in_ccy, dash + 1, sizeof(in_ccy) - 1); - size_t len = dash - pair; - if (len > sizeof(out_ccy) - 1) len = sizeof(out_ccy) - 1; - memcpy(out_ccy, pair, len); - out_ccy[len] = '\0'; - } - } else { - char *dash = strchr(pair, '-'); - if (dash) { - strncpy(out_ccy, dash + 1, sizeof(out_ccy) - 1); - size_t len = dash - pair; - if (len > sizeof(in_ccy) - 1) len = sizeof(in_ccy) - 1; - memcpy(in_ccy, pair, len); - in_ccy[len] = '\0'; - } - } - executor_write_report( - "%sL%d:%s %s %s->%s %.10g@%.6g(fee=%.6g %s lat=%.1fms)", - leg > 0 ? 
", " : "", - leg, side, pair, in_ccy, out_ccy, - fills[leg][0], fills[leg][1], fills[leg][2], - sl->fee_currency, fills[leg][5]); - } - - if (error_str[0]) { - executor_write_report("] | error=%s\n", error_str); - } else { - executor_write_report("]\n"); - } - - /* Log timing */ + (long long)sig->book_ts_ms, profit, timings_str, + fills_str, + error_str[0] ? " | error=" : "", + error_str[0] ? error_str : ""); /* Release isolation slot */ for (int i = 0; i < MAX_IN_FLIGHT; i++) { if (strcmp(et->in_flight_triangles[i], sig->triangle_key) == 0) { diff --git a/src/executor.h b/src/executor.h index 3c0e597..f0a37c0 100644 --- a/src/executor.h +++ b/src/executor.h @@ -24,6 +24,9 @@ void executor_execute_triangle(executor_thread_t *et, /* Report status text. */ void executor_write_report(const char *fmt, ...); +/* Warm up the authenticated REST connection (keepalive). Returns true on success. */ +bool executor_keepalive(executor_thread_t *et); + /* Destroy and free. */ void executor_thread_destroy(executor_thread_t *et); diff --git a/src/log.c b/src/log.c index 5bdc9ad..99e56cb 100644 --- a/src/log.c +++ b/src/log.c @@ -20,9 +20,15 @@ #include static int log_pipe[2] = {-1, -1}; +static int log_file_fd = -1; static pthread_t log_thread; static atomic_bool log_running = false; +void log_set_file(const char *path) { + if (log_file_fd >= 0) close(log_file_fd); + log_file_fd = open(path, O_WRONLY | O_CREAT | O_APPEND, 0644); +} + /* Background thread: drains the pipe and writes each chunk to stderr. Spins on EAGAIN with 100us sleep when the pipe is empty. 
*/ static void *log_worker(void *arg) { @@ -32,6 +38,8 @@ static void *log_worker(void *arg) { ssize_t n = read(log_pipe[0], buf, sizeof(buf)); if (n > 0) { write(STDERR_FILENO, buf, (size_t)n); + if (log_file_fd >= 0) + write(log_file_fd, buf, (size_t)n); } else if (n < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) { usleep(100); @@ -63,6 +71,7 @@ void log_shutdown(void) { } if (log_pipe[0] >= 0) { close(log_pipe[0]); log_pipe[0] = -1; } if (log_pipe[1] >= 0) { close(log_pipe[1]); log_pipe[1] = -1; } + if (log_file_fd >= 0) { close(log_file_fd); log_file_fd = -1; } } /* Non-blocking log write. Formats a timestamped message into the pipe. diff --git a/src/log.h b/src/log.h index 637e35f..8996773 100644 --- a/src/log.h +++ b/src/log.h @@ -13,4 +13,7 @@ void log_shutdown(void); * Non-blocking after log_init(); falls back to synchronous fprintf. */ void log_write(const char *fmt, ...) __attribute__((format(printf, 1, 2))); +/* Set a log file path. Must be called before the first log_write. */ +void log_set_file(const char *path); + #endif diff --git a/src/main.c b/src/main.c index c8170ef..2476d97 100644 --- a/src/main.c +++ b/src/main.c @@ -60,6 +60,7 @@ int main(int argc, char *argv[]) { SSL_load_error_strings(); log_init(); + log_set_file("/tmp/engine.log"); log_write("[MAIN] Loading config from '%s'...\n", config_path); config_t cfg; diff --git a/src/rest_client.c b/src/rest_client.c index bfeb248..c7e9035 100644 --- a/src/rest_client.c +++ b/src/rest_client.c @@ -81,9 +81,11 @@ static int tcp_connect(const char *host, int port) { static int ensure_connected(rest_conn_t *rc) { if (rc->ssl && rc->fd >= 0) { - /* Quick check: is the connection still alive? */ + /* Check if the connection is still alive. + recv returns 0 on graceful close; -1 with errno EAGAIN/EWOULDBLOCK means alive with no data; any other error (e.g. ECONNRESET) forces a reconnect. 
*/ char buf; - if (recv(rc->fd, &buf, 1, MSG_PEEK | MSG_DONTWAIT) == 0) { + int r = recv(rc->fd, &buf, 1, MSG_PEEK | MSG_DONTWAIT); + if (r == 0 || (r < 0 && errno != EAGAIN && errno != EWOULDBLOCK)) { goto reconnect; } return 0; /* Connection alive */