triangular_arbitrage_bot/src/events.c

457 lines
16 KiB
C

/*
* events.c - Epoll-based event loops for WebSocket I/O and signal dispatch
*
* Two-thread architecture:
* HOT thread: epoll_wait on WebSocket fds + timer fd for keep-alive pings
* COLD thread: polls SPSC signal queue + Unix domain socket to executor
*
* Signals flow: evaluate.c -> SPSC queue -> COLD thread -> executor via UDS
*/
#include "log.h"
#include "events.h"
#include "evaluate.h"
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <time.h>
#include <pthread.h>
#include <sys/un.h>
#include <sys/epoll.h>
#include <sys/timerfd.h>
#include <sys/eventfd.h>
#include <sys/socket.h>
#include <arpa/inet.h>
/*
 * Turn on O_NONBLOCK for fd while preserving its other file status flags.
 * Returns 0 on success, or -1 if either fcntl call fails.
 */
static int set_nonblocking(int fd) {
    const int current = fcntl(fd, F_GETFL, 0);
    return (current < 0) ? -1 : fcntl(fd, F_SETFL, current | O_NONBLOCK);
}
/*
 * Register fd with an epoll set, or update it if the set already tracks it.
 *
 * set        epoll set to modify.
 * fd         descriptor to watch; negative values are rejected (EBADF).
 * type, ws_idx, user_data
 *            metadata stored in the tracked_fd_t slot whose address is handed
 *            to epoll as data.ptr, so the event loops can dispatch on it.
 * events     epoll interest mask (e.g. EPOLLIN).
 *
 * Slots freed by event_loops_remove_fd (fd == -1) are reused before the array
 * grows, so repeated connect/disconnect cycles do not exhaust MAX_EPOLL_FDS.
 * Returns 0 on success, -1 on failure (errno from epoll_ctl where applicable).
 */
int event_loops_add_fd(epoll_set_t *set, int fd, fd_type_t type,
                       uint32_t ws_idx, void *user_data, uint32_t events) {
    if (fd < 0) {
        errno = EBADF;
        return -1;
    }

    tracked_fd_t *existing = NULL;
    tracked_fd_t *free_slot = NULL;
    for (uint32_t i = 0; i < set->fd_count; i++) {
        if (set->fds[i].fd == fd) {
            existing = &set->fds[i];
            break;
        }
        if (free_slot == NULL && set->fds[i].fd < 0) {
            free_slot = &set->fds[i];
        }
    }

    if (existing != NULL) {
        /* Already tracked: refresh the metadata and the interest mask. */
        existing->type = type;
        existing->ws_conn_idx = ws_idx;
        existing->user_data = user_data;
        struct epoll_event ev = { .events = events, .data.ptr = existing };
        if (epoll_ctl(set->epoll_fd, EPOLL_CTL_MOD, fd, &ev) == 0) {
            return 0;
        }
        /* epoll forgets an fd once it is closed (epoll(7)); if this number was
           closed and reused without event_loops_remove_fd, add it back. */
        if (errno != ENOENT) {
            return -1;
        }
        return epoll_ctl(set->epoll_fd, EPOLL_CTL_ADD, fd, &ev);
    }

    bool appended = false;
    if (free_slot == NULL) {
        if (set->fd_count >= MAX_EPOLL_FDS) {
            log_write("[EVENTS] epoll set full\n");
            return -1;
        }
        free_slot = &set->fds[set->fd_count++];
        appended = true;
    }

    free_slot->fd = fd;
    free_slot->type = type;
    free_slot->ws_conn_idx = ws_idx;
    free_slot->user_data = user_data;

    struct epoll_event ev = { .events = events, .data.ptr = free_slot };
    if (epoll_ctl(set->epoll_fd, EPOLL_CTL_ADD, fd, &ev) < 0) {
        /* Roll back so a failed registration does not consume a slot. */
        const int saved_errno = errno;
        free_slot->fd = -1;
        if (appended) {
            set->fd_count--;
        }
        errno = saved_errno;
        return -1;
    }
    return 0;
}
/*
 * Stop watching fd: deregister it from the epoll instance (any epoll_ctl
 * error, e.g. for an already-closed fd, is ignored) and flag the first
 * tracked slot holding fd as free by setting its fd to -1. The slot stays in
 * place and fd_count is unchanged, so slot addresses given to epoll as
 * data.ptr remain stable.
 */
