diff --git a/executor/executor.py b/executor/executor.py index df1e032..dea2323 100644 --- a/executor/executor.py +++ b/executor/executor.py @@ -31,7 +31,7 @@ import uuid from collections import deque from dataclasses import dataclass, field from datetime import datetime -from decimal import Decimal +from decimal import Decimal, ROUND_DOWN from pathlib import Path from typing import Optional @@ -189,6 +189,7 @@ class ExecutionReport: effective_bps: float = 0.0 error: str = "" book_ts_ms: int = 0 + book_tops: list[dict] = field(default_factory=list) @dataclass @@ -419,6 +420,7 @@ class Executor: """ exec_start_ts = time.perf_counter() timings: list[dict] = [] + predicted_bps = signal.get("predicted_bps", 0.0) executor_receive_ts_ms = signal.get("_receiver_ts_ms") or int(time.time() * 1000) signal_ts_ms = signal.get("ts_ms", 0) book_ts_ms = signal.get("book_ts_ms", 0) @@ -439,6 +441,16 @@ class Executor: self._log.debug("execute_triangle_entered", correlation_id=correlation_id) legs = signal.get("legs", []) books = signal.get("books", []) + book_tops = [] + for bk in books[:3]: + asks = bk.get("asks", []) + bids = bk.get("bids", []) + book_tops.append({ + "ask_px": str(asks[0]["price"]) if asks else "", + "ask_sz": str(asks[0]["size"]) if asks else "", + "bid_px": str(bids[0]["price"]) if bids else "", + "bid_sz": str(bids[0]["size"]) if bids else "", + }) live = signal.get("live", False) log_prefix = "" if live else "[PAPER] " @@ -466,6 +478,8 @@ class Executor: side = leg.get("side", "") order_param = Decimal(str(leg.get("order_param", "0"))) base_min_size = Decimal(str(leg.get("base_min_size", "0"))) + base_increment = Decimal(str(leg.get("base_increment", "0"))) + quote_increment = Decimal(str(leg.get("quote_increment", "0"))) fee_rate = Decimal(str(leg.get("fee_rate", "0.001"))) fee_ccy = leg.get("fee_currency", "") @@ -482,7 +496,7 @@ class Executor: if live: # --- LIVE MODE --- - client_oid = f"live-{correlation_id}-{i}-{uuid.uuid4().hex[:8]}" + client_oid = f"t{correlation_id[-24:]}-{i}-{uuid.uuid4().hex[:4]}" t0 = time.perf_counter() if self._ws_client is None or not self._ws_client.is_connected: @@ -512,23 +526,9 @@ class Executor: else: prev = fills[i - 1] input_vol = prev.filled_volume - - if base_min_size > 0 and side == "sell" and input_vol < base_min_size: - self._log.info("execution_aborted_below_min", - correlation_id=correlation_id, leg=i, pair=pair, - base_vol=str(input_vol), min_base=str(base_min_size), - ) - report = ExecutionReport( - correlation_id=correlation_id, - triangle_key=signal.get("triangle_key", []), - status="aborted", - fills=fills, - predicted_bps=0.0, - ts_ms=int(time.time() * 1000), - error="volume_below_minimum", - ) - self._emit_report(report) - return + inc = quote_increment if side == "buy" else base_increment + if inc > 0: + input_vol = (input_vol / inc).to_integral_value(rounding=ROUND_DOWN) * inc if side == "buy": ok, err_msg, order_id = await self._api.order_place( @@ -554,27 +554,52 @@ class Executor: correlation_id=correlation_id, leg=i, pair=pair, + side=side, order_id=order_id, client_oid=client_oid, + volume=str(input_vol), place_latency_ms=round(place_latency_ms, 2), ) + self._log.write_plain( + f"{time.strftime('%Y-%m-%dT%H:%M:%S.', time.gmtime())}{int(time.time() * 1000) % 1000:03d}Z " + f"ORDER | corr={correlation_id} | leg{i} | {pair} | {side} | " + f"vol={str(input_vol)} | order_id={order_id} | lat={place_latency_ms:.1f}ms" + ) if not ok: + fills.append(LegFill( + leg=i, order_id=order_id or "", + side=side, pair=pair, + input_currency=input_ccy, output_currency=output_ccy, + fee_currency=fee_ccy, + filled_volume=input_vol, deal_funds=_D0, + avg_price=_D0, fee=_D0, + latency_ms=round(place_latency_ms, 2), + )) self._log.error( f"{log_prefix}order_rejected", correlation_id=correlation_id, leg=i, pair=pair, + side=side, + volume=str(input_vol), error=err_msg, ) + self._log.write_plain( + f"{time.strftime('%Y-%m-%dT%H:%M:%S.', time.gmtime())}{int(time.time() * 1000) % 1000:03d}Z " + f"REJECTED | corr={correlation_id} | leg{i} | {pair} | {side} | " + f"vol={str(input_vol)} | error={err_msg} | lat={place_latency_ms:.1f}ms" + ) report = ExecutionReport( correlation_id=correlation_id, triangle_key=signal.get("triangle_key", []), status="failed", fills=fills, - predicted_bps=0.0, + predicted_bps=predicted_bps, ts_ms=int(time.time() * 1000), + timings=timings, error=err_msg or "order_rejected", + book_tops=book_tops, ) self._emit_report(report) return @@ -599,8 +624,9 @@ class Executor: triangle_key=signal.get("triangle_key", []), status="aborted", fills=fills, - predicted_bps=0.0, + predicted_bps=predicted_bps, ts_ms=int(time.time() * 1000), + timings=timings, error="cancelled", ) self._emit_report(report) @@ -623,9 +649,11 @@ class Executor: triangle_key=signal.get("triangle_key", []), status="failed", fills=fills, - predicted_bps=0.0, + predicted_bps=predicted_bps, ts_ms=int(time.time() * 1000), + timings=timings, error=f"fill_timeout_leg{i}", + book_tops=book_tops, ) self._emit_report(report) return @@ -650,6 +678,7 @@ class Executor: avg_price = weighted_avg_price if weighted_avg_price > 0 else _D0 fee = _D0 + filled_vol_val = total_size if side == "buy" else total_funds fills.append(LegFill( leg=i, order_id=order_id, @@ -658,12 +687,22 @@ class Executor: input_currency=input_ccy, output_currency=output_ccy, fee_currency=fee_ccy, - filled_volume=total_size if side == "buy" else total_funds, + filled_volume=filled_vol_val, deal_funds=total_funds, avg_price=avg_price, fee=fee, latency_ms=latency_ms, )) + self._log.write_plain( + f"{time.strftime('%Y-%m-%dT%H:%M:%S.', time.gmtime())}{int(time.time() * 1000) % 1000:03d}Z " + f"FILL | corr={correlation_id} | leg{i} | {pair} | {side} | " + f"out={str(filled_vol_val)}@{str(avg_price)} | " + f"fee={str(fee)} {fee_ccy} | lat={latency_ms:.1f}ms" + ) + if i < 2: + await self._ws_client.await_balance( + output_ccy, fills[-1].filled_volume, 2000 + ) in_flight.last_trade_ts_ms = int(time.time() * 1000) continue @@ -735,9 +774,13 @@ class Executor: if side == "buy": asks = book_snapshot.get("asks", []) price = Decimal(str(asks[0]["price"])) if asks else _D0 - # For a buy with funds=X, you get X/price base units. - base_vol = order_param / price if price > 0 else _D0 - deal_funds = order_param + funds = order_param + if quote_increment > 0: + funds = (funds / quote_increment).to_integral_value(rounding=ROUND_DOWN) * quote_increment + base_vol = funds / price if price > 0 else _D0 + if base_increment > 0: + base_vol = (base_vol / base_increment).to_integral_value(rounding=ROUND_DOWN) * base_increment + deal_funds = funds fee = base_vol * fee_rate if fee_ccy == base_ccy else order_param * fee_rate filled_volume = base_vol @@ -761,8 +804,9 @@ class Executor: else: bids = book_snapshot.get("bids", []) price = Decimal(str(bids[0]["price"])) if bids else _D0 - # For a sell with size=X, you get X*price quote units. base_vol = order_param + if base_increment > 0: + base_vol = (base_vol / base_increment).to_integral_value(rounding=ROUND_DOWN) * base_increment quote_vol = base_vol * price if price > 0 else _D0 # Fee depends on whether KuCoin charges it in base or quote. if fee_ccy == base_ccy: @@ -808,7 +852,6 @@ class Executor: )) in_flight.last_trade_ts_ms = int(time.time() * 1000) - predicted_bps = signal.get("predicted_bps", 0.0) # Compute actual profit in the primary quote currency. # Leg 2 output: if buy, the received base (filled_volume); if sell, the received # quote (deal_funds, already net when fee is in quote). @@ -839,6 +882,7 @@ class Executor: ts_ms=int(time.time() * 1000), timings=timings, book_ts_ms=book_ts_ms, + book_tops=book_tops, ) self._last_trade_ts_ms[in_flight.triangle_key] = report.ts_ms self._emit_report(report) @@ -854,14 +898,20 @@ class Executor: fills_repr = ", ".join(fills_lines) self._reports.append(report) error_suffix = f" | error={report.error}" if report.error else "" - profit_repr = f"profit={report.profit:.4f}" if report.status == "filled" else "" + profit_repr = f"profit={report.profit:.4f}" timing_parts = " ".join(f"{t['step']}={t['elapsed_ms']:.1f}ms" for t in report.timings) if report.timings else "" + bt_lines = [] + for bt in report.book_tops: + bt_lines.append(f"{bt.get('ask_px','?')}/{bt.get('bid_px','?')}") + book_top_repr = " | ".join(bt_lines) if bt_lines else "" ts_iso = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.") + f"{report.ts_ms % 1000:03d}Z" + books_suffix = f" | books=[{book_top_repr}]" if book_top_repr else "" msg = ( f"{ts_iso} {report.status.upper()} | corr={report.correlation_id} | " f"triangle={report.triangle_key} | predicted_bps={report.predicted_bps:.2f} | " f"effective_bps={report.effective_bps:.2f} | " - f"{profit_repr} | timings=[{timing_parts}] | fills=[{fills_repr}]{error_suffix}" + f"book_ts={report.book_ts_ms} | " + f"{profit_repr} | timings=[{timing_parts}] | fills=[{fills_repr}]{books_suffix}{error_suffix}" ) self._log.write_plain(msg) @@ -870,7 +920,7 @@ class Executor: f"triangle={report.triangle_key} | predicted_bps={report.predicted_bps:.2f} | " f"effective_bps={report.effective_bps:.2f} | " f"book_ts={report.book_ts_ms} | " - f"profit={report.profit:.4f}" + f"{profit_repr} | fills=[{fills_repr}]{books_suffix}" f"{f' | error={report.error}' if report.error else ''}", flush=True, ) diff --git a/executor/ws_client.py b/executor/ws_client.py index a68a395..ac03e1a 100644 --- a/executor/ws_client.py +++ b/executor/ws_client.py @@ -100,9 +100,10 @@ class KuCoinWSClient: self._ping_timeout: float = 10.0 self._fill_futures: dict[str, asyncio.Future] = {} self._accumulators: dict[str, FillAccumulator] = {} + self._balance_futures: dict[str, asyncio.Future] = {} + self._latest_balance: dict[str, Decimal] = {} self._worker_task: Optional[asyncio.Task] = None - self._subscribe_ack_event: Optional[asyncio.Event] = None - self._subscribe_ack_id: Optional[str] = None + self._pending_acks: dict[str, asyncio.Event] = {} @property def is_connected(self) -> bool: @@ -125,6 +126,13 @@ class KuCoinWSClient: future.set_result((False, {})) self._fill_futures.clear() self._accumulators.clear() + for future in self._balance_futures.values(): + if not future.done(): + future.set_result(False) + self._balance_futures.clear() + for evt in self._pending_acks.values(): + evt.set() + self._pending_acks.clear() if self._worker_task is not None and not self._worker_task.done(): self._worker_task.cancel() if self._ws is not None: @@ -196,6 +204,41 @@ class KuCoinWSClient: del self._accumulators[client_oid] return result + async def await_balance( + self, currency: str, min_available: Decimal, timeout_ms: float + ) -> bool: + """ + Wait until the available balance for *currency* reaches at least *min_available*. + + If the latest known balance already meets the threshold, returns immediately. + Otherwise registers a future and waits for a balance WS event. + + Returns True once the threshold is met, False on timeout. + """ + key = currency.upper() + current = self._latest_balance.get(key, _D0) + if current >= min_available: + return True + + future: asyncio.Future = asyncio.Future() + self._balance_futures[key] = future + try: + await asyncio.wait_for(future, timeout=timeout_ms / 1000.0) + return True + except asyncio.TimeoutError: + self._log.warning( + "await_balance_timeout", + currency=key, + min_available=str(min_available), + latest=str(self._latest_balance.get(key, _D0)), + ) + return False + except asyncio.CancelledError: + raise + finally: + if self._balance_futures.get(key) is future: + del self._balance_futures[key] + async def _connection_worker(self) -> None: """Main connection loop with exponential backoff reconnection.""" while self._running: @@ -267,30 +310,38 @@ class KuCoinWSClient: self._connected = False async def _subscribe(self) -> None: - """Subscribe to the global tradeOrdersV2 topic.""" + """Subscribe to tradeOrdersV2 and balance channels.""" if self._ws is None: return - ack_id = int(time.time() * 1000) - self._subscribe_ack_id = str(ack_id) - self._subscribe_ack_event = asyncio.Event() - + ack_id1 = str(int(time.time() * 1000)) + evt1 = asyncio.Event() + self._pending_acks[ack_id1] = evt1 sub_msg = { - "id": ack_id, + "id": int(ack_id1), "type": "subscribe", "topic": "/spotMarket/tradeOrdersV2", "response": True, "privateChannel": "true", } - await self._ws.send(json.dumps(sub_msg)) self._log.info("subscribe_sent", topic="/spotMarket/tradeOrdersV2") - try: - await asyncio.wait_for(self._subscribe_ack_event.wait(), timeout=15.0) - self._log.info("subscription_confirmed", topic="/spotMarket/tradeOrdersV2") - except asyncio.TimeoutError: - self._log.warning("subscription_ack_timeout", topic="/spotMarket/tradeOrdersV2") + ack_id2 = str(int(time.time() * 1000) + 1) + evt2 = asyncio.Event() + self._pending_acks[ack_id2] = evt2 + bal_msg = { + "id": int(ack_id2), + "type": "subscribe", + "topic": "/account/balance", + "response": True, + "privateChannel": "true", + } + await self._ws.send(json.dumps(bal_msg)) + self._log.info("bal_subscribe_sent", topic="/account/balance") + + await asyncio.wait_for(evt2.wait(), timeout=5.0) + self._log.info("bal_subscribe_ack_received") async def _handle_message(self, msg: str) -> None: """Parse incoming WS message and dispatch fill events.""" @@ -310,16 +361,26 @@ class KuCoinWSClient: if msg_type == "ack": ack_id = str(data.get("id", "")) - if ack_id == self._subscribe_ack_id and self._subscribe_ack_event is not None: - self._subscribe_ack_event.set() - self._subscribe_ack_event = None - self._subscribe_ack_id = None - return - - if msg_type != "message": + evt = self._pending_acks.pop(ack_id, None) + if evt is not None: + evt.set() return subject = data.get("subject", "") + + if subject == "account.balance": + payload = data.get("data", {}) + currency = (payload.get("currency", "")).upper() + available_raw = payload.get("available") + if currency and available_raw is not None: + available = Decimal(str(available_raw)) + self._latest_balance[currency] = available + self._log.debug("balance_update", currency=currency, available=str(available)) + future = self._balance_futures.get(currency) + if future is not None and not future.done(): + future.set_result(True) + return + if subject != "orderChange": return diff --git a/src/evaluate.c b/src/evaluate.c index d89a812..0505d1e 100644 --- a/src/evaluate.c +++ b/src/evaluate.c @@ -220,7 +220,12 @@ bool evaluate_symbol(evaluator_t *ev, uint16_t symbol_idx, int64_t t_sock_arrive double min_quote = fmax(bms * price, qms); if (qi > 0) min_quote = ceil(min_quote / qi - 1e-12) * qi; - leg_min_starting[leg] = min_quote / cumulative_rate; + if (is_buy) { + leg_min_starting[leg] = min_quote / cumulative_rate; + } else { + double denom = cumulative_rate * (price > 0 ? price : 1.0); + leg_min_starting[leg] = min_quote / denom; + } cumulative_rate *= rates[leg]; } @@ -279,22 +284,36 @@ bool evaluate_symbol(evaluator_t *ev, uint16_t symbol_idx, int64_t t_sock_arrive double leg_output; if (is_buy) { - double ceiling = (qi > 0) ? ceil(leg_input / qi - 1e-12) * qi : leg_input; - double net = ceiling * ff; + double quote_input = (qi > 0) ? floor(leg_input / qi - 1e-12) * qi : leg_input; + double net = quote_input * ff; double base = (bi > 0) ? floor(net / price / bi + 1e-12) * bi : (net / price); - double quote_cost = (qi > 0) ? ceil(base * price / qi - 1e-12) * qi : (base * price); + double quote_cost = (qi > 0) ? floor(base * price / qi - 1e-12) * qi : (base * price); leg_quote_vol[leg] = quote_cost; sig.legs.legs[leg].quote_volume = quote_cost; leg_base_size[leg] = base; leg_output = base; } else { double base = (bi > 0) ? floor(leg_input / bi + 1e-12) * bi : leg_input; - double gross = (qi > 0) ? ceil(base * price / qi - 1e-12) * qi : (base * price); + double gross = (qi > 0) ? floor(base * price / qi - 1e-12) * qi : (base * price); leg_quote_vol[leg] = gross; sig.legs.legs[leg].quote_volume = gross; leg_base_size[leg] = base; leg_output = gross * ff; } + + /* Floor leg_output to the next leg's input increment so the + cascade always produces valid order parameters downstream. + For the last leg (leg 2) there is no next leg to constrain. */ + if (leg < 2) { + bool next_buy = !tri->use_bid[leg + 1]; + double next_incr = next_buy + ? tri->quote_increment[leg + 1] + : tri->base_increment[leg + 1]; + if (next_incr > 0) { + leg_output = floor(leg_output / next_incr - 1e-12) * next_incr; + } + } + leg_input = leg_output; } sig.starting_volume = leg_quote_vol[0];