triangular_arbitrage_bot/executor/executor.py

949 lines
38 KiB
Python

"""
Triangular arbitrage executor.

Handles incoming opportunity signals from oe_em, enforces concurrency and
isolation constraints, computes order sizes, and posts orders to KuCoin
(via order_test in paper mode or real orders in live mode).

Signal contract
---------------
Each signal dict carries:

    correlation_id    str         — identifier echoed in logs and reports
    triangle_key      list[str]   — identifies the triangle; used for isolation
                                    and stale-signal detection
    legs              list[dict]  — 3 legs, each with pair, side, order_param,
                                    base_min_size, fee_rate, fee_currency
    books             list[dict]  — order book snapshots per leg (top-of-book)
    starting_volume   str         — quote-currency amount to deploy into leg 0
    predicted_bps     float       — expected profit in basis points
    primary_quote     str         — e.g. "USDT", used for capital capping & isolation
    live              bool        — True = real orders, False = paper simulation
    book_ts_ms        int         — when the order book was sampled (stale detection)
    t_sock_arrive_ms  int         — when bytes arrived at SSL_read (true network arrival)
    t_arrive_ms       int         — when parsing + book update completed
    ts_ms             int         — when the signal was created
    t_eval_ms         int         — when the signal was evaluated
    _cancelled        bool        — set by cancel_execution to abort in-flight
"""
import asyncio
import contextlib
import json
import logging
import time
import uuid
from collections import deque
from dataclasses import dataclass, field
from datetime import datetime
from decimal import Decimal
from pathlib import Path
from typing import Optional
import aiohttp
from aiohttp.connector import TCPConnector
import structlog
# Reusable Decimal constants shared by the executor's fill arithmetic.
# NOTE(review): _D0_1 is not referenced in the visible part of this file.
_D0 = Decimal("0")
_D1 = Decimal("1")
_D0_1 = Decimal("0.1")
class _DualLogger:
    """
    Thin wrapper that mirrors structlog output to a plain-text file.

    Every event is forwarded to the wrapped structlog logger.  When a log
    file is configured, a human-readable line is also queued and appended to
    that file by a background task through the default thread-pool executor,
    so file I/O never blocks the asyncio event loop.

    The writer task is started lazily: the logger may be constructed before
    an event loop is running, in which case lines are buffered and handed to
    the writer as soon as one can be started.
    """

    def __init__(self, structlog_logger, log_file: Optional[Path]) -> None:
        """
        Args:
            structlog_logger: logger exposing ``is_enabled_for`` and the
                level methods (``debug``/``info``/``warning``/``error``).
            log_file: path of the plain-text log, or None to disable it.
        """
        self._sl = structlog_logger
        self._log_file = log_file
        self._queue: Optional[asyncio.Queue] = None
        self._task: Optional[asyncio.Task] = None
        # Lines produced before the writer task could be started.
        self._pending: list = []
        self._closed = False
        if log_file:
            # No-op when no event loop is running yet; retried on first use.
            self._ensure_writer()

    def _ensure_writer(self) -> bool:
        """Start the queue and writer task if possible.

        Returns True when the queue can accept lines, False when file logging
        is disabled, already closed, or no event loop is running yet.
        """
        if not self._log_file or self._closed:
            return False
        if self._queue is not None:
            return True
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # asyncio.create_task needs a running loop; try again later.
            return False
        self._queue = asyncio.Queue()
        for line in self._pending:
            self._queue.put_nowait(line)
        self._pending.clear()
        self._task = asyncio.create_task(self._writer_loop())
        return True

    def _submit(self, line: str) -> None:
        """Hand *line* to the writer, buffering it if the writer is not running yet."""
        if self._ensure_writer():
            # The queue is unbounded, so put_nowait cannot raise QueueFull.
            self._queue.put_nowait(line)
        elif self._log_file and not self._closed:
            self._pending.append(line)

    async def _writer_loop(self) -> None:
        """Background task that drains the queue and writes lines to disk
        via a thread-pool executor so the event loop never blocks on I/O.

        A ``None`` item is the stop sentinel.  On cancellation the remaining
        items are discarded (and marked done) before the cancellation is
        re-raised.
        """
        loop = asyncio.get_running_loop()
        log_file = self._log_file
        queue = self._queue

        def _write(line: str) -> None:
            with open(log_file, "a", encoding="utf-8") as f:
                f.write(line)

        try:
            while True:
                item = await queue.get()
                try:
                    if item is None:
                        return
                    try:
                        await loop.run_in_executor(None, _write, item)
                    except Exception:
                        # Best-effort: a failing log file must not take down
                        # the writer (or the executor that relies on it).
                        pass
                finally:
                    # Mark the item done only once it has been handled.
                    queue.task_done()
        except asyncio.CancelledError:
            while not queue.empty():
                queue.get_nowait()
                queue.task_done()
            raise

    @staticmethod
    def _fmt_value(v: object) -> str:
        """Render a log field: compact JSON for dicts/lists, ``str()`` otherwise."""
        if isinstance(v, (dict, list)):
            return json.dumps(v, default=str, separators=(",", ":"))
        return str(v)

    def _enqueue(self, level: str, event: str, **kw) -> None:
        """Publish a log entry to structlog and, if a log file is set, to the queue."""
        sl_level = getattr(logging, level.upper(), logging.INFO)
        if not self._sl.is_enabled_for(sl_level):
            return
        getattr(self._sl, level)(event, **kw)
        if not self._log_file or self._closed:
            return
        ts = f"{time.time():.0f}"
        level_up = level.upper()
        # Keys starting with "_" go to structlog only, not to the plain-text line.
        extras = " ".join(
            f"{k}={self._fmt_value(v)}"
            for k, v in kw.items()
            if not k.startswith("_")
        )
        self._submit(f"[{ts}] [{level_up:5}] {event} {extras}\n")

    def write_plain(self, text: str) -> None:
        """Write a line directly to the plain-text log queue (no structlog side-effect)."""
        self._submit(text + "\n")

    def debug(self, event: str, **kw) -> None:
        self._enqueue("debug", event, **kw)

    def info(self, event: str, **kw) -> None:
        self._enqueue("info", event, **kw)

    def warning(self, event: str, **kw) -> None:
        self._enqueue("warning", event, **kw)

    def error(self, event: str, **kw) -> None:
        self._enqueue("error", event, **kw)

    async def close(self) -> None:
        """Flush queued lines, stop the writer task, and disable further file output."""
        if self._closed:
            return
        # Give lines buffered before the loop existed a chance to be written.
        self._ensure_writer()
        self._closed = True
        queue, task = self._queue, self._task
        if queue is None or task is None:
            return
        if not task.done():
            # The sentinel is processed after every line queued before it.
            queue.put_nowait(None)
        try:
            # Awaiting the task (rather than queue.join()) cannot hang when
            # the writer has already exited.
            await task
        except asyncio.CancelledError:
            pass
