""" Triangular arbitrage executor. Handles incoming opportunity signals from oe_em, enforces concurrency and isolation constraints, computes order sizes, and posts orders to KuCoin (via order_test in paper mode or real orders in live mode). Signal contract --------------- Each signal dict carries: legs list[dict] — 3 legs, each with pair, side, order_param, base_min_size, fee_rate, fee_currency books list[dict] — order book snapshots per leg (top-of-book) starting_volume str — quote-currency amount to deploy into leg 0 predicted_bps float — expected profit in basis points primary_quote str — e.g. "USDT", used for capital capping & isolation live bool — true = real orders, false = paper simulation book_ts_ms int — when the order book was sampled (stale detection) t_sock_arrive_ms int — when bytes arrived at SSL_read (true network arrival) t_arrive_ms int — when parsing + book update completed ts_ms int — when the signal was created t_eval_ms int — when the signal was evaluated _cancelled bool — set by cancel_execution to abort in-flight """ import asyncio import contextlib import json import logging import time import uuid from collections import deque from dataclasses import dataclass, field from datetime import datetime from decimal import Decimal, ROUND_DOWN from pathlib import Path from typing import Optional import aiohttp from aiohttp.connector import TCPConnector import structlog _D0 = Decimal("0") _D1 = Decimal("1") _D0_1 = Decimal("0.1") class _DualLogger: """ Thin wrapper that mirrors structlog output to a plain-text file via an async queue, keeping file I/O off the asyncio event loop. The plain-text log supplements structlog's JSON output (written to stdout) with a human-readable format that is useful for manual inspection. """ def __init__(self, structlog_logger, log_file: Optional[Path]) -> None: self._sl = structlog_logger self._log_file = log_file self._queue: Optional[asyncio.Queue] = None self._task: Optional[asyncio.Task] = None if log_file: self._queue = asyncio.Queue() self._task = asyncio.create_task(self._writer_loop()) async def _writer_loop(self) -> None: """Background task that drains the queue and writes lines to disk via a thread-pool executor so the event loop never blocks on I/O.""" loop = asyncio.get_running_loop() log_file = self._log_file def _write(line: str) -> None: with open(log_file, "a") as f: f.write(line) while True: item = None try: item = await self._queue.get() except asyncio.CancelledError: while not self._queue.empty(): try: self._queue.get_nowait() self._queue.task_done() except asyncio.QueueEmpty: break raise except Exception: continue self._queue.task_done() if item is None: break try: await loop.run_in_executor(None, _write, item) except Exception: pass @staticmethod def _fmt_value(v: object) -> str: if isinstance(v, (dict, list)): return json.dumps(v, default=str, separators=(",", ":")) return str(v) def _enqueue(self, level: str, event: str, **kw) -> None: """Publish a log entry to structlog and, if a log file is set, to the queue.""" sl_level = getattr(logging, level.upper(), logging.INFO) if not self._sl.is_enabled_for(sl_level): return getattr(self._sl, level)(event, **kw) if self._queue is None: return ts = f"{time.time():.0f}" level_up = level.upper() extras = " ".join( f"{k}={self._fmt_value(v)}" for k, v in kw.items() if not k.startswith("_") ) line = f"[{ts}] [{level_up:5}] {event} {extras}\n" try: self._queue.put_nowait(line) except asyncio.QueueFull: pass def write_plain(self, text: str) -> None: """Write a line directly to the plain-text log queue (no structlog side-effect).""" if self._queue is not None: try: self._queue.put_nowait(text + "\n") except asyncio.QueueFull: pass def debug(self, event: str, **kw) -> None: self._enqueue("debug", event, **kw) def info(self, event: str, **kw) -> None: self._enqueue("info", event, **kw) def warning(self, event: str, **kw) -> None: self._enqueue("warning", event, **kw) def error(self, event: str, **kw) -> None: self._enqueue("error", event, **kw) async def close(self) -> None: """Signal the writer to stop and wait for it to flush.""" if self._queue is not None: await self._queue.put(None) await self._queue.join() if self._task is not None: self._task.cancel() try: await self._task except asyncio.CancelledError: pass @dataclass class LegFill: """Record of a single leg's filled order.""" leg: int order_id: str side: str pair: str input_currency: str output_currency: str fee_currency: str filled_volume: Decimal deal_funds: Decimal avg_price: Decimal fee: Decimal latency_ms: float = 0.0 @dataclass class ExecutionReport: """Summary of a triangle execution attempt, emitted after each fill or failure.""" correlation_id: str triangle_key: list[str] status: str fills: list[LegFill] predicted_bps: float ts_ms: int timings: list[dict] = field(default_factory=list) profit: float = 0.0 effective_bps: float = 0.0 error: str = "" book_ts_ms: int = 0 book_tops: list[dict] = field(default_factory=list) @dataclass class InFlightExecution: """Track a triangle execution that is currently in flight.""" triangle_key: frozenset pair_symbols: frozenset primary_quote: str correlation_id: str signal: dict started_ts_ms: int last_trade_ts_ms: int = 0 class Executor: """ Triangular arbitrage order executor. Receives signals from oe_em via a Unix-domain socket, checks volume and concurrency constraints, then executes up to three legs in sequence. """ def __init__( self, kucoin_api, settings, ws_client = None, log_file: Optional[Path] = None, live_mode: bool = False, ) -> None: self._api = kucoin_api self._settings = settings self._ws_client = ws_client self._log_file = log_file self._live_mode = live_mode self._log = _DualLogger(structlog.get_logger().bind(component="executor"), log_file) self._in_flight: dict[str, InFlightExecution] = {} self._last_trade_ts_ms: dict[frozenset, int] = {} self._reports: deque[ExecutionReport] = deque(maxlen=1000) self._paused = False self._pause_lock = asyncio.Lock() self._isolation_lock = asyncio.Lock() self._uptime_ns: int = time.monotonic_ns() self._report_written = False self._client_oid_to_order_id: dict[str, str] = {} self._session: aiohttp.ClientSession | None = None self._session_timeout = aiohttp.ClientTimeout(sock_connect=5, sock_read=10) self._keepalive_task: asyncio.Task | None = None async def start(self) -> None: """Pre-initialize and warm up the aiohttp session so the first trade is not slowed.""" self._session = self._create_session() async def _warm_up(): try: async with self._session.get("https://api.kucoin.com/api/v1/time") as resp: await resp.text() except Exception: pass await _warm_up() self._keepalive_task = asyncio.create_task(self._keepalive_loop()) _KEEPALIVE_INTERVAL = 15.0 def _create_session(self) -> aiohttp.ClientSession: connector = TCPConnector( limit=10, limit_per_host=5, force_close=False, enable_cleanup_closed=True, keepalive_timeout=60, ttl_dns_cache=300, ) return aiohttp.ClientSession( timeout=self._session_timeout, connector=connector, ) async def _keepalive_loop(self) -> None: """Ping KuCoin periodically to keep the authenticated POST path warm.""" while True: try: await asyncio.sleep(self._KEEPALIVE_INTERVAL) if self._live_mode: async with self._session.get("https://api.kucoin.com/api/v1/time") as resp: await resp.text() else: await self._api.order_test( session=self._session, symbol="BTC-USDT", side="buy", order_type="market", funds=Decimal("1"), ) except asyncio.CancelledError: break except Exception: pass async def handle_signal(self, signal: dict) -> None: """ Public entry point for processing an opportunity signal. Catches all exceptions so they do not bubble into the socket server's task chain; emits a failed ExecutionReport on error. """ try: await self._handle_signal_impl(signal) except asyncio.CancelledError: raise except BaseException as e: correlation_id = signal.get("correlation_id", "") error_msg = str(e) self._log.error("signal_handler_error", error=error_msg, correlation_id=correlation_id) self._emit_report(ExecutionReport( correlation_id=correlation_id, triangle_key=signal.get("triangle_key", []), status="failed", fills=[], predicted_bps=0.0, ts_ms=int(time.time() * 1000), error=error_msg, )) async def _handle_signal_impl(self, signal: dict) -> None: """ Internal signal processing pipeline. Steps (in order): 1. Block if executor is paused. 2. Validate triangle_key and legs. 3. Reject stale signals (book_ts_ms older than last seen). 4. Reject signals where volume is below any leg's minimum. 5. Reject if no concurrency slot available. 6. Add to _in_flight and run _execute_triangle. """ async with self._pause_lock: if self._paused: self._log.debug("signal_dropped_paused", correlation_id=signal.get("correlation_id", "")) return correlation_id = signal.get("correlation_id", "") triangle_key_list = signal.get("triangle_key", []) triangle_key = frozenset(triangle_key_list) primary_quote = signal.get("primary_quote", "") legs = signal.get("legs", []) book_ts_ms = signal.get("book_ts_ms", 0) pair_symbols = frozenset(leg.get("pair", "") for leg in legs if leg.get("pair")) if self._is_blocked(triangle_key, pair_symbols, primary_quote): self._log.debug("signal_blocked", correlation_id=correlation_id, triangle_key=triangle_key_list) return if len(legs) != 3: self._log.warning("invalid_signal", correlation_id=correlation_id, msg="expected 3 legs") return stale_ts = self._last_trade_ts_ms.get(triangle_key, 0) if book_ts_ms > 0 and stale_ts > 0 and book_ts_ms < stale_ts: self._log.debug("signal_discarded_stale", correlation_id=correlation_id, book_ts_ms=book_ts_ms, stale_ts=stale_ts) return async with self._isolation_lock: if self._session is None or self._session.closed: self._session = self._create_session() if len(self._in_flight) >= self._settings.concurrent_slots: self._log.debug("signal_dropped_no_slot", correlation_id=correlation_id) return if self._is_blocked(triangle_key, pair_symbols, primary_quote): self._log.debug("signal_blocked", correlation_id=correlation_id, triangle_key=triangle_key_list) return in_flight = InFlightExecution( triangle_key=triangle_key, pair_symbols=pair_symbols, primary_quote=primary_quote, correlation_id=correlation_id, signal=signal, started_ts_ms=int(time.time() * 1000), ) self._in_flight[correlation_id] = in_flight self._log.debug("execute_triangle_starting", correlation_id=correlation_id) try: await self._execute_triangle(correlation_id, signal, in_flight) finally: async with self._isolation_lock: self._in_flight.pop(correlation_id, None) def _is_blocked(self, triangle_key: frozenset, pair_symbols: frozenset, primary_quote: str) -> bool: """ Return True if the given triangle_key, pair_symbols, or primary_quote already has an in-flight execution (isolation enforcement). """ for inf in list(self._in_flight.values()): if inf.triangle_key == triangle_key: return True if self._settings.enforce_same_base_isolation and inf.primary_quote == primary_quote: return True if self._settings.enforce_pair_isolation and pair_symbols & inf.pair_symbols: return True return False async def _execute_triangle( self, correlation_id: str, signal: dict, in_flight: InFlightExecution ) -> None: """ Execute the three legs of a triangular arbitrage signal. Live mode --------- Places real market orders on KuCoin via order_place and waits for fill events from the WS client. Uses the incoming volume for leg 0 and propagates the output of each leg as the input for the next. Paper mode ---------- Validates orders against KuCoin's order_test endpoint (ensures the pair/size/funds are valid) then simulates fills from the order-book snapshot attached to the signal. Fill quantities are computed by walking the book at the top level — no real capital is committed. In both modes the signal's ``_cancelled`` flag is checked between legs so cancel_execution takes effect promptly. All outcomes (filled, failed, aborted) produce an ExecutionReport. """ exec_start_ts = time.perf_counter() timings: list[dict] = [] predicted_bps = signal.get("predicted_bps", 0.0) executor_receive_ts_ms = signal.get("_receiver_ts_ms") or int(time.time() * 1000) signal_ts_ms = signal.get("ts_ms", 0) book_ts_ms = signal.get("book_ts_ms", 0) t_sock_arrive_ms = signal.get("t_sock_arrive_ms", 0) t_arrive_ms = signal.get("t_arrive_ms", 0) t_eval_ms = signal.get("t_eval_ms", 0) if book_ts_ms > 0: timings.append({"step": "t-2_book_snapshot", "elapsed_ms": -(executor_receive_ts_ms - book_ts_ms)}) if t_sock_arrive_ms > 0: timings.append({"step": "socket_arrived", "elapsed_ms": -(executor_receive_ts_ms - t_sock_arrive_ms)}) if t_arrive_ms > 0: timings.append({"step": "book_update_arrived", "elapsed_ms": -(executor_receive_ts_ms - t_arrive_ms)}) if t_eval_ms > 0: timings.append({"step": "t-1_eval_complete", "elapsed_ms": -(executor_receive_ts_ms - t_eval_ms)}) if signal_ts_ms > 0: timings.append({"step": "t_signal_created", "elapsed_ms": -(executor_receive_ts_ms - signal_ts_ms)}) timings.append({"step": "signal_received", "elapsed_ms": 0.0}) self._log.debug("execute_triangle_entered", correlation_id=correlation_id) legs = signal.get("legs", []) books = signal.get("books", []) book_tops = [] for bk in books[:3]: asks = bk.get("asks", []) bids = bk.get("bids", []) book_tops.append({ "ask_px": str(asks[0]["price"]) if asks else "", "ask_sz": str(asks[0]["size"]) if asks else "", "bid_px": str(bids[0]["price"]) if bids else "", "bid_sz": str(bids[0]["size"]) if bids else "", }) live = signal.get("live", False) log_prefix = "" if live else "[PAPER] " fills: list[LegFill] = [] if self._session is None or self._session.closed: self._session = self._create_session() session = self._session for i, leg in enumerate(legs): if signal.get("_cancelled"): self._log.info("execution_cancelled", correlation_id=correlation_id, completed_legs=len(fills)) report = ExecutionReport( correlation_id=correlation_id, triangle_key=signal.get("triangle_key", []), status="aborted", fills=fills, predicted_bps=0.0, ts_ms=int(time.time() * 1000), error="cancelled", ) self._emit_report(report) return pair = leg.get("pair", "") side = leg.get("side", "") order_param = Decimal(str(leg.get("order_param", "0"))) base_min_size = Decimal(str(leg.get("base_min_size", "0"))) base_increment = Decimal(str(leg.get("base_increment", "0"))) quote_increment = Decimal(str(leg.get("quote_increment", "0"))) fee_rate = Decimal(str(leg.get("fee_rate", "0.001"))) fee_ccy = leg.get("fee_currency", "") base_ccy = pair.split("-")[0] quote_ccy = pair.split("-")[-1] if side == "buy": input_ccy = quote_ccy output_ccy = base_ccy else: input_ccy = base_ccy output_ccy = quote_ccy latency_ms = 0.0 if live: # --- LIVE MODE --- client_oid = f"t{correlation_id[-24:]}-{i}-{uuid.uuid4().hex[:4]}" t0 = time.perf_counter() if self._ws_client is None or not self._ws_client.is_connected: self._log.error( "live_mode_ws_not_connected", correlation_id=correlation_id, leg=i, pair=pair, ) report = ExecutionReport( correlation_id=correlation_id, triangle_key=signal.get("triangle_key", []), status="failed", fills=fills, predicted_bps=0.0, ts_ms=int(time.time() * 1000), error="ws_not_connected", ) self._emit_report(report) return order_fired_elapsed = (time.perf_counter() - exec_start_ts) * 1000 timings.append({"step": f"leg{i}_order_fired", "elapsed_ms": round(order_fired_elapsed, 2)}) if i == 0: input_vol = order_param else: prev = fills[i - 1] input_vol = prev.filled_volume inc = quote_increment if side == "buy" else base_increment if inc > 0: input_vol = (input_vol / inc).to_integral_value(rounding=ROUND_DOWN) * inc if side == "buy": ok, err_msg, order_id = await self._api.order_place( session=session, symbol=pair, side=side, order_type="market", funds=input_vol, client_oid=client_oid, ) else: ok, err_msg, order_id = await self._api.order_place( session=session, symbol=pair, side=side, order_type="market", size=input_vol, client_oid=client_oid, ) place_latency_ms = (time.perf_counter() - t0) * 1000 self._log.info( f"{log_prefix}order_placed", correlation_id=correlation_id, leg=i, pair=pair, side=side, order_id=order_id, client_oid=client_oid, volume=str(input_vol), place_latency_ms=round(place_latency_ms, 2), ) self._log.write_plain( f"{time.strftime('%Y-%m-%dT%H:%M:%S.', time.gmtime())}{int(time.time() * 1000) % 1000:03d}Z " f"ORDER | corr={correlation_id} | leg{i} | {pair} | {side} | " f"vol={str(input_vol)} | order_id={order_id} | lat={place_latency_ms:.1f}ms" ) if not ok: fills.append(LegFill( leg=i, order_id=order_id or "", side=side, pair=pair, input_currency=input_ccy, output_currency=output_ccy, fee_currency=fee_ccy, filled_volume=input_vol, deal_funds=_D0, avg_price=_D0, fee=_D0, latency_ms=round(place_latency_ms, 2), )) self._log.error( f"{log_prefix}order_rejected", correlation_id=correlation_id, leg=i, pair=pair, side=side, volume=str(input_vol), error=err_msg, ) self._log.write_plain( f"{time.strftime('%Y-%m-%dT%H:%M:%S.', time.gmtime())}{int(time.time() * 1000) % 1000:03d}Z " f"REJECTED | corr={correlation_id} | leg{i} | {pair} | {side} | " f"vol={str(input_vol)} | error={err_msg} | lat={place_latency_ms:.1f}ms" ) report = ExecutionReport( correlation_id=correlation_id, triangle_key=signal.get("triangle_key", []), status="failed", fills=fills, predicted_bps=predicted_bps, ts_ms=int(time.time() * 1000), timings=timings, error=err_msg or "order_rejected", book_tops=book_tops, ) self._emit_report(report) return self._client_oid_to_order_id[client_oid] = order_id or "" fill_timeout_ms = self._settings.fill_timeout_ms fill_ok, fill_data = await self._ws_client.await_fill( client_oid, fill_timeout_ms ) fill_latency_ms = (time.perf_counter() - t0) * 1000 latency_ms = fill_latency_ms if signal.get("_cancelled"): self._log.info( "execution_cancelled", correlation_id=correlation_id, completed_legs=len(fills), ) report = ExecutionReport( correlation_id=correlation_id, triangle_key=signal.get("triangle_key", []), status="aborted", fills=fills, predicted_bps=predicted_bps, ts_ms=int(time.time() * 1000), timings=timings, error="cancelled", ) self._emit_report(report) return if not fill_ok: elapsed_ms = (time.perf_counter() - exec_start_ts) * 1000 self._log.error( f"{log_prefix}fill_timeout", correlation_id=correlation_id, leg=i, pair=pair, client_oid=client_oid, order_id=order_id, fill_latency_ms=round(fill_latency_ms, 2), total_elapsed_ms=round(elapsed_ms, 2), ) report = ExecutionReport( correlation_id=correlation_id, triangle_key=signal.get("triangle_key", []), status="failed", fills=fills, predicted_bps=predicted_bps, ts_ms=int(time.time() * 1000), timings=timings, error=f"fill_timeout_leg{i}", book_tops=book_tops, ) self._emit_report(report) return fill_received_elapsed = (time.perf_counter() - exec_start_ts) * 1000 timings.append({"step": f"leg{i}_fill_received", "elapsed_ms": round(fill_received_elapsed, 2)}) order_id = fill_data.get("order_id", order_id or "") total_size = fill_data.get("total_size", Decimal("0")) total_funds = fill_data.get("total_funds", Decimal("0")) weighted_avg_price = fill_data.get("weighted_avg_price", _D0) # Fall back to book top if WS didn't provide a price (defensive). book_snapshot = books[i] if i < len(books) else {} if weighted_avg_price <= 0: if side == "buy": asks = book_snapshot.get("asks", []) weighted_avg_price = Decimal(str(asks[0]["price"])) if asks else _D0 else: bids = book_snapshot.get("bids", []) weighted_avg_price = Decimal(str(bids[0]["price"])) if bids else _D0 avg_price = weighted_avg_price if weighted_avg_price > 0 else _D0 fee = _D0 filled_vol_val = total_size if side == "buy" else total_funds fills.append(LegFill( leg=i, order_id=order_id, side=side, pair=pair, input_currency=input_ccy, output_currency=output_ccy, fee_currency=fee_ccy, filled_volume=filled_vol_val, deal_funds=total_funds, avg_price=avg_price, fee=fee, latency_ms=latency_ms, )) self._log.write_plain( f"{time.strftime('%Y-%m-%dT%H:%M:%S.', time.gmtime())}{int(time.time() * 1000) % 1000:03d}Z " f"FILL | corr={correlation_id} | leg{i} | {pair} | {side} | " f"out={str(filled_vol_val)}@{str(avg_price)} | " f"fee={str(fee)} {fee_ccy} | lat={latency_ms:.1f}ms" ) if i < 2: await self._ws_client.await_balance( output_ccy, fills[-1].filled_volume, 2000 ) in_flight.last_trade_ts_ms = int(time.time() * 1000) continue # --- PAPER MODE --- # The fused_engine already sized everything (per-leg order_param, # fee rates, precision increments). We only validate via # order_test and simulate fills from the attached books. self._log.debug("order_test_calling", correlation_id=correlation_id, leg=i, pair=pair, side=side, order_param=str(order_param)) t0 = time.perf_counter() order_fired_elapsed = (t0 - exec_start_ts) * 1000 timings.append({"step": f"leg{i}_order_fired", "elapsed_ms": round(order_fired_elapsed, 2)}) if side == "buy": ok, err_msg, _ = await self._api.order_test( session=session, symbol=pair, side=side, order_type="market", funds=order_param, ) else: ok, err_msg, _ = await self._api.order_test( session=session, symbol=pair, side=side, order_type="market", size=order_param, ) latency_ms = (time.perf_counter() - t0) * 1000 fill_elapsed = (time.perf_counter() - exec_start_ts) * 1000 timings.append({"step": f"leg{i}_fill_received", "elapsed_ms": round(fill_elapsed, 2)}) self._log._sl.info(f"{log_prefix}order_latency", correlation_id=correlation_id, leg=i, pair=pair, latency_ms=round(latency_ms, 2)) if not ok: self._log.error(f"{log_prefix}order_rejected", correlation_id=correlation_id, leg=i, pair=pair, error=err_msg) report = ExecutionReport( correlation_id=correlation_id, triangle_key=signal.get("triangle_key", []), status="failed", fills=fills, predicted_bps=0.0, ts_ms=int(time.time() * 1000), error=err_msg or "order_rejected", ) self._emit_report(report) return # Paper order succeeded — assign a synthetic order ID so the fill # pipeline has something to track. order_id = f"paper-{uuid.uuid4().hex[:12]}" if signal.get("_cancelled"): self._log.info("execution_cancelled", correlation_id=correlation_id, completed_legs=len(fills)) report = ExecutionReport( correlation_id=correlation_id, triangle_key=signal.get("triangle_key", []), status="aborted", fills=fills, predicted_bps=0.0, ts_ms=int(time.time() * 1000), error="cancelled", ) self._emit_report(report) return book_snapshot = books[i] if i < len(books) else {} # Simulate fill from top-of-book: use the best ask (buy) or best bid (sell). if side == "buy": asks = book_snapshot.get("asks", []) price = Decimal(str(asks[0]["price"])) if asks else _D0 funds = order_param if quote_increment > 0: funds = (funds / quote_increment).to_integral_value(rounding=ROUND_DOWN) * quote_increment base_vol = funds / price if price > 0 else _D0 if base_increment > 0: base_vol = (base_vol / base_increment).to_integral_value(rounding=ROUND_DOWN) * base_increment deal_funds = funds fee = base_vol * fee_rate if fee_ccy == base_ccy else order_param * fee_rate filled_volume = base_vol # Reject if simulated fill is below the exchange minimum. if base_min_size > 0 and base_vol < base_min_size: self._log.info("execution_aborted_below_min", correlation_id=correlation_id, leg=i, pair=pair, base_vol=str(base_vol), min_base=str(base_min_size), ) report = ExecutionReport( correlation_id=correlation_id, triangle_key=signal.get("triangle_key", []), status="aborted", fills=fills, predicted_bps=0.0, ts_ms=int(time.time() * 1000), error="volume_below_minimum", ) self._emit_report(report) return else: bids = book_snapshot.get("bids", []) price = Decimal(str(bids[0]["price"])) if bids else _D0 base_vol = order_param if base_increment > 0: base_vol = (base_vol / base_increment).to_integral_value(rounding=ROUND_DOWN) * base_increment quote_vol = base_vol * price if price > 0 else _D0 # Fee depends on whether KuCoin charges it in base or quote. if fee_ccy == base_ccy: fee = base_vol * fee_rate deal_funds = quote_vol else: fee = quote_vol * fee_rate deal_funds = quote_vol * (_D1 - fee_rate) filled_volume = quote_vol if base_min_size > 0 and order_param < base_min_size: self._log.info("execution_aborted_below_min", correlation_id=correlation_id, leg=i, pair=pair, base_vol=str(order_param), min_base=str(base_min_size), ) report = ExecutionReport( correlation_id=correlation_id, triangle_key=signal.get("triangle_key", []), status="aborted", fills=fills, predicted_bps=0.0, ts_ms=int(time.time() * 1000), error="volume_below_minimum", ) self._emit_report(report) return avg_price = price fills.append(LegFill( leg=i, order_id=order_id, side=side, pair=pair, input_currency=input_ccy, output_currency=output_ccy, fee_currency=fee_ccy, filled_volume=filled_volume, deal_funds=deal_funds, avg_price=avg_price, fee=fee, latency_ms=latency_ms, )) in_flight.last_trade_ts_ms = int(time.time() * 1000) # Compute actual profit in the primary quote currency. # Leg 2 output: if buy, the received base (filled_volume); if sell, the received # quote (deal_funds, already net when fee is in quote). if fills[2].side == "buy": leg2_out = fills[2].filled_volume if fills[2].fee_currency == fills[2].output_currency: leg2_out -= fills[2].fee else: leg2_out = fills[2].deal_funds # Leg 0 input: if buy, quote spent (deal_funds); if sell, base sold. if fills[0].side == "buy": leg0_in = fills[0].deal_funds else: leg0_in = fills[0].filled_volume profit = float(leg2_out - leg0_in) effective_bps = (profit / float(leg0_in)) * 10000 if leg0_in else 0.0 total_exec_ms = (time.perf_counter() - exec_start_ts) * 1000 timings.append({"step": "execution_complete", "elapsed_ms": round(total_exec_ms, 2)}) self._log._sl.info("execution_timing", correlation_id=correlation_id, timings=timings) report = ExecutionReport( correlation_id=correlation_id, triangle_key=signal.get("triangle_key", []), status="filled", fills=fills, predicted_bps=predicted_bps, profit=profit, effective_bps=effective_bps, ts_ms=int(time.time() * 1000), timings=timings, book_ts_ms=book_ts_ms, book_tops=book_tops, ) self._last_trade_ts_ms[in_flight.triangle_key] = report.ts_ms self._emit_report(report) def _emit_report(self, report: ExecutionReport) -> None: """Persist an ExecutionReport to the in-memory deque and write to the log file.""" fills_lines = [] for f in report.fills: fills_lines.append( f"L{f.leg}:{f.side} {f.pair} {f.input_currency}->{f.output_currency} " f"{f.filled_volume}@{f.avg_price}(fee={f.fee} {f.fee_currency} lat={f.latency_ms:.1f}ms)" ) fills_repr = ", ".join(fills_lines) self._reports.append(report) error_suffix = f" | error={report.error}" if report.error else "" profit_repr = f"profit={report.profit:.4f}" timing_parts = " ".join(f"{t['step']}={t['elapsed_ms']:.1f}ms" for t in report.timings) if report.timings else "" bt_lines = [] for bt in report.book_tops: bt_lines.append(f"{bt.get('ask_px','?')}/{bt.get('bid_px','?')}") book_top_repr = " | ".join(bt_lines) if bt_lines else "" ts_iso = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.") + f"{report.ts_ms % 1000:03d}Z" books_suffix = f" | books=[{book_top_repr}]" if book_top_repr else "" msg = ( f"{ts_iso} {report.status.upper()} | corr={report.correlation_id} | " f"triangle={report.triangle_key} | predicted_bps={report.predicted_bps:.2f} | " f"effective_bps={report.effective_bps:.2f} | " f"book_ts={report.book_ts_ms} | " f"{profit_repr} | timings=[{timing_parts}] | fills=[{fills_repr}]{books_suffix}{error_suffix}" ) self._log.write_plain(msg) print( f"{ts_iso} {report.status.upper()} | corr={report.correlation_id} | " f"triangle={report.triangle_key} | predicted_bps={report.predicted_bps:.2f} | " f"effective_bps={report.effective_bps:.2f} | " f"book_ts={report.book_ts_ms} | " f"{profit_repr} | fills=[{fills_repr}]{books_suffix}" f"{f' | error={report.error}' if report.error else ''}", flush=True, ) async def cancel_execution(self, correlation_id: str) -> bool: """ Request cancellation of an in-flight execution by setting _cancelled on the signal dict. Returns True if the execution was found. """ if correlation_id in self._in_flight: self._log.info("cancel_requested", correlation_id=correlation_id) self._in_flight[correlation_id].signal["_cancelled"] = True return True return False async def close(self) -> None: """Close the aiohttp session and flush the log writer.""" if self._keepalive_task is not None and not self._keepalive_task.done(): self._keepalive_task.cancel() try: await self._keepalive_task except asyncio.CancelledError: pass self._keepalive_task = None if self._session is not None and not self._session.closed: await self._session.close() await self._log.close() async def pause(self) -> None: """Block the executor from accepting new signals.""" async with self._pause_lock: self._paused = True self._log.info("executor_paused") async def resume(self) -> None: """Unblock the executor to start accepting signals again.""" async with self._pause_lock: self._paused = False self._log.info("executor_resumed") def get_uptime_seconds(self) -> float: """Return process uptime in seconds (monotonic clock, not wall time).""" return (time.monotonic_ns() - self._uptime_ns) / 1e9 def set_concurrent_slots(self, slots: int) -> None: """ Dynamically adjust the maximum number of concurrent in-flight triangles. Thread-safe: updates self._settings.concurrent_slots only. """ self._settings.concurrent_slots = slots self._log.info("concurrent_slots_updated", slots=slots) def get_in_flight(self) -> list[dict]: return [ { "correlation_id": inf.correlation_id, "triangle_key": list(inf.triangle_key), "pair_symbols": list(inf.pair_symbols), "primary_quote": inf.primary_quote, "started_ts_ms": inf.started_ts_ms, } for inf in self._in_flight.values() ] def get_reports(self, limit: int = 50) -> list[dict]: reports = list(self._reports)[-limit:] return [ { "correlation_id": r.correlation_id, "triangle_key": r.triangle_key, "status": r.status, "fills": [{"leg": f.leg, "vol": str(f.filled_volume), "deal_funds": str(f.deal_funds), "price": str(f.avg_price), "fee": str(f.fee), "fee_currency": f.fee_currency} for f in r.fills], "predicted_bps": r.predicted_bps, "effective_bps": r.effective_bps, "ts_ms": r.ts_ms, **({"error": r.error} if r.error else {}), } for r in reports ] def get_config(self) -> dict: return { "live_mode": self._live_mode, "concurrent_slots": self._settings.concurrent_slots, "enforce_same_base_isolation": self._settings.enforce_same_base_isolation, "socket_path": str(self._settings.socket_path), }