/*
 * events.c - Epoll-based event loops for WebSocket I/O and signal dispatch
 *
 * Two-thread architecture:
 *   HOT thread:      epoll_wait on WebSocket fds + timer fd for keep-alive pings
 *   EXECUTOR thread: polls SPSC signal queue + fill wake fd, executes triangles
 *
 * Signals flow: evaluate.c -> SPSC queue -> executor thread (direct execution)
 */

#include "log.h"
#include "events.h"
#include "evaluate.h"
#include "executor.h"

/*
 * NOTE(review): the names of the system headers were missing from the copy of
 * this file that was reviewed (14 bare "#include" directives). The list below
 * is reconstructed from the symbols this file actually uses - confirm it
 * against the original before merging.
 */
#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/timerfd.h>
#include <time.h>
#include <unistd.h>

enum {
    HOT_EPOLL_TIMEOUT_MS  = 100,   /* epoll_wait timeout in the hot loop */
    EXEC_FIRST_POLL_MS    = 100,   /* poll timeout before the first keepalive */
    EXEC_KEEPALIVE_MS     = 30000  /* minimum spacing between keepalives */
};

/* Current CLOCK_MONOTONIC time in milliseconds. */
static int64_t now_mono_ms(void)
{
    struct timespec ts = {0};

    clock_gettime(CLOCK_MONOTONIC, &ts);
    return (int64_t)ts.tv_sec * 1000 + (int64_t)ts.tv_nsec / 1000000;
}

