cleanup: remove dead fh_ob/oe_em Python modules, add book_ts_ms to screen output

This commit is contained in:
nicolas 2026-05-24 21:36:48 -03:00
parent 43333984a3
commit 97b341fec9
16 changed files with 3 additions and 2100 deletions

View File

@ -188,6 +188,7 @@ class ExecutionReport:
profit: float = 0.0 profit: float = 0.0
effective_bps: float = 0.0 effective_bps: float = 0.0
error: str = "" error: str = ""
book_ts_ms: int = 0
@dataclass @dataclass
@ -837,6 +838,7 @@ class Executor:
effective_bps=effective_bps, effective_bps=effective_bps,
ts_ms=int(time.time() * 1000), ts_ms=int(time.time() * 1000),
timings=timings, timings=timings,
book_ts_ms=book_ts_ms,
) )
self._last_trade_ts_ms[in_flight.triangle_key] = report.ts_ms self._last_trade_ts_ms[in_flight.triangle_key] = report.ts_ms
self._emit_report(report) self._emit_report(report)
@ -867,6 +869,7 @@ class Executor:
f"{ts_iso} {report.status.upper()} | corr={report.correlation_id} | " f"{ts_iso} {report.status.upper()} | corr={report.correlation_id} | "
f"triangle={report.triangle_key} | predicted_bps={report.predicted_bps:.2f} | " f"triangle={report.triangle_key} | predicted_bps={report.predicted_bps:.2f} | "
f"effective_bps={report.effective_bps:.2f} | " f"effective_bps={report.effective_bps:.2f} | "
f"book_ts={report.book_ts_ms} | "
f"profit={report.profit:.4f}" f"profit={report.profit:.4f}"
f"{f' | error={report.error}' if report.error else ''}", f"{f' | error={report.error}' if report.error else ''}",
flush=True, flush=True,

View File

@ -1,15 +0,0 @@
"""Feed Handler + Order Book Mirror."""
from fh_ob.ws_client import KuCoinWSClient
from fh_ob.book_store import BookStore, OrderBookTop5, BookLevel
from fh_ob.socket_server import SocketServer
from fh_ob.rest_server import create_app
__all__ = [
"KuCoinWSClient",
"BookStore",
"OrderBookTop5",
"BookLevel",
"SocketServer",
"create_app",
]

View File

@ -1,87 +0,0 @@
import asyncio
import signal
from pathlib import Path
import structlog
import uvicorn
from common.config import Settings
from common.log import configure_logging
from fh_ob.book_store import BookStore
from fh_ob.rest_server import create_app
from fh_ob.socket_server import SocketServer
from fh_ob.ws_client import KuCoinWSClient
async def main() -> None:
config_path = Path("config.yaml")
settings = await Settings.from_yaml(config_path) if config_path.exists() else Settings()
configure_logging(settings.fh_ob.log_level, settings.fh_ob.log_file)
log = structlog.get_logger().bind(component="fh_ob")
log.info("fh_ob_starting", symbols=settings.fh_ob.symbols)
book_store = BookStore()
socket_server = SocketServer(settings.fh_ob.socket_path)
async def on_book_update(book):
try:
await socket_server.broadcast(book)
except Exception:
pass
ws_client = KuCoinWSClient(
settings=settings.fh_ob,
book_store=book_store,
on_book_update=on_book_update,
)
rest_app = create_app(
book_store,
get_socket_clients=socket_server.client_count,
get_subscribed_count=ws_client.subscribed_count,
is_connected=ws_client.is_connected,
add_symbol=ws_client.add_symbol,
remove_symbol=ws_client.remove_symbol,
get_symbols=ws_client.get_symbols,
get_reconnect_stats=ws_client.reconnect_stats,
)
rest_config = uvicorn.Config(
rest_app,
host=settings.fh_ob.rest_host,
port=settings.fh_ob.rest_port,
log_level="warning",
)
rest_server = uvicorn.Server(rest_config)
async def shutdown(sig: signal.Signals) -> None:
log.info("shutdown_signal_received", signal=sig.name)
await ws_client.stop()
await socket_server.stop()
rest_config.should_exit = True
loop = asyncio.get_running_loop()
for sig in (signal.SIGTERM, signal.SIGINT):
loop.add_signal_handler(sig, lambda s=sig: asyncio.create_task(shutdown(s)))
ws_task = asyncio.create_task(ws_client.start())
socket_task = asyncio.create_task(socket_server.start())
rest_task = asyncio.create_task(rest_server.serve())
log.info(
"fh_ob_ready",
rest_endpoint=f"http://{settings.fh_ob.rest_host}:{settings.fh_ob.rest_port}",
socket_path=str(settings.fh_ob.socket_path),
)
try:
await asyncio.gather(ws_task, socket_task, rest_task)
except asyncio.CancelledError:
log.info("fh_ob_cancelled")
except Exception as e:
log.error("fh_ob_error", error=str(e))
raise
if __name__ == "__main__":
asyncio.run(main())

View File

@ -1,70 +0,0 @@
import time
from dataclasses import dataclass, field
from typing import Optional
import structlog
logger = structlog.get_logger()
@dataclass
class BookLevel:
price: float
size: float
@classmethod
def from_list(cls, data: list) -> "BookLevel":
return cls(price=float(data[0]), size=float(data[1]))
def to_dict(self) -> dict:
return {"price": self.price, "size": self.size}
@dataclass
class OrderBookTop5:
symbol: str
bids: list[BookLevel] = field(default_factory=list)
asks: list[BookLevel] = field(default_factory=list)
ts_ms: int = 0
def to_dict(self) -> dict:
return {
"symbol": self.symbol,
"bids": [b.to_dict() for b in self.bids],
"asks": [a.to_dict() for a in self.asks],
"ts_ms": self.ts_ms,
}
class BookStore:
def __init__(self) -> None:
self._books: dict[str, OrderBookTop5] = {}
self._log = logger.bind(component="book_store")
def update(self, raw: dict) -> Optional[OrderBookTop5]:
topic = raw.get("topic", "")
data = raw.get("data", {})
topic_suffix = topic.split(":")[-1] if ":" in topic else ""
symbol = topic_suffix.split(",")[0].strip() if topic_suffix else ""
asks_raw = data.get("asks", [])
bids_raw = data.get("bids", [])
if not symbol:
return None
ts_ms = int(data.get("time", time.time() * 1000))
bids = [BookLevel.from_list(b) for b in bids_raw[:1]]
asks = [BookLevel.from_list(a) for a in asks_raw[:1]]
book = OrderBookTop5(symbol=symbol, bids=bids, asks=asks, ts_ms=ts_ms)
self._books[symbol] = book
return book
def get(self, symbol: str) -> Optional[OrderBookTop5]:
return self._books.get(symbol)
def get_all(self) -> dict[str, OrderBookTop5]:
return self._books.copy()

View File