void event_loops_remove_fd(epoll_set_t *set, int fd) {
    epoll_ctl(set->epoll_fd, EPOLL_CTL_DEL, fd, NULL);

    tracked_fd_t *const end = set->fds + set->fd_count;
    for (tracked_fd_t *slot = set->fds; slot != end; ++slot) {
        if (slot->fd != fd) {
            continue;
        }
        slot->fd = -1;
        break;
    }
}
/*
 * Reset *set to all zeroes (no tracked fds, fd_count == 0) and give it a new
 * close-on-exec epoll instance. Failure to create the epoll instance is
 * treated as fatal: the error is printed and the process exits with status 1.
 */
static void epoll_set_init(epoll_set_t *set) {
    memset(set, 0, sizeof *set);

    const int instance = epoll_create1(EPOLL_CLOEXEC);
    if (instance < 0) {
        perror("epoll_create1");
        exit(1);
    }
    set->epoll_fd = instance;
}
/*
 * Initialise both epoll sets (hot + cold) and the keep-alive timer fd, and
 * register the wakeup fd with the cold set.
 *
 * loops         zeroed and filled in by this call.
 * ws_client     WebSocket client whose connections the HOT thread serves.
 * signal_queue  SPSC queue the COLD thread drains.
 * cfg           not used by this function.
 * wakeup_fd     watched (EPOLLIN) by the COLD epoll set, presumably an eventfd
 *               the producer signals after pushing to signal_queue. It is
 *               stored in loops and later closed by event_loops_destroy. A
 *               negative value skips registration; the COLD thread then only
 *               wakes on its epoll timeout.
 *
 * Returns 0 on success, -1 on failure. On failure every fd field of loops is
 * either a valid descriptor or -1, so event_loops_destroy is safe to call.
 */
int event_loops_init(event_loops_t *loops, ws_client_t *ws_client,
                     spsc_queue_t *signal_queue, const config_t *cfg, int wakeup_fd) {
    (void)cfg;

    memset(loops, 0, sizeof(*loops));
    loops->ws_client = ws_client;
    loops->signal_queue = signal_queue;
    loops->running = true;

    /* memset leaves every fd field at 0, which is a real descriptor (stdin).
       Mark the ones not opened here as -1 so event_loops_destroy never
       closes a descriptor this struct does not own. */
    loops->unix_client_fd = -1;
    loops->http_server_fd = -1;
    loops->wakeup_fd = wakeup_fd;

    epoll_set_init(&loops->hot_epoll);
    epoll_set_init(&loops->cold_epoll);

    loops->timer_fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
    if (loops->timer_fd < 0) {
        perror("timerfd_create");
        return -1;
    }

    if (wakeup_fd >= 0 &&
        event_loops_add_fd(&loops->cold_epoll, wakeup_fd, FD_TYPE_EVENT,
                           0, NULL, EPOLLIN) < 0) {
        log_write("[EVENTS] Failed to register wakeup fd %d\n", wakeup_fd);
        return -1;
    }
    return 0;
}
/* Close fd if it is open. Always returns -1, so a caller can write
   `field = close_if_open(field);` and leave the field marked closed. */
static int close_if_open(int fd) {
    if (fd >= 0) {
        close(fd);
    }
    return -1;
}

/*
 * Stop the event loops and close every descriptor owned by loops: the ping
 * timer, the wakeup fd stored by event_loops_init, the executor connection,
 * the HTTP server socket, and both epoll instances. WebSocket connection fds
 * are not closed here.
 *
 * Each field is reset to -1 after closing, so a second call does not close
 * an unrelated descriptor that has since reused the same number.
 * NOTE(review): `running` is only cleared, not waited on. The HOT/COLD
 * threads presumably must be joined before calling this, otherwise they may
 * still be using the descriptors closed below; confirm with the caller.
 */
void event_loops_destroy(event_loops_t *loops) {
    loops->running = false;
    loops->timer_fd = close_if_open(loops->timer_fd);
    loops->wakeup_fd = close_if_open(loops->wakeup_fd);
    loops->unix_client_fd = close_if_open(loops->unix_client_fd);
    loops->http_server_fd = close_if_open(loops->http_server_fd);
    loops->hot_epoll.epoll_fd = close_if_open(loops->hot_epoll.epoll_fd);
    loops->cold_epoll.epoll_fd = close_if_open(loops->cold_epoll.epoll_fd);
}
/* Upper bound on how long unix_client_connect waits for a pending connect. */
enum { UNIX_CONNECT_TIMEOUT_MS = 100 };