@dataclass
class LegFill:
    """Outcome of one executed (or paper-simulated) leg of a triangle."""

    # Zero-based position of the leg within the triangle.
    leg: int
    # Exchange order id (live) or synthetic "paper-..." id (paper mode).
    order_id: str
    # "buy" or "sell", as given by the signal leg.
    side: str
    # Trading pair symbol, e.g. "BTC-USDT".
    pair: str
    # Currency spent by this leg (quote for a buy, base for a sell).
    input_currency: str
    # Currency received by this leg (base for a buy, quote for a sell).
    output_currency: str
    # Currency in which the fee is charged, as given by the signal leg.
    fee_currency: str
    # Amount received: base units for a buy, quote units for a sell.
    filled_volume: Decimal
    # Quote-side amount of the trade as recorded by the executor.
    deal_funds: Decimal
    # Average execution price (top-of-book price in paper mode).
    avg_price: Decimal
    # Fee amount expressed in fee_currency.
    fee: Decimal
    # Time spent on this leg's order round trip, in milliseconds.
    latency_ms: float = 0.0
@dataclass
class ExecutionReport:
    """Result of one triangle execution attempt (filled, failed or aborted)."""

    # Identifier of the signal this report belongs to.
    correlation_id: str
    # Triangle identifier copied from the signal.
    triangle_key: list[str]
    # One of the status strings used by the executor: "filled", "failed", "aborted".
    status: str
    # Legs completed before the execution finished or stopped.
    fills: list[LegFill]
    # Profit predicted by the signal, in basis points.
    predicted_bps: float
    # Wall-clock time the report was created, in epoch milliseconds.
    ts_ms: int
    # Ordered {"step", "elapsed_ms"} entries describing the execution timeline.
    timings: list[dict] = field(default_factory=list)
    # Realised profit (last-leg output minus first-leg input).
    profit: float = 0.0
    # Realised profit relative to the first-leg input, in basis points.
    effective_bps: float = 0.0
    # Failure / abort reason; empty for successful executions.
    error: str = ""
@dataclass
class InFlightExecution:
    """Bookkeeping entry for a triangle execution that has not finished yet."""

    # Triangle identifier, compared for equality to block duplicate executions.
    triangle_key: frozenset
    # Pair symbols of the triangle's legs, intersected for pair isolation.
    pair_symbols: frozenset
    # Primary quote currency of the signal, compared for same-base isolation.
    primary_quote: str
    # Identifier of the originating signal; also the key in Executor._in_flight.
    correlation_id: str
    # The signal dict itself; cancel_execution sets "_cancelled" on it.
    signal: dict
    # Wall-clock time the execution was admitted, in epoch milliseconds.
    started_ts_ms: int
    # Wall-clock time of the most recent leg fill, in epoch milliseconds.
    last_trade_ts_ms: int = 0
