1008 lines
42 KiB
Python
1008 lines
42 KiB
Python
"""
|
|
Triangular arbitrage executor.
|
|
|
|
Handles incoming opportunity signals from oe_em, enforces concurrency and
|
|
isolation constraints, computes order sizes, and posts orders to KuCoin
|
|
(via order_test in paper mode or real orders in live mode).
|
|
|
|
Signal contract
|
|
---------------
|
|
Each signal dict carries:
|
|
legs list[dict] — 3 legs, each with pair, side, order_param,
|
|
base_min_size, fee_rate, fee_currency
|
|
books list[dict] — order book snapshots per leg (top-of-book)
|
|
starting_volume str — quote-currency amount to deploy into leg 0
|
|
predicted_bps float — expected profit in basis points
|
|
primary_quote str — e.g. "USDT", used for capital capping & isolation
|
|
live bool — true = real orders, false = paper simulation
|
|
book_ts_ms int — when the order book was sampled (stale detection)
|
|
t_sock_arrive_ms int — when bytes arrived at SSL_read (true network arrival)
|
|
t_arrive_ms int — when parsing + book update completed
|
|
ts_ms int — when the signal was created
|
|
t_eval_ms int — when the signal was evaluated
|
|
_cancelled bool — set by cancel_execution to abort in-flight
|
|
"""
|
|
import asyncio
|
|
import contextlib
|
|
import json
|
|
import logging
|
|
import time
|
|
import uuid
|
|
from collections import deque
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime
|
|
from decimal import Decimal, ROUND_DOWN
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
import aiohttp
|
|
from aiohttp.connector import TCPConnector
|
|
import structlog
|
|
|
|
|
|
# Shared Decimal zero, used as the "no amount" value for prices, fees and volumes.
_D0 = Decimal("0")
|
|
# Shared Decimal one, used to build the net-of-fee multiplier (1 - fee_rate).
_D1 = Decimal("1")
|
|
# Decimal 0.1. NOTE(review): not referenced in the visible part of this file — confirm it is still needed.
_D0_1 = Decimal("0.1")
|
|
|
|
|
|
class _DualLogger:
    """
    Thin wrapper that mirrors structlog output to a plain-text file.

    Every call is forwarded to the wrapped structlog logger. When a log file
    is configured, a human-readable copy of the entry is also pushed onto an
    asyncio queue; a background task drains that queue and performs the
    actual file writes in the default thread-pool executor, keeping file I/O
    off the event loop.

    When ``log_file`` is set the instance must be created while an event
    loop is running, because the writer task is started with
    ``asyncio.create_task``.
    """

    def __init__(self, structlog_logger, log_file: Optional[Path]) -> None:
        """
        Args:
            structlog_logger: logger exposing ``is_enabled_for`` and the
                ``debug``/``info``/``warning``/``error`` methods.
            log_file: destination of the plain-text mirror, or ``None`` to
                disable the mirror entirely.
        """
        self._sl = structlog_logger
        self._log_file = log_file
        self._queue: Optional[asyncio.Queue] = None
        self._task: Optional[asyncio.Task] = None
        if log_file:
            self._queue = asyncio.Queue()
            self._task = asyncio.create_task(self._writer_loop())

    async def _writer_loop(self) -> None:
        """Drain the queue and append each line to the log file.

        A ``None`` item is the shutdown sentinel. Write errors are ignored on
        purpose: logging is best-effort and must never take the executor down.
        """
        loop = asyncio.get_running_loop()
        log_file = self._log_file
        # Keep a local reference: close() detaches self._queue while this
        # task is still draining it.
        queue = self._queue

        def _write(line: str) -> None:
            # Explicit UTF-8 so a line is never lost to a locale encoding
            # that cannot represent it.
            with open(log_file, "a", encoding="utf-8") as f:
                f.write(line)

        while True:
            item = None
            try:
                item = await queue.get()
            except asyncio.CancelledError:
                # Cancelled while idle: discard whatever is still queued so
                # that a pending queue.join() cannot block forever.
                while not queue.empty():
                    try:
                        queue.get_nowait()
                        queue.task_done()
                    except asyncio.QueueEmpty:
                        break
                raise
            except Exception:
                continue

            queue.task_done()

            if item is None:
                break

            try:
                await loop.run_in_executor(None, _write, item)
            except Exception:
                # Best-effort: a failed write drops that one line only.
                pass

    @staticmethod
    def _fmt_value(v: object) -> str:
        """Render a value for the plain-text log (compact JSON for dicts/lists)."""
        if isinstance(v, (dict, list)):
            return json.dumps(v, default=str, separators=(",", ":"))
        return str(v)

    def _enqueue(self, level: str, event: str, **kw) -> None:
        """Publish a log entry to structlog and, if a log file is set, to the queue.

        Entries below the structlog logger's enabled level are dropped for
        both destinations. Keyword arguments whose name starts with an
        underscore are sent to structlog but left out of the plain-text line.
        """
        sl_level = getattr(logging, level.upper(), logging.INFO)
        if not self._sl.is_enabled_for(sl_level):
            return
        getattr(self._sl, level)(event, **kw)
        if self._queue is None:
            return
        ts = f"{time.time():.0f}"
        level_up = level.upper()
        extras = " ".join(
            f"{k}={self._fmt_value(v)}"
            for k, v in kw.items()
            if not k.startswith("_")
        )
        line = f"[{ts}] [{level_up:5}] {event} {extras}\n"
        try:
            self._queue.put_nowait(line)
        except asyncio.QueueFull:
            pass

    def write_plain(self, text: str) -> None:
        """Write a line directly to the plain-text log queue (no structlog side-effect)."""
        if self._queue is not None:
            try:
                self._queue.put_nowait(text + "\n")
            except asyncio.QueueFull:
                pass

    def debug(self, event: str, **kw) -> None:
        self._enqueue("debug", event, **kw)

    def info(self, event: str, **kw) -> None:
        self._enqueue("info", event, **kw)

    def warning(self, event: str, **kw) -> None:
        self._enqueue("warning", event, **kw)

    def error(self, event: str, **kw) -> None:
        self._enqueue("error", event, **kw)

    async def close(self) -> None:
        """Flush everything queued so far and stop the writer task.

        Safe to call more than once: only the first call does any work.
        After close, log calls still reach structlog but are no longer
        mirrored to the file.
        """
        task, queue = self._task, self._queue
        if task is None or queue is None:
            return
        # Detach first so a repeated close() returns immediately and late
        # log calls do not pile up in a queue that nobody drains.
        self._task = None
        self._queue = None
        if not task.done():
            # The writer exits only after processing every line that was
            # queued ahead of the sentinel.
            await queue.put(None)
        try:
            await task
        except asyncio.CancelledError:
            pass
