triangular_arbitrage_bot/oe_em/book_consumer.py

205 lines
6.6 KiB
Python

"""
Order-book consumer for the opportunity engine.
Connects to fh_ob's Unix-domain socket and receives JSON-serialized top-5
order-book snapshots. On each update the registered on_update callback is
invoked, which triggers triangle evaluation in OpportunityEngine.
The consumer maintains an in-memory snapshot of the last seen book for each
symbol, accessible via get_book().
"""
import asyncio
import json
from dataclasses import dataclass, field
from pathlib import Path
from typing import Callable, Optional, Awaitable
import structlog
logger = structlog.get_logger().bind(component="book_consumer")
@dataclass
class BookLevel:
"""
A single price level in an order book.
Attributes
----------
price, size : float
Price and available size at this level.
"""
price: float
size: float
@classmethod
def from_dict(cls, data: dict) -> "BookLevel":
return cls(
price=float(data["price"]),
size=float(data["size"]),
)
@dataclass
class OrderBookTop5:
"""
Top-5 bid/ask snapshot for a single symbol.
Attributes
----------
symbol : str
KuCoin symbol e.g. "BTC-USDT".
bids : list[BookLevel]
Best bids, most aggressive first.
asks : list[BookLevel]
Best asks, most aggressive first.
ts_ms : int
Timestamp (ms) of the snapshot from fh_ob.
"""
symbol: str
bids: list[BookLevel] = field(default_factory=list)
asks: list[BookLevel] = field(default_factory=list)
ts_ms: int = 0
@classmethod
def from_json(cls, data: dict) -> "OrderBookTop5":
bids = [BookLevel.from_dict(b) for b in data.get("bids", [])]
asks = [BookLevel.from_dict(a) for a in data.get("asks", [])]
return cls(
symbol=data.get("symbol", ""),
bids=bids,
asks=asks,
ts_ms=data.get("ts_ms", 0),
)
class BookConsumer:
"""
Consumes order-book snapshots from fh_ob and dispatches them to OpportunityEngine.
Maintains a socket connection until EOF or error, then reconnects
automatically. Book updates are pushed to an internal queue; a dedicated
worker drains the queue and calls on_update, keeping the reader loop
non-blocking.
"""
def __init__(
self,
socket_path: Path,
on_update: Callable[[str, OrderBookTop5], None | Awaitable[None]],
) -> None:
self._socket_path = socket_path
self._on_update = on_update
self._running = False
self._books: dict[str, OrderBookTop5] = {}
self._log = logger
self._queue: asyncio.Queue[str] = asyncio.Queue(maxsize=2048)
self._queued: set[str] = set()
self._worker_task: Optional[asyncio.Task] = None
def get_book(self, symbol: str) -> Optional[OrderBookTop5]:
"""Return the last known book for a symbol, or None if not yet received."""
return self._books.get(symbol)
def set_on_update(self, callback: Callable[[str, OrderBookTop5], None]) -> None:
"""Replace the on_update callback. Used when the callback needs a
reference to an object that does not yet exist at construction time."""
self._on_update = callback
async def start(self) -> None:
"""
Connect to fh_ob and run the consume loop until stop() is called.
On unexpected disconnection a 1-second backoff is applied before
reconnecting. Interrupted cleanly by CancelledError.
"""
self._running = True
self._worker_task = asyncio.create_task(self._worker())
while self._running:
try:
await self._connect()
except asyncio.CancelledError:
break
except Exception as e:
self._log.warning("connection_error", error=str(e))
await asyncio.sleep(1.0)
if self._worker_task:
self._worker_task.cancel()
try:
await self._worker_task
except asyncio.CancelledError:
pass
async def stop(self) -> None:
"""Request the consume loop to exit on the next iteration."""
self._running = False
async def _worker(self) -> None:
"""Drain the update queue and call on_update for each symbol."""
while self._running:
try:
symbol = await asyncio.wait_for(self._queue.get(), timeout=0.5)
except asyncio.TimeoutError:
continue
except asyncio.CancelledError:
break
self._queued.discard(symbol)
book = self._books.get(symbol)
if not book:
continue
try:
result = self._on_update(symbol, book)
if asyncio.iscoroutine(result):
await result
except Exception as e:
self._log.error("on_update_error", symbol=symbol, error=str(e))
async def _connect(self) -> None:
"""
Open the Unix socket, read and queue messages until EOF or error.
Each JSON line is parsed into an OrderBookTop5, stored in self._books,
and the symbol is pushed to the queue for the worker to evaluate.
The reader never blocks on evaluation.
"""
reader, writer = await asyncio.open_unix_connection(path=str(self._socket_path))
self._log.info("connected", path=str(self._socket_path))
try:
while self._running:
try:
line = await reader.readline()
except asyncio.CancelledError:
raise
except Exception as e:
self._log.error("socket_read_error", error=str(e))
break
if not line:
self._log.warning("socket_eof")
break
try:
data = json.loads(line.decode())
except (json.JSONDecodeError, UnicodeDecodeError) as e:
self._log.warning("invalid_json", line=line[:50], error=str(e))
continue
book = OrderBookTop5.from_json(data)
if not book.symbol:
continue
self._books[book.symbol] = book
if book.symbol not in self._queued:
self._queued.add(book.symbol)
try:
self._queue.put_nowait(book.symbol)
except asyncio.QueueFull:
self._queued.discard(book.symbol)
finally:
writer.close()
await writer.wait_closed()
self._log.info("disconnected")