Executor Migration Plan
Absorb all executor functions into the fused_engine process, eliminating Python, UDS, JSON serialisation, and the 2+ ms pipeline gap between signal creation and first order fire.
Architecture
┌───────────────────────────────────────────┐
│ HOT THREAD                                │
│ epoll WS → book updates → evaluate →      │
│   spsc_push(main_queue, &signal)          │
│   eventfd_write(wake_fd)                  │
│                                           │
│ on WS match event (orderChange):          │
│   exec_id = lookup[client_oid]            │
│   spsc_push(fill_queue[exec_id], match)   │
│   eventfd_write(fill_wake[exec_id])       │
└──────┬─────────────────────────┬──────────┘
       │                         │
  main_queue            fill_queue[0..N-1]
(signal_entry_t SPSC)    (per-thread SPSC)
       │                         │
┌──────▼────────────┐  ┌─────────▼────────────┐
│ EXECUTOR THREAD   │  │ fill_match_t ring    │
│ (1 per slot)      │◄─│ pushed by hot thread │
│ spsc_pop(main)    │  └──────────────────────┘
│ execute(sig)      │
│ for each leg:     │
│   rest POST       │
│   wait_for_fill   │
│   cascade         │
│ emit report       │
└───────────────────┘
Thread roles:
- HOT: epoll on WS fds + timer. Evaluates triangles, pushes signals to SPSC. Also receives WS orderChange events and dispatches match data to the correct executor thread's fill SPSC.
- EXECUTOR: pops from main signal queue, executes triangle via blocking REST POST + fill wait, cascades to next leg. One thread per concurrent slot.
- No Python, no UDS socket, no JSON serialisation between threads.
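The hand-off between the hot thread and an executor thread relies on a single-producer/single-consumer ring. The sketch below shows one common way to build such a ring with C11 atomics. It is not the code in queue.c: the demo_ names, the fixed capacity and the payload struct are all assumptions made for illustration.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

#define DEMO_RING_CAP 8u   /* hypothetical capacity; must be a power of two */

/* Stand-in for signal_entry_t; the real layout is not shown in this plan. */
typedef struct { int triangle_id; double starting_volume; } demo_signal_t;

typedef struct {
    demo_signal_t slots[DEMO_RING_CAP];
    _Atomic size_t head;   /* advanced only by the consumer (executor thread) */
    _Atomic size_t tail;   /* advanced only by the producer (hot thread) */
} demo_spsc_t;

static void demo_spsc_init(demo_spsc_t *q) {
    atomic_init(&q->head, 0);
    atomic_init(&q->tail, 0);
}

/* Producer side: copies the item in, returns false when the ring is full. */
static bool demo_spsc_push(demo_spsc_t *q, const demo_signal_t *item) {
    assert(q && item);
    size_t tail = atomic_load_explicit(&q->tail, memory_order_relaxed);
    size_t head = atomic_load_explicit(&q->head, memory_order_acquire);
    if (tail - head == DEMO_RING_CAP) return false;
    q->slots[tail & (DEMO_RING_CAP - 1)] = *item;
    /* Release publishes the slot contents before the new tail becomes visible. */
    atomic_store_explicit(&q->tail, tail + 1, memory_order_release);
    return true;
}

/* Consumer side: copies the oldest item out, returns false when the ring is empty. */
static bool demo_spsc_pop(demo_spsc_t *q, demo_signal_t *out) {
    assert(q && out);
    size_t head = atomic_load_explicit(&q->head, memory_order_relaxed);
    size_t tail = atomic_load_explicit(&q->tail, memory_order_acquire);
    if (head == tail) return false;
    *out = q->slots[head & (DEMO_RING_CAP - 1)];
    atomic_store_explicit(&q->head, head + 1, memory_order_release);
    return true;
}
```

A ring like this is only safe while exactly one thread pushes and exactly one thread pops. With more than one executor thread, that suggests either one signal ring per executor or a different queue type for main_queue.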
What Exists in C Already
| Component | File | Status |
|---|---|---|
| WS connection + reconnection | ws_client.c | Done |
| SSL context + BIO | ws_client.c | Done |
| Timer fd | events.c | Done |
| SPSC queue | queue.c | Done |
| JSON parser (cJSON) | cJSON.c | Done |
| Book data structures | book.h | Done |
| Triangle evaluation + sizing | evaluate.c | Done |
| Non-blocking log pipe | log.c | Done |
| Config parser | config.c | Done |
| HMAC-SHA256 | OpenSSL HMAC() in libcrypto | Already linked |
| Base64 | OpenSSL BIO_f_base64() in libcrypto | Already linked |
What Needs to Be Written
Phase 1 — HTTP Client + REST Signing
New files: http_client.c, http_client.h
- Reuse SSL_CTX from WS client (or create a minimal second context)
- http_post(url, body, headers) → (status, response_body)
- http_get(url, headers) → (status, response_body)
- Connection pool: keep-alive via socket reuse (same pattern as WS reconnect but for REST endpoints)
- DNS resolution: reuse getaddrinfo from ws_client.c
- kucoin_sign(timestamp, method, path, body, secret) → signature_base64
- kucoin_headers(api_key, encrypted_passphrase, sign, timestamp) → header_string
Dependencies: OpenSSL libcrypto (already linked for HMAC), cJSON for parsing responses.
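Before kucoin_sign can run the HMAC, it has to assemble the string to be signed. KuCoin's REST signing scheme concatenates the millisecond timestamp, the upper-case HTTP method, the request path and the body, in that order. A minimal sketch of that step follows; demo_build_prehash is a hypothetical helper name, and the HMAC-SHA256 and base64 steps are left to the OpenSSL calls listed above.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Writes timestamp + method + path + body into out.
 * Returns the string length, or -1 if out is too small.
 * A NULL body (e.g. for GET) is treated as an empty string. */
static int demo_build_prehash(const char *timestamp_ms, const char *method,
                              const char *path, const char *body,
                              char *out, size_t out_sz) {
    assert(timestamp_ms && method && path && out);
    int n = snprintf(out, out_sz, "%s%s%s%s",
                     timestamp_ms, method, path, body ? body : "");
    if (n < 0 || (size_t)n >= out_sz) return -1;   /* encoding error or truncation */
    return n;
}
```

Reporting truncation as an error matters here: signing a truncated string would produce a signature the exchange rejects, which is harder to diagnose than an early failure.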
Phase 2 — Fill Handler
New files: fill_handler.c, fill_handler.h
Data structures:
typedef struct {
    char client_oid[41];      // max 40 chars per KuCoin
    double match_size;
    double match_price;
    bool is_terminal;         // canceled+done = complete
} fill_match_t;

typedef struct {
    char client_oid[41];
    char order_id[32];
    double total_size;
    double total_funds;
    int match_count;
    bool done;                // terminal event received
} accumulator_t;

// Ring of fill SPSCs: one per executor thread
typedef struct {
    spsc_queue_t queue;       // fill_match_t ring
    int wake_fd;              // eventfd for waking executor
    accumulator_t acc;        // in-progress accumulation (hot thread writes)
    atomic_bool registered;   // register_fill_listener / unregister
} fill_channel_t;
Fill dispatch (hot thread side):
// Called from hot thread's ws_client.c when a match event arrives
void fill_dispatch_match(const char *client_oid,
                         double match_size, double match_price);
// Called from hot thread on canceled/filled terminal event
void fill_dispatch_terminal(const char *client_oid);
- client_oid → executor_thread lookup via hash table (read by the hot thread, updated atomically)
- Each fill_dispatch_match pushes a fill_match_t to the correct executor thread's SPSC, followed by an eventfd write
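The client_oid → executor lookup could take roughly the following shape: a small open-addressing table keyed by client_oid, with tombstones so that entries can be removed once an order is terminal. Every demo_ name and the capacity are assumptions. The sketch is single-threaded; the synchronisation between executor threads that register orders and the hot thread that looks them up is deliberately left out.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

#define DEMO_OID_LEN   41    /* 40 chars + NUL, matching fill_match_t */
#define DEMO_TABLE_CAP 64    /* hypothetical capacity, power of two */
#define DEMO_EMPTY     (-1)
#define DEMO_TOMBSTONE (-2)

typedef struct {
    char client_oid[DEMO_OID_LEN];
    int  exec_id;            /* >= 0: executor index; otherwise EMPTY or TOMBSTONE */
} demo_oid_entry_t;

typedef struct { demo_oid_entry_t e[DEMO_TABLE_CAP]; } demo_oid_table_t;

static uint64_t demo_oid_hash(const char *s) {   /* 64-bit FNV-1a */
    uint64_t h = 14695981039346656037ULL;
    while (*s) { h ^= (unsigned char)*s++; h *= 1099511628211ULL; }
    return h;
}

static void demo_table_init(demo_oid_table_t *t) {
    for (int i = 0; i < DEMO_TABLE_CAP; i++) t->e[i].exec_id = DEMO_EMPTY;
}

/* Index of the live entry for oid, or -1. Probing stops at the first EMPTY slot. */
static int demo_table_find(const demo_oid_table_t *t, const char *oid) {
    uint64_t h = demo_oid_hash(oid);
    for (uint64_t i = 0; i < DEMO_TABLE_CAP; i++) {
        int idx = (int)((h + i) & (DEMO_TABLE_CAP - 1));
        if (t->e[idx].exec_id == DEMO_EMPTY) return -1;
        if (t->e[idx].exec_id >= 0 && strcmp(t->e[idx].client_oid, oid) == 0) return idx;
    }
    return -1;
}

/* Registers oid -> exec_id. Returns 0, or -1 if oid is too long or the table is full. */
static int demo_table_put(demo_oid_table_t *t, const char *oid, int exec_id) {
    assert(t && oid && exec_id >= 0);
    if (strlen(oid) >= DEMO_OID_LEN) return -1;
    int idx = demo_table_find(t, oid);
    if (idx < 0) {
        uint64_t h = demo_oid_hash(oid);
        for (uint64_t i = 0; i < DEMO_TABLE_CAP && idx < 0; i++) {
            int probe = (int)((h + i) & (DEMO_TABLE_CAP - 1));
            if (t->e[probe].exec_id < 0) idx = probe;   /* reuse EMPTY or TOMBSTONE */
        }
        if (idx < 0) return -1;
        strcpy(t->e[idx].client_oid, oid);
    }
    t->e[idx].exec_id = exec_id;
    return 0;
}

/* Executor index for oid, or -1 if the order is not registered. */
static int demo_table_lookup(const demo_oid_table_t *t, const char *oid) {
    int idx = demo_table_find(t, oid);
    return idx < 0 ? -1 : t->e[idx].exec_id;
}

static void demo_table_remove(demo_oid_table_t *t, const char *oid) {
    int idx = demo_table_find(t, oid);
    if (idx >= 0) t->e[idx].exec_id = DEMO_TOMBSTONE;
}
```

Registering the client_oid before the REST POST is sent avoids a window in which a match event arrives for an order the hot thread cannot yet route.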
Fill waiter (executor thread side):
// Accumulates matches for a single client_oid until terminal or timeout
typedef struct {
    double total_size;
    double total_funds;
    double avg_price;
    int match_count;
    char order_id[32];
} fill_result_t;

bool await_fill(fill_channel_t *ch, const char *client_oid,
                int64_t timeout_ms, fill_result_t *out);
- Drains fill_match_t from the thread's fill SPSC
- Accumulates total_size += match_size, total_funds += match_price * match_size
- Returns on terminal event or timeout
- poll() on the fill eventfd between drain cycles
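The accumulation rule in the list above can be sketched as a small fold over match events. The struct and function names here are hypothetical stand-ins for fill_match_t and fill_result_t, and deriving avg_price as total_funds / total_size is an assumption about how that field is meant to be filled.

```c
#include <assert.h>
#include <stdbool.h>

/* Reduced stand-ins for fill_match_t and fill_result_t. */
typedef struct { double match_size, match_price; bool is_terminal; } demo_match_t;
typedef struct { double total_size, total_funds, avg_price; int match_count; } demo_result_t;

/* Folds one event into the running result.
 * Returns true once the terminal event has been seen. */
static bool demo_accumulate(demo_result_t *r, const demo_match_t *m) {
    assert(r && m);
    if (m->match_size > 0.0) {                     /* terminal events may carry no size */
        r->total_size  += m->match_size;
        r->total_funds += m->match_price * m->match_size;
        r->match_count += 1;
        r->avg_price    = r->total_funds / r->total_size;   /* size-weighted average */
    }
    return m->is_terminal;
}
```

For example, a match of 2.0 at 0.5 followed by a match of 6.0 at 0.25 gives total_size 8.0, total_funds 2.5 and avg_price 0.3125.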
Phase 3 — Execution Engine
Executor thread main loop (events.c, replacing current cold thread):
void *executor_thread(void *arg) {
    thread_data_t *td = (thread_data_t *)arg;
    while (atomic_load(&g_running)) {
        signal_entry_t sig;
        if (spsc_pop(td->main_queue, &sig)) {
            execute_triangle(&sig, td);
            continue;
        }
        // sleep via poll on fill eventfd + shutdown fd
        // (td->shutdown_fd is an assumed field name)
        struct pollfd fds[2] = {
            { .fd = td->fill_channel.wake_fd, .events = POLLIN },
            { .fd = td->shutdown_fd,          .events = POLLIN },
        };
        if (poll(fds, 2, -1) < 0)
            continue;                     // interrupted: re-check g_running
        if (fds[0].revents & POLLIN) {
            uint64_t val;                 // drain the eventfd counter
            (void)read(td->fill_channel.wake_fd, &val, sizeof(val));
        }
    }
    return NULL;
}
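The sleep-then-drain step of the loop above can be isolated into a helper that also supports the timeout await_fill needs. This is a minimal sketch under assumptions: demo_wait_for_wake is a hypothetical name, and the descriptor is expected to deliver 8-byte values the way an eventfd does.

```c
#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <stdint.h>
#include <unistd.h>

/* Blocks until fd is readable or timeout_ms elapses (-1 = wait forever).
 * Returns 1 if woken (and the 8-byte counter was consumed),
 * 0 on timeout, -1 on error. */
static int demo_wait_for_wake(int fd, int timeout_ms) {
    assert(fd >= 0);
    struct pollfd pfd = { .fd = fd, .events = POLLIN };
    int rc;
    do {
        rc = poll(&pfd, 1, timeout_ms);
    } while (rc < 0 && errno == EINTR);   /* a retry restarts the full timeout */
    if (rc <= 0) return rc;
    if (!(pfd.revents & POLLIN)) return -1;   /* POLLERR / POLLHUP without data */
    uint64_t counter;
    if (read(fd, &counter, sizeof counter) != (ssize_t)sizeof counter) return -1;
    return 1;
}
```

Because the retry after EINTR restarts the full timeout, a caller that needs a hard deadline would have to track elapsed time itself, for instance with clock_gettime(CLOCK_MONOTONIC).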
execute_triangle() in C mirrors the current _execute_triangle:
void execute_triangle(signal_entry_t *sig, thread_data_t *td) {
    timing_t timing_buf;                 // assumes timing_t can live on the stack
    timing_t *timings = &timing_buf;
    setup_timings(sig, timings);
    double leg_output[3] = {0};

    for (int leg = 0; leg < 3; leg++) {
        // ── Input volume ──
        double input_vol = (leg == 0) ? sig->starting_volume : leg_output[leg - 1];
        apply_fee_hold_reduction(sig, leg, &input_vol);
        apply_increment_floor(sig, leg, &input_vol);

        // ── Place order ──
        int64_t t0 = now_monotonic_ms();
        char order_id[32];
        bool ok = place_order(sig, leg, input_vol, order_id);
        timing_append(timings, leg_i_order_fired);
        if (!ok) {
            emit_failed(sig, leg_output, timings, "order_rejected");
            return;
        }
        register_fill_listener(td, leg, order_id);

        // ── Wait for fill ──
        fill_result_t result;
        bool filled = await_fill(&td->fill_channel, order_id,
                                 FILL_TIMEOUT_MS, &result);
        timing_append(timings, leg_i_fill_received);
        if (sig->cancelled) { emit_aborted(...); return; }
        if (!filled) { emit_failed(...); return; }

        // ── Cascade ──
        // A buy yields base (size); a sell yields quote (funds).
        leg_output[leg] = (sig->legs.legs[leg].side_is_buy)
                              ? result.total_size
                              : result.total_funds;
        apply_balance_override(td, sig, leg, &leg_output[leg]);
        // Fee hold reduction and increment flooring for the next leg are
        // applied to input_vol at the top of the next iteration, so
        // leg_output keeps the unreduced amounts and leg + 1 never indexes
        // past the last leg.
    }

    double pnl = compute_pnl(leg_output, sig);
    emit_filled(sig, leg_output, timings, pnl);
}
Helper functions (cascade.c):
// Fee hold: funds * (1 + fee_rate) must not exceed balance.
// Divide by (1 + fee_rate) when next leg is a buy.
void apply_fee_hold_reduction(signal_entry_t *sig, int leg, double *vol);
// Round down to the next leg's quote_increment (buy) or base_increment (sell).
void apply_increment_floor(signal_entry_t *sig, int leg, double *vol);
// Override with latest WS balance for sells (if await_balance enabled).
void apply_balance_override(thread_data_t *td, signal_entry_t *sig,
                            int leg, double *vol);
// Leg 0 input → leg 2 output, compute profit in primary_quote.
double compute_pnl(double *leg_output, signal_entry_t *sig);
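The arithmetic behind the first two helpers is small enough to show directly. The sketch below works on plain doubles rather than signal_entry_t, so the function names and parameters are hypothetical. The 1e-9 nudge in the floor is also an assumption, added so that a volume that is already a whole multiple of the increment is not rounded down by binary floating-point error (0.3 / 0.1 evaluates to slightly under 3).

```c
#include <assert.h>
#include <stdbool.h>

/* Largest spend such that spend * (1 + fee_rate) stays within vol.
 * Only applied when the next leg is a buy, as described above. */
static double demo_fee_hold_reduce(double vol, double fee_rate, bool next_leg_is_buy) {
    assert(fee_rate >= 0.0);
    return next_leg_is_buy ? vol / (1.0 + fee_rate) : vol;
}

/* Rounds a non-negative volume down to a whole multiple of increment. */
static double demo_increment_floor(double vol, double increment) {
    assert(vol >= 0.0 && increment > 0.0);
    /* Truncation toward zero equals floor for non-negative values. */
    long long steps = (long long)(vol / increment + 1e-9);
    return (double)steps * increment;
}
```

For example, demo_increment_floor(10.3, 0.25) gives 10.25, and demo_fee_hold_reduce(100.1, 0.001, true) gives approximately 100.0. A production version might prefer integer or decimal arithmetic on the exchange's increments to avoid the epsilon altogether.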
Phase 4 — REST Order Placement
New file: rest_client.c, rest_client.h
bool rest_order_place(const char *symbol, const char *side,
                      double funds, double size,
                      const char *client_oid,
                      char *out_order_id, size_t out_sz,
                      char *out_error, size_t err_sz);
- Signs request with HMAC-SHA256
- POST to /api/v1/hf/orders
- Parses JSON response, extracts orderId, or the error as code: msg
- Supports funds (buy) or size (sell) based on leg type
bool rest_order_test(const char *symbol, const char *side,
                     double funds, double size,
                     char *out_error, size_t err_sz);
- POST to /api/v1/hf/orders/test
- Same signing path, same response parsing
Connection pool:
- Store 1-2 reusable HTTPS sockets per executor thread
- Keepalive via 30s interval timer (same pattern as _keepalive_loop)
- On disconnect: reconnect once; if the reconnect fails, fail the order immediately
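The funds-or-size choice in rest_order_place shows up in the JSON body sent to the orders endpoint. A minimal sketch of a body builder follows. It assumes market orders (the plan does not state the order type), a fixed 8-decimal format, and inputs that need no JSON escaping; demo_order_body is a hypothetical helper.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

/* Writes the order JSON into out. Buys carry "funds", sells carry "size".
 * Returns the body length, or -1 if out is too small. */
static int demo_order_body(const char *client_oid, const char *symbol, bool is_buy,
                           double funds, double size, char *out, size_t out_sz) {
    assert(client_oid && symbol && out);
    int n = snprintf(out, out_sz,
                     "{\"clientOid\":\"%s\",\"symbol\":\"%s\",\"side\":\"%s\","
                     "\"type\":\"market\",\"%s\":\"%.8f\"}",
                     client_oid, symbol, is_buy ? "buy" : "sell",
                     is_buy ? "funds" : "size",   /* key depends on leg side */
                     is_buy ? funds : size);
    return (n < 0 || (size_t)n >= out_sz) ? -1 : n;
}
```

The same buffer can then be passed both to the signing step and to http_post, so the signed bytes and the sent bytes are guaranteed to match.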
Phase 5 — Paper Mode
Paper mode in C uses the same execute_triangle() but:
- Calls rest_order_test() instead of rest_order_place()
- Simulates fills from sig->books[leg] (top-of-book ask/bid)
- Same rounding, fee hold reduction, and increment flooring as live mode
- No fill SPSC needed (simulated fill is synchronous)
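A synchronous top-of-book fill could be simulated along these lines. The types are hypothetical stand-ins for the book entry and fill_result_t, and the sketch assumes the whole order fills at the best price with no fee, which ignores book depth.

```c
#include <assert.h>
#include <stdbool.h>

typedef struct { double bid, ask; } demo_top_of_book_t;
typedef struct { double total_size, total_funds, avg_price; } demo_paper_fill_t;

/* Buy: input_vol is quote funds, spent at the ask.
 * Sell: input_vol is base size, sold at the bid.
 * Returns false when the book side or the input is not positive. */
static bool demo_simulate_fill(const demo_top_of_book_t *book, bool is_buy,
                               double input_vol, demo_paper_fill_t *out) {
    assert(book && out);
    double px = is_buy ? book->ask : book->bid;
    if (px <= 0.0 || input_vol <= 0.0) return false;
    out->avg_price = px;
    if (is_buy) {
        out->total_funds = input_vol;
        out->total_size  = input_vol / px;
    } else {
        out->total_size  = input_vol;
        out->total_funds = input_vol * px;
    }
    return true;
}
```

Filling the same fields that await_fill produces lets the cascade code run unchanged in paper mode.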
Phase 6 — Concurrency & Isolation
New file: concurrency.c, concurrency.h
- concurrent_slots: number of executor threads = config value.
- Isolation check (same triangle key, same pairs, same primary_quote): checked atomically in execute_triangle before starting.
- In-flight tracking: array of { triangle_key, pair_mask, primary_quote }, checked and set under a spinlock.
- Cancel: sig->cancelled volatile flag, checked between legs.
typedef struct {
    uint64_t triangle_hash;   // fnv1a of triangle_key
    uint64_t pair_mask;       // bitmask of pair indices
    uint32_t primary_quote;   // index into currency table
    bool active;
} in_flight_t;

bool try_acquire_slot(signal_entry_t *sig);   // isolation check + mark
void release_slot(signal_entry_t *sig);       // unmark
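The check-and-set under a spinlock might look like the sketch below. It takes a prepared in-flight key rather than a signal_entry_t, so the signatures differ from the declarations above, and all demo_ names are hypothetical. The conflict rule is an assumption: two triangles are treated as conflicting if they have the same hash, share any pair, or use the same primary quote. The plan does not say whether these conditions combine with OR or AND.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

#define DEMO_MAX_SLOTS 4   /* stand-in for concurrent_slots */

typedef struct {
    uint64_t triangle_hash;
    uint64_t pair_mask;
    uint32_t primary_quote;
    bool     active;
} demo_in_flight_t;

static demo_in_flight_t demo_slots[DEMO_MAX_SLOTS];
static atomic_flag demo_lock = ATOMIC_FLAG_INIT;

static uint64_t demo_triangle_hash(const char *key) {   /* 64-bit FNV-1a */
    uint64_t h = 14695981039346656037ULL;
    while (*key) { h ^= (unsigned char)*key++; h *= 1099511628211ULL; }
    return h;
}

static void demo_spin_lock(void) {
    while (atomic_flag_test_and_set_explicit(&demo_lock, memory_order_acquire)) { /* spin */ }
}
static void demo_spin_unlock(void) {
    atomic_flag_clear_explicit(&demo_lock, memory_order_release);
}

/* Assumed isolation rule: any one of the three overlaps is a conflict. */
static bool demo_conflicts(const demo_in_flight_t *a, const demo_in_flight_t *b) {
    return a->triangle_hash == b->triangle_hash
        || (a->pair_mask & b->pair_mask) != 0
        || a->primary_quote == b->primary_quote;
}

/* Returns the claimed slot index, or -1 on conflict or when all slots are busy. */
static int demo_try_acquire(const demo_in_flight_t *want) {
    assert(want);
    int free_idx = -1;
    bool clash = false;
    demo_spin_lock();
    for (int i = 0; i < DEMO_MAX_SLOTS && !clash; i++) {
        if (!demo_slots[i].active) { if (free_idx < 0) free_idx = i; }
        else if (demo_conflicts(&demo_slots[i], want)) clash = true;
    }
    int got = (!clash && free_idx >= 0) ? free_idx : -1;
    if (got >= 0) {
        demo_slots[got] = *want;
        demo_slots[got].active = true;   /* check and mark happen under one lock */
    }
    demo_spin_unlock();
    return got;
}

static void demo_release(int idx) {
    assert(idx >= 0 && idx < DEMO_MAX_SLOTS);
    demo_spin_lock();
    demo_slots[idx].active = false;
    demo_spin_unlock();
}
```

Doing the scan and the mark inside the same critical section is what makes the isolation check atomic; a check followed by a separate mark would let two threads claim conflicting triangles.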
Phase 7 — Reporting
Reporting in C mirrors the current _emit_report:
void emit_filled(signal_entry_t *sig, double *leg_output,
                 timing_t *timings, double pnl);
void emit_failed(signal_entry_t *sig, double *leg_output,
                 timing_t *timings, const char *error);
void emit_aborted(signal_entry_t *sig, double *leg_output,
                  timing_t *timings, const char *reason);
Output format (same as current, via log_write):
2026-05-26T17:55:53.719Z FILLED | corr=... | triangle=['USDT','FLUX','USDC'] |
predicted_bps=34.45 | effective_bps=-149.94 | book_ts=... | profit=-0.0749 |
timings=[...] | fills=[L0:buy FLUX-USDT USDT->FLUX 64.53@0.0774(fee=0 USDT lat=82.9ms), ...] |
books=[0.0774/0.0772 | 0.07796/0.07783 | 0.9991/0.999]
Phase 8 — Cleanup
- Delete executor/ directory (executor.py, ws_client.py, kucoin_api.py, socket_server.py, config.py, main.py)
- Delete common/ directory (log.py, config.py; logging replaced by C log_write + log file handler)
- Remove UDS socket creation + connection from events.c
- Remove JSON signal serialisation from events.c
- Remove send_signal_to_executor() and all UDS references
- The cold_epoll set can be removed (executor threads don't use epoll; they use poll() on fill eventfds)
Migration Order
| Phase | Files | Deliverable | Risk |
|---|---|---|---|
| 1 | http_client.c/h, kucoin_sign.c/h | Can POST a test order | Low (OpenSSL HMAC is well-documented) |
| 2 | fill_handler.c/h | Can receive WS fill events in executor thread | Medium (hot↔executor SPSC race conditions) |
| 3 | events.c (executor thread), cascade.c/h, rest_client.c/h | Full live-mode triangle execution in C | High (first integration test) |
| 4 | rest_client.c/h (order_place) | Order placement via C | Medium (HTTP keep-alive bugs) |
| 5 | Paper mode via rest_order_test + book sim | Paper mode in C | Low (same logic, different HTTP endpoint) |
| 6 | concurrency.c/h | Slot isolation, cancel support | Low (simple atomic operations) |
| 7 | Reporting via log_write | Same FILLED/FAILED output | Low (string formatting) |
| 8 | Delete executor/, common/ | No Python runtime dependency | Low (housekeeping) |
Files to Create
src/http_client.c # HTTPS POST/GET with keepalive connection pool
src/http_client.h
src/kucoin_sign.c # HMAC-SHA256 + base64 signing helpers
src/kucoin_sign.h
src/fill_handler.c # Match event accumulation + dispatch (hot thread)
src/fill_handler.h
src/cascade.c # Fee hold reduction, increment floor, PnL compute
src/cascade.h
src/rest_client.c # order_place, order_test (wraps http_client + sign)
src/rest_client.h
src/concurrency.c # Isolation checks, in-flight tracking
src/concurrency.h
src/executor.c # execute_triangle() — the main execution loop
src/executor.h # (or inline in events.c / new executor_thread.c)
Files to Modify
src/events.c # Remove UDS, add executor thread creation
src/ws_client.c # Add fill dispatch calls on match/terminal events
src/ws_client.h # Expose fill_dispatch_match / fill_dispatch_terminal
src/config.c # Add executor config fields (fill_timeout_ms, await_balance)
src/config.h # Same
src/CMakeLists.txt # Add new .c files
Files to Delete
executor/ # Entire directory
common/ # Entire directory