@ -1,110 +0,0 @@
from typing import Callable, Optional
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from fh_ob.book_store import BookStore, OrderBookTop5
class BookLevelResponse(BaseModel):
price: str
size: str
class OrderBookResponse(BaseModel):
symbol: str
bids: list[BookLevelResponse]
asks: list[BookLevelResponse]
ts_ms: int
class HealthResponse(BaseModel):
status: str
books_tracked: int
socket_clients: int
subscribed_symbols: int
connected: bool
last_update_ms: Optional[int] = None
reconnect_count: int = 0
last_reconnect_ms: Optional[int] = None
class SymbolOpRequest(BaseModel):
symbol: str
class SymbolsResponse(BaseModel):
symbols: list[str]
def create_app(
book_store: BookStore,
get_socket_clients: Optional[Callable[[], int]] = None,
get_subscribed_count: Optional[Callable[[], int]] = None,
is_connected: Optional[Callable[[], bool]] = None,
add_symbol: Optional[Callable[[str], bool]] = None,
remove_symbol: Optional[Callable[[str], bool]] = None,
get_symbols: Optional[Callable[[], list[str]]] = None,
get_reconnect_stats: Optional[Callable[[], tuple[int, int]]] = None,
) -> FastAPI:
app = FastAPI(title="FH+OB Debug API", description="Dev-only debug endpoint")
@app.get("/book/{symbol}", response_model=OrderBookResponse)
async def get_book(symbol: str) -> OrderBookResponse:
book = book_store.get(symbol)
if book is None:
raise HTTPException(status_code=404, detail=f"No book data for {symbol}")
return OrderBookResponse(
symbol=book.symbol,
bids=[BookLevelResponse(price=str(b.price), size=str(b.size)) for b in book.bids],
asks=[BookLevelResponse(price=str(a.price), size=str(a.size)) for a in book.asks],
ts_ms=book.ts_ms,
)
@app.get("/books", response_model=dict[str, OrderBookResponse])
async def get_all_books() -> dict[str, OrderBookResponse]:
books = book_store.get_all()
return {
symbol: OrderBookResponse(
symbol=book.symbol,
bids=[BookLevelResponse(price=str(b.price), size=str(b.size)) for b in book.bids],
asks=[BookLevelResponse(price=str(a.price), size=str(a.size)) for a in book.asks],
ts_ms=book.ts_ms,
)
for symbol, book in books.items()
}
@app.get("/symbols")
async def list_symbols():
return SymbolsResponse(symbols=get_symbols() if get_symbols else [])
@app.post("/symbols")
async def add_sym(req: SymbolOpRequest):
if add_symbol and add_symbol(req.symbol):
return SymbolsResponse(symbols=get_symbols() if get_symbols else [])
raise HTTPException(status_code=400, detail="Symbol not found or already subscribed")
@app.delete("/symbols/{symbol}")
async def rm_sym(symbol: str):
if remove_symbol and remove_symbol(symbol):
return SymbolsResponse(symbols=get_symbols() if get_symbols else [])
raise HTTPException(status_code=404, detail="Symbol not found or not subscribed")
@app.get("/health", response_model=HealthResponse)
async def health() -> HealthResponse:
books = book_store.get_all()
latest_ts = max((b.ts_ms for b in books.values()), default=None)
reconnects, last_reconnect_ms = get_reconnect_stats() if get_reconnect_stats else (0, None)
return HealthResponse(
status="ok" if (is_connected and is_connected()) else "degraded",
books_tracked=len(books),
socket_clients=get_socket_clients() if get_socket_clients else 0,
subscribed_symbols=get_subscribed_count() if get_subscribed_count else 0,
connected=is_connected() if is_connected else False,
last_update_ms=latest_ts,
reconnect_count=reconnects,
last_reconnect_ms=last_reconnect_ms,
)
return app

View File

@ -1,95 +0,0 @@
from typing import Optional
import asyncio
import json
from pathlib import Path
import structlog
from fh_ob.book_store import OrderBookTop5
class SocketServer:
def __init__(self, socket_path: Path) -> None:
self._socket_path = socket_path
self._log = structlog.get_logger().bind(component="socket_server")
self._clients: set[asyncio.StreamWriter] = set()
self._server: Optional[asyncio.Server] = None
async def start(self) -> None:
if self._socket_path.exists():
self._socket_path.unlink()
self._server = await asyncio.start_unix_server(
self._accept_client,
path=str(self._socket_path),
)
self._log.info("socket_server_started", path=str(self._socket_path))
async def stop(self) -> None:
if self._server:
self._server.close()
await self._server.wait_closed()
if self._socket_path.exists():
self._socket_path.unlink()
self._log.info("socket_server_stopped")
def client_count(self) -> int:
return len(self._clients)
async def _accept_client(
self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
) -> None:
self._clients.add(writer)
self._log.info("client_connected", addr=writer.get_extra_info("peername"))
try:
while True:
try:
line = await reader.readline()
except (ConnectionResetError, BrokenPipeError, asyncio.CancelledError):
break
except Exception:
break
if not line:
break
except asyncio.CancelledError:
pass
except Exception:
pass
finally:
self._clients.discard(writer)
writer.close()
try:
await asyncio.wait_for(writer.wait_closed(), timeout=1.0)
except (asyncio.CancelledError, Exception):
pass
self._log.info("client_disconnected")
async def broadcast(self, book: OrderBookTop5) -> None:
if not self._clients:
return
msg_bytes = json.dumps(book.to_dict(), separators=(",", ":")).encode() + b"\n"
clients_snapshot = list(self._clients)
bad = set()
for w in clients_snapshot:
try:
w.write(msg_bytes)
except Exception as e:
self._log.warning("broadcast_write_failed", error=str(e))
bad.add(w)
if not clients_snapshot:
return
drain_results = await asyncio.gather(
*(w.drain() for w in clients_snapshot),
return_exceptions=True,
)
for w, res in zip(clients_snapshot, drain_results):
if isinstance(res, Exception):
self._log.warning("broadcast_drain_failed", error=str(res))
bad.add(w)
self._clients -= bad

View File

