triangular_arbitrage_bot/src/executor.c

486 lines
19 KiB
C

#include "executor.h"
#include "rest_client.h"
#include "log.h"
#include "hash.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <math.h>
#include <unistd.h>
#include <stdarg.h>
#define _D1 1.0
#define FILL_TIMEOUT_MS 5000
struct executor_thread_s {
const config_t *cfg;
rest_conn_t *rest;
ws_client_t *ws;
executor_shared_t *shared;
};
/* ── Reporting ── */
void executor_write_report(const char *fmt, ...) {
va_list args;
va_start(args, fmt);
char buf[2048];
vsnprintf(buf, sizeof(buf), fmt, args);
va_end(args);
log_write("%s", buf);
}
/* ── Timestamp helpers ── */
static int64_t now_mono_ms(void) {
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return (int64_t)ts.tv_sec * 1000 + (int64_t)ts.tv_nsec / 1000000;
}
static int64_t now_realtime_ms(void) {
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
return (int64_t)ts.tv_sec * 1000 + (int64_t)ts.tv_nsec / 1000000;
}
static void format_ts(char *buf, size_t sz) {
time_t s = time(NULL);
int ms = (int)(now_realtime_ms() % 1000);
struct tm gm;
gmtime_r(&s, &gm);
strftime(buf, sz, "%Y-%m-%dT%H:%M:%S.", &gm);
size_t len = strlen(buf);
snprintf(buf + len, sz - len, "%03dZ", ms);
}
/* ── Fee hold reduction + increment floor ── */
static double apply_fee_hold(double vol, double fee_rate, bool next_is_buy) {
if (next_is_buy && fee_rate > 0)
vol /= (1.0 + fee_rate);
return vol;
}
static double apply_increment_floor(double vol, double inc) {
if (inc > 0)
vol = floor(vol / inc - 1e-12) * inc;
return vol;
}
/* ── Core execution loop ── */
executor_thread_t *executor_thread_create(const config_t *cfg,
fill_channel_t *fill_ch,
ws_client_t *ws,
executor_shared_t *shared) {
(void)fill_ch;
executor_thread_t *et = calloc(1, sizeof(*et));
if (!et) return NULL;
et->cfg = cfg;
et->ws = ws;
et->shared = shared;
et->rest = rest_conn_new();
if (et->rest) {
rest_conn_set_auth(et->rest,
cfg->kucoin_api_key,
cfg->kucoin_api_secret,
cfg->kucoin_api_passphrase);
}
/* Warm up the authenticated REST connection pool */
if (et->rest && cfg->live_mode && cfg->initial_capital_count > 0) {
double dummy = 0;
(void)rest_get_balance(et->rest, cfg->initial_capital[0].currency, &dummy);
}
return et;
}
bool executor_keepalive(executor_thread_t *et) {
if (!et->rest || et->cfg->initial_capital_count == 0) return false;
double dummy = 0;
return rest_get_balance(et->rest, et->cfg->initial_capital[0].currency, &dummy);
}
void executor_execute_triangle(executor_thread_t *et,
signal_entry_t *sig) {
fill_channel_t *fill_ch = et->ws->fill_ch;
executor_shared_t *sh = et->shared;
/* ── Concurrency isolation ── */
uint64_t pair_hashes[3] = {0};
for (int p = 0; p < 3; p++) {
pair_hashes[p] = fnv1a_hash(sig->legs.legs[p].symbol, (uint32_t)strlen(sig->legs.legs[p].symbol));
}
pthread_mutex_lock(&sh->lock);
for (int i = 0; i < sh->count; i++) {
if (strcmp(sh->triangles[i], sig->triangle_key) == 0) {
pthread_mutex_unlock(&sh->lock);
log_write("[EXEC] Dropping signal for overlapping triangle: %s\n", sig->triangle_key);
return;
}
if (strcmp(sh->primary_quotes[i], sig->primary_quote) == 0) {
pthread_mutex_unlock(&sh->lock);
log_write("[EXEC] Dropping signal for same primary quote: %s\n", sig->triangle_key);
return;
}
for (int p = 0; p < 3; p++) {
if (sh->pairs[i] == pair_hashes[p]) {
pthread_mutex_unlock(&sh->lock);
log_write("[EXEC] Dropping signal for overlapping pair on %s\n", sig->triangle_key);
return;
}
}
}
/* Register this execution */
int slot = -1;
for (int i = 0; i < MAX_IN_FLIGHT; i++) {
if (!sh->triangles[i][0]) { slot = i; break; }
}
if (slot >= 0) {
strncpy(sh->triangles[slot], sig->triangle_key, sizeof(sh->triangles[slot]) - 1);
strncpy(sh->primary_quotes[slot], sig->primary_quote, sizeof(sh->primary_quotes[slot]) - 1);
sh->pairs[slot] = pair_hashes[0];
sh->count++;
}
pthread_mutex_unlock(&sh->lock);
char ts_buf[32];
char corr_id[64];
int64_t exec_start_rt = now_realtime_ms();
int64_t exec_start_mono = now_mono_ms();
double leg_timings[6] = {0}; /* leg0_order, leg0_fill, leg1_order, leg1_fill, leg2_order, leg2_fill */
snprintf(corr_id, sizeof(corr_id), "%08x%08x%08x%08x",
(unsigned)(uintptr_t)&sig->legs.legs[0] ^ (unsigned)sig->ts_ms,
(unsigned)sig->ts_ms ^ (unsigned)sig->book_ts_ms,
(unsigned)sig->predicted_bps,
(unsigned)sig->t_arrive_ms);
double leg_output[3] = {0};
double fills[3][6] = {{0}}; /* leg, output, avg_price, fee, input_vol, latency_ms */
bool success = true;
const char *error_str = "";
for (int leg = 0; leg < 3; leg++) {
const signal_leg_t *sl = &sig->legs.legs[leg];
bool is_buy = (strcmp(sl->side, "buy") == 0);
/* ── Input volume ── */
double input_vol;
if (leg == 0) {
input_vol = atof(sl->order_param);
input_vol = apply_increment_floor(input_vol,
is_buy ? (sl->funds_increment > 0 ? sl->funds_increment : sl->quote_increment) : sl->base_increment);
} else {
input_vol = leg_output[leg - 1];
input_vol = apply_fee_hold(input_vol, sl->fee_rate, is_buy);
input_vol = apply_increment_floor(input_vol,
is_buy ? (sl->funds_increment > 0 ? sl->funds_increment : sl->quote_increment) : sl->base_increment);
}
/* Build a client OID */
char client_oid[64];
snprintf(client_oid, sizeof(client_oid), "c%08x%04x",
(unsigned)(now_realtime_ms() & 0xFFFFFFFF),
(unsigned)leg);
fills[leg][4] = input_vol;
/* ── Capture order fire timing BEFORE sending (realtime) ── */
int64_t t0 = now_mono_ms();
if (leg < 6) leg_timings[leg * 2] = (double)(now_realtime_ms() - exec_start_rt);
/* ── Place order ── */
char order_id[32] = {0};
char err_msg[128] = {0};
bool ok = false;
int64_t order_fire_ms_tracking = t0;
if (sig->live) {
ok = rest_order_place(et->rest, sl->symbol, sl->side,
is_buy ? input_vol : 0,
is_buy ? 0 : input_vol,
client_oid,
order_id, sizeof(order_id),
err_msg, sizeof(err_msg));
} else {
ok = rest_order_test(et->rest, sl->symbol, sl->side,
is_buy ? input_vol : 0,
is_buy ? 0 : input_vol,
err_msg, sizeof(err_msg));
if (ok) {
snprintf(order_id, sizeof(order_id), "paper-%08x",
(unsigned)now_realtime_ms());
}
}
fills[leg][5] = (double)(now_mono_ms() - t0);
/* ── ORDER output ── */
{
format_ts(ts_buf, sizeof(ts_buf));
executor_write_report(
"%s ORDER | corr=%s | leg%d | %s | %s | vol=%.10g | "
"order_id=%s | lat=%.1fms\n",
ts_buf, corr_id, leg, sl->symbol, sl->side,
input_vol, ok ? order_id : "NONE", fills[leg][5]);
}
if (!ok) {
error_str = err_msg[0] ? err_msg : "order_rejected";
format_ts(ts_buf, sizeof(ts_buf));
executor_write_report(
"%s REJECTED | corr=%s | leg%d | %s | %s | vol=%.10g | "
"error=%s | lat=%.1fms\n",
ts_buf, corr_id, leg, sl->symbol, sl->side,
input_vol, err_msg, fills[leg][5]);
success = false;
break;
}
/* ── Wait for fill (live only) ── */
double total_size = 0, total_funds = 0, avg_price = 0, total_fee = 0;
if (sig->live) {
fill_result_t fr = {0};
bool filled = fill_channel_await(fill_ch, client_oid,
FILL_TIMEOUT_MS, &fr);
if (!filled) {
error_str = fr.match_count > 0 ? "partial_fill" : "fill_timeout";
success = false;
break;
}
total_size = fr.total_size;
total_funds = fr.total_funds;
total_fee = fr.total_fee;
avg_price = fr.avg_price;
if (fr.order_id[0]) memcpy(order_id, fr.order_id, sizeof(order_id));
} else {
/* Paper mode: simulate fill from books[leg] top-of-book */
if (sig->book_count > leg) {
const signal_book_t *bk = &sig->books[leg];
if (is_buy && bk->ask_count > 0) {
double ask_price = bk->asks[0].price;
total_size = input_vol / ask_price;
if (sl->base_increment > 0)
total_size = floor(total_size / sl->base_increment - 1e-12) * sl->base_increment;
if (sl->quote_increment > 0)
total_funds = floor(total_size * ask_price / sl->quote_increment - 1e-12) * sl->quote_increment;
else
total_funds = total_size * ask_price;
avg_price = ask_price;
} else if (!is_buy && bk->bid_count > 0) {
double bid_price = bk->bids[0].price;
total_size = input_vol; /* base amount to sell */
if (sl->base_increment > 0)
total_size = floor(total_size / sl->base_increment - 1e-12) * sl->base_increment;
if (sl->quote_increment > 0)
total_funds = floor(total_size * bid_price / sl->quote_increment - 1e-12) * sl->quote_increment;
else
total_funds = total_size * bid_price;
avg_price = bid_price;
} else {
error_str = "no_book_data";
success = false;
break;
}
} else {
error_str = "no_book_data";
success = false;
break;
}
/* Simulated fee: if fee currency matches output, fee = output * rate */
if (sl->fee_rate > 0) {
double output_amt = is_buy ? total_size : total_funds;
total_fee = output_amt * sl->fee_rate;
}
}
/* ── Timing: leg_fill_received (realtime) ── */
if (leg < 6) leg_timings[leg * 2 + 1] = (double)(now_realtime_ms() - exec_start_rt);
/* ── FILL output ── */
format_ts(ts_buf, sizeof(ts_buf));
executor_write_report(
"%s FILL | corr=%s | leg%d | %s | %s | "
"out=%.10g@%.6g | fee=%.6g %s | lat=%.1fms\n",
ts_buf, corr_id, leg, sl->symbol, sl->side,
is_buy ? total_size : total_funds, avg_price,
total_fee, sl->fee_currency,
(double)(now_mono_ms() - order_fire_ms_tracking));
/* ── Cascade ── */
double raw_output = is_buy ? total_size : total_funds;
leg_output[leg] = raw_output;
fills[leg][0] = leg_output[leg];
fills[leg][1] = avg_price;
fills[leg][2] = total_fee;
fills[leg][3] = total_funds;
/* ── Balance wait (live only): wait for KuCoin to settle before next leg ── */
if (sig->live && leg < 2) {
const char *pair = sl->symbol;
const char *dash = strchr(pair, '-');
char out_ccy[16] = {0};
if (dash) {
const char *ccy = is_buy ? pair : (dash + 1);
size_t ccy_len = is_buy ? (size_t)(dash - pair) : strlen(dash + 1);
if (ccy_len > 15) ccy_len = 15;
memcpy(out_ccy, ccy, ccy_len);
}
if (out_ccy[0]) {
ws_client_await_balance(et->ws, out_ccy, raw_output, 2000);
/* For sells, KuCoin may deduct fee from output currency.
Use the actual settled balance rather than the arithmetic output. */
double actual = ws_client_latest_balance(et->ws, out_ccy);
if (actual > 0 && actual < raw_output) {
leg_output[leg] = actual;
fills[leg][0] = actual;
raw_output = actual;
}
}
}
/* Fee hold reduction + increment floor for next leg */
if (leg < 2) {
const signal_leg_t *nsl = &sig->legs.legs[leg + 1];
bool nxt_buy = (strcmp(nsl->side, "buy") == 0);
leg_output[leg] = apply_fee_hold(leg_output[leg], nsl->fee_rate, nxt_buy);
leg_output[leg] = apply_increment_floor(leg_output[leg],
nxt_buy ? (nsl->funds_increment > 0 ? nsl->funds_increment : nsl->quote_increment) : nsl->base_increment);
}
}
/* ── Compute PnL ── */
double profit = 0, effective_bps = 0;
if (success && fills[2][0] > 0) {
double leg0_in = fills[0][4];
double leg2_out = fills[2][0];
/* Subtract fee if fee currency matches leg 2 output currency */
{
const signal_leg_t *sl2 = &sig->legs.legs[2];
bool leg2_buy = (strcmp(sl2->side, "buy") == 0);
const char *pair = sl2->symbol;
const char *dash = strchr(pair, '-');
char base_ccy[16] = {0}, quote_ccy[16] = {0};
if (dash) {
size_t blen = (size_t)(dash - pair);
if (blen > 15) blen = 15;
memcpy(base_ccy, pair, blen);
strncpy(quote_ccy, dash + 1, 15);
}
const char *out_ccy = leg2_buy ? base_ccy : quote_ccy;
if (out_ccy[0] && strcmp(sl2->fee_currency, out_ccy) == 0) {
leg2_out -= fills[2][2];
}
}
profit = leg2_out - leg0_in;
if (leg0_in > 0)
effective_bps = (profit / leg0_in) * 10000.0;
}
int64_t total_ms = now_mono_ms() - exec_start_mono;
/* ── Build timing string (t0 = book_update_arrived = 0.0ms) ── */
char timings_str[512] = "";
int tp = 0;
int64_t book_base = sig->t_arrive_ms;
if (book_base > 0) {
if (sig->book_ts_ms > 0)
tp += snprintf(timings_str + tp, sizeof(timings_str) - tp,
"t-1_snapshot=%.1fms ",
(double)((int64_t)sig->book_ts_ms - book_base));
tp += snprintf(timings_str + tp, sizeof(timings_str) - tp,
"t0_arrival=0.0ms ");
if (sig->ts_ms > 0)
tp += snprintf(timings_str + tp, sizeof(timings_str) - tp,
"t1_signal=%.1fms ",
(double)((int64_t)sig->ts_ms - book_base));
double sig_recv = (double)(exec_start_rt - book_base);
for (int l = 0; l < 3; l++) {
double o = leg_timings[l * 2];
double f = leg_timings[l * 2 + 1];
if (fills[l][0] > 0) {
tp += snprintf(timings_str + tp, sizeof(timings_str) - tp,
"t%d_leg%d_order=%.1fms ", 2 + l * 2, l, sig_recv + o);
tp += snprintf(timings_str + tp, sizeof(timings_str) - tp,
"t%d_leg%d_fill=%.1fms ", 3 + l * 2, l, sig_recv + f);
}
}
}
/* ── Build fills string ── */
char fills_str[1024] = "";
int fills_pos = 0;
for (int leg = 0; leg < 3 && leg <= (success ? 2 : (fills[0][0] > 0 ? 0 : -1)); leg++) {
if (fills[leg][0] == 0 && !(leg == 2 && success)) break;
const signal_leg_t *sl = &sig->legs.legs[leg];
const char *pair = sl->symbol;
const char *side = sl->side;
const char *dash = strchr(pair, '-');
char in_ccy[16] = {0}, out_ccy[16] = {0};
if (dash) {
size_t blen = (size_t)(dash - pair);
if (blen > 15) blen = 15;
if (strcmp(side, "buy") == 0) {
memcpy(in_ccy, dash + 1, (strlen(dash+1) > 15 ? 15 : strlen(dash+1)));
memcpy(out_ccy, pair, blen);
} else {
memcpy(out_ccy, dash + 1, (strlen(dash+1) > 15 ? 15 : strlen(dash+1)));
memcpy(in_ccy, pair, blen);
}
}
int n = snprintf(fills_str + fills_pos, sizeof(fills_str) - fills_pos,
"%sL%d:%s %s %s->%s %.10g@%.6g(fee=%.6g %s lat=%.1fms)",
leg > 0 ? ", " : "",
leg, side, pair, in_ccy, out_ccy,
fills[leg][0], fills[leg][1], fills[leg][2],
sl->fee_currency, fills[leg][5]);
if (n > 0) fills_pos += n;
if (fills_pos >= (int)sizeof(fills_str) - 1) break;
}
/* ── Emit report (single line) ── */
format_ts(ts_buf, sizeof(ts_buf));
const char *status = success ? "FILLED" : "FAILED";
char bps_str[32];
snprintf(bps_str, sizeof(bps_str), "%.2f", effective_bps);
executor_write_report(
"%s %s | corr=%s | triangle=['%s'] | "
"predicted_bps=%.2f | effective_bps=%s | "
"book_ts=%lld | profit=%.4f | timings=[%s] | "
"fills=[%s]%s%s\n",
ts_buf, status, corr_id, sig->triangle_key,
sig->predicted_bps, bps_str,
(long long)sig->book_ts_ms, profit, timings_str,
fills_str,
error_str[0] ? " | error=" : "",
error_str[0] ? error_str : "");
/* Release isolation slot */
pthread_mutex_lock(&sh->lock);
for (int i = 0; i < MAX_IN_FLIGHT; i++) {
if (strcmp(sh->triangles[i], sig->triangle_key) == 0) {
sh->triangles[i][0] = '\0';
sh->primary_quotes[i][0] = '\0';
sh->pairs[i] = 0;
sh->count--;
break;
}
}
pthread_mutex_unlock(&sh->lock);
log_write("[EXEC] Triangle %s in %lld ms, profit=%.4f predicted=%.2f eff=%.2f\n",
status, (long long)total_ms, profit, sig->predicted_bps, effective_bps);
}
void executor_thread_destroy(executor_thread_t *et) {
if (!et) return;
if (et->rest) rest_conn_free(et->rest);
free(et);
}