diff --git a/common/__init__.py b/common/__init__.py deleted file mode 100644 index 64c70fe..0000000 --- a/common/__init__.py +++ /dev/null @@ -1,6 +0,0 @@ -"""Common utilities for triangular arbitrage bot.""" - -from common.config import Settings -from common.log import configure_logging - -__all__ = ["Settings", "configure_logging"] \ No newline at end of file diff --git a/common/config.py b/common/config.py deleted file mode 100644 index 5f9e7dc..0000000 --- a/common/config.py +++ /dev/null @@ -1,67 +0,0 @@ -"""Deprecated config schema for the old fh_ob Python process. Kept so that -common/log.py can be imported without errors. Not used at runtime.""" -import asyncio -from pathlib import Path -from typing import Optional - -import yaml -from pydantic import BaseModel, Field -from pydantic_settings import BaseSettings - - -class FHobSettings(BaseModel): - symbols: list[str] = Field( - default_factory=list, - description="Trading pairs to subscribe to. Empty = no subscriptions. oe_em adds pairs via REST.", - ) - log_level: str = Field(default="INFO", description="Logging level") - log_file: Path = Field( - default=Path("/tmp/fh_ob.log"), - description="Path to log file. Logs are written here in addition to stdout.", - ) - socket_path: Path = Field( - default=Path("/tmp/fh_ob.sock"), - description="Unix domain socket path for OE+EM", - ) - rest_host: str = Field(default="0.0.0.0", description="FastAPI debug host") - rest_port: int = Field(default=8000, description="FastAPI debug port") - ws_url: str = Field( - default="wss://ws-api-spot.kucoin.com", - description="KuCoin WebSocket endpoint", - ) - token_url: str = Field( - default="https://api.kucoin.com/api/v1/bullet-public", - description="KuCoin public token endpoint", - ) - reconnect_base_delay: float = Field( - default=1.0, - description="Base delay for reconnect exponential backoff (seconds)", - ) - reconnect_max_delay: float = Field( - default=60.0, - description="Max delay for reconnect exponential backoff (seconds)", - ) - heartbeat_interval: float = Field( - default=18.0, - description="WS ping interval (seconds) - KuCoin uses 18s", - ) - - -class Settings(BaseSettings): - fh_ob: FHobSettings = Field(default_factory=FHobSettings) - - @classmethod - async def from_yaml(cls, path: Path) -> "Settings": - loop = asyncio.get_running_loop() - - def _read() -> dict: - with open(path) as f: - return yaml.safe_load(f) or {} - - data = await loop.run_in_executor(None, _read) - fh_ob_data = data.get("fh_ob", {}) - if fh_ob_data.get("symbols") is None: - fh_ob_data["symbols"] = [] - return cls(**data) - - model_config = {"env_prefix": "TRIARB_", "extra": "ignore"} \ No newline at end of file diff --git a/common/log.py b/common/log.py deleted file mode 100644 index c7ed8cc..0000000 --- a/common/log.py +++ /dev/null @@ -1,133 +0,0 @@ -""" -Shared logging configuration for all components. - -Provides configure_logging() which sets up structlog with JSON output to stdout -and an optional plain-text file handler. All components (fh_ob, oe_em, executor) -call this at startup before any other logging. -""" -import asyncio -import logging -import sys -from pathlib import Path -from typing import Optional - -import structlog - - -class _AsyncFileHandler(logging.Handler): - """Non-blocking file handler that queues log records for async writing.""" - - def __init__(self, filepath: Path) -> None: - super().__init__() - self._filepath = filepath - self._queue: Optional[asyncio.Queue] = None - self._task: Optional[asyncio.Task] = None - - def _ensure_loop(self) -> None: - if self._queue is None: - self._queue = asyncio.Queue(maxsize=4096) - loop = asyncio.get_running_loop() - self._task = loop.create_task(self._writer_loop()) - - async def _writer_loop(self) -> None: - loop = asyncio.get_running_loop() - log_file = self._filepath - - def _write(msg: str) -> None: - with open(log_file, "a") as f: - f.write(msg + "\n") - - while True: - record = await self._queue.get() - try: - msg = self.format(record) - await loop.run_in_executor(None, _write, msg) - except Exception: - pass - self._queue.task_done() - - def emit(self, record: logging.LogRecord) -> None: - self._ensure_loop() - try: - self._queue.put_nowait(record) - except asyncio.QueueFull: - pass - - def close(self) -> None: - """Override stdlib Handler.close() — no-op, use async _flush() instead.""" - pass - - def flush(self) -> None: - """Override stdlib Handler.flush() — no-op, queue is non-blocking.""" - pass - - async def _flush(self) -> None: - """Wait for all queued records to be written.""" - if self._queue: - await self._queue.join() - - -_async_file_handler: Optional[_AsyncFileHandler] = None - - -def configure_logging(level: str = "INFO", log_file: Path | None = None) -> None: - """ - Configure structlog with JSON output to stdout and optional file handler. - - Uses stdlib logging as the backend so that standard-library integrations - (e.g. uvicorn, aiohttp) produce structured JSON too. - - Parameters - ---------- - level : str - Log level string (DEBUG, INFO, WARNING, ERROR). - log_file : Path or None - If set, a FileHandler is added to the root logger writing the - same JSON lines to disk. - """ - global _async_file_handler - - logging.basicConfig( - level=getattr(logging, level.upper()), - format="%(message)s", - handlers=[], - ) - - root_logger = logging.getLogger() - root_logger.setLevel(getattr(logging, level.upper())) - root_logger.handlers.clear() - - console_handler = logging.StreamHandler(sys.stdout) - console_handler.setFormatter(logging.Formatter("%(message)s")) - console_handler.setLevel(getattr(logging, level.upper())) - root_logger.addHandler(console_handler) - - structlog.configure( - wrapper_class=structlog.make_filtering_bound_logger( - getattr(logging, level.upper()) - ), - context_class=dict, - logger_factory=structlog.stdlib.LoggerFactory(), - cache_logger_on_first_use=True, - processors=[ - structlog.stdlib.add_log_level, - structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S.%f"), - structlog.processors.JSONRenderer(), - ], - ) - - if log_file: - _async_file_handler = _AsyncFileHandler(log_file) - _async_file_handler.setFormatter(logging.Formatter("%(message)s")) - _async_file_handler.setLevel(getattr(logging, level.upper())) - root_logger.addHandler(_async_file_handler) - - root_logger.propagate = False - - -async def close_logging() -> None: - """Flush and close the async file handler.""" - global _async_file_handler - if _async_file_handler: - await _async_file_handler._flush() - _async_file_handler = None diff --git a/config.yaml.example b/config.yaml.example index 491d719..fc198da 100644 --- a/config.yaml.example +++ b/config.yaml.example @@ -5,7 +5,6 @@ fused_engine: signal_threshold_bps: 2 excluded_currencies: [EUR, BRL] hold_currencies: [USDT, USDC, USD1] - send_signals: true ws_url: wss://ws-api-spot.kucoin.com token_url: https://api.kucoin.com/api/v1/bullet-public reconnect_base_delay: 1.0 @@ -18,16 +17,6 @@ fused_engine: USDC: 5 USD1: 5 -executor: - fill_timeout_ms: 1000 - log_level: INFO - log_file: /tmp/executor.log - socket_path: /tmp/executor.sock - concurrent_slots: 1 - enforce_same_base_isolation: true - enforce_pair_isolation: true - rest_port: 8002 - kucoin_api_key: "" kucoin_api_secret: "" kucoin_api_passphrase: "" diff --git a/executor/__init__.py b/executor/__init__.py deleted file mode 100644 index 11aedaa..0000000 --- a/executor/__init__.py +++ /dev/null @@ -1,9 +0,0 @@ -""" -Executor package. - -Re-exports the Executor class used by the fused_engine process. -""" - -from executor.executor import Executor - -__all__ = ["Executor"] diff --git a/executor/__main__.py b/executor/__main__.py deleted file mode 100644 index 922a36c..0000000 --- a/executor/__main__.py +++ /dev/null @@ -1,132 +0,0 @@ -""" -Executor process entry point. - -Starts the Unix-socket signal server, REST API control interface, -and orchestrates clean shutdown on SIGTERM/SIGINT. -""" -import asyncio -import signal -from pathlib import Path -from typing import Optional - -import structlog -import uvicorn -from common.log import configure_logging - -from executor.config import Settings -from executor.executor import Executor -from executor.kucoin_api import KuCoinAPI -from executor.rest_api import create_app -from executor.socket_server import SignalSocketServer -from executor.ws_client import KuCoinWSClient - - -async def main() -> None: - config_path = Path("config.yaml") - settings = await Settings.from_yaml(config_path) if config_path.exists() else Settings() - configure_logging(settings.executor.log_level) # file I/O handled by Executor's _DualLogger - - log = structlog.get_logger().bind(component="executor") - - log.info("executor_starting", live_mode=settings.live_mode) - - # Always initialise KuCoinAPI even in paper mode — symbol metadata is - # needed for size/precision validation regardless of execution mode. - api = KuCoinAPI( - api_key=settings.kucoin_api_key, - api_secret=settings.kucoin_api_secret, - api_passphrase=settings.kucoin_api_passphrase, - ) - await api.fetch_symbols() - - ws_client: Optional[KuCoinWSClient] = None - if settings.live_mode: - # Live mode requires the private WebSocket client to receive fill events. - ws_client = KuCoinWSClient( - kucoin_api=api, - private_token_url=settings.executor.private_token_url, - ) - - executor = Executor( - kucoin_api=api, - settings=settings.executor, - ws_client=ws_client, - log_file=settings.executor.log_file, - live_mode=settings.live_mode, - ) - await executor.start() - - should_exit = asyncio.Event() - - def shutdown_callback() -> None: - should_exit.set() - - rest_app = create_app(executor, shutdown_callback=shutdown_callback) - rest_config = uvicorn.Config( - rest_app, - host="127.0.0.1", - port=settings.executor.rest_port, - log_level="warning", - ) - rest_server = uvicorn.Server(rest_config) - - socket_server = SignalSocketServer( - socket_path=settings.executor.socket_path, - on_signal=executor.handle_signal, - ) - - socket_task: asyncio.Task | None = None - rest_task: asyncio.Task | None = None - exit_task: asyncio.Task | None = None - ws_task: asyncio.Task | None = None - - async def shutdown(sig: signal.Signals) -> None: - """Clean up on shutdown signal: pause executor, cancel tasks, close server.""" - log.info("shutdown_signal_received", signal=sig.name) - await executor.pause() - await executor.close() - if socket_task is not None and not socket_task.done(): - socket_task.cancel() - if rest_task is not None: - rest_server.should_exit = True - if ws_client is not None: - await ws_client.stop() - should_exit.set() - - loop = asyncio.get_running_loop() - # Register signal handlers so shutdown runs in the asyncio event loop - # rather than in a plain threading context. - for sig in (signal.SIGTERM, signal.SIGINT): - loop.add_signal_handler(sig, lambda s=sig: asyncio.create_task(shutdown(s))) - - socket_task = asyncio.create_task(socket_server.start()) - rest_task = asyncio.create_task(rest_server.serve()) - exit_task = asyncio.create_task(should_exit.wait()) - - if ws_client is not None: - ws_task = asyncio.create_task(ws_client.start()) - - log.info( - "executor_ready", - rest_endpoint=f"http://127.0.0.1:{settings.executor.rest_port}", - socket_path=str(settings.executor.socket_path), - live_mode=settings.live_mode, - ) - - tasks = {t for t in (socket_task, rest_task, exit_task, ws_task) if t is not None} - try: - done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) - except asyncio.CancelledError: - log.info("executor_cancelled") - finally: - rest_server.should_exit = True - for t in tasks: - if not t.done(): - await asyncio.wait({t}, timeout=3.0) - if not t.done(): - t.cancel() - log.info("executor_shutdown_complete") - - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/executor/config.py b/executor/config.py deleted file mode 100644 index 5eadb63..0000000 --- a/executor/config.py +++ /dev/null @@ -1,93 +0,0 @@ -""" -Executor configuration loaded from config.yaml. - -Defines settings for the triangular arbitrage executor including -Unix socket path, concurrency limits, KuCoin credentials, and logging. - -live_mode at the top level controls whether the executor places real -orders (live) or validates via order_test and simulates fills (paper). -""" -import asyncio -import logging -from decimal import Decimal -from pathlib import Path -from typing import Optional - -import yaml -from pydantic import BaseModel, Field -from pydantic_settings import BaseSettings - -log = logging.getLogger("executor-config") - - -class ExecutorSettings(BaseModel): - """Settings that control executor runtime behaviour.""" - - socket_path: Path = Field( - default=Path("/tmp/executor.sock"), - description="Unix domain socket path for fused_engine -> executor", - ) - concurrent_slots: int = Field(default=1, ge=1, description="Max concurrent triangle executions") - enforce_same_base_isolation: bool = Field( - default=True, - description="Block concurrent triangles sharing the same base currency", - ) - enforce_pair_isolation: bool = Field( - default=True, - description="Block concurrent triangles that share any trading pair symbol", - ) - log_file: Path = Field( - default=Path("/tmp/executor.log"), - description="Path to log file", - ) - log_level: str = Field(default="INFO", description="Logging level") - rest_port: int = Field( - default=8002, - description="HTTP REST API port (8000=fh_ob, 8001=oe_em)", - ) - initial_capital: dict[str, Decimal] = Field( - default_factory=lambda: {"USDT": Decimal("10")}, - description="Starting capital per currency. Caps the max_volume for each triangle's primary_quote.", - ) - private_token_url: str = Field( - default="https://api.kucoin.com/api/v1/bullet-private", - description="KuCoin private bullet token endpoint", - ) - fill_timeout_ms: float = Field( - default=1000, - description="Per-leg fill wait timeout in milliseconds", - ) - await_balance: bool = Field( - default=False, - description="Wait for balance WS update before next leg", - ) - - -class Settings(BaseSettings): - """Top-level settings parsed from config.yaml.""" - - # live_mode is the master switch: False = paper (order_test + simulated fills), - # True = real orders on KuCoin. Set at top level of config.yaml. - live_mode: bool = Field(default=False, description="false = paper; true = live orders") - executor: ExecutorSettings = Field(default_factory=ExecutorSettings) - kucoin_api_key: str = Field(default="", description="KuCoin API key") - kucoin_api_secret: str = Field(default="", description="KuCoin API secret") - kucoin_api_passphrase: str = Field(default="", description="KuCoin API passphrase") - - @classmethod - async def from_yaml(cls, path: Path) -> "Settings": - """Load settings from a YAML file, ignoring unknown keys.""" - loop = asyncio.get_running_loop() - - def _read() -> dict: - with open(path) as f: - return yaml.safe_load(f) or {} - - data = await loop.run_in_executor(None, _read) - known = {"live_mode", "fused_engine", "executor", "kucoin_api_key", "kucoin_api_secret", "kucoin_api_passphrase", "kcs_discount_active"} - unknown = set(data.keys()) - known - if unknown: - log.warning("ignoring unknown config keys: %s", sorted(unknown)) - return cls(**data) - - model_config = {"env_prefix": "TRIArb_", "extra": "ignore"} diff --git a/executor/executor.py b/executor/executor.py deleted file mode 100644 index eda3fc3..0000000 --- a/executor/executor.py +++ /dev/null @@ -1,1018 +0,0 @@ -""" -Triangular arbitrage executor. - -Handles incoming opportunity signals from oe_em, enforces concurrency and -isolation constraints, computes order sizes, and posts orders to KuCoin -(via order_test in paper mode or real orders in live mode). - -Signal contract ---------------- -Each signal dict carries: - legs list[dict] — 3 legs, each with pair, side, order_param, - base_min_size, fee_rate, fee_currency - books list[dict] — order book snapshots per leg (top-of-book) - starting_volume str — quote-currency amount to deploy into leg 0 - predicted_bps float — expected profit in basis points - primary_quote str — e.g. "USDT", used for capital capping & isolation - live bool — true = real orders, false = paper simulation - book_ts_ms int — when the order book was sampled (stale detection) - t_sock_arrive_ms int — when bytes arrived at SSL_read (true network arrival) - t_arrive_ms int — when parsing + book update completed - ts_ms int — when the signal was created - t_eval_ms int — when the signal was evaluated - _cancelled bool — set by cancel_execution to abort in-flight -""" -import asyncio -import contextlib -import json -import logging -import time -import uuid -from collections import deque -from dataclasses import dataclass, field -from datetime import datetime -from decimal import Decimal, ROUND_DOWN -from pathlib import Path -from typing import Optional - -import aiohttp -from aiohttp.connector import TCPConnector -import structlog - - -_D0 = Decimal("0") -_D1 = Decimal("1") -_D0_1 = Decimal("0.1") - - -class _DualLogger: - """ - Thin wrapper that mirrors structlog output to a plain-text file via an - async queue, keeping file I/O off the asyncio event loop. - - The plain-text log supplements structlog's JSON output (written to - stdout) with a human-readable format that is useful for manual inspection. - """ - - def __init__(self, structlog_logger, log_file: Optional[Path]) -> None: - self._sl = structlog_logger - self._log_file = log_file - self._queue: Optional[asyncio.Queue] = None - self._task: Optional[asyncio.Task] = None - if log_file: - self._queue = asyncio.Queue() - self._task = asyncio.create_task(self._writer_loop()) - - async def _writer_loop(self) -> None: - """Background task that drains the queue and writes lines to disk - via a thread-pool executor so the event loop never blocks on I/O.""" - loop = asyncio.get_running_loop() - log_file = self._log_file - - def _write(line: str) -> None: - with open(log_file, "a") as f: - f.write(line) - - while True: - item = None - try: - item = await self._queue.get() - except asyncio.CancelledError: - while not self._queue.empty(): - try: - self._queue.get_nowait() - self._queue.task_done() - except asyncio.QueueEmpty: - break - raise - except Exception: - continue - - self._queue.task_done() - - if item is None: - break - - try: - await loop.run_in_executor(None, _write, item) - except Exception: - pass - - @staticmethod - def _fmt_value(v: object) -> str: - if isinstance(v, (dict, list)): - return json.dumps(v, default=str, separators=(",", ":")) - return str(v) - - def _enqueue(self, level: str, event: str, **kw) -> None: - """Publish a log entry to structlog and, if a log file is set, to the queue.""" - sl_level = getattr(logging, level.upper(), logging.INFO) - if not self._sl.is_enabled_for(sl_level): - return - getattr(self._sl, level)(event, **kw) - if self._queue is None: - return - ts = f"{time.time():.0f}" - level_up = level.upper() - extras = " ".join( - f"{k}={self._fmt_value(v)}" - for k, v in kw.items() - if not k.startswith("_") - ) - line = f"[{ts}] [{level_up:5}] {event} {extras}\n" - try: - self._queue.put_nowait(line) - except asyncio.QueueFull: - pass - - def write_plain(self, text: str) -> None: - """Write a line directly to the plain-text log queue (no structlog side-effect).""" - if self._queue is not None: - try: - self._queue.put_nowait(text + "\n") - except asyncio.QueueFull: - pass - - def debug(self, event: str, **kw) -> None: - self._enqueue("debug", event, **kw) - - def info(self, event: str, **kw) -> None: - self._enqueue("info", event, **kw) - - def warning(self, event: str, **kw) -> None: - self._enqueue("warning", event, **kw) - - def error(self, event: str, **kw) -> None: - self._enqueue("error", event, **kw) - - async def close(self) -> None: - """Signal the writer to stop and wait for it to flush.""" - if self._queue is not None: - await self._queue.put(None) - await self._queue.join() - if self._task is not None: - self._task.cancel() - try: - await self._task - except asyncio.CancelledError: - pass - - -@dataclass -class LegFill: - """Record of a single leg's filled order.""" - leg: int - order_id: str - side: str - pair: str - input_currency: str - output_currency: str - fee_currency: str - filled_volume: Decimal - deal_funds: Decimal - avg_price: Decimal - fee: Decimal - latency_ms: float = 0.0 - - -@dataclass -class ExecutionReport: - """Summary of a triangle execution attempt, emitted after each fill or failure.""" - correlation_id: str - triangle_key: list[str] - status: str - fills: list[LegFill] - predicted_bps: float - ts_ms: int - timings: list[dict] = field(default_factory=list) - profit: float = 0.0 - effective_bps: float = 0.0 - error: str = "" - book_ts_ms: int = 0 - book_tops: list[dict] = field(default_factory=list) - - -@dataclass -class InFlightExecution: - """Track a triangle execution that is currently in flight.""" - triangle_key: frozenset - pair_symbols: frozenset - primary_quote: str - correlation_id: str - signal: dict - started_ts_ms: int - last_trade_ts_ms: int = 0 - - -class Executor: - """ - Triangular arbitrage order executor. - - Receives signals from oe_em via a Unix-domain socket, checks volume and - concurrency constraints, then executes up to three legs in sequence. - """ - - def __init__( - self, - kucoin_api, - settings, - ws_client = None, - log_file: Optional[Path] = None, - live_mode: bool = False, - ) -> None: - self._api = kucoin_api - self._settings = settings - self._ws_client = ws_client - self._log_file = log_file - self._live_mode = live_mode - self._log = _DualLogger(structlog.get_logger().bind(component="executor"), log_file) - self._in_flight: dict[str, InFlightExecution] = {} - self._last_trade_ts_ms: dict[frozenset, int] = {} - self._reports: deque[ExecutionReport] = deque(maxlen=1000) - self._paused = False - self._pause_lock = asyncio.Lock() - self._isolation_lock = asyncio.Lock() - self._uptime_ns: int = time.monotonic_ns() - self._report_written = False - self._client_oid_to_order_id: dict[str, str] = {} - self._session: aiohttp.ClientSession | None = None - self._session_timeout = aiohttp.ClientTimeout(sock_connect=5, sock_read=10) - self._keepalive_task: asyncio.Task | None = None - - async def start(self) -> None: - """Pre-initialize and warm up the aiohttp session so the first trade is not slowed.""" - self._session = self._create_session() - await self._api.warmup_session(self._session) - - self._keepalive_task = asyncio.create_task(self._keepalive_loop()) - - _KEEPALIVE_INTERVAL = 30.0 - - def _create_session(self) -> aiohttp.ClientSession: - connector = TCPConnector( - limit=10, - limit_per_host=5, - force_close=False, - enable_cleanup_closed=True, - keepalive_timeout=60, - ttl_dns_cache=300, - ) - return aiohttp.ClientSession( - timeout=self._session_timeout, - connector=connector, - ) - - async def _keepalive_loop(self) -> None: - """Ping KuCoin periodically to keep the authenticated connection pool warm.""" - while True: - try: - await asyncio.sleep(self._KEEPALIVE_INTERVAL) - if self._live_mode: - await self._api.warmup_session(self._session) - else: - await self._api.order_test( - session=self._session, - symbol="BTC-USDT", - side="buy", - order_type="market", - funds=Decimal("1"), - ) - except asyncio.CancelledError: - break - except Exception: - pass - - async def handle_signal(self, signal: dict) -> None: - """ - Public entry point for processing an opportunity signal. - - Catches all exceptions so they do not bubble into the socket server's - task chain; emits a failed ExecutionReport on error. - """ - try: - await self._handle_signal_impl(signal) - except asyncio.CancelledError: - raise - except BaseException as e: - correlation_id = signal.get("correlation_id", "") - error_msg = str(e) - self._log.error("signal_handler_error", error=error_msg, correlation_id=correlation_id) - self._emit_report(ExecutionReport( - correlation_id=correlation_id, - triangle_key=signal.get("triangle_key", []), - status="failed", - fills=[], - predicted_bps=0.0, - ts_ms=int(time.time() * 1000), - error=error_msg, - )) - - async def _handle_signal_impl(self, signal: dict) -> None: - """ - Internal signal processing pipeline. - - Steps (in order): - 1. Block if executor is paused. - 2. Validate triangle_key and legs. - 3. Reject stale signals (book_ts_ms older than last seen). - 4. Reject signals where volume is below any leg's minimum. - 5. Reject if no concurrency slot available. - 6. Add to _in_flight and run _execute_triangle. - """ - async with self._pause_lock: - if self._paused: - self._log.debug("signal_dropped_paused", correlation_id=signal.get("correlation_id", "")) - return - - correlation_id = signal.get("correlation_id", "") - triangle_key_list = signal.get("triangle_key", []) - triangle_key = frozenset(triangle_key_list) - primary_quote = signal.get("primary_quote", "") - legs = signal.get("legs", []) - book_ts_ms = signal.get("book_ts_ms", 0) - - pair_symbols = frozenset(leg.get("pair", "") for leg in legs if leg.get("pair")) - - if self._is_blocked(triangle_key, pair_symbols, primary_quote): - self._log.debug("signal_blocked", correlation_id=correlation_id, triangle_key=triangle_key_list) - return - - if len(legs) != 3: - self._log.warning("invalid_signal", correlation_id=correlation_id, msg="expected 3 legs") - return - - stale_ts = self._last_trade_ts_ms.get(triangle_key, 0) - if book_ts_ms > 0 and stale_ts > 0 and book_ts_ms < stale_ts: - self._log.debug("signal_discarded_stale", correlation_id=correlation_id, book_ts_ms=book_ts_ms, stale_ts=stale_ts) - return - - async with self._isolation_lock: - if self._session is None or self._session.closed: - self._session = self._create_session() - if len(self._in_flight) >= self._settings.concurrent_slots: - self._log.debug("signal_dropped_no_slot", correlation_id=correlation_id) - return - - if self._is_blocked(triangle_key, pair_symbols, primary_quote): - self._log.debug("signal_blocked", correlation_id=correlation_id, triangle_key=triangle_key_list) - return - - in_flight = InFlightExecution( - triangle_key=triangle_key, - pair_symbols=pair_symbols, - primary_quote=primary_quote, - correlation_id=correlation_id, - signal=signal, - started_ts_ms=int(time.time() * 1000), - ) - self._in_flight[correlation_id] = in_flight - - self._log.debug("execute_triangle_starting", correlation_id=correlation_id) - try: - await self._execute_triangle(correlation_id, signal, in_flight) - finally: - async with self._isolation_lock: - self._in_flight.pop(correlation_id, None) - - def _is_blocked(self, triangle_key: frozenset, pair_symbols: frozenset, primary_quote: str) -> bool: - """ - Return True if the given triangle_key, pair_symbols, or primary_quote - already has an in-flight execution (isolation enforcement). - """ - for inf in list(self._in_flight.values()): - if inf.triangle_key == triangle_key: - return True - if self._settings.enforce_same_base_isolation and inf.primary_quote == primary_quote: - return True - if self._settings.enforce_pair_isolation and pair_symbols & inf.pair_symbols: - return True - return False - - async def _execute_triangle( - self, correlation_id: str, signal: dict, in_flight: InFlightExecution - ) -> None: - """ - Execute the three legs of a triangular arbitrage signal. - - Live mode - --------- - Places real market orders on KuCoin via order_place and waits for - fill events from the WS client. Uses the incoming volume for leg 0 - and propagates the output of each leg as the input for the next. - - Paper mode - ---------- - Validates orders against KuCoin's order_test endpoint (ensures the - pair/size/funds are valid) then simulates fills from the order-book - snapshot attached to the signal. Fill quantities are computed by - walking the book at the top level — no real capital is committed. - - In both modes the signal's ``_cancelled`` flag is checked between - legs so cancel_execution takes effect promptly. All outcomes - (filled, failed, aborted) produce an ExecutionReport. - """ - exec_start_ts = time.perf_counter() - timings: list[dict] = [] - predicted_bps = signal.get("predicted_bps", 0.0) - executor_receive_ts_ms = signal.get("_receiver_ts_ms") or int(time.time() * 1000) - signal_ts_ms = signal.get("ts_ms", 0) - book_ts_ms = signal.get("book_ts_ms", 0) - t_sock_arrive_ms = signal.get("t_sock_arrive_ms", 0) - t_arrive_ms = signal.get("t_arrive_ms", 0) - t_eval_ms = signal.get("t_eval_ms", 0) - if book_ts_ms > 0: - timings.append({"step": "t-2_book_snapshot", "elapsed_ms": -(executor_receive_ts_ms - book_ts_ms)}) - if t_sock_arrive_ms > 0: - timings.append({"step": "socket_arrived", "elapsed_ms": -(executor_receive_ts_ms - t_sock_arrive_ms)}) - if t_arrive_ms > 0: - timings.append({"step": "book_update_arrived", "elapsed_ms": -(executor_receive_ts_ms - t_arrive_ms)}) - if t_eval_ms > 0: - timings.append({"step": "t-1_eval_complete", "elapsed_ms": -(executor_receive_ts_ms - t_eval_ms)}) - if signal_ts_ms > 0: - timings.append({"step": "t_signal_created", "elapsed_ms": -(executor_receive_ts_ms - signal_ts_ms)}) - timings.append({"step": "signal_received", "elapsed_ms": 0.0}) - self._log.debug("execute_triangle_entered", correlation_id=correlation_id) - legs = signal.get("legs", []) - books = signal.get("books", []) - book_tops = [] - for bk in books[:3]: - asks = bk.get("asks", []) - bids = bk.get("bids", []) - book_tops.append({ - "ask_px": str(asks[0]["price"]) if asks else "", - "ask_sz": str(asks[0]["size"]) if asks else "", - "bid_px": str(bids[0]["price"]) if bids else "", - "bid_sz": str(bids[0]["size"]) if bids else "", - }) - live = signal.get("live", False) - log_prefix = "" if live else "[PAPER] " - - fills: list[LegFill] = [] - - if self._session is None or self._session.closed: - self._session = self._create_session() - session = self._session - for i, leg in enumerate(legs): - if signal.get("_cancelled"): - self._log.info("execution_cancelled", correlation_id=correlation_id, completed_legs=len(fills)) - report = ExecutionReport( - correlation_id=correlation_id, - triangle_key=signal.get("triangle_key", []), - status="aborted", - fills=fills, - predicted_bps=0.0, - ts_ms=int(time.time() * 1000), - error="cancelled", - ) - self._emit_report(report) - return - - pair = leg.get("pair", "") - side = leg.get("side", "") - order_param = Decimal(str(leg.get("order_param", "0"))) - base_min_size = Decimal(str(leg.get("base_min_size", "0"))) - base_increment = Decimal(str(leg.get("base_increment", "0"))) - quote_increment = Decimal(str(leg.get("quote_increment", "0"))) - fee_rate = Decimal(str(leg.get("fee_rate", "0.001"))) - fee_ccy = leg.get("fee_currency", "") - - base_ccy = pair.split("-")[0] - quote_ccy = pair.split("-")[-1] - - if side == "buy": - input_ccy = quote_ccy - output_ccy = base_ccy - else: - input_ccy = base_ccy - output_ccy = quote_ccy - latency_ms = 0.0 - - if live: - # --- LIVE MODE --- - client_oid = f"t{correlation_id[-24:]}-{i}-{uuid.uuid4().hex[:4]}" - t0 = time.perf_counter() - - if self._ws_client is None or not self._ws_client.is_connected: - self._log.error( - "live_mode_ws_not_connected", - correlation_id=correlation_id, - leg=i, - pair=pair, - ) - report = ExecutionReport( - correlation_id=correlation_id, - triangle_key=signal.get("triangle_key", []), - status="failed", - fills=fills, - predicted_bps=0.0, - ts_ms=int(time.time() * 1000), - error="ws_not_connected", - ) - self._emit_report(report) - return - - order_fired_elapsed = (time.perf_counter() - exec_start_ts) * 1000 - timings.append({"step": f"leg{i}_order_fired", "elapsed_ms": round(order_fired_elapsed, 2)}) - - if i == 0: - input_vol = order_param - else: - prev = fills[i - 1] - input_vol = prev.filled_volume - inc = quote_increment if side == "buy" else base_increment - if inc > 0: - input_vol = (input_vol / inc).to_integral_value(rounding=ROUND_DOWN) * inc - - if side == "buy": - ok, err_msg, order_id = await self._api.order_place( - session=session, - symbol=pair, - side=side, - order_type="market", - funds=input_vol, - client_oid=client_oid, - ) - else: - ok, err_msg, order_id = await self._api.order_place( - session=session, - symbol=pair, - side=side, - order_type="market", - size=input_vol, - client_oid=client_oid, - ) - place_latency_ms = (time.perf_counter() - t0) * 1000 - self._log.info( - f"{log_prefix}order_placed", - correlation_id=correlation_id, - leg=i, - pair=pair, - side=side, - order_id=order_id, - client_oid=client_oid, - volume=str(input_vol), - place_latency_ms=round(place_latency_ms, 2), - ) - self._log.write_plain( - f"{time.strftime('%Y-%m-%dT%H:%M:%S.', time.gmtime())}{int(time.time() * 1000) % 1000:03d}Z " - f"ORDER | corr={correlation_id} | leg{i} | {pair} | {side} | " - f"vol={str(input_vol)} | order_id={order_id} | lat={place_latency_ms:.1f}ms" - ) - - if not ok: - fills.append(LegFill( - leg=i, order_id=order_id or "", - side=side, pair=pair, - input_currency=input_ccy, output_currency=output_ccy, - fee_currency=fee_ccy, - filled_volume=input_vol, deal_funds=_D0, - avg_price=_D0, fee=_D0, - latency_ms=round(place_latency_ms, 2), - )) - self._log.error( - f"{log_prefix}order_rejected", - correlation_id=correlation_id, - leg=i, - pair=pair, - side=side, - volume=str(input_vol), - error=err_msg, - ) - self._log.write_plain( - f"{time.strftime('%Y-%m-%dT%H:%M:%S.', time.gmtime())}{int(time.time() * 1000) % 1000:03d}Z " - f"REJECTED | corr={correlation_id} | leg{i} | {pair} | {side} | " - f"vol={str(input_vol)} | error={err_msg} | lat={place_latency_ms:.1f}ms" - ) - report = ExecutionReport( - correlation_id=correlation_id, - triangle_key=signal.get("triangle_key", []), - status="failed", - fills=fills, - predicted_bps=predicted_bps, - ts_ms=int(time.time() * 1000), - timings=timings, - error=err_msg or "order_rejected", - book_tops=book_tops, - ) - self._emit_report(report) - return - - self._client_oid_to_order_id[client_oid] = order_id or "" - - fill_timeout_ms = self._settings.fill_timeout_ms - fill_ok, fill_data = await self._ws_client.await_fill( - client_oid, fill_timeout_ms - ) - fill_latency_ms = (time.perf_counter() - t0) * 1000 - latency_ms = fill_latency_ms - - if signal.get("_cancelled"): - self._log.info( - "execution_cancelled", - correlation_id=correlation_id, - completed_legs=len(fills), - ) - report = ExecutionReport( - correlation_id=correlation_id, - triangle_key=signal.get("triangle_key", []), - status="aborted", - fills=fills, - predicted_bps=predicted_bps, - ts_ms=int(time.time() * 1000), - timings=timings, - error="cancelled", - ) - self._emit_report(report) - return - - if not fill_ok: - elapsed_ms = (time.perf_counter() - exec_start_ts) * 1000 - self._log.error( - f"{log_prefix}fill_timeout", - correlation_id=correlation_id, - leg=i, - pair=pair, - client_oid=client_oid, - order_id=order_id, - fill_latency_ms=round(fill_latency_ms, 2), - total_elapsed_ms=round(elapsed_ms, 2), - ) - report = ExecutionReport( - correlation_id=correlation_id, - triangle_key=signal.get("triangle_key", []), - status="failed", - fills=fills, - predicted_bps=predicted_bps, - ts_ms=int(time.time() * 1000), - timings=timings, - error=f"fill_timeout_leg{i}", - book_tops=book_tops, - ) - self._emit_report(report) - return - - fill_received_elapsed = (time.perf_counter() - exec_start_ts) * 1000 - timings.append({"step": f"leg{i}_fill_received", "elapsed_ms": round(fill_received_elapsed, 2)}) - order_id = fill_data.get("order_id", order_id or "") - total_size = fill_data.get("total_size", Decimal("0")) - total_funds = fill_data.get("total_funds", Decimal("0")) - weighted_avg_price = fill_data.get("weighted_avg_price", _D0) - - # Fall back to book top if WS didn't provide a price (defensive). - book_snapshot = books[i] if i < len(books) else {} - if weighted_avg_price <= 0: - if side == "buy": - asks = book_snapshot.get("asks", []) - weighted_avg_price = Decimal(str(asks[0]["price"])) if asks else _D0 - else: - bids = book_snapshot.get("bids", []) - weighted_avg_price = Decimal(str(bids[0]["price"])) if bids else _D0 - - avg_price = weighted_avg_price if weighted_avg_price > 0 else _D0 - fee = _D0 - - filled_vol_val = total_size if side == "buy" else total_funds - fills.append(LegFill( - leg=i, - order_id=order_id, - side=side, - pair=pair, - input_currency=input_ccy, - output_currency=output_ccy, - fee_currency=fee_ccy, - filled_volume=filled_vol_val, - deal_funds=total_funds, - avg_price=avg_price, - fee=fee, - latency_ms=latency_ms, - )) - self._log.write_plain( - f"{time.strftime('%Y-%m-%dT%H:%M:%S.', time.gmtime())}{int(time.time() * 1000) % 1000:03d}Z " - f"FILL | corr={correlation_id} | leg{i} | {pair} | {side} | " - f"out={str(filled_vol_val)}@{str(avg_price)} | " - f"fee={str(fee)} {fee_ccy} | lat={latency_ms:.1f}ms" - ) - if i < 2: - if self._settings.await_balance: - await self._ws_client.await_balance( - output_ccy, fills[-1].filled_volume, 2000 - ) - if side == "sell": - bal = self._ws_client.latest_balance(output_ccy) - if bal > 0: - fills[-1].filled_volume = bal - nxt = legs[i + 1] - nxt_fee = Decimal(str(nxt.get("fee_rate", "0.001"))) - nxt_side = nxt.get("side", "") - nxt_inc = Decimal(str(nxt.get( - "quote_increment" if nxt_side == "buy" else "base_increment", "0"))) - if nxt_side == "buy": - fills[-1].filled_volume /= (_D1 + nxt_fee) - if nxt_inc > 0: - fills[-1].filled_volume = (fills[-1].filled_volume / nxt_inc - ).to_integral_value(rounding=ROUND_DOWN) * nxt_inc - in_flight.last_trade_ts_ms = int(time.time() * 1000) - continue - - # --- PAPER MODE --- - # The fused_engine already sized everything (per-leg order_param, - # fee rates, precision increments). We only validate via - # order_test and simulate fills from the attached books. - self._log.debug("order_test_calling", correlation_id=correlation_id, leg=i, pair=pair, side=side, order_param=str(order_param)) - t0 = time.perf_counter() - order_fired_elapsed = (t0 - exec_start_ts) * 1000 - timings.append({"step": f"leg{i}_order_fired", "elapsed_ms": round(order_fired_elapsed, 2)}) - - if side == "buy": - ok, err_msg, _ = await self._api.order_test( - session=session, - symbol=pair, - side=side, - order_type="market", - funds=order_param, - ) - else: - ok, err_msg, _ = await self._api.order_test( - session=session, - symbol=pair, - side=side, - order_type="market", - size=order_param, - ) - - latency_ms = (time.perf_counter() - t0) * 1000 - fill_elapsed = (time.perf_counter() - exec_start_ts) * 1000 - timings.append({"step": f"leg{i}_fill_received", "elapsed_ms": round(fill_elapsed, 2)}) - self._log._sl.info(f"{log_prefix}order_latency", correlation_id=correlation_id, leg=i, pair=pair, latency_ms=round(latency_ms, 2)) - - if not ok: - self._log.error(f"{log_prefix}order_rejected", correlation_id=correlation_id, leg=i, pair=pair, error=err_msg) - report = ExecutionReport( - correlation_id=correlation_id, - triangle_key=signal.get("triangle_key", []), - status="failed", - fills=fills, - predicted_bps=0.0, - ts_ms=int(time.time() * 1000), - error=err_msg or "order_rejected", - ) - self._emit_report(report) - return - - # Paper order succeeded — assign a synthetic order ID so the fill - # pipeline has something to track. - order_id = f"paper-{uuid.uuid4().hex[:12]}" - if signal.get("_cancelled"): - self._log.info("execution_cancelled", correlation_id=correlation_id, completed_legs=len(fills)) - report = ExecutionReport( - correlation_id=correlation_id, - triangle_key=signal.get("triangle_key", []), - status="aborted", - fills=fills, - predicted_bps=0.0, - ts_ms=int(time.time() * 1000), - error="cancelled", - ) - self._emit_report(report) - return - - book_snapshot = books[i] if i < len(books) else {} - - # Simulate fill from top-of-book: use the best ask (buy) or best bid (sell). - if side == "buy": - asks = book_snapshot.get("asks", []) - price = Decimal(str(asks[0]["price"])) if asks else _D0 - funds = order_param - if quote_increment > 0: - funds = (funds / quote_increment).to_integral_value(rounding=ROUND_DOWN) * quote_increment - base_vol = funds / price if price > 0 else _D0 - if base_increment > 0: - base_vol = (base_vol / base_increment).to_integral_value(rounding=ROUND_DOWN) * base_increment - deal_funds = funds - fee = base_vol * fee_rate if fee_ccy == base_ccy else order_param * fee_rate - filled_volume = base_vol - - # Reject if simulated fill is below the exchange minimum. - if base_min_size > 0 and base_vol < base_min_size: - self._log.info("execution_aborted_below_min", - correlation_id=correlation_id, leg=i, pair=pair, - base_vol=str(base_vol), min_base=str(base_min_size), - ) - report = ExecutionReport( - correlation_id=correlation_id, - triangle_key=signal.get("triangle_key", []), - status="aborted", - fills=fills, - predicted_bps=0.0, - ts_ms=int(time.time() * 1000), - error="volume_below_minimum", - ) - self._emit_report(report) - return - else: - bids = book_snapshot.get("bids", []) - price = Decimal(str(bids[0]["price"])) if bids else _D0 - base_vol = order_param - if base_increment > 0: - base_vol = (base_vol / base_increment).to_integral_value(rounding=ROUND_DOWN) * base_increment - quote_vol = base_vol * price if price > 0 else _D0 - # Fee depends on whether KuCoin charges it in base or quote. - if fee_ccy == base_ccy: - fee = base_vol * fee_rate - deal_funds = quote_vol - else: - fee = quote_vol * fee_rate - deal_funds = quote_vol * (_D1 - fee_rate) - filled_volume = quote_vol - - if base_min_size > 0 and order_param < base_min_size: - self._log.info("execution_aborted_below_min", - correlation_id=correlation_id, leg=i, pair=pair, - base_vol=str(order_param), min_base=str(base_min_size), - ) - report = ExecutionReport( - correlation_id=correlation_id, - triangle_key=signal.get("triangle_key", []), - status="aborted", - fills=fills, - predicted_bps=0.0, - ts_ms=int(time.time() * 1000), - error="volume_below_minimum", - ) - self._emit_report(report) - return - - avg_price = price - - fills.append(LegFill( - leg=i, - order_id=order_id, - side=side, - pair=pair, - input_currency=input_ccy, - output_currency=output_ccy, - fee_currency=fee_ccy, - filled_volume=filled_volume, - deal_funds=deal_funds, - avg_price=avg_price, - fee=fee, - latency_ms=latency_ms, - )) - in_flight.last_trade_ts_ms = int(time.time() * 1000) - - # Compute actual profit in the primary quote currency. - # Leg 2 output: if buy, the received base (filled_volume); if sell, the received - # quote (deal_funds, already net when fee is in quote). - if fills[2].side == "buy": - leg2_out = fills[2].filled_volume - if fills[2].fee_currency == fills[2].output_currency: - leg2_out -= fills[2].fee - else: - leg2_out = fills[2].deal_funds - # Leg 0 input: if buy, quote spent (deal_funds); if sell, base sold. - if fills[0].side == "buy": - leg0_in = fills[0].deal_funds - else: - leg0_in = fills[0].filled_volume - profit = float(leg2_out - leg0_in) - effective_bps = (profit / float(leg0_in)) * 10000 if leg0_in else 0.0 - total_exec_ms = (time.perf_counter() - exec_start_ts) * 1000 - timings.append({"step": "execution_complete", "elapsed_ms": round(total_exec_ms, 2)}) - self._log._sl.info("execution_timing", correlation_id=correlation_id, timings=timings) - report = ExecutionReport( - correlation_id=correlation_id, - triangle_key=signal.get("triangle_key", []), - status="filled", - fills=fills, - predicted_bps=predicted_bps, - profit=profit, - effective_bps=effective_bps, - ts_ms=int(time.time() * 1000), - timings=timings, - book_ts_ms=book_ts_ms, - book_tops=book_tops, - ) - self._last_trade_ts_ms[in_flight.triangle_key] = report.ts_ms - self._emit_report(report) - - def _emit_report(self, report: ExecutionReport) -> None: - """Persist an ExecutionReport to the in-memory deque and write to the log file.""" - fills_lines = [] - for f in report.fills: - fills_lines.append( - f"L{f.leg}:{f.side} {f.pair} {f.input_currency}->{f.output_currency} " - f"{f.filled_volume}@{f.avg_price}(fee={f.fee} {f.fee_currency} lat={f.latency_ms:.1f}ms)" - ) - fills_repr = ", ".join(fills_lines) - self._reports.append(report) - error_suffix = f" | error={report.error}" if report.error else "" - profit_repr = f"profit={report.profit:.4f}" - timing_parts = " ".join(f"{t['step']}={t['elapsed_ms']:.1f}ms" for t in report.timings) if report.timings else "" - bt_lines = [] - for bt in report.book_tops: - bt_lines.append(f"{bt.get('ask_px','?')}/{bt.get('bid_px','?')}") - book_top_repr = " | ".join(bt_lines) if bt_lines else "" - ts_iso = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.") + f"{report.ts_ms % 1000:03d}Z" - books_suffix = f" | books=[{book_top_repr}]" if book_top_repr else "" - msg = ( - f"{ts_iso} {report.status.upper()} | corr={report.correlation_id} | " - f"triangle={report.triangle_key} | predicted_bps={report.predicted_bps:.2f} | " - f"effective_bps={report.effective_bps:.2f} | " - f"book_ts={report.book_ts_ms} | " - f"{profit_repr} | timings=[{timing_parts}] | fills=[{fills_repr}]{books_suffix}{error_suffix}" - ) - self._log.write_plain(msg) - - print( - f"{ts_iso} {report.status.upper()} | corr={report.correlation_id} | " - f"triangle={report.triangle_key} | predicted_bps={report.predicted_bps:.2f} | " - f"effective_bps={report.effective_bps:.2f} | " - f"book_ts={report.book_ts_ms} | " - f"{profit_repr} | fills=[{fills_repr}]{books_suffix}" - f"{f' | error={report.error}' if report.error else ''}", - flush=True, - ) - - async def cancel_execution(self, correlation_id: str) -> bool: - """ - Request cancellation of an in-flight execution by setting _cancelled - on the signal dict. Returns True if the execution was found. - """ - if correlation_id in self._in_flight: - self._log.info("cancel_requested", correlation_id=correlation_id) - self._in_flight[correlation_id].signal["_cancelled"] = True - return True - return False - - async def close(self) -> None: - """Close the aiohttp session and flush the log writer.""" - if self._keepalive_task is not None and not self._keepalive_task.done(): - self._keepalive_task.cancel() - try: - await self._keepalive_task - except asyncio.CancelledError: - pass - self._keepalive_task = None - if self._session is not None and not self._session.closed: - await self._session.close() - await self._log.close() - - async def pause(self) -> None: - """Block the executor from accepting new signals.""" - async with self._pause_lock: - self._paused = True - self._log.info("executor_paused") - - async def resume(self) -> None: - """Unblock the executor to start accepting signals again.""" - async with self._pause_lock: - self._paused = False - self._log.info("executor_resumed") - - def get_uptime_seconds(self) -> float: - """Return process uptime in seconds (monotonic clock, not wall time).""" - return (time.monotonic_ns() - self._uptime_ns) / 1e9 - - def set_concurrent_slots(self, slots: int) -> None: - """ - Dynamically adjust the maximum number of concurrent in-flight triangles. - Thread-safe: updates self._settings.concurrent_slots only. - """ - self._settings.concurrent_slots = slots - self._log.info("concurrent_slots_updated", slots=slots) - - def get_in_flight(self) -> list[dict]: - return [ - { - "correlation_id": inf.correlation_id, - "triangle_key": list(inf.triangle_key), - "pair_symbols": list(inf.pair_symbols), - "primary_quote": inf.primary_quote, - "started_ts_ms": inf.started_ts_ms, - } - for inf in self._in_flight.values() - ] - - def get_reports(self, limit: int = 50) -> list[dict]: - reports = list(self._reports)[-limit:] - return [ - { - "correlation_id": r.correlation_id, - "triangle_key": r.triangle_key, - "status": r.status, - "fills": [{"leg": f.leg, "vol": str(f.filled_volume), "deal_funds": str(f.deal_funds), "price": str(f.avg_price), "fee": str(f.fee), "fee_currency": f.fee_currency} for f in r.fills], - "predicted_bps": r.predicted_bps, - "effective_bps": r.effective_bps, - "ts_ms": r.ts_ms, - **({"error": r.error} if r.error else {}), - } - for r in reports - ] - - def get_config(self) -> dict: - return { - "live_mode": self._live_mode, - "concurrent_slots": self._settings.concurrent_slots, - "enforce_same_base_isolation": self._settings.enforce_same_base_isolation, - "socket_path": str(self._settings.socket_path), - } diff --git a/executor/kucoin_api.py b/executor/kucoin_api.py deleted file mode 100644 index 355f2be..0000000 --- a/executor/kucoin_api.py +++ /dev/null @@ -1,457 +0,0 @@ -""" -KuCoin REST API client for the executor. - -Covers symbol metadata fetch, HF order-test endpoint for paper mode, -real order placement for live mode, and private WebSocket token -acquisition required by ws_client.py. -""" -import aiohttp -import base64 -import hmac -import hashlib -import json -import time -import uuid -import structlog -from decimal import Decimal -from typing import Optional - -logger = structlog.get_logger().bind(component="executor-kucoin") - -KUCoin_SYMBOLS_URL = "https://api.kucoin.com/api/v1/symbols" -KUCoin_ORDER_TEST_URL = "https://api.kucoin.com/api/v1/hf/orders/test" -KUCoin_ORDER_PLACE_URL = "https://api.kucoin.com/api/v1/hf/orders" -KUCoin_SERVER_TIME_URL = "https://api.kucoin.com/api/v1/time" -KUCoin_BULLET_PRIVATE_URL = "https://api.kucoin.com/api/v1/bullet-private" - - -def _signRequest(timestamp: str, method: str, path: str, secret: str, data: str = "") -> str: - """ - Compute the HMAC-SHA256 signature for a KuCoin API request. - - Parameters - ---------- - timestamp : str - Millisecond Unix timestamp. - method : str - HTTP method (e.g. "POST"). - path : str - API endpoint path (e.g. "/api/v1/hf/orders/test"). - secret : str - API secret as returned by the user. - data : str - Request body. Empty string for GET requests. - - Returns - ------- - str - Base64-encoded HMAC-SHA256 digest suitable for the KC-API-SIGN header. - """ - # KuCoin signs timestamp + method + path + body concatenated. - message = timestamp + method + path + data - mac = hmac.new(secret.encode("utf-8"), message.encode("utf-8"), hashlib.sha256) - return base64.b64encode(mac.digest()).decode("utf-8") - - -def _encryptPassphrase(passphrase: str, secret: str) -> str: - """ - Encrypt the API passphrase for KuCoin. - - KuCoin requires the passphrase to be encrypted with HMAC-SHA256 - using the API secret as the key, then base64-encoded. - - Returns - ------- - str - Base64-encoded encrypted passphrase for the KC-API-PASSPHRASE header. - """ - mac = hmac.new(secret.encode("utf-8"), passphrase.encode("utf-8"), hashlib.sha256) - return base64.b64encode(mac.digest()).decode("utf-8") - - -class SymbolMeta: - """ - Cached metadata for a single KuCoin trading pair. - - Fetched once at startup from /api/v1/symbols and held in memory - for the lifetime of the process. - """ - - def __init__( - self, - symbol: str, - base: str, - quote: str, - base_min_size: Decimal, - quote_min_size: Decimal, - base_increment: Decimal, - quote_increment: Decimal, - min_funds: Decimal, - taker_fee: Decimal, - maker_fee: Decimal, - incr_min_size: Decimal, - base_precision: int, - quote_precision: int, - ) -> None: - self.symbol = symbol - self.base = base - self.quote = quote - self.base_min_size = base_min_size - self.quote_min_size = quote_min_size - self.base_increment = base_increment - self.quote_increment = quote_increment - self.min_funds = min_funds - self.taker_fee = taker_fee - self.maker_fee = maker_fee - self.incr_min_size = incr_min_size - self.base_precision = base_precision - self.quote_precision = quote_precision - - - -def _cost_to_precision(amount: Decimal, quote_increment: Decimal) -> Decimal: - """ - Round a quote-currency amount *up* to the nearest valid increment. - - KuCoin requires quote amounts to be multiples of quote_increment. When - computing the minimum order size in quote currency we therefore round up - so we never underestimate the minimum. - """ - if quote_increment <= 0: - return amount - mult = Decimal("1") / quote_increment - return (amount * mult).to_integral_value(rounding="ROUND_UP") / mult - - -def _amount_to_precision( - amount: Decimal, - base_increment: Decimal, - round_down: bool = False, -) -> Decimal: - """ - Round a base-currency amount to the nearest valid increment. - - Use round_down=True for sell orders so the resulting size never - exceeds the available balance. Use round_down=False (default) for - buy orders and minimum-size computations where rounding up is safe. - """ - if base_increment <= 0: - return amount - mult = Decimal("1") / base_increment - rounding = "ROUND_DOWN" if round_down else "ROUND_UP" - return (amount * mult).to_integral_value(rounding=rounding) / mult - - -class KuCoinAPI: - """ - Thin client for KuCoin REST API calls needed by the executor. - - Handles HMAC signing, symbol metadata caching, order placement - (live) and order validation (paper), plus private token fetch - for the WebSocket client. - """ - - def __init__(self, api_key: str = "", api_secret: str = "", api_passphrase: str = "") -> None: - self._api_key = api_key - self._api_secret = api_secret - self._api_passphrase = api_passphrase - # Pre-encrypt the passphrase at init time so we don't recompute per request. - self._encrypted_passphrase = _encryptPassphrase(api_passphrase, api_secret) if api_passphrase and api_secret else "" - self._symbols: dict[str, SymbolMeta] = {} - self._log = logger - - async def fetch_symbols(self) -> None: - """ - Fetch all trading pair metadata from KuCoin and populate _symbols. - - Logs a warning for any symbol that fails to parse and skips it. - """ - async with aiohttp.ClientSession() as session: - async with session.get(KUCoin_SYMBOLS_URL) as resp: - resp.raise_for_status() - payload = await resp.json() - - for item in payload.get("data", []): - symbol = item.get("symbol", "") - base = item.get("baseCurrency", "") - quote = item.get("quoteCurrency", "") - if not all([symbol, base, quote]): - continue - - raw_base_min = item.get("baseMinSize") or "0" - raw_quote_min = item.get("quoteMinSize") or "0" - raw_base_inc = item.get("baseIncrement") or "0" - raw_quote_inc = item.get("quoteIncrement") or "0" - raw_min_funds = item.get("minFunds") or "0" - raw_maker = item.get("makerFeeRate") or "0" - raw_taker = item.get("takerFeeRate") or "0" - raw_incr_min = item.get("incrMinSize") or "0" - raw_base_prec = item.get("basePrecision") or "0" - raw_quote_prec = item.get("quotePrecision") or "0" - - try: - base_min_size = Decimal(raw_base_min) - quote_min_size = Decimal(raw_quote_min) - base_increment = Decimal(raw_base_inc) - quote_increment = Decimal(raw_quote_inc) - min_funds = Decimal(raw_min_funds) - taker_fee = Decimal(raw_taker) - maker_fee = Decimal(raw_maker) - incr_min_size = Decimal(raw_incr_min) - base_precision = int(raw_base_prec) if raw_base_prec else 0 - quote_precision = int(raw_quote_prec) if raw_quote_prec else 0 - except Exception as e: - self._log.warning("symbol_field_parse_error", symbol=symbol, error=str(e), - baseMinSize=repr(raw_base_min), quoteMinSize=repr(raw_quote_min), - baseIncrement=repr(raw_base_inc), quoteIncrement=repr(raw_quote_inc), - minFunds=repr(raw_min_funds), makerFeeRate=repr(raw_maker), - takerFeeRate=repr(raw_taker), incrMinSize=repr(raw_incr_min), - basePrecision=repr(raw_base_prec), quotePrecision=repr(raw_quote_prec)) - continue - - self._symbols[symbol] = SymbolMeta( - symbol=symbol, - base=base, - quote=quote, - base_min_size=base_min_size, - quote_min_size=quote_min_size, - base_increment=base_increment, - quote_increment=quote_increment, - min_funds=min_funds, - taker_fee=taker_fee, - maker_fee=maker_fee, - incr_min_size=incr_min_size, - base_precision=base_precision, - quote_precision=quote_precision, - ) - - self._log.info("symbols_loaded", count=len(self._symbols)) - - def get_symbol_meta(self, symbol: str) -> Optional[SymbolMeta]: - """Return cached SymbolMeta for a pair, or None if not yet loaded.""" - return self._symbols.get(symbol) - - async def order_test( - self, - session: aiohttp.ClientSession, - symbol: str, - side: str, - order_type: str, - price: Optional[Decimal] = None, - size: Optional[Decimal] = None, - funds: Optional[Decimal] = None, - ) -> tuple[bool, str, Optional[str]]: - """ - Validate an order against KuCoin's paper-trading endpoint. - - In paper mode the executor uses this instead of a real order so that - fill simulation can proceed without risking capital. - - Parameters - ---------- - session : aiohttp.ClientSession - Shared session for connection pooling. - symbol : str - KuCoin symbol e.g. "BTC-USDT". - side : str - "buy" or "sell". - order_type : str - Order type string (e.g. "market"). - price, size, funds : Optional[Decimal] - Order parameters; at least one must be provided. - - Returns - ------- - tuple[bool, str, Optional[str]] - (success, error_message, order_id). order_id is populated on success. - """ - timestamp = str(int(time.time() * 1000)) - path = "/api/v1/hf/orders/test" - body: dict = { - "symbol": symbol, - "type": order_type, - "side": side, - "clientOid": f"paper-{timestamp}-{uuid.uuid4().hex[:8]}", - } - if price is not None: - body["price"] = str(price) - if size is not None: - body["size"] = str(size) - if funds is not None: - body["funds"] = str(funds) - - raw_body = json.dumps(body, separators=(",", ":")) - sign = _signRequest(timestamp, "POST", path, self._api_secret, raw_body) - headers = { - "KC-API-TIMESTAMP": timestamp, - "KC-API-SIGN": sign, - "KC-API-KEY": self._api_key, - "KC-API-PASSPHRASE": self._encrypted_passphrase, - "KC-API-SIGN-TYPE": "2", - "KC-API-KEY-VERSION": "3", - "Content-Type": "application/json", - } - - try: - async with session.post( - KUCoin_ORDER_TEST_URL, - data=raw_body.encode(), - headers=headers, - ) as resp: - text = await resp.text() - if resp.status == 200: - data = json.loads(text) - order_id = data.get("data", {}).get("orderId", "") - return True, "", order_id - else: - data = json.loads(text) - code = data.get("code", "") - msg = data.get("msg", text) - return False, f"{code}: {msg}", None - except (json.JSONDecodeError, ValueError) as e: - return False, f"invalid_response: {e}", None - except (aiohttp.ClientError, OSError) as e: - return False, f"http_error: {e}", None - - async def order_place( - self, - session: aiohttp.ClientSession, - symbol: str, - side: str, - order_type: str, - price: Optional[Decimal] = None, - size: Optional[Decimal] = None, - funds: Optional[Decimal] = None, - client_oid: str = "", - ) -> tuple[bool, str, Optional[str]]: - """ - Place a real market order on KuCoin. - - Parameters - ---------- - session : aiohttp.ClientSession - Shared session for connection pooling. - symbol : str - KuCoin symbol e.g. "BTC-USDT". - side : str - "buy" or "sell". - order_type : str - Order type string (e.g. "market"). - price, size, funds : Optional[Decimal] - Order parameters; at least one must be provided. - client_oid : str - Client-order ID for matching with WS fill events. - - Returns - ------- - tuple[bool, str, Optional[str]] - (success, error_message, order_id). order_id is populated on success. - """ - timestamp = str(int(time.time() * 1000)) - path = "/api/v1/hf/orders" - body: dict = { - "symbol": symbol, - "type": order_type, - "side": side, - "clientOid": client_oid or f"live-{timestamp}-{uuid.uuid4().hex[:8]}", - } - if price is not None: - body["price"] = str(price) - if size is not None: - body["size"] = str(size) - if funds is not None: - body["funds"] = str(funds) - - raw_body = json.dumps(body, separators=(",", ":")) - sign = _signRequest(timestamp, "POST", path, self._api_secret, raw_body) - headers = { - "KC-API-TIMESTAMP": timestamp, - "KC-API-SIGN": sign, - "KC-API-KEY": self._api_key, - "KC-API-PASSPHRASE": self._encrypted_passphrase, - "KC-API-SIGN-TYPE": "2", - "KC-API-KEY-VERSION": "3", - "Content-Type": "application/json", - } - - try: - async with session.post( - KUCoin_ORDER_PLACE_URL, - data=raw_body.encode(), - headers=headers, - ) as resp: - text = await resp.text() - self._log.info("order_place_raw_response", status=resp.status, response=text[:300]) - data = json.loads(text) - if resp.status == 200 and data.get("code") == "200000": - order_id = data.get("data", {}).get("orderId", "") - return True, "", order_id - else: - code = data.get("code", resp.status) - msg = data.get("msg", text) - return False, f"{code}: {msg}", None - except (json.JSONDecodeError, ValueError) as e: - return False, f"invalid_response: {e}", None - except (aiohttp.ClientError, OSError) as e: - return False, f"http_error: {e}", None - - async def get_private_token(self, session: aiohttp.ClientSession) -> Optional[dict]: - """ - Request a private WebSocket token from KuCoin. - - Returns the full data dict from the bullet-private response, containing - 'token', 'instanceServers' with endpoint, pingInterval, pingTimeout. - Returns None on error. - """ - timestamp = str(int(time.time() * 1000)) - path = "/api/v1/bullet-private" - sign = _signRequest(timestamp, "POST", path, self._api_secret, "") - headers = { - "KC-API-TIMESTAMP": timestamp, - "KC-API-SIGN": sign, - "KC-API-KEY": self._api_key, - "KC-API-PASSPHRASE": self._encrypted_passphrase, - "KC-API-SIGN-TYPE": "2", - "KC-API-KEY-VERSION": "3", - "Content-Type": "application/json", - } - - try: - async with session.post( - KUCoin_BULLET_PRIVATE_URL, - data=b"", - headers=headers, - ) as resp: - if resp.status == 200: - data = await resp.json() - return data.get("data") - else: - text = await resp.text() - self._log.error("private_token_failed", status=resp.status, response=text[:200]) - return None - except (aiohttp.ClientError, OSError) as e: - self._log.error("private_token_http_error", error=str(e)) - return None - - async def warmup_session(self, session: aiohttp.ClientSession) -> None: - """Warm up the authenticated HTTP connection pool with a minimal GET.""" - timestamp = str(int(time.time() * 1000)) - path = "/api/v1/accounts" - sign = _signRequest(timestamp, "GET", path, self._api_secret, "") - headers = { - "KC-API-TIMESTAMP": timestamp, - "KC-API-SIGN": sign, - "KC-API-KEY": self._api_key, - "KC-API-PASSPHRASE": self._encrypted_passphrase, - "KC-API-SIGN-TYPE": "2", - "KC-API-KEY-VERSION": "3", - } - try: - async with session.get( - f"https://api.kucoin.com{path}", - headers=headers, - ) as resp: - await resp.read() - pass - except Exception as e: - self._log.warning("session_warmup_failed", error=str(e)) diff --git a/executor/rest_api.py b/executor/rest_api.py deleted file mode 100644 index 68478b7..0000000 --- a/executor/rest_api.py +++ /dev/null @@ -1,171 +0,0 @@ -""" -FastAPI control interface for the executor. - -Exposes endpoints for status inspection, configuration changes, execution -cancellation, and pause/resume. Intended for operator use; not used in the -normal signal flow. -""" -from typing import Optional - -from fastapi import FastAPI, HTTPException -from pydantic import BaseModel - -from executor.executor import Executor - - -class ConfigResponse(BaseModel): - """Current executor configuration.""" - live_mode: bool - concurrent_slots: int - enforce_same_base_isolation: bool - socket_path: str - - -class ConfigPatchRequest(BaseModel): - """Request body for configuration update endpoints.""" - live_mode: Optional[bool] = None - concurrent_slots: Optional[int] = None - enforce_same_base_isolation: Optional[bool] = None - - -class ConfigPatchResponse(BaseModel): - """Response after a configuration update, echoing the new values.""" - ok: bool - live_mode: Optional[bool] = None - concurrent_slots: Optional[int] = None - enforce_same_base_isolation: Optional[bool] = None - - -class PauseResponse(BaseModel): - """Response after a pause request.""" - ok: bool - - -class ResumeResponse(BaseModel): - """Response after a resume request.""" - ok: bool - - -class CancelResponse(BaseModel): - """Response after a cancellation request.""" - ok: bool - message: str - - -class StatusResponse(BaseModel): - """Runtime status of the executor.""" - status: str - version: str - live_mode: bool - slots_used: int - slots_total: int - uptime_seconds: float - - -class ShutdownResponse(BaseModel): - """Response after a shutdown request.""" - ok: bool - - -def create_app(executor: Executor, shutdown_callback) -> FastAPI: - """ - Build the FastAPI application and wire all routes to the executor. - - Parameters - ---------- - executor : Executor - The live Executor instance to control. - shutdown_callback : callable - Called when /api/v1/shutdown is hit to begin clean shutdown. - - Returns - ------- - FastAPI - Configured app ready to be passed to uvicorn. - """ - app = FastAPI(title="Executor API", description="Control interface for the triangular arbitrage executor") - - @app.get("/api/v1/status", response_model=StatusResponse) - async def get_status() -> StatusResponse: - config = executor.get_config() - in_flight = executor.get_in_flight() - return StatusResponse( - status="ok", - version="0.1.0", - live_mode=config["live_mode"], - slots_used=len(in_flight), - slots_total=config["concurrent_slots"], - uptime_seconds=executor.get_uptime_seconds(), - ) - - @app.get("/api/v1/config", response_model=ConfigResponse) - async def get_config() -> ConfigResponse: - cfg = executor.get_config() - return ConfigResponse( - paper_mode=cfg["paper_mode"], - concurrent_slots=cfg["concurrent_slots"], - enforce_same_base_isolation=cfg["enforce_same_base_isolation"], - socket_path=cfg["socket_path"], - ) - - @app.patch("/api/v1/config/live_mode", response_model=ConfigPatchResponse) - async def patch_live_mode(req: ConfigPatchRequest) -> ConfigPatchResponse: - if req.live_mode is None: - raise HTTPException(status_code=400, detail="live_mode required") - executor._live_mode = req.live_mode - return ConfigPatchResponse(ok=True, live_mode=req.live_mode) - - @app.patch("/api/v1/config/concurrent_slots", response_model=ConfigPatchResponse) - async def patch_concurrent_slots(req: ConfigPatchRequest) -> ConfigPatchResponse: - if req.concurrent_slots is None or req.concurrent_slots < 1: - raise HTTPException(status_code=400, detail="concurrent_slots must be >= 1") - executor.set_concurrent_slots(req.concurrent_slots) - return ConfigPatchResponse(ok=True, concurrent_slots=req.concurrent_slots) - - @app.patch("/api/v1/config/enforce_same_base_isolation", response_model=ConfigPatchResponse) - async def patch_isolation(req: ConfigPatchRequest) -> ConfigPatchResponse: - if req.enforce_same_base_isolation is None: - raise HTTPException(status_code=400, detail="enforce_same_base_isolation required") - executor._settings.enforce_same_base_isolation = req.enforce_same_base_isolation - return ConfigPatchResponse(ok=True, enforce_same_base_isolation=req.enforce_same_base_isolation) - - @app.get("/api/v1/executions") - async def get_executions() -> list[dict]: - return executor.get_in_flight() - - @app.get("/api/v1/executions/{correlation_id}") - async def get_execution(correlation_id: str) -> dict: - in_flight = executor.get_in_flight() - for inf in in_flight: - if inf["correlation_id"] == correlation_id: - return inf - raise HTTPException(status_code=404, detail="Execution not found") - - @app.post("/api/v1/cancel/{correlation_id}", response_model=CancelResponse) - async def cancel_execution(correlation_id: str) -> CancelResponse: - ok = await executor.cancel_execution(correlation_id) - if ok: - return CancelResponse(ok=True, message=f"Cancellation requested for {correlation_id}") - raise HTTPException(status_code=404, detail="Execution not found") - - @app.post("/api/v1/pause", response_model=PauseResponse) - async def pause() -> PauseResponse: - await executor.pause() - return PauseResponse(ok=True) - - @app.post("/api/v1/resume", response_model=ResumeResponse) - async def resume() -> ResumeResponse: - await executor.resume() - return ResumeResponse(ok=True) - - @app.post("/api/v1/shutdown", response_model=ShutdownResponse) - async def shutdown() -> ShutdownResponse: - await executor.pause() - shutdown_callback() - return ShutdownResponse(ok=True) - - @app.get("/api/v1/reports") - async def get_reports(limit: int = 50) -> list[dict]: - return executor.get_reports(limit=limit) - - return app diff --git a/executor/socket_server.py b/executor/socket_server.py deleted file mode 100644 index fbafd2e..0000000 --- a/executor/socket_server.py +++ /dev/null @@ -1,100 +0,0 @@ -""" -Unix-domain socket server that receives opportunity signals from oe_em. - -Each JSON line on the socket is parsed and dispatched as an asyncio task -to avoid blocking the reader. -""" -import asyncio -import json -import time -from pathlib import Path -import structlog - -logger = structlog.get_logger().bind(component="signal_socket_server") - - -class SignalSocketServer: - """ - Accepts JSON-serialized signals over a Unix domain socket. - - Every valid "signal" message is wrapped in create_task so processing - is concurrent and a slow handler never blocks new connections. - """ - - def __init__(self, socket_path: Path, on_signal) -> None: - self._socket_path = socket_path - self._on_signal = on_signal - self._log = logger - self._server: asyncio.Server | None = None - self._running = False - - async def start(self) -> None: - """Remove any stale socket file and start accepting connections.""" - if self._socket_path.exists(): - self._socket_path.unlink() - - self._running = True - self._server = await asyncio.start_unix_server( - self._accept_client, - path=str(self._socket_path), - ) - self._log.info("signal_socket_server_started", path=str(self._socket_path)) - async with self._server: - await self._server.serve_forever() - - async def stop(self) -> None: - """Stop the server and remove the socket file.""" - self._running = False - if self._server: - self._server.close() - await self._server.wait_closed() - if self._socket_path.exists(): - self._socket_path.unlink() - self._log.info("signal_socket_server_stopped") - - async def _accept_client( - self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter - ) -> None: - """ - Handle one client connection: read lines, parse JSON, dispatch signals. - - The client is assumed to be oe_em's SignalSocketClient. Any line that - is not JSON or is not type="signal" is silently ignored. - """ - self._log.info("client_connected", addr=writer.get_extra_info("peername")) - try: - while self._running: - try: - line = await reader.readline() - except (ConnectionResetError, BrokenPipeError, asyncio.CancelledError): - break - except Exception: - break - if not line: - break - - arrived_ms = int(time.time() * 1000) - - try: - data = json.loads(line.decode()) - except (json.JSONDecodeError, UnicodeDecodeError) as e: - self._log.warning("invalid_json", line=line[:50], error=str(e)) - continue - - if data.get("type") == "signal": - data["_receiver_ts_ms"] = arrived_ms - asyncio.create_task(self._on_signal(data)) - else: - self._log.debug("ignored_non_signal", type=data.get("type")) - - except asyncio.CancelledError: - pass - except Exception as e: - self._log.warning("client_error", error=str(e)) - finally: - writer.close() - try: - await asyncio.wait_for(writer.wait_closed(), timeout=1.0) - except Exception: - pass - self._log.info("client_disconnected") \ No newline at end of file diff --git a/executor/ws_client.py b/executor/ws_client.py deleted file mode 100644 index c71e2a7..0000000 --- a/executor/ws_client.py +++ /dev/null @@ -1,483 +0,0 @@ -""" -KuCoin private WebSocket client for fill event delivery. - -Manages the private WebSocket connection, authenticates via bullet-private, -subscribes to /spotMarket/tradeOrdersV2, and dispatches fill events to -waiting executor coroutines. - -KuCoin market orders placed with ``funds`` often complete with a tiny -unfilled remainder, emitting ``canceled`` + ``status=done`` rather than -``filled``. The message handler in ``_handle_message`` treats this -terminal combination as a successful fill when match events have been -accumulated. -""" -import asyncio -import json -import time -import uuid -from decimal import Decimal -from typing import Optional - -import aiohttp -import structlog -import websockets - -from executor.kucoin_api import KuCoinAPI - -logger = structlog.get_logger().bind(component="executor-ws") -_D0 = Decimal("0") - - -class FillAccumulator: - """Accumulate match events for a single order, compute aggregated totals.""" - - def __init__(self, client_oid: str, order_id: str = "") -> None: - self.client_oid = client_oid - self.order_id = order_id - self.total_size = _D0 - self.total_funds = _D0 - self.match_count = 0 - self.side = "" - self.symbol = "" - self._done = False - - def add_match(self, data: dict) -> None: - match_price = Decimal(str(data.get("matchPrice", "0"))) - match_size = Decimal(str(data.get("matchSize", "0"))) - self.total_size += match_size - self.total_funds += match_price * match_size - self.match_count += 1 - if not self.side: - self.side = data.get("side", "") - if not self.symbol: - self.symbol = data.get("symbol", "") - if not self.order_id: - self.order_id = data.get("orderId", "") - - @property - def weighted_avg_price(self) -> Decimal: - if self.total_size <= 0: - return _D0 - return self.total_funds / self.total_size - - def to_dict(self) -> dict: - return { - "total_size": self.total_size, - "total_funds": self.total_funds, - "weighted_avg_price": self.weighted_avg_price, - "match_count": self.match_count, - "order_id": self.order_id, - "side": self.side, - "symbol": self.symbol, - } - - -class KuCoinWSClient: - """ - Private WebSocket client for KuCoin execution events. - - Subscribes to /spotMarket/tradeOrdersV2 (global topic) and dispatches - fill events to awaiting executor coroutines via await_fill(). - """ - - def __init__( - self, - kucoin_api: KuCoinAPI, - private_token_url: str = "https://api.kucoin.com/api/v1/bullet-private", - reconnect_base_delay: float = 1.0, - reconnect_max_delay: float = 30.0, - ) -> None: - self._api = kucoin_api - self._private_token_url = private_token_url - self._reconnect_base_delay = reconnect_base_delay - self._reconnect_max_delay = reconnect_max_delay - self._reconnect_delay = reconnect_base_delay - self._log = logger - self._running = False - self._ws: Optional[websockets.WebSocketClientProtocol] = None - self._connected = False - self._ping_interval: float = 18.0 - self._ping_timeout: float = 10.0 - self._fill_futures: dict[str, asyncio.Future] = {} - self._accumulators: dict[str, FillAccumulator] = {} - self._balance_futures: dict[str, asyncio.Future] = {} - self._latest_balance: dict[str, Decimal] = {} - self._worker_task: Optional[asyncio.Task] = None - self._pending_acks: dict[str, asyncio.Event] = {} - - @property - def is_connected(self) -> bool: - return self._connected - - async def start(self) -> None: - """Start the WebSocket connection worker with reconnection loop.""" - self._running = True - self._worker_task = asyncio.create_task(self._connection_worker()) - try: - await self._worker_task - except asyncio.CancelledError: - pass - - async def stop(self) -> None: - """Stop the WebSocket connection and resolve any pending futures.""" - self._running = False - for future in self._fill_futures.values(): - if not future.done(): - future.set_result((False, {})) - self._fill_futures.clear() - self._accumulators.clear() - for future in self._balance_futures.values(): - if not future.done(): - future.set_result(False) - self._balance_futures.clear() - for evt in self._pending_acks.values(): - evt.set() - self._pending_acks.clear() - if self._worker_task is not None and not self._worker_task.done(): - self._worker_task.cancel() - if self._ws is not None: - try: - await self._ws.close() - except Exception: - pass - self._connected = False - self._log.debug("ws_client_stopped") - - async def await_fill( - self, client_oid: str, timeout_ms: float - ) -> tuple[bool, dict]: - """ - Wait for the order identified by client_oid to be fully filled. - - Parameters - ---------- - client_oid : str - The client-order ID used when placing the order. - timeout_ms : float - Maximum wait time in milliseconds. - - Returns - ------- - tuple[bool, dict] - (success, aggregated_fill_data) on fill. - (False, {}) on timeout, failure, or disconnected. - """ - if not self._connected: - self._log.warning( - "await_fill_not_connected", - client_oid=client_oid, - ) - return (False, {}) - - future: asyncio.Future = asyncio.get_event_loop().create_future() - self._fill_futures[client_oid] = future - - if client_oid in self._accumulators and self._accumulators[client_oid]._done: - acc = self._accumulators[client_oid] - result = (True, acc.to_dict()) - del self._accumulators[client_oid] - self._fill_futures.pop(client_oid, None) - return result - - try: - await asyncio.wait_for(future, timeout=timeout_ms / 1000.0) - except asyncio.TimeoutError: - self._fill_futures.pop(client_oid, None) - self._log.warning( - "fill_timeout", - client_oid=client_oid, - timeout_ms=timeout_ms, - accumulator=( - self._accumulators.get(client_oid).to_dict() - if client_oid in self._accumulators - else None - ), - ) - return (False, {}) - except asyncio.CancelledError: - self._fill_futures.pop(client_oid, None) - raise - - result = future.result() - self._fill_futures.pop(client_oid, None) - if client_oid in self._accumulators: - del self._accumulators[client_oid] - return result - - async def await_balance( - self, currency: str, min_available: Decimal, timeout_ms: float - ) -> bool: - """ - Wait until the available balance for *currency* reaches at least *min_available*. - - If the latest known balance already meets the threshold, returns immediately. - Otherwise registers a future and waits for a balance WS event. - - Returns True once the threshold is met, False on timeout. - """ - key = currency.upper() - current = self._latest_balance.get(key, _D0) - if current >= min_available: - return True - - future: asyncio.Future = asyncio.Future() - self._balance_futures[key] = future - try: - await asyncio.wait_for(future, timeout=timeout_ms / 1000.0) - return True - except asyncio.TimeoutError: - self._log.warning( - "await_balance_timeout", - currency=key, - min_available=str(min_available), - latest=str(self._latest_balance.get(key, _D0)), - ) - return False - except asyncio.CancelledError: - raise - finally: - if self._balance_futures.get(key) is future: - del self._balance_futures[key] - - def latest_balance(self, currency: str) -> Decimal: - """Return the latest known available balance for *currency*, or 0.""" - return self._latest_balance.get(currency.upper(), _D0) - - async def _connection_worker(self) -> None: - """Main connection loop with exponential backoff reconnection.""" - while self._running: - try: - await self._connect_and_run() - except asyncio.CancelledError: - break - except Exception as e: - if not self._running: - break - self._connected = False - self._log.warning( - "ws_reconnecting", - error=str(e), - delay=self._reconnect_delay, - ) - await asyncio.sleep(self._reconnect_delay) - self._reconnect_delay = min( - self._reconnect_delay * 2, - self._reconnect_max_delay, - ) - - async def _connect_and_run(self) -> None: - """Authenticate, connect, subscribe, and run the message loop.""" - async with aiohttp.ClientSession() as session: - token_data = await self._api.get_private_token(session) - if not token_data: - raise RuntimeError("Failed to obtain private token") - - token = token_data.get("token", "") - servers = token_data.get("instanceServers", []) - if not servers: - raise RuntimeError("No instance servers in token response") - - server = servers[0] - endpoint = server.get("endpoint", "") - self._ping_interval = server.get("pingInterval", 18000) / 1000.0 - self._ping_timeout = server.get("pingTimeout", 10000) / 1000.0 - - ws_url = f"{endpoint}?token={token}&connectId={uuid.uuid4()}" - self._log.debug("ws_connecting", url=ws_url[:80]) - - self._ws = await websockets.connect( - ws_url, - ping_interval=self._ping_interval, - ping_timeout=self._ping_timeout, - ) - self._connected = True - self._reconnect_delay = self._reconnect_base_delay - self._log.info("ws_connected") - - sub_task = asyncio.create_task(self._subscribe()) - - try: - async for msg in self._ws: - await self._handle_message(msg) - except websockets.ConnectionClosed as e: - self._log.warning( - "ws_connection_closed", - code=e.code, - reason=e.reason, - ) - except asyncio.CancelledError: - sub_task.cancel() - raise - except Exception as e: - self._log.error("ws_message_loop_error", error=str(e)) - finally: - self._connected = False - - async def _subscribe(self) -> None: - """Subscribe to tradeOrdersV2 and balance channels.""" - if self._ws is None: - return - - ack_id1 = str(int(time.time() * 1000)) - evt1 = asyncio.Event() - self._pending_acks[ack_id1] = evt1 - sub_msg = { - "id": int(ack_id1), - "type": "subscribe", - "topic": "/spotMarket/tradeOrdersV2", - "response": True, - "privateChannel": "true", - } - await self._ws.send(json.dumps(sub_msg)) - self._log.info("subscribe_sent", topic="/spotMarket/tradeOrdersV2") - - ack_id2 = str(int(time.time() * 1000) + 1) - evt2 = asyncio.Event() - self._pending_acks[ack_id2] = evt2 - bal_msg = { - "id": int(ack_id2), - "type": "subscribe", - "topic": "/account/balance", - "response": True, - "privateChannel": "true", - } - await self._ws.send(json.dumps(bal_msg)) - self._log.info("bal_subscribe_sent", topic="/account/balance") - - await asyncio.wait_for(evt2.wait(), timeout=5.0) - self._log.info("bal_subscribe_ack_received") - - async def _handle_message(self, msg: str) -> None: - """Parse incoming WS message and dispatch fill events.""" - try: - data = json.loads(msg) - except json.JSONDecodeError: - self._log.warning("ws_raw_message_parse_error", raw=msg[:500]) - return - - msg_type = data.get("type") - - if msg_type == "welcome": - return - - if msg_type == "pong": - return - - if msg_type == "ack": - ack_id = str(data.get("id", "")) - evt = self._pending_acks.pop(ack_id, None) - if evt is not None: - evt.set() - return - - subject = data.get("subject", "") - - if subject == "account.balance": - payload = data.get("data", {}) - currency = (payload.get("currency", "")).upper() - available_raw = payload.get("available") - if currency and available_raw is not None: - available = Decimal(str(available_raw)) - self._latest_balance[currency] = available - self._log.debug("balance_update", currency=currency, available=str(available)) - future = self._balance_futures.get(currency) - if future is not None and not future.done(): - future.set_result(True) - return - - if subject != "orderChange": - return - - payload = data.get("data", {}) - event_type = payload.get("type", "") - client_oid = payload.get("clientOid", "") - order_id = payload.get("orderId", "") - status = payload.get("status", "") - - if not client_oid: - return - - if event_type == "match": - if client_oid not in self._accumulators: - self._accumulators[client_oid] = FillAccumulator(client_oid, order_id) - self._accumulators[client_oid].add_match(payload) - - elif event_type == "filled" and status == "done": - if client_oid in self._accumulators: - acc = self._accumulators[client_oid] - acc.order_id = order_id or acc.order_id - fill_data = acc.to_dict() - self._log.debug( - "fill_received", - client_oid=client_oid, - order_id=order_id, - total_size=fill_data["total_size"], - total_funds=fill_data["total_funds"], - avg_price=fill_data["weighted_avg_price"], - match_count=fill_data["match_count"], - ) - else: - # Shouldn't happen in normal flow, but handle defensively. - fill_data = { - "total_size": Decimal(str(payload.get("filledSize", "0"))), - "total_funds": _D0, - "weighted_avg_price": _D0, - "match_count": 0, - "order_id": order_id, - "side": payload.get("side", ""), - "symbol": payload.get("symbol", ""), - } - self._log.warning( - "filled_without_matches", - client_oid=client_oid, - order_id=order_id, - ) - - if client_oid in self._fill_futures: - future = self._fill_futures[client_oid] - if not future.done(): - future.set_result((True, fill_data)) - elif client_oid in self._accumulators: - self._accumulators[client_oid]._done = True - - elif event_type in ("canceled", "failed"): - self._log.warning( - "ws_terminal_event_full_payload", - client_oid=client_oid, - event_type=event_type, - status=status, - full_payload=json.dumps(payload, indent=2), - ) - # Market orders with `funds` send type="canceled" + status="done" when - # the order completes with a tiny remainder. If we have accumulated - # matches, this is actually a successful fill. - if ( - event_type == "canceled" - and status == "done" - and client_oid in self._accumulators - ): - acc = self._accumulators[client_oid] - if acc.match_count > 0: - acc.order_id = order_id or acc.order_id - fill_data = acc.to_dict() - self._log.info( - "fill_via_cancel_done", - client_oid=client_oid, - order_id=order_id, - total_size=fill_data["total_size"], - total_funds=fill_data["total_funds"], - avg_price=fill_data["weighted_avg_price"], - match_count=fill_data["match_count"], - ) - if client_oid in self._fill_futures: - future = self._fill_futures[client_oid] - if not future.done(): - future.set_result((True, fill_data)) - elif client_oid in self._accumulators: - self._accumulators[client_oid]._done = True - return - - if client_oid in self._fill_futures: - future = self._fill_futures[client_oid] - if not future.done(): - future.set_result((False, {})) diff --git a/src/config.c b/src/config.c index 751199a..5a4a061 100644 --- a/src/config.c +++ b/src/config.c @@ -62,12 +62,6 @@ static void handle_value(parse_state_t *st, const char *val) { } } else if (strcmp(key, "log_level") == 0) { copy_string(val, st->cfg->log_level, sizeof(st->cfg->log_level)); - } else if (strcmp(key, "socket_path") == 0) { - copy_string(val, st->cfg->socket_path, sizeof(st->cfg->socket_path)); - } else if (strcmp(key, "rest_host") == 0) { - copy_string(val, st->cfg->rest_host, sizeof(st->cfg->rest_host)); - } else if (strcmp(key, "rest_port") == 0) { - st->cfg->rest_port = atoi(val); } else if (strcmp(key, "ws_url") == 0) { copy_string(val, st->cfg->ws_url, sizeof(st->cfg->ws_url)); } else if (strcmp(key, "token_url") == 0) { @@ -82,10 +76,6 @@ static void handle_value(parse_state_t *st, const char *val) { st->cfg->signal_threshold_bps = atof(val); } else if (strcmp(key, "kcs_discount_active") == 0) { st->cfg->kcs_discount_active = (strcmp(val, "true") == 0 || strcmp(val, "yes") == 0); - } else if (strcmp(key, "executor_socket_path") == 0) { - copy_string(val, st->cfg->executor_socket_path, sizeof(st->cfg->executor_socket_path)); - } else if (strcmp(key, "send_signals") == 0) { - st->cfg->send_signals = (strcmp(val, "true") == 0 || strcmp(val, "yes") == 0); } else if (strcmp(key, "cooldown_seconds") == 0) { st->cfg->cooldown_seconds = atof(val); } else if (strcmp(key, "stats_interval_seconds") == 0) { @@ -121,9 +111,6 @@ int config_load(const char *path, config_t *cfg) { memset(cfg, 0, sizeof(config_t)); copy_string("INFO", cfg->log_level, sizeof(cfg->log_level)); - copy_string("/tmp/fh_ob.sock", cfg->socket_path, sizeof(cfg->socket_path)); - copy_string("0.0.0.0", cfg->rest_host, sizeof(cfg->rest_host)); - cfg->rest_port = 8000; copy_string("wss://ws-api-spot.kucoin.com", cfg->ws_url, sizeof(cfg->ws_url)); copy_string("https://api.kucoin.com/api/v1/bullet-public", cfg->token_url, sizeof(cfg->token_url)); cfg->reconnect_base_delay = 1.0; @@ -134,8 +121,6 @@ int config_load(const char *path, config_t *cfg) { copy_string("USDT", cfg->hold_currencies[0], CURRENCY_NAME_LEN); cfg->hold_currency_count = 1; cfg->kcs_discount_active = false; - copy_string("/tmp/executor.sock", cfg->executor_socket_path, sizeof(cfg->executor_socket_path)); - cfg->send_signals = false; cfg->cooldown_seconds = 0.0; cfg->stats_interval_seconds = 60.0; cfg->live_mode = false; @@ -174,8 +159,10 @@ int config_load(const char *path, config_t *cfg) { st.expect_key = false; } else { if (st.cfg->initial_capital_count < MAX_CAPITAL_ENTRIES) { - strncpy(st.cfg->initial_capital[st.cfg->initial_capital_count].currency, - st.capital_currency, CURRENCY_NAME_LEN - 1); + size_t _cl = strlen(st.capital_currency); + if (_cl >= sizeof(st.cfg->initial_capital[0].currency)) _cl = sizeof(st.cfg->initial_capital[0].currency) - 1; + memcpy(st.cfg->initial_capital[st.cfg->initial_capital_count].currency, st.capital_currency, _cl); + st.cfg->initial_capital[st.cfg->initial_capital_count].currency[_cl] = '\0'; st.cfg->initial_capital[st.cfg->initial_capital_count].amount = atof(s); st.cfg->initial_capital_count++; } @@ -198,7 +185,10 @@ int config_load(const char *path, config_t *cfg) { if (st.mapping_depth == 1 && (strcmp(st.current_key, "fused_engine") == 0 || strcmp(st.current_key, "executor") == 0)) { - strncpy(st.section, st.current_key, sizeof(st.section) - 1); + size_t _sk = strlen(st.current_key); + if (_sk >= sizeof(st.section)) _sk = sizeof(st.section) - 1; + memcpy(st.section, st.current_key, _sk); + st.section[_sk] = '\0'; } // Nested mapping inside fused_engine section (depth 2) if (st.mapping_depth == 2 && strcmp(st.section, "fused_engine") == 0 && diff --git a/src/config.h b/src/config.h index d4e65c5..f82e044 100644 --- a/src/config.h +++ b/src/config.h @@ -21,9 +21,6 @@ typedef struct { char symbols[MAX_SYMBOLS][SYMBOL_NAME_LEN]; /* subscribed trading symbol names */ uint32_t symbol_count; /* number of symbols in the list */ char log_level[8]; /* log verbosity level string */ - char socket_path[256]; /* unix socket path for inter-process comm */ - char rest_host[64]; /* KuCoin REST API hostname */ - int rest_port; /* KuCoin REST API port */ char ws_url[256]; /* KuCoin WebSocket base URL */ char token_url[256]; /* KuCoin token endpoint URL */ double reconnect_base_delay; /* initial WebSocket reconnect delay (seconds) */ @@ -38,7 +35,6 @@ typedef struct { uint32_t excluded_currency_count; /* number of excluded currencies */ bool kcs_discount_active; /* whether KCS fee discount applies */ char executor_socket_path[256]; /* unix socket path for signal executor */ - bool send_signals; /* whether to actually emit signals */ double cooldown_seconds; /* min seconds between signals for same triangle */ double stats_interval_seconds; /* period between stats log dumps */ bool live_mode; /* live trading vs paper/simulation */ diff --git a/src/events.c b/src/events.c index f9f1f17..198d3af 100644 --- a/src/events.c +++ b/src/events.c @@ -3,9 +3,9 @@ * * Two-thread architecture: * HOT thread: epoll_wait on WebSocket fds + timer fd for keep-alive pings - * COLD thread: polls SPSC signal queue + Unix domain socket to executor + * EXECUTOR thread: polls SPSC signal queue + fill wake fd, executes triangles * - * Signals flow: evaluate.c -> SPSC queue -> COLD thread -> executor via UDS + * Signals flow: evaluate.c -> SPSC queue -> executor thread (direct execution) */ #include "log.h" @@ -28,7 +28,6 @@ static int64_t now_mono_ms(void) { #include #include #include -#include #include #include #include @@ -103,7 +102,6 @@ int event_loops_init(event_loops_t *loops, ws_client_t *ws_client, loops->ws_client = ws_client; loops->signal_queue = signal_queue; loops->running = true; - loops->unix_client_fd = -1; loops->wakeup_fd = wakeup_fd; epoll_set_init(&loops->hot_epoll); @@ -126,213 +124,10 @@ void event_loops_destroy(event_loops_t *loops) { loops->running = false; if (loops->timer_fd >= 0) close(loops->timer_fd); if (loops->wakeup_fd >= 0) close(loops->wakeup_fd); - if (loops->unix_client_fd >= 0) close(loops->unix_client_fd); - if (loops->http_server_fd >= 0) close(loops->http_server_fd); if (loops->hot_epoll.epoll_fd >= 0) close(loops->hot_epoll.epoll_fd); if (loops->cold_epoll.epoll_fd >= 0) close(loops->cold_epoll.epoll_fd); } -/* Connect to a Unix domain socket at the given path. Uses SOCK_NONBLOCK - with a poll-based 100ms timeout for the connection to complete. - Returns connected fd on success, -1 on failure. */ -int unix_client_connect(const char *socket_path) { - int fd = socket(AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK, 0); - if (fd < 0) return -1; - - struct sockaddr_un addr; - memset(&addr, 0, sizeof(addr)); - addr.sun_family = AF_UNIX; - strncpy(addr.sun_path, socket_path, sizeof(addr.sun_path) - 1); - - if (connect(fd, (struct sockaddr *)&addr, sizeof(addr)) < 0) { - if (errno != EINPROGRESS) { - close(fd); - return -1; - } - struct pollfd pfd = { .fd = fd, .events = POLLOUT }; - if (poll(&pfd, 1, 100) <= 0) { // 100 ms timeout - close(fd); - return -1; - } - } - return fd; -} - -/* Create and bind a Unix domain stream socket server, remove stale socket - file first. Sets O_NONBLOCK on the listening fd. Returns fd, or -1. */ -int unix_server_create(const char *socket_path) { - int fd = socket(AF_UNIX, SOCK_STREAM, 0); - if (fd < 0) return -1; - - struct sockaddr_un addr; - memset(&addr, 0, sizeof(addr)); - addr.sun_family = AF_UNIX; - strncpy(addr.sun_path, socket_path, sizeof(addr.sun_path) - 1); - - unlink(socket_path); - - if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) < 0) { - close(fd); - return -1; - } - - if (listen(fd, 5) < 0) { - close(fd); - return -1; - } - - set_nonblocking(fd); - return fd; -} - -/* - * Build a JSON signal message and send it to the external executor over a Unix socket. - * - * JSON structure: - * { - * "type": "signal", - * "correlation_id": "", - * "triangle_key": ["base","mid","quote"], - * "primary_quote": "", - * "live": true/false, - * "starting_volume": "", - * "legs": [{...}, {...}, {...}], - * "predicted_bps": , - * "ts_ms", "book_ts_ms", "t_sock_arrive_ms", "t_arrive_ms", "t_eval_ms": , - * "books": [...] (snapshot, only when !live) - * } - * - * correlation_id is a mix of address/ts/bps values for best-effort uniqueness. - * Connects lazily on first signal; reconnects on write failure. - */ -static void send_signal_to_executor(event_loops_t *loops, signal_entry_t *sig) { - (void)loops; (void)sig; - /* No longer used — execution is direct in the executor thread */ - if (loops->unix_client_fd < 0) { - loops->unix_client_fd = unix_client_connect(loops->ws_client->cfg->executor_socket_path); - if (loops->unix_client_fd < 0) { - log_write("[EVENTS] Cannot connect to executor at %s\n", - loops->ws_client->cfg->executor_socket_path); - return; - } - event_loops_add_fd(&loops->cold_epoll, loops->unix_client_fd, - FD_TYPE_UNIX_CLIENT, 0, NULL, EPOLLIN); - } - - char json_buf[4096]; - char corr_id[37]; - snprintf(corr_id, sizeof(corr_id), - "%08x%08x%08x%08x", - (unsigned)(uintptr_t)&sig->legs.legs[0] ^ (unsigned)sig->ts_ms, - (unsigned)sig->ts_ms ^ (unsigned)sig->book_ts_ms, - (unsigned)sig->predicted_bps, - (unsigned)sig->t_arrive_ms); - - char legs_json[1024]; - legs_json[0] = '\0'; - for (uint8_t l = 0; l < 3; l++) { - const signal_leg_t *sl = &sig->legs.legs[l]; - char tmp[384]; - snprintf(tmp, sizeof(tmp), - "%s{\"pair\":\"%s\",\"side\":\"%s\"," - "\"order_param\":\"%s\"," - "\"fee_rate\":%.6f,\"fee_currency\":\"%s\"," - "\"base_increment\":\"%.10g\",\"quote_increment\":\"%.10g\",\"base_min_size\":\"%.10g\"}", - l ? "," : "", sl->symbol, sl->side, - sl->order_param, - sl->fee_rate, sl->fee_currency, - sl->base_increment, sl->quote_increment, sl->base_min_size); - strncat(legs_json, tmp, sizeof(legs_json) - 1); - } - - // triangle_key as JSON array ["base","mid","quote"] - char triangle_key_json[96]; - { - char parts[3][16] = {{0}}; - const char *tk = sig->triangle_key; - const char *s1 = strchr(tk, '/'); - const char *s2 = s1 ? strchr(s1 + 1, '/') : NULL; - if (s1 && s2) { - uint32_t l1 = s1 - tk; - if (l1 > 15) l1 = 15; - memcpy(parts[0], tk, l1); - uint32_t l2 = s2 - s1 - 1; - if (l2 > 15) l2 = 15; - memcpy(parts[1], s1 + 1, l2); - strncpy(parts[2], s2 + 1, 15); - snprintf(triangle_key_json, sizeof(triangle_key_json), - "[\"%s\",\"%s\",\"%s\"]", parts[0], parts[1], parts[2]); - } else { - snprintf(triangle_key_json, sizeof(triangle_key_json), "[\"%s\"]", tk); - } - } - - // Book tops included for display and drift analysis - char books_json_str[2048] = ""; - if (sig->book_count > 0) { - char *bp = books_json_str; - size_t rem = sizeof(books_json_str); - for (uint8_t b = 0; b < sig->book_count; b++) { - const signal_book_t *sb = &sig->books[b]; - char bid_arr[256] = {0}, ask_arr[256] = {0}; - // Only top-of-book level included for display / drift analysis - char tmp[64]; - if (sb->bid_count > 0) { - snprintf(tmp, sizeof(tmp), "{\"price\":\"%.6g\",\"size\":\"%.8g\"}", - sb->bids[0].price, sb->bids[0].size); - strncat(bid_arr, tmp, sizeof(bid_arr) - 1); - } - if (sb->ask_count > 0) { - snprintf(tmp, sizeof(tmp), "{\"price\":\"%.6g\",\"size\":\"%.8g\"}", - sb->asks[0].price, sb->asks[0].size); - strncat(ask_arr, tmp, sizeof(ask_arr) - 1); - } - int n = snprintf(bp, rem, - "%s{\"symbol\":\"%s\",\"bids\":[%s],\"asks\":[%s],\"ts_ms\":%lld}", - b ? "," : "", sb->symbol, bid_arr, ask_arr, (long long)sb->ts_ms); - if (n > 0 && (size_t)n < rem) { bp += n; rem -= (size_t)n; } - } - } - - snprintf(json_buf, sizeof(json_buf), - "{\"type\":\"signal\",\"correlation_id\":\"%s\"," - "\"triangle_key\":%s,\"primary_quote\":\"%s\"," - "\"live\":%s,\"starting_volume\":\"%.8g\"," - "\"legs\":[%s],\"predicted_bps\":%.4f," - "\"ts_ms\":%lld,\"book_ts_ms\":%lld,\"t_sock_arrive_ms\":%lld,\"t_arrive_ms\":%lld,\"t_eval_ms\":%lld" - "%s%s%s" - "}\n", - corr_id, triangle_key_json, sig->primary_quote, - sig->live ? "true" : "false", sig->starting_volume, - legs_json, sig->predicted_bps, - (long long)sig->ts_ms, (long long)sig->book_ts_ms, - (long long)sig->t_sock_arrive_ms, - (long long)sig->t_arrive_ms, (long long)sig->t_eval_ms, - sig->book_count == 0 ? "" : ",\"books\":[", - books_json_str[0] ? books_json_str : "", - sig->book_count == 0 ? "" : "]"); - - size_t to_send = strlen(json_buf); - size_t sent = 0; - while (sent < to_send) { - int r = (int)write(loops->unix_client_fd, json_buf + sent, to_send - sent); - if (r > 0) { - sent += (size_t)r; - continue; - } - if (r == 0 || (errno != EAGAIN && errno != EWOULDBLOCK)) { - log_write("[EVENTS] Write to executor failed, reconnecting\n"); - int old_fd = loops->unix_client_fd; - loops->unix_client_fd = -1; - close(old_fd); - event_loops_remove_fd(&loops->cold_epoll, old_fd); - break; - } - /* EAGAIN: executor buffer full, drop this signal and move on */ - break; - } -} - static void arm_ping_timer(event_loops_t *loops, uint64_t interval_ms) { if (interval_ms == 0) return; struct itimerspec its = {0}; @@ -383,7 +178,7 @@ void *event_hot_thread(void *arg) { ws_client_read(ws, tf->ws_conn_idx); } else if (tf->type == FD_TYPE_TIMER) { uint64_t expirations = 0; - read(loops->timer_fd, &expirations, sizeof(expirations)); + if (read(loops->timer_fd, &expirations, sizeof(expirations)) < 0) {} for (uint32_t c = 0; c < ws->connection_count; c++) { ws_connection_t *conn = &ws->connections[c]; @@ -413,7 +208,8 @@ void *event_cold_thread(void *arg) { log_write("[EXEC] Thread started\n"); executor_thread_t *exec = executor_thread_create(loops->ws_client->cfg, - loops->ws_client->fill_ch); + loops->ws_client->fill_ch, + loops->ws_client); if (!exec) { log_write("[EXEC] Failed to create executor\n"); return NULL; @@ -443,14 +239,14 @@ void *event_cold_thread(void *arg) { while (!spsc_empty(loops->signal_queue)) { signal_entry_t sig; if (spsc_pop(loops->signal_queue, &sig)) { - executor_execute_triangle(exec, &sig, loops->ws_client->fill_ch); + executor_execute_triangle(exec, &sig); } } /* Drain fill wakeup */ if (fds[1].revents & POLLIN) { uint64_t val; - read(fds[1].fd, &val, sizeof(val)); + if (read(fds[1].fd, &val, sizeof(val)) < 0) {} } /* Keepalive: warm up REST connection every 30s */ @@ -463,7 +259,7 @@ void *event_cold_thread(void *arg) { while (!spsc_empty(loops->signal_queue)) { signal_entry_t sig; if (spsc_pop(loops->signal_queue, &sig)) { - executor_execute_triangle(exec, &sig, loops->ws_client->fill_ch); + executor_execute_triangle(exec, &sig); } } } diff --git a/src/events.h b/src/events.h index cd0b34a..cbd5755 100644 --- a/src/events.h +++ b/src/events.h @@ -14,9 +14,6 @@ typedef enum { FD_TYPE_WS, /* WebSocket connection */ FD_TYPE_TIMER, /* timerfd */ FD_TYPE_EVENT, /* eventfd for queue wakeup */ - FD_TYPE_UNIX_SERVER, /* unix domain socket server */ - FD_TYPE_HTTP_SERVER, /* HTTP server socket */ - FD_TYPE_UNIX_CLIENT /* unix domain socket client */ } fd_type_t; /* A file descriptor tracked by the epoll event loop */ @@ -41,9 +38,6 @@ typedef struct { epoll_set_t cold_epoll; /* cold epoll set for timer/http events */ ws_client_t *ws_client; /* WebSocket client instance */ spsc_queue_t *signal_queue; /* signal queue for emitting opportunities */ - int unix_server_fd; /* unix domain server socket */ - int unix_client_fd; /* unix domain client socket */ - int http_server_fd; /* HTTP server socket */ int timer_fd; /* timerfd for periodic tasks */ int wakeup_fd; /* eventfd for waking the cold loop */ uint64_t next_ping_ms; /* next scheduled WebSocket ping timestamp */ @@ -64,9 +58,5 @@ void event_loops_remove_fd(epoll_set_t *set, int fd); void *event_hot_thread(void *arg); /* Cold event loop thread: handles timers, HTTP, and signal dispatch */ void *event_cold_thread(void *arg); -/* Connect to a unix domain socket (non-blocking) */ -int unix_client_connect(const char *socket_path); -/* Create and listen on a unix domain socket */ -int unix_server_create(const char *socket_path); #endif diff --git a/src/executor.c b/src/executor.c index 5ae60a5..d6e7f2f 100644 --- a/src/executor.c +++ b/src/executor.c @@ -17,6 +17,7 @@ struct executor_thread_s { const config_t *cfg; rest_conn_t *rest; + ws_client_t *ws; /* Concurrency isolation state */ char in_flight_triangles[MAX_IN_FLIGHT][128]; /* triangle_key */ @@ -77,11 +78,13 @@ static double apply_increment_floor(double vol, double inc) { /* ── Core execution loop ── */ executor_thread_t *executor_thread_create(const config_t *cfg, - fill_channel_t *fill_ch) { + fill_channel_t *fill_ch, + ws_client_t *ws) { (void)fill_ch; executor_thread_t *et = calloc(1, sizeof(*et)); if (!et) return NULL; et->cfg = cfg; + et->ws = ws; et->rest = rest_conn_new(); if (et->rest) { rest_conn_set_auth(et->rest, @@ -91,29 +94,22 @@ executor_thread_t *executor_thread_create(const config_t *cfg, } /* Warm up the authenticated REST connection pool */ - if (et->rest && cfg->live_mode) { + if (et->rest && cfg->live_mode && cfg->initial_capital_count > 0) { double dummy = 0; - (void)rest_get_balance(et->rest, "USDT", &dummy); + (void)rest_get_balance(et->rest, cfg->initial_capital[0].currency, &dummy); } return et; } bool executor_keepalive(executor_thread_t *et) { - if (!et->rest) return false; + if (!et->rest || et->cfg->initial_capital_count == 0) return false; double dummy = 0; - return rest_get_balance(et->rest, "USDT", &dummy); -} - -void executor_thread_set_fill_ch(executor_thread_t *et, fill_channel_t *ch) { - (void)et; - (void)ch; - /* In the initial implementation the fill channel is global (ws_client->fill_ch), - passed directly to execute_triangle. This setter is for per-thread channels. */ + return rest_get_balance(et->rest, et->cfg->initial_capital[0].currency, &dummy); } void executor_execute_triangle(executor_thread_t *et, - signal_entry_t *sig, - fill_channel_t *fill_ch) { + signal_entry_t *sig) { + fill_channel_t *fill_ch = et->ws->fill_ch; /* ── Concurrency isolation ── */ uint64_t pair_hashes[3] = {0}; for (int p = 0; p < 3; p++) { @@ -171,6 +167,7 @@ void executor_execute_triangle(executor_thread_t *et, double input_vol; if (leg == 0) { input_vol = atof(sl->order_param); + input_vol = apply_fee_hold(input_vol, sl->fee_rate, is_buy); } else { input_vol = leg_output[leg - 1]; input_vol = apply_fee_hold(input_vol, sl->fee_rate, is_buy); @@ -240,7 +237,6 @@ void executor_execute_triangle(executor_thread_t *et, /* ── Wait for fill (live only) ── */ double total_size = 0, total_funds = 0, avg_price = 0, total_fee = 0; - int match_count = 0; if (sig->live) { fill_result_t fr = {0}; @@ -255,8 +251,7 @@ void executor_execute_triangle(executor_thread_t *et, total_funds = fr.total_funds; total_fee = fr.total_fee; avg_price = fr.avg_price; - match_count = fr.match_count; - if (fr.order_id[0]) strncpy(order_id, fr.order_id, sizeof(order_id) - 1); + if (fr.order_id[0]) memcpy(order_id, fr.order_id, sizeof(order_id)); } else { /* Paper mode: simulate fill from books[leg] top-of-book */ if (sig->book_count > leg) { @@ -312,12 +307,37 @@ void executor_execute_triangle(executor_thread_t *et, (double)(now_mono_ms() - order_fire_ms_tracking)); /* ── Cascade ── */ - leg_output[leg] = is_buy ? total_size : total_funds; + double raw_output = is_buy ? total_size : total_funds; + leg_output[leg] = raw_output; fills[leg][0] = leg_output[leg]; fills[leg][1] = avg_price; fills[leg][2] = total_fee; fills[leg][3] = total_funds; + /* ── Balance wait (live only): wait for KuCoin to settle before next leg ── */ + if (sig->live && leg < 2) { + const char *pair = sl->symbol; + const char *dash = strchr(pair, '-'); + char out_ccy[16] = {0}; + if (dash) { + const char *ccy = is_buy ? pair : (dash + 1); + size_t ccy_len = is_buy ? (size_t)(dash - pair) : strlen(dash + 1); + if (ccy_len > 15) ccy_len = 15; + memcpy(out_ccy, ccy, ccy_len); + } + if (out_ccy[0]) { + ws_client_await_balance(et->ws, out_ccy, raw_output, 2000); + /* For sells, KuCoin may deduct fee from output currency. + Use the actual settled balance rather than the arithmetic output. */ + double actual = ws_client_latest_balance(et->ws, out_ccy); + if (actual > 0 && actual < raw_output) { + leg_output[leg] = actual; + fills[leg][0] = actual; + raw_output = actual; + } + } + } + /* Fee hold reduction + increment floor for next leg */ if (leg < 2) { const signal_leg_t *nsl = &sig->legs.legs[leg + 1]; @@ -331,10 +351,8 @@ void executor_execute_triangle(executor_thread_t *et, /* ── Compute PnL ── */ double profit = 0, effective_bps = 0; if (success && fills[2][0] > 0) { - double leg0_in = (strcmp(sig->legs.legs[0].side, "buy") == 0) - ? fills[0][3] : fills[0][0]; - double leg2_out = (strcmp(sig->legs.legs[2].side, "buy") == 0) - ? fills[2][0] : fills[2][0]; + double leg0_in = fills[0][4]; + double leg2_out = fills[2][0]; /* Subtract fee if fee currency matches leg 2 output currency */ { const signal_leg_t *sl2 = &sig->legs.legs[2]; diff --git a/src/executor.h b/src/executor.h index f0a37c0..04f8516 100644 --- a/src/executor.h +++ b/src/executor.h @@ -5,21 +5,19 @@ #include "fill_handler.h" #include "config.h" #include "queue.h" +#include "ws_client.h" /* Per-thread data for the executor. */ typedef struct executor_thread_s executor_thread_t; /* Create an executor thread (one per concurrent slot). */ executor_thread_t *executor_thread_create(const config_t *cfg, - fill_channel_t *fill_ch); - -/* Set the fill channel for this executor thread. */ -void executor_thread_set_fill_ch(executor_thread_t *et, fill_channel_t *ch); + fill_channel_t *fill_ch, + ws_client_t *ws); /* Execute a single triangle signal (blocking, called from executor thread). */ void executor_execute_triangle(executor_thread_t *et, - signal_entry_t *sig, - fill_channel_t *fill_ch); + signal_entry_t *sig); /* Report status text. */ void executor_write_report(const char *fmt, ...); diff --git a/src/http_server.c b/src/http_server.c deleted file mode 100644 index 1c282c1..0000000 --- a/src/http_server.c +++ /dev/null @@ -1,369 +0,0 @@ -/* - * http_server.c - Simple single-threaded HTTP server for health & book queries - * - * Provides REST endpoints: - * GET /health - connection status - * GET /book/{symbol} - single order book snapshot - * GET /books - all order books - * GET /symbols - list tracked symbols - * POST /symbols - dynamically add symbols (subscribe via WS) - * DELETE /symbols/{name} - remove symbol - */ - -#include "log.h" -#include "http_server.h" -#include "cJSON.h" -#include -#include -#include -#include -#include -#include -#include -#include -#include - -static int set_nonblocking(int fd) { - int flags = fcntl(fd, F_GETFL, 0); - if (flags < 0) return -1; - return fcntl(fd, F_SETFL, flags | O_NONBLOCK); -} - -int http_server_init(http_server_t *srv, const char *host, int port, - order_book_t *books, symbol_table_t *symbols, - ws_client_t *ws_client, evaluator_t *evaluator, - config_t *cfg) { - memset(srv, 0, sizeof(*srv)); - srv->books = books; - srv->symbols = symbols; - srv->ws_client = ws_client; - srv->evaluator = evaluator; - srv->cfg = cfg; - srv->client_fd = -1; - srv->running = true; - - srv->listen_fd = socket(AF_INET, SOCK_STREAM, 0); - if (srv->listen_fd < 0) { - perror("socket"); - return -1; - } - - int opt = 1; - setsockopt(srv->listen_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); - - struct sockaddr_in addr; - memset(&addr, 0, sizeof(addr)); - addr.sin_family = AF_INET; - addr.sin_port = htons(port); - if (strcmp(host, "0.0.0.0") == 0) { - addr.sin_addr.s_addr = INADDR_ANY; - } else { - inet_pton(AF_INET, host, &addr.sin_addr); - } - - if (bind(srv->listen_fd, (struct sockaddr *)&addr, sizeof(addr)) < 0) { - perror("bind"); - close(srv->listen_fd); - return -1; - } - - if (listen(srv->listen_fd, 16) < 0) { - perror("listen"); - close(srv->listen_fd); - return -1; - } - - set_nonblocking(srv->listen_fd); - log_write("[HTTP] Server listening on %s:%d\n", host, port); - return 0; -} - -void http_server_destroy(http_server_t *srv) { - srv->running = false; - if (srv->client_fd >= 0) close(srv->client_fd); - if (srv->listen_fd >= 0) close(srv->listen_fd); -} - -int http_server_accept(http_server_t *srv) { - if (srv->client_fd >= 0) { - close(srv->client_fd); - } - struct sockaddr_in client_addr; - socklen_t addr_len = sizeof(client_addr); - srv->client_fd = accept(srv->listen_fd, (struct sockaddr *)&client_addr, &addr_len); - if (srv->client_fd < 0) { - if (errno != EAGAIN && errno != EWOULDBLOCK) { - perror("accept"); - } - return -1; - } - set_nonblocking(srv->client_fd); - srv->recv_len = 0; - return srv->client_fd; -} - -/* Write an HTTP response with status line, headers, and body to the client. */ -static void http_send(http_server_t *srv, const char *status, - const char *content_type, const char *body) { - char header[512]; - int body_len = body ? (int)strlen(body) : 0; - int hdr_len = snprintf(header, sizeof(header), - "HTTP/1.1 %s\r\n" - "Content-Type: %s\r\n" - "Content-Length: %d\r\n" - "Connection: close\r\n" - "\r\n", - status, content_type, body_len); - write(srv->client_fd, header, (size_t)hdr_len); - if (body && body_len > 0) { - write(srv->client_fd, body, (size_t)body_len); - } -} - -/* Send a 200 OK JSON response. */ -static void http_send_json(http_server_t *srv, const char *body) { - http_send(srv, "200 OK", "application/json", body); -} - -/* Send an error response with the given status code and plain text body. */ -static void http_send_error(http_server_t *srv, const char *status, const char *msg) { - http_send(srv, status, "text/plain", msg); -} - -/* GET /health — return WS connection count and symbol count as JSON. */ -static void handle_health(http_server_t *srv) { - char body[256]; - int connected = 0; - if (srv->ws_client) { - for (uint32_t i = 0; i < srv->ws_client->connection_count; i++) { - if (srv->ws_client->connections[i].state == WS_STATE_CONNECTED) connected++; - } - } - snprintf(body, sizeof(body), - "{\"status\":\"ok\",\"ws_connections\":%d,\"symbols\":%u}", - connected, srv->symbols ? srv->symbols->count : 0); - http_send_json(srv, body); -} - -/* GET /book/{symbol} — return a single order book as JSON, or 404. */ -static void handle_book(http_server_t *srv, const char *symbol) { - if (!srv->symbols || !srv->books) { - http_send_error(srv, "500 Internal Server Error", "not initialized\n"); - return; - } - - int16_t idx = symbol_table_lookup(srv->symbols, symbol); - if (idx < 0) { - char body[128]; - snprintf(body, sizeof(body), "{\"error\":\"symbol not found\",\"symbol\":\"%s\"}", symbol); - http_send(srv, "404 Not Found", "application/json", body); - return; - } - - order_book_t *book = &srv->books[idx]; - char body[2048]; - int off = 0; - off += snprintf(body + off, sizeof(body) - (size_t)off, - "{\"symbol\":\"%s\",\"ts_ms\":%lld,\"sequence\":%lld,\"bids\":[", - book->symbol, (long long)book->ts_ms, (long long)book->sequence); - for (uint8_t i = 0; i < book->bid_count; i++) { - off += snprintf(body + off, sizeof(body) - (size_t)off, - "%s[%.6g,%.8g]", i ? "," : "", book->bids[i][0], book->bids[i][1]); - } - off += snprintf(body + off, sizeof(body) - (size_t)off, "],\"asks\":["); - for (uint8_t i = 0; i < book->ask_count; i++) { - off += snprintf(body + off, sizeof(body) - (size_t)off, - "%s[%.6g,%.8g]", i ? "," : "", book->asks[i][0], book->asks[i][1]); - } - off += snprintf(body + off, sizeof(body) - (size_t)off, "]}"); - http_send_json(srv, body); -} - -/* GET /books — return all order books as a JSON array. */ -static void handle_books(http_server_t *srv) { - if (!srv->symbols || !srv->books) { - http_send_error(srv, "500 Internal Server Error", "not initialized\n"); - return; - } - - char body[65536]; - int off = snprintf(body, sizeof(body), "["); - for (uint32_t i = 0; i < srv->symbols->count && (size_t)off < sizeof(body) - 512; i++) { - order_book_t *book = &srv->books[i]; - if (book->ts_ms <= 0) continue; - if (off > 1) off += snprintf(body + off, sizeof(body) - (size_t)off, ","); - off += snprintf(body + off, sizeof(body) - (size_t)off, - "{\"symbol\":\"%s\",\"ts\":%lld,\"bids\":[", - book->symbol, (long long)book->ts_ms); - for (uint8_t j = 0; j < book->bid_count; j++) { - off += snprintf(body + off, sizeof(body) - (size_t)off, - "%s[%.6g,%.8g]", j ? "," : "", book->bids[j][0], book->bids[j][1]); - } - off += snprintf(body + off, sizeof(body) - (size_t)off, "],\"asks\":["); - for (uint8_t j = 0; j < book->ask_count; j++) { - off += snprintf(body + off, sizeof(body) - (size_t)off, - "%s[%.6g,%.8g]", j ? "," : "", book->asks[j][0], book->asks[j][1]); - } - off += snprintf(body + off, sizeof(body) - (size_t)off, "]}"); - } - off += snprintf(body + off, sizeof(body) - (size_t)off, "]"); - http_send_json(srv, body); -} - -/* GET /symbols — list all tracked symbol names as a JSON array. */ -static void handle_symbols_list(http_server_t *srv) { - if (!srv->symbols) { - http_send_error(srv, "500 Internal Server Error", "not initialized\n"); - return; - } - - char body[65536]; - int off = snprintf(body, sizeof(body), "["); - for (uint32_t i = 0; i < srv->symbols->count && (size_t)off < sizeof(body) - 64; i++) { - off += snprintf(body + off, sizeof(body) - (size_t)off, - "%s\"%s\"", i ? "," : "", srv->symbols->entries[i].name); - } - off += snprintf(body + off, sizeof(body) - (size_t)off, "]"); - http_send_json(srv, body); -} - -/* POST /symbols — subscribe to new symbols (add to symbol table + WS subscribe). */ -static void handle_symbols_add(http_server_t *srv, const char *body) { - if (!srv->symbols || !srv->ws_client) { - http_send_error(srv, "500 Internal Server Error", "not initialized\n"); - return; - } - - cJSON *root = cJSON_Parse(body); - if (!root) { - http_send_error(srv, "400 Bad Request", "invalid JSON\n"); - return; - } - - char resp[1024] = "{\"added\":["; - int resp_len = (int)strlen(resp); - - cJSON *item; - cJSON_ArrayForEach(item, root) { - if (!cJSON_IsString(item)) continue; - const char *sym = item->valuestring; - if (!sym) continue; - - int16_t existing = symbol_table_lookup(srv->symbols, sym); - if (existing >= 0) continue; - - if (symbol_table_add(srv->symbols, sym) == 0) { - int16_t idx = symbol_table_lookup(srv->symbols, sym); - if (idx >= 0) { - if (resp_len > 11) resp_len += snprintf(resp + resp_len, - sizeof(resp) - (size_t)resp_len, ","); - resp_len += snprintf(resp + resp_len, - sizeof(resp) - (size_t)resp_len, "\"%s\"", sym); - - // Auto-subscribe to WS stream for the new symbol - uint16_t uidx = (uint16_t)idx; - if (srv->ws_client->connection_count > 0) { - ws_client_subscribe(srv->ws_client, 0, &uidx, 1); - } - } - } - } - resp_len += snprintf(resp + resp_len, sizeof(resp) - (size_t)resp_len, "]}"); - cJSON_Delete(root); - http_send_json(srv, resp); -} - -/* DELETE /symbols/{name} — unsubscribe and remove from symbol table. */ -static void handle_symbols_remove(http_server_t *srv, const char *symbol) { - if (!srv->symbols || !srv->ws_client) { - http_send_error(srv, "500 Internal Server Error", "not initialized\n"); - return; - } - - int16_t idx = symbol_table_lookup(srv->symbols, symbol); - if (idx < 0) { - char body[128]; - snprintf(body, sizeof(body), "{\"error\":\"symbol not found\",\"symbol\":\"%s\"}", symbol); - http_send(srv, "404 Not Found", "application/json", body); - return; - } - - uint16_t uidx = (uint16_t)idx; - if (srv->ws_client->connection_count > 0) { - ws_client_unsubscribe(srv->ws_client, 0, &uidx, 1); - } - - // Compact the symbol table by shifting entries - for (uint32_t i = 0; i < srv->symbols->count; i++) { - if (strcmp(srv->symbols->entries[i].name, symbol) == 0) { - memmove(&srv->symbols->entries[i], &srv->symbols->entries[i + 1], - (srv->symbols->count - i - 1) * sizeof(symbol_entry_t)); - srv->symbols->count--; - for (uint32_t j = i; j < srv->symbols->count; j++) { - srv->symbols->entries[j].index = (uint16_t)j; - } - break; - } - } - - char resp[128]; - snprintf(resp, sizeof(resp), "{\"removed\":\"%s\"}", symbol); - http_send_json(srv, resp); -} - -int http_server_handle_request(http_server_t *srv) { - ssize_t n = read(srv->client_fd, srv->recv_buf + srv->recv_len, - sizeof(srv->recv_buf) - srv->recv_len - 1); - if (n <= 0) { - if (n < 0 && errno != EAGAIN && errno != EWOULDBLOCK) { - perror("read"); - } - close(srv->client_fd); - srv->client_fd = -1; - srv->recv_len = 0; - return -1; - } - srv->recv_len += (size_t)n; - srv->recv_buf[srv->recv_len] = '\0'; - - char *headers_end = strstr((char *)srv->recv_buf, "\r\n\r\n"); - if (!headers_end) return 0; - - char *request_line = (char *)srv->recv_buf; - char *rl_end = strchr(request_line, '\r'); - if (!rl_end) return 0; - *rl_end = '\0'; - - char method[16] = {0}, path[512] = {0}; - sscanf(request_line, "%15s %511s", method, path); - - char *body_start = headers_end + 4; - size_t body_len = srv->recv_len - (size_t)(body_start - (char *)srv->recv_buf); - - if (strcmp(path, "/health") == 0) { - handle_health(srv); - } else if (strncmp(path, "/book/", 6) == 0) { - handle_book(srv, path + 6); - } else if (strcmp(path, "/books") == 0) { - handle_books(srv); - } else if (strcmp(path, "/symbols") == 0) { - if (strcmp(method, "POST") == 0) { - handle_symbols_add(srv, body_start); - } else { - handle_symbols_list(srv); - } - } else if (strncmp(path, "/symbols/", 9) == 0) { - if (strcmp(method, "DELETE") == 0) { - handle_symbols_remove(srv, path + 9); - } else { - http_send_error(srv, "405 Method Not Allowed", "use DELETE\n"); - } - } else { - http_send_error(srv, "404 Not Found", "not found\n"); - } - - close(srv->client_fd); - srv->client_fd = -1; - srv->recv_len = 0; - return 0; -} diff --git a/src/http_server.h b/src/http_server.h deleted file mode 100644 index 7e3588c..0000000 --- a/src/http_server.h +++ /dev/null @@ -1,38 +0,0 @@ -#ifndef FUSED_HTTP_SERVER_H -#define FUSED_HTTP_SERVER_H - -#include -#include -#include "book.h" -#include "hash.h" -#include "config.h" -#include "ws_client.h" -#include "evaluate.h" - -/* Embedded HTTP server for health/status endpoints */ -typedef struct { - int listen_fd; /* server listening socket */ - int client_fd; /* currently connected client socket */ - uint8_t recv_buf[8192]; /* request receive buffer */ - size_t recv_len; /* bytes received so far */ - order_book_t *books; /* pointer to shared order books */ - symbol_table_t *symbols; /* pointer to shared symbol table */ - ws_client_t *ws_client; /* pointer to WebSocket client state */ - evaluator_t *evaluator; /* pointer to evaluator state */ - config_t *cfg; /* pointer to configuration */ - bool running; /* false signals server to stop */ -} http_server_t; - -/* Initialise and bind the HTTP server */ -int http_server_init(http_server_t *srv, const char *host, int port, - order_book_t *books, symbol_table_t *symbols, - ws_client_t *ws_client, evaluator_t *evaluator, - config_t *cfg); -/* Destroy the HTTP server and close sockets */ -void http_server_destroy(http_server_t *srv); -/* Accept a new client connection (non-blocking) */ -int http_server_accept(http_server_t *srv); -/* Read, parse, and respond to the current client request */ -int http_server_handle_request(http_server_t *srv); - -#endif diff --git a/src/main.c b/src/main.c index 2476d97..53b7845 100644 --- a/src/main.c +++ b/src/main.c @@ -26,7 +26,6 @@ #include "evaluate.h" #include "queue.h" #include "events.h" -#include "http_server.h" static volatile sig_atomic_t g_running = 1; @@ -159,12 +158,6 @@ int main(int argc, char *argv[]) { return 1; } - http_server_t http_srv; - if (http_server_init(&http_srv, cfg.rest_host, cfg.rest_port, - books, &symbols, &ws_client, &evaluator, &cfg) != 0) { - log_write("[MAIN] HTTP server init failed (non-fatal)\n"); - } - if (symbols.count > 0) { uint16_t all_indices[MAX_SYMBOLS]; uint32_t n = symbols.count > MAX_SYMBOLS ? MAX_SYMBOLS : symbols.count; @@ -214,15 +207,8 @@ int main(int argc, char *argv[]) { log_write("[MAIN] Fused engine running. Press Ctrl+C to stop.\n"); - // Main loop: accept HTTP connections and reconnect disconnected WS + // Main loop: reconnect disconnected WS while (g_running) { - if (http_srv.listen_fd >= 0 && http_srv.client_fd < 0) { - http_server_accept(&http_srv); - } - if (http_srv.client_fd >= 0) { - http_server_handle_request(&http_srv); - } - struct timespec ts = {0, 100000000}; nanosleep(&ts, NULL); @@ -249,13 +235,12 @@ int main(int argc, char *argv[]) { ws_client.running = false; uint64_t val = 1; - ssize_t wr = write(events.wakeup_fd, &val, sizeof(val)); + ssize_t wr = write(events.wakeup_fd, &val, sizeof(val)); (void)wr; pthread_join(hot_thread, NULL); pthread_join(cold_thread, NULL); - http_server_destroy(&http_srv); event_loops_destroy(&events); ws_client_destroy(&ws_client); spsc_destroy(&signal_queue); diff --git a/src/symbols_api.c b/src/symbols_api.c index 695daac..d5101bd 100644 --- a/src/symbols_api.c +++ b/src/symbols_api.c @@ -336,15 +336,15 @@ int discover_symbols(symbol_table_t *symbols, triangle_set_t *triangles, t->use_bid[1] = (strcmp(pairs.pairs[i2].quote, y) == 0) ? 1 : 0; t->use_bid[2] = (strcmp(pairs.pairs[i3].quote, hold) == 0) ? 1 : 0; t->id = (uint16_t)tri_count; - strncpy(t->symbol_names[0], pairs.pairs[i1].symbol, SYMBOL_NAME_LEN - 1); - strncpy(t->symbol_names[1], pairs.pairs[i2].symbol, SYMBOL_NAME_LEN - 1); - strncpy(t->symbol_names[2], pairs.pairs[i3].symbol, SYMBOL_NAME_LEN - 1); - strncpy(t->fee_currency[0], pairs.pairs[i1].fee_currency, CURRENCY_NAME_LEN - 1); - strncpy(t->fee_currency[1], pairs.pairs[i2].fee_currency, CURRENCY_NAME_LEN - 1); - strncpy(t->fee_currency[2], pairs.pairs[i3].fee_currency, CURRENCY_NAME_LEN - 1); - strncpy(t->base, hold, CURRENCY_NAME_LEN - 1); - strncpy(t->mid, x, CURRENCY_NAME_LEN - 1); - strncpy(t->quote, y, CURRENCY_NAME_LEN - 1); + { size_t _l = strlen(pairs.pairs[i1].symbol); if (_l >= SYMBOL_NAME_LEN) _l = SYMBOL_NAME_LEN - 1; memcpy(t->symbol_names[0], pairs.pairs[i1].symbol, _l); t->symbol_names[0][_l] = '\0'; } + { size_t _l = strlen(pairs.pairs[i2].symbol); if (_l >= SYMBOL_NAME_LEN) _l = SYMBOL_NAME_LEN - 1; memcpy(t->symbol_names[1], pairs.pairs[i2].symbol, _l); t->symbol_names[1][_l] = '\0'; } + { size_t _l = strlen(pairs.pairs[i3].symbol); if (_l >= SYMBOL_NAME_LEN) _l = SYMBOL_NAME_LEN - 1; memcpy(t->symbol_names[2], pairs.pairs[i3].symbol, _l); t->symbol_names[2][_l] = '\0'; } + { size_t _l = strlen(pairs.pairs[i1].fee_currency); if (_l >= CURRENCY_NAME_LEN) _l = CURRENCY_NAME_LEN - 1; memcpy(t->fee_currency[0], pairs.pairs[i1].fee_currency, _l); t->fee_currency[0][_l] = '\0'; } + { size_t _l = strlen(pairs.pairs[i2].fee_currency); if (_l >= CURRENCY_NAME_LEN) _l = CURRENCY_NAME_LEN - 1; memcpy(t->fee_currency[1], pairs.pairs[i2].fee_currency, _l); t->fee_currency[1][_l] = '\0'; } + { size_t _l = strlen(pairs.pairs[i3].fee_currency); if (_l >= CURRENCY_NAME_LEN) _l = CURRENCY_NAME_LEN - 1; memcpy(t->fee_currency[2], pairs.pairs[i3].fee_currency, _l); t->fee_currency[2][_l] = '\0'; } + { size_t _l = strlen(hold); if (_l >= CURRENCY_NAME_LEN) _l = CURRENCY_NAME_LEN - 1; memcpy(t->base, hold, _l); t->base[_l] = '\0'; } + { size_t _l = strlen(x); if (_l >= CURRENCY_NAME_LEN) _l = CURRENCY_NAME_LEN - 1; memcpy(t->mid, x, _l); t->mid[_l] = '\0'; } + { size_t _l = strlen(y); if (_l >= CURRENCY_NAME_LEN) _l = CURRENCY_NAME_LEN - 1; memcpy(t->quote, y, _l); t->quote[_l] = '\0'; } t->fee_factor[0] = 1.0 - fee_rate_for_pair(&pairs.pairs[i1], kcs); t->fee_factor[1] = 1.0 - fee_rate_for_pair(&pairs.pairs[i2], kcs); t->fee_factor[2] = 1.0 - fee_rate_for_pair(&pairs.pairs[i3], kcs); diff --git a/src/ws_client.c b/src/ws_client.c index 73bac58..c0bbb40 100644 --- a/src/ws_client.c +++ b/src/ws_client.c @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -186,6 +187,12 @@ int ws_client_init(ws_client_t *client, const config_t *cfg, log_write("[WS] Failed to create fill channel\n"); } + pthread_mutex_init(&client->balance_lock, NULL); + client->balance_wake_fd = eventfd(0, EFD_NONBLOCK); + if (client->balance_wake_fd < 0) { + log_write("[WS] Failed to create balance wake eventfd\n"); + } + return 0; } @@ -199,6 +206,8 @@ void ws_client_destroy(ws_client_t *client) { conn->ctx = NULL; } if (ctx) SSL_CTX_free(ctx); + if (client->balance_wake_fd >= 0) close(client->balance_wake_fd); + pthread_mutex_destroy(&client->balance_lock); } /* @@ -268,7 +277,10 @@ int ws_client_fetch_token_priv(ws_connection_t *conn, const config_t *cfg) { } char *slash = strchr(host_start, '/'); if (slash) *slash = '\0'; - strncpy(conn->host, host_start, sizeof(conn->host) - 1); + size_t hl = strlen(host_start); + if (hl >= sizeof(conn->host)) hl = sizeof(conn->host) - 1; + memcpy(conn->host, host_start, hl); + conn->host[hl] = '\0'; } if (cJSON_IsNumber(pingInterval)) conn->ping_interval_ms = (uint32_t)pingInterval->valuedouble; @@ -371,7 +383,7 @@ int ws_client_connect(ws_client_t *client, uint32_t conn_idx) { ws_client_subscribe(client, conn_idx, conn->symbol_indices, conn->symbol_count); } - /* Subscribe to private order-change channel for fill events */ + /* Subscribe to private channels */ { char msg[256]; snprintf(msg, sizeof(msg), @@ -380,6 +392,14 @@ int ws_client_connect(ws_client_t *client, uint32_t conn_idx) { ws_send_text(conn, msg); log_write("[WS] Subscribed to tradeOrdersV2\n"); } + { + char msg[256]; + snprintf(msg, sizeof(msg), + "{\"type\":\"subscribe\",\"topic\":\"/account/balance\"," + "\"response\":true,\"privateChannel\":\"true\"}"); + ws_send_text(conn, msg); + log_write("[WS] Subscribed to /account/balance\n"); + } return 0; } @@ -648,7 +668,10 @@ static int16_t parse_book_update(cJSON *root, ws_client_t *client) { } book->symbol_idx = (uint16_t)sym_idx; - strncpy(book->symbol, symbol, SYMBOL_NAME_LEN - 1); + size_t sl = strlen(symbol); + if (sl >= sizeof(book->symbol)) sl = sizeof(book->symbol) - 1; + memcpy(book->symbol, symbol, sl); + book->symbol[sl] = '\0'; return sym_idx; } @@ -779,6 +802,33 @@ int16_t ws_client_process_frame(ws_client_t *client, uint32_t conn_idx) { } } } + } else if (cJSON_IsString(subject) && + strcmp(subject->valuestring, "account.balance") == 0) { + cJSON *data = cJSON_GetObjectItem(msg_root, "data"); + if (data) { + cJSON *ccy = cJSON_GetObjectItem(data, "currency"); + cJSON *avail = cJSON_GetObjectItem(data, "available"); + if (cJSON_IsString(ccy) && cJSON_IsString(avail)) { + pthread_mutex_lock(&client->balance_lock); + int idx = -1; + for (int i = 0; i < client->balance_count; i++) { + if (strcmp(client->balance_cache[i].currency, ccy->valuestring) == 0) { + idx = i; break; + } + } + if (idx < 0 && client->balance_count < MAX_BALANCE_ENTRIES) { + idx = client->balance_count++; + strncpy(client->balance_cache[idx].currency, ccy->valuestring, 15); + } + if (idx >= 0) + client->balance_cache[idx].available = atof(avail->valuestring); + pthread_mutex_unlock(&client->balance_lock); + if (client->balance_wake_fd >= 0) { + uint64_t one = 1; + if (write(client->balance_wake_fd, &one, sizeof(one)) < 0) {} + } + } + } } else { sym_idx = parse_book_update(msg_root, client); } @@ -914,3 +964,68 @@ int ws_client_read(ws_client_t *client, uint32_t conn_idx) { return 0; } + +bool ws_client_await_balance(ws_client_t *client, const char *currency, + double min_amount, int64_t timeout_ms) { + int64_t deadline = (int64_t)ws_client_now_ms() + timeout_ms; + + /* Check cache first */ + { + pthread_mutex_lock(&client->balance_lock); + double avail = 0; + bool found = false; + for (int i = 0; i < client->balance_count; i++) { + if (strcmp(client->balance_cache[i].currency, currency) == 0) { + avail = client->balance_cache[i].available; + found = true; + break; + } + } + pthread_mutex_unlock(&client->balance_lock); + if (found && avail >= min_amount - 1e-12) + return true; + } + + /* Wait on the balance wake eventfd */ + while ((int64_t)ws_client_now_ms() < deadline) { + struct pollfd pfd = { .fd = client->balance_wake_fd, .events = POLLIN }; + int remaining = (int)(deadline - (int64_t)ws_client_now_ms()); + if (remaining <= 0) break; + + int ret = poll(&pfd, 1, remaining); + if (ret > 0) { + uint64_t val; + if (read(client->balance_wake_fd, &val, sizeof(val)) < 0) {} + } + + /* Re-check the cache on any wakeup or timeout */ + pthread_mutex_lock(&client->balance_lock); + double avail = 0; + bool found = false; + for (int i = 0; i < client->balance_count; i++) { + if (strcmp(client->balance_cache[i].currency, currency) == 0) { + avail = client->balance_cache[i].available; + found = true; + break; + } + } + pthread_mutex_unlock(&client->balance_lock); + if (found && avail >= min_amount - 1e-12) + return true; + } + + return false; +} + +double ws_client_latest_balance(ws_client_t *client, const char *currency) { + pthread_mutex_lock(&client->balance_lock); + double avail = 0; + for (int i = 0; i < client->balance_count; i++) { + if (strcmp(client->balance_cache[i].currency, currency) == 0) { + avail = client->balance_cache[i].available; + break; + } + } + pthread_mutex_unlock(&client->balance_lock); + return avail; +} diff --git a/src/ws_client.h b/src/ws_client.h index 2bde3d6..9f847fd 100644 --- a/src/ws_client.h +++ b/src/ws_client.h @@ -3,6 +3,7 @@ #include #include +#include #include #include #include "book.h" @@ -11,6 +12,13 @@ #include "evaluate.h" #include "fill_handler.h" +#define MAX_BALANCE_ENTRIES 64 + +typedef struct { + char currency[16]; + double available; +} balance_entry_t; + #define WS_MAX_FRAME_SIZE (128 * 1024) #define WS_MAX_CONNECTIONS 8 #define WS_READ_BUF_SIZE (64 * 1024) @@ -74,6 +82,10 @@ typedef struct { bool running; /* false signals client to stop */ fill_channel_t *fill_ch; /* fill event channel (hot→executor) */ int fill_drop_warn; /* rate-limited fill drop warning counter */ + balance_entry_t balance_cache[MAX_BALANCE_ENTRIES]; /* latest available balances from WS */ + int balance_count; + pthread_mutex_t balance_lock; + int balance_wake_fd; /* eventfd: written on every balance update */ } ws_client_t; /* Initialise a WebSocket client with config, symbol table, books, and evaluator */ @@ -106,4 +118,14 @@ int ws_client_send_ping(ws_connection_t *conn); /* Get current monotonic timestamp in milliseconds */ uint64_t ws_client_now_ms(void); +/* Wait for balance WS to confirm available >= min_amount for a currency. + Blocks on the balance wake eventfd for up to timeout_ms. + Returns true once the condition is met, false on timeout. */ +bool ws_client_await_balance(ws_client_t *client, const char *currency, + double min_amount, int64_t timeout_ms); + +/* Read the latest known available balance for a currency from cache (thread-safe). + Returns 0 if currency is not in cache. */ +double ws_client_latest_balance(ws_client_t *client, const char *currency); + #endif