@ -1,271 +0,0 @@
import asyncio
import json
import time
import uuid
from dataclasses import dataclass, field
from typing import Callable, Optional, Awaitable
import aiohttp
import structlog
import websockets
from common.config import FHobSettings
from fh_ob.book_store import BookStore, OrderBookTop5
@dataclass
class _WorkerState:
symbols: set[str] = field(default_factory=set)
command_queue: asyncio.Queue = field(default_factory=asyncio.Queue)
ws_id: int = 0
reconnect_count: int = 0
last_reconnect_ts_ms: int = 0
connection_active: bool = False
class KuCoinWSClient:
def __init__(
self,
settings: FHobSettings,
book_store: BookStore,
on_book_update: Optional[Callable[[OrderBookTop5], None | Awaitable[None]]] = None,
) -> None:
self._settings = settings
self._book_store = book_store
self._on_book_update_callback = on_book_update
self._log = structlog.get_logger().bind(component="ws_client")
self._running = False
self._reconnect_delay = settings.reconnect_base_delay
self._subscription_events: dict[str, asyncio.Event] = {}
self._workers: list[_WorkerState] = []
self._worker_tasks: list[asyncio.Task] = []
self._http_session: Optional[aiohttp.ClientSession] = None
async def start(self) -> None:
self._running = True
self._workers.clear()
self._worker_tasks.clear()
self._http_session = aiohttp.ClientSession()
symbol_list = list(self._settings.symbols)
for i in range(0, len(symbol_list) or 1, 400):
group = set(symbol_list[i : i + 400])
ws_id = len(self._workers) + 1
state = _WorkerState(symbols=group, ws_id=ws_id)
self._workers.append(state)
for state in self._workers:
task = asyncio.create_task(self._connection_worker(state))
self._worker_tasks.append(task)
try:
await asyncio.gather(*self._worker_tasks)
except asyncio.CancelledError:
pass
self._log.debug("all_workers_stopped")
async def stop(self) -> None:
self._running = False
for t in self._worker_tasks:
t.cancel()
if self._worker_tasks:
await asyncio.wait(self._worker_tasks, timeout=5)
if self._http_session and not self._http_session.closed:
await self._http_session.close()
self._log.debug("ws_client_stopped")
def is_connected(self) -> bool:
return any(w.connection_active for w in self._workers)
def subscribed_count(self) -> int:
return sum(len(w.symbols) for w in self._workers)
def reconnect_stats(self) -> tuple[int, int]:
"""Return (total_reconnects, timestamp_ms of last reconnect) across all workers."""
total = sum(w.reconnect_count for w in self._workers)
latest = max((w.last_reconnect_ts_ms for w in self._workers), default=0)
return total, latest
def get_symbols(self) -> list[str]:
result = []
for w in self._workers:
result.extend(w.symbols)
return result
def add_symbol(self, symbol: str) -> bool:
if not self._workers:
return False
if any(symbol in w.symbols for w in self._workers):
return False
self._settings.symbols.append(symbol)
eligible = [w for w in self._workers if len(w.symbols) < 400]
if not eligible:
self._log.warning("all_workers_full", symbol=symbol)
return False
worker = min(eligible, key=lambda w: len(w.symbols))
worker.symbols.add(symbol)
worker.command_queue.put_nowait(("subscribe", symbol))
return True
def remove_symbol(self, symbol: str) -> bool:
found = False
for worker in self._workers:
if symbol in worker.symbols:
worker.symbols.discard(symbol)
found = True
break
if not found:
return False
self._settings.symbols.remove(symbol)
return True
async def _connection_worker(self, state: _WorkerState) -> None:
while self._running:
try:
token, instance = await self._get_public_token()
self._ping_interval = instance.get("pingInterval", 18000) / 1000.0
ws = await websockets.connect(
instance["endpoint"] + f"?token={token}&connectId={uuid.uuid4()}-{state.ws_id}",
ping_interval=None,
)
self._log.debug("ws_connected", ws_id=state.ws_id)
self._reconnect_delay = self._settings.reconnect_base_delay
state.connection_active = True
ping_task = asyncio.create_task(self._ping_loop(ws, state.ws_id))
async def reader() -> None:
try:
async for msg in ws:
await self._handle_message(msg)
except websockets.ConnectionClosed as e:
self._log.warning("reader_connection_closed", ws_id=state.ws_id, code=e.code, reason=e.reason)
except asyncio.CancelledError:
raise
except Exception as e:
self._log.error("reader_unexpected_error", ws_id=state.ws_id, error=str(e))
reader_task = asyncio.create_task(reader())
try:
if state.symbols:
await self._send_subscribe(ws, list(state.symbols), state.ws_id)
while True:
cmd = await state.command_queue.get()
if cmd is None:
break
action, symbol = cmd
if action == "subscribe":
self._log.debug(
"subscribing_dynamic",
symbol=symbol,
ws_id=state.ws_id,
)
await self._send_subscribe(ws, [symbol], state.ws_id)
except asyncio.CancelledError:
raise
except websockets.ConnectionClosed as e:
self._log.warning("ws_disconnected", ws_id=state.ws_id, code=e.code, reason=e.reason)
except Exception as e:
self._log.error("command_loop_error", ws_id=state.ws_id, error=str(e))
finally:
state.connection_active = False
ping_task.cancel()
reader_task.cancel()
try:
await reader_task
except asyncio.CancelledError:
pass
except asyncio.CancelledError:
break
except Exception as e:
if not self._running:
break
state.connection_active = False
state.reconnect_count += 1
state.last_reconnect_ts_ms = int(time.time() * 1000)
self._log.warning(
"ws_reconnecting",
ws_id=state.ws_id,
reconnect_count=state.reconnect_count,
error=str(e),
)
await asyncio.sleep(self._reconnect_delay)
self._reconnect_delay = min(
self._reconnect_delay * 2,
self._settings.reconnect_max_delay,
)
self._log.debug("worker_exiting", ws_id=state.ws_id)
async def _get_public_token(self) -> tuple[str, dict]:
self._log.debug("fetching_public_token", url=self._settings.token_url)
async with self._http_session.post(self._settings.token_url) as resp:
data = await resp.json()
token = data["data"]["token"]
instance = data["data"]["instanceServers"][0]
self._log.debug("public_token_received", ping_interval_ms=instance.get("pingInterval"))
return token, instance
async def _send_subscribe(self, ws, symbols: list[str], ws_id: int) -> None:
for i in range(0, len(symbols), 100):
batch = symbols[i : i + 100]
topic = "/spotMarket/level2Depth5:" + ",".join(batch)
ack_id = str(uuid.uuid4())
evt = asyncio.Event()
self._subscription_events[ack_id] = evt
sub_msg = {
"id": ack_id,
"type": "subscribe",
"topic": topic,
"response": True,
}
self._log.debug("subscribing", topic=topic[:80], ws_id=ws_id)
await ws.send(json.dumps(sub_msg))
try:
await asyncio.wait_for(evt.wait(), timeout=self._reconnect_delay)
except asyncio.TimeoutError:
self._log.warning("subscription_ack_timeout", topic=topic[:80], ws_id=ws_id)
raise
async def _ping_loop(self, ws, ws_id: int) -> None:
while self._running:
await asyncio.sleep(self._ping_interval)
try:
await ws.ping()
except Exception:
self._log.warning("ping_failed", ws_id=ws_id)
break
async def _handle_message(self, msg: str) -> None:
try:
data = json.loads(msg)
except json.JSONDecodeError:
self._log.warning("invalid_json", msg=msg[:100])
return
msg_type = data.get("type")
if msg_type == "welcome":
self._log.debug("ws_welcome")
return
if msg_type == "pong":
return
if msg_type == "ack":
ack_id = data.get("id")
self._log.debug("subscription_ack", topic=data.get("topic"), ack_id=ack_id)
if ack_id in self._subscription_events:
self._subscription_events[ack_id].set()
del self._subscription_events[ack_id]
return
topic = data.get("topic", "")
if msg_type == "message" and "level2Depth5" in topic:
book = self._book_store.update(data)
if book and self._on_book_update_callback:
result = self._on_book_update_callback(book)
if asyncio.iscoroutine(result):
asyncio.create_task(result)
elif topic:
self._log.warning("ws_unexpected_message", type=msg_type, topic=topic)

View File

View File

