cleanup: remove stale files, add session to .gitignore

nicolas 2026-06-04 10:35:34 -03:00
parent e6ca8ac624
commit 3c4bbfde8b
6 changed files with 1 additions and 1120 deletions

.gitignore vendored

@@ -12,3 +12,4 @@ deploy.sh
 deploy_amzn.sh
 build/
 triangular_arb.egg-info/
+session


@@ -1,766 +0,0 @@
# Fused Engine — Development Plan
## 1. Objective
Fuse the `fh_ob` (Feed Handler + Order Book) and `oe_em` (Opportunity Engine + Emission)
Python components into a single C binary (`fused_engine`) that eliminates:
- Two-process IPC overhead (Unix socket serialization/deserialization between fh_ob → oe_em)
- Four JSON passes per book update (WS parse → socket serialize → socket deserialize → signal serialize)
- Python interpreter overhead on the hot path (object allocation, GIL, asyncio dispatch)
- GC pressure from transient `OrderBookTop5`, `BookLevel`, and `dict` allocations
The **design maxim**: from the moment an order book update arrives, triangle
re-evaluation and signal dispatch must be **immediate**. No blocking I/O on the hot path.
## 2. Current Architecture (for reference)
### 2.1 Process Layout
```
[KuCoin WS] → [fh_ob] --Unix socket (JSON book snapshots)--> [oe_em] --Unix socket (JSON signals)--> [executor]
```
Three processes, two IPC hops. The critical path is:
```
WS frame arrives at fh_ob
→ json.loads() (fh_ob/ws_client.py:239-240)
→ BookStore.update() (fh_ob/book_store.py:42-82) [allocates OrderBookTop5, BookLevel objects]
→ json.dumps(book) (fh_ob/book_store.py:86-91) [serializes to JSON]
→ write to Unix socket (fh_ob/socket_server.py:107-114)
→ oe_em reads from socket (oe_em/book_consumer.py:77-93)
→ json.loads() (oe_em/book_consumer.py:84-91) [second parse]
→ evaluate_triangles_for_pair() (oe_em/opportunity.py:152-230) [Python loop, dict lookups]
→ json.dumps(signal) (oe_em/opportunity.py:290-310) [second serialization]
→ write to executor socket (oe_em/socket_client.py:33-53)
```
### 2.2 Key Python Files and Line References
| File | Lines | Purpose |
|---|---|---|
| `fh_ob/ws_client.py` | 1-148 | KuCoin WS connection, multi-worker model, token fetch, subscribe/unsubscribe |
| `fh_ob/ws_client.py` | 149-168 | Dynamic subscription loop (REST-triggered add/remove symbols) |
| `fh_ob/ws_client.py` | 199-206 | Public token fetch via `/api/v1/bullet-public` |
| `fh_ob/ws_client.py` | 208-227 | Batch subscribe (100 symbols per message, topic `/spotMarket/level2Depth5:...`) |
| `fh_ob/ws_client.py` | 238-270 | Message handler: welcome/pong/ack/disconnect/message routing |
| `fh_ob/book_store.py` | 18-25 | `BookLevel` dataclass (price, size as `Decimal`) |
| `fh_ob/book_store.py` | 27-40 | `OrderBookTop5` dataclass (symbol, ts, sequence, bids/asks as lists) |
| `fh_ob/book_store.py` | 42-82 | `BookStore.update()` — parse WS message, allocate objects, store in dict |
| `fh_ob/book_store.py` | 84-91 | `to_json()` — serialize book snapshot for IPC |
| `fh_ob/socket_server.py` | 1-114 | Unix socket server, broadcast to all connected oe_em clients |
| `fh_ob/rest_server.py` | 1-189 | FastAPI REST: `/health`, `/book/{symbol}`, `/symbols` CRUD |
| `fh_ob/__main__.py` | 1-196 | Startup: config load, WS client init, dynamic subscription setup, REST server |
| `oe_em/book_consumer.py` | 1-93 | Unix socket client, connect to fh_ob, JSON parse, callback dispatch |
| `oe_em/opportunity.py` | 47-109 | `FeeTable` — fetch from KuCoin `/api/v1/base-fee`, apply KCS discount |
| `oe_em/opportunity.py` | 111-149 | `TriangleEnumerator` — enumerate triangles from pairs, filter by hold currencies |
| `oe_em/opportunity.py` | 152-230 | `evaluate_triangles_for_pair()` — hot path: cumulative rate, fee factor, max_volume |
| `oe_em/opportunity.py` | 232-288 | `create_signal()` — build signal dict with legs, books, metadata |
| `oe_em/opportunity.py` | 290-310 | `format_signal_json()` — serialize signal for executor |
| `oe_em/opportunity.py` | 312-330 | `check_cooldown()` — per-triangle cooldown enforcement |
| `oe_em/triangle_enum.py` | 1-203 | Standalone triangle enumeration utility (not used at runtime) |
| `oe_em/kucoin_api.py` | 1-107 | REST client for fee table, trading pairs fetch |
| `oe_em/socket_client.py` | 1-53 | Unix socket client to executor, reconnect loop |
| `oe_em/__main__.py` | 1-281 | Startup: config, fee table, triangle enum, book consumer, signal sender |
| `executor/executor.py` | 204-227 | `handle_signal()` — entry point, exception safety |
| `executor/executor.py` | 229-305 | `_handle_signal_impl()` — pause check, validation, stale rejection, slot check |
| `executor/executor.py` | 307-398 | `_precheck_volume()` — backward-propagation minimum calculation |
| `executor/executor.py` | 464-852 | `_execute_triangle()` — sequential 3-leg execution, paper/live mode |
| `executor/executor.py` | 474-483 | Timing markers: `t-2_book_snapshot`, `t-1_signal_created`, `signal_received` |
| `executor/executor.py` | 854-890 | `_emit_report()` — execution report with timing log |
| `executor/config.py` | 1-81 | Executor settings: paper mode, concurrency, isolation, initial capital |
| `executor/socket_server.py` | 1-90 | Unix socket server receiving signals from oe_em |
| `common/config.py` | 1-59 | fh_ob settings (YAML schema): symbols, WS URL, heartbeat, reconnect |
| `oe_em/config.py` | 1-84 | oe_em settings (YAML schema): threshold, cooldown, hold currencies, KCS discount |
### 2.3 KuCoin API References
| Endpoint / Feature | Doc URL | Notes |
|---|---|---|
| Public Token (Classic Spot) | https://www.kucoin.com/docs-new/websocket-api/base-info/get-public-token-spot-margin.md | POST `/api/v1/bullet-public`, returns token + `instanceServers` with `pingInterval`/`pingTimeout` |
| WS Connection | https://www.kucoin.com/docs-new/websocket-api/base-info/introduction.md | `wss://ws-api-spot.kucoin.com?token=...&connectId=...`, welcome message on connect |
| WS Heartbeat | https://www.kucoin.com/docs-new/websocket-api/base-info/introduction.md | Ping every `pingInterval` (18s), timeout after `pingTimeout` (10s). Any outgoing message resets timeout. |
| WS Subscribe | https://www.kucoin.com/docs-new/websocket-api/base-info/introduction.md | `{"type":"subscribe","topic":"/spotMarket/level2Depth5:BTC-USDT,...","response":true}` |
| WS Unsubscribe | https://www.kucoin.com/docs-new/websocket-api/base-info/introduction.md | `{"type":"unsubscribe","topic":"/spotMarket/level2Depth5:BTC-USDT","response":true}` |
| WS Message Format | https://www.kucoin.com/docs-new/websocket-api/base-info/introduction.md | `{"type":"message","subject":"matchLevel2","topic":"/spotMarket/level2Depth5:SYMBOL","data":{"time":...,"sequence":...,"bids":[...],"asks":[...]}}` |
| WS Topic Limit | https://www.kucoin.com/docs-new/websocket-api/base-info/introduction.md | **400 topics per connection**. Multiple connections needed for >400 symbols. |
| WS Token TTL | https://www.kucoin.com/docs-new/websocket-api/base-info/introduction.md | Token valid 24 hours, connection disconnected after 24h. |
| Get Symbols | https://www.kucoin.com/docs-new/rest/spot-trading/market-data/get-all-symbols.md | GET `/api/v1/symbols`, returns pair metadata (base, quote, min sizes, increments) |
| Get Fee Table | https://www.kucoin.com/docs-new/rest/account-info/trade-fee/get-basic-fee-spot-margin.md | GET `/api/v1/base-fee`, returns taker/maker fees per currency |
| Order Test | https://www.kucoin.com/docs-new/rest/spot-trading/orders/add-order-test.md | POST `/api/v1/orders/test`, validates order without placing |
| Place Order | https://www.kucoin.com/docs-new/rest/spot-trading/orders/add-order.md | POST `/api/v1/orders`, places market/limit order |
### 2.4 WS Message Format (Classic API)
```json
{
"type": "message",
"subject": "matchLevel2",
"topic": "/spotMarket/level2Depth5:BTC-USDT",
"data": {
"time": 1746789012345,
"sequence": 123456789,
"sequenceNum": 123456789,
"bids": [
["90701.1", "0.13918404"],
["90700.0", "1.00000000"],
["90699.5", "2.50000000"],
["90698.0", "0.75000000"],
["90697.5", "3.20000000"]
],
"asks": [
["90701.2", "0.57715830"],
["90702.0", "0.25000000"],
["90703.5", "1.10000000"],
["90704.0", "0.80000000"],
["90705.0", "2.00000000"]
]
}
}
```
### 2.5 Signal Format (oe_em → executor)
From `oe_em/opportunity.py:232-310`:
```json
{
"type": "signal",
"correlation_id": "abc123",
"triangle_key": ["USDT", "BTC", "ETH"],
"primary_quote": "USDT",
"legs": [
{
"pair": "BTC-USDT",
"input_currency": "USDT",
"output_currency": "BTC",
"fee_currency": "USDT",
"fee_rate": 0.001,
"exchange_rate": 0.00001102,
"side": "buy"
},
{
"pair": "ETH-BTC",
"input_currency": "BTC",
"output_currency": "ETH",
"fee_currency": "BTC",
"fee_rate": 0.001,
"exchange_rate": 16.5,
"side": "buy"
},
{
"pair": "ETH-USDT",
"input_currency": "ETH",
"output_currency": "USDT",
"fee_currency": "USDT",
"fee_rate": 0.001,
"exchange_rate": 90701.2,
"side": "sell"
}
],
"predicted_bps": 1.50,
"max_volume": "100.00",
"ts_ms": 1746789012349,
"book_ts_ms": 1746789012345,
"books": [
{
"symbol": "BTC-USDT",
"bids": [{"price": "90701.1", "size": "0.139"}, ...],
"asks": [{"price": "90701.2", "size": "0.577"}, ...],
"ts_ms": 1746789012345
},
...
]
}
```
The executor uses `books[i].asks[0].price` or `books[i].bids[0].price` for price lookup
in `_precheck_volume()` (executor/executor.py:344-348) and `_execute_triangle()` (executor/executor.py:539-545).
## 3. Target Architecture
### 3.1 High-Level Diagram
```
[KuCoin WS] ──▶ [ fused_engine (C binary) ] ──Unix socket──▶ [executor (Python)]
│ │
│ N WS connections │ triangle_index (precomputed)
│ each ≤400 topics │ book[] (fixed array)
│ single epoll loop │
└── REST API (port 8000) ──┘
```
Single binary replaces fh_ob + oe_em. Executor is **unchanged** except for reading
two new timing fields from the signal.
### 3.2 Threading Model
Two threads, separated by hot/cold concern:
```
Thread 1 (HOT — evaluation only)
─────────────────────────────────
epoll:
  [WS fd #1, WS fd #2, ..., WS fd #N]
  [timerfd for WS pings]
Incoming WS frame:
  1. Decode WS frame (stack-only)
  2. jsmn parse (0 alloc)
  3. Update book[sym_idx] in-place
  4. clock_gettime(CLOCK_REALTIME) → t_arrive   (epoch ms, comparable with KuCoin's "time")
  5. clock_gettime(CLOCK_REALTIME) → t_eval
  6. for tri in tri_index[sym_idx]:
       compute net return (6 muls, 2 subs)
       if profitable && cooldown_ok:
         clock_gettime() → t_signal
         format signal JSON into fixed buffer
         push to SPSC ring buffer
         log latency line to stderr
  7. Return immediately (never blocks)

Thread 2 (COLD — I/O)
─────────────────────
epoll:
  [executor_socket_fd,
   rest_server_fd,
   eventfd_wakeup]
Drains SPSC ring buffer:
  → JSON write signal to executor socket
  → Handle REST API requests
  → HTTP token refresh, pair fetch
  → WS reconnect I/O
  → Dynamic subscribe/unsubscribe
```
**Guarantees:**
- WS read + triangle evaluation is **never blocked** by signal dispatch or REST traffic
- Signal enters SPSC queue within microseconds of detection
- Thread 2 stalls → Thread 1 keeps processing WS messages
- SPSC push is lock-free: one bounded copy into the ring slot plus a single atomic store of the head index (target: < 100ns)
### 3.3 SPSC Ring Buffer
```
Thread 1 pushes: copy signal blob into ring[head] → atomic release-store head+1 (publish)
Thread 2 drains: atomic acquire-load head → copy signal blob from ring[tail] → atomic release-store tail+1
```
Bounded ring buffer of fixed-size entries (~4KB each, max 1024 entries).
`eventfd` used to wake Thread 2 from epoll when buffer transitions from empty to non-empty.
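
The push and drain steps above can be sketched with C11 atomics. This is a minimal illustration under stated assumptions, not the planned `queue.c`: the names `spsc_queue_t`, `spsc_entry_t`, `spsc_push` and `spsc_pop` and their signatures are hypothetical, the capacity is assumed to be a power of two, and the `eventfd` wakeup is left out.

```c
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

#define SIGNAL_MAX_SIZE 4096
#define SPSC_CAPACITY   1024   /* assumed power of two: slot = counter & mask */

typedef struct {
    char     data[SIGNAL_MAX_SIZE];
    uint32_t len;
} spsc_entry_t;                /* hypothetical entry layout */

typedef struct {
    spsc_entry_t ring[SPSC_CAPACITY];
    atomic_uint  head;         /* written only by the producer (Thread 1) */
    atomic_uint  tail;         /* written only by the consumer (Thread 2) */
} spsc_queue_t;

/* Producer side. Returns false (signal dropped) if the ring is full
 * or the payload does not fit in one slot. Never blocks. */
static bool spsc_push(spsc_queue_t *q, const char *buf, uint32_t len)
{
    unsigned head = atomic_load_explicit(&q->head, memory_order_relaxed);
    unsigned tail = atomic_load_explicit(&q->tail, memory_order_acquire);
    if (head - tail == SPSC_CAPACITY || len > SIGNAL_MAX_SIZE)
        return false;
    spsc_entry_t *e = &q->ring[head & (SPSC_CAPACITY - 1)];
    memcpy(e->data, buf, len);
    e->len = len;
    /* The release store makes the filled slot visible to the consumer. */
    atomic_store_explicit(&q->head, head + 1, memory_order_release);
    return true;
}

/* Consumer side. Copies one entry into out (at least SIGNAL_MAX_SIZE bytes)
 * and returns its length, or 0 if the ring is empty. */
static uint32_t spsc_pop(spsc_queue_t *q, char *out)
{
    unsigned tail = atomic_load_explicit(&q->tail, memory_order_relaxed);
    unsigned head = atomic_load_explicit(&q->head, memory_order_acquire);
    if (head == tail)
        return 0;
    spsc_entry_t *e = &q->ring[tail & (SPSC_CAPACITY - 1)];
    uint32_t len = e->len;
    memcpy(out, e->data, len);
    /* The release store hands the slot back to the producer. */
    atomic_store_explicit(&q->tail, tail + 1, memory_order_release);
    return len;
}
```

The slot is written before `head` is published, so the consumer can never observe a half-written entry. In a full design the producer would also write to the `eventfd` when a push finds `head == tail`, which is the empty to non-empty transition.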
### 3.4 File Structure
```
src/
CMakeLists.txt — build configuration
main.c — startup, config parse, thread spawn, signal handling
config.c/h — YAML config parser (libyaml), reads fh_ob + oe_em sections
hash.c/h — FNV-1a string hash, symbol table (sorted array + bsearch)
http_client.c/h — raw HTTP GET/POST over TCP (token, pairs, symbols, fees)
http_server.c/h — minimal HTTP/1.1 server (REST API endpoints)
hmac.c/h — HMAC-SHA256 via OpenSSL EVP (for private endpoints if needed)
queue.c/h — lock-free SPSC ring buffer + eventfd wakeup
triangle.c/h — triangle enumeration, index builder, fee table
ws_client.c/h — WebSocket frame parser, OpenSSL BIO TLS, subscribe/unsubscribe
book.c/h — order book array (fixed-size, 0 alloc updates)
evaluate.c/h — triangle evaluation loop (HOT PATH, all inline)
signal.c/h — JSON signal formatter, pushes to SPSC queue
events.c/h — epoll loop, timerfd, eventfd, signal handling
jsmn.h — jsmn library (header-only, dropped in)
config.json.example — example config (same schema as config.yaml)
```
### 3.5 Dependencies
| Library | Purpose | Notes |
|---|---|---|
| **OpenSSL** | TLS (WS connections), HMAC-SHA256 | System-installed, `find_package(OpenSSL REQUIRED)` |
| **libyaml** | Parse `config.yaml` at startup | Startup-only, not on hot path |
| **jsmn** | JSON parsing (WS messages, token response, REST requests) | Header-only, 0 allocation, drop `jsmn.h` into `src/` |
| **libc** | epoll, timerfd, eventfd, Unix sockets, pthreads, clock_gettime | Linux 6.12+ only |
### 3.6 Config
`config.yaml` is **unchanged** — read by both the C binary and the executor.
The C binary uses libyaml to parse the `fh_ob` and `oe_em` sections at startup.
Relevant settings from `common/config.py:9-45` (fh_ob section):
- `symbols` — initial symbol list (can be empty, dynamic add via REST)
- `log_level` — logging verbosity
- `socket_path` — Unix socket path of the legacy fh_ob→oe_em hop (`/tmp/fh_ob.sock`); the C binary has no such internal hop and writes directly to the executor socket at `/tmp/executor.sock`
- `rest_host`, `rest_port` — REST API bind address/port
- `ws_url` — KuCoin WS endpoint (from token response, not hardcoded)
- `token_url` — KuCoin public token endpoint
- `reconnect_base_delay`, `reconnect_max_delay` — exponential backoff
- `heartbeat_interval` — WS ping interval (from token response, default 18s)
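
The exponential backoff governed by `reconnect_base_delay` and `reconnect_max_delay` could look like the sketch below. The doubling factor, the millisecond unit and the function name `reconnect_delay_ms` are assumptions; the plan only states that the backoff is exponential.

```c
#include <stdint.h>

/* Delay before reconnect attempt number `attempt` (0-based):
 * base * 2^attempt, capped at max_ms. Doubling is an assumed growth factor. */
static uint32_t reconnect_delay_ms(uint32_t base_ms, uint32_t max_ms,
                                   uint32_t attempt)
{
    uint64_t delay = base_ms;
    /* Stop doubling once the cap is reached so the value cannot overflow. */
    for (uint32_t i = 0; i < attempt && delay < max_ms; i++)
        delay *= 2;
    return delay > max_ms ? max_ms : (uint32_t)delay;
}
```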
Relevant settings from `oe_em/config.py:16-68` (oe_em section):
- `signal_threshold_bps` — minimum net return in bps to fire signal
- `hold_currencies` — base currencies for triangle enumeration (default `["USDT"]`)
- `excluded_currencies` — currencies to exclude from enumeration
- `kcs_discount_active` — multiply taker fees by 0.8
- `executor_socket_path` — Unix socket path for executor signals (`/tmp/executor.sock`)
- `send_signals` — whether to emit signals to executor
## 4. Data Structures
### 4.1 Symbol Table
```c
// Sorted array of symbol entries, indexed by FNV-1a hash → bsearch
typedef struct {
    char     name[16];   // e.g. "BTC-USDT\0"
    uint16_t index;      // index into book[] array
} symbol_entry_t;

typedef struct {
    symbol_entry_t *entries;
    uint32_t        count;
} symbol_table_t;

// O(log N) lookup: hash string → bsearch in sorted array → return book index
uint16_t symbol_table_lookup(symbol_table_t *table, const char *name);
```
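
A minimal sketch of the lookup follows. It simplifies the plan in one respect: the array is ordered by symbol name rather than by FNV-1a hash, which keeps the example short and avoids hash collisions. The `SYMBOL_NOT_FOUND` sentinel and the helper `symbol_table_sort` are hypothetical additions.

```c
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

#define SYMBOL_NOT_FOUND UINT16_MAX   /* hypothetical "no such symbol" value */

typedef struct {
    char     name[16];
    uint16_t index;
} symbol_entry_t;

typedef struct {
    symbol_entry_t *entries;   /* must be sorted by name before lookups */
    uint32_t        count;
} symbol_table_t;

static int symbol_cmp(const void *a, const void *b)
{
    const symbol_entry_t *x = a, *y = b;
    return strncmp(x->name, y->name, sizeof x->name);
}

/* Sort once at startup so that lookups can use bsearch(). */
static void symbol_table_sort(symbol_table_t *t)
{
    qsort(t->entries, t->count, sizeof *t->entries, symbol_cmp);
}

/* O(log N) lookup by name; names that do not fit in name[16] never match. */
static uint16_t symbol_table_lookup(const symbol_table_t *t, const char *name)
{
    symbol_entry_t key = {0};
    if (strlen(name) >= sizeof key.name)
        return SYMBOL_NOT_FOUND;
    strcpy(key.name, name);
    const symbol_entry_t *hit =
        bsearch(&key, t->entries, t->count, sizeof *t->entries, symbol_cmp);
    return hit ? hit->index : SYMBOL_NOT_FOUND;
}
```

Returning a sentinel matters on the hot path: a WS message for a symbol that was just unsubscribed must not index into `books[]`.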
### 4.2 Order Book
```c
typedef struct {
    uint16_t symbol_idx;
    int64_t  ts_ms;        // KuCoin's "time" field from WS message
    int64_t  sequence;     // sequence number for ordering
    double   bids[5][2];   // [level][0] = price, [level][1] = size
    double   asks[5][2];
    uint8_t  bid_count;    // actual number of bid levels (0-5)
    uint8_t  ask_count;    // actual number of ask levels (0-5)
} order_book_t;

// Fixed-size array, indexed by symbol index
// book[symbol_idx] gives O(1) access, cache-friendly
#define MAX_SYMBOLS 2048
order_book_t books[MAX_SYMBOLS];
```
### 4.3 Triangle
```c
typedef struct {
    uint16_t symbol_idx[3];        // book[] indices for each leg
    uint8_t  use_bid[3];           // 1 = read bid (sell), 0 = read ask (buy)
    double   fee_factor[3];        // precomputed: (1.0 - taker_fee * kcs_discount)
    uint16_t id;                   // unique triangle ID (for cooldown tracking)
    uint8_t  currency_ids[3];      // compact currency identifiers
    uint8_t  primary_quote_id;     // index into hold_currencies[]
    char     symbol_names[3][16];  // symbol strings for signal JSON
    char     base[16];             // base currency (e.g., "USDT")
    char     mid[16];              // mid currency (e.g., "BTC")
    char     quote[16];            // quote currency (e.g., "ETH")
} triangle_t;
```
### 4.4 Triangle Index
```c
// For each symbol, offset/count into the flat triangle[] array.
// A triangle touches three symbols, so sorting by first symbol alone cannot
// make every symbol's triangles contiguous. The flat array therefore needs
// one entry per (symbol, triangle) membership, grouped by symbol, so that
// all triangles containing a given symbol are contiguous.
typedef struct {
    uint32_t offset;
    uint32_t count;
} tri_index_entry_t;
tri_index_entry_t tri_index[MAX_SYMBOLS];
```
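
One way to build such an index is a counting sort over (symbol, triangle) memberships. The sketch below stores triangle IDs in the flat array rather than full `triangle_t` copies, which is an assumption; the function name `build_tri_index` and its parameters are hypothetical.

```c
#include <stdint.h>
#include <string.h>

#define MAX_SYMBOLS 2048

typedef struct {
    uint32_t offset;
    uint32_t count;
} tri_index_entry_t;

/* tri_syms[t] holds the three book indices of triangle t.
 * After the call, tri_ids[idx[s].offset .. idx[s].offset + idx[s].count)
 * lists every triangle that contains symbol s.
 * idx must have MAX_SYMBOLS entries, tri_ids must have 3 * n_tri entries. */
static void build_tri_index(const uint16_t (*tri_syms)[3], uint32_t n_tri,
                            tri_index_entry_t *idx, uint32_t *tri_ids)
{
    memset(idx, 0, MAX_SYMBOLS * sizeof *idx);

    /* Pass 1: count how many triangles touch each symbol. */
    for (uint32_t t = 0; t < n_tri; t++)
        for (int leg = 0; leg < 3; leg++)
            idx[tri_syms[t][leg]].count++;

    /* Pass 2: prefix sum turns counts into offsets. */
    uint32_t running = 0;
    for (uint32_t s = 0; s < MAX_SYMBOLS; s++) {
        idx[s].offset = running;
        running += idx[s].count;
        idx[s].count = 0;          /* reused as a fill cursor in pass 3 */
    }

    /* Pass 3: scatter each triangle ID into its three symbol groups. */
    for (uint32_t t = 0; t < n_tri; t++)
        for (int leg = 0; leg < 3; leg++) {
            tri_index_entry_t *e = &idx[tri_syms[t][leg]];
            tri_ids[e->offset + e->count++] = t;
        }
}
```

Storing IDs costs one extra indirection per triangle on the hot path; storing three copies of each `triangle_t` would trade memory for one fewer pointer chase.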
### 4.5 Signal (SPSC Queue Entry)
```c
#define SIGNAL_MAX_SIZE 4096
typedef struct {
    char     data[SIGNAL_MAX_SIZE];  // pre-formatted JSON
    uint32_t len;
    int64_t  ts_ms;                  // signal creation timestamp
} signal_entry_t;

#define SPSC_CAPACITY 1024
signal_entry_t ring[SPSC_CAPACITY];
atomic_uint head;  // writer (Thread 1)
atomic_uint tail;  // reader (Thread 2)
```
### 4.6 Cooldown State
```c
typedef struct {
int64_t last_signal_ms; // timestamp of last fired signal (ms)
} cooldown_t;
cooldown_t cooldowns[MAX_TRIANGLES];
```
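
The cooldown check that uses this array can be sketched as follows. `MAX_TRIANGLES` is not defined in this plan, so the value below is an assumption sized to the `uint16_t` triangle ID, and `cooldown_ms` stands in for the cooldown setting from the oe_em config. Unlike the one-argument `cooldown_ok(tri->id)` in the hot path, this version takes the current time explicitly.

```c
#include <stdbool.h>
#include <stdint.h>

#define MAX_TRIANGLES 65536   /* assumption: covers every uint16_t triangle id */

typedef struct {
    int64_t last_signal_ms;   /* 0 means the triangle has never fired */
} cooldown_t;

static cooldown_t cooldowns[MAX_TRIANGLES];
static int64_t cooldown_ms = 1000;   /* hypothetical value, normally from config */

/* True if the triangle has never fired or its cooldown has elapsed. */
static inline bool cooldown_ok(uint16_t tri_id, int64_t now_ms)
{
    int64_t last = cooldowns[tri_id].last_signal_ms;
    return last == 0 || now_ms - last >= cooldown_ms;
}

/* Record the time of the signal that was just fired. */
static inline void set_cooldown(uint16_t tri_id, int64_t now_ms)
{
    cooldowns[tri_id].last_signal_ms = now_ms;
}
```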
## 5. Hot Path (evaluate.c)
The hot path runs inline in the WS message handler on Thread 1. Zero heap
allocation. All data accessed by index into fixed arrays.
```c
#include <stdint.h>
#include <stdio.h>
#include <time.h>

// Wall-clock milliseconds since the epoch. CLOCK_REALTIME is used because the
// results are subtracted from KuCoin's book_ts_ms and from the executor's
// receive time; CLOCK_MONOTONIC shares no epoch with either.
static inline int64_t now_ms(void) {
    struct timespec ts;
    clock_gettime(CLOCK_REALTIME, &ts);
    return (int64_t)ts.tv_sec * 1000 + ts.tv_nsec / 1000000;
}

// Called from ws_client.c message handler, immediately after book update
// sym_idx is the index of the symbol that just updated
void evaluate_and_signal(uint16_t sym_idx) {
    // Timestamp: arrival (after WS decode + book update)
    int64_t t_arrive_ms = now_ms();
    tri_index_entry_t *idx = &tri_index[sym_idx];
    int64_t book_ts_ms = books[sym_idx].ts_ms;
    for (uint32_t i = 0; i < idx->count; i++) {
        triangle_t *tri = &triangles[idx->offset + i];
        // Timestamp: evaluation start
        int64_t t_eval_ms = now_ms();
        // Compute cumulative rate through all 3 legs
        double cum = 1.0;
        for (int leg = 0; leg < 3; leg++) {
            order_book_t *b = &books[tri->symbol_idx[leg]];
            // An empty side has no price (and would divide by zero on a
            // buy leg), so it rejects the triangle.
            if (tri->use_bid[leg] ? b->bid_count == 0 : b->ask_count == 0) {
                cum = 0.0;
                break;
            }
            double rate = tri->use_bid[leg]
                ? b->bids[0][0]          // sell leg: use best bid
                : 1.0 / b->asks[0][0];   // buy leg: use best ask (invert)
            cum *= rate * tri->fee_factor[leg];
        }
        double net_bps = (cum - 1.0) * 10000.0;
        if (net_bps > signal_threshold_bps
            && cooldown_ok(tri->id)
            && max_volume_ok(tri)) {
            // Compute max_volume (chained minimum across 3 legs)
            double max_vol = compute_max_volume(tri);
            // Timestamp: signal creation
            int64_t t_signal_ms = now_ms();
            // Format signal JSON into fixed buffer
            char buf[SIGNAL_MAX_SIZE];
            int len = format_signal(buf, sizeof(buf), tri, net_bps, max_vol,
                                    t_arrive_ms, t_eval_ms, t_signal_ms, book_ts_ms);
            // Push to SPSC queue (lock-free, never blocks)
            spsc_push(buf, len, t_signal_ms);
            // Log latency line (to stderr, not disk). A write to stderr can
            // still block if it is a full pipe or a slow terminal.
            // correlation_id is assumed to be in scope; this plan does not
            // show where it is generated.
            fprintf(stderr,
                    "SIGNAL corr=%s sym=%s tri=%s/%s/%s bps=%.2f "
                    "t_exchange=%ld t_arrive=%ld t_eval=%ld t_signal=%ld\n",
                    correlation_id, tri->symbol_names[0],
                    tri->base, tri->mid, tri->quote,
                    net_bps,
                    (long)book_ts_ms, (long)t_arrive_ms,
                    (long)t_eval_ms, (long)t_signal_ms);
            set_cooldown(tri->id, t_signal_ms);
        }
    }
}
```
**Performance characteristics:**
- Per triangle: 6 double multiplications, 2 subtractions, 3 array lookups
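- No heap allocation; the arithmetic helpers are `static inline`, so the only calls in the loop are `clock_gettime()` and, on a hit, signal formatting and logging
- `clock_gettime()` is served from the vDSO without a syscall and costs on the order of tens of nanoseconds on modern Linux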
- Worst case for a symbol in 50 triangles: ~50 × 1µs = 50µs total eval time
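
As a worked example of the per-triangle arithmetic, the sketch below isolates the cumulative-rate computation from the book arrays. The `leg_quote_t` type and `triangle_net_bps` function are hypothetical, and the prices in the usage note are made-up round numbers, not market data.

```c
#include <stdbool.h>

typedef struct {
    double best_bid;     /* top-of-book bid price */
    double best_ask;     /* top-of-book ask price */
    double fee_factor;   /* 1.0 - effective taker fee */
    bool   use_bid;      /* true = sell leg (bid), false = buy leg (1/ask) */
} leg_quote_t;           /* hypothetical flattened view of one leg */

/* Computes the net return of a three-leg cycle in basis points.
 * Returns false if any required price is missing (<= 0). */
static bool triangle_net_bps(const leg_quote_t legs[3], double *net_bps)
{
    double cum = 1.0;
    for (int i = 0; i < 3; i++) {
        double px = legs[i].use_bid ? legs[i].best_bid : legs[i].best_ask;
        if (!(px > 0.0))
            return false;
        double rate = legs[i].use_bid ? px : 1.0 / px;
        cum *= rate * legs[i].fee_factor;
    }
    *net_bps = (cum - 1.0) * 10000.0;
    return true;
}
```

With asks of 100000 on BTC-USDT and 0.05 on ETH-BTC, a bid of 5050 on ETH-USDT and a 0.1% fee per leg, the gross rate is (1/100000) × (1/0.05) × 5050 = 1.01. Three fee factors of 0.999 reduce it to about 1.006973, which is roughly 69.73 bps net.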
## 6. Signal JSON Format
Same as current oe_em → executor format, with two new timing fields.
The executor reads `t_arrive_ms` and `t_eval_ms` (new) alongside existing
`ts_ms` and `book_ts_ms`.
```json
{
"type": "signal",
"correlation_id": "abc123",
"triangle_key": ["USDT", "BTC", "ETH"],
"primary_quote": "USDT",
"legs": [
{
"pair": "BTC-USDT",
"input_currency": "USDT",
"output_currency": "BTC",
"fee_currency": "USDT",
"fee_rate": 0.001,
"exchange_rate": 0.00001102,
"side": "buy"
},
{
"pair": "ETH-BTC",
"input_currency": "BTC",
"output_currency": "ETH",
"fee_currency": "BTC",
"fee_rate": 0.001,
"exchange_rate": 16.5,
"side": "buy"
},
{
"pair": "ETH-USDT",
"input_currency": "ETH",
"output_currency": "USDT",
"fee_currency": "USDT",
"fee_rate": 0.001,
"exchange_rate": 90701.2,
"side": "sell"
}
],
"predicted_bps": 1.50,
"max_volume": "100.00",
"book_ts_ms": 1746789012345,
"t_arrive_ms": 1746789012347,
"t_eval_ms": 1746789012347,
"ts_ms": 1746789012349,
"books": [
{
"symbol": "BTC-USDT",
"bids": [{"price": "90701.1", "size": "0.13918404"}, ...],
"asks": [{"price": "90701.2", "size": "0.57715830"}, ...],
"ts_ms": 1746789012345
},
...
]
}
```
### 6.1 Executor Timing Derivation
The executor (executor/executor.py:474-483) currently derives:
```python
timings.append({"step": "t-2_book_snapshot", "elapsed_ms": -(executor_receive_ts_ms - book_ts_ms)})
timings.append({"step": "t-1_signal_created", "elapsed_ms": -(executor_receive_ts_ms - signal_ts_ms)})
```
With the two new fields, the executor derives:
| Metric | Formula | Meaning |
|---|---|---|
| Network latency | `t_arrive_ms - book_ts_ms` | KuCoin server → our doorstep |
| Dispatch overhead | `t_eval_ms - t_arrive_ms` | Book update → eval start |
| Eval time | `ts_ms - t_eval_ms` | Triangle evaluation loop |
| Socket + queue latency | `executor_receive_ts_ms - ts_ms` | SPSC push → executor receive |
| Total end-to-end | `executor_receive_ts_ms - book_ts_ms` | Full pipeline |
**Executor change required:** In `executor/executor.py:474-483`, read the two new
fields and add derived timing entries. Approximately 5 lines of code.
## 7. WS Connection Model
### 7.1 Multi-Connection Architecture
KuCoin limits each WS connection to **400 topics** (KuCoin API docs:
https://www.kucoin.com/docs-new/websocket-api/base-info/introduction.md).
With ~100 symbols, a single connection suffices. But the design supports
dynamic symbol addition that could exceed 400.
```c
typedef struct {
    int      socket_fd;            // epoll'd file descriptor
    BIO     *bio;                  // OpenSSL BIO for TLS
    SSL     *ssl;                  // OpenSSL per-connection TLS handle (the shared context is an SSL_CTX)
    char     buffer[65536];        // WS frame receive buffer
    int      buffer_len;
    uint32_t topic_count;          // current subscription count
    uint32_t max_topics;           // typically 400
    int64_t  last_ping_ms;         // last ping timestamp
    int64_t  ping_interval_ms;     // from token response
    int64_t  ping_timeout_ms;      // from token response
    char     token[512];           // public WS token
    char     connect_id[64];       // unique connection ID
    uint16_t symbol_indices[400];  // subscribed symbol indices
    uint32_t symbol_count;
} ws_connection_t;

#define MAX_WS_CONNECTIONS 8
ws_connection_t ws_connections[MAX_WS_CONNECTIONS];
uint32_t ws_connection_count;
```
### 7.2 WS Frame Protocol
Classic KuCoin WS uses standard WebSocket framing over TLS. The C implementation
handles framing manually (no libwebsockets dependency):
**Outgoing:**
1. Construct WS frame: FIN=1, opcode=1 (text), masked, payload = JSON string
2. Send via `BIO_write(bio, frame, frame_len)`
**Incoming:**
1. Read from `BIO_read(bio, buf, sizeof(buf))`
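2. Parse frame header (2-10 bytes for server frames): FIN, RSV, opcode, mask bit, payload length
3. Do not unmask: server-to-client frames are never masked (RFC 6455, Section 5.1), so a set mask bit is a protocol error. Only outgoing client frames carry the 4-byte masking key
4. Pass the text payload to the jsmn parser
Frame parsing is ~50 lines of C. See RFC 6455, Section 5.2 for the frame format.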
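
A header parser along these lines could look like the sketch below. The type `ws_frame_header_t` and function `ws_parse_header` are hypothetical names. The parser reads the masking key when the mask bit is set so that it can also be tested against client-built frames; a client would treat a masked incoming frame as an error.

```c
#include <stddef.h>
#include <stdint.h>

typedef struct {
    uint8_t  fin;           /* 1 if this is the final fragment */
    uint8_t  opcode;        /* 0x1 text, 0x8 close, 0x9 ping, 0xA pong */
    uint8_t  masked;        /* mask bit from the second header byte */
    uint8_t  mask_key[4];   /* valid only when masked == 1 */
    uint64_t payload_len;
    size_t   header_len;    /* bytes consumed by the header */
} ws_frame_header_t;

/* Returns 1 on success, 0 if more bytes are needed, -1 on protocol error. */
static int ws_parse_header(const uint8_t *buf, size_t len, ws_frame_header_t *h)
{
    if (len < 2)
        return 0;
    if (buf[0] & 0x70)
        return -1;                      /* RSV bits set, no extension negotiated */
    h->fin    = buf[0] >> 7;
    h->opcode = buf[0] & 0x0F;
    h->masked = buf[1] >> 7;

    uint64_t plen = buf[1] & 0x7F;
    size_t pos = 2;
    if (plen == 126) {                  /* 16-bit extended length */
        if (len < pos + 2)
            return 0;
        plen = ((uint64_t)buf[2] << 8) | buf[3];
        pos += 2;
    } else if (plen == 127) {           /* 64-bit extended length */
        if (len < pos + 8)
            return 0;
        plen = 0;
        for (int i = 0; i < 8; i++)
            plen = (plen << 8) | buf[2 + i];
        if (plen >> 63)
            return -1;                  /* most significant bit must be 0 */
        pos += 8;
    }
    if (h->masked) {
        if (len < pos + 4)
            return 0;
        for (int i = 0; i < 4; i++)
            h->mask_key[i] = buf[pos + i];
        pos += 4;
    }
    h->payload_len = plen;
    h->header_len  = pos;
    return 1;
}
```

The return value of 0 lets the caller keep partial data in the connection's receive buffer and retry after the next `BIO_read`.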
### 7.3 Subscribe/Unsubscribe
From `fh_ob/ws_client.py:208-227`: subscribe in batches of 100 symbols.
Topic format: `/spotMarket/level2Depth5:SYMBOL1,SYMBOL2,...`
```c
// Subscribe symbols to a WS connection
int ws_subscribe(ws_connection_t *conn, uint16_t *symbol_indices, uint32_t count);
// Unsubscribe symbols from a WS connection
int ws_unsubscribe(ws_connection_t *conn, uint16_t *symbol_indices, uint32_t count);
```
Dynamic subscribe/unsubscribe is triggered by REST API calls
(`POST /symbols` and `DELETE /symbols/{symbol}`).
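
Building one batched subscribe message could be sketched as below. The function name `ws_build_subscribe` is hypothetical and it takes symbol strings rather than indices to stay self-contained. The JSON layout follows the subscribe format listed in §2.3; the caller is assumed to pass at most 100 symbols per call.

```c
#include <stddef.h>
#include <stdio.h>

/* Writes a subscribe message for symbols[0..count) into out.
 * Returns the message length, or -1 if it does not fit in cap bytes. */
static int ws_build_subscribe(char *out, size_t cap,
                              const char *const *symbols, size_t count)
{
    int n = snprintf(out, cap,
                     "{\"type\":\"subscribe\","
                     "\"topic\":\"/spotMarket/level2Depth5:");
    if (n < 0 || (size_t)n >= cap)
        return -1;
    size_t pos = (size_t)n;

    /* Comma-separated symbol list, no trailing comma. */
    for (size_t i = 0; i < count; i++) {
        n = snprintf(out + pos, cap - pos, "%s%s", i ? "," : "", symbols[i]);
        if (n < 0 || (size_t)n >= cap - pos)
            return -1;
        pos += (size_t)n;
    }

    n = snprintf(out + pos, cap - pos, "\",\"response\":true}");
    if (n < 0 || (size_t)n >= cap - pos)
        return -1;
    return (int)(pos + (size_t)n);
}
```

The resulting string is the payload of a masked text frame. An unsubscribe message differs only in the `type` value.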
## 8. REST API
Kept from fh_ob for operational use. Same endpoints as `fh_ob/rest_server.py:1-189`.
| Endpoint | Method | Purpose | Ref |
|---|---|---|---|
| `/health` | GET | Status: WS connected, book count, symbol count, uptime | `fh_ob/rest_server.py:35-48` |
| `/book/{symbol}` | GET | Current top-of-book for symbol (all 5 levels) | `fh_ob/rest_server.py:50-68` |
| `/books` | GET | All books | `fh_ob/rest_server.py:70-80` |
| `/symbols` | GET | List subscribed symbols | `fh_ob/rest_server.py:82-92` |
| `/symbols` | POST | Add symbol (subscribe to WS, add to triangle eval) | `fh_ob/rest_server.py:94-120` |
| `/symbols/{symbol}` | DELETE | Remove symbol (unsubscribe from WS) | `fh_ob/rest_server.py:122-145` |
HTTP/1.1 server is raw socket + simple parser. No libhttp dependency.
Handles GET/POST/DELETE with Content-Type: application/json.
Binds to `0.0.0.0:8000` (configurable via `fh_ob.rest_port`).
## 9. Triangle Enumeration
From `oe_em/opportunity.py:111-149` and `oe_em/triangle_enum.py:1-203`.
Algorithm:
1. Fetch KuCoin symbols via `GET /api/v1/symbols` (oe_em/kucoin_api.py:39-56)
2. Filter: `base != quote`, symbol is active
3. For each pair of symbols (A, B) where A.quote == B.base or A.base == B.quote:
- Find third pair (C) that completes the triangle (C.base == A.base, C.quote == B.quote, etc.)
- Filter: triangle starts and ends in a hold currency (`oe_em/config.py:55-57`, default `["USDT"]`)
- Exclude triangles containing excluded currencies (`oe_em/config.py:50-53`)
4. Build `triangle_t` array with precomputed fee factors
5. Sort triangles, build `tri_index` for O(1) lookup by symbol
Fee table from `GET /api/v1/base-fee` (oe_em/kucoin_api.py:60-78).
KCS discount: if `kcs_discount_active`, multiply taker fee by 0.8 (oe_em/opportunity.py:50-64).
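
The enumeration steps above can be sketched as a walk that starts and ends in one hold currency. This is an illustration under assumptions, not a port of `oe_em/opportunity.py`: currencies are represented by small integer IDs, the names `pair_t`, `tri_path_t`, `step_leg` and `enumerate_triangles` are hypothetical, and the excluded-currency and fee-factor steps are omitted.

```c
#include <stddef.h>
#include <stdint.h>

typedef struct { uint16_t base, quote; } pair_t;   /* currency IDs of one symbol */

typedef struct {
    uint16_t pair[3];      /* indices into the pairs array, in leg order */
    uint8_t  use_bid[3];   /* 1 = sell base at bid, 0 = buy base at ask */
} tri_path_t;

/* If pair p can convert currency `in`, stores the currency received and
 * the side to read, then returns 1. Returns 0 if p does not trade `in`. */
static int step_leg(const pair_t *p, uint16_t in, uint16_t *out, uint8_t *use_bid)
{
    if (p->quote == in) { *out = p->base;  *use_bid = 0; return 1; }
    if (p->base  == in) { *out = p->quote; *use_bid = 1; return 1; }
    return 0;
}

/* Finds every cycle hold -> X -> Y -> hold over three distinct pairs.
 * Writes up to cap results and returns the total number found. */
static size_t enumerate_triangles(const pair_t *pairs, size_t n, uint16_t hold,
                                  tri_path_t *out, size_t cap)
{
    size_t found = 0;
    uint16_t c1, c2, c3;
    uint8_t b0, b1, b2;
    for (size_t i = 0; i < n; i++) {
        if (!step_leg(&pairs[i], hold, &c1, &b0))
            continue;
        for (size_t j = 0; j < n; j++) {
            if (j == i || !step_leg(&pairs[j], c1, &c2, &b1) || c2 == hold)
                continue;
            for (size_t k = 0; k < n; k++) {
                if (k == i || k == j)
                    continue;
                if (!step_leg(&pairs[k], c2, &c3, &b2) || c3 != hold)
                    continue;
                if (found < cap)
                    out[found] = (tri_path_t){
                        { (uint16_t)i, (uint16_t)j, (uint16_t)k },
                        { b0, b1, b2 } };
                found++;
            }
        }
    }
    return found;
}
```

For the pairs BTC-USDT, ETH-BTC and ETH-USDT with USDT as the hold currency, this yields both directions of the cycle, including the buy, buy, sell path shown in the signal example. The nested loops are cubic in the worst case, which is acceptable only because enumeration runs at startup.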
## 10. Startup Sequence
```
1. Parse config.yaml (libyaml) — read fh_ob + oe_em sections
2. Fetch KuCoin symbols: HTTP GET /api/v1/symbols
3. Fetch KuCoin fee table: HTTP GET /api/v1/base-fee
4. Enumerate triangles → build sorted triangle[] + tri_index[]
5. Fetch public WS token: HTTP POST /api/v1/bullet-public
6. Create WS connections (N connections, each ≤400 topics)
7. Subscribe to initial symbols (from config.yaml fh_ob.symbols)
8. Connect to the executor's Unix socket at executor_socket_path (oe_em config)
9. Start HTTP server on rest_port (fh_ob config)
10. Spawn Thread 1 (hot epoll loop: WS fds + timerfd)
11. Spawn Thread 2 (cold epoll loop: Unix socket, REST, eventfd)
12. Enter event loops
```
## 11. Executor Compatibility
### 11.1 Unix Socket Protocol
The executor's `SignalSocketServer` (executor/socket_server.py:1-90) expects:
- Unix domain socket at `/tmp/executor.sock`
- JSON messages, one per line
- Message format: `{"type":"signal", ...}` followed by newline
The C binary writes to the same socket path. No changes needed to the
executor's socket server.
### 11.2 Signal Payload
The signal JSON is identical to the current format, with two additional fields:
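- `t_arrive_ms` — epoch-millisecond timestamp taken when WS frame decode completed
- `t_eval_ms` — epoch-millisecond timestamp taken when triangle evaluation started
Both must come from a wall clock (`CLOCK_REALTIME`), not `CLOCK_MONOTONIC`, because the executor subtracts them from `book_ts_ms` and its own receive time, which are epoch-based.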
The executor reads these in `executor/executor.py:474-483`. Required change:
```python
# In executor/executor.py, around line 474:
t_arrive_ms = signal.get("t_arrive_ms", 0)
t_eval_ms = signal.get("t_eval_ms", 0)
if t_arrive_ms > 0:
    timings.append({"step": "t-3_ws_arrive", "elapsed_ms": -(executor_receive_ts_ms - t_arrive_ms)})
if t_eval_ms > 0:
    timings.append({"step": "t-4_eval_start", "elapsed_ms": -(executor_receive_ts_ms - t_eval_ms)})
```
### 11.3 Executor Disconnect Handling
The executor owns the listening socket (executor/socket_server.py), so the C binary
connects to it as a client, as `oe_em/socket_client.py` does today. If the executor
disconnects, Thread 2 logs a warning and retries the connection until the executor is back.
(Signal queue entries drained while the executor is disconnected are dropped.)
## 12. Implementation Order
### Phase 1: Foundation (Week 1)
1. `CMakeLists.txt` — build config, OpenSSL + libyaml dependencies
2. `config.c/h` — YAML parser, reads fh_ob + oe_em sections
3. `hash.c/h` — FNV-1a hash, symbol table
4. `http_client.c/h` — raw HTTP GET/POST (token, pairs, fees)
5. `triangle.c/h` — triangle enumeration, index builder
6. `book.c/h` — order book array, update function
7. Unit tests: config parse, symbol table, triangle enum
### Phase 2: WS + Hot Path (Week 2)
8. `jsmn.h` — drop in library
9. `ws_client.c/h` — WS frame parser, OpenSSL BIO TLS, subscribe/unsubscribe
10. `evaluate.c/h` — triangle evaluation loop (inline, zero alloc)
11. `signal.c/h` — JSON signal formatter
12. Integration test: connect to KuCoin WS, verify book updates
### Phase 3: IPC + REST (Week 3)
13. `queue.c/h` — SPSC ring buffer, eventfd
14. `http_server.c/h` — REST API server
15. `events.c/h` — epoll loops, timerfd, signal handling
16. `main.c` — startup, thread spawn, signal handling
17. Integration test: full pipeline, signal to executor
### Phase 4: Polish (Week 4)
18. Dynamic subscribe/unsubscribe via REST
19. Reconnection logic (exponential backoff)
20. Timing field integration in executor
21. Performance benchmarking
22. Edge case handling, error paths
23. Deployment script update
## 13. Performance Targets
| Metric | Current (Python) | Target (C) |
|---|---|---|
| WS arrive → eval start | ~500µs (asyncio dispatch + IPC) | <1µs (inline) |
| Eval 50 triangles | ~5ms (Python loop) | <50µs (C loop) |
| Signal dispatch | ~1ms (JSON + socket write) | <10µs (SPSC push) |
| Total end-to-end (exchange → executor) | ~15-30ms | ~5-10ms (network-bound) |
The dominant latency component will be network RTT to KuCoin (~5-20ms).
The C binary aims to add <100µs of processing latency on top.
## 14. Files Affected
### New Files
- `src/` directory with all C source files (listed in §3.4)
### Modified Files
- `executor/executor.py` — add `t_arrive_ms` and `t_eval_ms` to timing derivation (~5 lines)
- `deploy.sh` — updated to deploy compiled binary instead of Python files
### Unchanged Files
- `config.yaml` — read by both C binary and executor
- `executor/` (except `executor.py` timing change) — fully compatible
- `common/log.py` — still used by executor
### Deleted Files
- `fh_ob/` — entire directory (replaced by C binary)
- `oe_em/` — entire directory (replaced by C binary)
- `common/config.py` — fh_ob settings moved to C config parser; oe_em settings merged into C config


@@ -1,114 +0,0 @@
## Plan: Move volume computation from executor to fused_engine
### Goal
fused_engine computes per-leg order parameters (`funds`/`size`), precision-rounded.
Executor becomes a thin dispatcher — validate+simulate (paper) or place+record (live).
### New signal schema
```json
{
"type": "signal",
"live": false,
"legs": [
{"pair": "BTC-USDT", "side": "buy", "funds": "5.01", "base_increment": "0.00001", "quote_increment": "0.01", "base_min": "0.0001"},
{"pair": "ETH-BTC", "side": "sell", "size": "0.0495", "base_increment": "0.001", "quote_increment": "0.01", "base_min": "0.001"},
{"pair": "ETH-USDT", "side": "sell", "size": "0.0498", "base_increment": "0.001", "quote_increment": "0.01", "base_min": "0.001"}
],
"starting_volume": "5.01",
"predicted_bps": 12.5,
"books": [{"symbol": "BTC-USDT", "bids": [...], "asks": [...]}, ...],
"ts_ms": 1779400000000,
"book_ts_ms": 1779400000000,
"t_arrive_ms": 1779400000000,
"t_eval_ms": 1779400000000,
"triangle_key": ["USDT","BTC","ETH"],
"correlation_id": "abc123"
}
```
- `live=true`: no `books`, no `size`/`funds` — executor places leg 0 with `starting_volume`, KuCoin fills drive the chain
- `live=false`: `books` present for simulation, `funds`/`size` pre-computed, executor passes them straight to `order_test()`
### fused_engine changes
#### 1. Store symbol precision metadata (`symbols_api.c`)
Already fetches `/api/v2/symbols`. Add to `trading_pair_t`:
```
char base_increment[32];
char quote_increment[32];
char base_min_size[32];
```
Parse from `baseIncrement`, `quoteIncrement`, `baseMinSize` fields in the API response.
#### 2. Per-leg volume computation (`evaluate.c`)
After computing `max_volume` in evaluate_symbol, add a new function that runs the executor's current volume logic in C:
```
compute_leg_volumes(
triangle, books, max_volume, fee_rates,
out_leg_funds_or_size[3],
out_leg_side[3]
)
```
Steps (mirrors executor.py:550-608):
- `starting = cost_to_precision(max_volume, leg0.quote_increment)`
- For leg 0 (buy): `net = starting * (1 - fee)`, `base = floor(net / ask, base_inc)`, recompute `funds = cost_to_precision(base * ask, quote_inc)`
- For leg 1: `input = leg0.base_output`. Same buy/sell logic
- For leg 2: same
- Store `funds` for buys, `size` for sells
The computation is pure arithmetic (no I/O). It needs Decimal-style rounding, which can be done with `strtod`/`sprintf` plus integer math on scaled values, or with floating point and epsilon guards. A lightweight bignum library is a fallback if neither proves adequate.
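The steps above can be sketched as a Python reference model, which may be handy for checking a C port against known values. This is a sketch under stated assumptions, not the executor's actual code: the `Leg` tuple, the `floor_to` helper, the single `fee` rate, round-down as the rounding mode for `cost_to_precision`, and the sell-side formula are all hypothetical (the plan only spells out the buy leg).

```python
from decimal import Decimal, ROUND_DOWN
from typing import NamedTuple


class Leg(NamedTuple):
    """Hypothetical per-leg input: side plus top-of-book price and increments."""
    side: str           # "buy" or "sell"
    price: Decimal      # best ask for buys, best bid for sells
    base_inc: Decimal
    quote_inc: Decimal


def floor_to(value, increment):
    """Round value down to a whole multiple of increment."""
    steps = (value / increment).to_integral_value(rounding=ROUND_DOWN)
    return steps * increment


def compute_leg_volumes(legs, max_volume, fee):
    """Return one ("funds" | "size", amount) pair per leg."""
    orders = []
    # Starting volume, rounded to leg 0's quote increment (assumed round-down).
    amount = floor_to(max_volume, legs[0].quote_inc)
    for leg in legs:
        if leg.side == "buy":
            net = amount * (1 - fee)
            base = floor_to(net / leg.price, leg.base_inc)
            funds = floor_to(base * leg.price, leg.quote_inc)
            orders.append(("funds", funds))
            amount = base  # base received feeds the next leg
        else:
            # Assumed sell logic: sell the rounded input, keep quote after fee.
            size = floor_to(amount, leg.base_inc)
            orders.append(("size", size))
            amount = floor_to(size * leg.price * (1 - fee), leg.quote_inc)
    return orders
```

Feeding the same inputs to the C `compute_leg_volumes` and comparing the emitted strings against this model is one way to catch rounding drift early.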
#### 3. Signal JSON format (`evaluate.c` and `events.c`)
Update both `format_signal_json` (evaluate.c, unused today — keep for debugging) and `send_signal_to_executor` (events.c) to emit the new schema, including per-leg `funds`/`size` and increment fields.
Remove `fee_rate` and `exchange_rate` from leg JSON — executor no longer needs them.
#### 4. Live vs paper mode selection
Config: add `live_mode: bool` to config.yaml (default false). fused_engine reads it and conditionalizes signal content.
### Executor changes (`executor.py`)
#### 1. Remove dead code
- `get_symbol_meta()` and `SymbolMeta` (kucoin_api.py:226-228 + dataclass)
- Volume propagation (executor.py:550-558)
- Precision rounding per leg (executor.py:564-608)
- `_precheck_volume()` → simplify to just check `starting_volume >= min_size` per leg using the `base_min`/`base_increment` from the signal
- Fee computation in paper fill simulation (executor.py:823) → use fee_currency from signal leg
- `legs[i - 1].get("fee_rate")` → executor no longer knows fee rates, use from signal
#### 2. New signal parsing
Extract `live`, `legs` with pre-computed `funds`/`size`, increment fields, `starting_volume`.
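A minimal sketch of that parsing step, assuming the field names from the schema above. The function name `parse_signal`, the returned dict layout, and the three-leg check are hypothetical and only illustrate which fields each mode depends on.

```python
import json
from decimal import Decimal


def parse_signal(raw):
    """Parse one signal line and check the fields each mode relies on."""
    sig = json.loads(raw)
    if sig.get("type") != "signal":
        raise ValueError("not a signal message")
    legs = sig["legs"]
    if len(legs) != 3:
        raise ValueError("a triangle signal must carry exactly three legs")

    live = bool(sig["live"])
    orders = []
    if not live:
        # Paper mode: books are required for simulation, and every leg must
        # carry its pre-computed amount (funds for buys, size for sells).
        if "books" not in sig:
            raise ValueError("paper signal is missing books")
        for leg in legs:
            key = "funds" if leg["side"] == "buy" else "size"
            if key not in leg:
                raise ValueError(f"{leg['pair']}: missing {key}")
            orders.append((key, Decimal(leg[key])))

    return {
        "live": live,
        "starting_volume": Decimal(sig["starting_volume"]),
        "legs": legs,
        "orders": orders,  # empty in live mode, where fills drive the chain
        "books": sig.get("books", []),
    }
```

Amounts are kept as `Decimal` built from the JSON strings, so no precision is lost before they reach `order_test()` or `order_place()`.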
#### 3. live=true path
- Leg 0: `order_place(funds=starting_volume)`, await fill
- Subsequent legs: the actual fill drives the next leg. Keep the current propagation (the actual `fill.filled_volume` feeds the next leg's input); this stays because KuCoin returns real fills.
#### 4. live=false path (paper)
- Each leg: `order_test(funds=leg.funds)` or `order_test(size=leg.size)` — pass through
- Paper fill simulation: use signal `books` + leg `base_increment`/`quote_increment` to compute `deal_funds` ← this is the only volume math remaining in the executor
- Simulate: apply fee to base_or_quote, round to precision, compute effective deal_funds
- profit = `fills[2].deal_funds - fills[0].deal_funds`
#### 5. Paper fill simulation (what stays)
The executor must still:
- Read `books[i].asks[0].price` / `bids[0].price`
- For buys: compute how much base you'd get for `funds` at that top-of-book price, after fee, rounded to `base_increment`
- For sells: compute how much quote you'd get for `size` at that top-of-book price, after fee, rounded to `quote_increment`
- This is ~20 lines per leg, no external dependencies
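The per-leg simulation described above might look roughly like the following. This is a sketch under assumptions: `simulate_fill`, `floor_to`, the returned keys, and the book level layout (`{"price": "..."}` dicts) are hypothetical, and `fee_rate` is passed in as a parameter because the schema above does not show where the executor would obtain it.

```python
from decimal import Decimal, ROUND_DOWN


def floor_to(value, increment):
    """Round value down to a whole multiple of increment."""
    steps = (value / increment).to_integral_value(rounding=ROUND_DOWN)
    return steps * increment


def simulate_fill(leg, book, fee_rate):
    """Simulate one paper fill at the top-of-book price."""
    if leg["side"] == "buy":
        price = Decimal(book["asks"][0]["price"])
        funds = Decimal(leg["funds"])
        # Base received for `funds`, after fee, rounded to base_increment.
        output = floor_to(funds * (1 - fee_rate) / price,
                          Decimal(leg["base_increment"]))
        return {"deal_funds": funds, "output": output}
    price = Decimal(book["bids"][0]["price"])
    size = Decimal(leg["size"])
    # Quote received for `size`, after fee, rounded to quote_increment.
    output = floor_to(size * price * (1 - fee_rate),
                      Decimal(leg["quote_increment"]))
    return {"deal_funds": output, "output": output}
```

With fills computed this way, `fills[2]["deal_funds"] - fills[0]["deal_funds"]` gives the simulated profit in the starting currency.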
### Migration order
1. Add precision fields to `trading_pair_t` and parse from API
2. Add volume computation in `evaluate.c`
3. Update signal JSON format (both functions)
4. Strip executor down
5. Test paper mode, then live mode
### Risk: Decimal precision in C
The executor uses Python `Decimal` for exact precision. In C, we can:
- Use `double` with `ceil()`/`floor()` and `1e-10` epsilon guards (simple, adequate for crypto tick sizes)
- Or use integer math: multiply by 10^decimals, do integer rounding, convert back
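For crypto tick sizes (0.00001, 0.01, 0.1), doubles are precise enough as long as the epsilon guards are applied, since values such as 0.01 have no exact binary representation. No need for bignum.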
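Both options can be illustrated in Python, whose `float` is the same IEEE 754 double that C uses. The helper names `floor_double`, `to_scaled`, and `floor_scaled` are hypothetical, and the parser assumes non-negative plain decimal strings; treat this as a sketch of the two techniques rather than the planned C code.

```python
import math


def floor_double(value, increment, eps=0.0):
    """Floor to a multiple of increment using doubles, with an optional guard."""
    return math.floor(value / increment + eps) * increment


def to_scaled(text, decimals):
    """Parse a non-negative decimal string into integer units of 10**-decimals.

    Digits beyond `decimals` places are truncated.
    """
    whole, _, frac = text.partition(".")
    frac = (frac + "0" * decimals)[:decimals]
    return int(whole) * 10**decimals + int(frac or "0")


def floor_scaled(value_units, increment_units):
    """Floor an integer amount to a multiple of an integer increment."""
    return value_units - value_units % increment_units


# Without a guard, 0.3 / 0.1 evaluates just below 3, so one tick is lost.
unguarded = floor_double(0.3, 0.1)
guarded = floor_double(0.3, 0.1, eps=1e-10)

# Integer math has no such edge: 0.0495 floored to a 0.001 step, in 1e-8 units.
size_units = floor_scaled(to_scaled("0.0495", 8), to_scaled("0.001", 8))
```

The unguarded call shows why the epsilon is not optional with doubles, while the scaled-integer path sidesteps the issue at the cost of parsing and formatting amounts as strings.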

File diff suppressed because one or more lines are too long

@ -1 +0,0 @@
opencode -s ses_1b4ebb4cdffey1e1nSPwzXAwbn

@ -1,238 +0,0 @@
#!/usr/bin/env python3
"""Test harness for fused_engine binary.
Starts the binary, adds symbols via REST API, listens for signals
on the executor Unix socket, and reports results.
"""
import asyncio
import json
import os
import signal
import socket
import subprocess
import sys
import time
from pathlib import Path
BINARY = Path(__file__).parent / "build" / "fused_engine"
CONFIG = Path(__file__).parent / "config.yaml"
REST_URL = "http://127.0.0.1:8000"
SOCKET_PATH = "/tmp/executor.sock"
SIGNAL_FILE = Path(__file__).parent / "test_signals.jsonl"
LOG_FILE = Path(__file__).parent / "test_stderr.log"
TEST_DURATION = 120 # seconds
SYMBOLS_TO_ADD = [
"BTC-USDT", "ETH-USDT", "ETH-BTC",
"BNB-USDT", "BNB-BTC", "BNB-ETH",
"XRP-USDT", "XRP-BTC", "XRP-ETH",
"SOL-USDT", "SOL-BTC", "SOL-ETH",
"ADA-USDT", "ADA-BTC", "ADA-ETH",
"DOGE-USDT", "DOGE-BTC", "DOGE-ETH",
"MATIC-USDT", "MATIC-BTC", "MATIC-ETH",
"DOT-USDT", "DOT-BTC", "DOT-ETH",
]
async def wait_for_rest(timeout=15):
    """Wait until the REST API is reachable."""
    import urllib.request
    deadline = time.time() + timeout
    while time.time() < deadline:
        try:
            req = urllib.request.urlopen(f"{REST_URL}/health", timeout=2)
            if req.status == 200:
                return True
        except Exception:
            pass
        await asyncio.sleep(0.5)
    return False


async def add_symbols():
    """Add symbols via POST /symbols."""
    import urllib.request
    try:
        data = json.dumps(SYMBOLS_TO_ADD).encode()
        req = urllib.request.Request(
            f"{REST_URL}/symbols",
            data=data,
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        resp = urllib.request.urlopen(req, timeout=5)
        body = json.loads(resp.read())
        print(f" Added symbols: {body.get('added', [])}")
    except Exception as e:
        print(f" Symbol add failed: {e}")


async def check_health():
    """Print health status."""
    import urllib.request
    try:
        req = urllib.request.urlopen(f"{REST_URL}/health", timeout=2)
        body = json.loads(req.read())
        return body
    except Exception as e:
        return {"error": str(e)}
async def listen_for_signals(duration):
    """Listen on the executor Unix socket for signal JSON lines."""
    signals = []
    errors = []
    deadline = time.time() + duration
    while time.time() < deadline:
        try:
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            sock.settimeout(2.0)
            sock.connect(SOCKET_PATH)
            buf = b""
            while time.time() < deadline:
                try:
                    chunk = sock.recv(4096)
                    if not chunk:
                        break
                    buf += chunk
                    while b"\n" in buf:
                        line, buf = buf.split(b"\n", 1)
                        line = line.strip()
                        if not line:
                            continue
                        try:
                            sig = json.loads(line)
                            signals.append(sig)
                            bps = sig.get("predicted_bps", "?")
                            key = sig.get("triangle_key", "?")
                            t_arrive = sig.get("t_arrive_ms", "?")
                            t_eval = sig.get("t_eval_ms", "?")
                            print(f" [SIGNAL] {bps} bps | {key} | arrive={t_arrive} eval={t_eval}")
                        except json.JSONDecodeError:
                            errors.append(f"Bad JSON: {line[:80]}")
                except socket.timeout:
                    continue
                except Exception as e:
                    errors.append(f"Read error: {e}")
                    break
            sock.close()
        except Exception as e:
            await asyncio.sleep(1)
    return signals, errors
async def main():
    print("=== Fused Engine Test ===")
    print(f"Binary: {BINARY}")
    print(f"Config: {CONFIG}")
    print(f"Duration: {TEST_DURATION}s")
    print(f"Symbols to add: {len(SYMBOLS_TO_ADD)}")
    print()

    if not BINARY.exists():
        print(f"ERROR: Binary not found at {BINARY}")
        sys.exit(1)

    # Clean up old socket
    try:
        os.unlink(SOCKET_PATH)
    except OSError:
        pass

    # Start the binary
    print("[1] Starting fused_engine...")
    with LOG_FILE.open("w") as log_f:
        proc = subprocess.Popen(
            [str(BINARY), str(CONFIG)],
            # stdout is never read by this harness; an unread PIPE could fill
            # up and block the child, so discard it instead.
            stdout=subprocess.DEVNULL,
            stderr=log_f,
            text=True,
        )
    print(f" PID: {proc.pid}")

    # Wait for REST API
    print("[2] Waiting for REST API...")
    if not await wait_for_rest(20):
        print(" ERROR: REST API did not come up")
        proc.terminate()
        proc.wait()
        print(" stderr output:")
        print(LOG_FILE.read_text()[-2000:])
        sys.exit(1)
    print(" REST API is up")

    # Check health
    health = await check_health()
    print(f" Health: {health}")

    # Add symbols
    print("[3] Adding symbols...")
    await add_symbols()

    # Wait a moment for subscriptions to take effect
    await asyncio.sleep(3)

    # Check health again
    health = await check_health()
    print(f" Health after add: {health}")

    # Listen for signals
    print(f"[4] Listening for signals for {TEST_DURATION}s...")
    signals, errors = await listen_for_signals(TEST_DURATION)

    # Stop the binary
    print("[5] Stopping fused_engine...")
    proc.terminate()
    try:
        proc.wait(timeout=5)
    except subprocess.TimeoutExpired:
        proc.kill()
        proc.wait()

    # Report results
    print()
    print("=== Results ===")
    print(f"Signals received: {len(signals)}")
    print(f"Socket errors: {len(errors)}")
    if signals:
        SIGNAL_FILE.write_text(json.dumps(signals, indent=2) + "\n")
        print(f"Signals saved to: {SIGNAL_FILE}")
        bps_values = [s.get("predicted_bps", 0) for s in signals]
        print(f"BPS range: {min(bps_values):.4f} - {max(bps_values):.4f}")
        # triangle_key is a JSON list, so convert it to a hashable tuple.
        triangles = set(tuple(s.get("triangle_key", ("?",))) for s in signals)
        print(f"Unique triangles: {triangles}")
        for s in signals[:5]:
            print(f" {s.get('triangle_key', '?')} | {s.get('predicted_bps', '?')} bps | "
                  f"t_arrive={s.get('t_arrive_ms', '?')} t_eval={s.get('t_eval_ms', '?')}")
    else:
        print("No signals received during test period.")
    if errors:
        print(f"Errors: {errors[:5]}")

    # Print last lines of stderr log
    print()
    print("=== Last 40 lines of stderr ===")
    log_text = LOG_FILE.read_text()
    lines = log_text.strip().split("\n")
    for line in lines[-40:]:
        print(f" {line}")
    print(f"\nFull log: {LOG_FILE}")
    print(f"Signals: {SIGNAL_FILE if signals else '(none)'}")
    print()
    if len(signals) > 0:
        print("PASS: Signals were received.")
    else:
        print("INFO: No signals. This may be normal if no arbitrage opportunities arose.")
        print(" Check stderr above for connection/subscription issues.")


if __name__ == "__main__":
    asyncio.run(main())