#include "executor.h"
#include "rest_client.h"
#include "log.h"
#include "hash.h"

#include <math.h>
#include <stdarg.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#define _D1 1.0                 /* not referenced anywhere in this file */
#define FILL_TIMEOUT_MS 5000    /* max wait for a live fill, in milliseconds */
#define MAX_IN_FLIGHT 8         /* number of isolation slots per executor */
#define EXEC_NUM_LEGS 3         /* a triangle is always three legs */
#define EXEC_CCY_LEN 16         /* buffer size for a currency code, incl. NUL */

struct executor_thread_s {
    const config_t *cfg;
    rest_conn_t *rest;

    /* Concurrency isolation state. A slot is in use iff its triangle key is
     * a non-empty string; slots are sparse (released in arbitrary order). */
    char in_flight_triangles[MAX_IN_FLIGHT][128];               /* triangle_key */
    uint64_t in_flight_pairs[MAX_IN_FLIGHT][EXEC_NUM_LEGS];     /* fnv1a hash of each pair */
    char in_flight_primary_quotes[MAX_IN_FLIGHT][EXEC_CCY_LEN]; /* primary quote currency */
    int in_flight_count;                                        /* number of used slots */
};

/* Per-leg execution record used for cascading, PnL and the final report. */
typedef struct {
    double output;      /* amount handed to the next leg: base size on buy, quote funds on sell */
    double avg_price;   /* average fill price */
    double fee;         /* total fee reported (live) or simulated (paper) */
    double funds;       /* quote amount of the fill */
    double size;        /* base amount of the fill */
    double input_vol;   /* volume submitted with the order */
    double latency_ms;  /* time spent inside the order-placement call */
} exec_leg_fill_t;

/* ── Reporting ── */

/*
 * Format a printf-style message into a 2048-byte stack buffer and pass it to
 * log_write(). Output longer than the buffer is truncated by vsnprintf().
 */
void executor_write_report(const char *fmt, ...)
{
    char buf[2048];
    va_list args;

    va_start(args, fmt);
    vsnprintf(buf, sizeof(buf), fmt, args);
    va_end(args);

    log_write("%s", buf);
}

/* ── Timestamp helpers ── */

/* Current CLOCK_MONOTONIC reading in whole milliseconds. */
static int64_t now_mono_ms(void)
{
    struct timespec ts;
    clock_gettime(CLOCK_MONOTONIC, &ts);
    return (int64_t)ts.tv_sec * 1000 + (int64_t)ts.tv_nsec / 1000000;
}

/* Current CLOCK_REALTIME reading in whole milliseconds. */
static int64_t now_realtime_ms(void)
{
    struct timespec ts;
    clock_gettime(CLOCK_REALTIME, &ts);
    return (int64_t)ts.tv_sec * 1000 + (int64_t)ts.tv_nsec / 1000000;
}

/*
 * Write the current UTC time into buf as "YYYY-MM-DDTHH:MM:SS.mmmZ".
 *
 * Seconds and milliseconds come from a single clock read so they can never
 * disagree across a second boundary. On a buffer too small for the date part,
 * buf is set to the empty string.
 */
static void format_ts(char *buf, size_t sz)
{
    struct timespec ts;
    struct tm gm;

    if (sz == 0)
        return;

    clock_gettime(CLOCK_REALTIME, &ts);
    time_t secs = ts.tv_sec;
    int ms = (int)(ts.tv_nsec / 1000000);

    gmtime_r(&secs, &gm);
    if (strftime(buf, sz, "%Y-%m-%dT%H:%M:%S.", &gm) == 0) {
        buf[0] = '\0';
        return;
    }

    size_t len = strlen(buf);
    snprintf(buf + len, sz - len, "%03dZ", ms);
}

/* ── Fee hold reduction + increment floor ── */

/*
 * Shrink vol by a factor of (1 + fee_rate) when the next order is a buy and
 * fee_rate is positive; otherwise return vol unchanged.
 */