@ -1,223 +0,0 @@
"""
Opportunity engine entry point.
Initialises the order-book consumer, triangle index, and signal socket client;
starts background tasks for book consumption and periodic stats logging; shuts
down cleanly on SIGTERM/SIGINT.
"""
import asyncio
import signal
from pathlib import Path
import aiohttp
import structlog
from common.log import configure_logging
from oe_em.book_consumer import BookConsumer
from oe_em.config import Settings
from oe_em.kucoin_api import KuCoinAPI
from oe_em.opportunity import OpportunityEngine
from oe_em.risk import RiskManager
from oe_em.socket_client import SignalSocketClient
from oe_em.triangle_enum import TradingPair, enumerate_triangles
async def sync_symbols_with_fh_ob(
fh_ob_url: str,
needed_symbols: set[str],
http_session: aiohttp.ClientSession,
log,
) -> set[str]:
"""
Ensure fh_ob is subscribed to every symbol needed by the triangle index.
Fetches the current subscription list from fh_ob, posts any missing symbols,
and returns the full set of subscribed symbols.
"""
get_url = f"{fh_ob_url}/symbols"
async with http_session.get(get_url) as resp:
resp.raise_for_status()
data = await resp.json()
current_symbols = set(data.get("symbols", []))
missing = needed_symbols - current_symbols
if missing:
log.info("syncing_symbols", missing=len(missing), current=len(current_symbols))
for sym in missing:
post_url = f"{fh_ob_url}/symbols"
payload = {"symbol": sym}
async with http_session.post(post_url, json=payload) as post_resp:
if post_resp.status == 400:
log.warning("symbol_cannot_be_subscribed", symbol=sym)
else:
log.debug("symbol_subscribed", symbol=sym)
return current_symbols | missing
async def main() -> None:
config_path = Path("config.yaml")
settings = await Settings.from_yaml(config_path) if config_path.exists() else Settings()
configure_logging(settings.oe_em.log_level, settings.oe_em.log_file)
log = structlog.get_logger().bind(component="oe_em")
log.info("oe_em_starting")
api = KuCoinAPI()
await api.fetch_pairs_and_fees()
pair_list = [
TradingPair(
symbol=p["symbol"],
base=p["base"],
quote=p["quote"],
fee_currency=p.get("fee_currency", ""),
)
for p in api.get_all_pairs()
]
excluded = set(settings.oe_em.excluded_currencies)
if excluded:
pair_list = [p for p in pair_list if p.base not in excluded and p.quote not in excluded]
log.info("pairs_loaded", count=len(pair_list), excluded=len(excluded))
fee_table = api._fee_table
triangle_index = enumerate_triangles(
pair_list,
fee_table,
hold_currencies=settings.oe_em.hold_currencies,
)
log.info("triangles_enumerated", count=len(triangle_index.triangles))
needed_symbols = set()
for tri in triangle_index.triangles:
needed_symbols.update(tri.pair_symbols)
async with aiohttp.ClientSession() as http_session:
subscribed = await sync_symbols_with_fh_ob(
settings.oe_em.fh_ob_url,
needed_symbols,
http_session,
log,
)
book_consumer = BookConsumer(
socket_path=settings.oe_em.socket_path,
on_update=lambda symbol, book: None,
)
signal_client: SignalSocketClient | None = None
signal_reconnect_task: asyncio.Task | None = None
if settings.oe_em.send_signals:
signal_client = SignalSocketClient(socket_path=settings.oe_em.executor_socket_path)
async def send_signal(signal_payload: dict) -> None:
"""Forward a signal payload to the executor's Unix socket."""
if signal_client:
await signal_client.send_signal(signal_payload)
opp_engine = OpportunityEngine(
book_consumer=book_consumer,
triangle_index=triangle_index,
signal_threshold_bps=settings.oe_em.signal_threshold_bps,
log_path=settings.oe_em.opportunity_log_path,
kcs_discount_active=settings.oe_em.kcs_discount_active,
cooldown_seconds=settings.oe_em.cooldown_seconds,
on_signal=send_signal if settings.oe_em.send_signals else None,
)
risk_mgr = RiskManager()
async def on_book_update(symbol: str, book) -> None:
"""Callback invoked by BookConsumer whenever a subscribed book is refreshed."""
if risk_mgr.should_continue():
opp_engine.evaluate_triangles_for_pair(symbol)
book_consumer.set_on_update(on_book_update)
fh_ob_url = settings.oe_em.fh_ob_url
async with aiohttp.ClientSession() as http_session:
async with http_session.get(f"{fh_ob_url}/symbols") as resp:
resp.raise_for_status()
data = await resp.json()
symbols_now = set(data.get("symbols", []))
missing = needed_symbols - symbols_now
if missing:
log.warning("symbols_not_subscribed_after_sync", count=len(missing))
for sym in missing:
log.warning("unavailable_symbol", symbol=sym)
if signal_client:
signal_reconnect_task = await signal_client.start()
log.info("signal_client_connecting", socket_path=str(settings.oe_em.executor_socket_path))
log.info(
"oe_em_ready",
triangles=len(triangle_index.triangles),
subscribed=len(symbols_now),
threshold_bps=settings.oe_em.signal_threshold_bps,
hold_currencies=settings.oe_em.hold_currencies,
send_signals=settings.oe_em.send_signals,
)
consumer_task = asyncio.create_task(book_consumer.start())
async def stats_loop() -> None:
"""
Periodically log evaluation stats so the operator can monitor the engine.
Runs until cancelled. Suppressed by setting stats_interval_seconds <= 0.
"""
interval = settings.oe_em.stats_interval_seconds
if interval <= 0:
return
while True:
await asyncio.sleep(interval)
try:
s = opp_engine.get_stats()
books_tracked = sum(
1 for t in triangle_index.triangles
if book_consumer.get_book(t.legs[0].pair.symbol) is not None
)
except Exception as e:
log.error("stats_error", error=str(e))
continue
log.info("stats", **{
"triangles_evaluated": s.triangles_evaluated,
"signals_fired": s.signals_fired,
"books_tracked": books_tracked,
"subscribed": len(book_consumer._books),
"best_net_bps": f"{s.best_net_bps:.2f}",
"best_legs": s.best_legs,
})
stats_task = asyncio.create_task(stats_loop())
def shutdown(sig: signal.Signals) -> None:
"""Begin graceful shutdown: stop book consumer, cancel stats, close signal client."""
log.info("shutdown_signal_received", signal=sig.name)
asyncio.create_task(book_consumer.stop())
stats_task.cancel()
if signal_reconnect_task:
signal_reconnect_task.cancel()
if signal_client:
asyncio.create_task(signal_client.close())
loop = asyncio.get_running_loop()
for sig in (signal.SIGTERM, signal.SIGINT):
loop.add_signal_handler(sig, lambda s=sig: shutdown(s))
tasks = [consumer_task, stats_task]
if signal_reconnect_task:
tasks.append(signal_reconnect_task)
try:
await asyncio.gather(*tasks)
except asyncio.CancelledError:
log.info("oe_em_cancelled")
if __name__ == "__main__":
asyncio.run(main())

View File

@ -1,205 +0,0 @@
"""
Order-book consumer for the opportunity engine.
Connects to fh_ob's Unix-domain socket and receives JSON-serialized top-5
order-book snapshots. On each update the registered on_update callback is
invoked, which triggers triangle evaluation in OpportunityEngine.
The consumer maintains an in-memory snapshot of the last seen book for each
symbol, accessible via get_book().
"""
import asyncio
import json
from dataclasses import dataclass, field
from pathlib import Path
from typing import Callable, Optional, Awaitable
import structlog
logger = structlog.get_logger().bind(component="book_consumer")
@dataclass
class BookLevel:
"""
A single price level in an order book.
Attributes
----------
price, size : float
Price and available size at this level.
"""
price: float
size: float
@classmethod
def from_dict(cls, data: dict) -> "BookLevel":
return cls(
price=float(data["price"]),
size=float(data["size"]),
)
@dataclass
class OrderBookTop5:
"""
Top-5 bid/ask snapshot for a single symbol.
Attributes
----------
symbol : str
KuCoin symbol e.g. "BTC-USDT".
bids : list[BookLevel]
Best bids, most aggressive first.
asks : list[BookLevel]
Best asks, most aggressive first.
ts_ms : int
Timestamp (ms) of the snapshot from fh_ob.
"""
symbol: str
bids: list[BookLevel] = field(default_factory=list)
asks: list[BookLevel] = field(default_factory=list)
ts_ms: int = 0
@classmethod
def from_json(cls, data: dict) -> "OrderBookTop5":
bids = [BookLevel.from_dict(b) for b in data.get("bids", [])]
asks = [BookLevel.from_dict(a) for a in data.get("asks", [])]
return cls(
symbol=data.get("symbol", ""),
bids=bids,
asks=asks,
ts_ms=data.get("ts_ms", 0),
)
class BookConsumer:
"""
Consumes order-book snapshots from fh_ob and dispatches them to OpportunityEngine.
Maintains a socket connection until EOF or error, then reconnects
automatically. Book updates are pushed to an internal queue; a dedicated
worker drains the queue and calls on_update, keeping the reader loop
non-blocking.
"""
def __init__(
self,
socket_path: Path,
on_update: Callable[[str, OrderBookTop5], None | Awaitable[None]],
) -> None:
self._socket_path = socket_path
self._on_update = on_update
self._running = False
self._books: dict[str, OrderBookTop5] = {}
self._log = logger
self._queue: asyncio.Queue[str] = asyncio.Queue(maxsize=2048)
self._queued: set[str] = set()
self._worker_task: Optional[asyncio.Task] = None
def get_book(self, symbol: str) -> Optional[OrderBookTop5]:
"""Return the last known book for a symbol, or None if not yet received."""
return self._books.get(symbol)
def set_on_update(self, callback: Callable[[str, OrderBookTop5], None]) -> None:
"""Replace the on_update callback. Used when the callback needs a
reference to an object that does not yet exist at construction time."""
self._on_update = callback
async def start(self) -> None:
"""
Connect to fh_ob and run the consume loop until stop() is called.
On unexpected disconnection a 1-second backoff is applied before
reconnecting. Interrupted cleanly by CancelledError.
"""
self._running = True
self._worker_task = asyncio.create_task(self._worker())
while self._running:
try:
await self._connect()
except asyncio.CancelledError:
break
except Exception as e:
self._log.warning("connection_error", error=str(e))
await asyncio.sleep(1.0)
if self._worker_task:
self._worker_task.cancel()
try:
await self._worker_task
except asyncio.CancelledError:
pass
async def stop(self) -> None:
"""Request the consume loop to exit on the next iteration."""
self._running = False
async def _worker(self) -> None:
"""Drain the update queue and call on_update for each symbol."""
while self._running:
try:
symbol = await asyncio.wait_for(self._queue.get(), timeout=0.5)
except asyncio.TimeoutError:
continue
except asyncio.CancelledError:
break
self._queued.discard(symbol)
book = self._books.get(symbol)
if not book:
continue
try:
result = self._on_update(symbol, book)
if asyncio.iscoroutine(result):
await result
except Exception as e:
self._log.error("on_update_error", symbol=symbol, error=str(e))
async def _connect(self) -> None:
"""
Open the Unix socket, read and queue messages until EOF or error.
Each JSON line is parsed into an OrderBookTop5, stored in self._books,
and the symbol is pushed to the queue for the worker to evaluate.
The reader never blocks on evaluation.
"""
reader, writer = await asyncio.open_unix_connection(path=str(self._socket_path))
self._log.info("connected", path=str(self._socket_path))
try:
while self._running:
try:
line = await reader.readline()
except asyncio.CancelledError:
raise
except Exception as e:
self._log.error("socket_read_error", error=str(e))
break
if not line:
self._log.warning("socket_eof")
break
try:
data = json.loads(line.decode())
except (json.JSONDecodeError, UnicodeDecodeError) as e:
self._log.warning("invalid_json", line=line[:50], error=str(e))
continue
book = OrderBookTop5.from_json(data)
if not book.symbol:
continue
self._books[book.symbol] = book
if book.symbol not in self._queued:
self._queued.add(book.symbol)
try:
self._queue.put_nowait(book.symbol)
except asyncio.QueueFull:
self._queued.discard(book.symbol)
finally:
writer.close()
await writer.wait_closed()
self._log.info("disconnected")