class Executor:
"""
Triangular arbitrage order executor.
Receives signals from oe_em via a Unix-domain socket, checks volume and
concurrency constraints, then executes up to three legs in sequence.
"""
def __init__(
    self,
    kucoin_api,
    settings,
    ws_client=None,
    log_file: Optional[Path] = None,
    live_mode: bool = False,
) -> None:
    """
    Store collaborators and initialise the executor's runtime state.

    Args:
        kucoin_api: client exposing ``order_place`` / ``order_test``.
        settings: configuration object (concurrent_slots, isolation flags,
            fill_timeout_ms, socket_path are read elsewhere in this class).
        ws_client: optional WS client used in live mode to await fills.
        log_file: optional path for the plain-text execution log.
        live_mode: reported by get_config and used by the keep-alive loop.
    """
    # Injected collaborators and configuration.
    self._api, self._settings, self._ws_client = kucoin_api, settings, ws_client
    self._log_file, self._live_mode = log_file, live_mode
    component_logger = structlog.get_logger().bind(component="executor")
    self._log = _DualLogger(component_logger, log_file)

    # Execution bookkeeping.
    self._in_flight: dict[str, InFlightExecution] = {}
    self._last_trade_ts_ms: dict[frozenset, int] = {}
    self._reports: deque[ExecutionReport] = deque(maxlen=1000)
    self._client_oid_to_order_id: dict[str, str] = {}
    self._report_written = False

    # Pause switch and the locks guarding it and the in-flight table.
    self._paused = False
    self._pause_lock = asyncio.Lock()
    self._isolation_lock = asyncio.Lock()

    # Monotonic reference point for get_uptime_seconds().
    self._uptime_ns: int = time.monotonic_ns()

    # HTTP session is created lazily (start() or first signal).
    self._session: Optional[aiohttp.ClientSession] = None
    self._session_timeout = aiohttp.ClientTimeout(sock_connect=5, sock_read=10)
    self._keepalive_task: Optional[asyncio.Task] = None
async def start(self) -> None:
    """
    Create and warm up the shared aiohttp session, then start the keep-alive task.

    The warm-up is a single GET to KuCoin's time endpoint so the first trade
    does not pay for connection setup.  It is best-effort: a failure is
    logged at debug level and start-up continues.

    Calling start() again is safe: an open session is reused and a running
    keep-alive task is not duplicated.
    """
    if self._session is None or self._session.closed:
        self._session = self._create_session()
    try:
        async with self._session.get("https://api.kucoin.com/api/v1/time") as resp:
            await resp.text()
    except Exception as exc:
        # Warm-up only; the session is still usable for later requests.
        self._log.debug("session_warmup_failed", error=str(exc))
    if self._keepalive_task is None or self._keepalive_task.done():
        self._keepalive_task = asyncio.create_task(self._keepalive_loop())
# Seconds the keep-alive loop sleeps (asyncio.sleep) between pings.
_KEEPALIVE_INTERVAL = 15.0
def _create_session(self) -> aiohttp.ClientSession:
    """
    Build a fresh aiohttp session backed by a keep-alive connection pool.

    The session uses the timeout prepared in ``__init__``
    (``self._session_timeout``) and a TCPConnector configured with the
    options below.
    """
    pool_options = {
        "limit": 10,
        "limit_per_host": 5,
        "force_close": False,
        "enable_cleanup_closed": True,
        "keepalive_timeout": 60,
        "ttl_dns_cache": 300,
    }
    pool = TCPConnector(**pool_options)
    return aiohttp.ClientSession(connector=pool, timeout=self._session_timeout)
async def _keepalive_loop(self) -> None:
    """
    Ping KuCoin every ``_KEEPALIVE_INTERVAL`` seconds to keep the pooled
    connection warm.

    In live mode the ping is a GET to the public time endpoint; otherwise it
    is an ``order_test`` call for a 1-unit BTC-USDT market buy.  Failures
    are logged at debug level and the loop keeps running.  The loop exits
    quietly when the task is cancelled.
    """
    while True:
        try:
            await asyncio.sleep(self._KEEPALIVE_INTERVAL)
            if self._live_mode:
                async with self._session.get("https://api.kucoin.com/api/v1/time") as resp:
                    await resp.text()
            else:
                await self._api.order_test(
                    session=self._session,
                    symbol="BTC-USDT",
                    side="buy",
                    order_type="market",
                    funds=Decimal("1"),
                )
        except asyncio.CancelledError:
            return
        except Exception as exc:
            # Best-effort ping: never let a failed keep-alive stop the loop,
            # but leave a trace so a permanently failing ping is visible.
            self._log.debug("keepalive_failed", error=str(exc))