static double apply_fee_hold(double vol, double fee_rate, bool next_is_buy)
{
    if (next_is_buy && fee_rate > 0)
        vol /= (1.0 + fee_rate);
    return vol;
}

/*
 * Round vol down to a multiple of inc (no-op when inc <= 0).
 *
 * NOTE(review): the 1e-12 bias is subtracted before floor(), so a value that
 * is already an exact multiple of inc drops one further increment whenever
 * vol/inc is small enough for 1e-12 to be representable. This looks
 * deliberately conservative but should be confirmed.
 */
static double apply_increment_floor(double vol, double inc)
{
    if (inc > 0)
        vol = floor(vol / inc - 1e-12) * inc;
    return vol;
}

/* ── Small internal helpers ── */

/* Copy msg into the function-scope error buffer (always NUL-terminated). */
static void set_error(char *dst, size_t sz, const char *msg)
{
    snprintf(dst, sz, "%s", msg);
}

/*
 * Split "BASE-QUOTE" at the first '-' into base and quote, each truncated to
 * EXEC_CCY_LEN - 1 characters. Both outputs are empty when pair has no '-'.
 */
static void split_symbol(const char *pair, char base[EXEC_CCY_LEN],
                         char quote[EXEC_CCY_LEN])
{
    base[0] = '\0';
    quote[0] = '\0';

    const char *dash = strchr(pair, '-');
    if (!dash)
        return;

    size_t blen = (size_t)(dash - pair);
    if (blen > EXEC_CCY_LEN - 1)
        blen = EXEC_CCY_LEN - 1;
    memcpy(base, pair, blen);
    base[blen] = '\0';

    snprintf(quote, EXEC_CCY_LEN, "%s", dash + 1);
}

/*
 * Return true (after logging the reason) when sig overlaps an execution that
 * is already registered: same triangle key, same primary quote, or any shared
 * pair. Every slot is examined because used slots are not contiguous.
 */
static bool isolation_conflict(const executor_thread_t *et,
                               const signal_entry_t *sig,
                               const uint64_t pair_hashes[EXEC_NUM_LEGS])
{
    for (int i = 0; i < MAX_IN_FLIGHT; i++) {
        if (!et->in_flight_triangles[i][0])
            continue; /* empty slot */

        if (strcmp(et->in_flight_triangles[i], sig->triangle_key) == 0) {
            log_write("[EXEC] Dropping signal for overlapping triangle: %s\n",
                      sig->triangle_key);
            return true;
        }
        if (strcmp(et->in_flight_primary_quotes[i], sig->primary_quote) == 0) {
            log_write("[EXEC] Dropping signal for same primary quote: %s\n",
                      sig->triangle_key);
            return true;
        }
        for (int p = 0; p < EXEC_NUM_LEGS; p++) {
            for (int q = 0; q < EXEC_NUM_LEGS; q++) {
                if (et->in_flight_pairs[i][q] == pair_hashes[p]) {
                    log_write("[EXEC] Dropping signal for overlapping pair on %s\n",
                              sig->triangle_key);
                    return true;
                }
            }
        }
    }
    return false;
}

/*
 * Claim the first free isolation slot for sig and record its key, primary
 * quote and all three pair hashes. Returns the slot index, or -1 when every
 * slot is taken (the caller still executes, just without isolation).
 */
static int isolation_register(executor_thread_t *et, const signal_entry_t *sig,
                              const uint64_t pair_hashes[EXEC_NUM_LEGS])
{
    for (int i = 0; i < MAX_IN_FLIGHT; i++) {
        if (et->in_flight_triangles[i][0])
            continue;

        snprintf(et->in_flight_triangles[i], sizeof(et->in_flight_triangles[i]),
                 "%s", sig->triangle_key);
        snprintf(et->in_flight_primary_quotes[i],
                 sizeof(et->in_flight_primary_quotes[i]), "%s", sig->primary_quote);
        for (int p = 0; p < EXEC_NUM_LEGS; p++)
            et->in_flight_pairs[i][p] = pair_hashes[p];
        et->in_flight_count++;
        return i;
    }
    return -1;
}