|
|
|
|
|
|
@dataclass
class LegFill:
    """Outcome of one leg of a triangle, as recorded by the executor."""

    # Zero-based position of the leg within the triangle.
    leg: int
    # Exchange order id, or a synthetic "paper-..." id in paper mode.
    order_id: str
    # "buy" or "sell".
    side: str
    # Trading pair symbol, e.g. "BTC-USDT".
    pair: str
    # Currency spent by this leg (quote for a buy, base for a sell).
    input_currency: str
    # Currency received by this leg (base for a buy, quote for a sell).
    output_currency: str
    # Currency in which the fee is charged, as supplied by the signal.
    fee_currency: str
    # Amount recorded for the leg; for a filled leg this is the amount of
    # output currency, for a rejected live order it is the attempted input.
    filled_volume: Decimal
    # Quote-currency funds associated with the fill.
    deal_funds: Decimal
    # Average execution price (Decimal zero when unknown).
    avg_price: Decimal
    # Fee amount expressed in fee_currency.
    fee: Decimal
    # Measured latency of the leg in milliseconds.
    latency_ms: float = 0.0
|
|
|
|
|
|
@dataclass
class ExecutionReport:
    """Result of one triangle execution attempt (filled, failed or aborted)."""

    # Identifier copied from the originating signal.
    correlation_id: str
    # Triangle identifier exactly as carried by the signal.
    triangle_key: list[str]
    # "filled", "failed" or "aborted".
    status: str
    # Legs recorded before the attempt ended.
    fills: list[LegFill]
    # Profit the signal predicted, in basis points.
    predicted_bps: float
    # Wall-clock time the report was built, in epoch milliseconds.
    ts_ms: int
    # Ordered {"step": ..., "elapsed_ms": ...} timing marks.
    timings: list[dict] = field(default_factory=list)
    # Realised profit (output of the last leg minus input of the first leg).
    profit: float = 0.0
    # Realised profit relative to the first leg's input, in basis points.
    effective_bps: float = 0.0
    # Failure or abort reason; empty on success.
    error: str = ""
    # Order-book sample time copied from the signal (0 when not set).
    book_ts_ms: int = 0
    # Top-of-book price/size snapshot for each leg's book.
    book_tops: list[dict] = field(default_factory=list)
|
|
|
|
|
|
@dataclass
class InFlightExecution:
    """Bookkeeping entry for a triangle that is currently being executed."""

    # Set of triangle identifiers; used for same-triangle isolation.
    triangle_key: frozenset
    # Pair symbols touched by the triangle; used for pair isolation.
    pair_symbols: frozenset
    # Primary quote currency of the signal, e.g. "USDT".
    primary_quote: str
    # Identifier of the originating signal.
    correlation_id: str
    # The signal dict itself; cancellation is requested by setting
    # "_cancelled" on it.
    signal: dict
    # Wall-clock time the execution was admitted, in epoch milliseconds.
    started_ts_ms: int
    # Wall-clock time of the most recent completed leg (0 until one completes).
    last_trade_ts_ms: int = 0