View File

@ -1,90 +0,0 @@
"""
Configuration schema for the opportunity engine (oe_em).
Parsed from config.yaml into OeEmSettings. Controls logging, signal
thresholds, the fee discount flag, symbol subscription, and the socket
path to the executor.
"""
import asyncio
from pathlib import Path
from typing import Optional
import yaml
from pydantic import BaseModel, Field
from pydantic_settings import BaseSettings
class OeEmSettings(BaseModel):
"""Settings that control oe_em's runtime behaviour."""
fh_ob_url: str = Field(
default="http://127.0.0.1:8000",
description="REST URL of fh_ob server",
)
socket_path: Path = Field(
default=Path("/tmp/fh_ob.sock"),
description="Unix domain socket path for fh_ob",
)
log_level: str = Field(default="INFO", description="Logging level")
log_file: Path = Field(
default=Path("/tmp/oe_em.log"),
description="Path to log file. Logs are written here in addition to stdout.",
)
signal_threshold_bps: float = Field(
default=0.2,
description="Minimum net return in basis points to fire a signal",
)
opportunity_log_path: Path = Field(
default=Path("/tmp/opportunities.log"),
description="Path to log detected opportunities",
)
stats_interval_seconds: float = Field(
default=60.0,
description="Seconds between stats log lines. 0 to disable.",
)
cooldown_seconds: float = Field(
default=0.0,
description="Deprecated — use executor's in-flight blocking instead. "
"Kept here for operational flexibility; set to 0.",
)
excluded_currencies: list[str] = Field(
default_factory=list,
description="Currencies to exclude from triangle enumeration",
)
hold_currencies: list[str] = Field(
default=["USDT"],
description="Currencies held as capital. Only triangles starting and ending in one of these are evaluated.",
)
kcs_discount_active: bool = Field(
default=False,
description="If true, all taker fees are multiplied by 0.8 (KCS 20% fee discount)",
)
executor_socket_path: Path = Field(
default=Path("/tmp/executor.sock"),
description="Unix domain socket path for executor",
)
send_signals: bool = Field(
default=False,
description="If true, emit signals to executor socket when opportunities are found",
)
class Settings(BaseSettings):
"""Top-level settings parsed from config.yaml."""
oe_em: OeEmSettings = Field(default_factory=OeEmSettings)
fh_ob_url: Optional[str] = None
@classmethod
async def from_yaml(cls, path: Path) -> "Settings":
"""Load settings from a YAML file."""
loop = asyncio.get_running_loop()
def _read() -> dict:
with open(path) as f:
return yaml.safe_load(f)
data = await loop.run_in_executor(None, _read)
return cls(**data)
model_config = {"env_prefix": "TRIArb_", "extra": "ignore"}

View File

@ -1,96 +0,0 @@
"""
KuCoin API client for the opportunity engine.
Fetches trading pair metadata (symbol, base, quote, fees, feeCurrency)
and builds an in-memory fee table keyed by base currency. This data is
used to construct the triangle index and to populate fee_currency in
signal payloads.
"""
import aiohttp
import structlog
logger = structlog.get_logger().bind(component="kucoin_api")
KUCoin_SYMBOLs_URL = "https://api.kucoin.com/api/v1/symbols"
DEFAULT_FEES = {
"BTC": {"maker": 0.0010, "taker": 0.0010},
"ETH": {"maker": 0.0010, "taker": 0.0010},
"USDT": {"maker": 0.0010, "taker": 0.0010},
"USDC": {"maker": 0.0010, "taker": 0.0010},
}
class KuCoinAPI:
"""
Fetch and cache KuCoin pair metadata and per-currency fee rates.
Used at startup to build the fee table required by triangle enumeration.
"""
def __init__(self) -> None:
self._fee_table: dict[str, dict[str, float]] = {}
self._pairs: dict[str, dict] = {}
self._log = logger
async def fetch_pairs_and_fees(self) -> None:
"""
Fetch all symbols from KuCoin, populate _pairs and _fee_table.
Logs warnings for any symbol that cannot be parsed and skips it.
Sets feeCurrency to the empty string when absent in the API response.
"""
async with aiohttp.ClientSession() as session:
async with session.get(KUCoin_SYMBOLs_URL) as resp:
resp.raise_for_status()
payload = await resp.json()
for item in payload.get("data", []):
symbol = item.get("symbol", "")
base = item.get("baseCurrency", "")
quote = item.get("quoteCurrency", "")
maker_fee = float(item.get("makerFeeRate", 0))
taker_fee = float(item.get("takerFeeRate", 0))
enable_trading = item.get("enableTrading", False)
if not all([symbol, base, quote]):
continue
fee_currency = item.get("feeCurrency") or ""
self._pairs[symbol] = {
"symbol": symbol,
"base": base,
"quote": quote,
"maker_fee": maker_fee,
"taker_fee": taker_fee,
"enable_trading": enable_trading,
"fee_currency": fee_currency,
}
if base not in self._fee_table:
self._fee_table[base] = {
"maker": maker_fee if maker_fee > 0 else DEFAULT_FEES.get(base, {}).get("maker", 0.0010),
"taker": taker_fee if taker_fee > 0 else DEFAULT_FEES.get(base, {}).get("taker", 0.0010),
}
self._log.info("fee_table_loaded", bases=len(self._fee_table), pairs=len(self._pairs))
def get_fee(self, symbol: str, side: str) -> float:
"""
Return the taker fee rate for the base currency of a given symbol.
Falls back to DEFAULT_FEES when the base is not in the fee table.
"""
if symbol not in self._pairs:
return 0.0010
base = self._pairs[symbol]["base"]
fee_data = self._fee_table.get(base, DEFAULT_FEES.get(base, {"maker": 0.0010, "taker": 0.0010}))
return fee_data.get(side, 0.0010)
def get_pair_info(self, symbol: str) -> dict | None:
"""Return the full info dict for a symbol, or None if not loaded."""
return self._pairs.get(symbol)
def get_all_pairs(self) -> list[dict]:
"""Return all pairs where enable_trading is True."""
return [p for p in self._pairs.values() if p["enable_trading"]]

View File