/* Free a slot previously returned by isolation_register(); -1 is ignored. */
static void isolation_release(executor_thread_t *et, int slot)
{
    if (slot < 0)
        return;

    et->in_flight_triangles[slot][0] = '\0';
    et->in_flight_primary_quotes[slot][0] = '\0';
    memset(et->in_flight_pairs[slot], 0, sizeof(et->in_flight_pairs[slot]));
    et->in_flight_count--;
}

/*
 * Paper mode: simulate a fill for one leg against the top of sig->books[leg].
 * A buy spends input_vol of quote at the best ask; a sell sells input_vol of
 * base at the best bid. Size and funds are floored to the leg's increments.
 * Returns false when no usable book level exists.
 */
static bool simulate_paper_fill(const signal_entry_t *sig, int leg, bool is_buy,
                                double input_vol, double *size, double *funds,
                                double *avg_price)
{
    if (!(sig->book_count > leg))
        return false;

    const signal_book_t *bk = &sig->books[leg];
    const signal_leg_t *sl = &sig->legs.legs[leg];
    double price;
    double raw_size;

    if (is_buy) {
        if (!(bk->ask_count > 0))
            return false;
        price = bk->asks[0].price;
        if (!(price > 0))
            return false; /* avoid dividing by a zero/invalid price */
        raw_size = input_vol / price;
    } else {
        if (!(bk->bid_count > 0))
            return false;
        price = bk->bids[0].price;
        if (!(price > 0))
            return false;
        raw_size = input_vol; /* base amount to sell */
    }

    *size = apply_increment_floor(raw_size, sl->base_increment);
    *funds = apply_increment_floor(*size * price, sl->quote_increment);
    *avg_price = price;
    return true;
}

/* ── Lifecycle ── */

/*
 * Allocate an executor bound to cfg and open its REST connection.
 *
 * fill_ch is currently unused. Returns NULL when cfg is NULL or allocation
 * fails. The returned executor may have a NULL REST connection if
 * rest_conn_new() failed; callers release it with executor_thread_destroy().
 */
executor_thread_t *executor_thread_create(const config_t *cfg, fill_channel_t *fill_ch)
{
    (void)fill_ch;

    if (!cfg)
        return NULL;

    executor_thread_t *et = calloc(1, sizeof(*et));
    if (!et)
        return NULL;

    et->cfg = cfg;
    et->rest = rest_conn_new();
    if (et->rest) {
        rest_conn_set_auth(et->rest,
                           cfg->kucoin_api_key,
                           cfg->kucoin_api_secret,
                           cfg->kucoin_api_passphrase);

        /* Warm up the authenticated REST connection pool; the result of the
         * balance query itself is deliberately ignored. */
        if (cfg->live_mode) {
            double dummy = 0;
            (void)rest_get_balance(et->rest, "USDT", &dummy);
        }
    }
    return et;
}

/*
 * Issue a USDT balance query on the executor's REST connection.
 * Returns false when et or its connection is NULL, otherwise the result of
 * rest_get_balance().
 */
bool executor_keepalive(executor_thread_t *et)
{
    if (!et || !et->rest)
        return false;

    double dummy = 0;
    return rest_get_balance(et->rest, "USDT", &dummy);
}

/*
 * Placeholder for per-thread fill channels. Currently a no-op: the fill
 * channel is passed directly to executor_execute_triangle() instead.
 */
void executor_thread_set_fill_ch(executor_thread_t *et, fill_channel_t *ch)
{
    (void)et;
    (void)ch;
}

/* ── Core execution ── */

/*
 * Execute the three legs of a triangle signal in sequence and emit ORDER /
 * REJECTED / FILL lines plus one final FILLED/FAILED report.
 *
 * The signal is dropped (logged, nothing executed) when it overlaps an
 * execution already registered on this executor. In live mode (sig->live)
 * each order is placed via REST and its fill awaited on fill_ch for up to
 * FILL_TIMEOUT_MS; otherwise the order is validated with rest_order_test()
 * and the fill is simulated from the signal's book snapshot. Each leg's
 * output, after one fee-hold reduction and one increment floor, becomes the
 * next leg's input. Execution stops at the first failing leg.
 */