|
|
|
|
|
|
class Executor:
|
|
"""
|
|
Triangular arbitrage order executor.
|
|
|
|
Receives signals from oe_em via a Unix-domain socket, checks volume and
|
|
concurrency constraints, then executes up to three legs in sequence.
|
|
"""
|
|
|
|
def __init__(
    self,
    kucoin_api,
    settings,
    ws_client=None,
    log_file: Optional[Path] = None,
    live_mode: bool = False,
) -> None:
    """
    Set up executor state; no network activity happens here.

    Args:
        kucoin_api: client used for warm-up, order_test and order_place calls.
        settings: configuration object (concurrency and isolation options).
        ws_client: websocket client used in live mode to await fills and
            balances; may be None.
        log_file: optional path of the plain-text log mirror. When set, the
            constructor must run inside a running event loop because the
            logger starts its writer task immediately.
        live_mode: selects which keep-alive request the background loop uses.
    """
    # Collaborators and configuration handed in by the caller.
    self._api = kucoin_api
    self._settings = settings
    self._ws_client = ws_client
    self._live_mode = live_mode
    self._log_file = log_file
    self._log = _DualLogger(structlog.get_logger().bind(component="executor"), log_file)

    # Execution bookkeeping.
    self._in_flight: dict[str, InFlightExecution] = {}
    self._last_trade_ts_ms: dict[frozenset, int] = {}
    self._reports: deque[ExecutionReport] = deque(maxlen=1000)
    self._client_oid_to_order_id: dict[str, str] = {}
    self._report_written = False

    # Pause switch and the locks guarding admission decisions.
    self._paused = False
    self._pause_lock = asyncio.Lock()
    self._isolation_lock = asyncio.Lock()

    # Monotonic start mark for get_uptime_seconds().
    self._uptime_ns: int = time.monotonic_ns()

    # HTTP session; created lazily or by start().
    self._session: aiohttp.ClientSession | None = None
    self._session_timeout = aiohttp.ClientTimeout(sock_connect=5, sock_read=10)
    self._keepalive_task: asyncio.Task | None = None
|
|
|
|
async def start(self) -> None:
    """Pre-initialize and warm up the HTTP session so the first trade is not slowed.

    Calling start() again is safe: an open session is reused rather than
    replaced (replacing it would leak the old one), and a keep-alive task
    that is still running is left in place instead of being duplicated.
    """
    if self._session is None or self._session.closed:
        self._session = self._create_session()
    await self._api.warmup_session(self._session)

    keepalive = self._keepalive_task
    if keepalive is None or keepalive.done():
        self._keepalive_task = asyncio.create_task(self._keepalive_loop())
|
|
|
|
# Seconds that _keepalive_loop sleeps between keep-alive requests.
_KEEPALIVE_INTERVAL = 30.0
|
|
|
|
def _create_session(self) -> aiohttp.ClientSession:
    """Build a new aiohttp session backed by a small keep-alive connection pool."""
    pool_options = {
        "limit": 10,
        "limit_per_host": 5,
        "force_close": False,
        "enable_cleanup_closed": True,
        "keepalive_timeout": 60,
        "ttl_dns_cache": 300,
    }
    pool = TCPConnector(**pool_options)
    return aiohttp.ClientSession(connector=pool, timeout=self._session_timeout)
|
|
|
|
async def _keepalive_loop(self) -> None:
    """Ping KuCoin periodically to keep the connection pool warm.

    Every ``_KEEPALIVE_INTERVAL`` seconds this issues a warm-up request in
    live mode, or a small ``order_test`` request otherwise. A failed ping
    is logged as a warning and the loop carries on; the loop ends quietly
    when the task is cancelled.
    """
    while True:
        try:
            await asyncio.sleep(self._KEEPALIVE_INTERVAL)
            if self._live_mode:
                await self._api.warmup_session(self._session)
            else:
                await self._api.order_test(
                    session=self._session,
                    symbol="BTC-USDT",
                    side="buy",
                    order_type="market",
                    funds=Decimal("1"),
                )
        except asyncio.CancelledError:
            break
        except Exception as exc:
            # Keep-alive is best-effort, but a failing ping usually means the
            # first real order will be slow or fail, so make it visible.
            self._log.warning(
                "keepalive_failed",
                error=str(exc) or type(exc).__name__,
                live=self._live_mode,
            )
|
|
|
|
async def handle_signal(self, signal: dict) -> None:
    """
    Public entry point for processing an opportunity signal.

    Ordinary exceptions raised while processing are caught here so they do
    not bubble into the socket server's task chain; each one is logged and
    turned into a "failed" ExecutionReport. Cancellation and interpreter
    level exits (KeyboardInterrupt, SystemExit) are deliberately NOT
    swallowed, so task cancellation and process shutdown keep working.
    """
    try:
        await self._handle_signal_impl(signal)
    except asyncio.CancelledError:
        raise
    except Exception as e:
        correlation_id = signal.get("correlation_id", "")
        # Some exceptions stringify to "", which would produce a failed
        # report with no visible reason; fall back to the exception type.
        error_msg = str(e) or type(e).__name__
        self._log.error("signal_handler_error", error=error_msg, correlation_id=correlation_id)
        self._emit_report(ExecutionReport(
            correlation_id=correlation_id,
            triangle_key=signal.get("triangle_key", []),
            status="failed",
            fills=[],
            predicted_bps=0.0,
            ts_ms=int(time.time() * 1000),
            error=error_msg,
        ))