@ -1,408 +0,0 @@
"""
Opportunity detection engine.
Evaluates all triangles involving a given symbol on every order-book update,
computes the net return after fees, and fires a signal when the return exceeds
the configured threshold. Supports KCS fee discounts and per-triangle cooldowns
to avoid flooding the executor with duplicate signals.
"""
import asyncio
import time
import uuid
from dataclasses import dataclass
from pathlib import Path
from typing import Optional, Callable
import structlog
from oe_em.book_consumer import BookConsumer
from oe_em.triangle_enum import Triangle, TriangleLeg, TriangleIndex
KCS_FEE_DISCOUNT = 0.8
logger = structlog.get_logger().bind(component="opportunity")
def max_volume_for_triangle(
triangle: Triangle,
book_consumer: BookConsumer,
primary_quote: str,
fee_mult: float = 1.0,
) -> Optional[float]:
"""Compute max volume — kept for backward compatibility, but _build_full now does this inline."""
return None
@dataclass
class OpportunitySignal:
"""
A detected profitable triangular arbitrage opportunity.
Emitted to the signal client when net_return_bps exceeds the threshold.
"""
triangle: Triangle
direction: str
net_return_bps: float
max_volume: float
leg_details: list[dict]
ts_ms: int
book_ts_ms: int
books: list[dict]
@dataclass
class Stats:
"""
Running statistics counters for opportunity evaluation.
Updated on every evaluate_triangles_for_pair call and returned by
get_stats().
"""
triangles_evaluated: int = 0
signals_fired: int = 0
books_missing: int = 0
books_full: int = 0
best_net_bps: float = -999999.0
worst_net_bps: float = 999999.0
last_eval_ts_ms: int = 0
best_legs: str = ""
worst_legs: str = ""
@dataclass
class _EvalResult:
"""
Intermediate result of triangle evaluation.
Attributes
----------
net_return_bps : float
Net return after fees in basis points.
max_volume : float
Maximum safe input volume for the triangle.
leg_details : list[dict]
Per-leg dictionary suitable for serialising into a signal payload.
book_ts_ms : int
Timestamp (ms) of the oldest order book used in the evaluation.
books : list[dict]
Serialised top-of-book for each leg, in order.
"""
net_return_bps: float
max_volume: float
leg_details: list[dict]
book_ts_ms: int
books: list[dict]
def leg_str(self) -> str:
return " -> ".join(
f"{d['pair']}({d['input_currency']}->{d['output_currency']})" for d in self.leg_details
)
class OpportunityEngine:
"""
Detects and reports triangular arbitrage opportunities.
On every order-book update (triggered via the on_update callback) the engine
evaluates every triangle that involves the updated symbol. If the net
return after fees exceeds signal_threshold_bps and the cooldown for that
triangle has elapsed, a signal is dispatched to the executor via the
on_signal callback.
"""
def __init__(
self,
book_consumer: BookConsumer,
triangle_index: TriangleIndex,
signal_threshold_bps: float,
log_path: Path,
kcs_discount_active: bool = False,
cooldown_seconds: float = 5.0,
on_signal: Optional[callable] = None,
) -> None:
self._book_consumer = book_consumer
self._triangle_index = triangle_index
self._threshold_bps = signal_threshold_bps
self._log_path = log_path
self._fee_mult = KCS_FEE_DISCOUNT if kcs_discount_active else 1.0
self._cooldown_seconds = cooldown_seconds
self._last_signal_ts: dict[frozenset[str], float] = {}
self._log = logger
self._stats = Stats()
self._on_signal = on_signal
self._net_cache: dict[frozenset[str], tuple[float, tuple[int, int, int]]] = {}
def _compute_net_only(
self,
triangle: Triangle,
) -> tuple[Optional[float], int]:
"""
Compute net return BPS and min book ts_ms without building books/leg_details.
Returns (net_return_bps, book_ts_ms) or (None, 0) if any book is missing.
Used for fast-path threshold filtering before expensive serialization.
"""
cumulative = 1.0
book_ts_ms = 0
for leg in triangle.legs:
book = self._book_consumer.get_book(leg.pair.symbol)
if not book:
return None, 0
if leg.input_currency == leg.pair.base:
level = book.bids[0] if book.bids else None
if not level:
return None, 0
rate = level.price
else:
level = book.asks[0] if book.asks else None
if not level:
return None, 0
rate = 1.0 / level.price
fee_factor = 1.0 - leg.taker_fee * self._fee_mult
cumulative *= rate * fee_factor
if book_ts_ms == 0 or book.ts_ms < book_ts_ms:
book_ts_ms = book.ts_ms
net_return = (cumulative - 1.0) * 10000
return net_return, book_ts_ms
def _build_full(
self,
triangle: Triangle,
) -> Optional[_EvalResult]:
"""
Single-pass evaluation: compute net return, build leg_details/books,
and compute max_volume all in one loop over the triangle's legs.
"""
cumulative = 1.0
max_v0_list: list[float] = []
cumulative_mult = 1.0
leg_details = []
books: list[dict] = []
book_ts_ms = 0
for leg in triangle.legs:
book = self._book_consumer.get_book(leg.pair.symbol)
if not book:
return None
if leg.input_currency == leg.pair.base:
level = book.bids[0] if book.bids else None
if not level:
return None
max_input = level.size
rate = level.price
else:
level = book.asks[0] if book.asks else None
if not level:
return None
max_input = level.size * level.price
rate = 1.0 / level.price
fee_factor = 1.0 - leg.taker_fee * self._fee_mult
cumulative *= rate * fee_factor
if cumulative_mult > 0:
max_v0_list.append(max_input / cumulative_mult)
cumulative_mult *= rate * fee_factor
leg_details.append({
"pair": leg.pair.symbol,
"input_currency": leg.input_currency,
"output_currency": leg.output_currency,
"exchange_rate": rate,
"fee_rate": leg.taker_fee,
"fee_currency": leg.pair.fee_currency,
})
books.append({
"symbol": book.symbol,
"bids": [
{"price": b.price, "size": b.size} for b in book.bids
],
"asks": [
{"price": a.price, "size": a.size} for a in book.asks
],
"ts_ms": book.ts_ms,
})
if book_ts_ms == 0 or book.ts_ms < book_ts_ms:
book_ts_ms = book.ts_ms
net_return = (cumulative - 1.0) * 10000
max_volume = min(max_v0_list) if max_v0_list else 0.0
return _EvalResult(
net_return_bps=net_return,
max_volume=max_volume,
leg_details=leg_details,
book_ts_ms=book_ts_ms,
books=books,
)
def evaluate_triangles_for_pair(self, symbol: str) -> list[OpportunitySignal]:
"""
Evaluate all triangles that include the given symbol.
Called by the book consumer's on_update callback whenever an order book
is refreshed. Updates stats, emits signals for triangles that clear the
threshold and cooldown, and returns the list of signals (primarily for
use in tests).
"""
triangles = self._triangle_index.get_triangles_for_pair(symbol)
signals: list[OpportunitySignal] = []
now_ts_ms = int(time.time() * 1000)
for triangle in triangles:
cache_key = triangle.currencies
leg_books = [self._book_consumer.get_book(leg.pair.symbol) for leg in triangle.legs]
if any(b is None for b in leg_books):
self._stats.triangles_evaluated += 1
self._stats.books_missing += 1
self._stats.last_eval_ts_ms = now_ts_ms
continue
current_ts = tuple(b.ts_ms for b in leg_books)
cached = self._net_cache.get(cache_key)
if cached and cached[1] == current_ts:
net_bps = cached[0]
book_ts_ms = min(current_ts)
else:
try:
net_bps, book_ts_ms = self._compute_net_only(triangle)
except Exception as e:
self._log.error("triangle_compute_error", triangle=str(triangle.currencies), error=str(e))
self._stats.triangles_evaluated += 1
self._stats.last_eval_ts_ms = now_ts_ms
continue
if net_bps is not None:
self._net_cache[cache_key] = (net_bps, current_ts)
self._stats.triangles_evaluated += 1
if net_bps is None:
self._stats.books_missing += 1
self._stats.last_eval_ts_ms = now_ts_ms
continue
self._stats.books_full += 1
if net_bps > self._stats.best_net_bps:
self._stats.best_net_bps = net_bps
if net_bps < self._stats.worst_net_bps:
self._stats.worst_net_bps = net_bps
if net_bps <= self._threshold_bps:
self._stats.last_eval_ts_ms = now_ts_ms
continue
try:
result = self._build_full(triangle)
except Exception as e:
self._log.error("triangle_compute_full_error", triangle=str(triangle.currencies), error=str(e))
continue
if result is None:
continue
sig = OpportunitySignal(
triangle=triangle,
direction="forward",
net_return_bps=net_bps,
max_volume=result.max_volume,
leg_details=result.leg_details,
ts_ms=now_ts_ms,
book_ts_ms=result.book_ts_ms,
books=result.books,
)
signals.append(sig)
self._stats.signals_fired += 1
now = time.time()
last = self._last_signal_ts.get(triangle.currencies, 0.0)
if now - last >= self._cooldown_seconds:
self._last_signal_ts[triangle.currencies] = now
self._notify_opportunity(sig)
self._stats.last_eval_ts_ms = now_ts_ms
return signals
def get_stats(self) -> Stats:
"""
Return a snapshot of the current stats counters.
The returned Stats object is a copy; the internal counters continue
to accumulate.
"""
return Stats(
triangles_evaluated=self._stats.triangles_evaluated,
signals_fired=self._stats.signals_fired,
books_missing=self._stats.books_missing,
books_full=self._stats.books_full,
best_net_bps=self._stats.best_net_bps,
worst_net_bps=self._stats.worst_net_bps,
last_eval_ts_ms=self._stats.last_eval_ts_ms,
best_legs=self._stats.best_legs,
worst_legs=self._stats.worst_legs,
)
def _notify_opportunity(self, sig: OpportunitySignal) -> None:
"""
Log the opportunity and dispatch the signal to the executor.
The signal is sent over a Unix-domain socket via the on_signal callback
(which is the send_signal coroutine of SignalSocketClient). A done
callback is attached so any exception raised inside the receiver is
logged rather than propagating silently.
"""
ts = sig.ts_ms
direction = sig.direction
net_bps = sig.net_return_bps
leg_str = " -> ".join(
f"{ld['pair']}({ld['input_currency']}->{ld['output_currency']})" for ld in sig.leg_details
)
msg = (
f"[{ts}] OPPORTUNITY FOUND | "
f"direction={direction} | "
f"net_return={net_bps:.2f} bps | "
f"max_volume={sig.max_volume} | "
f"legs: {leg_str}"
)
self._log.info("opportunity_detected", **{
"ts_ms": ts,
"book_ts_ms": sig.book_ts_ms,
"direction": direction,
"net_return_bps": net_bps,
"legs": leg_str,
"max_volume": str(sig.max_volume),
})
if self._on_signal:
signal_payload = {
"type": "signal",
"correlation_id": str(uuid.uuid4()),
"triangle_key": list(sig.triangle.currencies),
"primary_quote": sig.triangle.primary_quote,
"legs": sig.leg_details,
"predicted_bps": net_bps,
"max_volume": str(sig.max_volume),
"ts_ms": ts,
"book_ts_ms": sig.book_ts_ms,
# books: full top-5 order books per leg — reserved for future
# volume-extension logic (analyze deeper levels to compute how
# far profit shrinks when volume is increased beyond top-of-book).
"books": sig.books,
}
try:
task = asyncio.create_task(self._on_signal(signal_payload))
task.add_done_callback(lambda t: t.exception() and self._log.warning("on_signal_error", error=str(t.exception())))
except Exception as e:
self._log.warning("on_signal_error", error=str(e))