async def handle_signal(self, signal: dict) -> None:
    """
    Public entry point for processing an opportunity signal.

    Runs the internal pipeline and converts any ordinary exception into a
    logged error plus a "failed" ExecutionReport, so errors do not bubble
    into the socket server's task chain.

    Task cancellation and interpreter-level exits (KeyboardInterrupt,
    SystemExit) are deliberately NOT swallowed: they propagate to the caller.

    Args:
        signal: the opportunity signal dict (see the module docstring).
    """
    try:
        await self._handle_signal_impl(signal)
    except asyncio.CancelledError:
        raise
    except Exception as exc:
        correlation_id = signal.get("correlation_id", "")
        # Some exceptions stringify to ""; fall back to the class name so the
        # report always carries a non-empty error.
        error_msg = str(exc) or type(exc).__name__
        self._log.error("signal_handler_error", error=error_msg, correlation_id=correlation_id)
        self._emit_report(ExecutionReport(
            correlation_id=correlation_id,
            triangle_key=signal.get("triangle_key", []),
            status="failed",
            fills=[],
            predicted_bps=0.0,
            ts_ms=int(time.time() * 1000),
            error=error_msg,
        ))
async def _handle_signal_impl(self, signal: dict) -> None:
    """
    Internal signal processing pipeline.

    Steps (in order):
    1. Drop the signal if the executor is paused.
    2. Drop it if an in-flight execution already blocks it (isolation).
    3. Reject it unless it has exactly 3 legs.
    4. Discard it if its book_ts_ms is older than the last completed trade
       recorded for the same triangle.
    5. Under the isolation lock: make sure an HTTP session exists, check
       for a free concurrency slot, re-check isolation, and register the
       execution in ``_in_flight``.
    6. Run ``_execute_triangle`` and always remove the ``_in_flight`` entry
       afterwards.
    """
    cid = signal.get("correlation_id", "")

    async with self._pause_lock:
        if self._paused:
            self._log.debug("signal_dropped_paused", correlation_id=cid)
            return

    tri_list = signal.get("triangle_key", [])
    tri_key = frozenset(tri_list)
    quote = signal.get("primary_quote", "")
    legs = signal.get("legs", [])
    sampled_ms = signal.get("book_ts_ms", 0)
    leg_pairs = frozenset(
        name for name in (leg.get("pair", "") for leg in legs) if name
    )

    # Cheap pre-check without the lock; repeated under the lock below.
    if self._is_blocked(tri_key, leg_pairs, quote):
        self._log.debug("signal_blocked", correlation_id=cid, triangle_key=tri_list)
        return

    if len(legs) != 3:
        self._log.warning("invalid_signal", correlation_id=cid, msg="expected 3 legs")
        return

    last_done_ms = self._last_trade_ts_ms.get(tri_key, 0)
    if 0 < sampled_ms < last_done_ms:
        self._log.debug(
            "signal_discarded_stale",
            correlation_id=cid,
            book_ts_ms=sampled_ms,
            stale_ts=last_done_ms,
        )
        return

    async with self._isolation_lock:
        if self._session is None or self._session.closed:
            self._session = self._create_session()
        if len(self._in_flight) >= self._settings.concurrent_slots:
            self._log.debug("signal_dropped_no_slot", correlation_id=cid)
            return
        if self._is_blocked(tri_key, leg_pairs, quote):
            self._log.debug("signal_blocked", correlation_id=cid, triangle_key=tri_list)
            return
        entry = InFlightExecution(
            triangle_key=tri_key,
            pair_symbols=leg_pairs,
            primary_quote=quote,
            correlation_id=cid,
            signal=signal,
            started_ts_ms=int(time.time() * 1000),
        )
        self._in_flight[cid] = entry

    self._log.debug("execute_triangle_starting", correlation_id=cid)
    try:
        await self._execute_triangle(cid, signal, entry)
    finally:
        async with self._isolation_lock:
            self._in_flight.pop(cid, None)
def _is_blocked(self, triangle_key: frozenset, pair_symbols: frozenset, primary_quote: str) -> bool:
    """
    Tell whether a candidate execution conflicts with one already in flight.

    A conflict exists when any in-flight execution
    - has the same triangle_key, or
    - has the same primary_quote while ``enforce_same_base_isolation`` is set, or
    - shares at least one pair symbol while ``enforce_pair_isolation`` is set.
    """
    return any(
        active.triangle_key == triangle_key
        or (
            self._settings.enforce_same_base_isolation
            and active.primary_quote == primary_quote
        )
        or (
            self._settings.enforce_pair_isolation
            and bool(pair_symbols & active.pair_symbols)
        )
        for active in self._in_flight.values()
    )
