From 97b341fec9ba3d8f8d370afd99bba4092b2f6bf5 Mon Sep 17 00:00:00 2001 From: nicolas Date: Sun, 24 May 2026 21:36:48 -0300 Subject: [PATCH] cleanup: remove dead fh_ob/oe_em Python modules, add book_ts_ms to screen output --- executor/executor.py | 3 + fh_ob/__init__.py | 15 -- fh_ob/__main__.py | 87 --------- fh_ob/book_store.py | 70 ------- fh_ob/rest_server.py | 110 ----------- fh_ob/socket_server.py | 95 ---------- fh_ob/ws_client.py | 271 --------------------------- oe_em/__init__.py | 0 oe_em/__main__.py | 223 ---------------------- oe_em/book_consumer.py | 205 --------------------- oe_em/config.py | 90 --------- oe_em/kucoin_api.py | 96 ---------- oe_em/opportunity.py | 408 ----------------------------------------- oe_em/risk.py | 26 --- oe_em/socket_client.py | 113 ------------ oe_em/triangle_enum.py | 291 ----------------------------- 16 files changed, 3 insertions(+), 2100 deletions(-) delete mode 100644 fh_ob/__init__.py delete mode 100644 fh_ob/__main__.py delete mode 100644 fh_ob/book_store.py delete mode 100644 fh_ob/rest_server.py delete mode 100644 fh_ob/socket_server.py delete mode 100644 fh_ob/ws_client.py delete mode 100644 oe_em/__init__.py delete mode 100644 oe_em/__main__.py delete mode 100644 oe_em/book_consumer.py delete mode 100644 oe_em/config.py delete mode 100644 oe_em/kucoin_api.py delete mode 100644 oe_em/opportunity.py delete mode 100644 oe_em/risk.py delete mode 100644 oe_em/socket_client.py delete mode 100644 oe_em/triangle_enum.py diff --git a/executor/executor.py b/executor/executor.py index 5a08a3d..df1e032 100644 --- a/executor/executor.py +++ b/executor/executor.py @@ -188,6 +188,7 @@ class ExecutionReport: profit: float = 0.0 effective_bps: float = 0.0 error: str = "" + book_ts_ms: int = 0 @dataclass @@ -837,6 +838,7 @@ class Executor: effective_bps=effective_bps, ts_ms=int(time.time() * 1000), timings=timings, + book_ts_ms=book_ts_ms, ) self._last_trade_ts_ms[in_flight.triangle_key] = report.ts_ms self._emit_report(report) @@ -867,6 +869,7 @@ class Executor: f"{ts_iso} {report.status.upper()} | corr={report.correlation_id} | " f"triangle={report.triangle_key} | predicted_bps={report.predicted_bps:.2f} | " f"effective_bps={report.effective_bps:.2f} | " + f"book_ts={report.book_ts_ms} | " f"profit={report.profit:.4f}" f"{f' | error={report.error}' if report.error else ''}", flush=True, diff --git a/fh_ob/__init__.py b/fh_ob/__init__.py deleted file mode 100644 index 49de5a9..0000000 --- a/fh_ob/__init__.py +++ /dev/null @@ -1,15 +0,0 @@ -"""Feed Handler + Order Book Mirror.""" - -from fh_ob.ws_client import KuCoinWSClient -from fh_ob.book_store import BookStore, OrderBookTop5, BookLevel -from fh_ob.socket_server import SocketServer -from fh_ob.rest_server import create_app - -__all__ = [ - "KuCoinWSClient", - "BookStore", - "OrderBookTop5", - "BookLevel", - "SocketServer", - "create_app", -] \ No newline at end of file diff --git a/fh_ob/__main__.py b/fh_ob/__main__.py deleted file mode 100644 index ee1f2ff..0000000 --- a/fh_ob/__main__.py +++ /dev/null @@ -1,87 +0,0 @@ -import asyncio -import signal -from pathlib import Path - -import structlog -import uvicorn - -from common.config import Settings -from common.log import configure_logging -from fh_ob.book_store import BookStore -from fh_ob.rest_server import create_app -from fh_ob.socket_server import SocketServer -from fh_ob.ws_client import KuCoinWSClient - - -async def main() -> None: - config_path = Path("config.yaml") - settings = await Settings.from_yaml(config_path) if config_path.exists() else Settings() - configure_logging(settings.fh_ob.log_level, settings.fh_ob.log_file) - log = structlog.get_logger().bind(component="fh_ob") - - log.info("fh_ob_starting", symbols=settings.fh_ob.symbols) - - book_store = BookStore() - socket_server = SocketServer(settings.fh_ob.socket_path) - - async def on_book_update(book): - try: - await socket_server.broadcast(book) - except Exception: - pass - - ws_client = KuCoinWSClient( - settings=settings.fh_ob, - book_store=book_store, - on_book_update=on_book_update, - ) - - rest_app = create_app( - book_store, - get_socket_clients=socket_server.client_count, - get_subscribed_count=ws_client.subscribed_count, - is_connected=ws_client.is_connected, - add_symbol=ws_client.add_symbol, - remove_symbol=ws_client.remove_symbol, - get_symbols=ws_client.get_symbols, - get_reconnect_stats=ws_client.reconnect_stats, - ) - rest_config = uvicorn.Config( - rest_app, - host=settings.fh_ob.rest_host, - port=settings.fh_ob.rest_port, - log_level="warning", - ) - rest_server = uvicorn.Server(rest_config) - - async def shutdown(sig: signal.Signals) -> None: - log.info("shutdown_signal_received", signal=sig.name) - await ws_client.stop() - await socket_server.stop() - rest_config.should_exit = True - - loop = asyncio.get_running_loop() - for sig in (signal.SIGTERM, signal.SIGINT): - loop.add_signal_handler(sig, lambda s=sig: asyncio.create_task(shutdown(s))) - - ws_task = asyncio.create_task(ws_client.start()) - socket_task = asyncio.create_task(socket_server.start()) - rest_task = asyncio.create_task(rest_server.serve()) - - log.info( - "fh_ob_ready", - rest_endpoint=f"http://{settings.fh_ob.rest_host}:{settings.fh_ob.rest_port}", - socket_path=str(settings.fh_ob.socket_path), - ) - - try: - await asyncio.gather(ws_task, socket_task, rest_task) - except asyncio.CancelledError: - log.info("fh_ob_cancelled") - except Exception as e: - log.error("fh_ob_error", error=str(e)) - raise - - -if __name__ == "__main__": - asyncio.run(main()) \ No newline at end of file diff --git a/fh_ob/book_store.py b/fh_ob/book_store.py deleted file mode 100644 index 0080245..0000000 --- a/fh_ob/book_store.py +++ /dev/null @@ -1,70 +0,0 @@ -import time -from dataclasses import dataclass, field -from typing import Optional - -import structlog - -logger = structlog.get_logger() - - -@dataclass -class BookLevel: - price: float - size: float - - @classmethod - def from_list(cls, data: list) -> "BookLevel": - return cls(price=float(data[0]), size=float(data[1])) - - def to_dict(self) -> dict: - return {"price": self.price, "size": self.size} - - -@dataclass -class OrderBookTop5: - symbol: str - bids: list[BookLevel] = field(default_factory=list) - asks: list[BookLevel] = field(default_factory=list) - ts_ms: int = 0 - - def to_dict(self) -> dict: - return { - "symbol": self.symbol, - "bids": [b.to_dict() for b in self.bids], - "asks": [a.to_dict() for a in self.asks], - "ts_ms": self.ts_ms, - } - - -class BookStore: - def __init__(self) -> None: - self._books: dict[str, OrderBookTop5] = {} - self._log = logger.bind(component="book_store") - - def update(self, raw: dict) -> Optional[OrderBookTop5]: - topic = raw.get("topic", "") - data = raw.get("data", {}) - - topic_suffix = topic.split(":")[-1] if ":" in topic else "" - symbol = topic_suffix.split(",")[0].strip() if topic_suffix else "" - asks_raw = data.get("asks", []) - bids_raw = data.get("bids", []) - - if not symbol: - return None - - ts_ms = int(data.get("time", time.time() * 1000)) - - bids = [BookLevel.from_list(b) for b in bids_raw[:1]] - asks = [BookLevel.from_list(a) for a in asks_raw[:1]] - - book = OrderBookTop5(symbol=symbol, bids=bids, asks=asks, ts_ms=ts_ms) - self._books[symbol] = book - - return book - - def get(self, symbol: str) -> Optional[OrderBookTop5]: - return self._books.get(symbol) - - def get_all(self) -> dict[str, OrderBookTop5]: - return self._books.copy() \ No newline at end of file diff --git a/fh_ob/rest_server.py b/fh_ob/rest_server.py deleted file mode 100644 index ef99e38..0000000 --- a/fh_ob/rest_server.py +++ /dev/null @@ -1,110 +0,0 @@ -from typing import Callable, Optional - -from fastapi import FastAPI, HTTPException -from pydantic import BaseModel - -from fh_ob.book_store import BookStore, OrderBookTop5 - - -class BookLevelResponse(BaseModel): - price: str - size: str - - -class OrderBookResponse(BaseModel): - symbol: str - bids: list[BookLevelResponse] - asks: list[BookLevelResponse] - ts_ms: int - - -class HealthResponse(BaseModel): - status: str - books_tracked: int - socket_clients: int - subscribed_symbols: int - connected: bool - last_update_ms: Optional[int] = None - reconnect_count: int = 0 - last_reconnect_ms: Optional[int] = None - - -class SymbolOpRequest(BaseModel): - symbol: str - - -class SymbolsResponse(BaseModel): - symbols: list[str] - - -def create_app( - book_store: BookStore, - get_socket_clients: Optional[Callable[[], int]] = None, - get_subscribed_count: Optional[Callable[[], int]] = None, - is_connected: Optional[Callable[[], bool]] = None, - add_symbol: Optional[Callable[[str], bool]] = None, - remove_symbol: Optional[Callable[[str], bool]] = None, - get_symbols: Optional[Callable[[], list[str]]] = None, - get_reconnect_stats: Optional[Callable[[], tuple[int, int]]] = None, -) -> FastAPI: - app = FastAPI(title="FH+OB Debug API", description="Dev-only debug endpoint") - - @app.get("/book/{symbol}", response_model=OrderBookResponse) - async def get_book(symbol: str) -> OrderBookResponse: - book = book_store.get(symbol) - if book is None: - raise HTTPException(status_code=404, detail=f"No book data for {symbol}") - return OrderBookResponse( - symbol=book.symbol, - bids=[BookLevelResponse(price=str(b.price), size=str(b.size)) for b in book.bids], - asks=[BookLevelResponse(price=str(a.price), size=str(a.size)) for a in book.asks], - ts_ms=book.ts_ms, - ) - - @app.get("/books", response_model=dict[str, OrderBookResponse]) - async def get_all_books() -> dict[str, OrderBookResponse]: - books = book_store.get_all() - return { - symbol: OrderBookResponse( - symbol=book.symbol, - bids=[BookLevelResponse(price=str(b.price), size=str(b.size)) for b in book.bids], - asks=[BookLevelResponse(price=str(a.price), size=str(a.size)) for a in book.asks], - ts_ms=book.ts_ms, - ) - for symbol, book in books.items() - } - - @app.get("/symbols") - async def list_symbols(): - return SymbolsResponse(symbols=get_symbols() if get_symbols else []) - - @app.post("/symbols") - async def add_sym(req: SymbolOpRequest): - if add_symbol and add_symbol(req.symbol): - return SymbolsResponse(symbols=get_symbols() if get_symbols else []) - raise HTTPException(status_code=400, detail="Symbol not found or already subscribed") - - @app.delete("/symbols/{symbol}") - async def rm_sym(symbol: str): - if remove_symbol and remove_symbol(symbol): - return SymbolsResponse(symbols=get_symbols() if get_symbols else []) - raise HTTPException(status_code=404, detail="Symbol not found or not subscribed") - - @app.get("/health", response_model=HealthResponse) - async def health() -> HealthResponse: - books = book_store.get_all() - latest_ts = max((b.ts_ms for b in books.values()), default=None) - reconnects, last_reconnect_ms = get_reconnect_stats() if get_reconnect_stats else (0, None) - - return HealthResponse( - status="ok" if (is_connected and is_connected()) else "degraded", - books_tracked=len(books), - socket_clients=get_socket_clients() if get_socket_clients else 0, - subscribed_symbols=get_subscribed_count() if get_subscribed_count else 0, - connected=is_connected() if is_connected else False, - last_update_ms=latest_ts, - reconnect_count=reconnects, - last_reconnect_ms=last_reconnect_ms, - ) - - return app \ No newline at end of file diff --git a/fh_ob/socket_server.py b/fh_ob/socket_server.py deleted file mode 100644 index 591505a..0000000 --- a/fh_ob/socket_server.py +++ /dev/null @@ -1,95 +0,0 @@ -from typing import Optional - -import asyncio -import json -from pathlib import Path - -import structlog - -from fh_ob.book_store import OrderBookTop5 - - -class SocketServer: - def __init__(self, socket_path: Path) -> None: - self._socket_path = socket_path - self._log = structlog.get_logger().bind(component="socket_server") - self._clients: set[asyncio.StreamWriter] = set() - self._server: Optional[asyncio.Server] = None - - async def start(self) -> None: - if self._socket_path.exists(): - self._socket_path.unlink() - - self._server = await asyncio.start_unix_server( - self._accept_client, - path=str(self._socket_path), - ) - self._log.info("socket_server_started", path=str(self._socket_path)) - - async def stop(self) -> None: - if self._server: - self._server.close() - await self._server.wait_closed() - if self._socket_path.exists(): - self._socket_path.unlink() - self._log.info("socket_server_stopped") - - def client_count(self) -> int: - return len(self._clients) - - async def _accept_client( - self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter - ) -> None: - self._clients.add(writer) - self._log.info("client_connected", addr=writer.get_extra_info("peername")) - try: - while True: - try: - line = await reader.readline() - except (ConnectionResetError, BrokenPipeError, asyncio.CancelledError): - break - except Exception: - break - if not line: - break - except asyncio.CancelledError: - pass - except Exception: - pass - finally: - self._clients.discard(writer) - writer.close() - try: - await asyncio.wait_for(writer.wait_closed(), timeout=1.0) - except (asyncio.CancelledError, Exception): - pass - self._log.info("client_disconnected") - - async def broadcast(self, book: OrderBookTop5) -> None: - if not self._clients: - return - - msg_bytes = json.dumps(book.to_dict(), separators=(",", ":")).encode() + b"\n" - - clients_snapshot = list(self._clients) - bad = set() - for w in clients_snapshot: - try: - w.write(msg_bytes) - except Exception as e: - self._log.warning("broadcast_write_failed", error=str(e)) - bad.add(w) - - if not clients_snapshot: - return - - drain_results = await asyncio.gather( - *(w.drain() for w in clients_snapshot), - return_exceptions=True, - ) - for w, res in zip(clients_snapshot, drain_results): - if isinstance(res, Exception): - self._log.warning("broadcast_drain_failed", error=str(res)) - bad.add(w) - - self._clients -= bad \ No newline at end of file diff --git a/fh_ob/ws_client.py b/fh_ob/ws_client.py deleted file mode 100644 index 7f540e0..0000000 --- a/fh_ob/ws_client.py +++ /dev/null @@ -1,271 +0,0 @@ -import asyncio -import json -import time -import uuid -from dataclasses import dataclass, field -from typing import Callable, Optional, Awaitable - -import aiohttp -import structlog -import websockets - -from common.config import FHobSettings -from fh_ob.book_store import BookStore, OrderBookTop5 - - -@dataclass -class _WorkerState: - symbols: set[str] = field(default_factory=set) - command_queue: asyncio.Queue = field(default_factory=asyncio.Queue) - ws_id: int = 0 - reconnect_count: int = 0 - last_reconnect_ts_ms: int = 0 - connection_active: bool = False - - -class KuCoinWSClient: - def __init__( - self, - settings: FHobSettings, - book_store: BookStore, - on_book_update: Optional[Callable[[OrderBookTop5], None | Awaitable[None]]] = None, - ) -> None: - self._settings = settings - self._book_store = book_store - self._on_book_update_callback = on_book_update - self._log = structlog.get_logger().bind(component="ws_client") - self._running = False - self._reconnect_delay = settings.reconnect_base_delay - self._subscription_events: dict[str, asyncio.Event] = {} - self._workers: list[_WorkerState] = [] - self._worker_tasks: list[asyncio.Task] = [] - self._http_session: Optional[aiohttp.ClientSession] = None - - async def start(self) -> None: - self._running = True - self._workers.clear() - self._worker_tasks.clear() - self._http_session = aiohttp.ClientSession() - symbol_list = list(self._settings.symbols) - for i in range(0, len(symbol_list) or 1, 400): - group = set(symbol_list[i : i + 400]) - ws_id = len(self._workers) + 1 - state = _WorkerState(symbols=group, ws_id=ws_id) - self._workers.append(state) - for state in self._workers: - task = asyncio.create_task(self._connection_worker(state)) - self._worker_tasks.append(task) - try: - await asyncio.gather(*self._worker_tasks) - except asyncio.CancelledError: - pass - self._log.debug("all_workers_stopped") - - async def stop(self) -> None: - self._running = False - for t in self._worker_tasks: - t.cancel() - if self._worker_tasks: - await asyncio.wait(self._worker_tasks, timeout=5) - if self._http_session and not self._http_session.closed: - await self._http_session.close() - self._log.debug("ws_client_stopped") - - def is_connected(self) -> bool: - return any(w.connection_active for w in self._workers) - - def subscribed_count(self) -> int: - return sum(len(w.symbols) for w in self._workers) - - def reconnect_stats(self) -> tuple[int, int]: - """Return (total_reconnects, timestamp_ms of last reconnect) across all workers.""" - total = sum(w.reconnect_count for w in self._workers) - latest = max((w.last_reconnect_ts_ms for w in self._workers), default=0) - return total, latest - - def get_symbols(self) -> list[str]: - result = [] - for w in self._workers: - result.extend(w.symbols) - return result - - def add_symbol(self, symbol: str) -> bool: - if not self._workers: - return False - if any(symbol in w.symbols for w in self._workers): - return False - self._settings.symbols.append(symbol) - eligible = [w for w in self._workers if len(w.symbols) < 400] - if not eligible: - self._log.warning("all_workers_full", symbol=symbol) - return False - worker = min(eligible, key=lambda w: len(w.symbols)) - worker.symbols.add(symbol) - worker.command_queue.put_nowait(("subscribe", symbol)) - return True - - def remove_symbol(self, symbol: str) -> bool: - found = False - for worker in self._workers: - if symbol in worker.symbols: - worker.symbols.discard(symbol) - found = True - break - if not found: - return False - self._settings.symbols.remove(symbol) - return True - - async def _connection_worker(self, state: _WorkerState) -> None: - while self._running: - try: - token, instance = await self._get_public_token() - self._ping_interval = instance.get("pingInterval", 18000) / 1000.0 - ws = await websockets.connect( - instance["endpoint"] + f"?token={token}&connectId={uuid.uuid4()}-{state.ws_id}", - ping_interval=None, - ) - self._log.debug("ws_connected", ws_id=state.ws_id) - self._reconnect_delay = self._settings.reconnect_base_delay - state.connection_active = True - - ping_task = asyncio.create_task(self._ping_loop(ws, state.ws_id)) - - async def reader() -> None: - try: - async for msg in ws: - await self._handle_message(msg) - except websockets.ConnectionClosed as e: - self._log.warning("reader_connection_closed", ws_id=state.ws_id, code=e.code, reason=e.reason) - except asyncio.CancelledError: - raise - except Exception as e: - self._log.error("reader_unexpected_error", ws_id=state.ws_id, error=str(e)) - - reader_task = asyncio.create_task(reader()) - - try: - if state.symbols: - await self._send_subscribe(ws, list(state.symbols), state.ws_id) - - while True: - cmd = await state.command_queue.get() - if cmd is None: - break - action, symbol = cmd - if action == "subscribe": - self._log.debug( - "subscribing_dynamic", - symbol=symbol, - ws_id=state.ws_id, - ) - await self._send_subscribe(ws, [symbol], state.ws_id) - except asyncio.CancelledError: - raise - except websockets.ConnectionClosed as e: - self._log.warning("ws_disconnected", ws_id=state.ws_id, code=e.code, reason=e.reason) - except Exception as e: - self._log.error("command_loop_error", ws_id=state.ws_id, error=str(e)) - finally: - state.connection_active = False - ping_task.cancel() - reader_task.cancel() - try: - await reader_task - except asyncio.CancelledError: - pass - except asyncio.CancelledError: - break - except Exception as e: - if not self._running: - break - state.connection_active = False - state.reconnect_count += 1 - state.last_reconnect_ts_ms = int(time.time() * 1000) - self._log.warning( - "ws_reconnecting", - ws_id=state.ws_id, - reconnect_count=state.reconnect_count, - error=str(e), - ) - await asyncio.sleep(self._reconnect_delay) - self._reconnect_delay = min( - self._reconnect_delay * 2, - self._settings.reconnect_max_delay, - ) - - self._log.debug("worker_exiting", ws_id=state.ws_id) - - async def _get_public_token(self) -> tuple[str, dict]: - self._log.debug("fetching_public_token", url=self._settings.token_url) - async with self._http_session.post(self._settings.token_url) as resp: - data = await resp.json() - token = data["data"]["token"] - instance = data["data"]["instanceServers"][0] - self._log.debug("public_token_received", ping_interval_ms=instance.get("pingInterval")) - return token, instance - - async def _send_subscribe(self, ws, symbols: list[str], ws_id: int) -> None: - for i in range(0, len(symbols), 100): - batch = symbols[i : i + 100] - topic = "/spotMarket/level2Depth5:" + ",".join(batch) - ack_id = str(uuid.uuid4()) - evt = asyncio.Event() - self._subscription_events[ack_id] = evt - sub_msg = { - "id": ack_id, - "type": "subscribe", - "topic": topic, - "response": True, - } - self._log.debug("subscribing", topic=topic[:80], ws_id=ws_id) - await ws.send(json.dumps(sub_msg)) - try: - await asyncio.wait_for(evt.wait(), timeout=self._reconnect_delay) - except asyncio.TimeoutError: - self._log.warning("subscription_ack_timeout", topic=topic[:80], ws_id=ws_id) - raise - - async def _ping_loop(self, ws, ws_id: int) -> None: - while self._running: - await asyncio.sleep(self._ping_interval) - try: - await ws.ping() - except Exception: - self._log.warning("ping_failed", ws_id=ws_id) - break - - async def _handle_message(self, msg: str) -> None: - try: - data = json.loads(msg) - except json.JSONDecodeError: - self._log.warning("invalid_json", msg=msg[:100]) - return - - msg_type = data.get("type") - - if msg_type == "welcome": - self._log.debug("ws_welcome") - return - - if msg_type == "pong": - return - - if msg_type == "ack": - ack_id = data.get("id") - self._log.debug("subscription_ack", topic=data.get("topic"), ack_id=ack_id) - if ack_id in self._subscription_events: - self._subscription_events[ack_id].set() - del self._subscription_events[ack_id] - return - - topic = data.get("topic", "") - - if msg_type == "message" and "level2Depth5" in topic: - book = self._book_store.update(data) - if book and self._on_book_update_callback: - result = self._on_book_update_callback(book) - if asyncio.iscoroutine(result): - asyncio.create_task(result) - elif topic: - self._log.warning("ws_unexpected_message", type=msg_type, topic=topic) diff --git a/oe_em/__init__.py b/oe_em/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/oe_em/__main__.py b/oe_em/__main__.py deleted file mode 100644 index 2c36576..0000000 --- a/oe_em/__main__.py +++ /dev/null @@ -1,223 +0,0 @@ -""" -Opportunity engine entry point. - -Initialises the order-book consumer, triangle index, and signal socket client; -starts background tasks for book consumption and periodic stats logging; shuts -down cleanly on SIGTERM/SIGINT. -""" -import asyncio -import signal -from pathlib import Path - -import aiohttp -import structlog - -from common.log import configure_logging -from oe_em.book_consumer import BookConsumer -from oe_em.config import Settings -from oe_em.kucoin_api import KuCoinAPI -from oe_em.opportunity import OpportunityEngine -from oe_em.risk import RiskManager -from oe_em.socket_client import SignalSocketClient -from oe_em.triangle_enum import TradingPair, enumerate_triangles - - -async def sync_symbols_with_fh_ob( - fh_ob_url: str, - needed_symbols: set[str], - http_session: aiohttp.ClientSession, - log, -) -> set[str]: - """ - Ensure fh_ob is subscribed to every symbol needed by the triangle index. - - Fetches the current subscription list from fh_ob, posts any missing symbols, - and returns the full set of subscribed symbols. - """ - get_url = f"{fh_ob_url}/symbols" - async with http_session.get(get_url) as resp: - resp.raise_for_status() - data = await resp.json() - - current_symbols = set(data.get("symbols", [])) - missing = needed_symbols - current_symbols - - if missing: - log.info("syncing_symbols", missing=len(missing), current=len(current_symbols)) - for sym in missing: - post_url = f"{fh_ob_url}/symbols" - payload = {"symbol": sym} - async with http_session.post(post_url, json=payload) as post_resp: - if post_resp.status == 400: - log.warning("symbol_cannot_be_subscribed", symbol=sym) - else: - log.debug("symbol_subscribed", symbol=sym) - - return current_symbols | missing - - -async def main() -> None: - config_path = Path("config.yaml") - settings = await Settings.from_yaml(config_path) if config_path.exists() else Settings() - configure_logging(settings.oe_em.log_level, settings.oe_em.log_file) - - log = structlog.get_logger().bind(component="oe_em") - - log.info("oe_em_starting") - - api = KuCoinAPI() - await api.fetch_pairs_and_fees() - - pair_list = [ - TradingPair( - symbol=p["symbol"], - base=p["base"], - quote=p["quote"], - fee_currency=p.get("fee_currency", ""), - ) - for p in api.get_all_pairs() - ] - excluded = set(settings.oe_em.excluded_currencies) - if excluded: - pair_list = [p for p in pair_list if p.base not in excluded and p.quote not in excluded] - log.info("pairs_loaded", count=len(pair_list), excluded=len(excluded)) - - fee_table = api._fee_table - triangle_index = enumerate_triangles( - pair_list, - fee_table, - hold_currencies=settings.oe_em.hold_currencies, - ) - log.info("triangles_enumerated", count=len(triangle_index.triangles)) - - needed_symbols = set() - for tri in triangle_index.triangles: - needed_symbols.update(tri.pair_symbols) - - async with aiohttp.ClientSession() as http_session: - subscribed = await sync_symbols_with_fh_ob( - settings.oe_em.fh_ob_url, - needed_symbols, - http_session, - log, - ) - - book_consumer = BookConsumer( - socket_path=settings.oe_em.socket_path, - on_update=lambda symbol, book: None, - ) - - signal_client: SignalSocketClient | None = None - signal_reconnect_task: asyncio.Task | None = None - - if settings.oe_em.send_signals: - signal_client = SignalSocketClient(socket_path=settings.oe_em.executor_socket_path) - - async def send_signal(signal_payload: dict) -> None: - """Forward a signal payload to the executor's Unix socket.""" - if signal_client: - await signal_client.send_signal(signal_payload) - - opp_engine = OpportunityEngine( - book_consumer=book_consumer, - triangle_index=triangle_index, - signal_threshold_bps=settings.oe_em.signal_threshold_bps, - log_path=settings.oe_em.opportunity_log_path, - kcs_discount_active=settings.oe_em.kcs_discount_active, - cooldown_seconds=settings.oe_em.cooldown_seconds, - on_signal=send_signal if settings.oe_em.send_signals else None, - ) - - risk_mgr = RiskManager() - - async def on_book_update(symbol: str, book) -> None: - """Callback invoked by BookConsumer whenever a subscribed book is refreshed.""" - if risk_mgr.should_continue(): - opp_engine.evaluate_triangles_for_pair(symbol) - - book_consumer.set_on_update(on_book_update) - - fh_ob_url = settings.oe_em.fh_ob_url - async with aiohttp.ClientSession() as http_session: - async with http_session.get(f"{fh_ob_url}/symbols") as resp: - resp.raise_for_status() - data = await resp.json() - symbols_now = set(data.get("symbols", [])) - missing = needed_symbols - symbols_now - if missing: - log.warning("symbols_not_subscribed_after_sync", count=len(missing)) - for sym in missing: - log.warning("unavailable_symbol", symbol=sym) - - if signal_client: - signal_reconnect_task = await signal_client.start() - log.info("signal_client_connecting", socket_path=str(settings.oe_em.executor_socket_path)) - - log.info( - "oe_em_ready", - triangles=len(triangle_index.triangles), - subscribed=len(symbols_now), - threshold_bps=settings.oe_em.signal_threshold_bps, - hold_currencies=settings.oe_em.hold_currencies, - send_signals=settings.oe_em.send_signals, - ) - - consumer_task = asyncio.create_task(book_consumer.start()) - - async def stats_loop() -> None: - """ - Periodically log evaluation stats so the operator can monitor the engine. - - Runs until cancelled. Suppressed by setting stats_interval_seconds <= 0. - """ - interval = settings.oe_em.stats_interval_seconds - if interval <= 0: - return - while True: - await asyncio.sleep(interval) - try: - s = opp_engine.get_stats() - books_tracked = sum( - 1 for t in triangle_index.triangles - if book_consumer.get_book(t.legs[0].pair.symbol) is not None - ) - except Exception as e: - log.error("stats_error", error=str(e)) - continue - log.info("stats", **{ - "triangles_evaluated": s.triangles_evaluated, - "signals_fired": s.signals_fired, - "books_tracked": books_tracked, - "subscribed": len(book_consumer._books), - "best_net_bps": f"{s.best_net_bps:.2f}", - "best_legs": s.best_legs, - }) - - stats_task = asyncio.create_task(stats_loop()) - - def shutdown(sig: signal.Signals) -> None: - """Begin graceful shutdown: stop book consumer, cancel stats, close signal client.""" - log.info("shutdown_signal_received", signal=sig.name) - asyncio.create_task(book_consumer.stop()) - stats_task.cancel() - if signal_reconnect_task: - signal_reconnect_task.cancel() - if signal_client: - asyncio.create_task(signal_client.close()) - - loop = asyncio.get_running_loop() - for sig in (signal.SIGTERM, signal.SIGINT): - loop.add_signal_handler(sig, lambda s=sig: shutdown(s)) - - tasks = [consumer_task, stats_task] - if signal_reconnect_task: - tasks.append(signal_reconnect_task) - - try: - await asyncio.gather(*tasks) - except asyncio.CancelledError: - log.info("oe_em_cancelled") - - -if __name__ == "__main__": - asyncio.run(main()) \ No newline at end of file diff --git a/oe_em/book_consumer.py b/oe_em/book_consumer.py deleted file mode 100644 index b38b6a9..0000000 --- a/oe_em/book_consumer.py +++ /dev/null @@ -1,205 +0,0 @@ -""" -Order-book consumer for the opportunity engine. - -Connects to fh_ob's Unix-domain socket and receives JSON-serialized top-5 -order-book snapshots. On each update the registered on_update callback is -invoked, which triggers triangle evaluation in OpportunityEngine. - -The consumer maintains an in-memory snapshot of the last seen book for each -symbol, accessible via get_book(). -""" -import asyncio -import json -from dataclasses import dataclass, field -from pathlib import Path -from typing import Callable, Optional, Awaitable - -import structlog - -logger = structlog.get_logger().bind(component="book_consumer") - - -@dataclass -class BookLevel: - """ - A single price level in an order book. - - Attributes - ---------- - price, size : float - Price and available size at this level. - """ - - price: float - size: float - - @classmethod - def from_dict(cls, data: dict) -> "BookLevel": - return cls( - price=float(data["price"]), - size=float(data["size"]), - ) - - -@dataclass -class OrderBookTop5: - """ - Top-5 bid/ask snapshot for a single symbol. - - Attributes - ---------- - symbol : str - KuCoin symbol e.g. "BTC-USDT". - bids : list[BookLevel] - Best bids, most aggressive first. - asks : list[BookLevel] - Best asks, most aggressive first. - ts_ms : int - Timestamp (ms) of the snapshot from fh_ob. - """ - - symbol: str - bids: list[BookLevel] = field(default_factory=list) - asks: list[BookLevel] = field(default_factory=list) - ts_ms: int = 0 - - @classmethod - def from_json(cls, data: dict) -> "OrderBookTop5": - bids = [BookLevel.from_dict(b) for b in data.get("bids", [])] - asks = [BookLevel.from_dict(a) for a in data.get("asks", [])] - return cls( - symbol=data.get("symbol", ""), - bids=bids, - asks=asks, - ts_ms=data.get("ts_ms", 0), - ) - - -class BookConsumer: - """ - Consumes order-book snapshots from fh_ob and dispatches them to OpportunityEngine. - - Maintains a socket connection until EOF or error, then reconnects - automatically. Book updates are pushed to an internal queue; a dedicated - worker drains the queue and calls on_update, keeping the reader loop - non-blocking. - """ - - def __init__( - self, - socket_path: Path, - on_update: Callable[[str, OrderBookTop5], None | Awaitable[None]], - ) -> None: - self._socket_path = socket_path - self._on_update = on_update - self._running = False - self._books: dict[str, OrderBookTop5] = {} - self._log = logger - self._queue: asyncio.Queue[str] = asyncio.Queue(maxsize=2048) - self._queued: set[str] = set() - self._worker_task: Optional[asyncio.Task] = None - - def get_book(self, symbol: str) -> Optional[OrderBookTop5]: - """Return the last known book for a symbol, or None if not yet received.""" - return self._books.get(symbol) - - def set_on_update(self, callback: Callable[[str, OrderBookTop5], None]) -> None: - """Replace the on_update callback. Used when the callback needs a - reference to an object that does not yet exist at construction time.""" - self._on_update = callback - - async def start(self) -> None: - """ - Connect to fh_ob and run the consume loop until stop() is called. - - On unexpected disconnection a 1-second backoff is applied before - reconnecting. Interrupted cleanly by CancelledError. - """ - self._running = True - self._worker_task = asyncio.create_task(self._worker()) - while self._running: - try: - await self._connect() - except asyncio.CancelledError: - break - except Exception as e: - self._log.warning("connection_error", error=str(e)) - await asyncio.sleep(1.0) - if self._worker_task: - self._worker_task.cancel() - try: - await self._worker_task - except asyncio.CancelledError: - pass - - async def stop(self) -> None: - """Request the consume loop to exit on the next iteration.""" - self._running = False - - async def _worker(self) -> None: - """Drain the update queue and call on_update for each symbol.""" - while self._running: - try: - symbol = await asyncio.wait_for(self._queue.get(), timeout=0.5) - except asyncio.TimeoutError: - continue - except asyncio.CancelledError: - break - self._queued.discard(symbol) - book = self._books.get(symbol) - if not book: - continue - try: - result = self._on_update(symbol, book) - if asyncio.iscoroutine(result): - await result - except Exception as e: - self._log.error("on_update_error", symbol=symbol, error=str(e)) - - async def _connect(self) -> None: - """ - Open the Unix socket, read and queue messages until EOF or error. - - Each JSON line is parsed into an OrderBookTop5, stored in self._books, - and the symbol is pushed to the queue for the worker to evaluate. - The reader never blocks on evaluation. - """ - reader, writer = await asyncio.open_unix_connection(path=str(self._socket_path)) - self._log.info("connected", path=str(self._socket_path)) - - try: - while self._running: - try: - line = await reader.readline() - except asyncio.CancelledError: - raise - except Exception as e: - self._log.error("socket_read_error", error=str(e)) - break - - if not line: - self._log.warning("socket_eof") - break - - try: - data = json.loads(line.decode()) - except (json.JSONDecodeError, UnicodeDecodeError) as e: - self._log.warning("invalid_json", line=line[:50], error=str(e)) - continue - - book = OrderBookTop5.from_json(data) - if not book.symbol: - continue - - self._books[book.symbol] = book - if book.symbol not in self._queued: - self._queued.add(book.symbol) - try: - self._queue.put_nowait(book.symbol) - except asyncio.QueueFull: - self._queued.discard(book.symbol) - - finally: - writer.close() - await writer.wait_closed() - self._log.info("disconnected") \ No newline at end of file diff --git a/oe_em/config.py b/oe_em/config.py deleted file mode 100644 index b6c44d6..0000000 --- a/oe_em/config.py +++ /dev/null @@ -1,90 +0,0 @@ -""" -Configuration schema for the opportunity engine (oe_em). - -Parsed from config.yaml into OeEmSettings. Controls logging, signal -thresholds, the fee discount flag, symbol subscription, and the socket -path to the executor. -""" -import asyncio -from pathlib import Path -from typing import Optional - -import yaml -from pydantic import BaseModel, Field -from pydantic_settings import BaseSettings - - -class OeEmSettings(BaseModel): - """Settings that control oe_em's runtime behaviour.""" - - fh_ob_url: str = Field( - default="http://127.0.0.1:8000", - description="REST URL of fh_ob server", - ) - socket_path: Path = Field( - default=Path("/tmp/fh_ob.sock"), - description="Unix domain socket path for fh_ob", - ) - log_level: str = Field(default="INFO", description="Logging level") - log_file: Path = Field( - default=Path("/tmp/oe_em.log"), - description="Path to log file. Logs are written here in addition to stdout.", - ) - signal_threshold_bps: float = Field( - default=0.2, - description="Minimum net return in basis points to fire a signal", - ) - opportunity_log_path: Path = Field( - default=Path("/tmp/opportunities.log"), - description="Path to log detected opportunities", - ) - stats_interval_seconds: float = Field( - default=60.0, - description="Seconds between stats log lines. 0 to disable.", - ) - cooldown_seconds: float = Field( - default=0.0, - description="Deprecated — use executor's in-flight blocking instead. " - "Kept here for operational flexibility; set to 0.", - ) - excluded_currencies: list[str] = Field( - default_factory=list, - description="Currencies to exclude from triangle enumeration", - ) - hold_currencies: list[str] = Field( - default=["USDT"], - description="Currencies held as capital. Only triangles starting and ending in one of these are evaluated.", - ) - kcs_discount_active: bool = Field( - default=False, - description="If true, all taker fees are multiplied by 0.8 (KCS 20% fee discount)", - ) - executor_socket_path: Path = Field( - default=Path("/tmp/executor.sock"), - description="Unix domain socket path for executor", - ) - send_signals: bool = Field( - default=False, - description="If true, emit signals to executor socket when opportunities are found", - ) - - -class Settings(BaseSettings): - """Top-level settings parsed from config.yaml.""" - - oe_em: OeEmSettings = Field(default_factory=OeEmSettings) - fh_ob_url: Optional[str] = None - - @classmethod - async def from_yaml(cls, path: Path) -> "Settings": - """Load settings from a YAML file.""" - loop = asyncio.get_running_loop() - - def _read() -> dict: - with open(path) as f: - return yaml.safe_load(f) - - data = await loop.run_in_executor(None, _read) - return cls(**data) - - model_config = {"env_prefix": "TRIArb_", "extra": "ignore"} \ No newline at end of file diff --git a/oe_em/kucoin_api.py b/oe_em/kucoin_api.py deleted file mode 100644 index 204822e..0000000 --- a/oe_em/kucoin_api.py +++ /dev/null @@ -1,96 +0,0 @@ -""" -KuCoin API client for the opportunity engine. - -Fetches trading pair metadata (symbol, base, quote, fees, feeCurrency) -and builds an in-memory fee table keyed by base currency. This data is -used to construct the triangle index and to populate fee_currency in -signal payloads. -""" -import aiohttp -import structlog - -logger = structlog.get_logger().bind(component="kucoin_api") - -KUCoin_SYMBOLs_URL = "https://api.kucoin.com/api/v1/symbols" - -DEFAULT_FEES = { - "BTC": {"maker": 0.0010, "taker": 0.0010}, - "ETH": {"maker": 0.0010, "taker": 0.0010}, - "USDT": {"maker": 0.0010, "taker": 0.0010}, - "USDC": {"maker": 0.0010, "taker": 0.0010}, -} - - -class KuCoinAPI: - """ - Fetch and cache KuCoin pair metadata and per-currency fee rates. - - Used at startup to build the fee table required by triangle enumeration. - """ - - def __init__(self) -> None: - self._fee_table: dict[str, dict[str, float]] = {} - self._pairs: dict[str, dict] = {} - self._log = logger - - async def fetch_pairs_and_fees(self) -> None: - """ - Fetch all symbols from KuCoin, populate _pairs and _fee_table. - - Logs warnings for any symbol that cannot be parsed and skips it. - Sets feeCurrency to the empty string when absent in the API response. - """ - async with aiohttp.ClientSession() as session: - async with session.get(KUCoin_SYMBOLs_URL) as resp: - resp.raise_for_status() - payload = await resp.json() - - for item in payload.get("data", []): - symbol = item.get("symbol", "") - base = item.get("baseCurrency", "") - quote = item.get("quoteCurrency", "") - maker_fee = float(item.get("makerFeeRate", 0)) - taker_fee = float(item.get("takerFeeRate", 0)) - enable_trading = item.get("enableTrading", False) - - if not all([symbol, base, quote]): - continue - - fee_currency = item.get("feeCurrency") or "" - self._pairs[symbol] = { - "symbol": symbol, - "base": base, - "quote": quote, - "maker_fee": maker_fee, - "taker_fee": taker_fee, - "enable_trading": enable_trading, - "fee_currency": fee_currency, - } - - if base not in self._fee_table: - self._fee_table[base] = { - "maker": maker_fee if maker_fee > 0 else DEFAULT_FEES.get(base, {}).get("maker", 0.0010), - "taker": taker_fee if taker_fee > 0 else DEFAULT_FEES.get(base, {}).get("taker", 0.0010), - } - - self._log.info("fee_table_loaded", bases=len(self._fee_table), pairs=len(self._pairs)) - - def get_fee(self, symbol: str, side: str) -> float: - """ - Return the taker fee rate for the base currency of a given symbol. - - Falls back to DEFAULT_FEES when the base is not in the fee table. - """ - if symbol not in self._pairs: - return 0.0010 - base = self._pairs[symbol]["base"] - fee_data = self._fee_table.get(base, DEFAULT_FEES.get(base, {"maker": 0.0010, "taker": 0.0010})) - return fee_data.get(side, 0.0010) - - def get_pair_info(self, symbol: str) -> dict | None: - """Return the full info dict for a symbol, or None if not loaded.""" - return self._pairs.get(symbol) - - def get_all_pairs(self) -> list[dict]: - """Return all pairs where enable_trading is True.""" - return [p for p in self._pairs.values() if p["enable_trading"]] \ No newline at end of file diff --git a/oe_em/opportunity.py b/oe_em/opportunity.py deleted file mode 100644 index 245344e..0000000 --- a/oe_em/opportunity.py +++ /dev/null @@ -1,408 +0,0 @@ -""" -Opportunity detection engine. - -Evaluates all triangles involving a given symbol on every order-book update, -computes the net return after fees, and fires a signal when the return exceeds -the configured threshold. Supports KCS fee discounts and per-triangle cooldowns -to avoid flooding the executor with duplicate signals. -""" -import asyncio -import time -import uuid -from dataclasses import dataclass -from pathlib import Path -from typing import Optional, Callable - -import structlog - -from oe_em.book_consumer import BookConsumer -from oe_em.triangle_enum import Triangle, TriangleLeg, TriangleIndex - - -KCS_FEE_DISCOUNT = 0.8 - - -logger = structlog.get_logger().bind(component="opportunity") - - -def max_volume_for_triangle( - triangle: Triangle, - book_consumer: BookConsumer, - primary_quote: str, - fee_mult: float = 1.0, -) -> Optional[float]: - """Compute max volume — kept for backward compatibility, but _build_full now does this inline.""" - return None - - -@dataclass -class OpportunitySignal: - """ - A detected profitable triangular arbitrage opportunity. - - Emitted to the signal client when net_return_bps exceeds the threshold. - """ - triangle: Triangle - direction: str - net_return_bps: float - max_volume: float - leg_details: list[dict] - ts_ms: int - book_ts_ms: int - books: list[dict] - - -@dataclass -class Stats: - """ - Running statistics counters for opportunity evaluation. - - Updated on every evaluate_triangles_for_pair call and returned by - get_stats(). - """ - triangles_evaluated: int = 0 - signals_fired: int = 0 - books_missing: int = 0 - books_full: int = 0 - best_net_bps: float = -999999.0 - worst_net_bps: float = 999999.0 - last_eval_ts_ms: int = 0 - best_legs: str = "" - worst_legs: str = "" - - -@dataclass -class _EvalResult: - """ - Intermediate result of triangle evaluation. - - Attributes - ---------- - net_return_bps : float - Net return after fees in basis points. - max_volume : float - Maximum safe input volume for the triangle. - leg_details : list[dict] - Per-leg dictionary suitable for serialising into a signal payload. - book_ts_ms : int - Timestamp (ms) of the oldest order book used in the evaluation. - books : list[dict] - Serialised top-of-book for each leg, in order. - """ - - net_return_bps: float - max_volume: float - leg_details: list[dict] - book_ts_ms: int - books: list[dict] - - def leg_str(self) -> str: - return " -> ".join( - f"{d['pair']}({d['input_currency']}->{d['output_currency']})" for d in self.leg_details - ) - - -class OpportunityEngine: - """ - Detects and reports triangular arbitrage opportunities. - - On every order-book update (triggered via the on_update callback) the engine - evaluates every triangle that involves the updated symbol. If the net - return after fees exceeds signal_threshold_bps and the cooldown for that - triangle has elapsed, a signal is dispatched to the executor via the - on_signal callback. - """ - - def __init__( - self, - book_consumer: BookConsumer, - triangle_index: TriangleIndex, - signal_threshold_bps: float, - log_path: Path, - kcs_discount_active: bool = False, - cooldown_seconds: float = 5.0, - on_signal: Optional[callable] = None, - ) -> None: - self._book_consumer = book_consumer - self._triangle_index = triangle_index - self._threshold_bps = signal_threshold_bps - self._log_path = log_path - self._fee_mult = KCS_FEE_DISCOUNT if kcs_discount_active else 1.0 - self._cooldown_seconds = cooldown_seconds - self._last_signal_ts: dict[frozenset[str], float] = {} - self._log = logger - self._stats = Stats() - self._on_signal = on_signal - self._net_cache: dict[frozenset[str], tuple[float, tuple[int, int, int]]] = {} - - def _compute_net_only( - self, - triangle: Triangle, - ) -> tuple[Optional[float], int]: - """ - Compute net return BPS and min book ts_ms without building books/leg_details. - - Returns (net_return_bps, book_ts_ms) or (None, 0) if any book is missing. - Used for fast-path threshold filtering before expensive serialization. - """ - cumulative = 1.0 - book_ts_ms = 0 - - for leg in triangle.legs: - book = self._book_consumer.get_book(leg.pair.symbol) - if not book: - return None, 0 - - if leg.input_currency == leg.pair.base: - level = book.bids[0] if book.bids else None - if not level: - return None, 0 - rate = level.price - else: - level = book.asks[0] if book.asks else None - if not level: - return None, 0 - rate = 1.0 / level.price - - fee_factor = 1.0 - leg.taker_fee * self._fee_mult - cumulative *= rate * fee_factor - - if book_ts_ms == 0 or book.ts_ms < book_ts_ms: - book_ts_ms = book.ts_ms - - net_return = (cumulative - 1.0) * 10000 - return net_return, book_ts_ms - - def _build_full( - self, - triangle: Triangle, - ) -> Optional[_EvalResult]: - """ - Single-pass evaluation: compute net return, build leg_details/books, - and compute max_volume — all in one loop over the triangle's legs. - """ - cumulative = 1.0 - max_v0_list: list[float] = [] - cumulative_mult = 1.0 - leg_details = [] - books: list[dict] = [] - book_ts_ms = 0 - - for leg in triangle.legs: - book = self._book_consumer.get_book(leg.pair.symbol) - if not book: - return None - - if leg.input_currency == leg.pair.base: - level = book.bids[0] if book.bids else None - if not level: - return None - max_input = level.size - rate = level.price - else: - level = book.asks[0] if book.asks else None - if not level: - return None - max_input = level.size * level.price - rate = 1.0 / level.price - - fee_factor = 1.0 - leg.taker_fee * self._fee_mult - cumulative *= rate * fee_factor - - if cumulative_mult > 0: - max_v0_list.append(max_input / cumulative_mult) - cumulative_mult *= rate * fee_factor - - leg_details.append({ - "pair": leg.pair.symbol, - "input_currency": leg.input_currency, - "output_currency": leg.output_currency, - "exchange_rate": rate, - "fee_rate": leg.taker_fee, - "fee_currency": leg.pair.fee_currency, - }) - - books.append({ - "symbol": book.symbol, - "bids": [ - {"price": b.price, "size": b.size} for b in book.bids - ], - "asks": [ - {"price": a.price, "size": a.size} for a in book.asks - ], - "ts_ms": book.ts_ms, - }) - - if book_ts_ms == 0 or book.ts_ms < book_ts_ms: - book_ts_ms = book.ts_ms - - net_return = (cumulative - 1.0) * 10000 - max_volume = min(max_v0_list) if max_v0_list else 0.0 - return _EvalResult( - net_return_bps=net_return, - max_volume=max_volume, - leg_details=leg_details, - book_ts_ms=book_ts_ms, - books=books, - ) - - def evaluate_triangles_for_pair(self, symbol: str) -> list[OpportunitySignal]: - """ - Evaluate all triangles that include the given symbol. - - Called by the book consumer's on_update callback whenever an order book - is refreshed. Updates stats, emits signals for triangles that clear the - threshold and cooldown, and returns the list of signals (primarily for - use in tests). - """ - triangles = self._triangle_index.get_triangles_for_pair(symbol) - signals: list[OpportunitySignal] = [] - now_ts_ms = int(time.time() * 1000) - - for triangle in triangles: - cache_key = triangle.currencies - leg_books = [self._book_consumer.get_book(leg.pair.symbol) for leg in triangle.legs] - - if any(b is None for b in leg_books): - self._stats.triangles_evaluated += 1 - self._stats.books_missing += 1 - self._stats.last_eval_ts_ms = now_ts_ms - continue - - current_ts = tuple(b.ts_ms for b in leg_books) - cached = self._net_cache.get(cache_key) - - if cached and cached[1] == current_ts: - net_bps = cached[0] - book_ts_ms = min(current_ts) - else: - try: - net_bps, book_ts_ms = self._compute_net_only(triangle) - except Exception as e: - self._log.error("triangle_compute_error", triangle=str(triangle.currencies), error=str(e)) - self._stats.triangles_evaluated += 1 - self._stats.last_eval_ts_ms = now_ts_ms - continue - if net_bps is not None: - self._net_cache[cache_key] = (net_bps, current_ts) - - self._stats.triangles_evaluated += 1 - if net_bps is None: - self._stats.books_missing += 1 - self._stats.last_eval_ts_ms = now_ts_ms - continue - - self._stats.books_full += 1 - - if net_bps > self._stats.best_net_bps: - self._stats.best_net_bps = net_bps - if net_bps < self._stats.worst_net_bps: - self._stats.worst_net_bps = net_bps - - if net_bps <= self._threshold_bps: - self._stats.last_eval_ts_ms = now_ts_ms - continue - - try: - result = self._build_full(triangle) - except Exception as e: - self._log.error("triangle_compute_full_error", triangle=str(triangle.currencies), error=str(e)) - continue - if result is None: - continue - - sig = OpportunitySignal( - triangle=triangle, - direction="forward", - net_return_bps=net_bps, - max_volume=result.max_volume, - leg_details=result.leg_details, - ts_ms=now_ts_ms, - book_ts_ms=result.book_ts_ms, - books=result.books, - ) - signals.append(sig) - self._stats.signals_fired += 1 - now = time.time() - last = self._last_signal_ts.get(triangle.currencies, 0.0) - if now - last >= self._cooldown_seconds: - self._last_signal_ts[triangle.currencies] = now - self._notify_opportunity(sig) - - self._stats.last_eval_ts_ms = now_ts_ms - - return signals - - def get_stats(self) -> Stats: - """ - Return a snapshot of the current stats counters. - - The returned Stats object is a copy; the internal counters continue - to accumulate. - """ - return Stats( - triangles_evaluated=self._stats.triangles_evaluated, - signals_fired=self._stats.signals_fired, - books_missing=self._stats.books_missing, - books_full=self._stats.books_full, - best_net_bps=self._stats.best_net_bps, - worst_net_bps=self._stats.worst_net_bps, - last_eval_ts_ms=self._stats.last_eval_ts_ms, - best_legs=self._stats.best_legs, - worst_legs=self._stats.worst_legs, - ) - - def _notify_opportunity(self, sig: OpportunitySignal) -> None: - """ - Log the opportunity and dispatch the signal to the executor. - - The signal is sent over a Unix-domain socket via the on_signal callback - (which is the send_signal coroutine of SignalSocketClient). A done - callback is attached so any exception raised inside the receiver is - logged rather than propagating silently. - """ - ts = sig.ts_ms - direction = sig.direction - net_bps = sig.net_return_bps - leg_str = " -> ".join( - f"{ld['pair']}({ld['input_currency']}->{ld['output_currency']})" for ld in sig.leg_details - ) - - msg = ( - f"[{ts}] OPPORTUNITY FOUND | " - f"direction={direction} | " - f"net_return={net_bps:.2f} bps | " - f"max_volume={sig.max_volume} | " - f"legs: {leg_str}" - ) - - self._log.info("opportunity_detected", **{ - "ts_ms": ts, - "book_ts_ms": sig.book_ts_ms, - "direction": direction, - "net_return_bps": net_bps, - "legs": leg_str, - "max_volume": str(sig.max_volume), - }) - - if self._on_signal: - signal_payload = { - "type": "signal", - "correlation_id": str(uuid.uuid4()), - "triangle_key": list(sig.triangle.currencies), - "primary_quote": sig.triangle.primary_quote, - "legs": sig.leg_details, - "predicted_bps": net_bps, - "max_volume": str(sig.max_volume), - "ts_ms": ts, - "book_ts_ms": sig.book_ts_ms, - # books: full top-5 order books per leg — reserved for future - # volume-extension logic (analyze deeper levels to compute how - # far profit shrinks when volume is increased beyond top-of-book). - "books": sig.books, - } - try: - task = asyncio.create_task(self._on_signal(signal_payload)) - task.add_done_callback(lambda t: t.exception() and self._log.warning("on_signal_error", error=str(t.exception()))) - except Exception as e: - self._log.warning("on_signal_error", error=str(e)) \ No newline at end of file diff --git a/oe_em/risk.py b/oe_em/risk.py deleted file mode 100644 index 22172b6..0000000 --- a/oe_em/risk.py +++ /dev/null @@ -1,26 +0,0 @@ -""" -Placeholder risk management module for the opportunity engine. - -The RiskManager provides a hook for future risk checks (position limits, -daily loss gates, etc.). Currently it is a pass-through: should_continue() -always returns True. -""" -import structlog - -logger = structlog.get_logger().bind(component="risk") - - -class RiskManager: - """ - Enumerates and checks risk constraints before opportunity evaluation. - - Currently a stub that accepts all opportunities. Replace should_continue() - with real checks as needed. - """ - - def __init__(self) -> None: - self._log = logger - - def should_continue(self) -> bool: - """Return True if evaluation should proceed; False to skip this cycle.""" - return True \ No newline at end of file diff --git a/oe_em/socket_client.py b/oe_em/socket_client.py deleted file mode 100644 index 8ae7366..0000000 --- a/oe_em/socket_client.py +++ /dev/null @@ -1,113 +0,0 @@ -""" -Unix-domain socket client for sending opportunity signals to the executor. - -Connects to the executor's SignalSocketServer and keeps the connection open for -burst sending. If the connection is lost the background reconnect loop will -retry every 2 seconds. All send operations are non-blocking: a warning is -logged if the underlying writer is not connected rather than raising. -""" -import asyncio -import json -import uuid -from pathlib import Path - -import structlog - -logger = structlog.get_logger().bind(component="signal_socket_client") - - -class SignalSocketClient: - """ - Non-blocking signal sender that maintains a persistent Unix-socket connection. - - The caller invokes send_signal() for each opportunity; the client serialises - the payload and writes it to the socket. If the socket is not connected - (e.g. after a server restart) the signal is dropped with a warning log. - """ - - def __init__(self, socket_path: Path) -> None: - self._socket_path = socket_path - self._log = logger - self._writer: asyncio.StreamWriter | None = None - self._running = False - self._reconnect_task: asyncio.Task | None = None - - async def start(self) -> asyncio.Task: - """Start the background reconnect loop and return the task.""" - self._running = True - self._reconnect_task = asyncio.create_task(self._reconnect_loop()) - return self._reconnect_task - - async def _reconnect_loop(self) -> None: - """ - Attempt to connect and then wait for the connection to close. - - On connection failure a 2-second backoff is applied before retrying. - The loop exits when self._running becomes False (see close()). - """ - while self._running: - try: - reader, writer = await asyncio.open_unix_connection(path=str(self._socket_path)) - self._writer = writer - self._log.info("connected", path=str(self._socket_path)) - try: - await writer.wait_closed() - except Exception: - pass - except (ConnectionRefusedError, FileNotFoundError) as e: - if not self._running: - break - self._log.warning("connection_retrying", error=str(e)) - await asyncio.sleep(2.0) - except Exception as e: - self._log.error("reconnect_error", error=str(e)) - await asyncio.sleep(5.0) - finally: - self._writer = None - - async def send_signal(self, signal: dict) -> None: - """ - Serialise a signal dict and write it to the socket. - - If the socket is not connected this is a no-op (logged as warning). - The correlation_id is assigned here if not already set. - """ - writer = self._writer - if not writer: - self._log.warning("not_connected") - return - - correlation_id = signal.get("correlation_id", "") or str(uuid.uuid4()) - signal["correlation_id"] = correlation_id - - msg = json.dumps(signal) + "\n" - try: - writer.write(msg.encode()) - await writer.drain() - self._log.debug("signal_sent", correlation_id=correlation_id) - except Exception as e: - self._log.error("signal_send_failed", correlation_id=correlation_id, error=str(e)) - - async def close(self) -> None: - """ - Stop the reconnect loop and close the writer if open. - - Safe to call multiple times; after close() any send_signal() call - will be a no-op. - """ - self._running = False - if self._reconnect_task: - self._reconnect_task.cancel() - try: - await self._reconnect_task - except asyncio.CancelledError: - pass - self._reconnect_task = None - writer = self._writer - if writer: - self._writer = None - writer.close() - try: - await writer.wait_closed() - except Exception: - pass diff --git a/oe_em/triangle_enum.py b/oe_em/triangle_enum.py deleted file mode 100644 index 775c7f1..0000000 --- a/oe_em/triangle_enum.py +++ /dev/null @@ -1,291 +0,0 @@ -""" -Triangle enumeration for triangular arbitrage. - -Provides the core data structures (TradingPair, TriangleLeg, Triangle, -TriangleIndex) and the enumerate_triangles() function that enumerates all -valid triangles from a list of TradingPairs using a fee table. - -A triangle is a directed cycle of three currencies c1 → c2 → c3 → c1 where -each leg corresponds to a tradable pair. The hold currency (primary quote) -is the currency that enters and exits the cycle; it must be one of the three -currencies and is typically USDT or USDC. -""" -from dataclasses import dataclass, field -from typing import Optional - -import structlog - -logger = structlog.get_logger().bind(component="triangle_enum") - - -@dataclass(frozen=True) -class TradingPair: - """ - A single tradable currency pair on KuCoin. - - Attributes - ---------- - symbol : str - KuCoin symbol e.g. "BTC-USDT". - base : str - Base currency code e.g. "BTC". - quote : str - Quote currency code e.g. "USDT". - fee_currency : str - Currency in which fees are denominated for this pair (from KuCoin's - feeCurrency field). Included in signal payloads so the executor - can interpret fee amounts correctly. - """ - - symbol: str - base: str - quote: str - fee_currency: str = "" - - @classmethod - def from_api_response(cls, data: dict) -> Optional["TradingPair"]: - """ - Parse a KuCoin /symbols API response entry into a TradingPair. - - Returns None if enableTrading is not True or if required fields - are missing. - """ - if data.get("enableTrading") is not True: - return None - symbol = data.get("symbol", "") - base = data.get("baseCurrency", "") - quote = data.get("quoteCurrency", "") - if not all([symbol, base, quote]): - return None - return cls(symbol=symbol, base=base, quote=quote, fee_currency=data.get("feeCurrency", "")) - - @property - def currency_pair(self) -> frozenset[str]: - """The unordered set of the two currencies in this pair.""" - return frozenset([self.base, self.quote]) - - -@dataclass -class TriangleLeg: - """ - One directed hop in a triangle: input_currency → output_currency via a pair. - - Attributes - ---------- - pair : TradingPair - The trading pair used for this leg. - input_currency : str - Currency entering this leg. - output_currency : str - Currency leaving this leg. - maker_fee, taker_fee : float - Fee rates (fraction) for this leg's base currency. - """ - - pair: TradingPair - input_currency: str - output_currency: str - maker_fee: float - taker_fee: float - - -@dataclass -class Triangle: - """ - A complete triangular arbitrage cycle. - - Attributes - ---------- - legs : list[TriangleLeg] - The three directed hops (must sum to identity: c1 → c2 → c3 → c1). - currencies : frozenset[str] - The three distinct currency codes in the cycle. - pair_symbols : frozenset[str] - The three KuCoin symbols involved. - primary_quote : str - The hold currency that enters and exits the cycle. All minimum order - sizes and volumes are expressed in terms of this currency. - """ - - legs: list[TriangleLeg] = field(default_factory=list) - currencies: frozenset[str] = field(default_factory=frozenset()) - pair_symbols: frozenset[str] = field(default_factory=frozenset()) - primary_quote: str = "" - - -@dataclass -class TriangleIndex: - """ - Inverted index of triangles by pair symbol. - - Allows O(1) lookup of all triangles that involve a given symbol, which is - the primary query pattern used by OpportunityEngine on every book update. - """ - - triangles: list[Triangle] = field(default_factory=list) - by_pair: dict[str, list[Triangle]] = field(default_factory=dict) - - def get_triangles_for_pair(self, symbol: str) -> list[Triangle]: - """Return all triangles that include the given symbol.""" - return self.by_pair.get(symbol, []) - - -def _build_pair_map(pairs: list[TradingPair]) -> dict[frozenset[str], TradingPair]: - """ - Build a mapping from unordered currency pair (frozenset) to TradingPair. - - Used to look up pairs by their two currencies regardless of direction. - """ - pair_map: dict[frozenset[str], TradingPair] = {} - for p in pairs: - pair_map[p.currency_pair] = p - return pair_map - - -def _build_edge_map(pairs: list[TradingPair]) -> dict[str, list[frozenset[str]]]: - """ - Build an adjacency map: currency → list of currency pairs involving that currency. - - This is the graph representation used by enumerate_triangles to efficiently - find paths of length 2 (c1 → c2 → c3) without enumerating all O(n²) pairs. - """ - edge_map: dict[str, list[frozenset[str]]] = {} - for p in pairs: - for c in [p.base, p.quote]: - if c not in edge_map: - edge_map[c] = [] - edge_map[c].append(p.currency_pair) - return edge_map - - -def _build_legs( - c1: str, c2: str, c3: str, - pair_map: dict[frozenset[str], TradingPair], - fee_table: dict[str, dict[str, float]], -) -> list[TriangleLeg]: - """ - Construct the three TriangleLegs for the cycle c1 → c2 → c3 → c1. - - Looks up each leg's pair via pair_map (keyed by unordered currencies) and - resolves fees from fee_table using the base currency of each pair. - """ - default_fee = {"maker": 0.0010, "taker": 0.0010} - - def fee_for(base: str, side: str) -> float: - return fee_table.get(base, default_fee).get(side, 0.0010) - - p1 = pair_map[frozenset([c1, c2])] - p2 = pair_map[frozenset([c2, c3])] - p3 = pair_map[frozenset([c3, c1])] - - leg1 = TriangleLeg( - pair=p1, - input_currency=c1, - output_currency=c2, - maker_fee=fee_for(p1.base, "maker"), - taker_fee=fee_for(p1.base, "taker"), - ) - leg2 = TriangleLeg( - pair=p2, - input_currency=c2, - output_currency=c3, - maker_fee=fee_for(p2.base, "maker"), - taker_fee=fee_for(p2.base, "taker"), - ) - leg3 = TriangleLeg( - pair=p3, - input_currency=c3, - output_currency=c1, - maker_fee=fee_for(p3.base, "maker"), - taker_fee=fee_for(p3.base, "taker"), - ) - return [leg1, leg2, leg3] - - -def enumerate_triangles( - pairs: list[TradingPair], - fee_table: dict[str, dict[str, float]], - hold_currencies: Optional[list[str]] = None, -) -> TriangleIndex: - """ - Enumerate all valid triangular arbitrage cycles from a list of TradingPairs. - - A valid triangle must: - - Contain exactly three distinct currencies. - - Include at least one hold currency (default: ["USDT"]), which becomes - the primary_quote / entry/exit currency. - - Have all three legs (c1→c2, c2→c3, c3→c1) represent tradable pairs. - - Parameters - ---------- - pairs : list[TradingPair] - All known trading pairs (filtered to enableTrading == True by the caller). - fee_table : dict[str, dict[str, float]] - Per-base-currency fee rates as returned by oe_em.kucoin_api. - hold_currencies : list[str] or None - Currencies that may serve as the entry/exit point. Defaults to ["USDT"]. - - Returns - ------- - TriangleIndex - Contains all triangles and an inverted index by symbol for fast lookup. - """ - if hold_currencies is None: - hold_currencies = ["USDT"] - hold_set = set(hold_currencies) - - pair_map = _build_pair_map(pairs) - edge_map = _build_edge_map(pairs) - - triangles: list[Triangle] = [] - by_pair: dict[str, list[Triangle]] = {} - seen: set[frozenset[str]] = set() - - all_currencies = sorted(edge_map.keys()) - - for c1 in all_currencies: - for c2_edge in edge_map.get(c1, []): - c2 = next(x for x in c2_edge if x != c1) - for c3_edge in edge_map.get(c2, []): - c3 = next(x for x in c3_edge if x != c2) - if c3 == c1: - continue - if c3_edge == c2_edge: - continue - if frozenset([c1, c3]) not in pair_map: - continue - currencies = frozenset([c1, c2, c3]) - if currencies in seen: - continue - seen.add(currencies) - - in_triangle = hold_set & currencies - if not in_triangle: - continue - - for hold_curr in in_triangle: - others = [c for c in [c1, c2, c3] if c != hold_curr] - for x, y in [(others[0], others[1]), (others[1], others[0])]: - legs = _build_legs(hold_curr, x, y, pair_map, fee_table) - pair_symbols = frozenset([ - legs[0].pair.symbol, - legs[1].pair.symbol, - legs[2].pair.symbol, - ]) - - triangle = Triangle( - legs=legs, - currencies=currencies, - pair_symbols=pair_symbols, - primary_quote=hold_curr, - ) - triangles.append(triangle) - - for sym in pair_symbols: - if sym not in by_pair: - by_pair[sym] = [] - by_pair[sym].append(triangle) - - logger.info("triangles_enumerated", total=len(triangles), indexed_pairs=len(by_pair)) - - return TriangleIndex(triangles=triangles, by_pair=by_pair) \ No newline at end of file