diff --git a/config.yaml.example b/config.yaml.example index 3e9ae5e..491d719 100644 --- a/config.yaml.example +++ b/config.yaml.example @@ -11,6 +11,12 @@ fused_engine: reconnect_base_delay: 1.0 reconnect_max_delay: 60.0 heartbeat_interval: 18.0 + cooldown_seconds: 0.0 + kcs_discount_active: false + initial_capital: + USDT: 5 + USDC: 5 + USD1: 5 executor: fill_timeout_ms: 1000 @@ -21,10 +27,6 @@ executor: enforce_same_base_isolation: true enforce_pair_isolation: true rest_port: 8002 - initial_capital: - USDT: 5 - USDC: 5 - USD1: 5 kucoin_api_key: "" kucoin_api_secret: "" diff --git a/executor/config.py b/executor/config.py index 25020fb..5eadb63 100644 --- a/executor/config.py +++ b/executor/config.py @@ -57,6 +57,10 @@ class ExecutorSettings(BaseModel): default=1000, description="Per-leg fill wait timeout in milliseconds", ) + await_balance: bool = Field( + default=False, + description="Wait for balance WS update before next leg", + ) class Settings(BaseSettings): diff --git a/executor/executor.py b/executor/executor.py index 6d651ac..eda3fc3 100644 --- a/executor/executor.py +++ b/executor/executor.py @@ -693,13 +693,24 @@ class Executor: f"fee={str(fee)} {fee_ccy} | lat={latency_ms:.1f}ms" ) if i < 2: - await self._ws_client.await_balance( - output_ccy, fills[-1].filled_volume, 2000 - ) - if side == "sell": - bal = self._ws_client.latest_balance(output_ccy) - if bal > 0: - fills[-1].filled_volume = bal + if self._settings.await_balance: + await self._ws_client.await_balance( + output_ccy, fills[-1].filled_volume, 2000 + ) + if side == "sell": + bal = self._ws_client.latest_balance(output_ccy) + if bal > 0: + fills[-1].filled_volume = bal + nxt = legs[i + 1] + nxt_fee = Decimal(str(nxt.get("fee_rate", "0.001"))) + nxt_side = nxt.get("side", "") + nxt_inc = Decimal(str(nxt.get( + "quote_increment" if nxt_side == "buy" else "base_increment", "0"))) + if nxt_side == "buy": + fills[-1].filled_volume /= (_D1 + nxt_fee) + if nxt_inc > 0: + fills[-1].filled_volume = (fills[-1].filled_volume / nxt_inc + ).to_integral_value(rounding=ROUND_DOWN) * nxt_inc in_flight.last_trade_ts_ms = int(time.time() * 1000) continue diff --git a/notes/executor_migration.md b/notes/executor_migration.md new file mode 100644 index 0000000..d90d3e8 --- /dev/null +++ b/notes/executor_migration.md @@ -0,0 +1,386 @@ +# Executor Migration Plan + +Absorb all executor functions into the fused_engine process, eliminating +Python, UDS, JSON serialisation, and the 2+ ms pipeline gap between signal +creation and first order fire. + +## Architecture + +``` + ┌───────────────────────────────────────────┐ + │ HOT THREAD │ + │ epoll WS → book updates → evaluate → │ + │ spsc_push(main_queue, &signal) │ + │ eventfd_write(wake_fd) │ + │ │ + │ on WS match event (orderChange): │ + │ exec_id = lookup[client_oid] │ + │ spsc_push(fill_queue[exec_id], match) │ + │ eventfd_write(fill_wake[exec_id]) │ + └──────┬─────────────────────────┬──────────┘ + │ │ + main_queue fill_queue[0..N-1] + (signal_entry_t SPSC) (per-thread SPSC) + │ │ + ┌──────────▼────────┐ ┌──────────▼─────────┐ + │ EXECUTOR THREAD │ │ fill_match_t ring │ + │ (1 per slot) │◄────│ pushed by hot t │ + │ spsc_pop(main) │ └────────────────────┘ + │ execute(sig) │ + │ for each leg: │ + │ rest POST │ + │ wait_for_fill │ + │ cascade │ + │ emit report │ + └───────────────────┘ +``` + +**Thread roles:** +- **HOT**: epoll on WS fds + timer. Evaluates triangles, pushes signals to + SPSC. Also receives WS orderChange events and dispatches match data to the + correct executor thread's fill SPSC. +- **EXECUTOR**: pops from main signal queue, executes triangle via + blocking REST POST + fill wait, cascades to next leg. One thread per + concurrent slot. +- No Python, no UDS socket, no JSON serialisation between threads. + +## What Exists in C Already + +| Component | File | Status | +|-----------|------|--------| +| WS connection + reconnection | `ws_client.c` | Done | +| SSL context + BIO | `ws_client.c` | Done | +| Timer fd | `events.c` | Done | +| SPSC queue | `queue.c` | Done | +| JSON parser (cJSON) | `cJSON.c` | Done | +| Book data structures | `book.h` | Done | +| Triangle evaluation + sizing | `evaluate.c` | Done | +| Non-blocking log pipe | `log.c` | Done | +| Config parser | `config.c` | Done | +| HMAC-SHA256 | OpenSSL `HMAC()` in libcrypto | Already linked | +| Base64 | OpenSSL `BIO_f_base64()` in libcrypto | Already linked | + +## What Needs to Be Written + +### Phase 1 — HTTP Client + REST Signing + +**New files:** `http_client.c`, `http_client.h` + +- Reuse `SSL_CTX` from WS client (or create a minimal second context) +- `http_post(url, body, headers) → (status, response_body)` +- `http_get(url, headers) → (status, response_body)` +- Connection pool: keep-alive via socket reuse (same pattern as WS reconnect + but for REST endpoints) +- DNS resolution: reuse `getaddrinfo` from `ws_client.c` +- `kucoin_sign(timestamp, method, path, body, secret) → signature_base64` +- `kucoin_headers(api_key, encrypted_passphrase, sign, timestamp) → header_string` + +**Dependencies:** OpenSSL libcrypto (already linked for HMAC), cJSON for +parsing responses. + +### Phase 2 — Fill Handler + +**New files:** `fill_handler.c`, `fill_handler.h` + +**Data structures:** + +```c +typedef struct { + char client_oid[41]; // max 40 chars per KuCoin + double match_size; + double match_price; + bool is_terminal; // canceled+done = complete +} fill_match_t; + +typedef struct { + char client_oid[41]; + char order_id[32]; + double total_size; + double total_funds; + int match_count; + bool done; // terminal event received +} accumulator_t; + +// Ring of fill SPSCs — one per executor thread +typedef struct { + spsc_queue_t queue; // fill_match_t ring + int wake_fd; // eventfd for waking executor + accumulator_t acc; // in-progress accumulation (hot thread writes) + atomic_bool registered; // register_fill_listener / unregister +} fill_channel_t; +``` + +**Fill dispatch (hot thread side):** + +```c +// Called from hot thread's ws_client.c when a match event arrives +void fill_dispatch_match(const char *client_oid, + double match_size, double match_price); + +// Called from hot thread on canceled/filled terminal event +void fill_dispatch_terminal(const char *client_oid); +``` + +- `client_oid → executor_thread` lookup via hash table (hot thread use, + atomically updated) +- Each `fill_dispatch_match` pushes a `fill_match_t` to the correct executor + thread's SPSC + eventfd write + +**Fill waiter (executor thread side):** + +```c +// Accumulates matches for a single client_oid until terminal or timeout +typedef struct { + double total_size; + double total_funds; + double avg_price; + int match_count; + char order_id[32]; +} fill_result_t; + +bool await_fill(fill_channel_t *ch, const char *client_oid, + int64_t timeout_ms, fill_result_t *out); +``` + +- Drains `fill_match_t` from the thread's fill SPSC +- Accumulates `total_size += match_size`, `total_funds += match_price * match_size` +- Returns on terminal event or timeout +- `poll()` on the fill eventfd between drain cycles + +### Phase 3 — Execution Engine + +Executor thread main loop (`events.c`, replacing current cold thread): + +```c +void *executor_thread(void *arg) { + thread_data_t *td = (thread_data_t *)arg; + + while (atomic_load(&g_running)) { + signal_entry_t sig; + if (spsc_pop(td->main_queue, &sig)) { + execute_triangle(&sig, td); + continue; + } + // sleep via poll on fill eventfd + shutdown fd + struct pollfd fds[2] = { + { .fd = td->fill_channel.wake_fd, .events = POLLIN }, + }; + poll(fds, 1, -1); + uint64_t val; read(td->fill_channel.wake_fd, &val, sizeof(val)); + } + return NULL; +} +``` + +**`execute_triangle()` in C** mirrors the current `_execute_triangle`: + +```c +void execute_triangle(signal_entry_t *sig, thread_data_t *td) { + setup_timings(sig, timings); + + double leg_output[3] = {0}; + for (int leg = 0; leg < 3; leg++) { + // ── Input volume ── + double input_vol = (leg == 0) ? sig->starting_volume : leg_output[leg-1]; + apply_fee_hold_reduction(sig, leg, &input_vol); + apply_increment_floor(sig, leg, &input_vol); + + // ── Place order ── + int64_t t0 = now_monotonic_ms(); + char order_id[32]; + bool ok = place_order(sig, leg, input_vol, order_id); + timing_append(timings, leg_i_order_fired); + + if (!ok) { + emit_failed(sig, leg_output, timings, "order_rejected"); + return; + } + register_fill_listener(td, leg, order_id); + + // ── Wait for fill ── + fill_result_t result; + bool filled = await_fill(&td->fill_channel, order_id, + FILL_TIMEOUT_MS, &result); + timing_append(timings, leg_i_fill_received); + + if (sig->cancelled) { emit_aborted(...); return; } + if (!filled) { emit_failed(...); return; } + + // ── Cascade ── + leg_output[leg] = (sig->legs.legs[leg].side_is_buy) + ? result.total_size + : result.total_funds; + + apply_balance_override(td, sig, leg, &leg_output[leg]); + apply_fee_hold_reduction(sig, leg+1, &leg_output[leg]); + apply_increment_floor(sig, leg+1, &leg_output[leg]); + } + + double pnl = compute_pnl(leg_output, sig); + emit_filled(sig, leg_output, timings, pnl); +} +``` + +Helper functions (`cascade.c`): + +```c +// Fee hold: funds * (1 + fee_rate) must not exceed balance. +// Divide by (1 + fee_rate) when next leg is a buy. +void apply_fee_hold_reduction(signal_entry_t *sig, int leg, double *vol); + +// Round down to the next leg's quote_increment (buy) or base_increment (sell). +void apply_increment_floor(signal_entry_t *sig, int leg, double *vol); + +// Override with latest WS balance for sells (if await_balance enabled). +void apply_balance_override(thread_data_t *td, signal_entry_t *sig, + int leg, double *vol); + +// Leg 0 input → leg 2 output, compute profit in primary_quote. +double compute_pnl(double *leg_output, signal_entry_t *sig); +``` + +### Phase 4 — REST Order Placement + +**New file:** `rest_client.c`, `rest_client.h` + +```c +bool rest_order_place(const char *symbol, const char *side, + double funds, double size, + const char *client_oid, + char *out_order_id, size_t out_sz, + char *out_error, size_t err_sz); +``` + +- Signs request with HMAC-SHA256 +- POST to `/api/v1/hf/orders` +- Parses JSON response, extracts `orderId` or error `code: msg` +- Supports `funds` (buy) or `size` (sell) based on leg type + +```c +bool rest_order_test(const char *symbol, const char *side, + double funds, double size, + char *out_error, size_t err_sz); +``` + +- POST to `/api/v1/hf/orders/test` +- Same signing path, same response parsing + +Connection pool: +- Store 1-2 reusable HTTPS sockets per executor thread +- Keepalive via 30s interval timer (same pattern as `_keepalive_loop`) +- On disconnect: reconnect once, fail immediately + +### Phase 5 — Paper Mode + +Paper mode in C uses the same `execute_triangle()` but: +- Calls `rest_order_test()` instead of `rest_order_place()` +- Simulates fills from `sig->books[leg]` (top-of-book ask/bid) +- Same rounding, fee hold reduction, and increment flooring as live mode +- No fill SPSC needed (simulated fill is synchronous) + +### Phase 6 — Concurrency & Isolation + +**New file:** `concurrency.c`, `concurrency.h` + +- `concurrent_slots`: number of executor threads = config value. +- Isolation check (same triangle key, same pairs, same primary_quote): + checked atomically in `execute_triangle` before starting. +- In-flight tracking: array of `{ triangle_key, pair_mask, primary_quote }`, + checked and set under a spinlock. +- Cancel: `sig->cancelled` volatile flag, checked between legs. + +```c +typedef struct { + uint64_t triangle_hash; // fnv1a of triangle_key + uint64_t pair_mask; // bitmask of pair indices + uint32_t primary_quote; // index into currency table + bool active; +} in_flight_t; + +bool try_acquire_slot(signal_entry_t *sig); // isolation check + mark +void release_slot(signal_entry_t *sig); // unmark +``` + +### Phase 7 — Reporting + +Reporting in C mirrors the current `_emit_report`: + +```c +void emit_filled(signal_entry_t *sig, double *leg_output, + timing_t *timings, double pnl); +void emit_failed(signal_entry_t *sig, double *leg_output, + timing_t *timings, const char *error); +void emit_aborted(signal_entry_t *sig, double *leg_output, + timing_t *timings, const char *reason); +``` + +Output format (same as current, via `log_write`): + +``` +2026-05-26T17:55:53.719Z FILLED | corr=... | triangle=['USDT','FLUX','USDC'] | + predicted_bps=34.45 | effective_bps=-149.94 | book_ts=... | profit=-0.0749 | + timings=[...] | fills=[L0:buy FLUX-USDT USDT->FLUX 64.53@0.0774(fee=0 USDT lat=82.9ms), ...] | + books=[0.0774/0.0772 | 0.07796/0.07783 | 0.9991/0.999] +``` + +### Phase 8 — Cleanup + +- Delete `executor/` directory (executor.py, ws_client.py, kucoin_api.py, + socket_server.py, config.py, __main__.py) +- Delete `common/` directory (log.py, config.py — logging replaced by + C `log_write` + log file handler) +- Remove UDS socket creation + connection from `events.c` +- Remove JSON signal serialisation from `events.c` +- Remove `send_signal_to_executor()` and all UDS references +- The `cold_epoll` set can be removed (executor threads don't use epoll; + they use `poll()` on fill eventfds) + +## Migration Order + +| Phase | Files | Deliverable | Risk | +|-------|-------|-------------|------| +| 1 | `http_client.c/h`, `kucoin_sign.c/h` | Can POST a test order | Low (OpenSSL HMAC is well-documented) | +| 2 | `fill_handler.c/h` | Can receive WS fill events in executor thread | Medium (hot↔executor SPSC race conditions) | +| 3 | `events.c` (executor thread), `cascade.c/h`, `rest_client.c/h` | Full live-mode triangle execution in C | High (first integration test) | +| 4 | `rest_client.c/h` (`order_place`) | Order placement via C | Medium (HTTP keep-alive bugs) | +| 5 | Paper mode via `rest_order_test` + book sim | Paper mode in C | Low (same logic, different HTTP endpoint) | +| 6 | `concurrency.c/h` | Slot isolation, cancel support | Low (simple atomic operations) | +| 7 | Reporting via `log_write` | Same FILLED/FAILED output | Low (string formatting) | +| 8 | Delete `executor/`, `common/` | No Python runtime dependency | Low (housekeeping) | + +## Files to Create + +``` +src/http_client.c # HTTPS POST/GET with keepalive connection pool +src/http_client.h +src/kucoin_sign.c # HMAC-SHA256 + base64 signing helpers +src/kucoin_sign.h +src/fill_handler.c # Match event accumulation + dispatch (hot thread) +src/fill_handler.h +src/cascade.c # Fee hold reduction, increment floor, PnL compute +src/cascade.h +src/rest_client.c # order_place, order_test (wraps http_client + sign) +src/rest_client.h +src/concurrency.c # Isolation checks, in-flight tracking +src/concurrency.h +src/executor.c # execute_triangle() — the main execution loop +src/executor.h # (or inline in events.c / new executor_thread.c) +``` + +## Files to Modify + +``` +src/events.c # Remove UDS, add executor thread creation +src/ws_client.c # Add fill dispatch calls on match/terminal events +src/ws_client.h # Expose fill_dispatch_match / fill_dispatch_terminal +src/config.c # Add executor config fields (fill_timeout_ms, await_balance) +src/config.h # Same +src/CMakeLists.txt # Add new .c files +``` + +## Files to Delete + +``` +executor/ # Entire directory +common/ # Entire directory +``` diff --git a/src/events.c b/src/events.c index ef58e68..0693107 100644 --- a/src/events.c +++ b/src/events.c @@ -11,6 +11,7 @@ #include "log.h" #include "events.h" #include "evaluate.h" +#include "executor.h" #include #include #include @@ -198,6 +199,8 @@ int unix_server_create(const char *socket_path) { * Connects lazily on first signal; reconnects on write failure. */ static void send_signal_to_executor(event_loops_t *loops, signal_entry_t *sig) { + (void)loops; (void)sig; + /* No longer used — execution is direct in the executor thread */ if (loops->unix_client_fd < 0) { loops->unix_client_fd = unix_client_connect(loops->ws_client->cfg->executor_socket_path); if (loops->unix_client_fd < 0) { @@ -394,63 +397,58 @@ void *event_hot_thread(void *arg) { } /* - * COLD thread: drain SPSC signal queue and forward to executor. - * Uses epoll_wait on the Unix client fd to detect disconnection. - * Priority: drains queue before and after epoll to minimize latency. + * EXECUTOR thread: drain SPSC signal queue and execute triangles directly. + * Wakes on SPSC eventfd (signal available) and fill channel eventfd (fill arrived). */ void *event_cold_thread(void *arg) { event_loops_t *loops = (event_loops_t *)arg; - log_write("[COLD] Thread started\n"); + log_write("[EXEC] Thread started\n"); + + executor_thread_t *exec = executor_thread_create(loops->ws_client->cfg, + loops->ws_client->fill_ch); + if (!exec) { + log_write("[EXEC] Failed to create executor\n"); + return NULL; + } + + struct pollfd fds[2] = { + { .fd = loops->wakeup_fd, .events = POLLIN }, + { .fd = fill_channel_wake_fd(loops->ws_client->fill_ch), .events = POLLIN }, + }; while (loops->running) { - while (!spsc_empty(loops->signal_queue)) { - signal_entry_t sig; - if (spsc_pop(loops->signal_queue, &sig)) { - send_signal_to_executor(loops, &sig); - } - } - - int nfds = epoll_wait(loops->cold_epoll.epoll_fd, - loops->cold_epoll.events, MAX_EPOLL_FDS, 200); + int nfds = poll(fds, 2, 200); if (nfds < 0) { if (errno == EINTR) continue; - perror("epoll_wait cold"); + log_write("[EXEC] poll error: %s\n", strerror(errno)); break; } - for (int i = 0; i < nfds; i++) { - tracked_fd_t *tf = (tracked_fd_t *)loops->cold_epoll.events[i].data.ptr; - if (!tf || tf->fd < 0) continue; - - uint32_t ev = loops->cold_epoll.events[i].events; - - if (tf->type == FD_TYPE_EVENT) { - uint64_t val = 0; - read(loops->wakeup_fd, &val, sizeof(val)); - continue; - } - - if (tf->type == FD_TYPE_UNIX_CLIENT) { - if (ev & (EPOLLERR | EPOLLHUP)) { - log_write("[COLD] Executor disconnected\n"); - close(loops->unix_client_fd); - loops->unix_client_fd = -1; - event_loops_remove_fd(&loops->cold_epoll, tf->fd); - continue; - } - } - } - - // Drain again after epoll to catch any signals queued during processing + /* Drain signal queue */ while (!spsc_empty(loops->signal_queue)) { signal_entry_t sig; if (spsc_pop(loops->signal_queue, &sig)) { - send_signal_to_executor(loops, &sig); + executor_execute_triangle(exec, &sig, loops->ws_client->fill_ch); + } + } + + /* Drain fill wakeup */ + if (fds[1].revents & POLLIN) { + uint64_t val; + read(fds[1].fd, &val, sizeof(val)); + } + + /* Drain again to catch signals enqueued during execution */ + while (!spsc_empty(loops->signal_queue)) { + signal_entry_t sig; + if (spsc_pop(loops->signal_queue, &sig)) { + executor_execute_triangle(exec, &sig, loops->ws_client->fill_ch); } } } - log_write("[COLD] Thread exited\n"); + executor_thread_destroy(exec); + log_write("[EXEC] Thread exited\n"); return NULL; } diff --git a/src/executor.c b/src/executor.c new file mode 100644 index 0000000..3f77266 --- /dev/null +++ b/src/executor.c @@ -0,0 +1,388 @@ +#include "executor.h" +#include "rest_client.h" +#include "log.h" +#include "hash.h" +#include +#include +#include +#include +#include +#include +#include + +#define _D1 1.0 +#define FILL_TIMEOUT_MS 5000 +#define MAX_IN_FLIGHT 8 + +struct executor_thread_s { + const config_t *cfg; + rest_conn_t *rest; + + /* Concurrency isolation state */ + char in_flight_triangles[MAX_IN_FLIGHT][128]; /* triangle_key */ + uint64_t in_flight_pairs[MAX_IN_FLIGHT]; /* fnv1a hash of each pair */ + char in_flight_primary_quotes[MAX_IN_FLIGHT][16]; /* primary quote currency */ + int in_flight_count; +}; + +/* ── Reporting ── */ + +void executor_write_report(const char *fmt, ...) { + va_list args; + va_start(args, fmt); + char buf[2048]; + vsnprintf(buf, sizeof(buf), fmt, args); + va_end(args); + log_write("%s", buf); +} + +/* ── Timestamp helpers ── */ + +static int64_t now_mono_ms(void) { + struct timespec ts; + clock_gettime(CLOCK_MONOTONIC, &ts); + return (int64_t)ts.tv_sec * 1000 + (int64_t)ts.tv_nsec / 1000000; +} + +static int64_t now_realtime_ms(void) { + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + return (int64_t)ts.tv_sec * 1000 + (int64_t)ts.tv_nsec / 1000000; +} + +static void format_ts(char *buf, size_t sz) { + time_t s = time(NULL); + int ms = (int)(now_realtime_ms() % 1000); + struct tm gm; + gmtime_r(&s, &gm); + strftime(buf, sz, "%Y-%m-%dT%H:%M:%S.", &gm); + size_t len = strlen(buf); + snprintf(buf + len, sz - len, "%03dZ", ms); +} + +/* ── Fee hold reduction + increment floor ── */ + +static double apply_fee_hold(double vol, double fee_rate, bool next_is_buy) { + if (next_is_buy && fee_rate > 0) + vol /= (1.0 + fee_rate); + return vol; +} + +static double apply_increment_floor(double vol, double inc) { + if (inc > 0) + vol = floor(vol / inc - 1e-12) * inc; + return vol; +} + +/* ── Core execution loop ── */ + +executor_thread_t *executor_thread_create(const config_t *cfg, + fill_channel_t *fill_ch) { + (void)fill_ch; + executor_thread_t *et = calloc(1, sizeof(*et)); + if (!et) return NULL; + et->cfg = cfg; + et->rest = rest_conn_new(); + if (et->rest) { + rest_conn_set_auth(et->rest, + cfg->kucoin_api_key, + cfg->kucoin_api_secret, + cfg->kucoin_api_passphrase); + } + return et; +} + +void executor_thread_set_fill_ch(executor_thread_t *et, fill_channel_t *ch) { + (void)et; + (void)ch; + /* In the initial implementation the fill channel is global (ws_client->fill_ch), + passed directly to execute_triangle. This setter is for per-thread channels. */ +} + +void executor_execute_triangle(executor_thread_t *et, + signal_entry_t *sig, + fill_channel_t *fill_ch) { + /* ── Concurrency isolation ── */ + uint64_t pair_hashes[3] = {0}; + for (int p = 0; p < 3; p++) { + pair_hashes[p] = fnv1a_hash(sig->legs.legs[p].symbol, (uint32_t)strlen(sig->legs.legs[p].symbol)); + } + for (int i = 0; i < et->in_flight_count; i++) { + if (strcmp(et->in_flight_triangles[i], sig->triangle_key) == 0) { + log_write("[EXEC] Dropping signal for overlapping triangle: %s\n", sig->triangle_key); + return; + } + if (strcmp(et->in_flight_primary_quotes[i], sig->primary_quote) == 0) { + log_write("[EXEC] Dropping signal for same primary quote: %s\n", sig->triangle_key); + return; + } + for (int p = 0; p < 3; p++) { + if (et->in_flight_pairs[i] == pair_hashes[p]) { + log_write("[EXEC] Dropping signal for overlapping pair on %s\n", sig->triangle_key); + return; + } + } + } + /* Register this execution */ + int slot = -1; + for (int i = 0; i < MAX_IN_FLIGHT; i++) { + if (!et->in_flight_triangles[i][0]) { slot = i; break; } + } + if (slot >= 0) { + strncpy(et->in_flight_triangles[slot], sig->triangle_key, sizeof(et->in_flight_triangles[slot]) - 1); + strncpy(et->in_flight_primary_quotes[slot], sig->primary_quote, sizeof(et->in_flight_primary_quotes[slot]) - 1); + et->in_flight_pairs[slot] = pair_hashes[0]; + et->in_flight_count++; + } + + char ts_buf[32]; + char corr_id[64]; + int64_t exec_start = now_mono_ms(); + snprintf(corr_id, sizeof(corr_id), "%08x%08x%08x%08x", + (unsigned)(uintptr_t)&sig->legs.legs[0] ^ (unsigned)sig->ts_ms, + (unsigned)sig->ts_ms ^ (unsigned)sig->book_ts_ms, + (unsigned)sig->predicted_bps, + (unsigned)sig->t_arrive_ms); + + double leg_output[3] = {0}; + double fills[3][6] = {{0}}; /* leg, output, avg_price, fee, input_vol, latency_ms */ + bool success = true; + const char *error_str = ""; + + for (int leg = 0; leg < 3; leg++) { + const signal_leg_t *sl = &sig->legs.legs[leg]; + bool is_buy = (strcmp(sl->side, "buy") == 0); + + /* ── Input volume ── */ + double input_vol; + if (leg == 0) { + input_vol = atof(sl->order_param); + } else { + input_vol = leg_output[leg - 1]; + input_vol = apply_fee_hold(input_vol, sl->fee_rate, is_buy); + input_vol = apply_increment_floor(input_vol, + is_buy ? sl->quote_increment : sl->base_increment); + } + + /* Build a client OID */ + char client_oid[64]; + snprintf(client_oid, sizeof(client_oid), "c%08x%04x", + (unsigned)(now_realtime_ms() & 0xFFFFFFFF), + (unsigned)leg); + + fills[leg][4] = input_vol; + + /* ── Place order ── */ + char order_id[32] = {0}; + char err_msg[128] = {0}; + bool ok = false; + int64_t t0 = now_mono_ms(); + + if (sig->live) { + ok = rest_order_place(et->rest, sl->symbol, sl->side, + is_buy ? input_vol : 0, + is_buy ? 0 : input_vol, + client_oid, + order_id, sizeof(order_id), + err_msg, sizeof(err_msg)); + } else { + ok = rest_order_test(et->rest, sl->symbol, sl->side, + is_buy ? input_vol : 0, + is_buy ? 0 : input_vol, + err_msg, sizeof(err_msg)); + if (ok) { + snprintf(order_id, sizeof(order_id), "paper-%08x", + (unsigned)now_realtime_ms()); + } + } + + fills[leg][5] = (double)(now_mono_ms() - t0); + if (!ok) { + error_str = err_msg[0] ? err_msg : "order_rejected"; + success = false; + break; + } + + /* ── Wait for fill (live only) ── */ + double total_size = 0, total_funds = 0, avg_price = 0, total_fee = 0; + int match_count = 0; + + if (sig->live) { + fill_result_t fr = {0}; + bool filled = fill_channel_await(fill_ch, client_oid, + FILL_TIMEOUT_MS, &fr); + if (!filled) { + error_str = fr.match_count > 0 ? "partial_fill" : "fill_timeout"; + success = false; + break; + } + total_size = fr.total_size; + total_funds = fr.total_funds; + total_fee = fr.total_fee; + avg_price = fr.avg_price; + match_count = fr.match_count; + if (fr.order_id[0]) strncpy(order_id, fr.order_id, sizeof(order_id) - 1); + } else { + /* Paper mode: simulate fill from books[leg] top-of-book */ + if (sig->book_count > leg) { + const signal_book_t *bk = &sig->books[leg]; + if (is_buy && bk->ask_count > 0) { + double ask_price = bk->asks[0].price; + total_size = input_vol / ask_price; + if (sl->base_increment > 0) + total_size = floor(total_size / sl->base_increment - 1e-12) * sl->base_increment; + if (sl->quote_increment > 0) + total_funds = floor(total_size * ask_price / sl->quote_increment - 1e-12) * sl->quote_increment; + else + total_funds = total_size * ask_price; + avg_price = ask_price; + } else if (!is_buy && bk->bid_count > 0) { + double bid_price = bk->bids[0].price; + total_size = input_vol; /* base amount to sell */ + if (sl->base_increment > 0) + total_size = floor(total_size / sl->base_increment - 1e-12) * sl->base_increment; + if (sl->quote_increment > 0) + total_funds = floor(total_size * bid_price / sl->quote_increment - 1e-12) * sl->quote_increment; + else + total_funds = total_size * bid_price; + avg_price = bid_price; + } else { + error_str = "no_book_data"; + success = false; + break; + } + } else { + error_str = "no_book_data"; + success = false; + break; + } + /* Simulated fee: if fee currency matches output, fee = output * rate */ + if (sl->fee_rate > 0) { + double output_amt = is_buy ? total_size : total_funds; + total_fee = output_amt * sl->fee_rate; + } + } + + /* ── Cascade ── */ + leg_output[leg] = is_buy ? total_size : total_funds; + fills[leg][0] = leg_output[leg]; + fills[leg][1] = avg_price; + fills[leg][2] = total_fee; + fills[leg][3] = total_funds; + + /* Fee hold reduction + increment floor for next leg */ + if (leg < 2) { + const signal_leg_t *nsl = &sig->legs.legs[leg + 1]; + bool nxt_buy = (strcmp(nsl->side, "buy") == 0); + leg_output[leg] = apply_fee_hold(leg_output[leg], nsl->fee_rate, nxt_buy); + leg_output[leg] = apply_increment_floor(leg_output[leg], + nxt_buy ? nsl->quote_increment : nsl->base_increment); + } + } + + /* ── Compute PnL ── */ + double profit = 0, effective_bps = 0; + if (success && fills[2][0] > 0) { + double leg0_in = (strcmp(sig->legs.legs[0].side, "buy") == 0) + ? fills[0][3] : fills[0][0]; + double leg2_out = (strcmp(sig->legs.legs[2].side, "buy") == 0) + ? fills[2][0] : fills[2][0]; + /* Subtract fee if fee currency matches leg 2 output currency */ + { + const signal_leg_t *sl2 = &sig->legs.legs[2]; + bool leg2_buy = (strcmp(sl2->side, "buy") == 0); + const char *pair = sl2->symbol; + const char *dash = strchr(pair, '-'); + char base_ccy[16] = {0}, quote_ccy[16] = {0}; + if (dash) { + size_t blen = (size_t)(dash - pair); + if (blen > 15) blen = 15; + memcpy(base_ccy, pair, blen); + strncpy(quote_ccy, dash + 1, 15); + } + const char *out_ccy = leg2_buy ? base_ccy : quote_ccy; + if (out_ccy[0] && strcmp(sl2->fee_currency, out_ccy) == 0) { + leg2_out -= fills[2][2]; + } + } + profit = leg2_out - leg0_in; + if (leg0_in > 0) + effective_bps = (profit / leg0_in) * 10000.0; + } + + int64_t total_ms = now_mono_ms() - exec_start; + + /* ── Emit report ── */ + format_ts(ts_buf, sizeof(ts_buf)); + const char *status = success ? "FILLED" : "FAILED"; + char bps_str[32]; + snprintf(bps_str, sizeof(bps_str), "%.2f", effective_bps); + + executor_write_report( + "%s %s | corr=%s | triangle=['%s'] | " + "predicted_bps=%.2f | effective_bps=%s | " + "book_ts=%lld | profit=%.4f | timings=[] | fills=[", + ts_buf, status, corr_id, sig->triangle_key, + sig->predicted_bps, bps_str, + (long long)sig->book_ts_ms, profit); + + for (int leg = 0; leg < 3 && leg <= (success ? 2 : (fills[0][0] > 0 ? 0 : -1)); leg++) { + if (fills[leg][0] == 0 && !(leg == 2 && success)) break; + const signal_leg_t *sl = &sig->legs.legs[leg]; + char pair[32], side[8], in_ccy[16], out_ccy[16]; + strncpy(pair, sl->symbol, sizeof(pair) - 1); + strncpy(side, sl->side, sizeof(side) - 1); + if (strcmp(side, "buy") == 0) { + char *dash = strchr(pair, '-'); + if (dash) { + strncpy(in_ccy, dash + 1, sizeof(in_ccy) - 1); + size_t len = dash - pair; + if (len > sizeof(out_ccy) - 1) len = sizeof(out_ccy) - 1; + memcpy(out_ccy, pair, len); + out_ccy[len] = '\0'; + } + } else { + char *dash = strchr(pair, '-'); + if (dash) { + strncpy(out_ccy, dash + 1, sizeof(out_ccy) - 1); + size_t len = dash - pair; + if (len > sizeof(in_ccy) - 1) len = sizeof(in_ccy) - 1; + memcpy(in_ccy, pair, len); + in_ccy[len] = '\0'; + } + } + executor_write_report( + "%sL%d:%s %s %s->%s %.10g@%.6g(fee=%.6g %s lat=%.1fms)", + leg > 0 ? ", " : "", + leg, side, pair, in_ccy, out_ccy, + fills[leg][0], fills[leg][1], fills[leg][2], + sl->fee_currency, fills[leg][5]); + } + + if (error_str[0]) { + executor_write_report("] | error=%s\n", error_str); + } else { + executor_write_report("]\n"); + } + + /* Log timing */ + /* Release isolation slot */ + for (int i = 0; i < MAX_IN_FLIGHT; i++) { + if (strcmp(et->in_flight_triangles[i], sig->triangle_key) == 0) { + et->in_flight_triangles[i][0] = '\0'; + et->in_flight_primary_quotes[i][0] = '\0'; + et->in_flight_pairs[i] = 0; + et->in_flight_count--; + break; + } + } + + log_write("[EXEC] Triangle %s in %lld ms, profit=%.4f predicted=%.2f eff=%.2f\n", + status, (long long)total_ms, profit, sig->predicted_bps, effective_bps); +} + +void executor_thread_destroy(executor_thread_t *et) { + if (!et) return; + if (et->rest) rest_conn_free(et->rest); + free(et); +} diff --git a/src/executor.h b/src/executor.h new file mode 100644 index 0000000..3c0e597 --- /dev/null +++ b/src/executor.h @@ -0,0 +1,30 @@ +#ifndef EXECUTOR_H +#define EXECUTOR_H + +#include +#include "fill_handler.h" +#include "config.h" +#include "queue.h" + +/* Per-thread data for the executor. */ +typedef struct executor_thread_s executor_thread_t; + +/* Create an executor thread (one per concurrent slot). */ +executor_thread_t *executor_thread_create(const config_t *cfg, + fill_channel_t *fill_ch); + +/* Set the fill channel for this executor thread. */ +void executor_thread_set_fill_ch(executor_thread_t *et, fill_channel_t *ch); + +/* Execute a single triangle signal (blocking, called from executor thread). */ +void executor_execute_triangle(executor_thread_t *et, + signal_entry_t *sig, + fill_channel_t *fill_ch); + +/* Report status text. */ +void executor_write_report(const char *fmt, ...); + +/* Destroy and free. */ +void executor_thread_destroy(executor_thread_t *et); + +#endif diff --git a/src/fill_handler.c b/src/fill_handler.c new file mode 100644 index 0000000..9a8f46d --- /dev/null +++ b/src/fill_handler.c @@ -0,0 +1,136 @@ +#include "fill_handler.h" +#include "log.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +#define FILL_RING_SIZE 256 + +struct fill_channel_s { + fill_event_t ring[FILL_RING_SIZE]; + _Atomic uint32_t head; /* consumer read index */ + _Atomic uint32_t tail; /* producer write index */ + int wake_fd; /* eventfd to wake consumer */ +}; + +fill_channel_t *fill_channel_create(void) { + fill_channel_t *ch = calloc(1, sizeof(*ch)); + if (!ch) return NULL; + ch->head = 0; + ch->tail = 0; + ch->wake_fd = eventfd(0, EFD_NONBLOCK); + if (ch->wake_fd < 0) { + free(ch); + return NULL; + } + return ch; +} + +void fill_channel_destroy(fill_channel_t *ch) { + if (!ch) return; + if (ch->wake_fd >= 0) close(ch->wake_fd); + free(ch); +} + +int fill_channel_wake_fd(fill_channel_t *ch) { + return ch->wake_fd; +} + +void fill_channel_reset(fill_channel_t *ch) { + atomic_store_explicit(&ch->head, 0, memory_order_release); + atomic_store_explicit(&ch->tail, 0, memory_order_release); + /* Drain the eventfd */ + uint64_t val; + while (read(ch->wake_fd, &val, sizeof(val)) > 0) {} +} + +bool fill_channel_push(fill_channel_t *ch, const fill_event_t *ev) { + uint32_t cur_tail = atomic_load_explicit(&ch->tail, memory_order_relaxed); + uint32_t cur_head = atomic_load_explicit(&ch->head, memory_order_acquire); + uint32_t next = (cur_tail + 1) % FILL_RING_SIZE; + if (next == cur_head) return false; /* full */ + + ch->ring[cur_tail] = *ev; + atomic_store_explicit(&ch->tail, next, memory_order_release); + + uint64_t one = 1; + write(ch->wake_fd, &one, sizeof(one)); + return true; +} + +bool fill_channel_pop(fill_channel_t *ch, fill_event_t *ev) { + uint32_t cur_head_r = atomic_load_explicit(&ch->head, memory_order_acquire); + uint32_t cur_tail_r = atomic_load_explicit(&ch->tail, memory_order_acquire); + if (cur_head_r == cur_tail_r) return false; + + *ev = ch->ring[cur_head_r]; + uint32_t next_head = (cur_head_r + 1) % FILL_RING_SIZE; + atomic_store_explicit(&ch->head, next_head, memory_order_release); + return true; +} + +static int64_t now_ms(void) { + struct timespec ts; + clock_gettime(CLOCK_MONOTONIC, &ts); + return (int64_t)ts.tv_sec * 1000 + (int64_t)ts.tv_nsec / 1000000; +} + +bool fill_channel_await(fill_channel_t *ch, const char *client_oid, + int64_t timeout_ms, fill_result_t *out) { + double total_size = 0, total_funds = 0, total_fee = 0; + int match_count = 0; + char order_id[32] = {0}; + int64_t deadline = now_ms() + timeout_ms; + + while (now_ms() < deadline) { + fill_event_t ev; + while (fill_channel_pop(ch, &ev)) { + if (strcmp(ev.client_oid, client_oid) != 0) continue; + + total_size += ev.match_size; + total_funds += ev.match_price * ev.match_size; + total_fee += ev.match_fee; + match_count++; + if (!order_id[0] && ev.order_id[0]) + strncpy(order_id, ev.order_id, sizeof(order_id) - 1); + + if (ev.is_terminal && match_count > 0) { + out->total_size = total_size; + out->total_funds = total_funds; + out->total_fee = total_fee; + out->avg_price = total_size > 0 ? total_funds / total_size : 0; + out->match_count = match_count; + strncpy(out->order_id, order_id, sizeof(out->order_id) - 1); + return true; + } + } + + /* Wait for more events */ + struct pollfd pfd = { .fd = ch->wake_fd, .events = POLLIN }; + int remaining = (int)(deadline - now_ms()); + if (remaining <= 0) break; + + int ret = poll(&pfd, 1, remaining); + if (ret > 0) { + uint64_t val; + read(ch->wake_fd, &val, sizeof(val)); + } + } + + /* Timeout — return partial if any matches */ + if (match_count > 0) { + out->total_size = total_size; + out->total_funds = total_funds; + out->total_fee = total_fee; + out->avg_price = total_size > 0 ? total_funds / total_size : 0; + out->match_count = match_count; + strncpy(out->order_id, order_id, sizeof(out->order_id) - 1); + } + return false; +} diff --git a/src/fill_handler.h b/src/fill_handler.h new file mode 100644 index 0000000..49196d0 --- /dev/null +++ b/src/fill_handler.h @@ -0,0 +1,54 @@ +#ifndef FILL_HANDLER_H +#define FILL_HANDLER_H + +#include +#include +#include + +#define MAX_CLIENT_OID 41 + +/* A single match event pushed from hot thread to executor thread. */ +typedef struct { + char client_oid[MAX_CLIENT_OID]; + double match_size; + double match_price; + double match_fee; + bool is_terminal; /* canceled+done with matches */ + bool is_timeout; /* true = fill timeout */ + char order_id[32]; +} fill_event_t; + +/* Accumulated fill result returned to the executor. */ +typedef struct { + double total_size; + double total_funds; + double total_fee; + double avg_price; + int match_count; + char order_id[32]; +} fill_result_t; + +/* Per-thread fill channel (SPSC queue + wake fd). */ +typedef struct fill_channel_s fill_channel_t; + +fill_channel_t *fill_channel_create(void); +void fill_channel_destroy(fill_channel_t *ch); + +/* Push a fill event (hot thread, non-blocking). */ +bool fill_channel_push(fill_channel_t *ch, const fill_event_t *ev); + +/* Wait for a fill matching client_oid (executor thread, blocking). + Returns the accumulated result. Times out after timeout_ms. */ +bool fill_channel_await(fill_channel_t *ch, const char *client_oid, + int64_t timeout_ms, fill_result_t *out); + +/* Pop a single event (non-blocking drain). */ +bool fill_channel_pop(fill_channel_t *ch, fill_event_t *ev); + +/* Get the wake fd for poll(). */ +int fill_channel_wake_fd(fill_channel_t *ch); + +/* Reset for next execution. */ +void fill_channel_reset(fill_channel_t *ch); + +#endif diff --git a/src/http_client.c b/src/http_client.c index 3018e11..c62e4b0 100644 --- a/src/http_client.c +++ b/src/http_client.c @@ -264,7 +264,7 @@ char *https_post(const char *host, int port, const char *path, * Compute HMAC-SHA256 digest of data using key, then base64-encode the result. * Uses OpenSSL HMAC() + BIO_f_base64 filter chain. */ -static int hmac_sha256_base64(const char *key, const char *data, char *out, size_t out_len) { +int hmac_sha256_base64(const char *key, const char *data, char *out, size_t out_len) { unsigned char digest[EVP_MAX_MD_SIZE]; unsigned int digest_len = 0; @@ -297,6 +297,145 @@ static int hmac_sha256_base64(const char *key, const char *data, char *out, size return (int)len; } +/* + * Authenticated POST request to KuCoin REST API with JSON body. + */ +char *https_post_auth(const char *host, int port, const char *path, + const char *api_key, const char *api_secret, + const char *api_passphrase, + const char *body, int *out_len) { + if (!api_key || !api_secret || !api_passphrase) { + return NULL; + } + + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + char timestamp[32]; + snprintf(timestamp, sizeof(timestamp), "%lld", (long long)(ts.tv_sec * 1000LL + ts.tv_nsec / 1000000LL)); + + char sign_input[1024]; + int body_len = body ? (int)strlen(body) : 0; + snprintf(sign_input, sizeof(sign_input), "%sPOST%s%s", timestamp, path, body ? body : ""); + char sign_b64[256] = {0}; + if (hmac_sha256_base64(api_secret, sign_input, sign_b64, sizeof(sign_b64)) < 0) { + return NULL; + } + + char passphrase_b64[256] = {0}; + if (hmac_sha256_base64(api_secret, api_passphrase, passphrase_b64, sizeof(passphrase_b64)) < 0) { + return NULL; + } + + int fd = create_tcp_socket(host, port); + if (fd < 0) return NULL; + + SSL_CTX *ctx = SSL_CTX_new(TLS_client_method()); + if (!ctx) { close(fd); return NULL; } + + SSL *ssl = SSL_new(ctx); + if (!ssl) { SSL_CTX_free(ctx); close(fd); return NULL; } + + SSL_set_fd(ssl, fd); + SSL_set_tlsext_host_name(ssl, host); + + if (SSL_connect(ssl) <= 0) { + SSL_free(ssl); SSL_CTX_free(ctx); close(fd); return NULL; + } + + char req[4096]; + if (body && body_len > 0) { + snprintf(req, sizeof(req), + "POST %s HTTP/1.1\r\n" + "Host: %s\r\n" + "Accept: application/json\r\n" + "User-Agent: fused-engine/1.0\r\n" + "Connection: close\r\n" + "Content-Type: application/json\r\n" + "Content-Length: %d\r\n" + "KC-API-KEY: %s\r\n" + "KC-API-SIGN: %s\r\n" + "KC-API-TIMESTAMP: %s\r\n" + "KC-API-PASSPHRASE: %s\r\n" + "KC-API-SIGN-TYPE: 2\r\n" + "KC-API-KEY-VERSION: 3\r\n" + "\r\n%s", + path, host, body_len, + api_key, sign_b64, timestamp, passphrase_b64, + body); + } else { + snprintf(req, sizeof(req), + "POST %s HTTP/1.1\r\n" + "Host: %s\r\n" + "Accept: application/json\r\n" + "User-Agent: fused-engine/1.0\r\n" + "Connection: close\r\n" + "KC-API-KEY: %s\r\n" + "KC-API-SIGN: %s\r\n" + "KC-API-TIMESTAMP: %s\r\n" + "KC-API-PASSPHRASE: %s\r\n" + "KC-API-SIGN-TYPE: 2\r\n" + "KC-API-KEY-VERSION: 3\r\n" + "\r\n", + path, host, + api_key, sign_b64, timestamp, passphrase_b64); + } + + if (SSL_write(ssl, req, (int)strlen(req)) <= 0) { + SSL_free(ssl); SSL_CTX_free(ctx); close(fd); return NULL; + } + + char *resp = malloc(HTTP_BUFFER_SIZE); + if (!resp) { SSL_free(ssl); SSL_CTX_free(ctx); close(fd); return NULL; } + + int total = 0; + while (total < HTTP_BUFFER_SIZE - 1) { + int n = SSL_read(ssl, resp + total, HTTP_BUFFER_SIZE - 1 - total); + if (n <= 0) { + int err = SSL_get_error(ssl, n); + log_write("[HTTPS_POST_AUTH] SSL_read returned %d, SSL_error=%d\n", n, err); + break; + } + total += n; + } + resp[total] = '\0'; + + SSL_shutdown(ssl); + SSL_free(ssl); + SSL_CTX_free(ctx); + close(fd); + + // Strip headers + char *headers_end = strstr(resp, "\r\n\r\n"); + if (headers_end) { + int hl = (headers_end - resp) + 4; + bool chunked = (strcasestr(resp, "transfer-encoding") != NULL); + memmove(resp, headers_end + 4, total - hl); + total -= hl; + resp[total] = '\0'; + if (chunked) { + char *out = malloc(HTTP_BUFFER_SIZE); + if (out) { + int op = 0; + char *p = resp; + while (*p && !(*p == '0' && (p[1] == '\r' || p[1] == '\n'))) { + int cl = 0; + while (*p && *p != '\r' && *p != '\n') { char h = *p; cl <<= 4; cl += (h >= '0' && h <= '9') ? (h - '0') : ((h & 0x1f) + 9); p++; } + if (*p == '\r') p++; if (*p == '\n') p++; + if (cl > 0 && op + cl < HTTP_BUFFER_SIZE - 1) { memcpy(out + op, p, cl); op += cl; p += cl; } + if (*p == '\r') p++; if (*p == '\n') p++; + } + out[op] = '\0'; + free(resp); + resp = out; + total = op; + } + } + } + + if (out_len) *out_len = total; + return resp; +} + /* * Authenticated GET request to KuCoin REST API. * Builds signature from timestamp + "GET" + path using HMAC-SHA256. diff --git a/src/http_client.h b/src/http_client.h index 83c3f5c..fa8599d 100644 --- a/src/http_client.h +++ b/src/http_client.h @@ -20,4 +20,10 @@ char *https_get_auth(const char *host, int port, const char *path, const char *api_key, const char *api_secret, const char *api_passphrase, int *out_len); +/* Authenticated HTTPS POST with JSON body signed with KuCoin API HMAC-SHA256 */ +char *https_post_auth(const char *host, int port, const char *path, + const char *api_key, const char *api_secret, + const char *api_passphrase, + const char *body, int *out_len); + #endif diff --git a/src/rest_client.c b/src/rest_client.c new file mode 100644 index 0000000..bfeb248 --- /dev/null +++ b/src/rest_client.c @@ -0,0 +1,435 @@ +#include "rest_client.h" +#include "http_client.h" +#include "log.h" +#include "cJSON.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define HOST "api.kucoin.com" +#define PORT 443 + +struct rest_conn_s { + char api_key[64]; + char api_secret[128]; + char api_passphrase[64]; + + /* Keepalive TLS connection */ + SSL_CTX *ctx; + SSL *ssl; + int fd; +}; + +/* Forward declaration of the HMAC helper from http_client.c */ +extern int hmac_sha256_base64(const char *key, const char *data, + char *out, size_t out_len); + +/* ── Connection management ── */ + +rest_conn_t *rest_conn_new(void) { + rest_conn_t *rc = calloc(1, sizeof(*rc)); + if (!rc) return NULL; + rc->fd = -1; + return rc; +} + +void rest_conn_set_auth(rest_conn_t *rc, + const char *api_key, + const char *api_secret, + const char *api_passphrase) { + if (api_key) strncpy(rc->api_key, api_key, sizeof(rc->api_key) - 1); + if (api_secret) strncpy(rc->api_secret, api_secret, sizeof(rc->api_secret) - 1); + if (api_passphrase) strncpy(rc->api_passphrase, api_passphrase, sizeof(rc->api_passphrase) - 1); +} + +static int tcp_connect(const char *host, int port) { + char port_str[8]; + snprintf(port_str, sizeof(port_str), "%d", port); + + struct addrinfo hints = {0}, *res = NULL; + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + + int ret = getaddrinfo(host, port_str, &hints, &res); + if (ret != 0 || !res) return -1; + + int fd = -1; + for (struct addrinfo *rp = res; rp; rp = rp->ai_next) { + fd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); + if (fd < 0) continue; + struct timeval tv = { .tv_sec = 10, .tv_usec = 0 }; + setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)); + setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); + if (connect(fd, rp->ai_addr, rp->ai_addrlen) == 0) break; + close(fd); + fd = -1; + } + freeaddrinfo(res); + return fd; +} + +static int ensure_connected(rest_conn_t *rc) { + if (rc->ssl && rc->fd >= 0) { + /* Quick check: is the connection still alive? */ + char buf; + if (recv(rc->fd, &buf, 1, MSG_PEEK | MSG_DONTWAIT) == 0) { + goto reconnect; + } + return 0; /* Connection alive */ + } + +reconnect: + /* Close stale connection */ + if (rc->ssl) { SSL_free(rc->ssl); rc->ssl = NULL; } + if (rc->fd >= 0) { close(rc->fd); rc->fd = -1; } + if (!rc->ctx) { + rc->ctx = SSL_CTX_new(TLS_client_method()); + if (!rc->ctx) return -1; + } + + int fd = tcp_connect(HOST, PORT); + if (fd < 0) return -1; + + SSL *ssl = SSL_new(rc->ctx); + if (!ssl) { close(fd); return -1; } + + SSL_set_fd(ssl, fd); + SSL_set_tlsext_host_name(ssl, HOST); + + if (SSL_connect(ssl) <= 0) { + SSL_free(ssl); close(fd); + return -1; + } + + rc->fd = fd; + rc->ssl = ssl; + return 0; +} + +/* ── Low-level signed request ── */ + +static bool do_signed_request(rest_conn_t *rc, + const char *method, + const char *path, + const char *body, + char *out_response, size_t out_sz) { + if (ensure_connected(rc) < 0) return false; + + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + char timestamp[32]; + snprintf(timestamp, sizeof(timestamp), "%lld", + (long long)(ts.tv_sec * 1000LL + ts.tv_nsec / 1000000LL)); + + /* Build signing string: timestamp + method + path + body */ + char sign_input[1536]; + int body_len = body ? (int)strlen(body) : 0; + snprintf(sign_input, sizeof(sign_input), "%s%s%s%s", + timestamp, method, path, body ? body : ""); + + char sign_b64[256] = {0}; + if (hmac_sha256_base64(rc->api_secret, sign_input, + sign_b64, sizeof(sign_b64)) < 0) { + return false; + } + + /* Build password-encrypted passphrase */ + char pass_b64[256] = {0}; + if (hmac_sha256_base64(rc->api_secret, rc->api_passphrase, + pass_b64, sizeof(pass_b64)) < 0) { + return false; + } + + /* Build the HTTP request */ + char req[4096]; + if (body && body_len > 0) { + snprintf(req, sizeof(req), + "%s %s HTTP/1.1\r\n" + "Host: " HOST "\r\n" + "Accept: application/json\r\n" + "User-Agent: fused-engine/1.0\r\n" + "Connection: keep-alive\r\n" + "Content-Type: application/json\r\n" + "Content-Length: %d\r\n" + "KC-API-KEY: %s\r\n" + "KC-API-SIGN: %s\r\n" + "KC-API-TIMESTAMP: %s\r\n" + "KC-API-PASSPHRASE: %s\r\n" + "KC-API-SIGN-TYPE: 2\r\n" + "KC-API-KEY-VERSION: 3\r\n" + "\r\n%s", + method, path, body_len, + rc->api_key, sign_b64, timestamp, pass_b64, body); + } else { + snprintf(req, sizeof(req), + "%s %s HTTP/1.1\r\n" + "Host: " HOST "\r\n" + "Accept: application/json\r\n" + "User-Agent: fused-engine/1.0\r\n" + "Connection: keep-alive\r\n" + "KC-API-KEY: %s\r\n" + "KC-API-SIGN: %s\r\n" + "KC-API-TIMESTAMP: %s\r\n" + "KC-API-PASSPHRASE: %s\r\n" + "KC-API-SIGN-TYPE: 2\r\n" + "KC-API-KEY-VERSION: 3\r\n" + "\r\n", + method, path, + rc->api_key, sign_b64, timestamp, pass_b64); + } + + int req_len = (int)strlen(req); + + /* Send */ + if (SSL_write(rc->ssl, req, req_len) <= 0) { + log_write("[REST] SSL_write failed, will reconnect\n"); + /* Force reconnect on next call */ + SSL_free(rc->ssl); rc->ssl = NULL; + close(rc->fd); rc->fd = -1; + return false; + } + + /* Read response — handle both Content-Length and chunked encoding */ + char raw[65536]; + int total = 0; + int need = -1; /* total bytes needed (-1 = not yet known) */ + bool chunked = false; + + while (total < (int)sizeof(raw) - 1) { + int n = SSL_read(rc->ssl, raw + total, (int)sizeof(raw) - 1 - total); + if (n <= 0) { + int err = SSL_get_error(rc->ssl, n); + if (err == SSL_ERROR_WANT_READ || err == SSL_ERROR_WANT_WRITE) continue; + break; + } + total += n; + + if (need < 0) { + char *end_hdrs = strstr(raw, "\r\n\r\n"); + if (end_hdrs) { + int hdr_len = (int)(end_hdrs + 4 - raw); + char *cl = strcasestr(raw, "Content-Length:"); + if (cl && cl < end_hdrs) { + long len = atol(cl + 15); + if (len > 0) need = hdr_len + (int)len; + } + chunked = (strcasestr(raw, "transfer-encoding") != NULL); + if (!chunked && need < 0) { + /* No Content-Length and not chunked — read all (close). */ + need = -2; + } + if (!chunked && need > 0 && total >= need) { + break; /* got the full Content-Length body */ + } + if (chunked) { + /* For chunked, we decode below and stop at 0\r\n\r\n */ + if (total >= 5 && memcmp(raw + total - 5, "0\r\n\r\n", 5) == 0) break; + } + } + } else if (need > 0 && total >= need) { + break; + } else if (chunked) { + if (total >= 5 && memcmp(raw + total - 5, "0\r\n\r\n", 5) == 0) break; + } + } + raw[total] = '\0'; + + /* Find header/body boundary */ + char *headers_end = strstr(raw, "\r\n\r\n"); + if (!headers_end) { + strncpy(out_response, raw, out_sz - 1); + out_response[out_sz - 1] = '\0'; + return total > 0; + } + + int header_len = (int)(headers_end - raw) + 4; + int body_start = header_len; + int body_end = total; + + if (chunked) { + char *src = raw + body_start; + char *dst = raw; + int wrote = 0; + while (*src) { + char *end = NULL; + long cl = strtol(src, &end, 16); + if (end == src || cl <= 0) break; + while (*end && (*end == '\r' || *end == '\n')) end++; + if (cl > 0 && wrote + cl + 1 < (int)sizeof(raw)) { + memmove(dst + wrote, end, cl); + wrote += (int)cl; + src = end + cl; + while (*src && (*src == '\r' || *src == '\n')) src++; + } else { + break; + } + } + raw[wrote] = '\0'; + body_end = wrote; + } else { + memmove(raw, raw + body_start, body_end - body_start); + body_end -= body_start; + raw[body_end] = '\0'; + } + + strncpy(out_response, raw, out_sz - 1); + out_response[out_sz - 1] = '\0'; + return true; +} + +/* ── Order placement ── */ + +bool rest_order_place(rest_conn_t *rc, + const char *symbol, const char *side, + double funds, double size, + const char *client_oid, + char *out_order_id, size_t out_sz, + char *out_error, size_t err_sz) { + char body[512]; + if (funds > 0) { + snprintf(body, sizeof(body), + "{\"clientOid\":\"%s\",\"symbol\":\"%s\",\"type\":\"market\"" + ",\"side\":\"%s\",\"funds\":\"%.10f\"}", + client_oid, symbol, side, funds); + } else { + snprintf(body, sizeof(body), + "{\"clientOid\":\"%s\",\"symbol\":\"%s\",\"type\":\"market\"" + ",\"side\":\"%s\",\"size\":\"%.10f\"}", + client_oid, symbol, side, size); + } + + char resp[65536] = {0}; + if (!do_signed_request(rc, "POST", "/api/v1/hf/orders", body, resp, sizeof(resp))) { + snprintf(out_error, err_sz, "transport_error"); + return false; + } + + /* Parse JSON response */ + cJSON *root = cJSON_Parse(resp); + if (!root) { + snprintf(out_error, err_sz, "parse_error: %.100s", resp); + return false; + } + + cJSON *code = cJSON_GetObjectItem(root, "code"); + cJSON *data = cJSON_GetObjectItem(root, "data"); + if (code && cJSON_IsString(code) && strcmp(code->valuestring, "200000") == 0 && data) { + cJSON *oid = cJSON_GetObjectItem(data, "orderId"); + if (oid && cJSON_IsString(oid)) { + strncpy(out_order_id, oid->valuestring, out_sz - 1); + out_order_id[out_sz - 1] = '\0'; + cJSON_Delete(root); + return true; + } + } + + /* Error */ + cJSON *msg = cJSON_GetObjectItem(root, "msg"); + if (msg && cJSON_IsString(msg)) { + snprintf(out_error, err_sz, "%s: %s", + code && cJSON_IsString(code) ? code->valuestring : "ERR", + msg->valuestring); + } else { + snprintf(out_error, err_sz, "unknown_error: %.100s", resp); + } + cJSON_Delete(root); + return false; +} + +bool rest_order_test(rest_conn_t *rc, + const char *symbol, const char *side, + double funds, double size, + char *out_error, size_t err_sz) { + /* Build a unique clientOid for the test order */ + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + char body[512]; + unsigned rnd = (unsigned)(ts.tv_nsec ^ (uintptr_t)&body); + char cid[48]; + snprintf(cid, sizeof(cid), "pt-%08x", rnd); + if (funds > 0) { + snprintf(body, sizeof(body), + "{\"clientOid\":\"%s\",\"symbol\":\"%s\",\"type\":\"market\",\"side\":\"%s\",\"funds\":\"%.8g\"}", + cid, symbol, side, funds); + } else { + snprintf(body, sizeof(body), + "{\"clientOid\":\"%s\",\"symbol\":\"%s\",\"type\":\"market\",\"side\":\"%s\",\"size\":\"%.8g\"}", + cid, symbol, side, size); + } + + char resp[65536] = {0}; + if (!do_signed_request(rc, "POST", "/api/v1/hf/orders/test", body, resp, sizeof(resp))) { + snprintf(out_error, err_sz, "transport_error"); + return false; + } + + cJSON *root = cJSON_Parse(resp); + if (!root) { + snprintf(out_error, err_sz, "parse_error: %.100s", resp); + return false; + } + + cJSON *code = cJSON_GetObjectItem(root, "code"); + if (code && cJSON_IsString(code) && strcmp(code->valuestring, "200000") == 0) { + cJSON_Delete(root); + return true; + } + + cJSON *msg = cJSON_GetObjectItem(root, "msg"); + if (msg && cJSON_IsString(msg)) { + snprintf(out_error, err_sz, "%s: %s", + code && cJSON_IsString(code) ? code->valuestring : "ERR", + msg->valuestring); + } else { + snprintf(out_error, err_sz, "unknown_error: %.100s", resp); + } + cJSON_Delete(root); + return false; +} + +bool rest_get_balance(rest_conn_t *rc, const char *currency, double *out) { + char path[128]; + char resp[65536] = {0}; + snprintf(path, sizeof(path), "/api/v1/accounts?currency=%s", currency); + + if (!do_signed_request(rc, "GET", path, NULL, resp, sizeof(resp))) { + return false; + } + + cJSON *root = cJSON_Parse(resp); + if (!root) return false; + + cJSON *data_arr = cJSON_GetObjectItem(root, "data"); + if (!data_arr || !cJSON_IsArray(data_arr) || cJSON_GetArraySize(data_arr) == 0) { + cJSON_Delete(root); + return false; + } + + cJSON *item = cJSON_GetArrayItem(data_arr, 0); + cJSON *avail = cJSON_GetObjectItem(item, "available"); + if (avail && cJSON_IsString(avail)) { + *out = atof(avail->valuestring); + cJSON_Delete(root); + return true; + } + cJSON_Delete(root); + return false; +} + +void rest_conn_free(rest_conn_t *rc) { + if (rc->ssl) SSL_free(rc->ssl); + if (rc->fd >= 0) close(rc->fd); + if (rc->ctx) SSL_CTX_free(rc->ctx); + free(rc); +} diff --git a/src/rest_client.h b/src/rest_client.h new file mode 100644 index 0000000..7ac2cb9 --- /dev/null +++ b/src/rest_client.h @@ -0,0 +1,45 @@ +#ifndef REST_CLIENT_H +#define REST_CLIENT_H + +#include +#include + +/* Keepalive HTTPS connection for KuCoin REST API. */ +typedef struct rest_conn_s rest_conn_t; + +/* Create a REST connection handle. Does not connect yet. */ +rest_conn_t *rest_conn_new(void); + +/* Set the API credentials. Must be called before any request. */ +void rest_conn_set_auth(rest_conn_t *rc, + const char *api_key, + const char *api_secret, + const char *api_passphrase); + +/* Place a market order. + Returns true on success and fills out_order_id. + Returns false and fills out_error (up to err_sz) on failure. */ +bool rest_order_place(rest_conn_t *rc, + const char *symbol, const char *side, + double funds, double size, + const char *client_oid, + char *out_order_id, size_t out_sz, + char *out_error, size_t err_sz); + +/* Validate an order via the test endpoint. + Returns true if the order would be accepted. */ +bool rest_order_test(rest_conn_t *rc, + const char *symbol, const char *side, + double funds, double size, + char *out_error, size_t err_sz); + +/* Fetch available balance for a currency (GET /api/v1/accounts). + Returns true and writes the available amount to *out on success. */ +bool rest_get_balance(rest_conn_t *rc, + const char *currency, + double *out); + +/* Close the connection and free resources. */ +void rest_conn_free(rest_conn_t *rc); + +#endif diff --git a/src/ws_client.c b/src/ws_client.c index bfc4c91..71dc62d 100644 --- a/src/ws_client.c +++ b/src/ws_client.c @@ -29,6 +29,8 @@ #include #include +static int ws_send_text(ws_connection_t *conn, const char *text); + static uint64_t now_ms_impl(void) { struct timespec ts; clock_gettime(CLOCK_MONOTONIC, &ts); @@ -70,6 +72,7 @@ static void ws_connection_reset(ws_connection_t *conn) { BIO_free(conn->bio_socket); conn->bio_socket = NULL; } + conn->token[0] = '\0'; if (conn->fd >= 0) { close(conn->fd); conn->fd = -1; @@ -177,6 +180,12 @@ int ws_client_init(ws_client_t *client, const config_t *cfg, conn->port = 443; } client->connection_count = 1; + + client->fill_ch = fill_channel_create(); + if (!client->fill_ch) { + log_write("[WS] Failed to create fill channel\n"); + } + return 0; } @@ -196,12 +205,26 @@ void ws_client_destroy(ws_client_t *client) { * Fetch a WebSocket token and server endpoint from KuCoin's /api/v1/bullet-public. * Stores token, host, ping_interval_ms, ping_timeout_ms in the connection struct. */ -int ws_client_fetch_token(ws_connection_t *conn) { - const char *body = ""; - +int ws_client_fetch_token_priv(ws_connection_t *conn, const config_t *cfg) { int out_len = 0; - char *response = https_post("api.kucoin.com", 443, "/api/v1/bullet-public", - body, strlen(body), &out_len); + char *response = NULL; + + /* Try private token first (needed for tradeOrdersV2 + balance). Fall + back to public token (level2Depth only) if no API key configured. */ + if (cfg && cfg->kucoin_api_key[0] && cfg->kucoin_api_secret[0]) { + response = https_post_auth("api.kucoin.com", 443, "/api/v1/bullet-private", + cfg->kucoin_api_key, + cfg->kucoin_api_secret, + cfg->kucoin_api_passphrase, + "{}", &out_len); + } + + if (!response || out_len <= 0) { + free(response); + response = https_post("api.kucoin.com", 443, "/api/v1/bullet-public", + "", 0, &out_len); + } + if (!response || out_len <= 0) { log_write("[WS] Failed to fetch token\n"); free(response); @@ -281,7 +304,7 @@ int ws_client_connect(ws_client_t *client, uint32_t conn_idx) { if (!conn->token[0]) { conn->state = WS_STATE_GETTING_TOKEN; - if (ws_client_fetch_token(conn) != 0) return -1; + if (ws_client_fetch_token_priv(conn, client->cfg) != 0) return -1; } snprintf(conn->connect_id, sizeof(conn->connect_id), "fused-%" PRIu32, conn_idx + 1); @@ -348,6 +371,16 @@ int ws_client_connect(ws_client_t *client, uint32_t conn_idx) { ws_client_subscribe(client, conn_idx, conn->symbol_indices, conn->symbol_count); } + /* Subscribe to private order-change channel for fill events */ + { + char msg[256]; + snprintf(msg, sizeof(msg), + "{\"type\":\"subscribe\",\"topic\":\"/spotMarket/tradeOrdersV2\"," + "\"response\":true,\"privateChannel\":\"true\"}"); + ws_send_text(conn, msg); + log_write("[WS] Subscribed to tradeOrdersV2\n"); + } + return 0; } @@ -365,6 +398,7 @@ int ws_client_write(ws_connection_t *conn, const void *data, size_t len) { if (r <= 0) { int err = SSL_get_error(conn->ssl, r); log_write("[WS] SSL_write error: %d\n", err); + conn->state = WS_STATE_DISCONNECTED; return -1; } written += r; @@ -695,7 +729,54 @@ int16_t ws_client_process_frame(ws_client_t *client, uint32_t conn_idx) { if (++ack_count <= 5) log_write("[WS] Ack #%d: %.*s\n", ack_count, (int)(payload_len > 200 ? 200 : payload_len), (const char *)payload); } else if (strcmp(msg_type->valuestring, "message") == 0) { - sym_idx = parse_book_update(msg_root, client); + cJSON *subject = cJSON_GetObjectItem(msg_root, "subject"); + if (cJSON_IsString(subject) && + strcmp(subject->valuestring, "orderChange") == 0) { + /* Private order-change event — dispatch fill */ + cJSON *data = cJSON_GetObjectItem(msg_root, "data"); + if (data && client->fill_ch) { + cJSON *oid = cJSON_GetObjectItem(data, "clientOid"); + cJSON *evt_type = cJSON_GetObjectItem(data, "type"); + cJSON *status = cJSON_GetObjectItem(data, "status"); + cJSON *order_id = cJSON_GetObjectItem(data, "orderId"); + + if (cJSON_IsString(oid) && cJSON_IsString(evt_type)) { + fill_event_t fe = {0}; + strncpy(fe.client_oid, oid->valuestring, MAX_CLIENT_OID - 1); + + if (order_id && cJSON_IsString(order_id)) + strncpy(fe.order_id, order_id->valuestring, sizeof(fe.order_id) - 1); + + bool is_match = (strcmp(evt_type->valuestring, "match") == 0); + bool is_terminal = ((strcmp(evt_type->valuestring, "filled") == 0 || + (strcmp(evt_type->valuestring, "canceled") == 0)) && + cJSON_IsString(status) && + strcmp(status->valuestring, "done") == 0); + + if (is_match) { + cJSON *ms = cJSON_GetObjectItem(data, "matchSize"); + cJSON *mp = cJSON_GetObjectItem(data, "matchPrice"); + cJSON *mf = cJSON_GetObjectItem(data, "matchFee"); + fe.match_size = cJSON_IsString(ms) ? atof(ms->valuestring) : + cJSON_IsNumber(ms) ? ms->valuedouble : 0; + fe.match_price = cJSON_IsString(mp) ? atof(mp->valuestring) : + cJSON_IsNumber(mp) ? mp->valuedouble : 0; + fe.match_fee = cJSON_IsString(mf) ? atof(mf->valuestring) : + cJSON_IsNumber(mf) ? mf->valuedouble : 0; + fill_channel_push(client->fill_ch, &fe); + } else if (is_terminal) { + /* Terminal event: push a marker that stops accumulation. + A preceding "match" event already communicated the data. + If no matches arrived, this is a cancelled+empty order. + We push a terminal marker to unblock the waiter. */ + fe.is_terminal = true; + fill_channel_push(client->fill_ch, &fe); + } + } + } + } else { + sym_idx = parse_book_update(msg_root, client); + } } else if (strcmp(msg_type->valuestring, "error") == 0) { log_write("[WS] Error message: %.*s\n", (int)(payload_len > 200 ? 200 : payload_len), (const char *)payload); @@ -731,6 +812,7 @@ int ws_client_read(ws_client_t *client, uint32_t conn_idx) { int err = SSL_get_error(conn->ssl, r); if (err == SSL_ERROR_WANT_READ || err == SSL_ERROR_WANT_WRITE) return 0; log_write("[WS] SSL_read error: %d\n", err); + conn->state = WS_STATE_DISCONNECTED; return -1; } if (r == 0) { diff --git a/src/ws_client.h b/src/ws_client.h index 1a066ff..bc25f55 100644 --- a/src/ws_client.h +++ b/src/ws_client.h @@ -9,6 +9,7 @@ #include "config.h" #include "hash.h" #include "evaluate.h" +#include "fill_handler.h" #define WS_MAX_FRAME_SIZE (128 * 1024) #define WS_MAX_CONNECTIONS 8 @@ -71,6 +72,7 @@ typedef struct { order_book_t *books; /* pointer to shared order books */ evaluator_t *evaluator; /* pointer to evaluator */ bool running; /* false signals client to stop */ + fill_channel_t *fill_ch; /* fill event channel (hot→executor) */ } ws_client_t; /* Initialise a WebSocket client with config, symbol table, books, and evaluator */ @@ -93,8 +95,8 @@ int ws_client_subscribe(ws_client_t *client, uint32_t conn_idx, /* Unsubscribe from a set of symbols on a connection */ int ws_client_unsubscribe(ws_client_t *client, uint32_t conn_idx, const uint16_t *symbol_indices, uint32_t count); -/* Fetch a WebSocket token from the KuCoin API */ -int ws_client_fetch_token(ws_connection_t *conn); +/* Fetch a WebSocket token from the KuCoin API (prefers private when cfg has keys) */ +int ws_client_fetch_token_priv(ws_connection_t *conn, const config_t *cfg); /* Process a received WebSocket frame (dispatch to book updates, etc.). * Returns symbol index on book update, -1 otherwise. */ int16_t ws_client_process_frame(ws_client_t *client, uint32_t conn_idx);