/*
 * Wait for an in-progress non-blocking connect on fd to complete.
 * Returns 0 once connected, or -1 with errno set: ETIMEDOUT on timeout,
 * otherwise poll/getsockopt's errno or the socket's own SO_ERROR.
 */
static int unix_connect_wait(int fd, int timeout_ms) {
    struct pollfd pfd = { .fd = fd, .events = POLLOUT };
    int ready;
    do {
        ready = poll(&pfd, 1, timeout_ms);
    } while (ready < 0 && errno == EINTR);

    if (ready == 0) {
        errno = ETIMEDOUT;
        return -1;
    }
    if (ready < 0) {
        return -1;
    }

    /* POLLOUT only says the attempt finished; SO_ERROR says whether it worked. */
    int so_error = 0;
    socklen_t so_len = sizeof so_error;
    if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &so_error, &so_len) < 0) {
        return -1;
    }
    if (so_error != 0) {
        errno = so_error;
        return -1;
    }
    return 0;
}

/*
 * Connect to the Unix domain stream socket at socket_path.
 *
 * The socket is created non-blocking and close-on-exec. If connect() reports
 * EINPROGRESS, waits up to UNIX_CONNECT_TIMEOUT_MS and then checks SO_ERROR,
 * so a failed attempt is never returned as a usable fd. (On Linux a full
 * listener backlog makes AF_UNIX connect fail with EAGAIN instead; that is
 * treated as a failure.) A NULL or empty path fails with EINVAL. A path that
 * does not fit in sockaddr_un.sun_path fails with ENAMETOOLONG rather than
 * being silently truncated onto a different socket name.
 *
 * Returns the connected fd on success, -1 on failure with errno set.
 */
