# Executor Migration Plan Absorb all executor functions into the fused_engine process, eliminating Python, UDS, JSON serialisation, and the 2+ ms pipeline gap between signal creation and first order fire. ## Architecture ``` ┌───────────────────────────────────────────┐ │ HOT THREAD │ │ epoll WS → book updates → evaluate → │ │ spsc_push(main_queue, &signal) │ │ eventfd_write(wake_fd) │ │ │ │ on WS match event (orderChange): │ │ exec_id = lookup[client_oid] │ │ spsc_push(fill_queue[exec_id], match) │ │ eventfd_write(fill_wake[exec_id]) │ └──────┬─────────────────────────┬──────────┘ │ │ main_queue fill_queue[0..N-1] (signal_entry_t SPSC) (per-thread SPSC) │ │ ┌──────────▼────────┐ ┌──────────▼─────────┐ │ EXECUTOR THREAD │ │ fill_match_t ring │ │ (1 per slot) │◄────│ pushed by hot t │ │ spsc_pop(main) │ └────────────────────┘ │ execute(sig) │ │ for each leg: │ │ rest POST │ │ wait_for_fill │ │ cascade │ │ emit report │ └───────────────────┘ ``` **Thread roles:** - **HOT**: epoll on WS fds + timer. Evaluates triangles, pushes signals to SPSC. Also receives WS orderChange events and dispatches match data to the correct executor thread's fill SPSC. - **EXECUTOR**: pops from main signal queue, executes triangle via blocking REST POST + fill wait, cascades to next leg. One thread per concurrent slot. - No Python, no UDS socket, no JSON serialisation between threads. ## What Exists in C Already | Component | File | Status | |-----------|------|--------| | WS connection + reconnection | `ws_client.c` | Done | | SSL context + BIO | `ws_client.c` | Done | | Timer fd | `events.c` | Done | | SPSC queue | `queue.c` | Done | | JSON parser (cJSON) | `cJSON.c` | Done | | Book data structures | `book.h` | Done | | Triangle evaluation + sizing | `evaluate.c` | Done | | Non-blocking log pipe | `log.c` | Done | | Config parser | `config.c` | Done | | HMAC-SHA256 | OpenSSL `HMAC()` in libcrypto | Already linked | | Base64 | OpenSSL `BIO_f_base64()` in libcrypto | Already linked | ## What Needs to Be Written ### Phase 1 — HTTP Client + REST Signing **New files:** `http_client.c`, `http_client.h` - Reuse `SSL_CTX` from WS client (or create a minimal second context) - `http_post(url, body, headers) → (status, response_body)` - `http_get(url, headers) → (status, response_body)` - Connection pool: keep-alive via socket reuse (same pattern as WS reconnect but for REST endpoints) - DNS resolution: reuse `getaddrinfo` from `ws_client.c` - `kucoin_sign(timestamp, method, path, body, secret) → signature_base64` - `kucoin_headers(api_key, encrypted_passphrase, sign, timestamp) → header_string` **Dependencies:** OpenSSL libcrypto (already linked for HMAC), cJSON for parsing responses. ### Phase 2 — Fill Handler **New files:** `fill_handler.c`, `fill_handler.h` **Data structures:** ```c typedef struct { char client_oid[41]; // max 40 chars per KuCoin double match_size; double match_price; bool is_terminal; // canceled+done = complete } fill_match_t; typedef struct { char client_oid[41]; char order_id[32]; double total_size; double total_funds; int match_count; bool done; // terminal event received } accumulator_t; // Ring of fill SPSCs — one per executor thread typedef struct { spsc_queue_t queue; // fill_match_t ring int wake_fd; // eventfd for waking executor accumulator_t acc; // in-progress accumulation (hot thread writes) atomic_bool registered; // register_fill_listener / unregister } fill_channel_t; ``` **Fill dispatch (hot thread side):** ```c // Called from hot thread's ws_client.c when a match event arrives void fill_dispatch_match(const char *client_oid, double match_size, double match_price); // Called from hot thread on canceled/filled terminal event void fill_dispatch_terminal(const char *client_oid); ``` - `client_oid → executor_thread` lookup via hash table (hot thread use, atomically updated) - Each `fill_dispatch_match` pushes a `fill_match_t` to the correct executor thread's SPSC + eventfd write **Fill waiter (executor thread side):** ```c // Accumulates matches for a single client_oid until terminal or timeout typedef struct { double total_size; double total_funds; double avg_price; int match_count; char order_id[32]; } fill_result_t; bool await_fill(fill_channel_t *ch, const char *client_oid, int64_t timeout_ms, fill_result_t *out); ``` - Drains `fill_match_t` from the thread's fill SPSC - Accumulates `total_size += match_size`, `total_funds += match_price * match_size` - Returns on terminal event or timeout - `poll()` on the fill eventfd between drain cycles ### Phase 3 — Execution Engine Executor thread main loop (`events.c`, replacing current cold thread): ```c void *executor_thread(void *arg) { thread_data_t *td = (thread_data_t *)arg; while (atomic_load(&g_running)) { signal_entry_t sig; if (spsc_pop(td->main_queue, &sig)) { execute_triangle(&sig, td); continue; } // sleep via poll on fill eventfd + shutdown fd struct pollfd fds[2] = { { .fd = td->fill_channel.wake_fd, .events = POLLIN }, }; poll(fds, 1, -1); uint64_t val; read(td->fill_channel.wake_fd, &val, sizeof(val)); } return NULL; } ``` **`execute_triangle()` in C** mirrors the current `_execute_triangle`: ```c void execute_triangle(signal_entry_t *sig, thread_data_t *td) { setup_timings(sig, timings); double leg_output[3] = {0}; for (int leg = 0; leg < 3; leg++) { // ── Input volume ── double input_vol = (leg == 0) ? sig->starting_volume : leg_output[leg-1]; apply_fee_hold_reduction(sig, leg, &input_vol); apply_increment_floor(sig, leg, &input_vol); // ── Place order ── int64_t t0 = now_monotonic_ms(); char order_id[32]; bool ok = place_order(sig, leg, input_vol, order_id); timing_append(timings, leg_i_order_fired); if (!ok) { emit_failed(sig, leg_output, timings, "order_rejected"); return; } register_fill_listener(td, leg, order_id); // ── Wait for fill ── fill_result_t result; bool filled = await_fill(&td->fill_channel, order_id, FILL_TIMEOUT_MS, &result); timing_append(timings, leg_i_fill_received); if (sig->cancelled) { emit_aborted(...); return; } if (!filled) { emit_failed(...); return; } // ── Cascade ── leg_output[leg] = (sig->legs.legs[leg].side_is_buy) ? result.total_size : result.total_funds; apply_balance_override(td, sig, leg, &leg_output[leg]); apply_fee_hold_reduction(sig, leg+1, &leg_output[leg]); apply_increment_floor(sig, leg+1, &leg_output[leg]); } double pnl = compute_pnl(leg_output, sig); emit_filled(sig, leg_output, timings, pnl); } ``` Helper functions (`cascade.c`): ```c // Fee hold: funds * (1 + fee_rate) must not exceed balance. // Divide by (1 + fee_rate) when next leg is a buy. void apply_fee_hold_reduction(signal_entry_t *sig, int leg, double *vol); // Round down to the next leg's quote_increment (buy) or base_increment (sell). void apply_increment_floor(signal_entry_t *sig, int leg, double *vol); // Override with latest WS balance for sells (if await_balance enabled). void apply_balance_override(thread_data_t *td, signal_entry_t *sig, int leg, double *vol); // Leg 0 input → leg 2 output, compute profit in primary_quote. double compute_pnl(double *leg_output, signal_entry_t *sig); ``` ### Phase 4 — REST Order Placement **New file:** `rest_client.c`, `rest_client.h` ```c bool rest_order_place(const char *symbol, const char *side, double funds, double size, const char *client_oid, char *out_order_id, size_t out_sz, char *out_error, size_t err_sz); ``` - Signs request with HMAC-SHA256 - POST to `/api/v1/hf/orders` - Parses JSON response, extracts `orderId` or error `code: msg` - Supports `funds` (buy) or `size` (sell) based on leg type ```c bool rest_order_test(const char *symbol, const char *side, double funds, double size, char *out_error, size_t err_sz); ``` - POST to `/api/v1/hf/orders/test` - Same signing path, same response parsing Connection pool: - Store 1-2 reusable HTTPS sockets per executor thread - Keepalive via 30s interval timer (same pattern as `_keepalive_loop`) - On disconnect: reconnect once, fail immediately ### Phase 5 — Paper Mode Paper mode in C uses the same `execute_triangle()` but: - Calls `rest_order_test()` instead of `rest_order_place()` - Simulates fills from `sig->books[leg]` (top-of-book ask/bid) - Same rounding, fee hold reduction, and increment flooring as live mode - No fill SPSC needed (simulated fill is synchronous) ### Phase 6 — Concurrency & Isolation **New file:** `concurrency.c`, `concurrency.h` - `concurrent_slots`: number of executor threads = config value. - Isolation check (same triangle key, same pairs, same primary_quote): checked atomically in `execute_triangle` before starting. - In-flight tracking: array of `{ triangle_key, pair_mask, primary_quote }`, checked and set under a spinlock. - Cancel: `sig->cancelled` volatile flag, checked between legs. ```c typedef struct { uint64_t triangle_hash; // fnv1a of triangle_key uint64_t pair_mask; // bitmask of pair indices uint32_t primary_quote; // index into currency table bool active; } in_flight_t; bool try_acquire_slot(signal_entry_t *sig); // isolation check + mark void release_slot(signal_entry_t *sig); // unmark ``` ### Phase 7 — Reporting Reporting in C mirrors the current `_emit_report`: ```c void emit_filled(signal_entry_t *sig, double *leg_output, timing_t *timings, double pnl); void emit_failed(signal_entry_t *sig, double *leg_output, timing_t *timings, const char *error); void emit_aborted(signal_entry_t *sig, double *leg_output, timing_t *timings, const char *reason); ``` Output format (same as current, via `log_write`): ``` 2026-05-26T17:55:53.719Z FILLED | corr=... | triangle=['USDT','FLUX','USDC'] | predicted_bps=34.45 | effective_bps=-149.94 | book_ts=... | profit=-0.0749 | timings=[...] | fills=[L0:buy FLUX-USDT USDT->FLUX 64.53@0.0774(fee=0 USDT lat=82.9ms), ...] | books=[0.0774/0.0772 | 0.07796/0.07783 | 0.9991/0.999] ``` ### Phase 8 — Cleanup - Delete `executor/` directory (executor.py, ws_client.py, kucoin_api.py, socket_server.py, config.py, __main__.py) - Delete `common/` directory (log.py, config.py — logging replaced by C `log_write` + log file handler) - Remove UDS socket creation + connection from `events.c` - Remove JSON signal serialisation from `events.c` - Remove `send_signal_to_executor()` and all UDS references - The `cold_epoll` set can be removed (executor threads don't use epoll; they use `poll()` on fill eventfds) ## Migration Order | Phase | Files | Deliverable | Risk | |-------|-------|-------------|------| | 1 | `http_client.c/h`, `kucoin_sign.c/h` | Can POST a test order | Low (OpenSSL HMAC is well-documented) | | 2 | `fill_handler.c/h` | Can receive WS fill events in executor thread | Medium (hot↔executor SPSC race conditions) | | 3 | `events.c` (executor thread), `cascade.c/h`, `rest_client.c/h` | Full live-mode triangle execution in C | High (first integration test) | | 4 | `rest_client.c/h` (`order_place`) | Order placement via C | Medium (HTTP keep-alive bugs) | | 5 | Paper mode via `rest_order_test` + book sim | Paper mode in C | Low (same logic, different HTTP endpoint) | | 6 | `concurrency.c/h` | Slot isolation, cancel support | Low (simple atomic operations) | | 7 | Reporting via `log_write` | Same FILLED/FAILED output | Low (string formatting) | | 8 | Delete `executor/`, `common/` | No Python runtime dependency | Low (housekeeping) | ## Files to Create ``` src/http_client.c # HTTPS POST/GET with keepalive connection pool src/http_client.h src/kucoin_sign.c # HMAC-SHA256 + base64 signing helpers src/kucoin_sign.h src/fill_handler.c # Match event accumulation + dispatch (hot thread) src/fill_handler.h src/cascade.c # Fee hold reduction, increment floor, PnL compute src/cascade.h src/rest_client.c # order_place, order_test (wraps http_client + sign) src/rest_client.h src/concurrency.c # Isolation checks, in-flight tracking src/concurrency.h src/executor.c # execute_triangle() — the main execution loop src/executor.h # (or inline in events.c / new executor_thread.c) ``` ## Files to Modify ``` src/events.c # Remove UDS, add executor thread creation src/ws_client.c # Add fill dispatch calls on match/terminal events src/ws_client.h # Expose fill_dispatch_match / fill_dispatch_terminal src/config.c # Add executor config fields (fill_timeout_ms, await_balance) src/config.h # Same src/CMakeLists.txt # Add new .c files ``` ## Files to Delete ``` executor/ # Entire directory common/ # Entire directory ```