migrate: absorb all executor functions into fused_engine (C)

Replace the Python executor with direct C execution in a dedicated
executor thread. Removes UDS JSON serialization, Python async
overhead, and the 2+ms pipeline gap between signal creation and
order fire.

New components:
- src/rest_client.c/h: Keepalive HTTPS, HMAC-SHA256 signing,
  order_place, order_test, Content-Length response parsing
- src/fill_handler.c/h: SPSC ring buffer for WS match events,
  hot thread -> executor thread fill dispatch
- src/executor.c/h: execute_triangle() cascade, fee hold
  reduction, increment floor, paper mode simulation, PnL,
  concurrency isolation, reporting

Modified:
- src/ws_client.c: Subscribe to tradeOrdersV2 + account.balance,
  dispatch orderChange match events to fill SPSC, private token
  fetch via bullet-private, token cleared on reconnect
- src/http_client.c: Added https_post_auth() for signed POST
- src/events.c: Cold thread replaced with executor thread
  (poll on wake_fd + fill_fd, direct execution)
- config.yaml.example: initial_capital moved to fused_engine,
  added cooldown_seconds, kcs_discount_active

Removed:
- src/kucoin_sign.c/h (redundant with http_client.c helpers)
This commit is contained in:
nicolas 2026-05-26 19:54:41 -03:00
parent 60c21bed36
commit 03b5daa003
15 changed files with 1779 additions and 61 deletions

View File

@ -11,6 +11,12 @@ fused_engine:
reconnect_base_delay: 1.0
reconnect_max_delay: 60.0
heartbeat_interval: 18.0
cooldown_seconds: 0.0
kcs_discount_active: false
initial_capital:
USDT: 5
USDC: 5
USD1: 5
executor:
fill_timeout_ms: 1000
@ -21,10 +27,6 @@ executor:
enforce_same_base_isolation: true
enforce_pair_isolation: true
rest_port: 8002
initial_capital:
USDT: 5
USDC: 5
USD1: 5
kucoin_api_key: ""
kucoin_api_secret: ""

View File

@ -57,6 +57,10 @@ class ExecutorSettings(BaseModel):
default=1000,
description="Per-leg fill wait timeout in milliseconds",
)
await_balance: bool = Field(
default=False,
description="Wait for balance WS update before next leg",
)
class Settings(BaseSettings):

View File

@ -693,13 +693,24 @@ class Executor:
f"fee={str(fee)} {fee_ccy} | lat={latency_ms:.1f}ms"
)
if i < 2:
await self._ws_client.await_balance(
output_ccy, fills[-1].filled_volume, 2000
)
if side == "sell":
bal = self._ws_client.latest_balance(output_ccy)
if bal > 0:
fills[-1].filled_volume = bal
if self._settings.await_balance:
await self._ws_client.await_balance(
output_ccy, fills[-1].filled_volume, 2000
)
if side == "sell":
bal = self._ws_client.latest_balance(output_ccy)
if bal > 0:
fills[-1].filled_volume = bal
nxt = legs[i + 1]
nxt_fee = Decimal(str(nxt.get("fee_rate", "0.001")))
nxt_side = nxt.get("side", "")
nxt_inc = Decimal(str(nxt.get(
"quote_increment" if nxt_side == "buy" else "base_increment", "0")))
if nxt_side == "buy":
fills[-1].filled_volume /= (_D1 + nxt_fee)
if nxt_inc > 0:
fills[-1].filled_volume = (fills[-1].filled_volume / nxt_inc
).to_integral_value(rounding=ROUND_DOWN) * nxt_inc
in_flight.last_trade_ts_ms = int(time.time() * 1000)
continue

386
notes/executor_migration.md Normal file
View File