int unix_client_connect(const char *socket_path) {
    struct sockaddr_un addr = { .sun_family = AF_UNIX };

    if (socket_path == NULL || socket_path[0] == '\0') {
        errno = EINVAL;
        return -1;
    }
    const size_t path_len = strlen(socket_path);
    if (path_len >= sizeof(addr.sun_path)) {
        errno = ENAMETOOLONG;
        return -1;
    }
    memcpy(addr.sun_path, socket_path, path_len + 1);

    int fd = socket(AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
    if (fd < 0) {
        return -1;
    }

    if (connect(fd, (struct sockaddr *)&addr, sizeof(addr)) == 0) {
        return fd;
    }
    if (errno == EINPROGRESS && unix_connect_wait(fd, UNIX_CONNECT_TIMEOUT_MS) == 0) {
        return fd;
    }

    const int saved_errno = errno;
    close(fd);
    errno = saved_errno;
    return -1;
}
/*
 * Create a non-blocking, close-on-exec Unix domain stream socket bound to
 * socket_path and listening with a backlog of 5.
 *
 * Any existing file at socket_path is unlinked first, so a stale socket left
 * by a previous run does not make bind() fail. The unlink is unconditional:
 * whatever file lives at that path is removed. A NULL or empty path fails with
 * EINVAL. A path that does not fit in sockaddr_un.sun_path fails with
 * ENAMETOOLONG, and nothing is unlinked, instead of binding a truncated name.
 *
 * Returns the listening fd, or -1 with errno set.
 */
int unix_server_create(const char *socket_path) {
    struct sockaddr_un addr = { .sun_family = AF_UNIX };

    if (socket_path == NULL || socket_path[0] == '\0') {
        errno = EINVAL;
        return -1;
    }
    const size_t path_len = strlen(socket_path);
    if (path_len >= sizeof(addr.sun_path)) {
        errno = ENAMETOOLONG;
        return -1;
    }
    memcpy(addr.sun_path, socket_path, path_len + 1);

    /* SOCK_NONBLOCK sets O_NONBLOCK atomically at creation (no fcntl step
       whose failure could go unnoticed). */
    int fd = socket(AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
    if (fd < 0) {
        return -1;
    }

    unlink(socket_path);
    if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) < 0 || listen(fd, 5) < 0) {
        const int saved_errno = errno;
        close(fd);
        errno = saved_errno;
        return -1;
    }
    return fd;
}
/*
* Build a JSON signal message and send it to the external executor over a Unix socket.
*
* JSON structure:
* {
* "type": "signal",
* "correlation_id": "<hex>",
* "triangle_key": ["base","mid","quote"],
* "primary_quote": "<currency>",
* "live": true/false,
* "starting_volume": "<volume>",
* "legs": [{...}, {...}, {...}],
* "predicted_bps": <float>,
* "ts_ms", "book_ts_ms", "t_sock_arrive_ms", "t_arrive_ms", "t_eval_ms": <timestamp>,
* "books": [...] (snapshot, only when !live)
* }
*
* correlation_id is a mix of address/ts/bps values for best-effort uniqueness.
* Connects lazily on first signal; reconnects on write failure.
*/
static void send_signal_to_executor(event_loops_t *loops, signal_entry_t *sig) {
if (loops->unix_client_fd < 0) {
loops->unix_client_fd = unix_client_connect(loops->ws_client->cfg->executor_socket_path);
if (loops->unix_client_fd < 0) {
log_write("[EVENTS] Cannot connect to executor at %s\n",
loops->ws_client->cfg->executor_socket_path);
return;
}
event_loops_add_fd(&loops->cold_epoll, loops->unix_client_fd,
FD_TYPE_UNIX_CLIENT, 0, NULL, EPOLLIN);
}
char json_buf[4096];
char corr_id[37];
snprintf(corr_id, sizeof(corr_id),
"%08x%08x%08x%08x",
(unsigned)(uintptr_t)&sig->legs.legs[0] ^ (unsigned)sig->ts_ms,
(unsigned)sig->ts_ms ^ (unsigned)sig->book_ts_ms,
(unsigned)sig->predicted_bps,
(unsigned)sig->t_arrive_ms);
char legs_json[1024];
legs_json[0] = '\0';
for (uint8_t l = 0; l < 3; l++) {
const signal_leg_t *sl = &sig->legs.legs[l];
char tmp[384];
snprintf(tmp, sizeof(tmp),
"%s{\"pair\":\"%s\",\"side\":\"%s\","
"\"order_param\":\"%s\","
"\"fee_rate\":%.6f,\"fee_currency\":\"%s\","
"\"base_increment\":\"%.10g\",\"quote_increment\":\"%.10g\",\"base_min_size\":\"%.10g\"}",
l ? "," : "", sl->symbol, sl->side,
sl->order_param,
sl->fee_rate, sl->fee_currency,
sl->base_increment, sl->quote_increment, sl->base_min_size);
strncat(legs_json, tmp, sizeof(legs_json) - 1);
}
// triangle_key as JSON array ["base","mid","quote"]
char triangle_key_json[96];
{
char parts[3][16] = {{0}};
const char *tk = sig->triangle_key;
const char *s1 = strchr(tk, '/');
const char *s2 = s1 ? strchr(s1 + 1, '/') : NULL;
if (s1 && s2) {
uint32_t l1 = s1 - tk;
if (l1 > 15) l1 = 15;
memcpy(parts[0], tk, l1);
uint32_t l2 = s2 - s1 - 1;
if (l2 > 15) l2 = 15;
memcpy(parts[1], s1 + 1, l2);
strncpy(parts[2], s2 + 1, 15);
snprintf(triangle_key_json, sizeof(triangle_key_json),
"[\"%s\",\"%s\",\"%s\"]", parts[0], parts[1], parts[2]);
} else {
snprintf(triangle_key_json, sizeof(triangle_key_json), "[\"%s\"]", tk);
}
}
// Full book snapshot included when !live (paper trading mode)
char books_json_str[2048] = "";
if (!sig->live && sig->book_count > 0) {
char *bp = books_json_str;
size_t rem = sizeof(books_json_str);
for (uint8_t b = 0; b < sig->book_count; b++) {
const signal_book_t *sb = &sig->books[b];
char bid_arr[256] = {0}, ask_arr[256] = {0};
for (uint8_t lev = 0; lev < sb->bid_count; lev++) {
char tmp[64];
snprintf(tmp, sizeof(tmp), "%s{\"price\":\"%.6g\",\"size\":\"%.8g\"}",
lev ? "," : "", sb->bids[lev].price, sb->bids[lev].size);
strncat(bid_arr, tmp, sizeof(bid_arr) - 1);
}
for (uint8_t lev = 0; lev < sb->ask_count; lev++) {
char tmp[64];
snprintf(tmp, sizeof(tmp), "%s{\"price\":\"%.6g\",\"size\":\"%.8g\"}",
lev ? "," : "", sb->asks[lev].price, sb->asks[lev].size);
strncat(ask_arr, tmp, sizeof(ask_arr) - 1);
}
int n = snprintf(bp, rem,
"%s{\"symbol\":\"%s\",\"bids\":[%s],\"asks\":[%s],\"ts_ms\":%lld}",
b ? "," : "", sb->symbol, bid_arr, ask_arr, (long long)sb->ts_ms);
if (n > 0 && (size_t)n < rem) { bp += n; rem -= (size_t)n; }
}
}
snprintf(json_buf, sizeof(json_buf),
"{\"type\":\"signal\",\"correlation_id\":\"%s\","
"\"triangle_key\":%s,\"primary_quote\":\"%s\","
"\"live\":%s,\"starting_volume\":\"%.8g\","
"\"legs\":[%s],\"predicted_bps\":%.4f,"
"\"ts_ms\":%lld,\"book_ts_ms\":%lld,\"t_sock_arrive_ms\":%lld,\"t_arrive_ms\":%lld,\"t_eval_ms\":%lld"
"%s%s%s"
"}\n",
corr_id, triangle_key_json, sig->primary_quote,
sig->live ? "true" : "false", sig->starting_volume,
legs_json, sig->predicted_bps,
(long long)sig->ts_ms, (long long)sig->book_ts_ms,
(long long)sig->t_sock_arrive_ms,
(long long)sig->t_arrive_ms, (long long)sig->t_eval_ms,
(sig->live || sig->book_count == 0) ? "" : ",\"books\":[",
books_json_str[0] ? books_json_str : "",
(sig->live || sig->book_count == 0) ? "" : "]");
size_t to_send = strlen(json_buf);
size_t sent = 0;
while (sent < to_send) {
int r = (int)write(loops->unix_client_fd, json_buf + sent, to_send - sent);
if (r > 0) {
sent += (size_t)r;
continue;
}
if (r == 0 || (errno != EAGAIN && errno != EWOULDBLOCK)) {
log_write("[EVENTS] Write to executor failed, reconnecting\n");
int old_fd = loops->unix_client_fd;
loops->unix_client_fd = -1;
close(old_fd);
event_loops_remove_fd(&loops->cold_epoll, old_fd);
break;
}
/* EAGAIN: executor buffer full, drop this signal and move on */
break;
}
}
/*
 * Arm loops->timer_fd as a one-shot timer firing interval_ms from now
 * (it_interval stays zero, so it must be re-armed after each expiry).
 * An interval of 0 leaves the timer untouched.
 */