async def _execute_triangle(
    self, correlation_id: str, signal: dict, in_flight: InFlightExecution
) -> None:
    """
    Execute the three legs of a triangular arbitrage signal.

    Live mode
    ---------
    Places real market orders on KuCoin via order_place and waits for
    fill events from the WS client.  Leg 0 is sized by its ``order_param``;
    every later leg is sized by the previous leg's ``filled_volume``.

    Paper mode
    ----------
    Validates each order against KuCoin's order_test endpoint, then
    simulates the fill from the top level of the order-book snapshot
    attached to the signal.  Every leg uses its own ``order_param``; no
    real capital is committed.

    In both modes the signal's ``_cancelled`` flag is checked before each
    leg and again after the order call returns, so cancel_execution takes
    effect promptly.  All outcomes (filled, failed, aborted) produce an
    ExecutionReport via ``_emit_report``.

    Args:
        correlation_id: identifier of the signal, echoed in logs/reports.
        signal: the opportunity signal dict (see the module docstring).
        in_flight: bookkeeping entry; its ``last_trade_ts_ms`` is updated
            after every fill.
    """
    exec_start_ts = time.perf_counter()
    timings: list[dict] = []
    executor_receive_ts_ms = signal.get("_receiver_ts_ms") or int(time.time() * 1000)

    # Upstream pipeline stamps, reported as (negative) offsets from receipt.
    for step, key in (
        ("t-2_book_snapshot", "book_ts_ms"),
        ("socket_arrived", "t_sock_arrive_ms"),
        ("book_update_arrived", "t_arrive_ms"),
        ("t-1_eval_complete", "t_eval_ms"),
        ("t_signal_created", "ts_ms"),
    ):
        stamp_ms = signal.get(key, 0)
        if stamp_ms > 0:
            timings.append({"step": step, "elapsed_ms": -(executor_receive_ts_ms - stamp_ms)})
    timings.append({"step": "signal_received", "elapsed_ms": 0.0})

    self._log.debug("execute_triangle_entered", correlation_id=correlation_id)
    legs = signal.get("legs", [])
    books = signal.get("books", [])
    live = signal.get("live", False)
    log_prefix = "" if live else "[PAPER] "
    fills: list[LegFill] = []
    # Base amount sold by leg 0 when that leg is a sell (used for profit).
    leg0_base_sold = _D0

    if self._session is None or self._session.closed:
        self._session = self._create_session()
    session = self._session

    def elapsed_ms_since_start() -> float:
        """Milliseconds elapsed since this execution started."""
        return (time.perf_counter() - exec_start_ts) * 1000

    def finish_early(status: str, error: str) -> None:
        """Emit the report for an execution that stops before all legs fill."""
        self._emit_report(ExecutionReport(
            correlation_id=correlation_id,
            triangle_key=signal.get("triangle_key", []),
            status=status,
            fills=fills,
            predicted_bps=0.0,
            ts_ms=int(time.time() * 1000),
            error=error,
        ))

    def cancelled() -> bool:
        """Report and return True when cancel_execution flagged this signal."""
        if not signal.get("_cancelled"):
            return False
        self._log.info("execution_cancelled", correlation_id=correlation_id, completed_legs=len(fills))
        finish_early("aborted", "cancelled")
        return True

    def abort_below_minimum(leg_index: int, leg_pair: str, volume: Decimal, minimum: Decimal) -> None:
        """Log and report a leg whose base volume is under the exchange minimum."""
        self._log.info(
            "execution_aborted_below_min",
            correlation_id=correlation_id, leg=leg_index, pair=leg_pair,
            base_vol=str(volume), min_base=str(minimum),
        )
        finish_early("aborted", "volume_below_minimum")

    def top_price(book: dict, book_side: str) -> Decimal:
        """Best price on ``book_side`` ("asks"/"bids") of a snapshot, or 0 if empty."""
        levels = book.get(book_side, [])
        return Decimal(str(levels[0]["price"])) if levels else _D0

    for i, leg in enumerate(legs):
        if cancelled():
            return

        pair = leg.get("pair", "")
        side = leg.get("side", "")
        order_param = Decimal(str(leg.get("order_param", "0")))
        base_min_size = Decimal(str(leg.get("base_min_size", "0")))
        fee_rate = Decimal(str(leg.get("fee_rate", "0.001")))
        fee_ccy = leg.get("fee_currency", "")
        base_ccy = pair.split("-")[0]
        quote_ccy = pair.split("-")[-1]
        # A buy spends quote and receives base; anything else is treated as a sell.
        if side == "buy":
            input_ccy, output_ccy = quote_ccy, base_ccy
        else:
            input_ccy, output_ccy = base_ccy, quote_ccy
        book_snapshot = books[i] if i < len(books) else {}

        if live:
            # --- LIVE MODE ---
            client_oid = f"live-{correlation_id}-{i}-{uuid.uuid4().hex[:8]}"
            t0 = time.perf_counter()
            if self._ws_client is None or not self._ws_client.is_connected:
                self._log.error(
                    "live_mode_ws_not_connected",
                    correlation_id=correlation_id,
                    leg=i,
                    pair=pair,
                )
                finish_early("failed", "ws_not_connected")
                return
            timings.append({"step": f"leg{i}_order_fired", "elapsed_ms": round(elapsed_ms_since_start(), 2)})

            # Leg 0 deploys the signal's amount; later legs roll the previous output forward.
            input_vol = order_param if i == 0 else fills[i - 1].filled_volume
            if base_min_size > 0 and side == "sell" and input_vol < base_min_size:
                abort_below_minimum(i, pair, input_vol, base_min_size)
                return

            # Market buys are sized in quote funds, market sells in base size.
            sizing = {"funds": input_vol} if side == "buy" else {"size": input_vol}
            ok, err_msg, order_id = await self._api.order_place(
                session=session,
                symbol=pair,
                side=side,
                order_type="market",
                **sizing,
                client_oid=client_oid,
            )
            place_latency_ms = (time.perf_counter() - t0) * 1000
            self._log.info(
                f"{log_prefix}order_placed",
                correlation_id=correlation_id,
                leg=i,
                pair=pair,
                order_id=order_id,
                client_oid=client_oid,
                place_latency_ms=round(place_latency_ms, 2),
            )
            if not ok:
                self._log.error(
                    f"{log_prefix}order_rejected",
                    correlation_id=correlation_id,
                    leg=i,
                    pair=pair,
                    error=err_msg,
                )
                finish_early("failed", err_msg or "order_rejected")
                return

            # NOTE(review): entries are never removed in the visible code, so
            # this map grows with every live order — confirm who consumes it.
            self._client_oid_to_order_id[client_oid] = order_id or ""
            fill_ok, fill_data = await self._ws_client.await_fill(
                client_oid, self._settings.fill_timeout_ms
            )
            fill_latency_ms = (time.perf_counter() - t0) * 1000

            if cancelled():
                return
            if not fill_ok:
                self._log.error(
                    f"{log_prefix}fill_timeout",
                    correlation_id=correlation_id,
                    leg=i,
                    pair=pair,
                    client_oid=client_oid,
                    order_id=order_id,
                    fill_latency_ms=round(fill_latency_ms, 2),
                    total_elapsed_ms=round(elapsed_ms_since_start(), 2),
                )
                finish_early("failed", f"fill_timeout_leg{i}")
                return

            timings.append({"step": f"leg{i}_fill_received", "elapsed_ms": round(elapsed_ms_since_start(), 2)})
            order_id = fill_data.get("order_id", order_id or "")
            total_size = fill_data.get("total_size", Decimal("0"))
            total_funds = fill_data.get("total_funds", Decimal("0"))
            weighted_avg_price = fill_data.get("weighted_avg_price", _D0)
            # Fall back to book top if WS didn't provide a price (defensive).
            if weighted_avg_price <= 0:
                weighted_avg_price = top_price(book_snapshot, "asks" if side == "buy" else "bids")
            avg_price = weighted_avg_price if weighted_avg_price > 0 else _D0

            if i == 0 and side != "buy":
                # Base actually sold; fall back to the requested size if the
                # fill event did not carry one.
                leg0_base_sold = total_size if total_size else input_vol

            fills.append(LegFill(
                leg=i,
                order_id=order_id,
                side=side,
                pair=pair,
                input_currency=input_ccy,
                output_currency=output_ccy,
                fee_currency=fee_ccy,
                filled_volume=total_size if side == "buy" else total_funds,
                deal_funds=total_funds,
                avg_price=avg_price,
                fee=_D0,  # fees are not tracked for live fills here
                latency_ms=fill_latency_ms,
            ))
            in_flight.last_trade_ts_ms = int(time.time() * 1000)
            continue

        # --- PAPER MODE ---
        # The fused_engine already sized everything (per-leg order_param,
        # fee rates, precision increments). We only validate via
        # order_test and simulate fills from the attached books.
        self._log.debug(
            "order_test_calling",
            correlation_id=correlation_id, leg=i, pair=pair, side=side,
            order_param=str(order_param),
        )
        t0 = time.perf_counter()
        timings.append({"step": f"leg{i}_order_fired", "elapsed_ms": round((t0 - exec_start_ts) * 1000, 2)})
        sizing = {"funds": order_param} if side == "buy" else {"size": order_param}
        ok, err_msg, _ = await self._api.order_test(
            session=session,
            symbol=pair,
            side=side,
            order_type="market",
            **sizing,
        )
        latency_ms = (time.perf_counter() - t0) * 1000
        timings.append({"step": f"leg{i}_fill_received", "elapsed_ms": round(elapsed_ms_since_start(), 2)})
        # Sent to structlog only (bypasses the plain-text file).
        self._log._sl.info(
            f"{log_prefix}order_latency",
            correlation_id=correlation_id, leg=i, pair=pair,
            latency_ms=round(latency_ms, 2),
        )
        if not ok:
            self._log.error(
                f"{log_prefix}order_rejected",
                correlation_id=correlation_id, leg=i, pair=pair, error=err_msg,
            )
            finish_early("failed", err_msg or "order_rejected")
            return

        # Paper order succeeded — assign a synthetic order ID so the fill
        # pipeline has something to track.
        order_id = f"paper-{uuid.uuid4().hex[:12]}"
        if cancelled():
            return

        # Simulate fill from top-of-book: use the best ask (buy) or best bid (sell).
        if side == "buy":
            price = top_price(book_snapshot, "asks")
            # For a buy with funds=X, you get X/price base units.
            base_vol = order_param / price if price > 0 else _D0
            deal_funds = order_param
            fee = base_vol * fee_rate if fee_ccy == base_ccy else order_param * fee_rate
            filled_volume = base_vol
            # Reject if simulated fill is below the exchange minimum.
            if base_min_size > 0 and base_vol < base_min_size:
                abort_below_minimum(i, pair, base_vol, base_min_size)
                return
        else:
            price = top_price(book_snapshot, "bids")
            # For a sell with size=X, you get X*price quote units.
            base_vol = order_param
            quote_vol = base_vol * price if price > 0 else _D0
            # Fee depends on whether KuCoin charges it in base or quote.
            if fee_ccy == base_ccy:
                fee = base_vol * fee_rate
                deal_funds = quote_vol
            else:
                fee = quote_vol * fee_rate
                deal_funds = quote_vol * (_D1 - fee_rate)
            filled_volume = quote_vol
            if base_min_size > 0 and order_param < base_min_size:
                abort_below_minimum(i, pair, order_param, base_min_size)
                return
            if i == 0:
                leg0_base_sold = base_vol

        fills.append(LegFill(
            leg=i,
            order_id=order_id,
            side=side,
            pair=pair,
            input_currency=input_ccy,
            output_currency=output_ccy,
            fee_currency=fee_ccy,
            filled_volume=filled_volume,
            deal_funds=deal_funds,
            avg_price=price,
            fee=fee,
            latency_ms=latency_ms,
        ))
        in_flight.last_trade_ts_ms = int(time.time() * 1000)

    predicted_bps = signal.get("predicted_bps", 0.0)
    first_fill, last_fill = fills[0], fills[2]

    # Leg 2 output: if buy, the received base (filled_volume), net of the fee
    # when it is charged in that currency; if sell, the received quote
    # (deal_funds, already net when the fee is in quote).
    if last_fill.side == "buy":
        leg2_out = last_fill.filled_volume
        if last_fill.fee_currency == last_fill.output_currency:
            leg2_out -= last_fill.fee
    else:
        leg2_out = last_fill.deal_funds

    # Leg 0 input: if buy, quote spent (deal_funds); if sell, base sold.
    # (filled_volume of a sell is the quote *received*, so it must not be
    # used here — the triangle returns to the currency it started in.)
    leg0_in = first_fill.deal_funds if first_fill.side == "buy" else leg0_base_sold

    profit = float(leg2_out - leg0_in)
    effective_bps = (profit / float(leg0_in)) * 10000 if leg0_in else 0.0

    timings.append({"step": "execution_complete", "elapsed_ms": round(elapsed_ms_since_start(), 2)})
    # Sent to structlog only (bypasses the plain-text file).
    self._log._sl.info("execution_timing", correlation_id=correlation_id, timings=timings)

    report = ExecutionReport(
        correlation_id=correlation_id,
        triangle_key=signal.get("triangle_key", []),
        status="filled",
        fills=fills,
        predicted_bps=predicted_bps,
        profit=profit,
        effective_bps=effective_bps,
        ts_ms=int(time.time() * 1000),
        timings=timings,
    )
    # Signals sampled before this moment are treated as stale for this triangle.
    self._last_trade_ts_ms[in_flight.triangle_key] = report.ts_ms
    self._emit_report(report)