@ -0,0 +1,386 @@
# Executor Migration Plan
Absorb all executor functions into the fused_engine process, eliminating
Python, UDS, JSON serialisation, and the 2+ ms pipeline gap between signal
creation and first order fire.
## Architecture
```
┌───────────────────────────────────────────┐
│ HOT THREAD │
│ epoll WS → book updates → evaluate → │
│ spsc_push(main_queue, &signal) │
│ eventfd_write(wake_fd) │
│ │
│ on WS match event (orderChange): │
│ exec_id = lookup[client_oid] │
│ spsc_push(fill_queue[exec_id], match) │
│ eventfd_write(fill_wake[exec_id]) │
└──────┬─────────────────────────┬──────────┘
│ │
main_queue fill_queue[0..N-1]
(signal_entry_t SPSC) (per-thread SPSC)
│ │
┌──────────▼────────┐ ┌──────────▼─────────┐
│ EXECUTOR THREAD │ │ fill_match_t ring │
│ (1 per slot) │◄────│ pushed by hot t │
│ spsc_pop(main) │ └────────────────────┘
│ execute(sig) │
│ for each leg: │
│ rest POST │
│ wait_for_fill │
│ cascade │
│ emit report │
└───────────────────┘
```
**Thread roles:**
- **HOT**: epoll on WS fds + timer. Evaluates triangles, pushes signals to
SPSC. Also receives WS orderChange events and dispatches match data to the
correct executor thread's fill SPSC.
- **EXECUTOR**: pops from main signal queue, executes triangle via
blocking REST POST + fill wait, cascades to next leg. One thread per
concurrent slot.
- No Python, no UDS socket, no JSON serialisation between threads.
## What Exists in C Already
| Component | File | Status |
|-----------|------|--------|
| WS connection + reconnection | `ws_client.c` | Done |
| SSL context + BIO | `ws_client.c` | Done |
| Timer fd | `events.c` | Done |
| SPSC queue | `queue.c` | Done |
| JSON parser (cJSON) | `cJSON.c` | Done |
| Book data structures | `book.h` | Done |
| Triangle evaluation + sizing | `evaluate.c` | Done |
| Non-blocking log pipe | `log.c` | Done |
| Config parser | `config.c` | Done |
| HMAC-SHA256 | OpenSSL `HMAC()` in libcrypto | Already linked |
| Base64 | OpenSSL `BIO_f_base64()` in libcrypto | Already linked |
## What Needs to Be Written
### Phase 1 — HTTP Client + REST Signing
**New files:** `http_client.c`, `http_client.h`
- Reuse `SSL_CTX` from WS client (or create a minimal second context)
- `http_post(url, body, headers) → (status, response_body)`
- `http_get(url, headers) → (status, response_body)`
- Connection pool: keep-alive via socket reuse (same pattern as WS reconnect
but for REST endpoints)
- DNS resolution: reuse `getaddrinfo` from `ws_client.c`
- `kucoin_sign(timestamp, method, path, body, secret) → signature_base64`
- `kucoin_headers(api_key, encrypted_passphrase, sign, timestamp) → header_string`
**Dependencies:** OpenSSL libcrypto (already linked for HMAC), cJSON for
parsing responses.
### Phase 2 — Fill Handler
**New files:** `fill_handler.c`, `fill_handler.h`
**Data structures:**
```c
typedef struct {
char client_oid[41]; // max 40 chars per KuCoin
double match_size;
double match_price;
bool is_terminal; // canceled+done = complete
} fill_match_t;
typedef struct {
char client_oid[41];
char order_id[32];
double total_size;
double total_funds;
int match_count;
bool done; // terminal event received
} accumulator_t;
// Ring of fill SPSCs — one per executor thread
typedef struct {
spsc_queue_t queue; // fill_match_t ring
int wake_fd; // eventfd for waking executor
accumulator_t acc; // in-progress accumulation (hot thread writes)
atomic_bool registered; // register_fill_listener / unregister
} fill_channel_t;
```
**Fill dispatch (hot thread side):**
```c
// Called from hot thread's ws_client.c when a match event arrives
void fill_dispatch_match(const char *client_oid,
double match_size, double match_price);
// Called from hot thread on canceled/filled terminal event
void fill_dispatch_terminal(const char *client_oid);
```
- `client_oid → executor_thread` lookup via hash table (hot thread use,
atomically updated)
- Each `fill_dispatch_match` pushes a `fill_match_t` to the correct executor
thread's SPSC + eventfd write
**Fill waiter (executor thread side):**
```c
// Accumulates matches for a single client_oid until terminal or timeout
typedef struct {
double total_size;
double total_funds;
double avg_price;
int match_count;
char order_id[32];
} fill_result_t;
bool await_fill(fill_channel_t *ch, const char *client_oid,
int64_t timeout_ms, fill_result_t *out);
```
- Drains `fill_match_t` from the thread's fill SPSC
- Accumulates `total_size += match_size`, `total_funds += match_price * match_size`
- Returns on terminal event or timeout
- `poll()` on the fill eventfd between drain cycles
### Phase 3 — Execution Engine
Executor thread main loop (`events.c`, replacing current cold thread):
```c
void *executor_thread(void *arg) {
thread_data_t *td = (thread_data_t *)arg;
while (atomic_load(&g_running)) {
signal_entry_t sig;
if (spsc_pop(td->main_queue, &sig)) {
execute_triangle(&sig, td);
continue;
}
// sleep via poll on fill eventfd + shutdown fd
struct pollfd fds[2] = {
{ .fd = td->fill_channel.wake_fd, .events = POLLIN },
};
poll(fds, 1, -1);
uint64_t val; read(td->fill_channel.wake_fd, &val, sizeof(val));
}
return NULL;
}
```
**`execute_triangle()` in C** mirrors the current `_execute_triangle`:
```c
void execute_triangle(signal_entry_t *sig, thread_data_t *td) {
setup_timings(sig, timings);
double leg_output[3] = {0};
for (int leg = 0; leg < 3; leg++) {
// ── Input volume ──
double input_vol = (leg == 0) ? sig->starting_volume : leg_output[leg-1];
apply_fee_hold_reduction(sig, leg, &input_vol);
apply_increment_floor(sig, leg, &input_vol);
// ── Place order ──
int64_t t0 = now_monotonic_ms();
char order_id[32];
bool ok = place_order(sig, leg, input_vol, order_id);
timing_append(timings, leg_i_order_fired);
if (!ok) {
emit_failed(sig, leg_output, timings, "order_rejected");
return;
}
register_fill_listener(td, leg, order_id);
// ── Wait for fill ──
fill_result_t result;
bool filled = await_fill(&td->fill_channel, order_id,
FILL_TIMEOUT_MS, &result);
timing_append(timings, leg_i_fill_received);
if (sig->cancelled) { emit_aborted(...); return; }
if (!filled) { emit_failed(...); return; }
// ── Cascade ──
leg_output[leg] = (sig->legs.legs[leg].side_is_buy)
? result.total_size
: result.total_funds;
apply_balance_override(td, sig, leg, &leg_output[leg]);
apply_fee_hold_reduction(sig, leg+1, &leg_output[leg]);
apply_increment_floor(sig, leg+1, &leg_output[leg]);
}
double pnl = compute_pnl(leg_output, sig);
emit_filled(sig, leg_output, timings, pnl);
}
```
Helper functions (`cascade.c`):
```c
// Fee hold: funds * (1 + fee_rate) must not exceed balance.
// Divide by (1 + fee_rate) when next leg is a buy.
void apply_fee_hold_reduction(signal_entry_t *sig, int leg, double *vol);
// Round down to the next leg's quote_increment (buy) or base_increment (sell).
void apply_increment_floor(signal_entry_t *sig, int leg, double *vol);
// Override with latest WS balance for sells (if await_balance enabled).
void apply_balance_override(thread_data_t *td, signal_entry_t *sig,
int leg, double *vol);
// Leg 0 input → leg 2 output, compute profit in primary_quote.
double compute_pnl(double *leg_output, signal_entry_t *sig);
```
### Phase 4 — REST Order Placement
**New file:** `rest_client.c`, `rest_client.h`
```c
bool rest_order_place(const char *symbol, const char *side,
double funds, double size,
const char *client_oid,
char *out_order_id, size_t out_sz,
char *out_error, size_t err_sz);
```
- Signs request with HMAC-SHA256
- POST to `/api/v1/hf/orders`
- Parses JSON response, extracts `orderId` or error `code: msg`
- Supports `funds` (buy) or `size` (sell) based on leg type
```c
bool rest_order_test(const char *symbol, const char *side,
double funds, double size,
char *out_error, size_t err_sz);
```
- POST to `/api/v1/hf/orders/test`
- Same signing path, same response parsing
Connection pool:
- Store 1-2 reusable HTTPS sockets per executor thread
- Keepalive via 30s interval timer (same pattern as `_keepalive_loop`)
- On disconnect: reconnect once, fail immediately
### Phase 5 — Paper Mode
Paper mode in C uses the same `execute_triangle()` but:
- Calls `rest_order_test()` instead of `rest_order_place()`
- Simulates fills from `sig->books[leg]` (top-of-book ask/bid)
- Same rounding, fee hold reduction, and increment flooring as live mode
- No fill SPSC needed (simulated fill is synchronous)
### Phase 6 — Concurrency & Isolation
**New file:** `concurrency.c`, `concurrency.h`
- `concurrent_slots`: number of executor threads = config value.
- Isolation check (same triangle key, same pairs, same primary_quote):
checked atomically in `execute_triangle` before starting.
- In-flight tracking: array of `{ triangle_key, pair_mask, primary_quote }`,
checked and set under a spinlock.
- Cancel: `sig->cancelled` volatile flag, checked between legs.
```c
typedef struct {
uint64_t triangle_hash; // fnv1a of triangle_key
uint64_t pair_mask; // bitmask of pair indices
uint32_t primary_quote; // index into currency table
bool active;
} in_flight_t;
bool try_acquire_slot(signal_entry_t *sig); // isolation check + mark
void release_slot(signal_entry_t *sig); // unmark
```
### Phase 7 — Reporting
Reporting in C mirrors the current `_emit_report`:
```c
void emit_filled(signal_entry_t *sig, double *leg_output,
timing_t *timings, double pnl);
void emit_failed(signal_entry_t *sig, double *leg_output,
timing_t *timings, const char *error);
void emit_aborted(signal_entry_t *sig, double *leg_output,
timing_t *timings, const char *reason);
```
Output format (same as current, via `log_write`):
```
2026-05-26T17:55:53.719Z FILLED | corr=... | triangle=['USDT','FLUX','USDC'] |
predicted_bps=34.45 | effective_bps=-149.94 | book_ts=... | profit=-0.0749 |
timings=[...] | fills=[L0:buy FLUX-USDT USDT->FLUX 64.53@0.0774(fee=0 USDT lat=82.9ms), ...] |
books=[0.0774/0.0772 | 0.07796/0.07783 | 0.9991/0.999]
```
### Phase 8 — Cleanup
- Delete `executor/` directory (executor.py, ws_client.py, kucoin_api.py,
socket_server.py, config.py, __main__.py)
- Delete `common/` directory (log.py, config.py — logging replaced by
C `log_write` + log file handler)
- Remove UDS socket creation + connection from `events.c`
- Remove JSON signal serialisation from `events.c`
- Remove `send_signal_to_executor()` and all UDS references
- The `cold_epoll` set can be removed (executor threads don't use epoll;
they use `poll()` on fill eventfds)
## Migration Order
| Phase | Files | Deliverable | Risk |
|-------|-------|-------------|------|
| 1 | `http_client.c/h`, `kucoin_sign.c/h` | Can POST a test order | Low (OpenSSL HMAC is well-documented) |
| 2 | `fill_handler.c/h` | Can receive WS fill events in executor thread | Medium (hot↔executor SPSC race conditions) |
| 3 | `events.c` (executor thread), `cascade.c/h`, `rest_client.c/h` | Full live-mode triangle execution in C | High (first integration test) |
| 4 | `rest_client.c/h` (`order_place`) | Order placement via C | Medium (HTTP keep-alive bugs) |
| 5 | Paper mode via `rest_order_test` + book sim | Paper mode in C | Low (same logic, different HTTP endpoint) |
| 6 | `concurrency.c/h` | Slot isolation, cancel support | Low (simple atomic operations) |
| 7 | Reporting via `log_write` | Same FILLED/FAILED output | Low (string formatting) |
| 8 | Delete `executor/`, `common/` | No Python runtime dependency | Low (housekeeping) |
## Files to Create
```
src/http_client.c # HTTPS POST/GET with keepalive connection pool
src/http_client.h
src/kucoin_sign.c # HMAC-SHA256 + base64 signing helpers
src/kucoin_sign.h
src/fill_handler.c # Match event accumulation + dispatch (hot thread)
src/fill_handler.h
src/cascade.c # Fee hold reduction, increment floor, PnL compute
src/cascade.h
src/rest_client.c # order_place, order_test (wraps http_client + sign)
src/rest_client.h
src/concurrency.c # Isolation checks, in-flight tracking
src/concurrency.h
src/executor.c # execute_triangle() — the main execution loop
src/executor.h # (or inline in events.c / new executor_thread.c)
```
## Files to Modify
```
src/events.c # Remove UDS, add executor thread creation
src/ws_client.c # Add fill dispatch calls on match/terminal events
src/ws_client.h # Expose fill_dispatch_match / fill_dispatch_terminal
src/config.c # Add executor config fields (fill_timeout_ms, await_balance)
src/config.h # Same
src/CMakeLists.txt # Add new .c files
```
## Files to Delete
```
executor/ # Entire directory
common/ # Entire directory
```

View File