static void arm_ping_timer(event_loops_t *loops, uint64_t interval_ms) {
    if (!interval_ms) {
        return;
    }
    const struct itimerspec spec = {
        .it_interval = { .tv_sec = 0, .tv_nsec = 0 },
        .it_value = {
            .tv_sec = (time_t)(interval_ms / 1000),
            .tv_nsec = (long)((interval_ms % 1000) * 1000000),
        },
    };
    timerfd_settime(loops->timer_fd, 0, &spec, NULL);
}
/*
* HOT thread: epoll-driven WebSocket I/O.
* Monitors WS connection fds for incoming data and ping timer for keep-alive.
* Sends ping frames to all connected WS connections on timer expiry.
*/
void *event_hot_thread(void *arg) {
event_loops_t *loops = (event_loops_t *)arg;
ws_client_t *ws = loops->ws_client;
log_write("[HOT] Thread started\n");
for (uint32_t i = 0; i < ws->connection_count; i++) {
ws_connection_t *conn = &ws->connections[i];
if (conn->fd >= 0) {
set_nonblocking(conn->fd);
event_loops_add_fd(&loops->hot_epoll, conn->fd, FD_TYPE_WS, i, NULL, EPOLLIN);
}
}
if (ws->connections[0].ping_interval_ms > 0) {
event_loops_add_fd(&loops->hot_epoll, loops->timer_fd, FD_TYPE_TIMER,
0, NULL, EPOLLIN);
arm_ping_timer(loops, ws->connections[0].ping_interval_ms);
}
while (loops->running) {
int nfds = epoll_wait(loops->hot_epoll.epoll_fd,
loops->hot_epoll.events, MAX_EPOLL_FDS, 100);
if (nfds < 0) {
if (errno == EINTR) continue;
perror("epoll_wait hot");
break;
}
for (int i = 0; i < nfds; i++) {
tracked_fd_t *tf = (tracked_fd_t *)loops->hot_epoll.events[i].data.ptr;
if (!tf || tf->fd < 0) continue;
if (tf->type == FD_TYPE_WS) {
ws_client_read(ws, tf->ws_conn_idx);
} else if (tf->type == FD_TYPE_TIMER) {
uint64_t expirations = 0;
read(loops->timer_fd, &expirations, sizeof(expirations));
for (uint32_t c = 0; c < ws->connection_count; c++) {
ws_connection_t *conn = &ws->connections[c];
if (conn->state == WS_STATE_CONNECTED) {
ws_client_send_ping(conn);
}
}
if (ws->connections[0].ping_interval_ms > 0) {
arm_ping_timer(loops, ws->connections[0].ping_interval_ms);
}
}
}
}
log_write("[HOT] Thread exited\n");
return NULL;
}
/* Upper bound on reads per wakeup when discarding executor input, so a
   chatty executor cannot monopolise the COLD thread. */
