""" Opportunity detection engine. Evaluates all triangles involving a given symbol on every order-book update, computes the net return after fees, and fires a signal when the return exceeds the configured threshold. Supports KCS fee discounts and per-triangle cooldowns to avoid flooding the executor with duplicate signals. """ import asyncio import time import uuid from dataclasses import dataclass from pathlib import Path from typing import Optional, Callable import structlog from oe_em.book_consumer import BookConsumer from oe_em.triangle_enum import Triangle, TriangleLeg, TriangleIndex KCS_FEE_DISCOUNT = 0.8 logger = structlog.get_logger().bind(component="opportunity") def max_volume_for_triangle( triangle: Triangle, book_consumer: BookConsumer, primary_quote: str, fee_mult: float = 1.0, ) -> Optional[float]: """Compute max volume — kept for backward compatibility, but _build_full now does this inline.""" return None @dataclass class OpportunitySignal: """ A detected profitable triangular arbitrage opportunity. Emitted to the signal client when net_return_bps exceeds the threshold. """ triangle: Triangle direction: str net_return_bps: float max_volume: float leg_details: list[dict] ts_ms: int book_ts_ms: int books: list[dict] @dataclass class Stats: """ Running statistics counters for opportunity evaluation. Updated on every evaluate_triangles_for_pair call and returned by get_stats(). """ triangles_evaluated: int = 0 signals_fired: int = 0 books_missing: int = 0 books_full: int = 0 best_net_bps: float = -999999.0 worst_net_bps: float = 999999.0 last_eval_ts_ms: int = 0 best_legs: str = "" worst_legs: str = "" @dataclass class _EvalResult: """ Intermediate result of triangle evaluation. Attributes ---------- net_return_bps : float Net return after fees in basis points. max_volume : float Maximum safe input volume for the triangle. leg_details : list[dict] Per-leg dictionary suitable for serialising into a signal payload. book_ts_ms : int Timestamp (ms) of the oldest order book used in the evaluation. books : list[dict] Serialised top-of-book for each leg, in order. """ net_return_bps: float max_volume: float leg_details: list[dict] book_ts_ms: int books: list[dict] def leg_str(self) -> str: return " -> ".join( f"{d['pair']}({d['input_currency']}->{d['output_currency']})" for d in self.leg_details ) class OpportunityEngine: """ Detects and reports triangular arbitrage opportunities. On every order-book update (triggered via the on_update callback) the engine evaluates every triangle that involves the updated symbol. If the net return after fees exceeds signal_threshold_bps and the cooldown for that triangle has elapsed, a signal is dispatched to the executor via the on_signal callback. """ def __init__( self, book_consumer: BookConsumer, triangle_index: TriangleIndex, signal_threshold_bps: float, log_path: Path, kcs_discount_active: bool = False, cooldown_seconds: float = 5.0, on_signal: Optional[callable] = None, ) -> None: self._book_consumer = book_consumer self._triangle_index = triangle_index self._threshold_bps = signal_threshold_bps self._log_path = log_path self._fee_mult = KCS_FEE_DISCOUNT if kcs_discount_active else 1.0 self._cooldown_seconds = cooldown_seconds self._last_signal_ts: dict[frozenset[str], float] = {} self._log = logger self._stats = Stats() self._on_signal = on_signal self._net_cache: dict[frozenset[str], tuple[float, tuple[int, int, int]]] = {} def _compute_net_only( self, triangle: Triangle, ) -> tuple[Optional[float], int]: """ Compute net return BPS and min book ts_ms without building books/leg_details. Returns (net_return_bps, book_ts_ms) or (None, 0) if any book is missing. Used for fast-path threshold filtering before expensive serialization. """ cumulative = 1.0 book_ts_ms = 0 for leg in triangle.legs: book = self._book_consumer.get_book(leg.pair.symbol) if not book: return None, 0 if leg.input_currency == leg.pair.base: level = book.bids[0] if book.bids else None if not level: return None, 0 rate = level.price else: level = book.asks[0] if book.asks else None if not level: return None, 0 rate = 1.0 / level.price fee_factor = 1.0 - leg.taker_fee * self._fee_mult cumulative *= rate * fee_factor if book_ts_ms == 0 or book.ts_ms < book_ts_ms: book_ts_ms = book.ts_ms net_return = (cumulative - 1.0) * 10000 return net_return, book_ts_ms def _build_full( self, triangle: Triangle, ) -> Optional[_EvalResult]: """ Single-pass evaluation: compute net return, build leg_details/books, and compute max_volume — all in one loop over the triangle's legs. """ cumulative = 1.0 max_v0_list: list[float] = [] cumulative_mult = 1.0 leg_details = [] books: list[dict] = [] book_ts_ms = 0 for leg in triangle.legs: book = self._book_consumer.get_book(leg.pair.symbol) if not book: return None if leg.input_currency == leg.pair.base: level = book.bids[0] if book.bids else None if not level: return None max_input = level.size rate = level.price else: level = book.asks[0] if book.asks else None if not level: return None max_input = level.size * level.price rate = 1.0 / level.price fee_factor = 1.0 - leg.taker_fee * self._fee_mult cumulative *= rate * fee_factor if cumulative_mult > 0: max_v0_list.append(max_input / cumulative_mult) cumulative_mult *= rate * fee_factor leg_details.append({ "pair": leg.pair.symbol, "input_currency": leg.input_currency, "output_currency": leg.output_currency, "exchange_rate": rate, "fee_rate": leg.taker_fee, "fee_currency": leg.pair.fee_currency, }) books.append({ "symbol": book.symbol, "bids": [ {"price": b.price, "size": b.size} for b in book.bids ], "asks": [ {"price": a.price, "size": a.size} for a in book.asks ], "ts_ms": book.ts_ms, }) if book_ts_ms == 0 or book.ts_ms < book_ts_ms: book_ts_ms = book.ts_ms net_return = (cumulative - 1.0) * 10000 max_volume = min(max_v0_list) if max_v0_list else 0.0 return _EvalResult( net_return_bps=net_return, max_volume=max_volume, leg_details=leg_details, book_ts_ms=book_ts_ms, books=books, ) def evaluate_triangles_for_pair(self, symbol: str) -> list[OpportunitySignal]: """ Evaluate all triangles that include the given symbol. Called by the book consumer's on_update callback whenever an order book is refreshed. Updates stats, emits signals for triangles that clear the threshold and cooldown, and returns the list of signals (primarily for use in tests). """ triangles = self._triangle_index.get_triangles_for_pair(symbol) signals: list[OpportunitySignal] = [] now_ts_ms = int(time.time() * 1000) for triangle in triangles: cache_key = triangle.currencies leg_books = [self._book_consumer.get_book(leg.pair.symbol) for leg in triangle.legs] if any(b is None for b in leg_books): self._stats.triangles_evaluated += 1 self._stats.books_missing += 1 self._stats.last_eval_ts_ms = now_ts_ms continue current_ts = tuple(b.ts_ms for b in leg_books) cached = self._net_cache.get(cache_key) if cached and cached[1] == current_ts: net_bps = cached[0] book_ts_ms = min(current_ts) else: try: net_bps, book_ts_ms = self._compute_net_only(triangle) except Exception as e: self._log.error("triangle_compute_error", triangle=str(triangle.currencies), error=str(e)) self._stats.triangles_evaluated += 1 self._stats.last_eval_ts_ms = now_ts_ms continue if net_bps is not None: self._net_cache[cache_key] = (net_bps, current_ts) self._stats.triangles_evaluated += 1 if net_bps is None: self._stats.books_missing += 1 self._stats.last_eval_ts_ms = now_ts_ms continue self._stats.books_full += 1 if net_bps > self._stats.best_net_bps: self._stats.best_net_bps = net_bps if net_bps < self._stats.worst_net_bps: self._stats.worst_net_bps = net_bps if net_bps <= self._threshold_bps: self._stats.last_eval_ts_ms = now_ts_ms continue try: result = self._build_full(triangle) except Exception as e: self._log.error("triangle_compute_full_error", triangle=str(triangle.currencies), error=str(e)) continue if result is None: continue sig = OpportunitySignal( triangle=triangle, direction="forward", net_return_bps=net_bps, max_volume=result.max_volume, leg_details=result.leg_details, ts_ms=now_ts_ms, book_ts_ms=result.book_ts_ms, books=result.books, ) signals.append(sig) self._stats.signals_fired += 1 now = time.time() last = self._last_signal_ts.get(triangle.currencies, 0.0) if now - last >= self._cooldown_seconds: self._last_signal_ts[triangle.currencies] = now self._notify_opportunity(sig) self._stats.last_eval_ts_ms = now_ts_ms return signals def get_stats(self) -> Stats: """ Return a snapshot of the current stats counters. The returned Stats object is a copy; the internal counters continue to accumulate. """ return Stats( triangles_evaluated=self._stats.triangles_evaluated, signals_fired=self._stats.signals_fired, books_missing=self._stats.books_missing, books_full=self._stats.books_full, best_net_bps=self._stats.best_net_bps, worst_net_bps=self._stats.worst_net_bps, last_eval_ts_ms=self._stats.last_eval_ts_ms, best_legs=self._stats.best_legs, worst_legs=self._stats.worst_legs, ) def _notify_opportunity(self, sig: OpportunitySignal) -> None: """ Log the opportunity and dispatch the signal to the executor. The signal is sent over a Unix-domain socket via the on_signal callback (which is the send_signal coroutine of SignalSocketClient). A done callback is attached so any exception raised inside the receiver is logged rather than propagating silently. """ ts = sig.ts_ms direction = sig.direction net_bps = sig.net_return_bps leg_str = " -> ".join( f"{ld['pair']}({ld['input_currency']}->{ld['output_currency']})" for ld in sig.leg_details ) msg = ( f"[{ts}] OPPORTUNITY FOUND | " f"direction={direction} | " f"net_return={net_bps:.2f} bps | " f"max_volume={sig.max_volume} | " f"legs: {leg_str}" ) self._log.info("opportunity_detected", **{ "ts_ms": ts, "book_ts_ms": sig.book_ts_ms, "direction": direction, "net_return_bps": net_bps, "legs": leg_str, "max_volume": str(sig.max_volume), }) if self._on_signal: signal_payload = { "type": "signal", "correlation_id": str(uuid.uuid4()), "triangle_key": list(sig.triangle.currencies), "primary_quote": sig.triangle.primary_quote, "legs": sig.leg_details, "predicted_bps": net_bps, "max_volume": str(sig.max_volume), "ts_ms": ts, "book_ts_ms": sig.book_ts_ms, # books: full top-5 order books per leg — reserved for future # volume-extension logic (analyze deeper levels to compute how # far profit shrinks when volume is increased beyond top-of-book). "books": sig.books, } try: task = asyncio.create_task(self._on_signal(signal_payload)) task.add_done_callback(lambda t: t.exception() and self._log.warning("on_signal_error", error=str(t.exception()))) except Exception as e: self._log.warning("on_signal_error", error=str(e))