fix: cross-leg increment floor, ceiling-to-floor rounding, balance WS subscription, order-level logging

src/evaluate.c:
- Add cross-leg increment floor after each leg's output
- Fix sell-leg min_volume conversion (was understated by rates[leg])
- Change ceil to floor for all leg rounding (round input down, then compute)

executor/ws_client.py:
- Subscribe to /account/balance via Classic WS (subject: account.balance)
- Add await_balance() with ack tracking and per-currency futures
- Handle balance events and store latest available per currency

executor/executor.py:
- Reject order detail included in fills list with real attempted volume/latency
- Screen/log output shows fills, book tops, profit for all statuses
- side field in order_placed/order_rejected logs
- predicted_bps read early from signal (no more hardcoded 0.0)
- timings in failed/aborted reports
- Paper mode rounding: buy funds/base floored to qi/bi
This commit is contained in:
nicolas 2026-05-25 20:21:19 -03:00
parent c1c4aa4be8
commit 3828e2b104
3 changed files with 187 additions and 57 deletions

View File

@ -31,7 +31,7 @@ import uuid
from collections import deque
from dataclasses import dataclass, field
from datetime import datetime
from decimal import Decimal
from decimal import Decimal, ROUND_DOWN
from pathlib import Path
from typing import Optional
@ -189,6 +189,7 @@ class ExecutionReport:
effective_bps: float = 0.0
error: str = ""
book_ts_ms: int = 0
book_tops: list[dict] = field(default_factory=list)
@dataclass
@ -419,6 +420,7 @@ class Executor:
"""
exec_start_ts = time.perf_counter()
timings: list[dict] = []
predicted_bps = signal.get("predicted_bps", 0.0)
executor_receive_ts_ms = signal.get("_receiver_ts_ms") or int(time.time() * 1000)
signal_ts_ms = signal.get("ts_ms", 0)
book_ts_ms = signal.get("book_ts_ms", 0)
@ -439,6 +441,16 @@ class Executor:
self._log.debug("execute_triangle_entered", correlation_id=correlation_id)
legs = signal.get("legs", [])
books = signal.get("books", [])
book_tops = []
for bk in books[:3]:
asks = bk.get("asks", [])
bids = bk.get("bids", [])
book_tops.append({
"ask_px": str(asks[0]["price"]) if asks else "",
"ask_sz": str(asks[0]["size"]) if asks else "",
"bid_px": str(bids[0]["price"]) if bids else "",
"bid_sz": str(bids[0]["size"]) if bids else "",
})
live = signal.get("live", False)
log_prefix = "" if live else "[PAPER] "
@ -466,6 +478,8 @@ class Executor:
side = leg.get("side", "")
order_param = Decimal(str(leg.get("order_param", "0")))
base_min_size = Decimal(str(leg.get("base_min_size", "0")))
base_increment = Decimal(str(leg.get("base_increment", "0")))
quote_increment = Decimal(str(leg.get("quote_increment", "0")))
fee_rate = Decimal(str(leg.get("fee_rate", "0.001")))
fee_ccy = leg.get("fee_currency", "")
@ -482,7 +496,7 @@ class Executor:
if live:
# --- LIVE MODE ---
client_oid = f"live-{correlation_id}-{i}-{uuid.uuid4().hex[:8]}"
client_oid = f"t{correlation_id[-24:]}-{i}-{uuid.uuid4().hex[:4]}"
t0 = time.perf_counter()
if self._ws_client is None or not self._ws_client.is_connected:
@ -512,23 +526,9 @@ class Executor:
else:
prev = fills[i - 1]
input_vol = prev.filled_volume
if base_min_size > 0 and side == "sell" and input_vol < base_min_size:
self._log.info("execution_aborted_below_min",
correlation_id=correlation_id, leg=i, pair=pair,
base_vol=str(input_vol), min_base=str(base_min_size),
)
report = ExecutionReport(
correlation_id=correlation_id,
triangle_key=signal.get("triangle_key", []),
status="aborted",
fills=fills,
predicted_bps=0.0,
ts_ms=int(time.time() * 1000),
error="volume_below_minimum",
)
self._emit_report(report)
return
inc = quote_increment if side == "buy" else base_increment
if inc > 0:
input_vol = (input_vol / inc).to_integral_value(rounding=ROUND_DOWN) * inc
if side == "buy":
ok, err_msg, order_id = await self._api.order_place(
@ -554,27 +554,52 @@ class Executor:
correlation_id=correlation_id,
leg=i,
pair=pair,
side=side,
order_id=order_id,
client_oid=client_oid,
volume=str(input_vol),
place_latency_ms=round(place_latency_ms, 2),
)
self._log.write_plain(
f"{time.strftime('%Y-%m-%dT%H:%M:%S.', time.gmtime())}{int(time.time() * 1000) % 1000:03d}Z "
f"ORDER | corr={correlation_id} | leg{i} | {pair} | {side} | "
f"vol={str(input_vol)} | order_id={order_id} | lat={place_latency_ms:.1f}ms"
)
if not ok:
fills.append(LegFill(
leg=i, order_id=order_id or "",
side=side, pair=pair,
input_currency=input_ccy, output_currency=output_ccy,
fee_currency=fee_ccy,
filled_volume=input_vol, deal_funds=_D0,
avg_price=_D0, fee=_D0,
latency_ms=round(place_latency_ms, 2),
))
self._log.error(
f"{log_prefix}order_rejected",
correlation_id=correlation_id,
leg=i,
pair=pair,
side=side,
volume=str(input_vol),
error=err_msg,
)
self._log.write_plain(
f"{time.strftime('%Y-%m-%dT%H:%M:%S.', time.gmtime())}{int(time.time() * 1000) % 1000:03d}Z "
f"REJECTED | corr={correlation_id} | leg{i} | {pair} | {side} | "
f"vol={str(input_vol)} | error={err_msg} | lat={place_latency_ms:.1f}ms"
)
report = ExecutionReport(
correlation_id=correlation_id,
triangle_key=signal.get("triangle_key", []),
status="failed",
fills=fills,
predicted_bps=0.0,
predicted_bps=predicted_bps,
ts_ms=int(time.time() * 1000),
timings=timings,
error=err_msg or "order_rejected",
book_tops=book_tops,
)
self._emit_report(report)
return
@ -599,8 +624,9 @@ class Executor:
triangle_key=signal.get("triangle_key", []),
status="aborted",
fills=fills,
predicted_bps=0.0,
predicted_bps=predicted_bps,
ts_ms=int(time.time() * 1000),
timings=timings,
error="cancelled",
)
self._emit_report(report)
@ -623,9 +649,11 @@ class Executor:
triangle_key=signal.get("triangle_key", []),
status="failed",
fills=fills,
predicted_bps=0.0,
predicted_bps=predicted_bps,
ts_ms=int(time.time() * 1000),
timings=timings,
error=f"fill_timeout_leg{i}",
book_tops=book_tops,
)
self._emit_report(report)
return
@ -650,6 +678,7 @@ class Executor:
avg_price = weighted_avg_price if weighted_avg_price > 0 else _D0
fee = _D0
filled_vol_val = total_size if side == "buy" else total_funds
fills.append(LegFill(
leg=i,
order_id=order_id,
@ -658,12 +687,22 @@ class Executor:
input_currency=input_ccy,
output_currency=output_ccy,
fee_currency=fee_ccy,
filled_volume=total_size if side == "buy" else total_funds,
filled_volume=filled_vol_val,
deal_funds=total_funds,
avg_price=avg_price,
fee=fee,
latency_ms=latency_ms,
))
self._log.write_plain(
f"{time.strftime('%Y-%m-%dT%H:%M:%S.', time.gmtime())}{int(time.time() * 1000) % 1000:03d}Z "
f"FILL | corr={correlation_id} | leg{i} | {pair} | {side} | "
f"out={str(filled_vol_val)}@{str(avg_price)} | "
f"fee={str(fee)} {fee_ccy} | lat={latency_ms:.1f}ms"
)
if i < 2:
await self._ws_client.await_balance(
output_ccy, fills[-1].filled_volume, 2000
)
in_flight.last_trade_ts_ms = int(time.time() * 1000)
continue
@ -735,9 +774,13 @@ class Executor:
if side == "buy":
asks = book_snapshot.get("asks", [])
price = Decimal(str(asks[0]["price"])) if asks else _D0
# For a buy with funds=X, you get X/price base units.
base_vol = order_param / price if price > 0 else _D0
deal_funds = order_param
funds = order_param
if quote_increment > 0:
funds = (funds / quote_increment).to_integral_value(rounding=ROUND_DOWN) * quote_increment
base_vol = funds / price if price > 0 else _D0
if base_increment > 0:
base_vol = (base_vol / base_increment).to_integral_value(rounding=ROUND_DOWN) * base_increment
deal_funds = funds
fee = base_vol * fee_rate if fee_ccy == base_ccy else order_param * fee_rate
filled_volume = base_vol
@ -761,8 +804,9 @@ class Executor:
else:
bids = book_snapshot.get("bids", [])
price = Decimal(str(bids[0]["price"])) if bids else _D0
# For a sell with size=X, you get X*price quote units.
base_vol = order_param
if base_increment > 0:
base_vol = (base_vol / base_increment).to_integral_value(rounding=ROUND_DOWN) * base_increment
quote_vol = base_vol * price if price > 0 else _D0
# Fee depends on whether KuCoin charges it in base or quote.
if fee_ccy == base_ccy:
@ -808,7 +852,6 @@ class Executor:
))
in_flight.last_trade_ts_ms = int(time.time() * 1000)
predicted_bps = signal.get("predicted_bps", 0.0)
# Compute actual profit in the primary quote currency.
# Leg 2 output: if buy, the received base (filled_volume); if sell, the received
# quote (deal_funds, already net when fee is in quote).
@ -839,6 +882,7 @@ class Executor:
ts_ms=int(time.time() * 1000),
timings=timings,
book_ts_ms=book_ts_ms,
book_tops=book_tops,
)
self._last_trade_ts_ms[in_flight.triangle_key] = report.ts_ms
self._emit_report(report)
@ -854,14 +898,20 @@ class Executor:
fills_repr = ", ".join(fills_lines)
self._reports.append(report)
error_suffix = f" | error={report.error}" if report.error else ""
profit_repr = f"profit={report.profit:.4f}" if report.status == "filled" else ""
profit_repr = f"profit={report.profit:.4f}"
timing_parts = " ".join(f"{t['step']}={t['elapsed_ms']:.1f}ms" for t in report.timings) if report.timings else ""
bt_lines = []
for bt in report.book_tops:
bt_lines.append(f"{bt.get('ask_px','?')}/{bt.get('bid_px','?')}")
book_top_repr = " | ".join(bt_lines) if bt_lines else ""
ts_iso = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.") + f"{report.ts_ms % 1000:03d}Z"
books_suffix = f" | books=[{book_top_repr}]" if book_top_repr else ""
msg = (
f"{ts_iso} {report.status.upper()} | corr={report.correlation_id} | "
f"triangle={report.triangle_key} | predicted_bps={report.predicted_bps:.2f} | "
f"effective_bps={report.effective_bps:.2f} | "
f"{profit_repr} | timings=[{timing_parts}] | fills=[{fills_repr}]{error_suffix}"
f"book_ts={report.book_ts_ms} | "
f"{profit_repr} | timings=[{timing_parts}] | fills=[{fills_repr}]{books_suffix}{error_suffix}"
)
self._log.write_plain(msg)
@ -870,7 +920,7 @@ class Executor:
f"triangle={report.triangle_key} | predicted_bps={report.predicted_bps:.2f} | "
f"effective_bps={report.effective_bps:.2f} | "
f"book_ts={report.book_ts_ms} | "
f"profit={report.profit:.4f}"
f"{profit_repr} | fills=[{fills_repr}]{books_suffix}"
f"{f' | error={report.error}' if report.error else ''}",
flush=True,
)

View File

@ -100,9 +100,10 @@ class KuCoinWSClient:
self._ping_timeout: float = 10.0
self._fill_futures: dict[str, asyncio.Future] = {}
self._accumulators: dict[str, FillAccumulator] = {}
self._balance_futures: dict[str, asyncio.Future] = {}
self._latest_balance: dict[str, Decimal] = {}
self._worker_task: Optional[asyncio.Task] = None
self._subscribe_ack_event: Optional[asyncio.Event] = None
self._subscribe_ack_id: Optional[str] = None
self._pending_acks: dict[str, asyncio.Event] = {}
@property
def is_connected(self) -> bool:
@ -125,6 +126,13 @@ class KuCoinWSClient:
future.set_result((False, {}))
self._fill_futures.clear()
self._accumulators.clear()
for future in self._balance_futures.values():
if not future.done():
future.set_result(False)
self._balance_futures.clear()
for evt in self._pending_acks.values():
evt.set()
self._pending_acks.clear()
if self._worker_task is not None and not self._worker_task.done():
self._worker_task.cancel()
if self._ws is not None:
@ -196,6 +204,41 @@ class KuCoinWSClient:
del self._accumulators[client_oid]
return result
async def await_balance(
self, currency: str, min_available: Decimal, timeout_ms: float
) -> bool:
"""
Wait until the available balance for *currency* reaches at least *min_available*.
If the latest known balance already meets the threshold, returns immediately.
Otherwise registers a future and waits for a balance WS event.
Returns True once the threshold is met, False on timeout.
"""
key = currency.upper()
current = self._latest_balance.get(key, _D0)
if current >= min_available:
return True
future: asyncio.Future = asyncio.Future()
self._balance_futures[key] = future
try:
await asyncio.wait_for(future, timeout=timeout_ms / 1000.0)
return True
except asyncio.TimeoutError:
self._log.warning(
"await_balance_timeout",
currency=key,
min_available=str(min_available),
latest=str(self._latest_balance.get(key, _D0)),
)
return False
except asyncio.CancelledError:
raise
finally:
if self._balance_futures.get(key) is future:
del self._balance_futures[key]
async def _connection_worker(self) -> None:
"""Main connection loop with exponential backoff reconnection."""
while self._running:
@ -267,30 +310,38 @@ class KuCoinWSClient:
self._connected = False
async def _subscribe(self) -> None:
"""Subscribe to the global tradeOrdersV2 topic."""
"""Subscribe to tradeOrdersV2 and balance channels."""
if self._ws is None:
return
ack_id = int(time.time() * 1000)
self._subscribe_ack_id = str(ack_id)
self._subscribe_ack_event = asyncio.Event()
ack_id1 = str(int(time.time() * 1000))
evt1 = asyncio.Event()
self._pending_acks[ack_id1] = evt1
sub_msg = {
"id": ack_id,
"id": int(ack_id1),
"type": "subscribe",
"topic": "/spotMarket/tradeOrdersV2",
"response": True,
"privateChannel": "true",
}
await self._ws.send(json.dumps(sub_msg))
self._log.info("subscribe_sent", topic="/spotMarket/tradeOrdersV2")
try:
await asyncio.wait_for(self._subscribe_ack_event.wait(), timeout=15.0)
self._log.info("subscription_confirmed", topic="/spotMarket/tradeOrdersV2")
except asyncio.TimeoutError:
self._log.warning("subscription_ack_timeout", topic="/spotMarket/tradeOrdersV2")
ack_id2 = str(int(time.time() * 1000) + 1)
evt2 = asyncio.Event()
self._pending_acks[ack_id2] = evt2
bal_msg = {
"id": int(ack_id2),
"type": "subscribe",
"topic": "/account/balance",
"response": True,
"privateChannel": "true",
}
await self._ws.send(json.dumps(bal_msg))
self._log.info("bal_subscribe_sent", topic="/account/balance")
await asyncio.wait_for(evt2.wait(), timeout=5.0)
self._log.info("bal_subscribe_ack_received")
async def _handle_message(self, msg: str) -> None:
"""Parse incoming WS message and dispatch fill events."""
@ -310,16 +361,26 @@ class KuCoinWSClient:
if msg_type == "ack":
ack_id = str(data.get("id", ""))
if ack_id == self._subscribe_ack_id and self._subscribe_ack_event is not None:
self._subscribe_ack_event.set()
self._subscribe_ack_event = None
self._subscribe_ack_id = None
return
if msg_type != "message":
evt = self._pending_acks.pop(ack_id, None)
if evt is not None:
evt.set()
return
subject = data.get("subject", "")
if subject == "account.balance":
payload = data.get("data", {})
currency = (payload.get("currency", "")).upper()
available_raw = payload.get("available")
if currency and available_raw is not None:
available = Decimal(str(available_raw))
self._latest_balance[currency] = available
self._log.debug("balance_update", currency=currency, available=str(available))
future = self._balance_futures.get(currency)
if future is not None and not future.done():
future.set_result(True)
return
if subject != "orderChange":
return

View File

@ -220,7 +220,12 @@ bool evaluate_symbol(evaluator_t *ev, uint16_t symbol_idx, int64_t t_sock_arrive
double min_quote = fmax(bms * price, qms);
if (qi > 0) min_quote = ceil(min_quote / qi - 1e-12) * qi;
leg_min_starting[leg] = min_quote / cumulative_rate;
if (is_buy) {
leg_min_starting[leg] = min_quote / cumulative_rate;
} else {
double denom = cumulative_rate * (price > 0 ? price : 1.0);
leg_min_starting[leg] = min_quote / denom;
}
cumulative_rate *= rates[leg];
}
@ -279,22 +284,36 @@ bool evaluate_symbol(evaluator_t *ev, uint16_t symbol_idx, int64_t t_sock_arrive
double leg_output;
if (is_buy) {
double ceiling = (qi > 0) ? ceil(leg_input / qi - 1e-12) * qi : leg_input;
double net = ceiling * ff;
double quote_input = (qi > 0) ? floor(leg_input / qi - 1e-12) * qi : leg_input;
double net = quote_input * ff;
double base = (bi > 0) ? floor(net / price / bi + 1e-12) * bi : (net / price);
double quote_cost = (qi > 0) ? ceil(base * price / qi - 1e-12) * qi : (base * price);
double quote_cost = (qi > 0) ? floor(base * price / qi - 1e-12) * qi : (base * price);
leg_quote_vol[leg] = quote_cost;
sig.legs.legs[leg].quote_volume = quote_cost;
leg_base_size[leg] = base;
leg_output = base;
} else {
double base = (bi > 0) ? floor(leg_input / bi + 1e-12) * bi : leg_input;
double gross = (qi > 0) ? ceil(base * price / qi - 1e-12) * qi : (base * price);
double gross = (qi > 0) ? floor(base * price / qi - 1e-12) * qi : (base * price);
leg_quote_vol[leg] = gross;
sig.legs.legs[leg].quote_volume = gross;
leg_base_size[leg] = base;
leg_output = gross * ff;
}
/* Floor leg_output to the next leg's input increment so the
cascade always produces valid order parameters downstream.
For the last leg (leg 2) there is no next leg to constrain. */
if (leg < 2) {
bool next_buy = !tri->use_bid[leg + 1];
double next_incr = next_buy
? tri->quote_increment[leg + 1]
: tri->base_increment[leg + 1];
if (next_incr > 0) {
leg_output = floor(leg_output / next_incr - 1e-12) * next_incr;
}
}
leg_input = leg_output;
}
sig.starting_volume = leg_quote_vol[0];