enum { COLD_MAX_DISCARD_READS = 16 };

/* Pop every queued signal and forward it to the executor. */
static void cold_drain_signal_queue(event_loops_t *loops) {
    while (!spsc_empty(loops->signal_queue)) {
        signal_entry_t sig;
        if (spsc_pop(loops->signal_queue, &sig)) {
            send_signal_to_executor(loops, &sig);
        }
    }
}

/*
 * Read and discard whatever the executor sent on fd; this loop never
 * consumes executor replies. Without draining, the level-triggered EPOLLIN
 * registration would make epoll_wait return immediately forever and spin
 * the thread. Assumes fd is non-blocking (unix_client_connect creates it
 * with SOCK_NONBLOCK). Returns true if the executor closed the connection
 * (EOF) or the socket reported a hard error.
 */
static bool cold_discard_executor_input(int fd) {
    char scratch[512];
    for (int i = 0; i < COLD_MAX_DISCARD_READS; i++) {
        const ssize_t r = read(fd, scratch, sizeof scratch);
        if (r > 0) {
            continue;
        }
        if (r == 0) {
            return true;
        }
        if (errno == EINTR) {
            continue;
        }
        return errno != EAGAIN && errno != EWOULDBLOCK;
    }
    return false; /* more pending: epoll will report the fd again */
}

/* Deregister fd from the cold set (while still open), close it, and forget
   it as the executor connection so the next signal reconnects. */
static void cold_drop_executor(event_loops_t *loops, int fd) {
    event_loops_remove_fd(&loops->cold_epoll, fd);
    close(fd);
    if (loops->unix_client_fd == fd) {
        loops->unix_client_fd = -1;
    }
}

/*
 * COLD thread: drain SPSC signal queue and forward to executor.
 * Waits on the cold epoll set (wakeup fd + executor socket) with a 200 ms
 * timeout. The queue is drained before and after every wait so signals
 * queued while the thread was busy are not left for the next wakeup.
 * The executor socket is watched only to detect disconnection (EPOLLERR,
 * EPOLLHUP, or EOF on read); any data it sends is discarded.
 */
void *event_cold_thread(void *arg) {
    event_loops_t *loops = (event_loops_t *)arg;
    log_write("[COLD] Thread started\n");
    while (loops->running) {
        cold_drain_signal_queue(loops);

        int nfds = epoll_wait(loops->cold_epoll.epoll_fd,
                              loops->cold_epoll.events, MAX_EPOLL_FDS, 200);
        if (nfds < 0) {
            if (errno == EINTR) continue;
            perror("epoll_wait cold");
            break;
        }

        for (int i = 0; i < nfds; i++) {
            tracked_fd_t *tf = (tracked_fd_t *)loops->cold_epoll.events[i].data.ptr;
            if (!tf || tf->fd < 0) continue;
            const uint32_t ev = loops->cold_epoll.events[i].events;

            if (tf->type == FD_TYPE_EVENT) {
                /* Clear the wakeup counter; the drain below does the work. */
                uint64_t val = 0;
                read(loops->wakeup_fd, &val, sizeof(val));
                continue;
            }

            if (tf->type == FD_TYPE_UNIX_CLIENT) {
                const int fd = tf->fd;
                bool gone = (ev & (EPOLLERR | EPOLLHUP)) != 0;
                if (!gone && (ev & EPOLLIN)) {
                    gone = cold_discard_executor_input(fd);
                }
                if (gone) {
                    log_write("[COLD] Executor disconnected\n");
                    cold_drop_executor(loops, fd);
                }
            }
        }

        // Drain again after epoll to catch any signals queued during processing
        cold_drain_signal_queue(loops);
    }
    log_write("[COLD] Thread exited\n");
    return NULL;
}