|
|
|
|
async def _handle_signal_impl(self, signal: dict) -> None:
    """
    Admission pipeline for one signal.

    Checks, in order:
      1. Drop the signal while the executor is paused.
      2. Drop it when an in-flight execution already blocks it (isolation).
      3. Drop it unless it carries exactly three legs.
      4. Drop it when its book timestamp is older than the last completed
         trade recorded for the same triangle.
      5. Under the isolation lock: make sure a session exists, drop the
         signal when no concurrency slot is free or when it became blocked
         in the meantime, otherwise register it as in flight.
      6. Run _execute_triangle and always unregister the signal afterwards.
    """
    async with self._pause_lock:
        if self._paused:
            self._log.debug("signal_dropped_paused", correlation_id=signal.get("correlation_id", ""))
            return

    correlation_id = signal.get("correlation_id", "")
    key_list = signal.get("triangle_key", [])
    tri_key = frozenset(key_list)
    quote = signal.get("primary_quote", "")
    legs = signal.get("legs", [])
    book_ts_ms = signal.get("book_ts_ms", 0)
    # Only non-empty pair symbols take part in pair isolation.
    pairs = frozenset(filter(None, (leg.get("pair", "") for leg in legs)))

    # Cheap pre-check outside the lock; repeated under the lock below.
    if self._is_blocked(tri_key, pairs, quote):
        self._log.debug("signal_blocked", correlation_id=correlation_id, triangle_key=key_list)
        return

    if len(legs) != 3:
        self._log.warning("invalid_signal", correlation_id=correlation_id, msg="expected 3 legs")
        return

    stale_ts = self._last_trade_ts_ms.get(tri_key, 0)
    if 0 < book_ts_ms < stale_ts:
        self._log.debug(
            "signal_discarded_stale",
            correlation_id=correlation_id,
            book_ts_ms=book_ts_ms,
            stale_ts=stale_ts,
        )
        return

    async with self._isolation_lock:
        if self._session is None or self._session.closed:
            self._session = self._create_session()

        if len(self._in_flight) >= self._settings.concurrent_slots:
            self._log.debug("signal_dropped_no_slot", correlation_id=correlation_id)
            return

        if self._is_blocked(tri_key, pairs, quote):
            self._log.debug("signal_blocked", correlation_id=correlation_id, triangle_key=key_list)
            return

        in_flight = InFlightExecution(
            triangle_key=tri_key,
            pair_symbols=pairs,
            primary_quote=quote,
            correlation_id=correlation_id,
            signal=signal,
            started_ts_ms=int(time.time() * 1000),
        )
        self._in_flight[correlation_id] = in_flight

    self._log.debug("execute_triangle_starting", correlation_id=correlation_id)
    try:
        await self._execute_triangle(correlation_id, signal, in_flight)
    finally:
        # Free the slot whatever happened during execution.
        async with self._isolation_lock:
            self._in_flight.pop(correlation_id, None)
|
|
|
|
def _is_blocked(self, triangle_key: frozenset, pair_symbols: frozenset, primary_quote: str) -> bool:
    """
    Tell whether an in-flight execution conflicts with the given signal.

    A conflict exists when some in-flight execution has the same triangle
    key, or (with ``enforce_same_base_isolation`` on) the same primary
    quote, or (with ``enforce_pair_isolation`` on) at least one pair symbol
    in common.
    """
    return any(
        active.triangle_key == triangle_key
        or (self._settings.enforce_same_base_isolation and active.primary_quote == primary_quote)
        or (self._settings.enforce_pair_isolation and bool(pair_symbols & active.pair_symbols))
        for active in self._in_flight.values()
    )