@ -11,6 +11,7 @@
#include "log.h"
#include "events.h"
#include "evaluate.h"
#include "executor.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
@ -198,6 +199,8 @@ int unix_server_create(const char *socket_path) {
* Connects lazily on first signal; reconnects on write failure.
*/
static void send_signal_to_executor(event_loops_t *loops, signal_entry_t *sig) {
(void)loops; (void)sig;
/* No longer used — execution is direct in the executor thread */
if (loops->unix_client_fd < 0) {
loops->unix_client_fd = unix_client_connect(loops->ws_client->cfg->executor_socket_path);
if (loops->unix_client_fd < 0) {
@ -394,63 +397,58 @@ void *event_hot_thread(void *arg) {
}
/*
* COLD thread: drain SPSC signal queue and forward to executor.
* Uses epoll_wait on the Unix client fd to detect disconnection.
* Priority: drains queue before and after epoll to minimize latency.
* EXECUTOR thread: drain SPSC signal queue and execute triangles directly.
* Wakes on SPSC eventfd (signal available) and fill channel eventfd (fill arrived).
*/
void *event_cold_thread(void *arg) {
event_loops_t *loops = (event_loops_t *)arg;
log_write("[COLD] Thread started\n");
log_write("[EXEC] Thread started\n");
executor_thread_t *exec = executor_thread_create(loops->ws_client->cfg,
loops->ws_client->fill_ch);
if (!exec) {
log_write("[EXEC] Failed to create executor\n");
return NULL;
}
struct pollfd fds[2] = {
{ .fd = loops->wakeup_fd, .events = POLLIN },
{ .fd = fill_channel_wake_fd(loops->ws_client->fill_ch), .events = POLLIN },
};
while (loops->running) {
while (!spsc_empty(loops->signal_queue)) {
signal_entry_t sig;
if (spsc_pop(loops->signal_queue, &sig)) {
send_signal_to_executor(loops, &sig);
}
}
int nfds = epoll_wait(loops->cold_epoll.epoll_fd,
loops->cold_epoll.events, MAX_EPOLL_FDS, 200);
int nfds = poll(fds, 2, 200);
if (nfds < 0) {
if (errno == EINTR) continue;
perror("epoll_wait cold");
log_write("[EXEC] poll error: %s\n", strerror(errno));
break;
}
for (int i = 0; i < nfds; i++) {
tracked_fd_t *tf = (tracked_fd_t *)loops->cold_epoll.events[i].data.ptr;
if (!tf || tf->fd < 0) continue;
uint32_t ev = loops->cold_epoll.events[i].events;
if (tf->type == FD_TYPE_EVENT) {
uint64_t val = 0;
read(loops->wakeup_fd, &val, sizeof(val));
continue;
}
if (tf->type == FD_TYPE_UNIX_CLIENT) {
if (ev & (EPOLLERR | EPOLLHUP)) {
log_write("[COLD] Executor disconnected\n");
close(loops->unix_client_fd);
loops->unix_client_fd = -1;
event_loops_remove_fd(&loops->cold_epoll, tf->fd);
continue;
}
}
}
// Drain again after epoll to catch any signals queued during processing
/* Drain signal queue */
while (!spsc_empty(loops->signal_queue)) {
signal_entry_t sig;
if (spsc_pop(loops->signal_queue, &sig)) {
send_signal_to_executor(loops, &sig);
executor_execute_triangle(exec, &sig, loops->ws_client->fill_ch);
}
}
/* Drain fill wakeup */
if (fds[1].revents & POLLIN) {
uint64_t val;
read(fds[1].fd, &val, sizeof(val));
}
/* Drain again to catch signals enqueued during execution */
while (!spsc_empty(loops->signal_queue)) {
signal_entry_t sig;
if (spsc_pop(loops->signal_queue, &sig)) {
executor_execute_triangle(exec, &sig, loops->ws_client->fill_ch);
}
}
}
log_write("[COLD] Thread exited\n");
executor_thread_destroy(exec);
log_write("[EXEC] Thread exited\n");
return NULL;
}

388
src/executor.c Normal file
View File

@ -0,0 +1,388 @@
#include "executor.h"
#include "rest_client.h"
#include "log.h"
#include "hash.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <math.h>
#include <unistd.h>
#include <stdarg.h>
#define _D1 1.0
#define FILL_TIMEOUT_MS 5000
#define MAX_IN_FLIGHT 8
struct executor_thread_s {
const config_t *cfg;
rest_conn_t *rest;
/* Concurrency isolation state */
char in_flight_triangles[MAX_IN_FLIGHT][128]; /* triangle_key */
uint64_t in_flight_pairs[MAX_IN_FLIGHT]; /* fnv1a hash of each pair */
char in_flight_primary_quotes[MAX_IN_FLIGHT][16]; /* primary quote currency */
int in_flight_count;
};
/* ── Reporting ── */
void executor_write_report(const char *fmt, ...) {
va_list args;
va_start(args, fmt);
char buf[2048];
vsnprintf(buf, sizeof(buf), fmt, args);
va_end(args);
log_write("%s", buf);
}
/* ── Timestamp helpers ── */
static int64_t now_mono_ms(void) {
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return (int64_t)ts.tv_sec * 1000 + (int64_t)ts.tv_nsec / 1000000;
}
static int64_t now_realtime_ms(void) {
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
return (int64_t)ts.tv_sec * 1000 + (int64_t)ts.tv_nsec / 1000000;
}
static void format_ts(char *buf, size_t sz) {
time_t s = time(NULL);
int ms = (int)(now_realtime_ms() % 1000);
struct tm gm;
gmtime_r(&s, &gm);
strftime(buf, sz, "%Y-%m-%dT%H:%M:%S.", &gm);
size_t len = strlen(buf);
snprintf(buf + len, sz - len, "%03dZ", ms);
}
/* ── Fee hold reduction + increment floor ── */
static double apply_fee_hold(double vol, double fee_rate, bool next_is_buy) {
if (next_is_buy && fee_rate > 0)
vol /= (1.0 + fee_rate);
return vol;
}
static double apply_increment_floor(double vol, double inc) {
if (inc > 0)
vol = floor(vol / inc - 1e-12) * inc;
return vol;
}
/* ── Core execution loop ── */
executor_thread_t *executor_thread_create(const config_t *cfg,
fill_channel_t *fill_ch) {
(void)fill_ch;
executor_thread_t *et = calloc(1, sizeof(*et));
if (!et) return NULL;
et->cfg = cfg;
et->rest = rest_conn_new();
if (et->rest) {
rest_conn_set_auth(et->rest,
cfg->kucoin_api_key,
cfg->kucoin_api_secret,
cfg->kucoin_api_passphrase);
}
return et;
}
void executor_thread_set_fill_ch(executor_thread_t *et, fill_channel_t *ch) {
(void)et;
(void)ch;
/* In the initial implementation the fill channel is global (ws_client->fill_ch),
passed directly to execute_triangle. This setter is for per-thread channels. */
}
void executor_execute_triangle(executor_thread_t *et,
signal_entry_t *sig,
fill_channel_t *fill_ch) {
/* ── Concurrency isolation ── */
uint64_t pair_hashes[3] = {0};
for (int p = 0; p < 3; p++) {
pair_hashes[p] = fnv1a_hash(sig->legs.legs[p].symbol, (uint32_t)strlen(sig->legs.legs[p].symbol));
}
for (int i = 0; i < et->in_flight_count; i++) {
if (strcmp(et->in_flight_triangles[i], sig->triangle_key) == 0) {
log_write("[EXEC] Dropping signal for overlapping triangle: %s\n", sig->triangle_key);
return;
}
if (strcmp(et->in_flight_primary_quotes[i], sig->primary_quote) == 0) {
log_write("[EXEC] Dropping signal for same primary quote: %s\n", sig->triangle_key);
return;
}
for (int p = 0; p < 3; p++) {
if (et->in_flight_pairs[i] == pair_hashes[p]) {
log_write("[EXEC] Dropping signal for overlapping pair on %s\n", sig->triangle_key);
return;
}
}
}
/* Register this execution */
int slot = -1;
for (int i = 0; i < MAX_IN_FLIGHT; i++) {
if (!et->in_flight_triangles[i][0]) { slot = i; break; }
}
if (slot >= 0) {
strncpy(et->in_flight_triangles[slot], sig->triangle_key, sizeof(et->in_flight_triangles[slot]) - 1);
strncpy(et->in_flight_primary_quotes[slot], sig->primary_quote, sizeof(et->in_flight_primary_quotes[slot]) - 1);
et->in_flight_pairs[slot] = pair_hashes[0];
et->in_flight_count++;
}
char ts_buf[32];
char corr_id[64];
int64_t exec_start = now_mono_ms();
snprintf(corr_id, sizeof(corr_id), "%08x%08x%08x%08x",
(unsigned)(uintptr_t)&sig->legs.legs[0] ^ (unsigned)sig->ts_ms,
(unsigned)sig->ts_ms ^ (unsigned)sig->book_ts_ms,
(unsigned)sig->predicted_bps,
(unsigned)sig->t_arrive_ms);
double leg_output[3] = {0};
double fills[3][6] = {{0}}; /* leg, output, avg_price, fee, input_vol, latency_ms */
bool success = true;
const char *error_str = "";
for (int leg = 0; leg < 3; leg++) {
const signal_leg_t *sl = &sig->legs.legs[leg];
bool is_buy = (strcmp(sl->side, "buy") == 0);
/* ── Input volume ── */
double input_vol;
if (leg == 0) {
input_vol = atof(sl->order_param);
} else {
input_vol = leg_output[leg - 1];
input_vol = apply_fee_hold(input_vol, sl->fee_rate, is_buy);
input_vol = apply_increment_floor(input_vol,
is_buy ? sl->quote_increment : sl->base_increment);
}
/* Build a client OID */
char client_oid[64];
snprintf(client_oid, sizeof(client_oid), "c%08x%04x",
(unsigned)(now_realtime_ms() & 0xFFFFFFFF),
(unsigned)leg);
fills[leg][4] = input_vol;
/* ── Place order ── */
char order_id[32] = {0};
char err_msg[128] = {0};
bool ok = false;
int64_t t0 = now_mono_ms();
if (sig->live) {
ok = rest_order_place(et->rest, sl->symbol, sl->side,
is_buy ? input_vol : 0,
is_buy ? 0 : input_vol,
client_oid,
order_id, sizeof(order_id),
err_msg, sizeof(err_msg));
} else {
ok = rest_order_test(et->rest, sl->symbol, sl->side,
is_buy ? input_vol : 0,
is_buy ? 0 : input_vol,
err_msg, sizeof(err_msg));
if (ok) {
snprintf(order_id, sizeof(order_id), "paper-%08x",
(unsigned)now_realtime_ms());
}
}
fills[leg][5] = (double)(now_mono_ms() - t0);
if (!ok) {
error_str = err_msg[0] ? err_msg : "order_rejected";
success = false;
break;
}
/* ── Wait for fill (live only) ── */
double total_size = 0, total_funds = 0, avg_price = 0, total_fee = 0;
int match_count = 0;
if (sig->live) {
fill_result_t fr = {0};
bool filled = fill_channel_await(fill_ch, client_oid,
FILL_TIMEOUT_MS, &fr);
if (!filled) {
error_str = fr.match_count > 0 ? "partial_fill" : "fill_timeout";
success = false;
break;
}
total_size = fr.total_size;
total_funds = fr.total_funds;
total_fee = fr.total_fee;
avg_price = fr.avg_price;
match_count = fr.match_count;
if (fr.order_id[0]) strncpy(order_id, fr.order_id, sizeof(order_id) - 1);
} else {
/* Paper mode: simulate fill from books[leg] top-of-book */
if (sig->book_count > leg) {
const signal_book_t *bk = &sig->books[leg];
if (is_buy && bk->ask_count > 0) {
double ask_price = bk->asks[0].price;
total_size = input_vol / ask_price;
if (sl->base_increment > 0)
total_size = floor(total_size / sl->base_increment - 1e-12) * sl->base_increment;
if (sl->quote_increment > 0)
total_funds = floor(total_size * ask_price / sl->quote_increment - 1e-12) * sl->quote_increment;
else
total_funds = total_size * ask_price;
avg_price = ask_price;
} else if (!is_buy && bk->bid_count > 0) {
double bid_price = bk->bids[0].price;
total_size = input_vol; /* base amount to sell */
if (sl->base_increment > 0)
total_size = floor(total_size / sl->base_increment - 1e-12) * sl->base_increment;
if (sl->quote_increment > 0)
total_funds = floor(total_size * bid_price / sl->quote_increment - 1e-12) * sl->quote_increment;
else
total_funds = total_size * bid_price;
avg_price = bid_price;
} else {
error_str = "no_book_data";
success = false;
break;
}
} else {
error_str = "no_book_data";
success = false;
break;
}
/* Simulated fee: if fee currency matches output, fee = output * rate */
if (sl->fee_rate > 0) {
double output_amt = is_buy ? total_size : total_funds;
total_fee = output_amt * sl->fee_rate;
}
}
/* ── Cascade ── */
leg_output[leg] = is_buy ? total_size : total_funds;
fills[leg][0] = leg_output[leg];
fills[leg][1] = avg_price;
fills[leg][2] = total_fee;
fills[leg][3] = total_funds;
/* Fee hold reduction + increment floor for next leg */
if (leg < 2) {
const signal_leg_t *nsl = &sig->legs.legs[leg + 1];
bool nxt_buy = (strcmp(nsl->side, "buy") == 0);
leg_output[leg] = apply_fee_hold(leg_output[leg], nsl->fee_rate, nxt_buy);
leg_output[leg] = apply_increment_floor(leg_output[leg],
nxt_buy ? nsl->quote_increment : nsl->base_increment);
}
}
/* ── Compute PnL ── */
double profit = 0, effective_bps = 0;
if (success && fills[2][0] > 0) {
double leg0_in = (strcmp(sig->legs.legs[0].side, "buy") == 0)
? fills[0][3] : fills[0][0];
double leg2_out = (strcmp(sig->legs.legs[2].side, "buy") == 0)
? fills[2][0] : fills[2][0];
/* Subtract fee if fee currency matches leg 2 output currency */
{
const signal_leg_t *sl2 = &sig->legs.legs[2];
bool leg2_buy = (strcmp(sl2->side, "buy") == 0);
const char *pair = sl2->symbol;
const char *dash = strchr(pair, '-');
char base_ccy[16] = {0}, quote_ccy[16] = {0};
if (dash) {
size_t blen = (size_t)(dash - pair);
if (blen > 15) blen = 15;
memcpy(base_ccy, pair, blen);
strncpy(quote_ccy, dash + 1, 15);
}
const char *out_ccy = leg2_buy ? base_ccy : quote_ccy;
if (out_ccy[0] && strcmp(sl2->fee_currency, out_ccy) == 0) {
leg2_out -= fills[2][2];
}
}
profit = leg2_out - leg0_in;
if (leg0_in > 0)
effective_bps = (profit / leg0_in) * 10000.0;
}
int64_t total_ms = now_mono_ms() - exec_start;
/* ── Emit report ── */
format_ts(ts_buf, sizeof(ts_buf));
const char *status = success ? "FILLED" : "FAILED";
char bps_str[32];
snprintf(bps_str, sizeof(bps_str), "%.2f", effective_bps);
executor_write_report(
"%s %s | corr=%s | triangle=['%s'] | "
"predicted_bps=%.2f | effective_bps=%s | "
"book_ts=%lld | profit=%.4f | timings=[] | fills=[",
ts_buf, status, corr_id, sig->triangle_key,
sig->predicted_bps, bps_str,
(long long)sig->book_ts_ms, profit);
for (int leg = 0; leg < 3 && leg <= (success ? 2 : (fills[0][0] > 0 ? 0 : -1)); leg++) {
if (fills[leg][0] == 0 && !(leg == 2 && success)) break;
const signal_leg_t *sl = &sig->legs.legs[leg];
char pair[32], side[8], in_ccy[16], out_ccy[16];
strncpy(pair, sl->symbol, sizeof(pair) - 1);
strncpy(side, sl->side, sizeof(side) - 1);
if (strcmp(side, "buy") == 0) {
char *dash = strchr(pair, '-');
if (dash) {
strncpy(in_ccy, dash + 1, sizeof(in_ccy) - 1);
size_t len = dash - pair;
if (len > sizeof(out_ccy) - 1) len = sizeof(out_ccy) - 1;
memcpy(out_ccy, pair, len);
out_ccy[len] = '\0';
}
} else {
char *dash = strchr(pair, '-');
if (dash) {
strncpy(out_ccy, dash + 1, sizeof(out_ccy) - 1);
size_t len = dash - pair;
if (len > sizeof(in_ccy) - 1) len = sizeof(in_ccy) - 1;
memcpy(in_ccy, pair, len);
in_ccy[len] = '\0';
}
}
executor_write_report(
"%sL%d:%s %s %s->%s %.10g@%.6g(fee=%.6g %s lat=%.1fms)",
leg > 0 ? ", " : "",
leg, side, pair, in_ccy, out_ccy,
fills[leg][0], fills[leg][1], fills[leg][2],
sl->fee_currency, fills[leg][5]);
}
if (error_str[0]) {
executor_write_report("] | error=%s\n", error_str);
} else {
executor_write_report("]\n");
}
/* Log timing */
/* Release isolation slot */
for (int i = 0; i < MAX_IN_FLIGHT; i++) {
if (strcmp(et->in_flight_triangles[i], sig->triangle_key) == 0) {
et->in_flight_triangles[i][0] = '\0';
et->in_flight_primary_quotes[i][0] = '\0';
et->in_flight_pairs[i] = 0;
et->in_flight_count--;
break;
}
}
log_write("[EXEC] Triangle %s in %lld ms, profit=%.4f predicted=%.2f eff=%.2f\n",
status, (long long)total_ms, profit, sig->predicted_bps, effective_bps);
}
void executor_thread_destroy(executor_thread_t *et) {
if (!et) return;
if (et->rest) rest_conn_free(et->rest);
free(et);
}

30
src/executor.h Normal file
View File

@ -0,0 +1,30 @@
#ifndef EXECUTOR_H
#define EXECUTOR_H
#include <stdbool.h>
#include "fill_handler.h"
#include "config.h"
#include "queue.h"
/* Per-thread data for the executor. */
typedef struct executor_thread_s executor_thread_t;
/* Create an executor thread (one per concurrent slot). */
executor_thread_t *executor_thread_create(const config_t *cfg,
fill_channel_t *fill_ch);
/* Set the fill channel for this executor thread. */
void executor_thread_set_fill_ch(executor_thread_t *et, fill_channel_t *ch);
/* Execute a single triangle signal (blocking, called from executor thread). */
void executor_execute_triangle(executor_thread_t *et,
signal_entry_t *sig,
fill_channel_t *fill_ch);
/* Report status text. */
void executor_write_report(const char *fmt, ...);
/* Destroy and free. */
void executor_thread_destroy(executor_thread_t *et);
#endif

136
src/fill_handler.c Normal file
View File

@ -0,0 +1,136 @@
#include "fill_handler.h"
#include "log.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <stdatomic.h>
#include <poll.h>
#include <sys/eventfd.h>
#include <time.h>
#define FILL_RING_SIZE 256
struct fill_channel_s {
fill_event_t ring[FILL_RING_SIZE];
_Atomic uint32_t head; /* consumer read index */
_Atomic uint32_t tail; /* producer write index */
int wake_fd; /* eventfd to wake consumer */
};
fill_channel_t *fill_channel_create(void) {
fill_channel_t *ch = calloc(1, sizeof(*ch));
if (!ch) return NULL;
ch->head = 0;
ch->tail = 0;
ch->wake_fd = eventfd(0, EFD_NONBLOCK);
if (ch->wake_fd < 0) {
free(ch);
return NULL;
}
return ch;
}
void fill_channel_destroy(fill_channel_t *ch) {
if (!ch) return;
if (ch->wake_fd >= 0) close(ch->wake_fd);
free(ch);
}
int fill_channel_wake_fd(fill_channel_t *ch) {
return ch->wake_fd;
}
void fill_channel_reset(fill_channel_t *ch) {
atomic_store_explicit(&ch->head, 0, memory_order_release);
atomic_store_explicit(&ch->tail, 0, memory_order_release);
/* Drain the eventfd */
uint64_t val;
while (read(ch->wake_fd, &val, sizeof(val)) > 0) {}
}
bool fill_channel_push(fill_channel_t *ch, const fill_event_t *ev) {
uint32_t cur_tail = atomic_load_explicit(&ch->tail, memory_order_relaxed);
uint32_t cur_head = atomic_load_explicit(&ch->head, memory_order_acquire);
uint32_t next = (cur_tail + 1) % FILL_RING_SIZE;
if (next == cur_head) return false; /* full */
ch->ring[cur_tail] = *ev;
atomic_store_explicit(&ch->tail, next, memory_order_release);
uint64_t one = 1;
write(ch->wake_fd, &one, sizeof(one));
return true;
}
bool fill_channel_pop(fill_channel_t *ch, fill_event_t *ev) {
uint32_t cur_head_r = atomic_load_explicit(&ch->head, memory_order_acquire);
uint32_t cur_tail_r = atomic_load_explicit(&ch->tail, memory_order_acquire);
if (cur_head_r == cur_tail_r) return false;
*ev = ch->ring[cur_head_r];
uint32_t next_head = (cur_head_r + 1) % FILL_RING_SIZE;
atomic_store_explicit(&ch->head, next_head, memory_order_release);
return true;
}
static int64_t now_ms(void) {
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return (int64_t)ts.tv_sec * 1000 + (int64_t)ts.tv_nsec / 1000000;
}
bool fill_channel_await(fill_channel_t *ch, const char *client_oid,
int64_t timeout_ms, fill_result_t *out) {
double total_size = 0, total_funds = 0, total_fee = 0;
int match_count = 0;
char order_id[32] = {0};
int64_t deadline = now_ms() + timeout_ms;
while (now_ms() < deadline) {
fill_event_t ev;
while (fill_channel_pop(ch, &ev)) {
if (strcmp(ev.client_oid, client_oid) != 0) continue;
total_size += ev.match_size;
total_funds += ev.match_price * ev.match_size;
total_fee += ev.match_fee;
match_count++;
if (!order_id[0] && ev.order_id[0])
strncpy(order_id, ev.order_id, sizeof(order_id) - 1);
if (ev.is_terminal && match_count > 0) {
out->total_size = total_size;
out->total_funds = total_funds;
out->total_fee = total_fee;
out->avg_price = total_size > 0 ? total_funds / total_size : 0;
out->match_count = match_count;
strncpy(out->order_id, order_id, sizeof(out->order_id) - 1);
return true;
}
}
/* Wait for more events */
struct pollfd pfd = { .fd = ch->wake_fd, .events = POLLIN };
int remaining = (int)(deadline - now_ms());
if (remaining <= 0) break;
int ret = poll(&pfd, 1, remaining);
if (ret > 0) {
uint64_t val;
read(ch->wake_fd, &val, sizeof(val));
}
}
/* Timeout — return partial if any matches */
if (match_count > 0) {
out->total_size = total_size;
out->total_funds = total_funds;
out->total_fee = total_fee;
out->avg_price = total_size > 0 ? total_funds / total_size : 0;
out->match_count = match_count;
strncpy(out->order_id, order_id, sizeof(out->order_id) - 1);
}
return false;
}

54
src/fill_handler.h Normal file
View File

@ -0,0 +1,54 @@
#ifndef FILL_HANDLER_H
#define FILL_HANDLER_H
#include <stdint.h>
#include <stdbool.h>
#include <stddef.h>
#define MAX_CLIENT_OID 41
/* A single match event pushed from hot thread to executor thread. */
typedef struct {
char client_oid[MAX_CLIENT_OID];
double match_size;
double match_price;
double match_fee;
bool is_terminal; /* canceled+done with matches */
bool is_timeout; /* true = fill timeout */
char order_id[32];
} fill_event_t;
/* Accumulated fill result returned to the executor. */
typedef struct {
double total_size;
double total_funds;
double total_fee;
double avg_price;
int match_count;
char order_id[32];
} fill_result_t;
/* Per-thread fill channel (SPSC queue + wake fd). */
typedef struct fill_channel_s fill_channel_t;
fill_channel_t *fill_channel_create(void);
void fill_channel_destroy(fill_channel_t *ch);
/* Push a fill event (hot thread, non-blocking). */
bool fill_channel_push(fill_channel_t *ch, const fill_event_t *ev);
/* Wait for a fill matching client_oid (executor thread, blocking).
Returns the accumulated result. Times out after timeout_ms. */
bool fill_channel_await(fill_channel_t *ch, const char *client_oid,
int64_t timeout_ms, fill_result_t *out);
/* Pop a single event (non-blocking drain). */
bool fill_channel_pop(fill_channel_t *ch, fill_event_t *ev);
/* Get the wake fd for poll(). */
int fill_channel_wake_fd(fill_channel_t *ch);
/* Reset for next execution. */
void fill_channel_reset(fill_channel_t *ch);
#endif

View File

@ -264,7 +264,7 @@ char *https_post(const char *host, int port, const char *path,
* Compute HMAC-SHA256 digest of data using key, then base64-encode the result.
* Uses OpenSSL HMAC() + BIO_f_base64 filter chain.
*/
static int hmac_sha256_base64(const char *key, const char *data, char *out, size_t out_len) {
int hmac_sha256_base64(const char *key, const char *data, char *out, size_t out_len) {
unsigned char digest[EVP_MAX_MD_SIZE];
unsigned int digest_len = 0;
@ -297,6 +297,145 @@ static int hmac_sha256_base64(const char *key, const char *data, char *out, size
return (int)len;
}
/*
* Authenticated POST request to KuCoin REST API with JSON body.
*/
char *https_post_auth(const char *host, int port, const char *path,
const char *api_key, const char *api_secret,
const char *api_passphrase,
const char *body, int *out_len) {
if (!api_key || !api_secret || !api_passphrase) {
return NULL;
}
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
char timestamp[32];
snprintf(timestamp, sizeof(timestamp), "%lld", (long long)(ts.tv_sec * 1000LL + ts.tv_nsec / 1000000LL));
char sign_input[1024];
int body_len = body ? (int)strlen(body) : 0;
snprintf(sign_input, sizeof(sign_input), "%sPOST%s%s", timestamp, path, body ? body : "");
char sign_b64[256] = {0};
if (hmac_sha256_base64(api_secret, sign_input, sign_b64, sizeof(sign_b64)) < 0) {
return NULL;
}
char passphrase_b64[256] = {0};
if (hmac_sha256_base64(api_secret, api_passphrase, passphrase_b64, sizeof(passphrase_b64)) < 0) {
return NULL;
}
int fd = create_tcp_socket(host, port);
if (fd < 0) return NULL;
SSL_CTX *ctx = SSL_CTX_new(TLS_client_method());
if (!ctx) { close(fd); return NULL; }
SSL *ssl = SSL_new(ctx);
if (!ssl) { SSL_CTX_free(ctx); close(fd); return NULL; }
SSL_set_fd(ssl, fd);
SSL_set_tlsext_host_name(ssl, host);
if (SSL_connect(ssl) <= 0) {
SSL_free(ssl); SSL_CTX_free(ctx); close(fd); return NULL;
}
char req[4096];
if (body && body_len > 0) {
snprintf(req, sizeof(req),
"POST %s HTTP/1.1\r\n"
"Host: %s\r\n"
"Accept: application/json\r\n"
"User-Agent: fused-engine/1.0\r\n"
"Connection: close\r\n"
"Content-Type: application/json\r\n"
"Content-Length: %d\r\n"
"KC-API-KEY: %s\r\n"
"KC-API-SIGN: %s\r\n"
"KC-API-TIMESTAMP: %s\r\n"
"KC-API-PASSPHRASE: %s\r\n"
"KC-API-SIGN-TYPE: 2\r\n"
"KC-API-KEY-VERSION: 3\r\n"
"\r\n%s",
path, host, body_len,
api_key, sign_b64, timestamp, passphrase_b64,
body);
} else {
snprintf(req, sizeof(req),
"POST %s HTTP/1.1\r\n"
"Host: %s\r\n"
"Accept: application/json\r\n"
"User-Agent: fused-engine/1.0\r\n"
"Connection: close\r\n"
"KC-API-KEY: %s\r\n"
"KC-API-SIGN: %s\r\n"
"KC-API-TIMESTAMP: %s\r\n"
"KC-API-PASSPHRASE: %s\r\n"
"KC-API-SIGN-TYPE: 2\r\n"
"KC-API-KEY-VERSION: 3\r\n"
"\r\n",
path, host,
api_key, sign_b64, timestamp, passphrase_b64);
}
if (SSL_write(ssl, req, (int)strlen(req)) <= 0) {
SSL_free(ssl); SSL_CTX_free(ctx); close(fd); return NULL;
}
char *resp = malloc(HTTP_BUFFER_SIZE);
if (!resp) { SSL_free(ssl); SSL_CTX_free(ctx); close(fd); return NULL; }
int total = 0;
while (total < HTTP_BUFFER_SIZE - 1) {
int n = SSL_read(ssl, resp + total, HTTP_BUFFER_SIZE - 1 - total);
if (n <= 0) {
int err = SSL_get_error(ssl, n);
log_write("[HTTPS_POST_AUTH] SSL_read returned %d, SSL_error=%d\n", n, err);
break;
}
total += n;
}
resp[total] = '\0';
SSL_shutdown(ssl);
SSL_free(ssl);
SSL_CTX_free(ctx);
close(fd);
// Strip headers
char *headers_end = strstr(resp, "\r\n\r\n");
if (headers_end) {
int hl = (headers_end - resp) + 4;
bool chunked = (strcasestr(resp, "transfer-encoding") != NULL);
memmove(resp, headers_end + 4, total - hl);
total -= hl;
resp[total] = '\0';
if (chunked) {
char *out = malloc(HTTP_BUFFER_SIZE);
if (out) {
int op = 0;
char *p = resp;
while (*p && !(*p == '0' && (p[1] == '\r' || p[1] == '\n'))) {
int cl = 0;
while (*p && *p != '\r' && *p != '\n') { char h = *p; cl <<= 4; cl += (h >= '0' && h <= '9') ? (h - '0') : ((h & 0x1f) + 9); p++; }
if (*p == '\r') p++; if (*p == '\n') p++;
if (cl > 0 && op + cl < HTTP_BUFFER_SIZE - 1) { memcpy(out + op, p, cl); op += cl; p += cl; }
if (*p == '\r') p++; if (*p == '\n') p++;
}
out[op] = '\0';
free(resp);
resp = out;
total = op;
}
}
}
if (out_len) *out_len = total;
return resp;
}
/*
* Authenticated GET request to KuCoin REST API.
* Builds signature from timestamp + "GET" + path using HMAC-SHA256.

View File

@ -20,4 +20,10 @@ char *https_get_auth(const char *host, int port, const char *path,
const char *api_key, const char *api_secret,
const char *api_passphrase, int *out_len);
/* Authenticated HTTPS POST with JSON body signed with KuCoin API HMAC-SHA256 */
char *https_post_auth(const char *host, int port, const char *path,
const char *api_key, const char *api_secret,
const char *api_passphrase,
const char *body, int *out_len);
#endif

435
src/rest_client.c Normal file
View File

@ -0,0 +1,435 @@
#include "rest_client.h"
#include "http_client.h"
#include "log.h"
#include "cJSON.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include <errno.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <openssl/ssl.h>
#include <openssl/err.h>
#define HOST "api.kucoin.com"
#define PORT 443
struct rest_conn_s {
char api_key[64];
char api_secret[128];
char api_passphrase[64];
/* Keepalive TLS connection */
SSL_CTX *ctx;
SSL *ssl;
int fd;
};
/* Forward declaration of the HMAC helper from http_client.c */
extern int hmac_sha256_base64(const char *key, const char *data,
char *out, size_t out_len);
/* ── Connection management ── */
rest_conn_t *rest_conn_new(void) {
rest_conn_t *rc = calloc(1, sizeof(*rc));
if (!rc) return NULL;
rc->fd = -1;
return rc;
}
void rest_conn_set_auth(rest_conn_t *rc,
const char *api_key,
const char *api_secret,
const char *api_passphrase) {
if (api_key) strncpy(rc->api_key, api_key, sizeof(rc->api_key) - 1);
if (api_secret) strncpy(rc->api_secret, api_secret, sizeof(rc->api_secret) - 1);
if (api_passphrase) strncpy(rc->api_passphrase, api_passphrase, sizeof(rc->api_passphrase) - 1);
}
static int tcp_connect(const char *host, int port) {
char port_str[8];
snprintf(port_str, sizeof(port_str), "%d", port);
struct addrinfo hints = {0}, *res = NULL;
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
int ret = getaddrinfo(host, port_str, &hints, &res);
if (ret != 0 || !res) return -1;
int fd = -1;
for (struct addrinfo *rp = res; rp; rp = rp->ai_next) {
fd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
if (fd < 0) continue;
struct timeval tv = { .tv_sec = 10, .tv_usec = 0 };
setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
if (connect(fd, rp->ai_addr, rp->ai_addrlen) == 0) break;
close(fd);
fd = -1;
}
freeaddrinfo(res);
return fd;
}
static int ensure_connected(rest_conn_t *rc) {
if (rc->ssl && rc->fd >= 0) {
/* Quick check: is the connection still alive? */
char buf;
if (recv(rc->fd, &buf, 1, MSG_PEEK | MSG_DONTWAIT) == 0) {
goto reconnect;
}
return 0; /* Connection alive */
}
reconnect:
/* Close stale connection */
if (rc->ssl) { SSL_free(rc->ssl); rc->ssl = NULL; }
if (rc->fd >= 0) { close(rc->fd); rc->fd = -1; }
if (!rc->ctx) {
rc->ctx = SSL_CTX_new(TLS_client_method());
if (!rc->ctx) return -1;
}
int fd = tcp_connect(HOST, PORT);
if (fd < 0) return -1;
SSL *ssl = SSL_new(rc->ctx);
if (!ssl) { close(fd); return -1; }
SSL_set_fd(ssl, fd);
SSL_set_tlsext_host_name(ssl, HOST);
if (SSL_connect(ssl) <= 0) {
SSL_free(ssl); close(fd);
return -1;
}
rc->fd = fd;
rc->ssl = ssl;
return 0;
}
/* ── Low-level signed request ── */
static bool do_signed_request(rest_conn_t *rc,
const char *method,
const char *path,
const char *body,
char *out_response, size_t out_sz) {
if (ensure_connected(rc) < 0) return false;
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
char timestamp[32];
snprintf(timestamp, sizeof(timestamp), "%lld",
(long long)(ts.tv_sec * 1000LL + ts.tv_nsec / 1000000LL));
/* Build signing string: timestamp + method + path + body */
char sign_input[1536];
int body_len = body ? (int)strlen(body) : 0;
snprintf(sign_input, sizeof(sign_input), "%s%s%s%s",
timestamp, method, path, body ? body : "");
char sign_b64[256] = {0};
if (hmac_sha256_base64(rc->api_secret, sign_input,
sign_b64, sizeof(sign_b64)) < 0) {
return false;
}
/* Build password-encrypted passphrase */
char pass_b64[256] = {0};
if (hmac_sha256_base64(rc->api_secret, rc->api_passphrase,
pass_b64, sizeof(pass_b64)) < 0) {
return false;
}
/* Build the HTTP request */
char req[4096];
if (body && body_len > 0) {
snprintf(req, sizeof(req),
"%s %s HTTP/1.1\r\n"
"Host: " HOST "\r\n"
"Accept: application/json\r\n"
"User-Agent: fused-engine/1.0\r\n"
"Connection: keep-alive\r\n"
"Content-Type: application/json\r\n"
"Content-Length: %d\r\n"
"KC-API-KEY: %s\r\n"
"KC-API-SIGN: %s\r\n"
"KC-API-TIMESTAMP: %s\r\n"
"KC-API-PASSPHRASE: %s\r\n"
"KC-API-SIGN-TYPE: 2\r\n"
"KC-API-KEY-VERSION: 3\r\n"
"\r\n%s",
method, path, body_len,
rc->api_key, sign_b64, timestamp, pass_b64, body);
} else {
snprintf(req, sizeof(req),
"%s %s HTTP/1.1\r\n"
"Host: " HOST "\r\n"
"Accept: application/json\r\n"
"User-Agent: fused-engine/1.0\r\n"
"Connection: keep-alive\r\n"
"KC-API-KEY: %s\r\n"
"KC-API-SIGN: %s\r\n"
"KC-API-TIMESTAMP: %s\r\n"
"KC-API-PASSPHRASE: %s\r\n"
"KC-API-SIGN-TYPE: 2\r\n"
"KC-API-KEY-VERSION: 3\r\n"
"\r\n",
method, path,
rc->api_key, sign_b64, timestamp, pass_b64);
}
int req_len = (int)strlen(req);
/* Send */
if (SSL_write(rc->ssl, req, req_len) <= 0) {
log_write("[REST] SSL_write failed, will reconnect\n");
/* Force reconnect on next call */
SSL_free(rc->ssl); rc->ssl = NULL;
close(rc->fd); rc->fd = -1;
return false;
}
/* Read response — handle both Content-Length and chunked encoding */
char raw[65536];
int total = 0;
int need = -1; /* total bytes needed (-1 = not yet known) */
bool chunked = false;
while (total < (int)sizeof(raw) - 1) {
int n = SSL_read(rc->ssl, raw + total, (int)sizeof(raw) - 1 - total);
if (n <= 0) {
int err = SSL_get_error(rc->ssl, n);
if (err == SSL_ERROR_WANT_READ || err == SSL_ERROR_WANT_WRITE) continue;
break;
}
total += n;
if (need < 0) {
char *end_hdrs = strstr(raw, "\r\n\r\n");
if (end_hdrs) {
int hdr_len = (int)(end_hdrs + 4 - raw);
char *cl = strcasestr(raw, "Content-Length:");
if (cl && cl < end_hdrs) {
long len = atol(cl + 15);
if (len > 0) need = hdr_len + (int)len;
}
chunked = (strcasestr(raw, "transfer-encoding") != NULL);
if (!chunked && need < 0) {
/* No Content-Length and not chunked — read all (close). */
need = -2;
}
if (!chunked && need > 0 && total >= need) {
break; /* got the full Content-Length body */
}
if (chunked) {
/* For chunked, we decode below and stop at 0\r\n\r\n */
if (total >= 5 && memcmp(raw + total - 5, "0\r\n\r\n", 5) == 0) break;
}
}
} else if (need > 0 && total >= need) {
break;
} else if (chunked) {
if (total >= 5 && memcmp(raw + total - 5, "0\r\n\r\n", 5) == 0) break;
}
}
raw[total] = '\0';
/* Find header/body boundary */
char *headers_end = strstr(raw, "\r\n\r\n");
if (!headers_end) {
strncpy(out_response, raw, out_sz - 1);
out_response[out_sz - 1] = '\0';
return total > 0;
}
int header_len = (int)(headers_end - raw) + 4;
int body_start = header_len;
int body_end = total;
if (chunked) {
char *src = raw + body_start;
char *dst = raw;
int wrote = 0;
while (*src) {
char *end = NULL;
long cl = strtol(src, &end, 16);
if (end == src || cl <= 0) break;
while (*end && (*end == '\r' || *end == '\n')) end++;
if (cl > 0 && wrote + cl + 1 < (int)sizeof(raw)) {
memmove(dst + wrote, end, cl);
wrote += (int)cl;
src = end + cl;
while (*src && (*src == '\r' || *src == '\n')) src++;
} else {
break;
}
}
raw[wrote] = '\0';
body_end = wrote;
} else {
memmove(raw, raw + body_start, body_end - body_start);
body_end -= body_start;
raw[body_end] = '\0';
}
strncpy(out_response, raw, out_sz - 1);
out_response[out_sz - 1] = '\0';
return true;
}
/* ── Order placement ── */
bool rest_order_place(rest_conn_t *rc,
const char *symbol, const char *side,
double funds, double size,
const char *client_oid,
char *out_order_id, size_t out_sz,
char *out_error, size_t err_sz) {
char body[512];
if (funds > 0) {
snprintf(body, sizeof(body),
"{\"clientOid\":\"%s\",\"symbol\":\"%s\",\"type\":\"market\""
",\"side\":\"%s\",\"funds\":\"%.10f\"}",
client_oid, symbol, side, funds);
} else {
snprintf(body, sizeof(body),
"{\"clientOid\":\"%s\",\"symbol\":\"%s\",\"type\":\"market\""
",\"side\":\"%s\",\"size\":\"%.10f\"}",
client_oid, symbol, side, size);
}
char resp[65536] = {0};
if (!do_signed_request(rc, "POST", "/api/v1/hf/orders", body, resp, sizeof(resp))) {
snprintf(out_error, err_sz, "transport_error");
return false;
}
/* Parse JSON response */
cJSON *root = cJSON_Parse(resp);
if (!root) {
snprintf(out_error, err_sz, "parse_error: %.100s", resp);
return false;
}
cJSON *code = cJSON_GetObjectItem(root, "code");
cJSON *data = cJSON_GetObjectItem(root, "data");
if (code && cJSON_IsString(code) && strcmp(code->valuestring, "200000") == 0 && data) {
cJSON *oid = cJSON_GetObjectItem(data, "orderId");
if (oid && cJSON_IsString(oid)) {
strncpy(out_order_id, oid->valuestring, out_sz - 1);
out_order_id[out_sz - 1] = '\0';
cJSON_Delete(root);
return true;
}
}
/* Error */
cJSON *msg = cJSON_GetObjectItem(root, "msg");
if (msg && cJSON_IsString(msg)) {
snprintf(out_error, err_sz, "%s: %s",
code && cJSON_IsString(code) ? code->valuestring : "ERR",
msg->valuestring);
} else {
snprintf(out_error, err_sz, "unknown_error: %.100s", resp);
}
cJSON_Delete(root);
return false;
}
bool rest_order_test(rest_conn_t *rc,
const char *symbol, const char *side,
double funds, double size,
char *out_error, size_t err_sz) {
/* Build a unique clientOid for the test order */
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
char body[512];
unsigned rnd = (unsigned)(ts.tv_nsec ^ (uintptr_t)&body);
char cid[48];
snprintf(cid, sizeof(cid), "pt-%08x", rnd);
if (funds > 0) {
snprintf(body, sizeof(body),
"{\"clientOid\":\"%s\",\"symbol\":\"%s\",\"type\":\"market\",\"side\":\"%s\",\"funds\":\"%.8g\"}",
cid, symbol, side, funds);
} else {
snprintf(body, sizeof(body),
"{\"clientOid\":\"%s\",\"symbol\":\"%s\",\"type\":\"market\",\"side\":\"%s\",\"size\":\"%.8g\"}",
cid, symbol, side, size);
}
char resp[65536] = {0};
if (!do_signed_request(rc, "POST", "/api/v1/hf/orders/test", body, resp, sizeof(resp))) {
snprintf(out_error, err_sz, "transport_error");
return false;
}
cJSON *root = cJSON_Parse(resp);
if (!root) {
snprintf(out_error, err_sz, "parse_error: %.100s", resp);
return false;
}
cJSON *code = cJSON_GetObjectItem(root, "code");
if (code && cJSON_IsString(code) && strcmp(code->valuestring, "200000") == 0) {
cJSON_Delete(root);
return true;
}
cJSON *msg = cJSON_GetObjectItem(root, "msg");
if (msg && cJSON_IsString(msg)) {
snprintf(out_error, err_sz, "%s: %s",
code && cJSON_IsString(code) ? code->valuestring : "ERR",
msg->valuestring);
} else {
snprintf(out_error, err_sz, "unknown_error: %.100s", resp);
}
cJSON_Delete(root);
return false;
}
bool rest_get_balance(rest_conn_t *rc, const char *currency, double *out) {
char path[128];
char resp[65536] = {0};
snprintf(path, sizeof(path), "/api/v1/accounts?currency=%s", currency);
if (!do_signed_request(rc, "GET", path, NULL, resp, sizeof(resp))) {
return false;
}
cJSON *root = cJSON_Parse(resp);
if (!root) return false;
cJSON *data_arr = cJSON_GetObjectItem(root, "data");
if (!data_arr || !cJSON_IsArray(data_arr) || cJSON_GetArraySize(data_arr) == 0) {
cJSON_Delete(root);
return false;
}
cJSON *item = cJSON_GetArrayItem(data_arr, 0);
cJSON *avail = cJSON_GetObjectItem(item, "available");
if (avail && cJSON_IsString(avail)) {
*out = atof(avail->valuestring);
cJSON_Delete(root);
return true;
}
cJSON_Delete(root);
return false;
}
void rest_conn_free(rest_conn_t *rc) {
if (rc->ssl) SSL_free(rc->ssl);
if (rc->fd >= 0) close(rc->fd);
if (rc->ctx) SSL_CTX_free(rc->ctx);
free(rc);
}

45
src/rest_client.h Normal file
View File

@ -0,0 +1,45 @@
#ifndef REST_CLIENT_H
#define REST_CLIENT_H
#include <stdbool.h>
#include <stddef.h>
/* Keepalive HTTPS connection for KuCoin REST API. */
typedef struct rest_conn_s rest_conn_t;
/* Create a REST connection handle. Does not connect yet. */
rest_conn_t *rest_conn_new(void);
/* Set the API credentials. Must be called before any request. */
void rest_conn_set_auth(rest_conn_t *rc,
const char *api_key,
const char *api_secret,
const char *api_passphrase);
/* Place a market order.
Returns true on success and fills out_order_id.
Returns false and fills out_error (up to err_sz) on failure. */
bool rest_order_place(rest_conn_t *rc,
const char *symbol, const char *side,
double funds, double size,
const char *client_oid,
char *out_order_id, size_t out_sz,
char *out_error, size_t err_sz);
/* Validate an order via the test endpoint.
Returns true if the order would be accepted. */
bool rest_order_test(rest_conn_t *rc,
const char *symbol, const char *side,
double funds, double size,
char *out_error, size_t err_sz);
/* Fetch available balance for a currency (GET /api/v1/accounts).
Returns true and writes the available amount to *out on success. */
bool rest_get_balance(rest_conn_t *rc,
const char *currency,
double *out);
/* Close the connection and free resources. */
void rest_conn_free(rest_conn_t *rc);
#endif

View File

@ -29,6 +29,8 @@
#include <openssl/ssl.h>
#include <openssl/err.h>
static int ws_send_text(ws_connection_t *conn, const char *text);
static uint64_t now_ms_impl(void) {
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
@ -70,6 +72,7 @@ static void ws_connection_reset(ws_connection_t *conn) {
BIO_free(conn->bio_socket);
conn->bio_socket = NULL;
}
conn->token[0] = '\0';
if (conn->fd >= 0) {
close(conn->fd);
conn->fd = -1;
@ -177,6 +180,12 @@ int ws_client_init(ws_client_t *client, const config_t *cfg,
conn->port = 443;
}
client->connection_count = 1;
client->fill_ch = fill_channel_create();
if (!client->fill_ch) {
log_write("[WS] Failed to create fill channel\n");
}
return 0;
}
@ -196,12 +205,26 @@ void ws_client_destroy(ws_client_t *client) {
* Fetch a WebSocket token and server endpoint from KuCoin's /api/v1/bullet-public.
* Stores token, host, ping_interval_ms, ping_timeout_ms in the connection struct.
*/
int ws_client_fetch_token(ws_connection_t *conn) {
const char *body = "";
int ws_client_fetch_token_priv(ws_connection_t *conn, const config_t *cfg) {
int out_len = 0;
char *response = https_post("api.kucoin.com", 443, "/api/v1/bullet-public",
body, strlen(body), &out_len);
char *response = NULL;
/* Try private token first (needed for tradeOrdersV2 + balance). Fall
back to public token (level2Depth only) if no API key configured. */
if (cfg && cfg->kucoin_api_key[0] && cfg->kucoin_api_secret[0]) {
response = https_post_auth("api.kucoin.com", 443, "/api/v1/bullet-private",
cfg->kucoin_api_key,
cfg->kucoin_api_secret,
cfg->kucoin_api_passphrase,
"{}", &out_len);
}
if (!response || out_len <= 0) {
free(response);
response = https_post("api.kucoin.com", 443, "/api/v1/bullet-public",
"", 0, &out_len);
}
if (!response || out_len <= 0) {
log_write("[WS] Failed to fetch token\n");
free(response);
@ -281,7 +304,7 @@ int ws_client_connect(ws_client_t *client, uint32_t conn_idx) {
if (!conn->token[0]) {
conn->state = WS_STATE_GETTING_TOKEN;
if (ws_client_fetch_token(conn) != 0) return -1;
if (ws_client_fetch_token_priv(conn, client->cfg) != 0) return -1;
}
snprintf(conn->connect_id, sizeof(conn->connect_id), "fused-%" PRIu32, conn_idx + 1);
@ -348,6 +371,16 @@ int ws_client_connect(ws_client_t *client, uint32_t conn_idx) {
ws_client_subscribe(client, conn_idx, conn->symbol_indices, conn->symbol_count);
}
/* Subscribe to private order-change channel for fill events */
{
char msg[256];
snprintf(msg, sizeof(msg),
"{\"type\":\"subscribe\",\"topic\":\"/spotMarket/tradeOrdersV2\","
"\"response\":true,\"privateChannel\":\"true\"}");
ws_send_text(conn, msg);
log_write("[WS] Subscribed to tradeOrdersV2\n");
}
return 0;
}
@ -365,6 +398,7 @@ int ws_client_write(ws_connection_t *conn, const void *data, size_t len) {
if (r <= 0) {
int err = SSL_get_error(conn->ssl, r);
log_write("[WS] SSL_write error: %d\n", err);
conn->state = WS_STATE_DISCONNECTED;
return -1;
}
written += r;
@ -695,7 +729,54 @@ int16_t ws_client_process_frame(ws_client_t *client, uint32_t conn_idx) {
if (++ack_count <= 5) log_write("[WS] Ack #%d: %.*s\n", ack_count,
(int)(payload_len > 200 ? 200 : payload_len), (const char *)payload);
} else if (strcmp(msg_type->valuestring, "message") == 0) {
sym_idx = parse_book_update(msg_root, client);
cJSON *subject = cJSON_GetObjectItem(msg_root, "subject");
if (cJSON_IsString(subject) &&
strcmp(subject->valuestring, "orderChange") == 0) {
/* Private order-change event — dispatch fill */
cJSON *data = cJSON_GetObjectItem(msg_root, "data");
if (data && client->fill_ch) {
cJSON *oid = cJSON_GetObjectItem(data, "clientOid");
cJSON *evt_type = cJSON_GetObjectItem(data, "type");
cJSON *status = cJSON_GetObjectItem(data, "status");
cJSON *order_id = cJSON_GetObjectItem(data, "orderId");
if (cJSON_IsString(oid) && cJSON_IsString(evt_type)) {
fill_event_t fe = {0};
strncpy(fe.client_oid, oid->valuestring, MAX_CLIENT_OID - 1);
if (order_id && cJSON_IsString(order_id))
strncpy(fe.order_id, order_id->valuestring, sizeof(fe.order_id) - 1);
bool is_match = (strcmp(evt_type->valuestring, "match") == 0);
bool is_terminal = ((strcmp(evt_type->valuestring, "filled") == 0 ||
(strcmp(evt_type->valuestring, "canceled") == 0)) &&
cJSON_IsString(status) &&
strcmp(status->valuestring, "done") == 0);
if (is_match) {
cJSON *ms = cJSON_GetObjectItem(data, "matchSize");
cJSON *mp = cJSON_GetObjectItem(data, "matchPrice");
cJSON *mf = cJSON_GetObjectItem(data, "matchFee");
fe.match_size = cJSON_IsString(ms) ? atof(ms->valuestring) :
cJSON_IsNumber(ms) ? ms->valuedouble : 0;
fe.match_price = cJSON_IsString(mp) ? atof(mp->valuestring) :
cJSON_IsNumber(mp) ? mp->valuedouble : 0;
fe.match_fee = cJSON_IsString(mf) ? atof(mf->valuestring) :
cJSON_IsNumber(mf) ? mf->valuedouble : 0;
fill_channel_push(client->fill_ch, &fe);
} else if (is_terminal) {
/* Terminal event: push a marker that stops accumulation.
A preceding "match" event already communicated the data.
If no matches arrived, this is a cancelled+empty order.
We push a terminal marker to unblock the waiter. */
fe.is_terminal = true;
fill_channel_push(client->fill_ch, &fe);
}
}
}
} else {
sym_idx = parse_book_update(msg_root, client);
}
} else if (strcmp(msg_type->valuestring, "error") == 0) {
log_write("[WS] Error message: %.*s\n",
(int)(payload_len > 200 ? 200 : payload_len), (const char *)payload);
@ -731,6 +812,7 @@ int ws_client_read(ws_client_t *client, uint32_t conn_idx) {
int err = SSL_get_error(conn->ssl, r);
if (err == SSL_ERROR_WANT_READ || err == SSL_ERROR_WANT_WRITE) return 0;
log_write("[WS] SSL_read error: %d\n", err);
conn->state = WS_STATE_DISCONNECTED;
return -1;
}
if (r == 0) {

View File

@ -9,6 +9,7 @@
#include "config.h"
#include "hash.h"
#include "evaluate.h"
#include "fill_handler.h"
#define WS_MAX_FRAME_SIZE (128 * 1024)
#define WS_MAX_CONNECTIONS 8
@ -71,6 +72,7 @@ typedef struct {
order_book_t *books; /* pointer to shared order books */
evaluator_t *evaluator; /* pointer to evaluator */
bool running; /* false signals client to stop */
fill_channel_t *fill_ch; /* fill event channel (hot→executor) */
} ws_client_t;
/* Initialise a WebSocket client with config, symbol table, books, and evaluator */
@ -93,8 +95,8 @@ int ws_client_subscribe(ws_client_t *client, uint32_t conn_idx,
/* Unsubscribe from a set of symbols on a connection */
int ws_client_unsubscribe(ws_client_t *client, uint32_t conn_idx,
const uint16_t *symbol_indices, uint32_t count);
/* Fetch a WebSocket token from the KuCoin API */
int ws_client_fetch_token(ws_connection_t *conn);
/* Fetch a WebSocket token from the KuCoin API (prefers private when cfg has keys) */
int ws_client_fetch_token_priv(ws_connection_t *conn, const config_t *cfg);
/* Process a received WebSocket frame (dispatch to book updates, etc.).
* Returns symbol index on book update, -1 otherwise. */
int16_t ws_client_process_frame(ws_client_t *client, uint32_t conn_idx);