# Executor Migration Plan
Absorb all executor functions into the fused_engine process, eliminating
Python, UDS, JSON serialisation, and the 2+ ms pipeline gap between signal
creation and first order fire.
## Architecture
```
┌───────────────────────────────────────────┐
│ HOT THREAD                                │
│ epoll WS → book updates → evaluate →      │
│   spsc_push(main_queue, &signal)          │
│   eventfd_write(wake_fd)                  │
│                                           │
│ on WS match event (orderChange):          │
│   exec_id = lookup[client_oid]            │
│   spsc_push(fill_queue[exec_id], match)   │
│   eventfd_write(fill_wake[exec_id])       │
└──────────┬─────────────────────────┬──────┘
           │                         │
      main_queue            fill_queue[0..N-1]
 (signal_entry_t SPSC)       (per-thread SPSC)
           │                         │
┌──────────▼────────┐     ┌──────────▼───────────┐
│ EXECUTOR THREAD   │     │ fill_match_t ring    │
│ (1 per slot)      │◄────│ pushed by hot thread │
│ spsc_pop(main)    │     └──────────────────────┘
│ execute(sig)      │
│ for each leg:     │
│   rest POST       │
│   wait_for_fill   │
│   cascade         │
│ emit report       │
└───────────────────┘
```
**Thread roles:**
- **HOT**: epoll on WS fds + timer. Evaluates triangles, pushes signals to
SPSC. Also receives WS orderChange events and dispatches match data to the
correct executor thread's fill SPSC.
- **EXECUTOR**: pops from main signal queue, executes triangle via
blocking REST POST + fill wait, cascades to next leg. One thread per
concurrent slot.
- No Python, no Unix domain socket (UDS), no JSON serialisation between threads.
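
The hand-off between the hot thread and an executor thread depends on a single-producer, single-consumer ring. The following is a minimal sketch of that pattern using C11 atomics, not the actual contents of `queue.c`: the names `ring_t`, `ring_push`, `ring_pop` and the capacity `RING_CAP` are hypothetical, and the payload is simplified to an `int`.

```c
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

#define RING_CAP 8  /* hypothetical capacity; must be a power of two */

typedef struct {
    int slots[RING_CAP];   /* payload simplified to int for the sketch */
    atomic_size_t head;    /* next slot to read; written by the consumer only */
    atomic_size_t tail;    /* next slot to write; written by the producer only */
} ring_t;

/* Producer side (hot thread). Returns false when the ring is full. */
static bool ring_push(ring_t *r, int value) {
    size_t tail = atomic_load_explicit(&r->tail, memory_order_relaxed);
    size_t head = atomic_load_explicit(&r->head, memory_order_acquire);
    if (tail - head == RING_CAP)
        return false;
    r->slots[tail & (RING_CAP - 1)] = value;
    /* Release: the slot write becomes visible before the new tail does. */
    atomic_store_explicit(&r->tail, tail + 1, memory_order_release);
    return true;
}

/* Consumer side (executor thread). Returns false when the ring is empty. */
static bool ring_pop(ring_t *r, int *out) {
    size_t head = atomic_load_explicit(&r->head, memory_order_relaxed);
    size_t tail = atomic_load_explicit(&r->tail, memory_order_acquire);
    if (head == tail)
        return false;
    *out = r->slots[head & (RING_CAP - 1)];
    /* Release: the slot read completes before the producer may reuse it. */
    atomic_store_explicit(&r->head, head + 1, memory_order_release);
    return true;
}
```

In this pattern the producer follows a successful push with an eventfd write so that a sleeping consumer wakes up; the ring itself never blocks.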
## What Exists in C Already
| Component | File | Status |
|-----------|------|--------|
| WS connection + reconnection | `ws_client.c` | Done |
| SSL context + BIO | `ws_client.c` | Done |
| Timer fd | `events.c` | Done |
| SPSC queue | `queue.c` | Done |
| JSON parser (cJSON) | `cJSON.c` | Done |
| Book data structures | `book.h` | Done |
| Triangle evaluation + sizing | `evaluate.c` | Done |
| Non-blocking log pipe | `log.c` | Done |
| Config parser | `config.c` | Done |
| HMAC-SHA256 | OpenSSL `HMAC()` in libcrypto | Already linked |
| Base64 | OpenSSL `BIO_f_base64()` in libcrypto | Already linked |
## What Needs to Be Written
### Phase 1 — HTTP Client + REST Signing
**New files:** `http_client.c`, `http_client.h`, `kucoin_sign.c`, `kucoin_sign.h`
- Reuse `SSL_CTX` from WS client (or create a minimal second context)
- `http_post(url, body, headers) → (status, response_body)`
- `http_get(url, headers) → (status, response_body)`
- Connection pool: keep-alive via socket reuse (same pattern as WS reconnect
but for REST endpoints)
- DNS resolution: reuse `getaddrinfo` from `ws_client.c`
- `kucoin_sign(timestamp, method, path, body, secret) → signature_base64`
- `kucoin_headers(api_key, encrypted_passphrase, sign, timestamp) → header_string`
**Dependencies:** OpenSSL libcrypto (already linked for HMAC), cJSON for
parsing responses.
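
As an illustration of the signing input, the sketch below builds the string that is fed to HMAC-SHA256: the timestamp, HTTP method, request path and body concatenated in that order. The function name `kucoin_prehash` and its buffer-based signature are assumptions made for this example; the HMAC and base64 steps are left out so the snippet has no OpenSSL dependency.

```c
#include <stddef.h>
#include <stdio.h>

/* Builds the pre-hash string: timestamp + method + path + body.
 * `body` may be NULL for requests without a payload (e.g. GET).
 * Returns the length written, or -1 if `out` is too small. */
static int kucoin_prehash(char *out, size_t out_sz,
                          const char *timestamp_ms, const char *method,
                          const char *path, const char *body) {
    int n = snprintf(out, out_sz, "%s%s%s%s",
                     timestamp_ms, method, path, body ? body : "");
    if (n < 0 || (size_t)n >= out_sz)
        return -1;  /* encoding error or truncated output */
    return n;
}
```

The resulting buffer would then be passed through `HMAC()` with the API secret and base64-encoded to produce the signature header. Rejecting truncated output matters here: a silently shortened pre-hash string yields a signature the exchange will refuse.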
### Phase 2 — Fill Handler
**New files:** `fill_handler.c`, `fill_handler.h`
**Data structures:**
```c
#include <stdatomic.h>
#include <stdbool.h>

typedef struct {
    char   client_oid[41];   // max 40 chars per KuCoin, plus NUL
    double match_size;
    double match_price;
    bool   is_terminal;      // terminal event (canceled or done): order is complete
} fill_match_t;

typedef struct {
    char   client_oid[41];
    char   order_id[32];
    double total_size;
    double total_funds;
    int    match_count;
    bool   done;             // terminal event received
} accumulator_t;

// Ring of fill SPSCs — one per executor thread
typedef struct {
    spsc_queue_t  queue;       // fill_match_t ring
    int           wake_fd;     // eventfd for waking executor
    accumulator_t acc;         // in-progress accumulation (single writer: the
                               // executor thread draining the queue in await_fill)
    atomic_bool   registered;  // register_fill_listener / unregister
} fill_channel_t;
```
**Fill dispatch (hot thread side):**
```c
// Called from hot thread's ws_client.c when a match event arrives
void fill_dispatch_match(const char *client_oid,
                         double match_size, double match_price);
// Called from hot thread on canceled/filled terminal event
void fill_dispatch_terminal(const char *client_oid);
```
- `client_oid → executor_thread` lookup via hash table (read by the hot
thread; entries are published atomically when an executor thread registers
or unregisters a listener)
- Each `fill_dispatch_match` pushes a `fill_match_t` to the correct executor
thread's SPSC + eventfd write
**Fill waiter (executor thread side):**
```c
// Totals for one order, filled in by await_fill.
typedef struct {
    double total_size;
    double total_funds;
    double avg_price;
    int    match_count;
    char   order_id[32];
} fill_result_t;

// Accumulates matches for a single client_oid until terminal or timeout
bool await_fill(fill_channel_t *ch, const char *client_oid,
                int64_t timeout_ms, fill_result_t *out);
- Drains `fill_match_t` from the thread's fill SPSC
- Accumulates `total_size += match_size`, `total_funds += match_price * match_size`
- Returns on terminal event or timeout
- `poll()` on the fill eventfd between drain cycles
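
The accumulation step can be sketched independently of the queue and the eventfd. The example below folds an array of matches into running totals and derives the size-weighted average price once a terminal event is seen. The types `match_t` and `fill_sum_t` are trimmed-down stand-ins for `fill_match_t` and `fill_result_t`, and `accumulate_matches` is a hypothetical helper name.

```c
#include <stdbool.h>
#include <stddef.h>

typedef struct {            /* simplified stand-in for fill_match_t */
    double match_size;
    double match_price;
    bool   is_terminal;
} match_t;

typedef struct {            /* subset of fill_result_t */
    double total_size;
    double total_funds;
    double avg_price;
    int    match_count;
} fill_sum_t;

/* Folds `n` matches into `out`. Returns true once a terminal event is seen;
 * false means the caller should keep waiting (or give up on timeout). */
static bool accumulate_matches(const match_t *m, size_t n, fill_sum_t *out) {
    for (size_t i = 0; i < n; i++) {
        if (m[i].match_size > 0.0) {
            out->total_size  += m[i].match_size;
            out->total_funds += m[i].match_price * m[i].match_size;
            out->match_count++;
        }
        if (m[i].is_terminal) {
            /* Size-weighted average; guard against a cancel with no fills. */
            out->avg_price = out->total_size > 0.0
                           ? out->total_funds / out->total_size
                           : 0.0;
            return true;
        }
    }
    return false;
}
```

Because the totals live in the caller's struct, the function can be called once per drain cycle with whatever batch was popped from the fill SPSC, and the sums carry over between calls.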
### Phase 3 — Execution Engine
Executor thread main loop (`events.c`, replacing current cold thread):
```c
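#include <errno.h>
#include <poll.h>
#include <stdatomic.h>
#include <stdint.h>
#include <unistd.h>

void *executor_thread(void *arg) {
    thread_data_t *td = (thread_data_t *)arg;
    while (atomic_load(&g_running)) {
        signal_entry_t sig;
        if (spsc_pop(td->main_queue, &sig)) {
            execute_triangle(&sig, td);
            continue;
        }
        // Queue empty: sleep until the hot thread signals a new entry or
        // shutdown is requested. td->wake_fd and td->shutdown_fd are assumed
        // field names; fill wake-ups are consumed inside await_fill().
        struct pollfd fds[2] = {
            { .fd = td->wake_fd,     .events = POLLIN },
            { .fd = td->shutdown_fd, .events = POLLIN },
        };
        if (poll(fds, 2, -1) < 0) {
            if (errno == EINTR) continue;
            break;
        }
        if (fds[0].revents & POLLIN) {
            uint64_t val;
            (void)read(td->wake_fd, &val, sizeof val);  // reset eventfd counter
        }
    }
    return NULL;
}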
```
**`execute_triangle()` in C** mirrors the current `_execute_triangle`:
```c
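void execute_triangle(signal_entry_t *sig, thread_data_t *td) {
    timing_t timings;
    setup_timings(sig, &timings);
    double leg_output[3] = {0};

    for (int leg = 0; leg < 3; leg++) {
        // ── Input volume ──
        // Fee hold and increment floor are applied once, here, to the volume
        // about to be sent. Applying them again in the cascade step would
        // shrink the volume twice and index a non-existent leg 3.
        double input_vol = (leg == 0) ? sig->starting_volume : leg_output[leg - 1];
        apply_fee_hold_reduction(sig, leg, &input_vol);
        apply_increment_floor(sig, leg, &input_vol);

        // ── Place order ──
        // Fills are routed by client_oid, so register it before the POST;
        // a match can arrive on the WS before the REST response returns.
        // make_client_oid() and the extra place_order() argument are assumed.
        char client_oid[41];
        char order_id[32];
        make_client_oid(sig, leg, client_oid, sizeof client_oid);
        register_fill_listener(td, leg, client_oid);
        bool ok = place_order(sig, leg, input_vol, client_oid, order_id);
        timing_append(&timings, leg, TIMING_ORDER_FIRED);    // assumed enum value
        if (!ok) {
            unregister_fill_listener(td, client_oid);
            emit_failed(sig, leg_output, &timings, "order_rejected");
            return;
        }

        // ── Wait for fill ──
        fill_result_t result;
        bool filled = await_fill(&td->fill_channel, client_oid,
                                 FILL_TIMEOUT_MS, &result);
        unregister_fill_listener(td, client_oid);
        timing_append(&timings, leg, TIMING_FILL_RECEIVED);  // assumed enum value
        if (sig->cancelled) { emit_aborted(sig, leg_output, &timings, "cancelled");   return; }
        if (!filled)        { emit_failed(sig, leg_output, &timings, "fill_timeout"); return; }

        // ── Cascade ──
        // A buy yields base size; a sell yields quote funds.
        leg_output[leg] = sig->legs.legs[leg].side_is_buy
                        ? result.total_size
                        : result.total_funds;
        apply_balance_override(td, sig, leg, &leg_output[leg]);
    }

    double pnl = compute_pnl(leg_output, sig);
    emit_filled(sig, leg_output, &timings, pnl);
}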
```
Helper functions (`cascade.c`):
```c
// Fee hold: funds * (1 + fee_rate) must not exceed balance.
// Divide by (1 + fee_rate) when next leg is a buy.
void apply_fee_hold_reduction(signal_entry_t *sig, int leg, double *vol);
// Round down to the next leg's quote_increment (buy) or base_increment (sell).
void apply_increment_floor(signal_entry_t *sig, int leg, double *vol);
// Override with latest WS balance for sells (if await_balance enabled).
void apply_balance_override(thread_data_t *td, signal_entry_t *sig,
                            int leg, double *vol);
// Leg 0 input → leg 2 output, compute profit in primary_quote.
double compute_pnl(double *leg_output, signal_entry_t *sig);
```
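
To make the two volume adjustments concrete, here is a minimal sketch of the arithmetic on plain doubles. The function names `fee_hold_reduce` and `increment_floor` and the `1e-9` tolerance are assumptions for illustration; the real helpers take the signal and leg index and look up the fee rate and increment themselves.

```c
#include <stdint.h>

/* Shrinks `funds` so that funds * (1 + fee_rate) fits in the original amount. */
static double fee_hold_reduce(double funds, double fee_rate) {
    return funds / (1.0 + fee_rate);
}

/* Rounds `vol` down to a whole multiple of `increment` (both positive).
 * The small tolerance is an assumption: it stops binary rounding such as
 * 0.3 / 0.1 = 2.999... from dropping a full step. */
static double increment_floor(double vol, double increment) {
    int64_t steps = (int64_t)(vol / increment + 1e-9);
    return (double)steps * increment;
}
```

For example, flooring 1.2345 to an increment of 0.01 gives 1.23, and reducing 100.1 by a 0.1% fee hold gives 100.0. A production version might prefer integer or decimal arithmetic over a floating-point tolerance, since the tolerance can also round a value that sits marginally below a step up to that step.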
### Phase 4 — REST Order Placement
**New files:** `rest_client.c`, `rest_client.h`
```c
bool rest_order_place(const char *symbol, const char *side,
                      double funds, double size,
                      const char *client_oid,
                      char *out_order_id, size_t out_sz,
                      char *out_error, size_t err_sz);
```
- Signs request with HMAC-SHA256
- POST to `/api/v1/hf/orders`
- Parses JSON response, extracts `orderId` or error `code: msg`
- Supports `funds` (buy) or `size` (sell) based on leg type
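
The request body for such a call could be assembled as sketched below. This assumes market orders (the only order type where `funds` applies), values sent as strings with eight decimal places, and a hypothetical helper name `build_order_body`; none of these choices is fixed by the plan above.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

/* Writes a market-order JSON body. Buys send `amount` as "funds" (quote
 * currency); sells send `amount` as "size" (base currency).
 * Returns the length written, or -1 if `out` is too small. */
static int build_order_body(char *out, size_t out_sz,
                            const char *client_oid, const char *symbol,
                            bool is_buy, double amount) {
    int n = snprintf(out, out_sz,
        "{\"clientOid\":\"%s\",\"symbol\":\"%s\",\"side\":\"%s\","
        "\"type\":\"market\",\"%s\":\"%.8f\"}",
        client_oid, symbol,
        is_buy ? "buy" : "sell",
        is_buy ? "funds" : "size",
        amount);
    return (n < 0 || (size_t)n >= out_sz) ? -1 : n;
}
```

The exact same byte string must be used both as the HTTP body and as the `body` input to the signature, so building it once into a buffer and reusing that buffer avoids a mismatch.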
```c
bool rest_order_test(const char *symbol, const char *side,
                     double funds, double size,
                     char *out_error, size_t err_sz);
```
- POST to `/api/v1/hf/orders/test`
- Same signing path, same response parsing
Connection pool:
- Store 1-2 reusable HTTPS sockets per executor thread
- Keepalive via 30s interval timer (same pattern as `_keepalive_loop`)
- On disconnect: reconnect once; if that attempt fails, fail the order immediately
### Phase 5 — Paper Mode
Paper mode in C uses the same `execute_triangle()` but:
- Calls `rest_order_test()` instead of `rest_order_place()`
- Simulates fills from `sig->books[leg]` (top-of-book ask/bid)
- Same rounding, fee hold reduction, and increment flooring as live mode
- No fill SPSC needed (simulated fill is synchronous)
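
A synchronous simulated fill against the top of the book might look like the sketch below. The `top_of_book_t` and `paper_fill_t` types and the `simulate_fill` name are hypothetical, and the model deliberately ignores book depth and fees: the whole input is assumed to execute at the best price.

```c
#include <stdbool.h>

typedef struct {         /* hypothetical top-of-book snapshot */
    double best_bid;
    double best_ask;
} top_of_book_t;

typedef struct {
    double total_size;   /* base currency filled */
    double total_funds;  /* quote currency exchanged */
} paper_fill_t;

/* Buy:  `input` is quote funds, spent at the best ask.
 * Sell: `input` is base size, sold at the best bid.
 * Returns false if the relevant side of the book is empty. */
static bool simulate_fill(const top_of_book_t *b, bool is_buy,
                          double input, paper_fill_t *out) {
    double px = is_buy ? b->best_ask : b->best_bid;
    if (px <= 0.0)
        return false;
    if (is_buy) {
        out->total_funds = input;
        out->total_size  = input / px;
    } else {
        out->total_size  = input;
        out->total_funds = input * px;
    }
    return true;
}
```

The output mirrors the `total_size` / `total_funds` pair of a live fill result, so the cascade step can pick the next leg's input the same way in both modes.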
### Phase 6 — Concurrency & Isolation
**New files:** `concurrency.c`, `concurrency.h`
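- `concurrent_slots`: the number of executor threads, taken from config. An
SPSC queue supports only one consumer, so with more than one executor thread
each thread needs its own signal queue (as is already planned for fills).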
- Isolation check (same triangle key, same pairs, same primary_quote):
checked atomically in `execute_triangle` before starting.
- In-flight tracking: array of `{ triangle_key, pair_mask, primary_quote }`,
checked and set under a spinlock.
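- Cancel: `sig->cancelled` flag, checked between legs. Declare it as an
`atomic_bool` rather than `volatile`; in C11, `volatile` alone gives no
cross-thread ordering guarantee.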
```c
typedef struct {
    uint64_t triangle_hash;  // fnv1a of triangle_key
    uint64_t pair_mask;      // bitmask of pair indices
    uint32_t primary_quote;  // index into currency table
    bool     active;
} in_flight_t;

bool try_acquire_slot(signal_entry_t *sig);  // isolation check + mark
void release_slot(signal_entry_t *sig);      // unmark
```
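
The check-and-mark step can be sketched with a fixed array guarded by a C11 `atomic_flag` spinlock. This version works on raw hash, mask and quote values instead of a `signal_entry_t`, returns a slot index, and assumes that any single overlap (same triangle, a shared pair, or the same primary quote) counts as a conflict. `MAX_SLOTS`, `try_acquire` and `release_idx` are hypothetical names.

```c
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

#define MAX_SLOTS 4  /* hypothetical; would come from concurrent_slots */

typedef struct {
    uint64_t triangle_hash;
    uint64_t pair_mask;
    uint32_t primary_quote;
    bool     active;
} in_flight_t;

static in_flight_t g_in_flight[MAX_SLOTS];
static atomic_flag g_slots_lock = ATOMIC_FLAG_INIT;

static void slots_lock(void) {
    while (atomic_flag_test_and_set_explicit(&g_slots_lock, memory_order_acquire)) { }
}
static void slots_unlock(void) {
    atomic_flag_clear_explicit(&g_slots_lock, memory_order_release);
}

/* Returns the slot index taken, or -1 on conflict or when all slots are busy. */
static int try_acquire(uint64_t tri_hash, uint64_t pair_mask, uint32_t quote) {
    int free_idx = -1;
    bool conflict = false;
    slots_lock();
    for (int i = 0; i < MAX_SLOTS; i++) {
        const in_flight_t *f = &g_in_flight[i];
        if (!f->active) {
            if (free_idx < 0) free_idx = i;       /* remember first free slot */
        } else if (f->triangle_hash == tri_hash ||
                   (f->pair_mask & pair_mask) != 0 ||
                   f->primary_quote == quote) {
            conflict = true;                       /* overlaps an active triangle */
            break;
        }
    }
    int idx = conflict ? -1 : free_idx;
    if (idx >= 0)
        g_in_flight[idx] = (in_flight_t){ tri_hash, pair_mask, quote, true };
    slots_unlock();
    return idx;
}

static void release_idx(int idx) {
    slots_lock();
    g_in_flight[idx].active = false;
    slots_unlock();
}
```

Doing the scan and the mark inside one critical section is what makes the check atomic: two threads cannot both observe "no conflict" for overlapping triangles and then both proceed.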
### Phase 7 — Reporting
Reporting in C mirrors the current `_emit_report`:
```c
void emit_filled(signal_entry_t *sig, double *leg_output,
                 timing_t *timings, double pnl);
void emit_failed(signal_entry_t *sig, double *leg_output,
                 timing_t *timings, const char *error);
void emit_aborted(signal_entry_t *sig, double *leg_output,
                  timing_t *timings, const char *reason);
```
Output format (same as current, via `log_write`):
```
2026-05-26T17:55:53.719Z FILLED | corr=... | triangle=['USDT','FLUX','USDC'] |
predicted_bps=34.45 | effective_bps=-149.94 | book_ts=... | profit=-0.0749 |
timings=[...] | fills=[L0:buy FLUX-USDT USDT->FLUX 64.53@0.0774(fee=0 USDT lat=82.9ms), ...] |
books=[0.0774/0.0772 | 0.07796/0.07783 | 0.9991/0.999]
```
### Phase 8 — Cleanup
- Delete `executor/` directory (executor.py, ws_client.py, kucoin_api.py,
socket_server.py, config.py, __main__.py)
- Delete `common/` directory (log.py, config.py — logging replaced by
C `log_write` + log file handler)
- Remove UDS socket creation + connection from `events.c`
- Remove JSON signal serialisation from `events.c`
- Remove `send_signal_to_executor()` and all UDS references
- The `cold_epoll` set can be removed (executor threads don't use epoll;
they use `poll()` on fill eventfds)
## Migration Order
| Phase | Files | Deliverable | Risk |
|-------|-------|-------------|------|
| 1 | `http_client.c/h`, `kucoin_sign.c/h` | Can POST a test order | Low (OpenSSL HMAC is well-documented) |
| 2 | `fill_handler.c/h` | Can receive WS fill events in executor thread | Medium (hot↔executor SPSC race conditions) |
| 3 | `events.c` (executor thread), `cascade.c/h`, `rest_client.c/h` | Full live-mode triangle execution in C | High (first integration test) |
| 4 | `rest_client.c/h` (`rest_order_place`) | Order placement via C | Medium (HTTP keep-alive bugs) |
| 5 | Paper mode via `rest_order_test` + book sim | Paper mode in C | Low (same logic, different HTTP endpoint) |
| 6 | `concurrency.c/h` | Slot isolation, cancel support | Low (simple atomic operations) |
| 7 | Reporting via `log_write` | Same FILLED/FAILED output | Low (string formatting) |
| 8 | Delete `executor/`, `common/` | No Python runtime dependency | Low (housekeeping) |
## Files to Create
```
src/http_client.c # HTTPS POST/GET with keepalive connection pool
src/http_client.h
src/kucoin_sign.c # HMAC-SHA256 + base64 signing helpers
src/kucoin_sign.h
src/fill_handler.c # Match event accumulation + dispatch (hot thread)
src/fill_handler.h
src/cascade.c # Fee hold reduction, increment floor, PnL compute
src/cascade.h
src/rest_client.c # rest_order_place, rest_order_test (wraps http_client + sign)
src/rest_client.h
src/concurrency.c # Isolation checks, in-flight tracking
src/concurrency.h
src/executor.c # execute_triangle() — the main execution loop
src/executor.h # (or inline in events.c / new executor_thread.c)
```
## Files to Modify
```
src/events.c # Remove UDS, add executor thread creation
src/ws_client.c # Add fill dispatch calls on match/terminal events
src/ws_client.h # Expose fill_dispatch_match / fill_dispatch_terminal
src/config.c # Add executor config fields (fill_timeout_ms, await_balance)
src/config.h # Same
src/CMakeLists.txt # Add new .c files
```
## Files to Delete
```
executor/ # Entire directory
common/ # Entire directory
```