|
|
|
|
async def _execute_triangle(
    self, correlation_id: str, signal: dict, in_flight: InFlightExecution
) -> None:
    """
    Run the legs of one triangle signal and emit one ExecutionReport.

    Live mode (``signal["live"]`` truthy)
        Each leg is placed as a market order through ``order_place`` and
        the fill is awaited on the websocket client. The volume fed into
        each later leg is the recorded output of the previous leg, rounded
        down to the pair's increment.

    Paper mode
        Each leg is validated through ``order_test`` using the leg's own
        ``order_param`` and the fill is simulated from the top level of the
        book snapshot attached to the signal.

    In both modes the signal's ``_cancelled`` flag is checked before each
    leg and again after the order call. Every exit path (filled, failed,
    aborted) emits a report through ``_emit_report``.

    NOTE(review): the source this was reconstructed from had lost its
    indentation; the three places where nesting had to be inferred are
    marked below and should be confirmed against the original file.
    """
    started = time.perf_counter()
    timings: list[dict] = []
    predicted_bps = signal.get("predicted_bps", 0.0)
    received_ms = signal.get("_receiver_ts_ms") or int(time.time() * 1000)
    book_ts_ms = signal.get("book_ts_ms", 0)

    # Upstream timestamps are reported relative to the moment the executor
    # received the signal, hence the negative offsets.
    upstream_marks = (
        ("t-2_book_snapshot", book_ts_ms),
        ("socket_arrived", signal.get("t_sock_arrive_ms", 0)),
        ("book_update_arrived", signal.get("t_arrive_ms", 0)),
        ("t-1_eval_complete", signal.get("t_eval_ms", 0)),
        ("t_signal_created", signal.get("ts_ms", 0)),
    )
    for step, stamp_ms in upstream_marks:
        if stamp_ms > 0:
            timings.append({"step": step, "elapsed_ms": -(received_ms - stamp_ms)})
    timings.append({"step": "signal_received", "elapsed_ms": 0.0})
    self._log.debug("execute_triangle_entered", correlation_id=correlation_id)

    legs = signal.get("legs", [])
    books = signal.get("books", [])

    def top_of(levels: list, key: str) -> str:
        """String form of the first level's field, or "" for an empty side."""
        return str(levels[0][key]) if levels else ""

    book_tops = [
        {
            "ask_px": top_of(bk.get("asks", []), "price"),
            "ask_sz": top_of(bk.get("asks", []), "size"),
            "bid_px": top_of(bk.get("bids", []), "price"),
            "bid_sz": top_of(bk.get("bids", []), "size"),
        }
        for bk in books[:3]
    ]

    live = signal.get("live", False)
    log_prefix = "" if live else "[PAPER] "
    fills: list[LegFill] = []

    def since_start_ms() -> float:
        """Milliseconds elapsed since this execution started."""
        return (time.perf_counter() - started) * 1000

    def floor_to(amount: Decimal, step: Decimal) -> Decimal:
        """Round amount down to a multiple of step (unchanged when step <= 0)."""
        if step > 0:
            return (amount / step).to_integral_value(rounding=ROUND_DOWN) * step
        return amount

    def wall_stamp() -> str:
        """UTC timestamp with millisecond precision for plain-text log lines."""
        return (
            f"{time.strftime('%Y-%m-%dT%H:%M:%S.', time.gmtime())}"
            f"{int(time.time() * 1000) % 1000:03d}Z"
        )

    def finish(status: str, error: str, *, bps=0.0,
               with_timings: bool = False, with_books: bool = False) -> None:
        """Emit a terminal report for an attempt that did not complete."""
        optional = {}
        if with_timings:
            optional["timings"] = timings
        if with_books:
            optional["book_tops"] = book_tops
        self._emit_report(ExecutionReport(
            correlation_id=correlation_id,
            triangle_key=signal.get("triangle_key", []),
            status=status,
            fills=fills,
            predicted_bps=bps,
            ts_ms=int(time.time() * 1000),
            error=error,
            **optional,
        ))

    if self._session is None or self._session.closed:
        self._session = self._create_session()
    session = self._session

    for i, leg in enumerate(legs):
        if signal.get("_cancelled"):
            self._log.info("execution_cancelled", correlation_id=correlation_id, completed_legs=len(fills))
            finish("aborted", "cancelled")
            return

        pair = leg.get("pair", "")
        side = leg.get("side", "")
        order_param = Decimal(str(leg.get("order_param", "0")))
        base_min_size = Decimal(str(leg.get("base_min_size", "0")))
        base_increment = Decimal(str(leg.get("base_increment", "0")))
        quote_increment = Decimal(str(leg.get("quote_increment", "0")))
        fee_rate = Decimal(str(leg.get("fee_rate", "0.001")))
        fee_ccy = leg.get("fee_currency", "")

        symbol_parts = pair.split("-")
        base_ccy, quote_ccy = symbol_parts[0], symbol_parts[-1]
        buying = side == "buy"
        # A buy spends quote and receives base; a sell is the reverse.
        input_ccy, output_ccy = (quote_ccy, base_ccy) if buying else (base_ccy, quote_ccy)
        latency_ms = 0.0

        if live:
            # ---------------- LIVE MODE ----------------
            client_oid = f"t{correlation_id[-24:]}-{i}-{uuid.uuid4().hex[:4]}"
            t0 = time.perf_counter()

            if self._ws_client is None or not self._ws_client.is_connected:
                self._log.error(
                    "live_mode_ws_not_connected",
                    correlation_id=correlation_id,
                    leg=i,
                    pair=pair,
                )
                finish("failed", "ws_not_connected")
                return

            timings.append({"step": f"leg{i}_order_fired", "elapsed_ms": round(since_start_ms(), 2)})

            # Leg 0 uses the signal's amount; later legs use what the
            # previous leg recorded as its output.
            input_vol = order_param if i == 0 else fills[i - 1].filled_volume
            # NOTE(review): rounding is applied to every leg here; the
            # original nesting was lost and it may have applied only to
            # legs after the first — confirm.
            input_vol = floor_to(input_vol, quote_increment if buying else base_increment)

            # Market buys are sized in quote funds, market sells in base size.
            amount_kw = {"funds": input_vol} if buying else {"size": input_vol}
            ok, err_msg, order_id = await self._api.order_place(
                session=session,
                symbol=pair,
                side=side,
                order_type="market",
                client_oid=client_oid,
                **amount_kw,
            )
            place_latency_ms = (time.perf_counter() - t0) * 1000
            self._log.info(
                f"{log_prefix}order_placed",
                correlation_id=correlation_id,
                leg=i,
                pair=pair,
                side=side,
                order_id=order_id,
                client_oid=client_oid,
                volume=str(input_vol),
                place_latency_ms=round(place_latency_ms, 2),
            )
            self._log.write_plain(
                f"{wall_stamp()} "
                f"ORDER | corr={correlation_id} | leg{i} | {pair} | {side} | "
                f"vol={str(input_vol)} | order_id={order_id} | lat={place_latency_ms:.1f}ms"
            )

            if not ok:
                # The rejected attempt is still recorded so the report
                # shows which leg failed and with what volume.
                fills.append(LegFill(
                    leg=i,
                    order_id=order_id or "",
                    side=side,
                    pair=pair,
                    input_currency=input_ccy,
                    output_currency=output_ccy,
                    fee_currency=fee_ccy,
                    filled_volume=input_vol,
                    deal_funds=_D0,
                    avg_price=_D0,
                    fee=_D0,
                    latency_ms=round(place_latency_ms, 2),
                ))
                self._log.error(
                    f"{log_prefix}order_rejected",
                    correlation_id=correlation_id,
                    leg=i,
                    pair=pair,
                    side=side,
                    volume=str(input_vol),
                    error=err_msg,
                )
                self._log.write_plain(
                    f"{wall_stamp()} "
                    f"REJECTED | corr={correlation_id} | leg{i} | {pair} | {side} | "
                    f"vol={str(input_vol)} | error={err_msg} | lat={place_latency_ms:.1f}ms"
                )
                finish(
                    "failed",
                    err_msg or "order_rejected",
                    bps=predicted_bps,
                    with_timings=True,
                    with_books=True,
                )
                return

            self._client_oid_to_order_id[client_oid] = order_id or ""

            fill_timeout_ms = self._settings.fill_timeout_ms
            fill_ok, fill_data = await self._ws_client.await_fill(client_oid, fill_timeout_ms)
            fill_latency_ms = (time.perf_counter() - t0) * 1000
            latency_ms = fill_latency_ms

            # NOTE(review): when cancellation arrives here the order was
            # already placed; any fill it produced is not recorded in the
            # report — confirm this is intended.
            if signal.get("_cancelled"):
                self._log.info(
                    "execution_cancelled",
                    correlation_id=correlation_id,
                    completed_legs=len(fills),
                )
                finish("aborted", "cancelled", bps=predicted_bps, with_timings=True)
                return

            if not fill_ok:
                self._log.error(
                    f"{log_prefix}fill_timeout",
                    correlation_id=correlation_id,
                    leg=i,
                    pair=pair,
                    client_oid=client_oid,
                    order_id=order_id,
                    fill_latency_ms=round(fill_latency_ms, 2),
                    total_elapsed_ms=round(since_start_ms(), 2),
                )
                finish(
                    "failed",
                    f"fill_timeout_leg{i}",
                    bps=predicted_bps,
                    with_timings=True,
                    with_books=True,
                )
                return

            timings.append({"step": f"leg{i}_fill_received", "elapsed_ms": round(since_start_ms(), 2)})
            order_id = fill_data.get("order_id", order_id or "")
            total_size = fill_data.get("total_size", Decimal("0"))
            total_funds = fill_data.get("total_funds", Decimal("0"))
            avg_price = fill_data.get("weighted_avg_price", _D0)

            # Fall back to the book top if the fill carried no price.
            book_snapshot = books[i] if i < len(books) else {}
            if avg_price <= 0:
                levels = book_snapshot.get("asks" if buying else "bids", [])
                avg_price = Decimal(str(levels[0]["price"])) if levels else _D0
            if not avg_price > 0:
                avg_price = _D0
            fee = _D0

            # Output of a buy is the base size, output of a sell the quote funds.
            filled_vol_val = total_size if buying else total_funds
            fills.append(LegFill(
                leg=i,
                order_id=order_id,
                side=side,
                pair=pair,
                input_currency=input_ccy,
                output_currency=output_ccy,
                fee_currency=fee_ccy,
                filled_volume=filled_vol_val,
                deal_funds=total_funds,
                avg_price=avg_price,
                fee=fee,
                latency_ms=latency_ms,
            ))
            self._log.write_plain(
                f"{wall_stamp()} "
                f"FILL | corr={correlation_id} | leg{i} | {pair} | {side} | "
                f"out={str(filled_vol_val)}@{str(avg_price)} | "
                f"fee={str(fee)} {fee_ccy} | lat={latency_ms:.1f}ms"
            )
            if i < 2:
                # Wait until the proceeds are visible before the next leg.
                await self._ws_client.await_balance(output_ccy, fills[-1].filled_volume, 2000)
                # NOTE(review): nesting inferred — the balance override is
                # assumed to apply only when another leg follows; confirm.
                if side == "sell":
                    bal = self._ws_client.latest_balance(output_ccy)
                    if bal > 0:
                        fills[-1].filled_volume = bal
            # NOTE(review): assumed to run for every live leg, as in paper mode.
            in_flight.last_trade_ts_ms = int(time.time() * 1000)
            continue

        # ---------------- PAPER MODE ----------------
        # The signal already carries per-leg sizing (order_param, fee
        # rates, increments); here the order is only validated through
        # order_test and the fill is simulated from the attached book.
        self._log.debug(
            "order_test_calling",
            correlation_id=correlation_id,
            leg=i,
            pair=pair,
            side=side,
            order_param=str(order_param),
        )
        t0 = time.perf_counter()
        timings.append({"step": f"leg{i}_order_fired", "elapsed_ms": round((t0 - started) * 1000, 2)})

        amount_kw = {"funds": order_param} if buying else {"size": order_param}
        ok, err_msg, _ = await self._api.order_test(
            session=session,
            symbol=pair,
            side=side,
            order_type="market",
            **amount_kw,
        )

        latency_ms = (time.perf_counter() - t0) * 1000
        timings.append({"step": f"leg{i}_fill_received", "elapsed_ms": round(since_start_ms(), 2)})
        self._log._sl.info(
            f"{log_prefix}order_latency",
            correlation_id=correlation_id,
            leg=i,
            pair=pair,
            latency_ms=round(latency_ms, 2),
        )

        if not ok:
            self._log.error(
                f"{log_prefix}order_rejected",
                correlation_id=correlation_id,
                leg=i,
                pair=pair,
                error=err_msg,
            )
            finish("failed", err_msg or "order_rejected")
            return

        # Synthetic id so the simulated fill has something to carry.
        order_id = f"paper-{uuid.uuid4().hex[:12]}"
        if signal.get("_cancelled"):
            self._log.info("execution_cancelled", correlation_id=correlation_id, completed_legs=len(fills))
            finish("aborted", "cancelled")
            return

        book_snapshot = books[i] if i < len(books) else {}

        # Simulate at top-of-book: best ask for a buy, best bid for a sell.
        if buying:
            asks = book_snapshot.get("asks", [])
            price = Decimal(str(asks[0]["price"])) if asks else _D0
            funds = floor_to(order_param, quote_increment)
            base_vol = floor_to(funds / price if price > 0 else _D0, base_increment)
            deal_funds = funds
            fee = base_vol * fee_rate if fee_ccy == base_ccy else order_param * fee_rate
            filled_volume = base_vol
            checked_vol = base_vol
        else:
            bids = book_snapshot.get("bids", [])
            price = Decimal(str(bids[0]["price"])) if bids else _D0
            base_vol = floor_to(order_param, base_increment)
            quote_vol = base_vol * price if price > 0 else _D0
            # The fee is taken either from the base sold or from the
            # quote received, depending on the leg's fee currency.
            if fee_ccy == base_ccy:
                fee = base_vol * fee_rate
                deal_funds = quote_vol
            else:
                fee = quote_vol * fee_rate
                deal_funds = quote_vol * (_D1 - fee_rate)
            filled_volume = quote_vol
            # A sell is checked on the requested size, before rounding.
            checked_vol = order_param

        # Abort when the simulated size is under the exchange minimum.
        if base_min_size > 0 and checked_vol < base_min_size:
            self._log.info(
                "execution_aborted_below_min",
                correlation_id=correlation_id,
                leg=i,
                pair=pair,
                base_vol=str(checked_vol),
                min_base=str(base_min_size),
            )
            finish("aborted", "volume_below_minimum")
            return

        fills.append(LegFill(
            leg=i,
            order_id=order_id,
            side=side,
            pair=pair,
            input_currency=input_ccy,
            output_currency=output_ccy,
            fee_currency=fee_ccy,
            filled_volume=filled_volume,
            deal_funds=deal_funds,
            avg_price=price,
            fee=fee,
            latency_ms=latency_ms,
        ))
        in_flight.last_trade_ts_ms = int(time.time() * 1000)

    # All legs recorded: profit is the output of the last leg minus the
    # input of the first leg.
    last_fill, first_fill = fills[2], fills[0]
    if last_fill.side == "buy":
        # Received base, net of the fee when the fee is charged in it.
        leg2_out = last_fill.filled_volume
        if last_fill.fee_currency == last_fill.output_currency:
            leg2_out -= last_fill.fee
    else:
        leg2_out = last_fill.deal_funds
    # A buy spends quote (deal_funds); a sell spends the recorded volume.
    leg0_in = first_fill.deal_funds if first_fill.side == "buy" else first_fill.filled_volume

    profit = float(leg2_out - leg0_in)
    effective_bps = (profit / float(leg0_in)) * 10000 if leg0_in else 0.0
    timings.append({"step": "execution_complete", "elapsed_ms": round(since_start_ms(), 2)})
    self._log._sl.info("execution_timing", correlation_id=correlation_id, timings=timings)

    report = ExecutionReport(
        correlation_id=correlation_id,
        triangle_key=signal.get("triangle_key", []),
        status="filled",
        fills=fills,
        predicted_bps=predicted_bps,
        profit=profit,
        effective_bps=effective_bps,
        ts_ms=int(time.time() * 1000),
        timings=timings,
        book_ts_ms=book_ts_ms,
        book_tops=book_tops,
    )
    # Remembered so that later signals built on older books are rejected.
    self._last_trade_ts_ms[in_flight.triangle_key] = report.ts_ms
    self._emit_report(report)
