From d569063c7565ed30bdaacea4f88f92b32459ccb0 Mon Sep 17 00:00:00 2001 From: nicolas Date: Tue, 2 Jun 2026 09:42:13 -0300 Subject: [PATCH] fix: register reconnected WS socket in epoll, arm ping timer after connect MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - events.c: EPOLL_CTL_MOD failure falls through to EPOLL_CTL_ADD (handles stale fd reuse after close) - events.c: defer timer_fd registration until ping_interval_ms is known (hot thread starts before WS connection is established) - events.h: replace unused next_ping_ms with ping_timer_armed flag - main.c: remove stale epoll entry before WS reconnect, re-register new socket with O_NONBLOCK - evaluate.c: fix order_param assignment — only overwrite with fills[leg][4] on buy legs (leg 0) and non-leg-0 entries; sell leg 0 preserves original quote_volume --- src/evaluate.c | 4 ++-- src/events.c | 24 +++++++++++++++++------- src/events.h | 2 +- src/main.c | 12 +++++++++--- 4 files changed, 29 insertions(+), 13 deletions(-) diff --git a/src/evaluate.c b/src/evaluate.c index 27e30fe..b33ce01 100644 --- a/src/evaluate.c +++ b/src/evaluate.c @@ -447,10 +447,10 @@ bool evaluate_symbol(evaluator_t *ev, uint16_t symbol_idx, int64_t t_sock_arrive bool use_bid = tri->use_bid[leg]; bool is_buy = !use_bid; - if (is_buy) { + if (is_buy && leg == 0) { snprintf(sl->order_param, sizeof(sl->order_param), "%.8g", sig.legs.legs[leg].quote_volume); - } else { + } else if (leg > 0) { snprintf(sl->order_param, sizeof(sl->order_param), "%.8g", fills[leg][4]); } sl->base_increment = tri->base_increment[leg]; diff --git a/src/events.c b/src/events.c index 194e20b..ceb6b9a 100644 --- a/src/events.c +++ b/src/events.c @@ -56,7 +56,13 @@ int event_loops_add_fd(epoll_set_t *set, int fd, fd_type_t type, .events = events, .data.ptr = &set->fds[i] }; - return epoll_ctl(set->epoll_fd, EPOLL_CTL_MOD, fd, &ev); + if (epoll_ctl(set->epoll_fd, EPOLL_CTL_MOD, fd, &ev) == 0) return 0; + // MOD failed (e.g. ENOENT: close() removed fd from kernel epoll). + // Re-use the same tracked entry with ADD. + set->fds[i].type = type; + set->fds[i].ws_conn_idx = ws_idx; + set->fds[i].user_data = user_data; + return epoll_ctl(set->epoll_fd, EPOLL_CTL_ADD, fd, &ev); } } @@ -161,13 +167,17 @@ void *event_hot_thread(void *arg) { } } - if (ws->connections[0].ping_interval_ms > 0) { - event_loops_add_fd(&loops->hot_epoll, loops->timer_fd, FD_TYPE_TIMER, - 0, NULL, EPOLLIN); - arm_ping_timer(loops, ws->connections[0].ping_interval_ms); - } - while (loops->running) { + /* Arm ping timer once the first WS token is received (hot thread + starts before WS connections are established, so ping_interval_ms + is 0 at initialisation). */ + if (!loops->ping_timer_armed && ws->connections[0].ping_interval_ms > 0) { + event_loops_add_fd(&loops->hot_epoll, loops->timer_fd, FD_TYPE_TIMER, + 0, NULL, EPOLLIN); + arm_ping_timer(loops, ws->connections[0].ping_interval_ms); + loops->ping_timer_armed = true; + } + int nfds = epoll_wait(loops->hot_epoll.epoll_fd, loops->hot_epoll.events, MAX_EPOLL_FDS, 100); if (nfds < 0) { diff --git a/src/events.h b/src/events.h index 3d10a75..d0ed211 100644 --- a/src/events.h +++ b/src/events.h @@ -42,7 +42,7 @@ typedef struct { int n_slots; /* number of executor slots */ executor_shared_t *executor_shared; /* shared executor state (in_flight + stale book) */ int timer_fd; /* timerfd for periodic tasks */ - uint64_t next_ping_ms; /* next scheduled WebSocket ping timestamp */ + bool ping_timer_armed; /* whether timer_fd is in the hot epoll set */ bool running; /* false signals event loops to exit */ } event_loops_t; diff --git a/src/main.c b/src/main.c index e15572b..1147ca5 100644 --- a/src/main.c +++ b/src/main.c @@ -14,6 +14,7 @@ #include #include #include +#include #include #include #include "config.h" @@ -209,9 +210,14 @@ int main(int argc, char *argv[]) { ws_connection_t *conn = &ws_client.connections[i]; if (conn->state == WS_STATE_DISCONNECTED && g_running) { uint64_t now = ws_client_now_ms(); - if (now - conn->last_activity_ms > 5000) { - log_write("[MAIN] Reconnecting WS %u...\n", i); - if (ws_client_connect(&ws_client, i) == 0) { + if (now - conn->last_activity_ms > 5000) { + log_write("[MAIN] Reconnecting WS %u...\n", i); + if (conn->fd >= 0) + event_loops_remove_fd(&events.hot_epoll, conn->fd); + if (ws_client_connect(&ws_client, i) == 0) { + int fl = fcntl(conn->fd, F_GETFL); + fcntl(conn->fd, F_SETFL, fl | O_NONBLOCK); + event_loops_add_fd(&events.hot_epoll, conn->fd, FD_TYPE_WS, i, NULL, EPOLLIN); if (conn->symbol_count > 0) { ws_client_subscribe(&ws_client, i, conn->symbol_indices, conn->symbol_count);