def _emit_report(self, report: "ExecutionReport") -> None:
    """
    Persist an ExecutionReport to the in-memory deque and write to the log file.

    The line's ISO-8601 UTC timestamp is derived entirely from
    ``report.ts_ms`` so that its seconds and milliseconds always belong to
    the same instant.
    """
    from datetime import timezone  # local import: only ``datetime`` is imported at file level

    self._reports.append(report)

    fills_repr = ", ".join(
        f"L{f.leg}:{f.side} {f.pair} {f.input_currency}->{f.output_currency} "
        f"{f.filled_volume}@{f.avg_price}(fee={f.fee} {f.fee_currency} lat={f.latency_ms:.1f}ms)"
        for f in report.fills
    )
    error_suffix = f" | error={report.error}" if report.error else ""
    profit_repr = f"profit={report.profit:.4f}" if report.status == "filled" else ""
    timing_parts = " ".join(
        f"{t['step']}={t['elapsed_ms']:.1f}ms" for t in report.timings
    ) if report.timings else ""

    secs, millis = divmod(report.ts_ms, 1000)
    ts_iso = (
        datetime.fromtimestamp(secs, tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.")
        + f"{millis:03d}Z"
    )
    msg = (
        f"{ts_iso} {report.status.upper()} | corr={report.correlation_id} | "
        f"triangle={report.triangle_key} | predicted_bps={report.predicted_bps:.2f} | "
        f"effective_bps={report.effective_bps:.2f} | "
        f"{profit_repr} | timings=[{timing_parts}] | fills=[{fills_repr}]{error_suffix}"
    )
    self._log.write_plain(msg)
async def cancel_execution(self, correlation_id: str) -> bool:
    """
    Ask an in-flight execution to stop.

    Sets ``_cancelled`` on the execution's signal dict; the execution loop
    checks that flag between legs.  Returns True when an in-flight entry
    with this correlation_id exists, False otherwise.
    """
    target = self._in_flight.get(correlation_id)
    if target is None:
        return False
    self._log.info("cancel_requested", correlation_id=correlation_id)
    target.signal["_cancelled"] = True
    return True
async def close(self) -> None:
    """
    Stop the keep-alive task, close the aiohttp session and flush the log writer.

    The log writer is closed even if closing the session raises, so the
    plain-text log is always flushed and its background task stopped.
    """
    task = self._keepalive_task
    self._keepalive_task = None
    if task is not None and not task.done():
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
    try:
        if self._session is not None and not self._session.closed:
            await self._session.close()
    finally:
        await self._log.close()
async def pause(self) -> None:
    """Stop accepting new signals: sets the paused flag under the pause lock and logs it."""
    await self._pause_lock.acquire()
    try:
        self._paused = True
        self._log.info("executor_paused")
    finally:
        self._pause_lock.release()
async def resume(self) -> None:
    """Accept signals again: clears the paused flag under the pause lock and logs it."""
    await self._pause_lock.acquire()
    try:
        self._paused = False
        self._log.info("executor_resumed")
    finally:
        self._pause_lock.release()
def get_uptime_seconds(self) -> float:
    """Seconds elapsed since the executor was constructed (monotonic clock, not wall time)."""
    elapsed_ns = time.monotonic_ns() - self._uptime_ns
    return elapsed_ns / 1e9
def set_concurrent_slots(self, slots: int) -> None:
    """
    Change the maximum number of concurrent in-flight triangles at runtime.

    Only ``self._settings.concurrent_slots`` is updated; executions that are
    already in flight are not affected.  The new value is logged.
    """
    setattr(self._settings, "concurrent_slots", slots)
    self._log.info("concurrent_slots_updated", slots=slots)
def get_in_flight(self) -> list[dict]:
    """Return a JSON-friendly snapshot of every execution currently in flight."""
    snapshot: list[dict] = []
    for entry in self._in_flight.values():
        snapshot.append({
            "correlation_id": entry.correlation_id,
            "triangle_key": list(entry.triangle_key),
            "pair_symbols": list(entry.pair_symbols),
            "primary_quote": entry.primary_quote,
            "started_ts_ms": entry.started_ts_ms,
        })
    return snapshot
def get_reports(self, limit: int = 50) -> list[dict]:
    """
    Return the most recent execution reports as JSON-friendly dicts.

    Args:
        limit: maximum number of reports to return (newest last).  A value
            of zero or less yields an empty list.

    Returns:
        One dict per report, oldest first; Decimal fill values are rendered
        as strings and the "error" key is present only for reports that
        carry an error.
    """
    # Guard explicitly: list[-0:] would return *every* report, and a
    # negative limit would slice from the front instead of the back.
    if limit <= 0:
        return []
    recent = list(self._reports)[-limit:]
    return [
        {
            "correlation_id": r.correlation_id,
            "triangle_key": r.triangle_key,
            "status": r.status,
            "fills": [
                {
                    "leg": f.leg,
                    "vol": str(f.filled_volume),
                    "deal_funds": str(f.deal_funds),
                    "price": str(f.avg_price),
                    "fee": str(f.fee),
                    "fee_currency": f.fee_currency,
                }
                for f in r.fills
            ],
            "predicted_bps": r.predicted_bps,
            "effective_bps": r.effective_bps,
            "ts_ms": r.ts_ms,
            **({"error": r.error} if r.error else {}),
        }
        for r in recent
    ]
def get_config(self) -> dict:
    """Return the executor's current configuration as a JSON-friendly dict."""
    cfg = self._settings
    socket_path = str(cfg.socket_path)
    return {
        "live_mode": self._live_mode,
        "concurrent_slots": cfg.concurrent_slots,
        "enforce_same_base_isolation": cfg.enforce_same_base_isolation,
        "socket_path": socket_path,
    }