From 06706ca47958198657c5d7609f34988b6ce8d4ad Mon Sep 17 00:00:00 2001 From: nicolas Date: Wed, 27 May 2026 11:40:03 -0300 Subject: [PATCH] fix: timing legends, log timestamp cache, mask generation, fill drop warning, remove fast parser - Remove zero-alloc JSON fast parser (caused CPU increase, reverted to cJSON) - Add descriptive legends to timing fields (t-1_snapshot, t0_arrival, t1_signal, etc.) - Fix t2 missing when order fires at exec_start (guard on fills[l][0] not timing value) - Cache log_write timestamp to avoid time()+localtime_r() per call - Collapse 4 now_ms_impl() calls to 1 for WS mask generation - Add fill_drop_warn counter for lost fill events (rate-limited warning) - Add fill_drop_warn field to ws_client_t --- src/executor.c | 80 +++++++++++++++++++------------------------------ src/log.c | 39 ++++++++++++++++++++---- src/ws_client.c | 19 +++++++----- src/ws_client.h | 1 + 4 files changed, 78 insertions(+), 61 deletions(-) diff --git a/src/executor.c b/src/executor.c index 2b4dafc..5ae60a5 100644 --- a/src/executor.c +++ b/src/executor.c @@ -186,11 +186,14 @@ void executor_execute_triangle(executor_thread_t *et, fills[leg][4] = input_vol; + /* ── Capture order fire timing BEFORE sending (realtime) ── */ + int64_t t0 = now_mono_ms(); + if (leg < 6) leg_timings[leg * 2] = (double)(now_realtime_ms() - exec_start_rt); + /* ── Place order ── */ char order_id[32] = {0}; char err_msg[128] = {0}; bool ok = false; - int64_t t0 = now_mono_ms(); int64_t order_fire_ms_tracking = t0; if (sig->live) { @@ -213,9 +216,6 @@ void executor_execute_triangle(executor_thread_t *et, fills[leg][5] = (double)(now_mono_ms() - t0); - /* ── Timing: leg_order_fired ── */ - if (leg < 6) leg_timings[leg * 2] = (double)(now_mono_ms() - exec_start_mono); - /* ── ORDER output ── */ { format_ts(ts_buf, sizeof(ts_buf)); @@ -298,8 +298,8 @@ void executor_execute_triangle(executor_thread_t *et, } } - /* ── Timing: leg_fill_received ── */ - if (leg < 6) leg_timings[leg * 2 + 1] = (double)(now_mono_ms() - exec_start_mono); + /* ── Timing: leg_fill_received (realtime) ── */ + if (leg < 6) leg_timings[leg * 2 + 1] = (double)(now_realtime_ms() - exec_start_rt); /* ── FILL output ── */ format_ts(ts_buf, sizeof(ts_buf)); @@ -360,56 +360,38 @@ void executor_execute_triangle(executor_thread_t *et, int64_t total_ms = now_mono_ms() - exec_start_mono; - /* ── Build timing string ── */ - double exec_complete = (double)(now_mono_ms() - exec_start_mono); - char timings_str[384] = ""; + /* ── Build timing string (t0 = book_update_arrived = 0.0ms) ── */ + char timings_str[512] = ""; int tp = 0; + int64_t book_base = sig->t_arrive_ms; - /* Pipeline timings (relative to signal receive time, realtime clock) */ - if (sig->book_ts_ms > 0 && sig->ts_ms > 0) { - double t2 = (double)((int64_t)sig->book_ts_ms - exec_start_rt); - tp += snprintf(timings_str + tp, sizeof(timings_str) - tp, - "t-2_book_snapshot=%.1fms ", t2); - } - if (sig->t_sock_arrive_ms > 0) { - double sa = (double)((int64_t)sig->t_sock_arrive_ms - exec_start_rt); - tp += snprintf(timings_str + tp, sizeof(timings_str) - tp, - "socket_arrived=%.1fms ", sa); - } - if (sig->t_arrive_ms > 0) { - double ba = (double)((int64_t)sig->t_arrive_ms - exec_start_rt); - tp += snprintf(timings_str + tp, sizeof(timings_str) - tp, - "book_update_arrived=%.1fms ", ba); - } - if (sig->t_eval_ms > 0) { - double te = (double)((int64_t)sig->t_eval_ms - exec_start_rt); - tp += snprintf(timings_str + tp, sizeof(timings_str) - tp, - "t-1_eval_complete=%.1fms ", te); - } - if (sig->ts_ms > 0) { - double sc = (double)((int64_t)sig->ts_ms - exec_start_rt); - tp += snprintf(timings_str + tp, sizeof(timings_str) - tp, - "t_signal_created=%.1fms ", sc); - } - - tp += snprintf(timings_str + tp, sizeof(timings_str) - tp, - "signal_received=0.0ms "); - - /* Leg execution timings (monotonic, relative to exec start) */ - for (int l = 0; l < 3; l++) { - if (leg_timings[l * 2] > 0) { + if (book_base > 0) { + if (sig->book_ts_ms > 0) tp += snprintf(timings_str + tp, sizeof(timings_str) - tp, - "leg%d_order_fired=%.1fms ", l, leg_timings[l * 2]); - } - if (leg_timings[l * 2 + 1] > 0) { + "t-1_snapshot=%.1fms ", + (double)((int64_t)sig->book_ts_ms - book_base)); + + tp += snprintf(timings_str + tp, sizeof(timings_str) - tp, + "t0_arrival=0.0ms "); + + if (sig->ts_ms > 0) tp += snprintf(timings_str + tp, sizeof(timings_str) - tp, - "leg%d_fill_received=%.1fms ", l, leg_timings[l * 2 + 1]); + "t1_signal=%.1fms ", + (double)((int64_t)sig->ts_ms - book_base)); + + double sig_recv = (double)(exec_start_rt - book_base); + for (int l = 0; l < 3; l++) { + double o = leg_timings[l * 2]; + double f = leg_timings[l * 2 + 1]; + if (fills[l][0] > 0) { + tp += snprintf(timings_str + tp, sizeof(timings_str) - tp, + "t%d_leg%d_order=%.1fms ", 2 + l * 2, l, sig_recv + o); + tp += snprintf(timings_str + tp, sizeof(timings_str) - tp, + "t%d_leg%d_fill=%.1fms ", 3 + l * 2, l, sig_recv + f); + } } } - tp += snprintf(timings_str + tp, sizeof(timings_str) - tp, - "execution_complete=%.1fms", exec_complete); - /* ── Build fills string ── */ char fills_str[1024] = ""; int fills_pos = 0; diff --git a/src/log.c b/src/log.c index e19ccaf..3075d94 100644 --- a/src/log.c +++ b/src/log.c @@ -76,9 +76,11 @@ void log_shutdown(void) { /* Non-blocking log write. Formats a timestamped message into the pipe. Falls back to sync stderr write if the pipe has not been initialised. */ +static __thread char log_ts_cache[32] = ""; +static __thread time_t log_ts_sec = 0; + void log_write(const char *fmt, ...) { if (log_pipe[1] < 0) { - /* fallback: sync write to stderr */ va_list ap; va_start(ap, fmt); vfprintf(stderr, fmt, ap); @@ -88,9 +90,15 @@ void log_write(const char *fmt, ...) { char ts[32]; time_t t = time(NULL); - struct tm tm; - localtime_r(&t, &tm); - strftime(ts, sizeof(ts), "[%Y/%m/%d %H:%M:%S] ", &tm); + if (t != log_ts_sec) { + struct tm tm; + localtime_r(&t, &tm); + strftime(ts, sizeof(ts), "[%Y/%m/%d %H:%M:%S] ", &tm); + memcpy(log_ts_cache, ts, sizeof(log_ts_cache)); + log_ts_sec = t; + } else { + memcpy(ts, log_ts_cache, sizeof(log_ts_cache)); + } char buf[1536]; int ts_len = (int)strlen(ts); @@ -109,8 +117,29 @@ void log_write(const char *fmt, ...) { /* Write to stderr only — skip the log file. */ void log_write_screen(const char *fmt, ...) { + char ts[32]; + time_t t = time(NULL); + if (t != log_ts_sec) { + struct tm tm; + localtime_r(&t, &tm); + strftime(ts, sizeof(ts), "[%Y/%m/%d %H:%M:%S] ", &tm); + memcpy(log_ts_cache, ts, sizeof(log_ts_cache)); + log_ts_sec = t; + } else { + memcpy(ts, log_ts_cache, sizeof(log_ts_cache)); + } + + char buf[1536]; + int ts_len = (int)strlen(ts); + memcpy(buf, ts, (size_t)ts_len); + va_list ap; va_start(ap, fmt); - vfprintf(stderr, fmt, ap); + int msg_len = vsnprintf(buf + ts_len, sizeof(buf) - (size_t)ts_len, fmt, ap); va_end(ap); + + int total = ts_len + (msg_len > 0 ? msg_len : 0); + if (total > 0) { + write(STDERR_FILENO, buf, (size_t)total); + } } diff --git a/src/ws_client.c b/src/ws_client.c index 71dc62d..73bac58 100644 --- a/src/ws_client.c +++ b/src/ws_client.c @@ -438,10 +438,11 @@ static int ws_send_frame(ws_connection_t *conn, uint8_t opcode, } uint8_t mask[4]; - mask[0] = (uint8_t)(now_ms_impl() & 0xFF); - mask[1] = (uint8_t)((now_ms_impl() >> 8) & 0xFF); - mask[2] = (uint8_t)((now_ms_impl() >> 16) & 0xFF); - mask[3] = (uint8_t)((now_ms_impl() >> 24) & 0xFF); + uint64_t m_now = now_ms_impl(); + mask[0] = (uint8_t)(m_now & 0xFF); + mask[1] = (uint8_t)((m_now >> 8) & 0xFF); + mask[2] = (uint8_t)((m_now >> 16) & 0xFF); + mask[3] = (uint8_t)((m_now >> 24) & 0xFF); memcpy(header + hdr_len, mask, 4); hdr_len += 4; @@ -762,15 +763,19 @@ int16_t ws_client_process_frame(ws_client_t *client, uint32_t conn_idx) { fe.match_price = cJSON_IsString(mp) ? atof(mp->valuestring) : cJSON_IsNumber(mp) ? mp->valuedouble : 0; fe.match_fee = cJSON_IsString(mf) ? atof(mf->valuestring) : - cJSON_IsNumber(mf) ? mf->valuedouble : 0; - fill_channel_push(client->fill_ch, &fe); + cJSON_IsNumber(mf) ? mf->valuedouble : 0; + if (!fill_channel_push(client->fill_ch, &fe) && + ++client->fill_drop_warn <= 3) + log_write("[WS] Fill ring full, dropping match\n"); } else if (is_terminal) { /* Terminal event: push a marker that stops accumulation. A preceding "match" event already communicated the data. If no matches arrived, this is a cancelled+empty order. We push a terminal marker to unblock the waiter. */ fe.is_terminal = true; - fill_channel_push(client->fill_ch, &fe); + if (!fill_channel_push(client->fill_ch, &fe) && + ++client->fill_drop_warn <= 3) + log_write("[WS] Fill ring full, dropping terminal\n"); } } } diff --git a/src/ws_client.h b/src/ws_client.h index bc25f55..2bde3d6 100644 --- a/src/ws_client.h +++ b/src/ws_client.h @@ -73,6 +73,7 @@ typedef struct { evaluator_t *evaluator; /* pointer to evaluator */ bool running; /* false signals client to stop */ fill_channel_t *fill_ch; /* fill event channel (hot→executor) */ + int fill_drop_warn; /* rate-limited fill drop warning counter */ } ws_client_t; /* Initialise a WebSocket client with config, symbol table, books, and evaluator */