View File

@ -1,26 +0,0 @@
"""
Placeholder risk management module for the opportunity engine.
The RiskManager provides a hook for future risk checks (position limits,
daily loss gates, etc.). Currently it is a pass-through: should_continue()
always returns True.
"""
import structlog
logger = structlog.get_logger().bind(component="risk")
class RiskManager:
"""
Enumerates and checks risk constraints before opportunity evaluation.
Currently a stub that accepts all opportunities. Replace should_continue()
with real checks as needed.
"""
def __init__(self) -> None:
self._log = logger
def should_continue(self) -> bool:
"""Return True if evaluation should proceed; False to skip this cycle."""
return True

View File

@ -1,113 +0,0 @@
"""
Unix-domain socket client for sending opportunity signals to the executor.
Connects to the executor's SignalSocketServer and keeps the connection open for
burst sending. If the connection is lost the background reconnect loop will
retry every 2 seconds. All send operations are non-blocking: a warning is
logged if the underlying writer is not connected rather than raising.
"""
import asyncio
import json
import uuid
from pathlib import Path
import structlog
logger = structlog.get_logger().bind(component="signal_socket_client")
class SignalSocketClient:
"""
Non-blocking signal sender that maintains a persistent Unix-socket connection.
The caller invokes send_signal() for each opportunity; the client serialises
the payload and writes it to the socket. If the socket is not connected
(e.g. after a server restart) the signal is dropped with a warning log.
"""
def __init__(self, socket_path: Path) -> None:
self._socket_path = socket_path
self._log = logger
self._writer: asyncio.StreamWriter | None = None
self._running = False
self._reconnect_task: asyncio.Task | None = None
async def start(self) -> asyncio.Task:
"""Start the background reconnect loop and return the task."""
self._running = True
self._reconnect_task = asyncio.create_task(self._reconnect_loop())
return self._reconnect_task
async def _reconnect_loop(self) -> None:
"""
Attempt to connect and then wait for the connection to close.
On connection failure a 2-second backoff is applied before retrying.
The loop exits when self._running becomes False (see close()).
"""
while self._running:
try:
reader, writer = await asyncio.open_unix_connection(path=str(self._socket_path))
self._writer = writer
self._log.info("connected", path=str(self._socket_path))
try:
await writer.wait_closed()
except Exception:
pass
except (ConnectionRefusedError, FileNotFoundError) as e:
if not self._running:
break
self._log.warning("connection_retrying", error=str(e))
await asyncio.sleep(2.0)
except Exception as e:
self._log.error("reconnect_error", error=str(e))
await asyncio.sleep(5.0)
finally:
self._writer = None
async def send_signal(self, signal: dict) -> None:
"""
Serialise a signal dict and write it to the socket.
If the socket is not connected this is a no-op (logged as warning).
The correlation_id is assigned here if not already set.
"""
writer = self._writer
if not writer:
self._log.warning("not_connected")
return
correlation_id = signal.get("correlation_id", "") or str(uuid.uuid4())
signal["correlation_id"] = correlation_id
msg = json.dumps(signal) + "\n"
try:
writer.write(msg.encode())
await writer.drain()
self._log.debug("signal_sent", correlation_id=correlation_id)
except Exception as e:
self._log.error("signal_send_failed", correlation_id=correlation_id, error=str(e))
async def close(self) -> None:
"""
Stop the reconnect loop and close the writer if open.
Safe to call multiple times; after close() any send_signal() call
will be a no-op.
"""
self._running = False
if self._reconnect_task:
self._reconnect_task.cancel()
try:
await self._reconnect_task
except asyncio.CancelledError:
pass
self._reconnect_task = None
writer = self._writer
if writer:
self._writer = None
writer.close()
try:
await writer.wait_closed()
except Exception:
pass

View File

