From b714ac132e459b6a0f9e2bbdb6252b55c0c4c56d Mon Sep 17 00:00:00 2001 From: nicolas Date: Thu, 28 May 2026 22:15:59 -0300 Subject: [PATCH] perf: replace SPSC signal queue with per-executor atomic slot delivery --- README.md | 16 +++++---- src/evaluate.c | 8 +++-- src/evaluate.h | 9 ++--- src/events.c | 97 ++++++++++++++++++++++++++------------------------ src/events.h | 17 ++++++--- src/executor.c | 9 +++-- src/executor.h | 6 ++-- src/main.c | 49 ++++++++++++++++--------- src/queue.c | 90 ---------------------------------------------- src/queue.h | 93 ----------------------------------------------- src/slot.c | 17 +++++++++ src/slot.h | 20 +++++++++++ src/triangle.h | 60 +++++++++++++++++++++++++++++++ 13 files changed, 220 insertions(+), 271 deletions(-) delete mode 100644 src/queue.c delete mode 100644 src/queue.h create mode 100644 src/slot.c create mode 100644 src/slot.h diff --git a/README.md b/README.md index b5234ea..760e104 100644 --- a/README.md +++ b/README.md @@ -5,20 +5,22 @@ Single C binary — WebSocket book feeds, triangle evaluation, and order placeme ## Architecture -Monolithic single-process design using three pthreads: +Monolithic single-process design using N+1 pthreads: | Thread | Role | |---|---| | **Hot thread** | WebSocket I/O via epoll, order book maintenance, book update dispatch to evaluator | -| **Evaluator** (embedded in hot thread) | Triangle profitability evaluation on every book update, pushes signals to SPSC queue | -| **Executor thread(s)** | Pops signals from queue, places 3-leg HF market orders via KuCoin REST API, waits for fills via WebSocket, optional balance-wait between legs | +| **Evaluator** (embedded in hot thread) | Triangle profitability evaluation on every book update, delivers to first free executor slot | +| **Executor thread(s)** | Spin on per-thread slot via atomic state machine; pick up signal, place 3-leg HF market orders via KuCoin REST API, wait for fills via WebSocket, optional balance-wait between legs | ``` -[KuCoin WS] ──▶ Hot Thread ──▶ Evaluator ──▶ SPSC Queue ──▶ Executor Thread(s) - │ | +[KuCoin WS] ──▶ Hot Thread ──▶ Evaluator ──▶ Slot[0..N] ──▶ Executor Thread(s) + │ (CAS state machine) │ Order Book KuCoin REST ``` +Each executor thread owns one slot. The evaluator writes the signal directly into the slot via a CAS state machine (FREE → CLAIMED → READY). The executor picks it up on the next spin iteration (nanoseconds). If all slots are busy or no free slot available, the signal is dropped — no queueing. + The evaluator runs **in-process** on every book update (no cooldown). The executor does **not** re-evaluate — it trusts the signal as valid at emission time. @@ -30,7 +32,7 @@ The executor does **not** re-evaluate — it trusts the signal as valid at emiss | Order book store | Complete — bid/ask cache with sequence tracking | | Triangle enumeration | Complete — builder pattern from /api/v2/symbols | | Profitability evaluation | Complete — paper-trade simulation cascade matching executor | -| SPSC signal queue | Complete — lock-free ring buffer + eventfd wakeup | +| Per-slot signal delivery | Complete — per-thread slot with atomic CAS state machine, no queue | | Order execution | Complete — HF market orders, fill event wait, balance wait | | Fill channel | Complete — eventfd-based cross-thread fill delivery | | Concurrent execution | Optional — configurable `concurrent_slots` (default 1) | @@ -91,7 +93,7 @@ tri_arb/ │ ├── fill_handler.c/h # Cross-thread fill event channel (SPSC ring + eventfd) │ ├── rest_client.c/h # KuCoin REST API client (signed requests, keepalive) │ ├── http_client.c/h # One-shot HTTPS requests (token fetch, fee table) -│ ├── queue.c/h # SPSC lock-free queue (evaluator to executor) +│ ├── slot.c/h # Per-executor signal slot with atomic CAS delivery │ ├── config.c/h # YAML config parser │ ├── hash.c/h # Hash table │ ├── cJSON.c/h # JSON parser diff --git a/src/evaluate.c b/src/evaluate.c index 604036d..fd25ed3 100644 --- a/src/evaluate.c +++ b/src/evaluate.c @@ -14,6 +14,7 @@ #include "log.h" #include "evaluate.h" +#include "slot.h" #include #include #include @@ -40,11 +41,12 @@ static double apply_increment_floor(double vol, double inc) { void evaluator_init(evaluator_t *ev, const triangle_set_t *triangles, const order_book_t *books, const config_t *cfg, - spsc_queue_t *queue, bool kcs_discount) { + executor_slot_t *slots, int n_slots, bool kcs_discount) { ev->triangles = triangles; ev->books = books; ev->cfg = cfg; - ev->queue = queue; + ev->slots = slots; + ev->n_slots = n_slots; ev->fee_mult = kcs_discount ? 0.8 : 1.0; memset(&ev->stats, 0, sizeof(ev->stats)); ev->stats.best_net_bps = -1e18; @@ -470,7 +472,7 @@ bool evaluate_symbol(evaluator_t *ev, uint16_t symbol_idx, int64_t t_sock_arrive sl->exchange_rate = rates[leg]; } - if (spsc_push(ev->queue, &sig)) { + if (slot_deliver(ev->slots, ev->n_slots, &sig)) { ev->stats.signals_fired++; ev->stats.last_eval_ts_ms = now; log_write("[SIGNAL] %.4f bps vol=%s | %s (%s, %s, %s)\n", diff --git a/src/evaluate.h b/src/evaluate.h index 039685c..9443f7e 100644 --- a/src/evaluate.h +++ b/src/evaluate.h @@ -6,7 +6,7 @@ #include "book.h" #include "triangle.h" #include "config.h" -#include "queue.h" +#include "slot.h" /* Aggregated evaluation statistics for monitoring */ typedef struct { @@ -25,15 +25,16 @@ typedef struct { const triangle_set_t *triangles; /* pre-enumerated triangle set (read-only) */ const order_book_t *books; /* live order books array (read-only) */ const config_t *cfg; /* application configuration (read-only) */ - spsc_queue_t *queue; /* signal queue for firing opportunities */ + executor_slot_t *slots; /* executor slots array */ + int n_slots; /* number of slots */ eval_stats_t stats; /* cumulative evaluation statistics */ double fee_mult; /* combined fee multiplier (includes KCS discount) */ } evaluator_t; -/* Initialise evaluator with triangle set, books, config, and signal queue */ +/* Initialise evaluator with triangle set, books, config, and executor slots */ void evaluator_init(evaluator_t *ev, const triangle_set_t *triangles, const order_book_t *books, const config_t *cfg, - spsc_queue_t *queue, bool kcs_discount); + executor_slot_t *slots, int n_slots, bool kcs_discount); /* Evaluate all triangles involving the given symbol; returns true if a signal was fired */ bool evaluate_symbol(evaluator_t *ev, uint16_t symbol_idx, int64_t t_sock_arrive_ms, int64_t t_arrive_ms); diff --git a/src/events.c b/src/events.c index a2eaa09..7dd3f10 100644 --- a/src/events.c +++ b/src/events.c @@ -97,17 +97,16 @@ static void epoll_set_init(epoll_set_t *set) { /* Initialise both epoll sets (hot + cold), timer fd, and wakeup fd. The cold epoll set monitors the wakeup eventfd for SPSC drain signals. */ int event_loops_init(event_loops_t *loops, ws_client_t *ws_client, - spsc_queue_t *signal_queue, const config_t *cfg, int wakeup_fd) { + const config_t *cfg, executor_slot_t *slots, int n_slots) { memset(loops, 0, sizeof(*loops)); loops->ws_client = ws_client; - loops->signal_queue = signal_queue; + loops->slots = slots; + loops->n_slots = n_slots; loops->running = true; - loops->wakeup_fd = wakeup_fd; loops->executor_shared = calloc(1, sizeof(executor_shared_t)); if (loops->executor_shared) { pthread_mutex_init(&loops->executor_shared->lock, NULL); - pthread_mutex_init(&loops->executor_shared->queue_lock, NULL); } epoll_set_init(&loops->hot_epoll); @@ -119,9 +118,6 @@ int event_loops_init(event_loops_t *loops, ws_client_t *ws_client, return -1; } - event_loops_add_fd(&loops->cold_epoll, loops->wakeup_fd, FD_TYPE_EVENT, - 0, NULL, EPOLLIN); - return 0; } @@ -129,10 +125,8 @@ int event_loops_init(event_loops_t *loops, ws_client_t *ws_client, void event_loops_destroy(event_loops_t *loops) { loops->running = false; if (loops->timer_fd >= 0) close(loops->timer_fd); - if (loops->wakeup_fd >= 0) close(loops->wakeup_fd); if (loops->executor_shared) { pthread_mutex_destroy(&loops->executor_shared->lock); - pthread_mutex_destroy(&loops->executor_shared->queue_lock); free(loops->executor_shared); loops->executor_shared = NULL; } @@ -212,72 +206,81 @@ void *event_hot_thread(void *arg) { /* * Per-executor-thread entry point: creates its own executor_thread_t, - * polls the shared SPSC signal queue, and executes triangles. - * Multiple threads run concurrently, sharing the same executor_shared_t - * for concurrency isolation (via shared in_flight table under mutex). + * polls its private slot eventfd + the shared fill channel eventfd, + * and executes one triangle per signal. */ void *event_executor_thread(void *arg) { - event_loops_t *loops = (event_loops_t *)arg; + executor_thread_arg_t *ta = (executor_thread_arg_t *)arg; + event_loops_t *loops = ta->loops; + executor_slot_t *slot = ta->slot; executor_thread_t *exec = executor_thread_create(loops->ws_client->cfg, loops->ws_client->fill_ch, loops->ws_client, - loops->executor_shared); + loops->executor_shared, + slot); if (!exec) { log_write("[EXEC] Failed to create executor\n"); return NULL; } - struct pollfd fds[2] = { - { .fd = loops->wakeup_fd, .events = POLLIN }, - { .fd = fill_channel_wake_fd(loops->ws_client->fill_ch), .events = POLLIN }, - }; + int fill_wake_fd = fill_channel_wake_fd(loops->ws_client->fill_ch); + struct pollfd pfds[2]; + memset(pfds, 0, sizeof(pfds)); + pfds[0].fd = slot->eventfd; + pfds[0].events = POLLIN; + pfds[1].fd = fill_wake_fd; + pfds[1].events = POLLIN; + int64_t last_keepalive_ms = 0; while (loops->running) { - int64_t now_ka = now_mono_ms(); - int poll_timeout = 200; - if (last_keepalive_ms == 0 || now_ka - last_keepalive_ms >= 30000) { - poll_timeout = 100; + /* Fast path: check slot state without poll */ + if (atomic_load_explicit(&slot->state, memory_order_acquire) == EXECUTOR_SLOT_READY) { + signal_entry_t sig = slot->signal; + atomic_store_explicit(&slot->state, EXECUTOR_SLOT_FREE, memory_order_release); + /* Drain eventfd so poll can block next time */ + uint64_t val; + if (read(slot->eventfd, &val, sizeof(val)) < 0) {} + + executor_execute_triangle(exec, &sig); + + int64_t now = now_mono_ms(); + if (last_keepalive_ms == 0 || now - last_keepalive_ms >= 30000) { + executor_keepalive(exec); + last_keepalive_ms = now_mono_ms(); + } + continue; } - int nfds = poll(fds, 2, poll_timeout); + /* Slow path: poll with 30s timeout for keepalive */ + int poll_timeout = 30000; + if (last_keepalive_ms == 0) poll_timeout = 100; + + int nfds = poll(pfds, 2, poll_timeout); if (nfds < 0) { if (errno == EINTR) continue; log_write("[EXEC] poll error: %s\n", strerror(errno)); break; } - /* Drain fill wakeup */ - if (fds[1].revents & POLLIN) { + /* Drain slot eventfd */ + if (pfds[0].revents & POLLIN) { uint64_t val; - if (read(fds[1].fd, &val, sizeof(val)) < 0) {} + if (read(slot->eventfd, &val, sizeof(val)) < 0) {} + } + /* Drain fill channel wake */ + if (pfds[1].revents & POLLIN) { + uint64_t val; + if (read(fill_wake_fd, &val, sizeof(val)) < 0) {} } - /* Drain signal eventfd — reset counter so poll() can block. - Must happen AFTER the pop attempt so any signal arriving during - pop is not drained away (it will be caught next poll iteration). */ - if (fds[0].revents & POLLIN) { - uint64_t val; - if (read(fds[0].fd, &val, sizeof(val)) < 0) {} - } - - /* Keepalive: warm up REST connection every 30s */ - if (now_ka - last_keepalive_ms >= 30000 || last_keepalive_ms == 0) { + /* Keepalive */ + int64_t now = now_mono_ms(); + if (last_keepalive_ms == 0 || now - last_keepalive_ms >= 30000) { executor_keepalive(exec); last_keepalive_ms = now_mono_ms(); } - - /* Pop and execute one signal at a time (non-blocking) */ - signal_entry_t sig; - bool got = false; - pthread_mutex_lock(&loops->executor_shared->queue_lock); - if (spsc_pop(loops->signal_queue, &sig)) got = true; - pthread_mutex_unlock(&loops->executor_shared->queue_lock); - - if (got) { - executor_execute_triangle(exec, &sig); - } } executor_thread_destroy(exec); diff --git a/src/events.h b/src/events.h index ab23f50..3d10a75 100644 --- a/src/events.h +++ b/src/events.h @@ -5,7 +5,7 @@ #include #include #include "ws_client.h" -#include "queue.h" +#include "slot.h" #include "executor.h" #define MAX_EPOLL_FDS 64 @@ -38,17 +38,17 @@ typedef struct { epoll_set_t hot_epoll; /* hot epoll set for latency-sensitive ws events */ epoll_set_t cold_epoll; /* cold epoll set for timer/http events */ ws_client_t *ws_client; /* WebSocket client instance */ - spsc_queue_t *signal_queue; /* signal queue for emitting opportunities */ - executor_shared_t *executor_shared; /* shared executor state (in_flight + queue lock) */ + executor_slot_t *slots; /* per-thread executor slots */ + int n_slots; /* number of executor slots */ + executor_shared_t *executor_shared; /* shared executor state (in_flight + stale book) */ int timer_fd; /* timerfd for periodic tasks */ - int wakeup_fd; /* eventfd for waking the cold loop */ uint64_t next_ping_ms; /* next scheduled WebSocket ping timestamp */ bool running; /* false signals event loops to exit */ } event_loops_t; /* Initialise both epoll sets, create sockets, and start event loops */ int event_loops_init(event_loops_t *loops, ws_client_t *ws_client, - spsc_queue_t *signal_queue, const config_t *cfg, int wakeup_fd); + const config_t *cfg, executor_slot_t *slots, int n_slots); /* Tear down event loops, close all sockets */ void event_loops_destroy(event_loops_t *loops); /* Register a file descriptor with an epoll set */ @@ -60,6 +60,13 @@ void event_loops_remove_fd(epoll_set_t *set, int fd); void *event_hot_thread(void *arg); /* Cold event loop thread (legacy, single-threaded) */ void *event_cold_thread(void *arg); +/* Argument struct for event_executor_thread */ +typedef struct { + event_loops_t *loops; + executor_slot_t *slot; + int slot_index; +} executor_thread_arg_t; + /* Per-executor-thread entry point: creates its own executor_thread_t and polls signal queue */ void *event_executor_thread(void *arg); diff --git a/src/executor.c b/src/executor.c index 48f246c..c17f061 100644 --- a/src/executor.c +++ b/src/executor.c @@ -18,6 +18,7 @@ struct executor_thread_s { rest_conn_t *rest; ws_client_t *ws; executor_shared_t *shared; + executor_slot_t *slot; }; /* ── Reporting ── */ @@ -72,15 +73,17 @@ static double apply_increment_floor(double vol, double inc) { /* ── Core execution loop ── */ executor_thread_t *executor_thread_create(const config_t *cfg, - fill_channel_t *fill_ch, - ws_client_t *ws, - executor_shared_t *shared) { + fill_channel_t *fill_ch, + ws_client_t *ws, + executor_shared_t *shared, + executor_slot_t *slot) { (void)fill_ch; executor_thread_t *et = calloc(1, sizeof(*et)); if (!et) return NULL; et->cfg = cfg; et->ws = ws; et->shared = shared; + et->slot = slot; et->rest = rest_conn_new(); if (et->rest) { rest_conn_set_auth(et->rest, diff --git a/src/executor.h b/src/executor.h index bf9e273..1ef3879 100644 --- a/src/executor.h +++ b/src/executor.h @@ -5,7 +5,7 @@ #include #include "fill_handler.h" #include "config.h" -#include "queue.h" +#include "slot.h" #include "ws_client.h" #define MAX_IN_FLIGHT 8 @@ -14,7 +14,6 @@ /* Shared in-flight state across all executor threads (protected by lock). */ typedef struct { pthread_mutex_t lock; - pthread_mutex_t queue_lock; char triangles[MAX_IN_FLIGHT][128]; uint64_t pairs[MAX_IN_FLIGHT]; char primary_quotes[MAX_IN_FLIGHT][16]; @@ -31,7 +30,8 @@ typedef struct executor_thread_s executor_thread_t; executor_thread_t *executor_thread_create(const config_t *cfg, fill_channel_t *fill_ch, ws_client_t *ws, - executor_shared_t *shared); + executor_shared_t *shared, + executor_slot_t *slot); /* Execute a single triangle signal (blocking, called from executor thread). */ void executor_execute_triangle(executor_thread_t *et, diff --git a/src/main.c b/src/main.c index 0d7f2f2..ca319b0 100644 --- a/src/main.c +++ b/src/main.c @@ -24,7 +24,7 @@ #include "symbols_api.h" #include "ws_client.h" #include "evaluate.h" -#include "queue.h" +#include "slot.h" #include "events.h" static volatile sig_atomic_t g_running = 1; @@ -118,41 +118,49 @@ int main(int argc, char *argv[]) { order_book_t *books = calloc(MAX_SYMBOLS, sizeof(order_book_t)); log_write("[MAIN] books=%p, size=%zu\n", (void*)books, sizeof(order_book_t)); - log_write("[MAIN] >>> Init SPSC queue\n"); if (!books) { log_write("[MAIN] Failed to allocate books\n"); free_fee_table(fees); return 1; } - spsc_queue_t signal_queue; - log_write("[MAIN] >>> Calling spsc_init\n"); - int wakeup_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); - if (wakeup_fd < 0 || spsc_init(&signal_queue, wakeup_fd) != 0) { - log_write("[MAIN] Failed to init signal queue\n"); - triangle_set_free(&triangles); + int n_slots = cfg.concurrent_slots; + if (n_slots < 1) n_slots = 1; + if (n_slots > 16) n_slots = 16; + executor_slot_t *slots = calloc((size_t)n_slots, sizeof(executor_slot_t)); + if (!slots) { + log_write("[MAIN] Failed to allocate slots\n"); free_fee_table(fees); return 1; } + for (int i = 0; i < n_slots; i++) { + slots[i].eventfd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); + if (slots[i].eventfd < 0) { + log_write("[MAIN] Failed to create slot eventfd\n"); + for (int j = 0; j < i; j++) close(slots[j].eventfd); + free(slots); + free_fee_table(fees); + return 1; + } + } + log_write("[MAIN] Created %d executor slots\n", n_slots); evaluator_t evaluator; - evaluator_init(&evaluator, &triangles, books, &cfg, &signal_queue, + evaluator_init(&evaluator, &triangles, books, &cfg, slots, n_slots, cfg.kcs_discount_active); ws_client_t ws_client; if (ws_client_init(&ws_client, &cfg, &symbols, books, &evaluator) != 0) { log_write("[MAIN] Failed to init WS client\n"); - spsc_destroy(&signal_queue); triangle_set_free(&triangles); free_fee_table(fees); return 1; } event_loops_t events; - if (event_loops_init(&events, &ws_client, &signal_queue, &cfg, wakeup_fd) != 0) { + if (event_loops_init(&events, &ws_client, &cfg, slots, n_slots) != 0) { log_write("[MAIN] Failed to init event loops\n"); ws_client_destroy(&ws_client); - spsc_destroy(&signal_queue); triangle_set_free(&triangles); free_fee_table(fees); return 1; @@ -202,8 +210,12 @@ int main(int argc, char *argv[]) { if (n > 16) n = 16; pthread_t exec_threads[16]; int exec_thread_count = n; + executor_thread_arg_t exec_args[16]; for (int i = 0; i < n; i++) { - pthread_create(&exec_threads[i], NULL, event_executor_thread, &events); + exec_args[i].loops = &events; + exec_args[i].slot = &slots[i]; + exec_args[i].slot_index = i; + pthread_create(&exec_threads[i], NULL, event_executor_thread, &exec_args[i]); } // Unblock signals in main thread only; worker threads inherit blocked mask @@ -242,9 +254,11 @@ int main(int argc, char *argv[]) { events.running = false; ws_client.running = false; + /* Wake all executor threads */ uint64_t val = 1; - ssize_t wr = write(events.wakeup_fd, &val, sizeof(val)); - (void)wr; + for (int i = 0; i < n_slots; i++) { + if (write(slots[i].eventfd, &val, sizeof(val)) < 0) {} + } pthread_join(hot_thread, NULL); for (int i = 0; i < exec_thread_count; i++) { @@ -253,7 +267,10 @@ int main(int argc, char *argv[]) { event_loops_destroy(&events); ws_client_destroy(&ws_client); - spsc_destroy(&signal_queue); + for (int i = 0; i < n_slots; i++) { + close(slots[i].eventfd); + } + free(slots); triangle_set_free(&triangles); free(books); free_fee_table(fees); diff --git a/src/queue.c b/src/queue.c deleted file mode 100644 index d9a22d6..0000000 --- a/src/queue.c +++ /dev/null @@ -1,90 +0,0 @@ -/* - * queue.c - Lock-free single-producer single-consumer (SPSC) bounded queue - * - * Uses C11 atomics with acquire/release ordering for correct head/tail - * synchronization without locks. The eventfd notify on push wakes the - * consumer's epoll loop (cold thread) for immediate dispatch. - */ - -#include "queue.h" -#include -#include -#include -#include -#include -#include - -int spsc_init(spsc_queue_t *q, int wakeup_fd) { - memset(q, 0, sizeof(*q)); - q->buffer = calloc(Spsc_QUEUE_DEPTH, sizeof(signal_entry_t)); - if (!q->buffer) return -1; - q->head = 0; - q->tail = 0; - q->depth = Spsc_QUEUE_DEPTH; - q->dropped = 0; - q->eventfd = wakeup_fd; - return 0; -} - -void spsc_destroy(spsc_queue_t *q) { - /* eventfd is owned by caller (event_loops), don't close */ - q->eventfd = -1; - free(q->buffer); - q->buffer = NULL; -} - -static inline void eventfd_notify(int fd) { - uint64_t val = 1; - ssize_t ret; - do { - ret = write(fd, &val, sizeof(val)); - } while (ret < 0 && errno == EINTR); - (void)ret; -} - -bool spsc_push(spsc_queue_t *q, const signal_entry_t *entry) { - uint32_t head = atomic_load_explicit(&q->head, memory_order_relaxed); - uint32_t tail = atomic_load_explicit(&q->tail, memory_order_acquire); - - uint32_t next_head = head + 1; - if (next_head >= q->depth) next_head = 0; - - if (next_head == tail) { - q->dropped++; - return false; - } - - q->buffer[head] = *entry; - - atomic_store_explicit(&q->head, next_head, memory_order_release); - eventfd_notify(q->eventfd); - return true; -} - -bool spsc_pop(spsc_queue_t *q, signal_entry_t *entry) { - uint32_t tail = atomic_load_explicit(&q->tail, memory_order_relaxed); - uint32_t head = atomic_load_explicit(&q->head, memory_order_acquire); - - if (tail == head) return false; - - *entry = q->buffer[tail]; - - uint32_t next_tail = tail + 1; - if (next_tail >= q->depth) next_tail = 0; - - atomic_store_explicit(&q->tail, next_tail, memory_order_release); - return true; -} - -bool spsc_empty(const spsc_queue_t *q) { - uint32_t head = atomic_load_explicit(&q->head, memory_order_acquire); - uint32_t tail = atomic_load_explicit(&q->tail, memory_order_acquire); - return head == tail; -} - -uint32_t spsc_count(const spsc_queue_t *q) { - uint32_t head = atomic_load_explicit(&q->head, memory_order_acquire); - uint32_t tail = atomic_load_explicit(&q->tail, memory_order_acquire); - if (head >= tail) return head - tail; - return q->depth - tail + head; -} diff --git a/src/queue.h b/src/queue.h deleted file mode 100644 index 1f5f9bf..0000000 --- a/src/queue.h +++ /dev/null @@ -1,93 +0,0 @@ -#ifndef FUSED_QUEUE_H -#define FUSED_QUEUE_H - -#include -#include -#include -#include "book.h" -#include "triangle.h" - -#define MAX_SIGNAL_LEN 4096 -#define Spsc_QUEUE_DEPTH 1024 - -/* Single price+size level in an order book snapshot */ -typedef struct { - double price; /* price at this level */ - double size; /* available size at this level */ -} book_level_t; - -/* Snapshot of one leg's order book included in a signal */ -typedef struct { - char symbol[SYMBOL_NAME_LEN]; /* trading pair symbol name */ - int64_t ts_ms; /* book timestamp (milliseconds) */ - book_level_t bids[MAX_BOOK_LEVELS]; /* bid levels (top of book) */ - book_level_t asks[MAX_BOOK_LEVELS]; /* ask levels (top of book) */ - uint8_t bid_count; /* number of valid bid levels */ - uint8_t ask_count; /* number of valid ask levels */ -} signal_book_t; - -/* One leg of a triangular arbitrage signal */ -typedef struct { - char symbol[SYMBOL_NAME_LEN]; /* trading pair symbol */ - char input_currency[CURRENCY_NAME_LEN]; /* currency being traded in */ - char output_currency[CURRENCY_NAME_LEN]; /* currency being traded out */ - char fee_currency[CURRENCY_NAME_LEN]; /* currency used to pay fees */ - double fee_rate; /* fee rate for this leg */ - double exchange_rate; /* computed exchange rate for the leg */ - char side[5]; /* trade side: "buy" or "sell" */ - char order_param[32]; /* order parameter string for the executor */ - double quote_volume; /* notional quote volume for the leg */ - double base_increment; /* base asset lot size step */ - double quote_increment; /* quote asset lot size step */ - double funds_increment; /* funds lot size step (market buy funds param) */ - double base_min_size; /* minimum base asset order size */ -} signal_leg_t; - -/* Collection of up to 3 legs comprising a triangular signal */ -typedef struct { - uint8_t leg_count; /* number of legs (typically 3) */ - signal_leg_t legs[3]; /* the individual leg descriptors */ -} signal_legs_t; - -/* Entry describing one triangular arbitrage opportunity */ -typedef struct { - char triangle_key[CURRENCY_NAME_LEN * 3 + 4]; /* unique triangle identifier e.g. "BTC-ETH-USDT" */ - char primary_quote[CURRENCY_NAME_LEN]; /* primary quote currency of the triangle */ - double predicted_bps; /* predicted profit in basis points */ - char max_volume[32]; /* max trade volume as string */ - double starting_volume; /* recommended starting volume */ - bool live; /* whether this entry is from live (vs simulated) data */ - int64_t ts_ms; /* signal generation timestamp */ - int64_t book_ts_ms; /* reference order book timestamp */ - int64_t t_sock_arrive_ms; /* timestamp when bytes arrived at SSL_read */ - int64_t t_arrive_ms; /* arrival timestamp at evaluator (post-parse) */ - int64_t t_eval_ms; /* evaluation completion timestamp */ - uint8_t book_count; /* number of books used (typically 3) */ - signal_book_t books[3]; /* order book snapshots for each leg */ - signal_legs_t legs; /* leg descriptors for execution */ -} signal_entry_t; - -/* Lock-free single-producer single-consumer queue for signal entries */ -typedef struct { - signal_entry_t *buffer; /* ring buffer of entries */ - _Atomic uint32_t head; /* consumer read index (atomic) */ - _Atomic uint32_t tail; /* producer write index (atomic) */ - int eventfd; /* eventfd for waking consumer */ - uint32_t depth; /* capacity of the ring buffer */ - uint32_t dropped; /* count of dropped entries due to full queue */ -} spsc_queue_t; - -/* Initialise an SPSC queue backed by a wakeup eventfd */ -int spsc_init(spsc_queue_t *q, int wakeup_fd); -/* Destroy an SPSC queue and free its buffer */ -void spsc_destroy(spsc_queue_t *q); -/* Push an entry into the queue (non-blocking); returns false if full */ -bool spsc_push(spsc_queue_t *q, const signal_entry_t *entry); -/* Pop an entry from the queue (non-blocking); returns false if empty */ -bool spsc_pop(spsc_queue_t *q, signal_entry_t *entry); -/* Returns true if the queue is empty */ -bool spsc_empty(const spsc_queue_t *q); -/* Returns the number of entries currently in the queue */ -uint32_t spsc_count(const spsc_queue_t *q); - -#endif diff --git a/src/slot.c b/src/slot.c new file mode 100644 index 0000000..ccca067 --- /dev/null +++ b/src/slot.c @@ -0,0 +1,17 @@ +#include "slot.h" +#include +#include + +bool slot_deliver(executor_slot_t *slots, int n_slots, const signal_entry_t *sig) { + for (int i = 0; i < n_slots; i++) { + int expected = EXECUTOR_SLOT_FREE; + if (atomic_compare_exchange_strong(&slots[i].state, &expected, EXECUTOR_SLOT_CLAIMED)) { + slots[i].signal = *sig; + atomic_store_explicit(&slots[i].state, EXECUTOR_SLOT_READY, memory_order_release); + uint64_t one = 1; + if (write(slots[i].eventfd, &one, sizeof(one)) < 0) {} + return true; + } + } + return false; +} diff --git a/src/slot.h b/src/slot.h new file mode 100644 index 0000000..8e60dc5 --- /dev/null +++ b/src/slot.h @@ -0,0 +1,20 @@ +#ifndef FUSED_SLOT_H +#define FUSED_SLOT_H + +#include +#include +#include "triangle.h" + +#define EXECUTOR_SLOT_FREE 0 +#define EXECUTOR_SLOT_CLAIMED 1 +#define EXECUTOR_SLOT_READY 2 + +typedef struct { + _Atomic int state; + signal_entry_t signal; + int eventfd; +} executor_slot_t; + +bool slot_deliver(executor_slot_t *slots, int n_slots, const signal_entry_t *sig); + +#endif diff --git a/src/triangle.h b/src/triangle.h index 6bf3c2f..d638d0c 100644 --- a/src/triangle.h +++ b/src/triangle.h @@ -3,6 +3,7 @@ #include #include +#include #include "book.h" #include "hash.h" #include "config.h" @@ -49,6 +50,65 @@ typedef struct { double maker_fee; /* maker fee rate for this currency */ } fee_entry_t; +/* Signal types — moved here from queue.h */ + +/* Single price+size level in an order book snapshot */ +typedef struct { + double price; + double size; +} book_level_t; + +/* Snapshot of one leg's order book included in a signal */ +typedef struct { + char symbol[SYMBOL_NAME_LEN]; + int64_t ts_ms; + book_level_t bids[MAX_BOOK_LEVELS]; + book_level_t asks[MAX_BOOK_LEVELS]; + uint8_t bid_count; + uint8_t ask_count; +} signal_book_t; + +/* One leg of a triangular arbitrage signal */ +typedef struct { + char symbol[SYMBOL_NAME_LEN]; + char input_currency[CURRENCY_NAME_LEN]; + char output_currency[CURRENCY_NAME_LEN]; + char fee_currency[CURRENCY_NAME_LEN]; + double fee_rate; + double exchange_rate; + char side[5]; + char order_param[32]; + double quote_volume; + double base_increment; + double quote_increment; + double funds_increment; + double base_min_size; +} signal_leg_t; + +/* Collection of up to 3 legs comprising a triangular signal */ +typedef struct { + uint8_t leg_count; + signal_leg_t legs[3]; +} signal_legs_t; + +/* Entry describing one triangular arbitrage opportunity */ +typedef struct { + char triangle_key[CURRENCY_NAME_LEN * 3 + 4]; + char primary_quote[CURRENCY_NAME_LEN]; + double predicted_bps; + char max_volume[32]; + double starting_volume; + bool live; + int64_t ts_ms; + int64_t book_ts_ms; + int64_t t_sock_arrive_ms; + int64_t t_arrive_ms; + int64_t t_eval_ms; + uint8_t book_count; + signal_book_t books[3]; + signal_legs_t legs; +} signal_entry_t; + /* Initialise triangle set: enumerate all triangles from the symbol table */ int triangle_set_init(triangle_set_t *set, const symbol_table_t *symbols, const config_t *cfg, const fee_entry_t *fees,