void executor_execute_triangle(executor_thread_t *et, signal_entry_t *sig,
                               fill_channel_t *fill_ch)
{
    if (!et || !sig)
        return;

    /* ── Concurrency isolation ── */
    uint64_t pair_hashes[EXEC_NUM_LEGS] = {0};
    for (int p = 0; p < EXEC_NUM_LEGS; p++) {
        pair_hashes[p] = fnv1a_hash(sig->legs.legs[p].symbol,
                                    (uint32_t)strlen(sig->legs.legs[p].symbol));
    }
    if (isolation_conflict(et, sig, pair_hashes))
        return;

    int slot = isolation_register(et, sig, pair_hashes);
    if (slot < 0) {
        log_write("[EXEC] No free isolation slot; executing %s unregistered\n",
                  sig->triangle_key);
    }

    char ts_buf[32];
    char corr_id[64];
    const int64_t exec_start = now_mono_ms();
    /* Realtime reference for the timings string ("signal_received=0.0ms"). */
    const int64_t signal_received_rt = now_realtime_ms();

    /* predicted_bps goes through int64_t first: converting a negative double
     * straight to unsigned is undefined behaviour. */
    snprintf(corr_id, sizeof(corr_id), "%08x%08x%08x%08x",
             (unsigned)(uintptr_t)&sig->legs.legs[0] ^ (unsigned)sig->ts_ms,
             (unsigned)sig->ts_ms ^ (unsigned)sig->book_ts_ms,
             (unsigned)(int64_t)sig->predicted_bps,
             (unsigned)sig->t_arrive_ms);

    exec_leg_fill_t fills[EXEC_NUM_LEGS] = {{0}};
    int legs_filled = 0;
    bool success = true;
    /* Function-scope copy of the failure reason: the per-leg err_msg buffer
     * goes out of scope when the loop is left. */
    char error_buf[128] = "";

    for (int leg = 0; leg < EXEC_NUM_LEGS; leg++) {
        const signal_leg_t *sl = &sig->legs.legs[leg];
        const bool is_buy = (strcmp(sl->side, "buy") == 0);

        /* ── Input volume ── */
        double input_vol;
        if (leg == 0) {
            char *end = NULL;
            input_vol = strtod(sl->order_param, &end);
            if (end == sl->order_param || !isfinite(input_vol) || input_vol <= 0) {
                set_error(error_buf, sizeof(error_buf), "invalid_order_param");
                success = false;
                break;
            }
        } else {
            /* Fee hold and increment floor are applied exactly once per
             * hand-off, here, on the previous leg's raw output. */
            input_vol = fills[leg - 1].output;
            input_vol = apply_fee_hold(input_vol, sl->fee_rate, is_buy);
            input_vol = apply_increment_floor(
                input_vol, is_buy ? sl->quote_increment : sl->base_increment);
        }
        fills[leg].input_vol = input_vol;

        /* Build a client OID */
        char client_oid[64];
        snprintf(client_oid, sizeof(client_oid), "c%08x%04x",
                 (unsigned)(now_realtime_ms() & 0xFFFFFFFF), (unsigned)leg);

        /* ── Place order ── */
        char order_id[32] = {0};
        char err_msg[128] = {0};
        bool ok = false;
        /* Buys pass the volume in the first amount argument, sells in the second. */
        const double buy_amount = is_buy ? input_vol : 0;
        const double sell_amount = is_buy ? 0 : input_vol;
        const int64_t t0 = now_mono_ms();

        if (!et->rest) {
            snprintf(err_msg, sizeof(err_msg), "no_rest_connection");
        } else if (sig->live) {
            ok = rest_order_place(et->rest, sl->symbol, sl->side,
                                  buy_amount, sell_amount,
                                  client_oid, order_id, sizeof(order_id),
                                  err_msg, sizeof(err_msg));
        } else {
            ok = rest_order_test(et->rest, sl->symbol, sl->side,
                                 buy_amount, sell_amount,
                                 err_msg, sizeof(err_msg));
            if (ok) {
                snprintf(order_id, sizeof(order_id), "paper-%08x",
                         (unsigned)now_realtime_ms());
            }
        }
        fills[leg].latency_ms = (double)(now_mono_ms() - t0);

        /* ── ORDER output ── */
        format_ts(ts_buf, sizeof(ts_buf));
        executor_write_report(
            "%s ORDER | corr=%s | leg%d | %s | %s | vol=%.10g | "
            "order_id=%s | lat=%.1fms\n",
            ts_buf, corr_id, leg, sl->symbol, sl->side, input_vol,
            ok ? order_id : "NONE", fills[leg].latency_ms);

        if (!ok) {
            set_error(error_buf, sizeof(error_buf),
                      err_msg[0] ? err_msg : "order_rejected");
            format_ts(ts_buf, sizeof(ts_buf));
            executor_write_report(
                "%s REJECTED | corr=%s | leg%d | %s | %s | vol=%.10g | "
                "error=%s | lat=%.1fms\n",
                ts_buf, corr_id, leg, sl->symbol, sl->side, input_vol,
                error_buf, fills[leg].latency_ms);
            success = false;
            break;
        }

        /* ── Obtain the fill ── */
        double total_size = 0, total_funds = 0, avg_price = 0, total_fee = 0;

        if (sig->live) {
            fill_result_t fr = {0};
            if (!fill_channel_await(fill_ch, client_oid, FILL_TIMEOUT_MS, &fr)) {
                set_error(error_buf, sizeof(error_buf),
                          fr.match_count > 0 ? "partial_fill" : "fill_timeout");
                success = false;
                break;
            }
            total_size = fr.total_size;
            total_funds = fr.total_funds;
            total_fee = fr.total_fee;
            avg_price = fr.avg_price;
        } else {
            if (!simulate_paper_fill(sig, leg, is_buy, input_vol,
                                     &total_size, &total_funds, &avg_price)) {
                set_error(error_buf, sizeof(error_buf), "no_book_data");
                success = false;
                break;
            }
            /* Simulated fee: charged on the leg's output amount. */
            if (sl->fee_rate > 0)
                total_fee = (is_buy ? total_size : total_funds) * sl->fee_rate;
        }

        /* ── FILL output ── */
        format_ts(ts_buf, sizeof(ts_buf));
        executor_write_report(
            "%s FILL | corr=%s | leg%d | %s | %s | "
            "out=%.10g@%.6g | fee=%.6g %s | lat=%.1fms\n",
            ts_buf, corr_id, leg, sl->symbol, sl->side,
            is_buy ? total_size : total_funds, avg_price,
            total_fee, sl->fee_currency,
            (double)(now_mono_ms() - t0));

        /* ── Cascade ── */
        fills[leg].output = is_buy ? total_size : total_funds;
        fills[leg].avg_price = avg_price;
        fills[leg].fee = total_fee;
        fills[leg].funds = total_funds;
        fills[leg].size = total_size;
        legs_filled = leg + 1;
    }

    /* ── Compute PnL ── */
    double profit = 0, effective_bps = 0;
    if (success && fills[2].output > 0) {
        const signal_leg_t *sl0 = &sig->legs.legs[0];
        const signal_leg_t *sl2 = &sig->legs.legs[2];
        const bool leg0_buy = (strcmp(sl0->side, "buy") == 0);
        const bool leg2_buy = (strcmp(sl2->side, "buy") == 0);

        /* What leg 0 consumed: quote funds for a buy, base size for a sell. */
        double leg0_in = leg0_buy ? fills[0].funds : fills[0].size;
        double leg2_out = fills[2].output;

        /* Subtract fee if fee currency matches leg 2 output currency */
        char base_ccy[EXEC_CCY_LEN], quote_ccy[EXEC_CCY_LEN];
        split_symbol(sl2->symbol, base_ccy, quote_ccy);
        const char *out_ccy = leg2_buy ? base_ccy : quote_ccy;
        if (out_ccy[0] && strcmp(sl2->fee_currency, out_ccy) == 0)
            leg2_out -= fills[2].fee;

        profit = leg2_out - leg0_in;
        if (leg0_in > 0)
            effective_bps = (profit / leg0_in) * 10000.0;
    }

    int64_t total_ms = now_mono_ms() - exec_start;

    /* ── Build timing string (relative to signal-received realtime) ──
     * NOTE(review): the label-to-field mapping is kept as found; confirm that
     * t_arrive_ms / t_eval_ms belong under these labels. */
    int64_t now_rt = now_realtime_ms();
    char timings_str[256] = "";
    if (sig->t_sock_arrive_ms > 0) {
        snprintf(timings_str, sizeof(timings_str),
                 "t-2_book_snapshot=%.1fms "
                 "book_update_arrived=%.1fms "
                 "t-1_eval_complete=%.1fms "
                 "t_signal_created=%.1fms "
                 "signal_received=0.0ms "
                 "execution_complete=%.1fms",
                 (double)(sig->ts_ms - signal_received_rt),
                 (double)(sig->t_sock_arrive_ms - signal_received_rt),
                 (double)(sig->t_arrive_ms - signal_received_rt),
                 (double)(sig->t_eval_ms - signal_received_rt),
                 (double)(now_rt - signal_received_rt));
    }

    /* ── Build fills string: one entry per leg that actually filled ── */
    char fills_str[1024] = "";
    int fills_pos = 0;
    for (int leg = 0; leg < legs_filled; leg++) {
        if (fills[leg].output == 0 && !(leg == 2 && success))
            break;

        const signal_leg_t *sl = &sig->legs.legs[leg];
        char base_ccy[EXEC_CCY_LEN], quote_ccy[EXEC_CCY_LEN];
        split_symbol(sl->symbol, base_ccy, quote_ccy);

        /* A buy converts quote into base; a sell converts base into quote. */
        const bool buy = (strcmp(sl->side, "buy") == 0);
        const char *in_ccy = buy ? quote_ccy : base_ccy;
        const char *out_ccy = buy ? base_ccy : quote_ccy;

        int n = snprintf(fills_str + fills_pos, sizeof(fills_str) - (size_t)fills_pos,
                         "%sL%d:%s %s %s->%s %.10g@%.6g(fee=%.6g %s lat=%.1fms)",
                         leg > 0 ? ", " : "",
                         leg, sl->side, sl->symbol, in_ccy, out_ccy,
                         fills[leg].output, fills[leg].avg_price, fills[leg].fee,
                         sl->fee_currency, fills[leg].latency_ms);
        if (n > 0)
            fills_pos += n;
        if (fills_pos >= (int)sizeof(fills_str) - 1)
            break; /* buffer full (output was truncated) */
    }

    /* ── Emit report (single line) ── */
    format_ts(ts_buf, sizeof(ts_buf));
    const char *status = success ? "FILLED" : "FAILED";
    char bps_str[32];
    snprintf(bps_str, sizeof(bps_str), "%.2f", effective_bps);

    executor_write_report(
        "%s %s | corr=%s | triangle=['%s'] | "
        "predicted_bps=%.2f | effective_bps=%s | "
        "book_ts=%lld | profit=%.4f | timings=[%s] | "
        "fills=[%s]%s%s\n",
        ts_buf, status, corr_id, sig->triangle_key,
        sig->predicted_bps, bps_str,
        (long long)sig->book_ts_ms, profit,
        timings_str, fills_str,
        error_buf[0] ? " | error=" : "",
        error_buf[0] ? error_buf : "");

    /* Release isolation slot */
    isolation_release(et, slot);

    log_write("[EXEC] Triangle %s in %lld ms, profit=%.4f predicted=%.2f eff=%.2f\n",
              status, (long long)total_ms, profit,
              sig->predicted_bps, effective_bps);
}

/* Free the executor and its REST connection; NULL is accepted. */
void executor_thread_destroy(executor_thread_t *et)
{
    if (!et)
        return;
    if (et->rest)
        rest_conn_free(et->rest);
    free(et);
}