@ -1,291 +0,0 @@
"""
Triangle enumeration for triangular arbitrage.
Provides the core data structures (TradingPair, TriangleLeg, Triangle,
TriangleIndex) and the enumerate_triangles() function that enumerates all
valid triangles from a list of TradingPairs using a fee table.
A triangle is a directed cycle of three currencies c1 c2 c3 c1 where
each leg corresponds to a tradable pair. The hold currency (primary quote)
is the currency that enters and exits the cycle; it must be one of the three
currencies and is typically USDT or USDC.
"""
from dataclasses import dataclass, field
from typing import Optional
import structlog
logger = structlog.get_logger().bind(component="triangle_enum")
@dataclass(frozen=True)
class TradingPair:
"""
A single tradable currency pair on KuCoin.
Attributes
----------
symbol : str
KuCoin symbol e.g. "BTC-USDT".
base : str
Base currency code e.g. "BTC".
quote : str
Quote currency code e.g. "USDT".
fee_currency : str
Currency in which fees are denominated for this pair (from KuCoin's
feeCurrency field). Included in signal payloads so the executor
can interpret fee amounts correctly.
"""
symbol: str
base: str
quote: str
fee_currency: str = ""
@classmethod
def from_api_response(cls, data: dict) -> Optional["TradingPair"]:
"""
Parse a KuCoin /symbols API response entry into a TradingPair.
Returns None if enableTrading is not True or if required fields
are missing.
"""
if data.get("enableTrading") is not True:
return None
symbol = data.get("symbol", "")
base = data.get("baseCurrency", "")
quote = data.get("quoteCurrency", "")
if not all([symbol, base, quote]):
return None
return cls(symbol=symbol, base=base, quote=quote, fee_currency=data.get("feeCurrency", ""))
@property
def currency_pair(self) -> frozenset[str]:
"""The unordered set of the two currencies in this pair."""
return frozenset([self.base, self.quote])
@dataclass
class TriangleLeg:
"""
One directed hop in a triangle: input_currency output_currency via a pair.
Attributes
----------
pair : TradingPair
The trading pair used for this leg.
input_currency : str
Currency entering this leg.
output_currency : str
Currency leaving this leg.
maker_fee, taker_fee : float
Fee rates (fraction) for this leg's base currency.
"""
pair: TradingPair
input_currency: str
output_currency: str
maker_fee: float
taker_fee: float
@dataclass
class Triangle:
"""
A complete triangular arbitrage cycle.
Attributes
----------
legs : list[TriangleLeg]
The three directed hops (must sum to identity: c1 c2 c3 c1).
currencies : frozenset[str]
The three distinct currency codes in the cycle.
pair_symbols : frozenset[str]
The three KuCoin symbols involved.
primary_quote : str
The hold currency that enters and exits the cycle. All minimum order
sizes and volumes are expressed in terms of this currency.
"""
legs: list[TriangleLeg] = field(default_factory=list)
currencies: frozenset[str] = field(default_factory=frozenset())
pair_symbols: frozenset[str] = field(default_factory=frozenset())
primary_quote: str = ""
@dataclass
class TriangleIndex:
"""
Inverted index of triangles by pair symbol.
Allows O(1) lookup of all triangles that involve a given symbol, which is
the primary query pattern used by OpportunityEngine on every book update.
"""
triangles: list[Triangle] = field(default_factory=list)
by_pair: dict[str, list[Triangle]] = field(default_factory=dict)
def get_triangles_for_pair(self, symbol: str) -> list[Triangle]:
"""Return all triangles that include the given symbol."""
return self.by_pair.get(symbol, [])
def _build_pair_map(pairs: list[TradingPair]) -> dict[frozenset[str], TradingPair]:
"""
Build a mapping from unordered currency pair (frozenset) to TradingPair.
Used to look up pairs by their two currencies regardless of direction.
"""
pair_map: dict[frozenset[str], TradingPair] = {}
for p in pairs:
pair_map[p.currency_pair] = p
return pair_map
def _build_edge_map(pairs: list[TradingPair]) -> dict[str, list[frozenset[str]]]:
"""
Build an adjacency map: currency list of currency pairs involving that currency.
This is the graph representation used by enumerate_triangles to efficiently
find paths of length 2 (c1 c2 c3) without enumerating all O() pairs.
"""
edge_map: dict[str, list[frozenset[str]]] = {}
for p in pairs:
for c in [p.base, p.quote]:
if c not in edge_map:
edge_map[c] = []
edge_map[c].append(p.currency_pair)
return edge_map
def _build_legs(
c1: str, c2: str, c3: str,
pair_map: dict[frozenset[str], TradingPair],
fee_table: dict[str, dict[str, float]],
) -> list[TriangleLeg]:
"""
Construct the three TriangleLegs for the cycle c1 c2 c3 c1.
Looks up each leg's pair via pair_map (keyed by unordered currencies) and
resolves fees from fee_table using the base currency of each pair.
"""
default_fee = {"maker": 0.0010, "taker": 0.0010}
def fee_for(base: str, side: str) -> float:
return fee_table.get(base, default_fee).get(side, 0.0010)
p1 = pair_map[frozenset([c1, c2])]
p2 = pair_map[frozenset([c2, c3])]
p3 = pair_map[frozenset([c3, c1])]
leg1 = TriangleLeg(
pair=p1,
input_currency=c1,
output_currency=c2,
maker_fee=fee_for(p1.base, "maker"),
taker_fee=fee_for(p1.base, "taker"),
)
leg2 = TriangleLeg(
pair=p2,
input_currency=c2,
output_currency=c3,
maker_fee=fee_for(p2.base, "maker"),
taker_fee=fee_for(p2.base, "taker"),
)
leg3 = TriangleLeg(
pair=p3,
input_currency=c3,
output_currency=c1,
maker_fee=fee_for(p3.base, "maker"),
taker_fee=fee_for(p3.base, "taker"),
)
return [leg1, leg2, leg3]
def enumerate_triangles(
pairs: list[TradingPair],
fee_table: dict[str, dict[str, float]],
hold_currencies: Optional[list[str]] = None,
) -> TriangleIndex:
"""
Enumerate all valid triangular arbitrage cycles from a list of TradingPairs.
A valid triangle must:
- Contain exactly three distinct currencies.
- Include at least one hold currency (default: ["USDT"]), which becomes
the primary_quote / entry/exit currency.
- Have all three legs (c1c2, c2c3, c3c1) represent tradable pairs.
Parameters
----------
pairs : list[TradingPair]
All known trading pairs (filtered to enableTrading == True by the caller).
fee_table : dict[str, dict[str, float]]
Per-base-currency fee rates as returned by oe_em.kucoin_api.
hold_currencies : list[str] or None
Currencies that may serve as the entry/exit point. Defaults to ["USDT"].
Returns
-------
TriangleIndex
Contains all triangles and an inverted index by symbol for fast lookup.
"""
if hold_currencies is None:
hold_currencies = ["USDT"]
hold_set = set(hold_currencies)
pair_map = _build_pair_map(pairs)
edge_map = _build_edge_map(pairs)
triangles: list[Triangle] = []
by_pair: dict[str, list[Triangle]] = {}
seen: set[frozenset[str]] = set()
all_currencies = sorted(edge_map.keys())
for c1 in all_currencies:
for c2_edge in edge_map.get(c1, []):
c2 = next(x for x in c2_edge if x != c1)
for c3_edge in edge_map.get(c2, []):
c3 = next(x for x in c3_edge if x != c2)
if c3 == c1:
continue
if c3_edge == c2_edge:
continue
if frozenset([c1, c3]) not in pair_map:
continue
currencies = frozenset([c1, c2, c3])
if currencies in seen:
continue
seen.add(currencies)
in_triangle = hold_set & currencies
if not in_triangle:
continue
for hold_curr in in_triangle:
others = [c for c in [c1, c2, c3] if c != hold_curr]
for x, y in [(others[0], others[1]), (others[1], others[0])]:
legs = _build_legs(hold_curr, x, y, pair_map, fee_table)
pair_symbols = frozenset([
legs[0].pair.symbol,
legs[1].pair.symbol,
legs[2].pair.symbol,
])
triangle = Triangle(
legs=legs,
currencies=currencies,
pair_symbols=pair_symbols,
primary_quote=hold_curr,
)
triangles.append(triangle)
for sym in pair_symbols:
if sym not in by_pair:
by_pair[sym] = []
by_pair[sym].append(triangle)
logger.info("triangles_enumerated", total=len(triangles), indexed_pairs=len(by_pair))
return TriangleIndex(triangles=triangles, by_pair=by_pair)