/* Set O_NONBLOCK on an fd. Returns 0 on success, -1 on error. */
static int set_nonblocking(int fd)
{
    int flags = fcntl(fd, F_GETFL, 0);

    if (flags < 0) {
        return -1;
    }
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

/*
 * Read and discard one 8-byte counter value from fd (used for the slot
 * eventfd, the fill-channel wake fd and the ping timerfd). Failures are
 * deliberately ignored: the read only exists to clear readiness.
 */
static void drain_counter_fd(int fd)
{
    uint64_t val = 0;

    if (read(fd, &val, sizeof(val)) < 0) {
        /* best effort - nothing to do */
    }
}

/*
 * Register an fd with the epoll set.
 *
 * If the fd is already tracked, only its event mask is modified
 * (EPOLL_CTL_MOD); type, ws_idx and user_data of the existing entry are left
 * untouched. Otherwise a new entry is appended and added with EPOLL_CTL_ADD.
 *
 * Returns 0 on success, -1 on error (negative fd, table full, or epoll_ctl
 * failure).
 */
int event_loops_add_fd(epoll_set_t *set, int fd, fd_type_t type,
                       uint32_t ws_idx, void *user_data, uint32_t events)
{
    /* A negative fd must never match a tombstoned (-1) entry below. */
    if (fd < 0) {
        errno = EBADF;
        return -1;
    }

    /* Look for an existing entry first so that MOD still works when the
     * table is full. */
    for (uint32_t i = 0; i < set->fd_count; i++) {
        if (set->fds[i].fd == fd) {
            struct epoll_event ev = { .events = events, .data.ptr = &set->fds[i] };
            return epoll_ctl(set->epoll_fd, EPOLL_CTL_MOD, fd, &ev);
        }
    }

    if (set->fd_count >= MAX_EPOLL_FDS) {
        log_write("[EVENTS] epoll set full\n");
        return -1;
    }

    tracked_fd_t *tf = &set->fds[set->fd_count];
    tf->fd = fd;
    tf->type = type;
    tf->ws_conn_idx = ws_idx;
    tf->user_data = user_data;

    struct epoll_event ev = { .events = events, .data.ptr = tf };
    if (epoll_ctl(set->epoll_fd, EPOLL_CTL_ADD, fd, &ev) < 0) {
        /* Slot is not committed, so a failed ADD does not consume capacity. */
        return -1;
    }

    set->fd_count++;
    return 0;
}

/*
 * Remove an fd from the epoll set and mark its tracked entry as unused
 * (fd = -1).
 *
 * The entry is only tombstoned: fd_count is not decremented and
 * event_loops_add_fd() does not reuse tombstoned entries, so a set can
 * register at most MAX_EPOLL_FDS distinct entries over its lifetime.
 * NOTE(review): reclaiming slots would need care because epoll events carry
 * a pointer to the entry - confirm whether reconnect churn can exhaust the
 * table in practice.
 */
void event_loops_remove_fd(epoll_set_t *set, int fd)
{
    (void)epoll_ctl(set->epoll_fd, EPOLL_CTL_DEL, fd, NULL);

    for (uint32_t i = 0; i < set->fd_count; i++) {
        if (set->fds[i].fd == fd) {
            set->fds[i].fd = -1;
            return;
        }
    }
}

/*
 * Initialise an epoll set: zero the structure and create the epoll fd.
 * Terminates the process with exit(1) if epoll_create1 fails.
 */
static void epoll_set_init(epoll_set_t *set)
{
    memset(set, 0, sizeof(*set));
    set->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
    if (set->epoll_fd < 0) {
        perror("epoll_create1");
        exit(1);
    }
}

/*
 * Initialise the event loops: shared executor state (with its mutex), the hot
 * and cold epoll sets, and the non-blocking ping timerfd. No fds are
 * registered with either epoll set here.
 *
 * cfg is currently unused.
 *
 * Returns 0 on success, -1 on failure. On failure everything acquired so far
 * is released and the fd fields are left at -1 / executor_shared at NULL, so
 * a subsequent event_loops_destroy() is harmless.
 */
int event_loops_init(event_loops_t *loops, ws_client_t *ws_client, const config_t *cfg,
                     executor_slot_t *slots, int n_slots)
{
    (void)cfg;

    memset(loops, 0, sizeof(*loops));
    loops->ws_client = ws_client;
    loops->slots = slots;
    loops->n_slots = n_slots;

    /* 0 is a valid fd; mark everything as "not open" before any early return. */
    loops->timer_fd = -1;
    loops->hot_epoll.epoll_fd = -1;
    loops->cold_epoll.epoll_fd = -1;

    loops->executor_shared = calloc(1, sizeof(*loops->executor_shared));
    if (!loops->executor_shared) {
        log_write("[EVENTS] failed to allocate executor shared state\n");
        return -1;
    }
    if (pthread_mutex_init(&loops->executor_shared->lock, NULL) != 0) {
        log_write("[EVENTS] failed to initialise executor shared lock\n");
        free(loops->executor_shared);
        loops->executor_shared = NULL;
        return -1;
    }

    epoll_set_init(&loops->hot_epoll);
    epoll_set_init(&loops->cold_epoll);

    loops->timer_fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
    if (loops->timer_fd < 0) {
        perror("timerfd_create"); /* report before close() can clobber errno */

        close(loops->hot_epoll.epoll_fd);
        loops->hot_epoll.epoll_fd = -1;
        close(loops->cold_epoll.epoll_fd);
        loops->cold_epoll.epoll_fd = -1;

        pthread_mutex_destroy(&loops->executor_shared->lock);
        free(loops->executor_shared);
        loops->executor_shared = NULL;
        return -1;
    }

    loops->running = true;
    return 0;
}

/*
 * Stop the event loops and release what event_loops_init() created: the timer
 * fd, the shared executor state and both epoll fds. WebSocket sockets and
 * per-slot eventfds are not closed here.
 *
 * Every released field is reset, so calling this twice is safe.
 *
 * NOTE(review): this frees executor_shared and closes fds that the hot and
 * executor threads use; presumably the caller joins those threads first -
 * confirm. `running` is written here and read by the worker threads, so it
 * should be an atomic type in events.h if it is not already.
 */
void event_loops_destroy(event_loops_t *loops)
{
    loops->running = false;

    if (loops->timer_fd >= 0) {
        close(loops->timer_fd);
        loops->timer_fd = -1;
    }

    if (loops->executor_shared) {
        pthread_mutex_destroy(&loops->executor_shared->lock);
        free(loops->executor_shared);
        loops->executor_shared = NULL;
    }

    if (loops->hot_epoll.epoll_fd >= 0) {
        close(loops->hot_epoll.epoll_fd);
        loops->hot_epoll.epoll_fd = -1;
    }
    if (loops->cold_epoll.epoll_fd >= 0) {
        close(loops->cold_epoll.epoll_fd);
        loops->cold_epoll.epoll_fd = -1;
    }
}

/*
 * Arm the ping timer as a one-shot that fires after interval_ms.
 * Does nothing when interval_ms is 0. The timer has no repeat interval, so it
 * must be re-armed after every expiry.
 */
static void arm_ping_timer(event_loops_t *loops, uint64_t interval_ms)
{
    if (interval_ms == 0) {
        return;
    }

    struct itimerspec its = {0};
    its.it_value.tv_sec = (time_t)(interval_ms / 1000);
    its.it_value.tv_nsec = (long)((interval_ms % 1000) * 1000000);

    if (timerfd_settime(loops->timer_fd, 0, &its, NULL) < 0) {
        log_write("[HOT] timerfd_settime failed: %s\n", strerror(errno));
    }
}

/* Make every open WebSocket fd non-blocking and register it for EPOLLIN. */
static void hot_register_connections(event_loops_t *loops, ws_client_t *ws)
{
    for (uint32_t i = 0; i < ws->connection_count; i++) {
        ws_connection_t *conn = &ws->connections[i];

        if (conn->fd < 0) {
            continue;
        }
        if (set_nonblocking(conn->fd) < 0) {
            log_write("[HOT] failed to set O_NONBLOCK on ws conn %u: %s\n",
                      (unsigned)i, strerror(errno));
        }
        if (event_loops_add_fd(&loops->hot_epoll, conn->fd, FD_TYPE_WS, i, NULL, EPOLLIN) < 0) {
            log_write("[HOT] failed to register ws conn %u with epoll\n", (unsigned)i);
        }
    }
}

/* Send a ping on every connection currently in WS_STATE_CONNECTED. */
static void hot_send_pings(ws_client_t *ws)
{
    for (uint32_t c = 0; c < ws->connection_count; c++) {
        ws_connection_t *conn = &ws->connections[c];

        if (conn->state == WS_STATE_CONNECTED) {
            ws_client_send_ping(conn);
        }
    }
}

/*
 * HOT thread: epoll-driven WebSocket I/O.
 * Monitors WS connection fds for incoming data and the ping timer for
 * keep-alive. On timer expiry a ping is sent on every connected WS connection
 * and the timer is re-armed. The ping interval of connection 0 is used for
 * all connections.
 *
 * arg is the event_loops_t *. Always returns NULL.
 */
void *event_hot_thread(void *arg)
{
    event_loops_t *loops = (event_loops_t *)arg;
    ws_client_t *ws = loops->ws_client;

    log_write("[HOT] Thread started\n");

    hot_register_connections(loops, ws);

    if (ws->connections[0].ping_interval_ms > 0) {
        if (event_loops_add_fd(&loops->hot_epoll, loops->timer_fd, FD_TYPE_TIMER, 0, NULL,
                               EPOLLIN) < 0) {
            log_write("[HOT] failed to register ping timer with epoll\n");
        }
        arm_ping_timer(loops, ws->connections[0].ping_interval_ms);
    }

    while (loops->running) {
        /* Bounded wait so the running flag is re-checked regularly. */
        int nfds = epoll_wait(loops->hot_epoll.epoll_fd, loops->hot_epoll.events,
                              MAX_EPOLL_FDS, HOT_EPOLL_TIMEOUT_MS);
        if (nfds < 0) {
            if (errno == EINTR) {
                continue;
            }
            perror("epoll_wait hot");
            break;
        }

        for (int i = 0; i < nfds; i++) {
            tracked_fd_t *tf = (tracked_fd_t *)loops->hot_epoll.events[i].data.ptr;

            /* Skip entries tombstoned by event_loops_remove_fd(). */
            if (!tf || tf->fd < 0) {
                continue;
            }

            if (tf->type == FD_TYPE_WS) {
                ws_client_read(ws, tf->ws_conn_idx);
            } else if (tf->type == FD_TYPE_TIMER) {
                /* Consume the expiration count so the timerfd stops being readable. */
                drain_counter_fd(loops->timer_fd);
                hot_send_pings(ws);
                if (ws->connections[0].ping_interval_ms > 0) {
                    arm_ping_timer(loops, ws->connections[0].ping_interval_ms);
                }
            }
        }
    }

    log_write("[HOT] Thread exited\n");
    return NULL;
}

/*
 * Call executor_keepalive() if none has been sent yet (*last_ms == 0) or at
 * least EXEC_KEEPALIVE_MS have passed since the last one, and record the time.
 */
static void exec_keepalive_if_due(executor_thread_t *exec, int64_t *last_ms)
{
    int64_t now = now_mono_ms();

    if (*last_ms == 0 || now - *last_ms >= EXEC_KEEPALIVE_MS) {
        executor_keepalive(exec);
        *last_ms = now_mono_ms();
    }
}

/*
 * Poll timeout for the executor slow path: short before the first keepalive,
 * otherwise the time left until the next keepalive is due, clamped to
 * [0, EXEC_KEEPALIVE_MS]. Using the remaining time (rather than a fixed 30 s)
 * keeps keepalives on schedule even when signals were handled in between.
 */
static int exec_poll_timeout_ms(int64_t last_keepalive_ms)
{
    if (last_keepalive_ms == 0) {
        return EXEC_FIRST_POLL_MS;
    }

    int64_t remaining = EXEC_KEEPALIVE_MS - (now_mono_ms() - last_keepalive_ms);
    if (remaining < 0) {
        remaining = 0;
    } else if (remaining > EXEC_KEEPALIVE_MS) {
        remaining = EXEC_KEEPALIVE_MS;
    }
    return (int)remaining;
}

/*
 * Per-executor-thread entry point: creates its own executor_thread_t,
 * polls its private slot eventfd + the shared fill channel wake fd,
 * and executes one triangle per signal.
 *
 * arg is an executor_thread_arg_t * (loops + slot); it is not freed here.
 * Always returns NULL.
 */
void *event_executor_thread(void *arg)
{
    executor_thread_arg_t *ta = (executor_thread_arg_t *)arg;
    if (!ta) {
        log_write("[EXEC] Failed to create executor\n");
        return NULL;
    }

    event_loops_t *loops = ta->loops;
    executor_slot_t *slot = ta->slot;

    executor_thread_t *exec = executor_thread_create(loops->ws_client->cfg,
                                                     loops->ws_client->fill_ch,
                                                     loops->ws_client,
                                                     loops->executor_shared,
                                                     slot);
    if (!exec) {
        log_write("[EXEC] Failed to create executor\n");
        return NULL;
    }

    int fill_wake_fd = fill_channel_wake_fd(loops->ws_client->fill_ch);

    struct pollfd pfds[2];
    memset(pfds, 0, sizeof(pfds));
    pfds[0].fd = slot->eventfd;
    pfds[0].events = POLLIN;
    pfds[1].fd = fill_wake_fd;
    pfds[1].events = POLLIN;

    int64_t last_keepalive_ms = 0; /* 0 = no keepalive sent yet */

    while (loops->running) {
        /* Fast path: check slot state without poll */
        if (atomic_load_explicit(&slot->state, memory_order_acquire) == EXECUTOR_SLOT_READY) {
            /* Copy the signal out before releasing the slot. */
            signal_entry_t sig = slot->signal;
            atomic_store_explicit(&slot->state, EXECUTOR_SLOT_FREE, memory_order_release);

            /* Drain eventfd so poll can block next time */
            drain_counter_fd(slot->eventfd);

            executor_execute_triangle(exec, &sig);
            exec_keepalive_if_due(exec, &last_keepalive_ms);
            continue;
        }

        /* Slow path: sleep until a wake-up or until the next keepalive is due */
        int nfds = poll(pfds, 2, exec_poll_timeout_ms(last_keepalive_ms));
        if (nfds < 0) {
            if (errno == EINTR) {
                continue;
            }
            log_write("[EXEC] poll error: %s\n", strerror(errno));
            break;
        }

        /* Drain slot eventfd; the slot state is re-checked on the next iteration */
        if (pfds[0].revents & POLLIN) {
            drain_counter_fd(slot->eventfd);
        }

        /* Drain fill channel wake */
        if (pfds[1].revents & POLLIN) {
            drain_counter_fd(fill_wake_fd);
        }

        exec_keepalive_if_due(exec, &last_keepalive_ms);
    }

    executor_thread_destroy(exec);
    log_write("[EXEC] Thread exited\n");
    return NULL;
}

/*
 * Legacy entry point kept for older callers: forwards directly to
 * event_executor_thread(), so arg must be an executor_thread_arg_t *.
 */
void *event_cold_thread(void *arg)
{
    return event_executor_thread(arg);
}