|
|
|
|
def _emit_report(self, report: ExecutionReport) -> None:
    """Store a report and publish a one-line summary of it.

    The report is appended to the bounded in-memory history, written to
    the plain-text log (including timings) and printed to stdout (without
    timings). The leading timestamp is derived entirely from
    ``report.ts_ms`` so that its seconds and milliseconds come from the
    same clock reading.
    """
    # Local import: the file-level import only brings in `datetime`.
    from datetime import timezone

    fills_repr = ", ".join(
        f"L{f.leg}:{f.side} {f.pair} {f.input_currency}->{f.output_currency} "
        f"{f.filled_volume}@{f.avg_price}(fee={f.fee} {f.fee_currency} lat={f.latency_ms:.1f}ms)"
        for f in report.fills
    )
    self._reports.append(report)

    timing_parts = " ".join(
        f"{t['step']}={t['elapsed_ms']:.1f}ms" for t in report.timings
    )
    book_top_repr = " | ".join(
        f"{bt.get('ask_px','?')}/{bt.get('bid_px','?')}" for bt in report.book_tops
    )
    books_suffix = f" | books=[{book_top_repr}]" if book_top_repr else ""
    error_suffix = f" | error={report.error}" if report.error else ""

    seconds, millis = divmod(report.ts_ms, 1000)
    ts_iso = (
        datetime.fromtimestamp(seconds, tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.")
        + f"{millis:03d}Z"
    )

    # Both outputs share the same head and tail; only the log line carries
    # the timing breakdown.
    head = (
        f"{ts_iso} {report.status.upper()} | corr={report.correlation_id} | "
        f"triangle={report.triangle_key} | predicted_bps={report.predicted_bps:.2f} | "
        f"effective_bps={report.effective_bps:.2f} | "
        f"book_ts={report.book_ts_ms} | "
        f"profit={report.profit:.4f} | "
    )
    tail = f"fills=[{fills_repr}]{books_suffix}{error_suffix}"

    self._log.write_plain(f"{head}timings=[{timing_parts}] | {tail}")
    print(f"{head}{tail}", flush=True)
|
|
|
|
async def cancel_execution(self, correlation_id: str) -> bool:
    """
    Ask an in-flight execution to stop.

    The request is made by setting ``_cancelled`` on the execution's signal
    dict; the running execution notices it at its next check. Returns True
    when an execution with this id was in flight, False otherwise.
    """
    entry = self._in_flight.get(correlation_id)
    if entry is None:
        return False
    self._log.info("cancel_requested", correlation_id=correlation_id)
    entry.signal["_cancelled"] = True
    return True
|
|
|
|
async def close(self) -> None:
    """Stop the keep-alive task, close the HTTP session and flush the log writer."""
    keepalive = self._keepalive_task
    if keepalive is not None and not keepalive.done():
        keepalive.cancel()
        # Wait for the task to wind down; its cancellation is expected.
        with contextlib.suppress(asyncio.CancelledError):
            await keepalive
    self._keepalive_task = None

    session = self._session
    if session is not None and not session.closed:
        await session.close()

    await self._log.close()
|
|
|
|
async def pause(self) -> None:
    """Stop admitting new signals; executions already in flight are not touched."""
    gate = self._pause_lock
    await gate.acquire()
    try:
        self._paused = True
        self._log.info("executor_paused")
    finally:
        gate.release()
|
|
|
|
async def resume(self) -> None:
    """Start admitting signals again after a pause."""
    gate = self._pause_lock
    await gate.acquire()
    try:
        self._paused = False
        self._log.info("executor_resumed")
    finally:
        gate.release()
|
|
|
|
def get_uptime_seconds(self) -> float:
    """Seconds elapsed since this executor was constructed, measured on the monotonic clock."""
    elapsed_ns = time.monotonic_ns() - self._uptime_ns
    return elapsed_ns / 1e9
|
|
|
|
def set_concurrent_slots(self, slots: int) -> None:
    """
    Change the maximum number of triangles that may be in flight at once.

    Only ``self._settings.concurrent_slots`` is updated; executions that
    are already running are not affected. The new value is used the next
    time a signal is admitted.
    """
    settings = self._settings
    settings.concurrent_slots = slots
    self._log.info("concurrent_slots_updated", slots=slots)
|
|
|
|
def get_in_flight(self) -> list[dict]:
    """Return a plain-dict snapshot of every execution currently in flight."""
    snapshot: list[dict] = []
    for entry in self._in_flight.values():
        snapshot.append({
            "correlation_id": entry.correlation_id,
            # frozensets are converted to lists so the result is JSON-friendly.
            "triangle_key": list(entry.triangle_key),
            "pair_symbols": list(entry.pair_symbols),
            "primary_quote": entry.primary_quote,
            "started_ts_ms": entry.started_ts_ms,
        })
    return snapshot
|
|
|
|
def get_reports(self, limit: int = 50) -> list[dict]:
    """
    Return the most recent execution reports as plain dicts, oldest first.

    Args:
        limit: maximum number of reports to return. Zero or a negative
            value yields an empty list (a bare ``[-limit:]`` slice would
            return the whole history for 0 and an arbitrary tail for
            negative values).

    Each dict carries the report's identifiers, status, per-leg fills
    (amounts rendered as strings), predicted and effective bps and the
    timestamp; an ``error`` key is present only when the report has one.
    """
    if limit <= 0:
        return []

    summaries: list[dict] = []
    for r in list(self._reports)[-limit:]:
        summary = {
            "correlation_id": r.correlation_id,
            "triangle_key": r.triangle_key,
            "status": r.status,
            "fills": [
                {
                    "leg": f.leg,
                    "vol": str(f.filled_volume),
                    "deal_funds": str(f.deal_funds),
                    "price": str(f.avg_price),
                    "fee": str(f.fee),
                    "fee_currency": f.fee_currency,
                }
                for f in r.fills
            ],
            "predicted_bps": r.predicted_bps,
            "effective_bps": r.effective_bps,
            "ts_ms": r.ts_ms,
        }
        if r.error:
            summary["error"] = r.error
        summaries.append(summary)
    return summaries
|
|
|
|
def get_config(self) -> dict:
    """Return the executor's current mode and the settings it exposes, as a plain dict."""
    settings = self._settings
    config = {"live_mode": self._live_mode}
    config["concurrent_slots"] = settings.concurrent_slots
    config["enforce_same_base_isolation"] = settings.enforce_same_base_isolation
    # Rendered as a string so the result stays serialisable.
    config["socket_path"] = str(settings.socket_path)
    return config
|