cleanup: remove Python executor, dead config/HTTP server; add balance wait; fix fee hold, PnL, warnings

- Remove executor/ and common/ Python directories (dead code after C migration)
- Remove src/http_server.c/.h (was for Python executor, generates warnings)
- Remove dead config keys: socket_path, executor_socket_path, send_signals, rest_host, rest_port
- Remove dead UDS code in events.c/h (send_signal_to_executor, unix_* functions)
- Fix fee hold on leg 0 buys (apply_fee_hold to prevent Balance insufficient)
- Fix PnL leg0_in to use fills[0][4] instead of wrong currency field
- Fix REST keepalive warmup currency (use initial_capital[0] instead of hardcoded USDT)
- Add balance wait between legs via /account/balance WS + eventfd wake
- Fix all strncpy truncation warnings in config.c, symbols_api.c, ws_client.c
This commit is contained in:
nicolas 2026-05-27 12:14:10 -03:00
parent 06706ca479
commit 562fddf124
24 changed files with 211 additions and 3388 deletions

View File

@ -1,6 +0,0 @@
"""Common utilities for triangular arbitrage bot."""
from common.config import Settings
from common.log import configure_logging
__all__ = ["Settings", "configure_logging"]

View File

@ -1,67 +0,0 @@
"""Deprecated config schema for the old fh_ob Python process. Kept so that
common/log.py can be imported without errors. Not used at runtime."""
import asyncio
from pathlib import Path
from typing import Optional
import yaml
from pydantic import BaseModel, Field
from pydantic_settings import BaseSettings
class FHobSettings(BaseModel):
symbols: list[str] = Field(
default_factory=list,
description="Trading pairs to subscribe to. Empty = no subscriptions. oe_em adds pairs via REST.",
)
log_level: str = Field(default="INFO", description="Logging level")
log_file: Path = Field(
default=Path("/tmp/fh_ob.log"),
description="Path to log file. Logs are written here in addition to stdout.",
)
socket_path: Path = Field(
default=Path("/tmp/fh_ob.sock"),
description="Unix domain socket path for OE+EM",
)
rest_host: str = Field(default="0.0.0.0", description="FastAPI debug host")
rest_port: int = Field(default=8000, description="FastAPI debug port")
ws_url: str = Field(
default="wss://ws-api-spot.kucoin.com",
description="KuCoin WebSocket endpoint",
)
token_url: str = Field(
default="https://api.kucoin.com/api/v1/bullet-public",
description="KuCoin public token endpoint",
)
reconnect_base_delay: float = Field(
default=1.0,
description="Base delay for reconnect exponential backoff (seconds)",
)
reconnect_max_delay: float = Field(
default=60.0,
description="Max delay for reconnect exponential backoff (seconds)",
)
heartbeat_interval: float = Field(
default=18.0,
description="WS ping interval (seconds) - KuCoin uses 18s",
)
class Settings(BaseSettings):
fh_ob: FHobSettings = Field(default_factory=FHobSettings)
@classmethod
async def from_yaml(cls, path: Path) -> "Settings":
loop = asyncio.get_running_loop()
def _read() -> dict:
with open(path) as f:
return yaml.safe_load(f) or {}
data = await loop.run_in_executor(None, _read)
fh_ob_data = data.get("fh_ob", {})
if fh_ob_data.get("symbols") is None:
fh_ob_data["symbols"] = []
return cls(**data)
model_config = {"env_prefix": "TRIARB_", "extra": "ignore"}

View File

@ -1,133 +0,0 @@
"""
Shared logging configuration for all components.
Provides configure_logging() which sets up structlog with JSON output to stdout
and an optional plain-text file handler. All components (fh_ob, oe_em, executor)
call this at startup before any other logging.
"""
import asyncio
import logging
import sys
from pathlib import Path
from typing import Optional
import structlog
class _AsyncFileHandler(logging.Handler):
"""Non-blocking file handler that queues log records for async writing."""
def __init__(self, filepath: Path) -> None:
super().__init__()
self._filepath = filepath
self._queue: Optional[asyncio.Queue] = None
self._task: Optional[asyncio.Task] = None
def _ensure_loop(self) -> None:
if self._queue is None:
self._queue = asyncio.Queue(maxsize=4096)
loop = asyncio.get_running_loop()
self._task = loop.create_task(self._writer_loop())
async def _writer_loop(self) -> None:
loop = asyncio.get_running_loop()
log_file = self._filepath
def _write(msg: str) -> None:
with open(log_file, "a") as f:
f.write(msg + "\n")
while True:
record = await self._queue.get()
try:
msg = self.format(record)
await loop.run_in_executor(None, _write, msg)
except Exception:
pass
self._queue.task_done()
def emit(self, record: logging.LogRecord) -> None:
self._ensure_loop()
try:
self._queue.put_nowait(record)
except asyncio.QueueFull:
pass
def close(self) -> None:
"""Override stdlib Handler.close() — no-op, use async _flush() instead."""
pass
def flush(self) -> None:
"""Override stdlib Handler.flush() — no-op, queue is non-blocking."""
pass
async def _flush(self) -> None:
"""Wait for all queued records to be written."""
if self._queue:
await self._queue.join()
_async_file_handler: Optional[_AsyncFileHandler] = None
def configure_logging(level: str = "INFO", log_file: Path | None = None) -> None:
"""
Configure structlog with JSON output to stdout and optional file handler.
Uses stdlib logging as the backend so that standard-library integrations
(e.g. uvicorn, aiohttp) produce structured JSON too.
Parameters
----------
level : str
Log level string (DEBUG, INFO, WARNING, ERROR).
log_file : Path or None
If set, a FileHandler is added to the root logger writing the
same JSON lines to disk.
"""
global _async_file_handler
logging.basicConfig(
level=getattr(logging, level.upper()),
format="%(message)s",
handlers=[],
)
root_logger = logging.getLogger()
root_logger.setLevel(getattr(logging, level.upper()))
root_logger.handlers.clear()
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(logging.Formatter("%(message)s"))
console_handler.setLevel(getattr(logging, level.upper()))
root_logger.addHandler(console_handler)
structlog.configure(
wrapper_class=structlog.make_filtering_bound_logger(
getattr(logging, level.upper())
),
context_class=dict,
logger_factory=structlog.stdlib.LoggerFactory(),
cache_logger_on_first_use=True,
processors=[
structlog.stdlib.add_log_level,
structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S.%f"),
structlog.processors.JSONRenderer(),
],
)
if log_file:
_async_file_handler = _AsyncFileHandler(log_file)
_async_file_handler.setFormatter(logging.Formatter("%(message)s"))
_async_file_handler.setLevel(getattr(logging, level.upper()))
root_logger.addHandler(_async_file_handler)
root_logger.propagate = False
async def close_logging() -> None:
"""Flush and close the async file handler."""
global _async_file_handler
if _async_file_handler:
await _async_file_handler._flush()
_async_file_handler = None

View File

@ -5,7 +5,6 @@ fused_engine:
signal_threshold_bps: 2
excluded_currencies: [EUR, BRL]
hold_currencies: [USDT, USDC, USD1]
send_signals: true
ws_url: wss://ws-api-spot.kucoin.com
token_url: https://api.kucoin.com/api/v1/bullet-public
reconnect_base_delay: 1.0
@ -18,16 +17,6 @@ fused_engine:
USDC: 5
USD1: 5
executor:
fill_timeout_ms: 1000
log_level: INFO
log_file: /tmp/executor.log
socket_path: /tmp/executor.sock
concurrent_slots: 1
enforce_same_base_isolation: true
enforce_pair_isolation: true
rest_port: 8002
kucoin_api_key: ""
kucoin_api_secret: ""
kucoin_api_passphrase: ""

View File

@ -1,9 +0,0 @@
"""
Executor package.
Re-exports the Executor class used by the fused_engine process.
"""
from executor.executor import Executor
__all__ = ["Executor"]

View File

@ -1,132 +0,0 @@
"""
Executor process entry point.
Starts the Unix-socket signal server, REST API control interface,
and orchestrates clean shutdown on SIGTERM/SIGINT.
"""
import asyncio
import signal
from pathlib import Path
from typing import Optional
import structlog
import uvicorn
from common.log import configure_logging
from executor.config import Settings
from executor.executor import Executor
from executor.kucoin_api import KuCoinAPI
from executor.rest_api import create_app
from executor.socket_server import SignalSocketServer
from executor.ws_client import KuCoinWSClient
async def main() -> None:
config_path = Path("config.yaml")
settings = await Settings.from_yaml(config_path) if config_path.exists() else Settings()
configure_logging(settings.executor.log_level) # file I/O handled by Executor's _DualLogger
log = structlog.get_logger().bind(component="executor")
log.info("executor_starting", live_mode=settings.live_mode)
# Always initialise KuCoinAPI even in paper mode — symbol metadata is
# needed for size/precision validation regardless of execution mode.
api = KuCoinAPI(
api_key=settings.kucoin_api_key,
api_secret=settings.kucoin_api_secret,
api_passphrase=settings.kucoin_api_passphrase,
)
await api.fetch_symbols()
ws_client: Optional[KuCoinWSClient] = None
if settings.live_mode:
# Live mode requires the private WebSocket client to receive fill events.
ws_client = KuCoinWSClient(
kucoin_api=api,
private_token_url=settings.executor.private_token_url,
)
executor = Executor(
kucoin_api=api,
settings=settings.executor,
ws_client=ws_client,
log_file=settings.executor.log_file,
live_mode=settings.live_mode,
)
await executor.start()
should_exit = asyncio.Event()
def shutdown_callback() -> None:
should_exit.set()
rest_app = create_app(executor, shutdown_callback=shutdown_callback)
rest_config = uvicorn.Config(
rest_app,
host="127.0.0.1",
port=settings.executor.rest_port,
log_level="warning",
)
rest_server = uvicorn.Server(rest_config)
socket_server = SignalSocketServer(
socket_path=settings.executor.socket_path,
on_signal=executor.handle_signal,
)
socket_task: asyncio.Task | None = None
rest_task: asyncio.Task | None = None
exit_task: asyncio.Task | None = None
ws_task: asyncio.Task | None = None
async def shutdown(sig: signal.Signals) -> None:
"""Clean up on shutdown signal: pause executor, cancel tasks, close server."""
log.info("shutdown_signal_received", signal=sig.name)
await executor.pause()
await executor.close()
if socket_task is not None and not socket_task.done():
socket_task.cancel()
if rest_task is not None:
rest_server.should_exit = True
if ws_client is not None:
await ws_client.stop()
should_exit.set()
loop = asyncio.get_running_loop()
# Register signal handlers so shutdown runs in the asyncio event loop
# rather than in a plain threading context.
for sig in (signal.SIGTERM, signal.SIGINT):
loop.add_signal_handler(sig, lambda s=sig: asyncio.create_task(shutdown(s)))
socket_task = asyncio.create_task(socket_server.start())
rest_task = asyncio.create_task(rest_server.serve())
exit_task = asyncio.create_task(should_exit.wait())
if ws_client is not None:
ws_task = asyncio.create_task(ws_client.start())
log.info(
"executor_ready",
rest_endpoint=f"http://127.0.0.1:{settings.executor.rest_port}",
socket_path=str(settings.executor.socket_path),
live_mode=settings.live_mode,
)
tasks = {t for t in (socket_task, rest_task, exit_task, ws_task) if t is not None}
try:
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
except asyncio.CancelledError:
log.info("executor_cancelled")
finally:
rest_server.should_exit = True
for t in tasks:
if not t.done():
await asyncio.wait({t}, timeout=3.0)
if not t.done():
t.cancel()
log.info("executor_shutdown_complete")
if __name__ == "__main__":
asyncio.run(main())

View File

@ -1,93 +0,0 @@
"""
Executor configuration loaded from config.yaml.
Defines settings for the triangular arbitrage executor including
Unix socket path, concurrency limits, KuCoin credentials, and logging.
live_mode at the top level controls whether the executor places real
orders (live) or validates via order_test and simulates fills (paper).
"""
import asyncio
import logging
from decimal import Decimal
from pathlib import Path
from typing import Optional
import yaml
from pydantic import BaseModel, Field
from pydantic_settings import BaseSettings
log = logging.getLogger("executor-config")
class ExecutorSettings(BaseModel):
"""Settings that control executor runtime behaviour."""
socket_path: Path = Field(
default=Path("/tmp/executor.sock"),
description="Unix domain socket path for fused_engine -> executor",
)
concurrent_slots: int = Field(default=1, ge=1, description="Max concurrent triangle executions")
enforce_same_base_isolation: bool = Field(
default=True,
description="Block concurrent triangles sharing the same base currency",
)
enforce_pair_isolation: bool = Field(
default=True,
description="Block concurrent triangles that share any trading pair symbol",
)
log_file: Path = Field(
default=Path("/tmp/executor.log"),
description="Path to log file",
)
log_level: str = Field(default="INFO", description="Logging level")
rest_port: int = Field(
default=8002,
description="HTTP REST API port (8000=fh_ob, 8001=oe_em)",
)
initial_capital: dict[str, Decimal] = Field(
default_factory=lambda: {"USDT": Decimal("10")},
description="Starting capital per currency. Caps the max_volume for each triangle's primary_quote.",
)
private_token_url: str = Field(
default="https://api.kucoin.com/api/v1/bullet-private",
description="KuCoin private bullet token endpoint",
)
fill_timeout_ms: float = Field(
default=1000,
description="Per-leg fill wait timeout in milliseconds",
)
await_balance: bool = Field(
default=False,
description="Wait for balance WS update before next leg",
)
class Settings(BaseSettings):
"""Top-level settings parsed from config.yaml."""
# live_mode is the master switch: False = paper (order_test + simulated fills),
# True = real orders on KuCoin. Set at top level of config.yaml.
live_mode: bool = Field(default=False, description="false = paper; true = live orders")
executor: ExecutorSettings = Field(default_factory=ExecutorSettings)
kucoin_api_key: str = Field(default="", description="KuCoin API key")
kucoin_api_secret: str = Field(default="", description="KuCoin API secret")
kucoin_api_passphrase: str = Field(default="", description="KuCoin API passphrase")
@classmethod
async def from_yaml(cls, path: Path) -> "Settings":
"""Load settings from a YAML file, ignoring unknown keys."""
loop = asyncio.get_running_loop()
def _read() -> dict:
with open(path) as f:
return yaml.safe_load(f) or {}
data = await loop.run_in_executor(None, _read)
known = {"live_mode", "fused_engine", "executor", "kucoin_api_key", "kucoin_api_secret", "kucoin_api_passphrase", "kcs_discount_active"}
unknown = set(data.keys()) - known
if unknown:
log.warning("ignoring unknown config keys: %s", sorted(unknown))
return cls(**data)
model_config = {"env_prefix": "TRIArb_", "extra": "ignore"}

File diff suppressed because it is too large Load Diff

View File

@ -1,457 +0,0 @@
"""
KuCoin REST API client for the executor.
Covers symbol metadata fetch, HF order-test endpoint for paper mode,
real order placement for live mode, and private WebSocket token
acquisition required by ws_client.py.
"""
import aiohttp
import base64
import hmac
import hashlib
import json
import time
import uuid
import structlog
from decimal import Decimal
from typing import Optional
logger = structlog.get_logger().bind(component="executor-kucoin")
KUCoin_SYMBOLS_URL = "https://api.kucoin.com/api/v1/symbols"
KUCoin_ORDER_TEST_URL = "https://api.kucoin.com/api/v1/hf/orders/test"
KUCoin_ORDER_PLACE_URL = "https://api.kucoin.com/api/v1/hf/orders"
KUCoin_SERVER_TIME_URL = "https://api.kucoin.com/api/v1/time"
KUCoin_BULLET_PRIVATE_URL = "https://api.kucoin.com/api/v1/bullet-private"
def _signRequest(timestamp: str, method: str, path: str, secret: str, data: str = "") -> str:
"""
Compute the HMAC-SHA256 signature for a KuCoin API request.
Parameters
----------
timestamp : str
Millisecond Unix timestamp.
method : str
HTTP method (e.g. "POST").
path : str
API endpoint path (e.g. "/api/v1/hf/orders/test").
secret : str
API secret as returned by the user.
data : str
Request body. Empty string for GET requests.
Returns
-------
str
Base64-encoded HMAC-SHA256 digest suitable for the KC-API-SIGN header.
"""
# KuCoin signs timestamp + method + path + body concatenated.
message = timestamp + method + path + data
mac = hmac.new(secret.encode("utf-8"), message.encode("utf-8"), hashlib.sha256)
return base64.b64encode(mac.digest()).decode("utf-8")
def _encryptPassphrase(passphrase: str, secret: str) -> str:
"""
Encrypt the API passphrase for KuCoin.
KuCoin requires the passphrase to be encrypted with HMAC-SHA256
using the API secret as the key, then base64-encoded.
Returns
-------
str
Base64-encoded encrypted passphrase for the KC-API-PASSPHRASE header.
"""
mac = hmac.new(secret.encode("utf-8"), passphrase.encode("utf-8"), hashlib.sha256)
return base64.b64encode(mac.digest()).decode("utf-8")
class SymbolMeta:
"""
Cached metadata for a single KuCoin trading pair.
Fetched once at startup from /api/v1/symbols and held in memory
for the lifetime of the process.
"""
def __init__(
self,
symbol: str,
base: str,
quote: str,
base_min_size: Decimal,
quote_min_size: Decimal,
base_increment: Decimal,
quote_increment: Decimal,
min_funds: Decimal,
taker_fee: Decimal,
maker_fee: Decimal,
incr_min_size: Decimal,
base_precision: int,
quote_precision: int,
) -> None:
self.symbol = symbol
self.base = base
self.quote = quote
self.base_min_size = base_min_size
self.quote_min_size = quote_min_size
self.base_increment = base_increment
self.quote_increment = quote_increment
self.min_funds = min_funds
self.taker_fee = taker_fee
self.maker_fee = maker_fee
self.incr_min_size = incr_min_size
self.base_precision = base_precision
self.quote_precision = quote_precision
def _cost_to_precision(amount: Decimal, quote_increment: Decimal) -> Decimal:
"""
Round a quote-currency amount *up* to the nearest valid increment.
KuCoin requires quote amounts to be multiples of quote_increment. When
computing the minimum order size in quote currency we therefore round up
so we never underestimate the minimum.
"""
if quote_increment <= 0:
return amount
mult = Decimal("1") / quote_increment
return (amount * mult).to_integral_value(rounding="ROUND_UP") / mult
def _amount_to_precision(
amount: Decimal,
base_increment: Decimal,
round_down: bool = False,
) -> Decimal:
"""
Round a base-currency amount to the nearest valid increment.
Use round_down=True for sell orders so the resulting size never
exceeds the available balance. Use round_down=False (default) for
buy orders and minimum-size computations where rounding up is safe.
"""
if base_increment <= 0:
return amount
mult = Decimal("1") / base_increment
rounding = "ROUND_DOWN" if round_down else "ROUND_UP"
return (amount * mult).to_integral_value(rounding=rounding) / mult
class KuCoinAPI:
"""
Thin client for KuCoin REST API calls needed by the executor.
Handles HMAC signing, symbol metadata caching, order placement
(live) and order validation (paper), plus private token fetch
for the WebSocket client.
"""
def __init__(self, api_key: str = "", api_secret: str = "", api_passphrase: str = "") -> None:
self._api_key = api_key
self._api_secret = api_secret
self._api_passphrase = api_passphrase
# Pre-encrypt the passphrase at init time so we don't recompute per request.
self._encrypted_passphrase = _encryptPassphrase(api_passphrase, api_secret) if api_passphrase and api_secret else ""
self._symbols: dict[str, SymbolMeta] = {}
self._log = logger
async def fetch_symbols(self) -> None:
"""
Fetch all trading pair metadata from KuCoin and populate _symbols.
Logs a warning for any symbol that fails to parse and skips it.
"""
async with aiohttp.ClientSession() as session:
async with session.get(KUCoin_SYMBOLS_URL) as resp:
resp.raise_for_status()
payload = await resp.json()
for item in payload.get("data", []):
symbol = item.get("symbol", "")
base = item.get("baseCurrency", "")
quote = item.get("quoteCurrency", "")
if not all([symbol, base, quote]):
continue
raw_base_min = item.get("baseMinSize") or "0"
raw_quote_min = item.get("quoteMinSize") or "0"
raw_base_inc = item.get("baseIncrement") or "0"
raw_quote_inc = item.get("quoteIncrement") or "0"
raw_min_funds = item.get("minFunds") or "0"
raw_maker = item.get("makerFeeRate") or "0"
raw_taker = item.get("takerFeeRate") or "0"
raw_incr_min = item.get("incrMinSize") or "0"
raw_base_prec = item.get("basePrecision") or "0"
raw_quote_prec = item.get("quotePrecision") or "0"
try:
base_min_size = Decimal(raw_base_min)
quote_min_size = Decimal(raw_quote_min)
base_increment = Decimal(raw_base_inc)
quote_increment = Decimal(raw_quote_inc)
min_funds = Decimal(raw_min_funds)
taker_fee = Decimal(raw_taker)
maker_fee = Decimal(raw_maker)
incr_min_size = Decimal(raw_incr_min)
base_precision = int(raw_base_prec) if raw_base_prec else 0
quote_precision = int(raw_quote_prec) if raw_quote_prec else 0
except Exception as e:
self._log.warning("symbol_field_parse_error", symbol=symbol, error=str(e),
baseMinSize=repr(raw_base_min), quoteMinSize=repr(raw_quote_min),
baseIncrement=repr(raw_base_inc), quoteIncrement=repr(raw_quote_inc),
minFunds=repr(raw_min_funds), makerFeeRate=repr(raw_maker),
takerFeeRate=repr(raw_taker), incrMinSize=repr(raw_incr_min),
basePrecision=repr(raw_base_prec), quotePrecision=repr(raw_quote_prec))
continue
self._symbols[symbol] = SymbolMeta(
symbol=symbol,
base=base,
quote=quote,
base_min_size=base_min_size,
quote_min_size=quote_min_size,
base_increment=base_increment,
quote_increment=quote_increment,
min_funds=min_funds,
taker_fee=taker_fee,
maker_fee=maker_fee,
incr_min_size=incr_min_size,
base_precision=base_precision,
quote_precision=quote_precision,
)
self._log.info("symbols_loaded", count=len(self._symbols))
def get_symbol_meta(self, symbol: str) -> Optional[SymbolMeta]:
"""Return cached SymbolMeta for a pair, or None if not yet loaded."""
return self._symbols.get(symbol)
async def order_test(
self,
session: aiohttp.ClientSession,
symbol: str,
side: str,
order_type: str,
price: Optional[Decimal] = None,
size: Optional[Decimal] = None,
funds: Optional[Decimal] = None,
) -> tuple[bool, str, Optional[str]]:
"""
Validate an order against KuCoin's paper-trading endpoint.
In paper mode the executor uses this instead of a real order so that
fill simulation can proceed without risking capital.
Parameters
----------
session : aiohttp.ClientSession
Shared session for connection pooling.
symbol : str
KuCoin symbol e.g. "BTC-USDT".
side : str
"buy" or "sell".
order_type : str
Order type string (e.g. "market").
price, size, funds : Optional[Decimal]
Order parameters; at least one must be provided.
Returns
-------
tuple[bool, str, Optional[str]]
(success, error_message, order_id). order_id is populated on success.
"""
timestamp = str(int(time.time() * 1000))
path = "/api/v1/hf/orders/test"
body: dict = {
"symbol": symbol,
"type": order_type,
"side": side,
"clientOid": f"paper-{timestamp}-{uuid.uuid4().hex[:8]}",
}
if price is not None:
body["price"] = str(price)
if size is not None:
body["size"] = str(size)
if funds is not None:
body["funds"] = str(funds)
raw_body = json.dumps(body, separators=(",", ":"))
sign = _signRequest(timestamp, "POST", path, self._api_secret, raw_body)
headers = {
"KC-API-TIMESTAMP": timestamp,
"KC-API-SIGN": sign,
"KC-API-KEY": self._api_key,
"KC-API-PASSPHRASE": self._encrypted_passphrase,
"KC-API-SIGN-TYPE": "2",
"KC-API-KEY-VERSION": "3",
"Content-Type": "application/json",
}
try:
async with session.post(
KUCoin_ORDER_TEST_URL,
data=raw_body.encode(),
headers=headers,
) as resp:
text = await resp.text()
if resp.status == 200:
data = json.loads(text)
order_id = data.get("data", {}).get("orderId", "")
return True, "", order_id
else:
data = json.loads(text)
code = data.get("code", "")
msg = data.get("msg", text)
return False, f"{code}: {msg}", None
except (json.JSONDecodeError, ValueError) as e:
return False, f"invalid_response: {e}", None
except (aiohttp.ClientError, OSError) as e:
return False, f"http_error: {e}", None
async def order_place(
self,
session: aiohttp.ClientSession,
symbol: str,
side: str,
order_type: str,
price: Optional[Decimal] = None,
size: Optional[Decimal] = None,
funds: Optional[Decimal] = None,
client_oid: str = "",
) -> tuple[bool, str, Optional[str]]:
"""
Place a real market order on KuCoin.
Parameters
----------
session : aiohttp.ClientSession
Shared session for connection pooling.
symbol : str
KuCoin symbol e.g. "BTC-USDT".
side : str
"buy" or "sell".
order_type : str
Order type string (e.g. "market").
price, size, funds : Optional[Decimal]
Order parameters; at least one must be provided.
client_oid : str
Client-order ID for matching with WS fill events.
Returns
-------
tuple[bool, str, Optional[str]]
(success, error_message, order_id). order_id is populated on success.
"""
timestamp = str(int(time.time() * 1000))
path = "/api/v1/hf/orders"
body: dict = {
"symbol": symbol,
"type": order_type,
"side": side,
"clientOid": client_oid or f"live-{timestamp}-{uuid.uuid4().hex[:8]}",
}
if price is not None:
body["price"] = str(price)
if size is not None:
body["size"] = str(size)
if funds is not None:
body["funds"] = str(funds)
raw_body = json.dumps(body, separators=(",", ":"))
sign = _signRequest(timestamp, "POST", path, self._api_secret, raw_body)
headers = {
"KC-API-TIMESTAMP": timestamp,
"KC-API-SIGN": sign,
"KC-API-KEY": self._api_key,
"KC-API-PASSPHRASE": self._encrypted_passphrase,
"KC-API-SIGN-TYPE": "2",
"KC-API-KEY-VERSION": "3",
"Content-Type": "application/json",
}
try:
async with session.post(
KUCoin_ORDER_PLACE_URL,
data=raw_body.encode(),
headers=headers,
) as resp:
text = await resp.text()
self._log.info("order_place_raw_response", status=resp.status, response=text[:300])
data = json.loads(text)
if resp.status == 200 and data.get("code") == "200000":
order_id = data.get("data", {}).get("orderId", "")
return True, "", order_id
else:
code = data.get("code", resp.status)
msg = data.get("msg", text)
return False, f"{code}: {msg}", None
except (json.JSONDecodeError, ValueError) as e:
return False, f"invalid_response: {e}", None
except (aiohttp.ClientError, OSError) as e:
return False, f"http_error: {e}", None
async def get_private_token(self, session: aiohttp.ClientSession) -> Optional[dict]:
"""
Request a private WebSocket token from KuCoin.
Returns the full data dict from the bullet-private response, containing
'token', 'instanceServers' with endpoint, pingInterval, pingTimeout.
Returns None on error.
"""
timestamp = str(int(time.time() * 1000))
path = "/api/v1/bullet-private"
sign = _signRequest(timestamp, "POST", path, self._api_secret, "")
headers = {
"KC-API-TIMESTAMP": timestamp,
"KC-API-SIGN": sign,
"KC-API-KEY": self._api_key,
"KC-API-PASSPHRASE": self._encrypted_passphrase,
"KC-API-SIGN-TYPE": "2",
"KC-API-KEY-VERSION": "3",
"Content-Type": "application/json",
}
try:
async with session.post(
KUCoin_BULLET_PRIVATE_URL,
data=b"",
headers=headers,
) as resp:
if resp.status == 200:
data = await resp.json()
return data.get("data")
else:
text = await resp.text()
self._log.error("private_token_failed", status=resp.status, response=text[:200])
return None
except (aiohttp.ClientError, OSError) as e:
self._log.error("private_token_http_error", error=str(e))
return None
async def warmup_session(self, session: aiohttp.ClientSession) -> None:
"""Warm up the authenticated HTTP connection pool with a minimal GET."""
timestamp = str(int(time.time() * 1000))
path = "/api/v1/accounts"
sign = _signRequest(timestamp, "GET", path, self._api_secret, "")
headers = {
"KC-API-TIMESTAMP": timestamp,
"KC-API-SIGN": sign,
"KC-API-KEY": self._api_key,
"KC-API-PASSPHRASE": self._encrypted_passphrase,
"KC-API-SIGN-TYPE": "2",
"KC-API-KEY-VERSION": "3",
}
try:
async with session.get(
f"https://api.kucoin.com{path}",
headers=headers,
) as resp:
await resp.read()
pass
except Exception as e:
self._log.warning("session_warmup_failed", error=str(e))

View File

@ -1,171 +0,0 @@
"""
FastAPI control interface for the executor.
Exposes endpoints for status inspection, configuration changes, execution
cancellation, and pause/resume. Intended for operator use; not used in the
normal signal flow.
"""
from typing import Optional
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from executor.executor import Executor
class ConfigResponse(BaseModel):
"""Current executor configuration."""
live_mode: bool
concurrent_slots: int
enforce_same_base_isolation: bool
socket_path: str
class ConfigPatchRequest(BaseModel):
"""Request body for configuration update endpoints."""
live_mode: Optional[bool] = None
concurrent_slots: Optional[int] = None
enforce_same_base_isolation: Optional[bool] = None
class ConfigPatchResponse(BaseModel):
"""Response after a configuration update, echoing the new values."""
ok: bool
live_mode: Optional[bool] = None
concurrent_slots: Optional[int] = None
enforce_same_base_isolation: Optional[bool] = None
class PauseResponse(BaseModel):
"""Response after a pause request."""
ok: bool
class ResumeResponse(BaseModel):
"""Response after a resume request."""
ok: bool
class CancelResponse(BaseModel):
"""Response after a cancellation request."""
ok: bool
message: str
class StatusResponse(BaseModel):
"""Runtime status of the executor."""
status: str
version: str
live_mode: bool
slots_used: int
slots_total: int
uptime_seconds: float
class ShutdownResponse(BaseModel):
"""Response after a shutdown request."""
ok: bool
def create_app(executor: Executor, shutdown_callback) -> FastAPI:
"""
Build the FastAPI application and wire all routes to the executor.
Parameters
----------
executor : Executor
The live Executor instance to control.
shutdown_callback : callable
Called when /api/v1/shutdown is hit to begin clean shutdown.
Returns
-------
FastAPI
Configured app ready to be passed to uvicorn.
"""
app = FastAPI(title="Executor API", description="Control interface for the triangular arbitrage executor")
@app.get("/api/v1/status", response_model=StatusResponse)
async def get_status() -> StatusResponse:
config = executor.get_config()
in_flight = executor.get_in_flight()
return StatusResponse(
status="ok",
version="0.1.0",
live_mode=config["live_mode"],
slots_used=len(in_flight),
slots_total=config["concurrent_slots"],
uptime_seconds=executor.get_uptime_seconds(),
)
@app.get("/api/v1/config", response_model=ConfigResponse)
async def get_config() -> ConfigResponse:
cfg = executor.get_config()
return ConfigResponse(
paper_mode=cfg["paper_mode"],
concurrent_slots=cfg["concurrent_slots"],
enforce_same_base_isolation=cfg["enforce_same_base_isolation"],
socket_path=cfg["socket_path"],
)
@app.patch("/api/v1/config/live_mode", response_model=ConfigPatchResponse)
async def patch_live_mode(req: ConfigPatchRequest) -> ConfigPatchResponse:
if req.live_mode is None:
raise HTTPException(status_code=400, detail="live_mode required")
executor._live_mode = req.live_mode
return ConfigPatchResponse(ok=True, live_mode=req.live_mode)
@app.patch("/api/v1/config/concurrent_slots", response_model=ConfigPatchResponse)
async def patch_concurrent_slots(req: ConfigPatchRequest) -> ConfigPatchResponse:
if req.concurrent_slots is None or req.concurrent_slots < 1:
raise HTTPException(status_code=400, detail="concurrent_slots must be >= 1")
executor.set_concurrent_slots(req.concurrent_slots)
return ConfigPatchResponse(ok=True, concurrent_slots=req.concurrent_slots)
@app.patch("/api/v1/config/enforce_same_base_isolation", response_model=ConfigPatchResponse)
async def patch_isolation(req: ConfigPatchRequest) -> ConfigPatchResponse:
if req.enforce_same_base_isolation is None:
raise HTTPException(status_code=400, detail="enforce_same_base_isolation required")
executor._settings.enforce_same_base_isolation = req.enforce_same_base_isolation
return ConfigPatchResponse(ok=True, enforce_same_base_isolation=req.enforce_same_base_isolation)
@app.get("/api/v1/executions")
async def get_executions() -> list[dict]:
return executor.get_in_flight()
@app.get("/api/v1/executions/{correlation_id}")
async def get_execution(correlation_id: str) -> dict:
in_flight = executor.get_in_flight()
for inf in in_flight:
if inf["correlation_id"] == correlation_id:
return inf
raise HTTPException(status_code=404, detail="Execution not found")
@app.post("/api/v1/cancel/{correlation_id}", response_model=CancelResponse)
async def cancel_execution(correlation_id: str) -> CancelResponse:
ok = await executor.cancel_execution(correlation_id)
if ok:
return CancelResponse(ok=True, message=f"Cancellation requested for {correlation_id}")
raise HTTPException(status_code=404, detail="Execution not found")
@app.post("/api/v1/pause", response_model=PauseResponse)
async def pause() -> PauseResponse:
await executor.pause()
return PauseResponse(ok=True)
@app.post("/api/v1/resume", response_model=ResumeResponse)
async def resume() -> ResumeResponse:
await executor.resume()
return ResumeResponse(ok=True)
@app.post("/api/v1/shutdown", response_model=ShutdownResponse)
async def shutdown() -> ShutdownResponse:
await executor.pause()
shutdown_callback()
return ShutdownResponse(ok=True)
@app.get("/api/v1/reports")
async def get_reports(limit: int = 50) -> list[dict]:
return executor.get_reports(limit=limit)
return app

View File

@ -1,100 +0,0 @@
"""
Unix-domain socket server that receives opportunity signals from oe_em.
Each JSON line on the socket is parsed and dispatched as an asyncio task
to avoid blocking the reader.
"""
import asyncio
import json
import time
from pathlib import Path
import structlog
logger = structlog.get_logger().bind(component="signal_socket_server")
class SignalSocketServer:
"""
Accepts JSON-serialized signals over a Unix domain socket.
Every valid "signal" message is wrapped in create_task so processing
is concurrent and a slow handler never blocks new connections.
"""
def __init__(self, socket_path: Path, on_signal) -> None:
self._socket_path = socket_path
self._on_signal = on_signal
self._log = logger
self._server: asyncio.Server | None = None
self._running = False
async def start(self) -> None:
"""Remove any stale socket file and start accepting connections."""
if self._socket_path.exists():
self._socket_path.unlink()
self._running = True
self._server = await asyncio.start_unix_server(
self._accept_client,
path=str(self._socket_path),
)
self._log.info("signal_socket_server_started", path=str(self._socket_path))
async with self._server:
await self._server.serve_forever()
async def stop(self) -> None:
"""Stop the server and remove the socket file."""
self._running = False
if self._server:
self._server.close()
await self._server.wait_closed()
if self._socket_path.exists():
self._socket_path.unlink()
self._log.info("signal_socket_server_stopped")
async def _accept_client(
self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
) -> None:
"""
Handle one client connection: read lines, parse JSON, dispatch signals.
The client is assumed to be oe_em's SignalSocketClient. Any line that
is not JSON or is not type="signal" is silently ignored.
"""
self._log.info("client_connected", addr=writer.get_extra_info("peername"))
try:
while self._running:
try:
line = await reader.readline()
except (ConnectionResetError, BrokenPipeError, asyncio.CancelledError):
break
except Exception:
break
if not line:
break
arrived_ms = int(time.time() * 1000)
try:
data = json.loads(line.decode())
except (json.JSONDecodeError, UnicodeDecodeError) as e:
self._log.warning("invalid_json", line=line[:50], error=str(e))
continue
if data.get("type") == "signal":
data["_receiver_ts_ms"] = arrived_ms
asyncio.create_task(self._on_signal(data))
else:
self._log.debug("ignored_non_signal", type=data.get("type"))
except asyncio.CancelledError:
pass
except Exception as e:
self._log.warning("client_error", error=str(e))
finally:
writer.close()
try:
await asyncio.wait_for(writer.wait_closed(), timeout=1.0)
except Exception:
pass
self._log.info("client_disconnected")

View File

@ -1,483 +0,0 @@
"""
KuCoin private WebSocket client for fill event delivery.
Manages the private WebSocket connection, authenticates via bullet-private,
subscribes to /spotMarket/tradeOrdersV2, and dispatches fill events to
waiting executor coroutines.
KuCoin market orders placed with ``funds`` often complete with a tiny
unfilled remainder, emitting ``canceled`` + ``status=done`` rather than
``filled``. The message handler in ``_handle_message`` treats this
terminal combination as a successful fill when match events have been
accumulated.
"""
import asyncio
import json
import time
import uuid
from decimal import Decimal
from typing import Optional
import aiohttp
import structlog
import websockets
from executor.kucoin_api import KuCoinAPI
logger = structlog.get_logger().bind(component="executor-ws")
_D0 = Decimal("0")
class FillAccumulator:
"""Accumulate match events for a single order, compute aggregated totals."""
def __init__(self, client_oid: str, order_id: str = "") -> None:
self.client_oid = client_oid
self.order_id = order_id
self.total_size = _D0
self.total_funds = _D0
self.match_count = 0
self.side = ""
self.symbol = ""
self._done = False
def add_match(self, data: dict) -> None:
match_price = Decimal(str(data.get("matchPrice", "0")))
match_size = Decimal(str(data.get("matchSize", "0")))
self.total_size += match_size
self.total_funds += match_price * match_size
self.match_count += 1
if not self.side:
self.side = data.get("side", "")
if not self.symbol:
self.symbol = data.get("symbol", "")
if not self.order_id:
self.order_id = data.get("orderId", "")
@property
def weighted_avg_price(self) -> Decimal:
if self.total_size <= 0:
return _D0
return self.total_funds / self.total_size
def to_dict(self) -> dict:
return {
"total_size": self.total_size,
"total_funds": self.total_funds,
"weighted_avg_price": self.weighted_avg_price,
"match_count": self.match_count,
"order_id": self.order_id,
"side": self.side,
"symbol": self.symbol,
}
class KuCoinWSClient:
"""
Private WebSocket client for KuCoin execution events.
Subscribes to /spotMarket/tradeOrdersV2 (global topic) and dispatches
fill events to awaiting executor coroutines via await_fill().
"""
def __init__(
self,
kucoin_api: KuCoinAPI,
private_token_url: str = "https://api.kucoin.com/api/v1/bullet-private",
reconnect_base_delay: float = 1.0,
reconnect_max_delay: float = 30.0,
) -> None:
self._api = kucoin_api
self._private_token_url = private_token_url
self._reconnect_base_delay = reconnect_base_delay
self._reconnect_max_delay = reconnect_max_delay
self._reconnect_delay = reconnect_base_delay
self._log = logger
self._running = False
self._ws: Optional[websockets.WebSocketClientProtocol] = None
self._connected = False
self._ping_interval: float = 18.0
self._ping_timeout: float = 10.0
self._fill_futures: dict[str, asyncio.Future] = {}
self._accumulators: dict[str, FillAccumulator] = {}
self._balance_futures: dict[str, asyncio.Future] = {}
self._latest_balance: dict[str, Decimal] = {}
self._worker_task: Optional[asyncio.Task] = None
self._pending_acks: dict[str, asyncio.Event] = {}
@property
def is_connected(self) -> bool:
return self._connected
async def start(self) -> None:
"""Start the WebSocket connection worker with reconnection loop."""
self._running = True
self._worker_task = asyncio.create_task(self._connection_worker())
try:
await self._worker_task
except asyncio.CancelledError:
pass
async def stop(self) -> None:
"""Stop the WebSocket connection and resolve any pending futures."""
self._running = False
for future in self._fill_futures.values():
if not future.done():
future.set_result((False, {}))
self._fill_futures.clear()
self._accumulators.clear()
for future in self._balance_futures.values():
if not future.done():
future.set_result(False)
self._balance_futures.clear()
for evt in self._pending_acks.values():
evt.set()
self._pending_acks.clear()
if self._worker_task is not None and not self._worker_task.done():
self._worker_task.cancel()
if self._ws is not None:
try:
await self._ws.close()
except Exception:
pass
self._connected = False
self._log.debug("ws_client_stopped")
async def await_fill(
self, client_oid: str, timeout_ms: float
) -> tuple[bool, dict]:
"""
Wait for the order identified by client_oid to be fully filled.
Parameters
----------
client_oid : str
The client-order ID used when placing the order.
timeout_ms : float
Maximum wait time in milliseconds.
Returns
-------
tuple[bool, dict]
(success, aggregated_fill_data) on fill.
(False, {}) on timeout, failure, or disconnected.
"""
if not self._connected:
self._log.warning(
"await_fill_not_connected",
client_oid=client_oid,
)
return (False, {})
future: asyncio.Future = asyncio.get_event_loop().create_future()
self._fill_futures[client_oid] = future
if client_oid in self._accumulators and self._accumulators[client_oid]._done:
acc = self._accumulators[client_oid]
result = (True, acc.to_dict())
del self._accumulators[client_oid]
self._fill_futures.pop(client_oid, None)
return result
try:
await asyncio.wait_for(future, timeout=timeout_ms / 1000.0)
except asyncio.TimeoutError:
self._fill_futures.pop(client_oid, None)
self._log.warning(
"fill_timeout",
client_oid=client_oid,
timeout_ms=timeout_ms,
accumulator=(
self._accumulators.get(client_oid).to_dict()
if client_oid in self._accumulators
else None
),
)
return (False, {})
except asyncio.CancelledError:
self._fill_futures.pop(client_oid, None)
raise
result = future.result()
self._fill_futures.pop(client_oid, None)
if client_oid in self._accumulators:
del self._accumulators[client_oid]
return result
async def await_balance(
self, currency: str, min_available: Decimal, timeout_ms: float
) -> bool:
"""
Wait until the available balance for *currency* reaches at least *min_available*.
If the latest known balance already meets the threshold, returns immediately.
Otherwise registers a future and waits for a balance WS event.
Returns True once the threshold is met, False on timeout.
"""
key = currency.upper()
current = self._latest_balance.get(key, _D0)
if current >= min_available:
return True
future: asyncio.Future = asyncio.Future()
self._balance_futures[key] = future
try:
await asyncio.wait_for(future, timeout=timeout_ms / 1000.0)
return True
except asyncio.TimeoutError:
self._log.warning(
"await_balance_timeout",
currency=key,
min_available=str(min_available),
latest=str(self._latest_balance.get(key, _D0)),
)
return False
except asyncio.CancelledError:
raise
finally:
if self._balance_futures.get(key) is future:
del self._balance_futures[key]
def latest_balance(self, currency: str) -> Decimal:
"""Return the latest known available balance for *currency*, or 0."""
return self._latest_balance.get(currency.upper(), _D0)
async def _connection_worker(self) -> None:
"""Main connection loop with exponential backoff reconnection."""
while self._running:
try:
await self._connect_and_run()
except asyncio.CancelledError:
break
except Exception as e:
if not self._running:
break
self._connected = False
self._log.warning(
"ws_reconnecting",
error=str(e),
delay=self._reconnect_delay,
)
await asyncio.sleep(self._reconnect_delay)
self._reconnect_delay = min(
self._reconnect_delay * 2,
self._reconnect_max_delay,
)
async def _connect_and_run(self) -> None:
"""Authenticate, connect, subscribe, and run the message loop."""
async with aiohttp.ClientSession() as session:
token_data = await self._api.get_private_token(session)
if not token_data:
raise RuntimeError("Failed to obtain private token")
token = token_data.get("token", "")
servers = token_data.get("instanceServers", [])
if not servers:
raise RuntimeError("No instance servers in token response")
server = servers[0]
endpoint = server.get("endpoint", "")
self._ping_interval = server.get("pingInterval", 18000) / 1000.0
self._ping_timeout = server.get("pingTimeout", 10000) / 1000.0
ws_url = f"{endpoint}?token={token}&connectId={uuid.uuid4()}"
self._log.debug("ws_connecting", url=ws_url[:80])
self._ws = await websockets.connect(
ws_url,
ping_interval=self._ping_interval,
ping_timeout=self._ping_timeout,
)
self._connected = True
self._reconnect_delay = self._reconnect_base_delay
self._log.info("ws_connected")
sub_task = asyncio.create_task(self._subscribe())
try:
async for msg in self._ws:
await self._handle_message(msg)
except websockets.ConnectionClosed as e:
self._log.warning(
"ws_connection_closed",
code=e.code,
reason=e.reason,
)
except asyncio.CancelledError:
sub_task.cancel()
raise
except Exception as e:
self._log.error("ws_message_loop_error", error=str(e))
finally:
self._connected = False
async def _subscribe(self) -> None:
"""Subscribe to tradeOrdersV2 and balance channels."""
if self._ws is None:
return
ack_id1 = str(int(time.time() * 1000))
evt1 = asyncio.Event()
self._pending_acks[ack_id1] = evt1
sub_msg = {
"id": int(ack_id1),
"type": "subscribe",
"topic": "/spotMarket/tradeOrdersV2",
"response": True,
"privateChannel": "true",
}
await self._ws.send(json.dumps(sub_msg))
self._log.info("subscribe_sent", topic="/spotMarket/tradeOrdersV2")
ack_id2 = str(int(time.time() * 1000) + 1)
evt2 = asyncio.Event()
self._pending_acks[ack_id2] = evt2
bal_msg = {
"id": int(ack_id2),
"type": "subscribe",
"topic": "/account/balance",
"response": True,
"privateChannel": "true",
}
await self._ws.send(json.dumps(bal_msg))
self._log.info("bal_subscribe_sent", topic="/account/balance")
await asyncio.wait_for(evt2.wait(), timeout=5.0)
self._log.info("bal_subscribe_ack_received")
async def _handle_message(self, msg: str) -> None:
"""Parse incoming WS message and dispatch fill events."""
try:
data = json.loads(msg)
except json.JSONDecodeError:
self._log.warning("ws_raw_message_parse_error", raw=msg[:500])
return
msg_type = data.get("type")
if msg_type == "welcome":
return
if msg_type == "pong":
return
if msg_type == "ack":
ack_id = str(data.get("id", ""))
evt = self._pending_acks.pop(ack_id, None)
if evt is not None:
evt.set()
return
subject = data.get("subject", "")
if subject == "account.balance":
payload = data.get("data", {})
currency = (payload.get("currency", "")).upper()
available_raw = payload.get("available")
if currency and available_raw is not None:
available = Decimal(str(available_raw))
self._latest_balance[currency] = available
self._log.debug("balance_update", currency=currency, available=str(available))
future = self._balance_futures.get(currency)
if future is not None and not future.done():
future.set_result(True)
return
if subject != "orderChange":
return
payload = data.get("data", {})
event_type = payload.get("type", "")
client_oid = payload.get("clientOid", "")
order_id = payload.get("orderId", "")
status = payload.get("status", "")
if not client_oid:
return
if event_type == "match":
if client_oid not in self._accumulators:
self._accumulators[client_oid] = FillAccumulator(client_oid, order_id)
self._accumulators[client_oid].add_match(payload)
elif event_type == "filled" and status == "done":
if client_oid in self._accumulators:
acc = self._accumulators[client_oid]
acc.order_id = order_id or acc.order_id
fill_data = acc.to_dict()
self._log.debug(
"fill_received",
client_oid=client_oid,
order_id=order_id,
total_size=fill_data["total_size"],
total_funds=fill_data["total_funds"],
avg_price=fill_data["weighted_avg_price"],
match_count=fill_data["match_count"],
)
else:
# Shouldn't happen in normal flow, but handle defensively.
fill_data = {
"total_size": Decimal(str(payload.get("filledSize", "0"))),
"total_funds": _D0,
"weighted_avg_price": _D0,
"match_count": 0,
"order_id": order_id,
"side": payload.get("side", ""),
"symbol": payload.get("symbol", ""),
}
self._log.warning(
"filled_without_matches",
client_oid=client_oid,
order_id=order_id,
)
if client_oid in self._fill_futures:
future = self._fill_futures[client_oid]
if not future.done():
future.set_result((True, fill_data))
elif client_oid in self._accumulators:
self._accumulators[client_oid]._done = True
elif event_type in ("canceled", "failed"):
self._log.warning(
"ws_terminal_event_full_payload",
client_oid=client_oid,
event_type=event_type,
status=status,
full_payload=json.dumps(payload, indent=2),
)
# Market orders with `funds` send type="canceled" + status="done" when
# the order completes with a tiny remainder. If we have accumulated
# matches, this is actually a successful fill.
if (
event_type == "canceled"
and status == "done"
and client_oid in self._accumulators
):
acc = self._accumulators[client_oid]
if acc.match_count > 0:
acc.order_id = order_id or acc.order_id
fill_data = acc.to_dict()
self._log.info(
"fill_via_cancel_done",
client_oid=client_oid,
order_id=order_id,
total_size=fill_data["total_size"],
total_funds=fill_data["total_funds"],
avg_price=fill_data["weighted_avg_price"],
match_count=fill_data["match_count"],
)
if client_oid in self._fill_futures:
future = self._fill_futures[client_oid]
if not future.done():
future.set_result((True, fill_data))
elif client_oid in self._accumulators:
self._accumulators[client_oid]._done = True
return
if client_oid in self._fill_futures:
future = self._fill_futures[client_oid]
if not future.done():
future.set_result((False, {}))

View File

@ -62,12 +62,6 @@ static void handle_value(parse_state_t *st, const char *val) {
}
} else if (strcmp(key, "log_level") == 0) {
copy_string(val, st->cfg->log_level, sizeof(st->cfg->log_level));
} else if (strcmp(key, "socket_path") == 0) {
copy_string(val, st->cfg->socket_path, sizeof(st->cfg->socket_path));
} else if (strcmp(key, "rest_host") == 0) {
copy_string(val, st->cfg->rest_host, sizeof(st->cfg->rest_host));
} else if (strcmp(key, "rest_port") == 0) {
st->cfg->rest_port = atoi(val);
} else if (strcmp(key, "ws_url") == 0) {
copy_string(val, st->cfg->ws_url, sizeof(st->cfg->ws_url));
} else if (strcmp(key, "token_url") == 0) {
@ -82,10 +76,6 @@ static void handle_value(parse_state_t *st, const char *val) {
st->cfg->signal_threshold_bps = atof(val);
} else if (strcmp(key, "kcs_discount_active") == 0) {
st->cfg->kcs_discount_active = (strcmp(val, "true") == 0 || strcmp(val, "yes") == 0);
} else if (strcmp(key, "executor_socket_path") == 0) {
copy_string(val, st->cfg->executor_socket_path, sizeof(st->cfg->executor_socket_path));
} else if (strcmp(key, "send_signals") == 0) {
st->cfg->send_signals = (strcmp(val, "true") == 0 || strcmp(val, "yes") == 0);
} else if (strcmp(key, "cooldown_seconds") == 0) {
st->cfg->cooldown_seconds = atof(val);
} else if (strcmp(key, "stats_interval_seconds") == 0) {
@ -121,9 +111,6 @@ int config_load(const char *path, config_t *cfg) {
memset(cfg, 0, sizeof(config_t));
copy_string("INFO", cfg->log_level, sizeof(cfg->log_level));
copy_string("/tmp/fh_ob.sock", cfg->socket_path, sizeof(cfg->socket_path));
copy_string("0.0.0.0", cfg->rest_host, sizeof(cfg->rest_host));
cfg->rest_port = 8000;
copy_string("wss://ws-api-spot.kucoin.com", cfg->ws_url, sizeof(cfg->ws_url));
copy_string("https://api.kucoin.com/api/v1/bullet-public", cfg->token_url, sizeof(cfg->token_url));
cfg->reconnect_base_delay = 1.0;
@ -134,8 +121,6 @@ int config_load(const char *path, config_t *cfg) {
copy_string("USDT", cfg->hold_currencies[0], CURRENCY_NAME_LEN);
cfg->hold_currency_count = 1;
cfg->kcs_discount_active = false;
copy_string("/tmp/executor.sock", cfg->executor_socket_path, sizeof(cfg->executor_socket_path));
cfg->send_signals = false;
cfg->cooldown_seconds = 0.0;
cfg->stats_interval_seconds = 60.0;
cfg->live_mode = false;
@ -174,8 +159,10 @@ int config_load(const char *path, config_t *cfg) {
st.expect_key = false;
} else {
if (st.cfg->initial_capital_count < MAX_CAPITAL_ENTRIES) {
strncpy(st.cfg->initial_capital[st.cfg->initial_capital_count].currency,
st.capital_currency, CURRENCY_NAME_LEN - 1);
size_t _cl = strlen(st.capital_currency);
if (_cl >= sizeof(st.cfg->initial_capital[0].currency)) _cl = sizeof(st.cfg->initial_capital[0].currency) - 1;
memcpy(st.cfg->initial_capital[st.cfg->initial_capital_count].currency, st.capital_currency, _cl);
st.cfg->initial_capital[st.cfg->initial_capital_count].currency[_cl] = '\0';
st.cfg->initial_capital[st.cfg->initial_capital_count].amount = atof(s);
st.cfg->initial_capital_count++;
}
@ -198,7 +185,10 @@ int config_load(const char *path, config_t *cfg) {
if (st.mapping_depth == 1 &&
(strcmp(st.current_key, "fused_engine") == 0 ||
strcmp(st.current_key, "executor") == 0)) {
strncpy(st.section, st.current_key, sizeof(st.section) - 1);
size_t _sk = strlen(st.current_key);
if (_sk >= sizeof(st.section)) _sk = sizeof(st.section) - 1;
memcpy(st.section, st.current_key, _sk);
st.section[_sk] = '\0';
}
// Nested mapping inside fused_engine section (depth 2)
if (st.mapping_depth == 2 && strcmp(st.section, "fused_engine") == 0 &&

View File

@ -21,9 +21,6 @@ typedef struct {
char symbols[MAX_SYMBOLS][SYMBOL_NAME_LEN]; /* subscribed trading symbol names */
uint32_t symbol_count; /* number of symbols in the list */
char log_level[8]; /* log verbosity level string */
char socket_path[256]; /* unix socket path for inter-process comm */
char rest_host[64]; /* KuCoin REST API hostname */
int rest_port; /* KuCoin REST API port */
char ws_url[256]; /* KuCoin WebSocket base URL */
char token_url[256]; /* KuCoin token endpoint URL */
double reconnect_base_delay; /* initial WebSocket reconnect delay (seconds) */
@ -38,7 +35,6 @@ typedef struct {
uint32_t excluded_currency_count; /* number of excluded currencies */
bool kcs_discount_active; /* whether KCS fee discount applies */
char executor_socket_path[256]; /* unix socket path for signal executor */
bool send_signals; /* whether to actually emit signals */
double cooldown_seconds; /* min seconds between signals for same triangle */
double stats_interval_seconds; /* period between stats log dumps */
bool live_mode; /* live trading vs paper/simulation */

View File

@ -3,9 +3,9 @@
*
* Two-thread architecture:
* HOT thread: epoll_wait on WebSocket fds + timer fd for keep-alive pings
* COLD thread: polls SPSC signal queue + Unix domain socket to executor
* EXECUTOR thread: polls SPSC signal queue + fill wake fd, executes triangles
*
* Signals flow: evaluate.c -> SPSC queue -> COLD thread -> executor via UDS
* Signals flow: evaluate.c -> SPSC queue -> executor thread (direct execution)
*/
#include "log.h"
@ -28,7 +28,6 @@ static int64_t now_mono_ms(void) {
#include <poll.h>
#include <time.h>
#include <pthread.h>
#include <sys/un.h>
#include <sys/timerfd.h>
#include <sys/eventfd.h>
#include <sys/socket.h>
@ -103,7 +102,6 @@ int event_loops_init(event_loops_t *loops, ws_client_t *ws_client,
loops->ws_client = ws_client;
loops->signal_queue = signal_queue;
loops->running = true;
loops->unix_client_fd = -1;
loops->wakeup_fd = wakeup_fd;
epoll_set_init(&loops->hot_epoll);
@ -126,213 +124,10 @@ void event_loops_destroy(event_loops_t *loops) {
loops->running = false;
if (loops->timer_fd >= 0) close(loops->timer_fd);
if (loops->wakeup_fd >= 0) close(loops->wakeup_fd);
if (loops->unix_client_fd >= 0) close(loops->unix_client_fd);
if (loops->http_server_fd >= 0) close(loops->http_server_fd);
if (loops->hot_epoll.epoll_fd >= 0) close(loops->hot_epoll.epoll_fd);
if (loops->cold_epoll.epoll_fd >= 0) close(loops->cold_epoll.epoll_fd);
}
/* Connect to a Unix domain socket at the given path. Uses SOCK_NONBLOCK
with a poll-based 100ms timeout for the connection to complete.
Returns connected fd on success, -1 on failure. */
int unix_client_connect(const char *socket_path) {
int fd = socket(AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK, 0);
if (fd < 0) return -1;
struct sockaddr_un addr;
memset(&addr, 0, sizeof(addr));
addr.sun_family = AF_UNIX;
strncpy(addr.sun_path, socket_path, sizeof(addr.sun_path) - 1);
if (connect(fd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
if (errno != EINPROGRESS) {
close(fd);
return -1;
}
struct pollfd pfd = { .fd = fd, .events = POLLOUT };
if (poll(&pfd, 1, 100) <= 0) { // 100 ms timeout
close(fd);
return -1;
}
}
return fd;
}
/* Create and bind a Unix domain stream socket server, remove stale socket
file first. Sets O_NONBLOCK on the listening fd. Returns fd, or -1. */
int unix_server_create(const char *socket_path) {
int fd = socket(AF_UNIX, SOCK_STREAM, 0);
if (fd < 0) return -1;
struct sockaddr_un addr;
memset(&addr, 0, sizeof(addr));
addr.sun_family = AF_UNIX;
strncpy(addr.sun_path, socket_path, sizeof(addr.sun_path) - 1);
unlink(socket_path);
if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
close(fd);
return -1;
}
if (listen(fd, 5) < 0) {
close(fd);
return -1;
}
set_nonblocking(fd);
return fd;
}
/*
* Build a JSON signal message and send it to the external executor over a Unix socket.
*
* JSON structure:
* {
* "type": "signal",
* "correlation_id": "<hex>",
* "triangle_key": ["base","mid","quote"],
* "primary_quote": "<currency>",
* "live": true/false,
* "starting_volume": "<volume>",
* "legs": [{...}, {...}, {...}],
* "predicted_bps": <float>,
* "ts_ms", "book_ts_ms", "t_sock_arrive_ms", "t_arrive_ms", "t_eval_ms": <timestamp>,
* "books": [...] (snapshot, only when !live)
* }
*
* correlation_id is a mix of address/ts/bps values for best-effort uniqueness.
* Connects lazily on first signal; reconnects on write failure.
*/
static void send_signal_to_executor(event_loops_t *loops, signal_entry_t *sig) {
(void)loops; (void)sig;
/* No longer used — execution is direct in the executor thread */
if (loops->unix_client_fd < 0) {
loops->unix_client_fd = unix_client_connect(loops->ws_client->cfg->executor_socket_path);
if (loops->unix_client_fd < 0) {
log_write("[EVENTS] Cannot connect to executor at %s\n",
loops->ws_client->cfg->executor_socket_path);
return;
}
event_loops_add_fd(&loops->cold_epoll, loops->unix_client_fd,
FD_TYPE_UNIX_CLIENT, 0, NULL, EPOLLIN);
}
char json_buf[4096];
char corr_id[37];
snprintf(corr_id, sizeof(corr_id),
"%08x%08x%08x%08x",
(unsigned)(uintptr_t)&sig->legs.legs[0] ^ (unsigned)sig->ts_ms,
(unsigned)sig->ts_ms ^ (unsigned)sig->book_ts_ms,
(unsigned)sig->predicted_bps,
(unsigned)sig->t_arrive_ms);
char legs_json[1024];
legs_json[0] = '\0';
for (uint8_t l = 0; l < 3; l++) {
const signal_leg_t *sl = &sig->legs.legs[l];
char tmp[384];
snprintf(tmp, sizeof(tmp),
"%s{\"pair\":\"%s\",\"side\":\"%s\","
"\"order_param\":\"%s\","
"\"fee_rate\":%.6f,\"fee_currency\":\"%s\","
"\"base_increment\":\"%.10g\",\"quote_increment\":\"%.10g\",\"base_min_size\":\"%.10g\"}",
l ? "," : "", sl->symbol, sl->side,
sl->order_param,
sl->fee_rate, sl->fee_currency,
sl->base_increment, sl->quote_increment, sl->base_min_size);
strncat(legs_json, tmp, sizeof(legs_json) - 1);
}
// triangle_key as JSON array ["base","mid","quote"]
char triangle_key_json[96];
{
char parts[3][16] = {{0}};
const char *tk = sig->triangle_key;
const char *s1 = strchr(tk, '/');
const char *s2 = s1 ? strchr(s1 + 1, '/') : NULL;
if (s1 && s2) {
uint32_t l1 = s1 - tk;
if (l1 > 15) l1 = 15;
memcpy(parts[0], tk, l1);
uint32_t l2 = s2 - s1 - 1;
if (l2 > 15) l2 = 15;
memcpy(parts[1], s1 + 1, l2);
strncpy(parts[2], s2 + 1, 15);
snprintf(triangle_key_json, sizeof(triangle_key_json),
"[\"%s\",\"%s\",\"%s\"]", parts[0], parts[1], parts[2]);
} else {
snprintf(triangle_key_json, sizeof(triangle_key_json), "[\"%s\"]", tk);
}
}
// Book tops included for display and drift analysis
char books_json_str[2048] = "";
if (sig->book_count > 0) {
char *bp = books_json_str;
size_t rem = sizeof(books_json_str);
for (uint8_t b = 0; b < sig->book_count; b++) {
const signal_book_t *sb = &sig->books[b];
char bid_arr[256] = {0}, ask_arr[256] = {0};
// Only top-of-book level included for display / drift analysis
char tmp[64];
if (sb->bid_count > 0) {
snprintf(tmp, sizeof(tmp), "{\"price\":\"%.6g\",\"size\":\"%.8g\"}",
sb->bids[0].price, sb->bids[0].size);
strncat(bid_arr, tmp, sizeof(bid_arr) - 1);
}
if (sb->ask_count > 0) {
snprintf(tmp, sizeof(tmp), "{\"price\":\"%.6g\",\"size\":\"%.8g\"}",
sb->asks[0].price, sb->asks[0].size);
strncat(ask_arr, tmp, sizeof(ask_arr) - 1);
}
int n = snprintf(bp, rem,
"%s{\"symbol\":\"%s\",\"bids\":[%s],\"asks\":[%s],\"ts_ms\":%lld}",
b ? "," : "", sb->symbol, bid_arr, ask_arr, (long long)sb->ts_ms);
if (n > 0 && (size_t)n < rem) { bp += n; rem -= (size_t)n; }
}
}
snprintf(json_buf, sizeof(json_buf),
"{\"type\":\"signal\",\"correlation_id\":\"%s\","
"\"triangle_key\":%s,\"primary_quote\":\"%s\","
"\"live\":%s,\"starting_volume\":\"%.8g\","
"\"legs\":[%s],\"predicted_bps\":%.4f,"
"\"ts_ms\":%lld,\"book_ts_ms\":%lld,\"t_sock_arrive_ms\":%lld,\"t_arrive_ms\":%lld,\"t_eval_ms\":%lld"
"%s%s%s"
"}\n",
corr_id, triangle_key_json, sig->primary_quote,
sig->live ? "true" : "false", sig->starting_volume,
legs_json, sig->predicted_bps,
(long long)sig->ts_ms, (long long)sig->book_ts_ms,
(long long)sig->t_sock_arrive_ms,
(long long)sig->t_arrive_ms, (long long)sig->t_eval_ms,
sig->book_count == 0 ? "" : ",\"books\":[",
books_json_str[0] ? books_json_str : "",
sig->book_count == 0 ? "" : "]");
size_t to_send = strlen(json_buf);
size_t sent = 0;
while (sent < to_send) {
int r = (int)write(loops->unix_client_fd, json_buf + sent, to_send - sent);
if (r > 0) {
sent += (size_t)r;
continue;
}
if (r == 0 || (errno != EAGAIN && errno != EWOULDBLOCK)) {
log_write("[EVENTS] Write to executor failed, reconnecting\n");
int old_fd = loops->unix_client_fd;
loops->unix_client_fd = -1;
close(old_fd);
event_loops_remove_fd(&loops->cold_epoll, old_fd);
break;
}
/* EAGAIN: executor buffer full, drop this signal and move on */
break;
}
}
static void arm_ping_timer(event_loops_t *loops, uint64_t interval_ms) {
if (interval_ms == 0) return;
struct itimerspec its = {0};
@ -383,7 +178,7 @@ void *event_hot_thread(void *arg) {
ws_client_read(ws, tf->ws_conn_idx);
} else if (tf->type == FD_TYPE_TIMER) {
uint64_t expirations = 0;
read(loops->timer_fd, &expirations, sizeof(expirations));
if (read(loops->timer_fd, &expirations, sizeof(expirations)) < 0) {}
for (uint32_t c = 0; c < ws->connection_count; c++) {
ws_connection_t *conn = &ws->connections[c];
@ -413,7 +208,8 @@ void *event_cold_thread(void *arg) {
log_write("[EXEC] Thread started\n");
executor_thread_t *exec = executor_thread_create(loops->ws_client->cfg,
loops->ws_client->fill_ch);
loops->ws_client->fill_ch,
loops->ws_client);
if (!exec) {
log_write("[EXEC] Failed to create executor\n");
return NULL;
@ -443,14 +239,14 @@ void *event_cold_thread(void *arg) {
while (!spsc_empty(loops->signal_queue)) {
signal_entry_t sig;
if (spsc_pop(loops->signal_queue, &sig)) {
executor_execute_triangle(exec, &sig, loops->ws_client->fill_ch);
executor_execute_triangle(exec, &sig);
}
}
/* Drain fill wakeup */
if (fds[1].revents & POLLIN) {
uint64_t val;
read(fds[1].fd, &val, sizeof(val));
if (read(fds[1].fd, &val, sizeof(val)) < 0) {}
}
/* Keepalive: warm up REST connection every 30s */
@ -463,7 +259,7 @@ void *event_cold_thread(void *arg) {
while (!spsc_empty(loops->signal_queue)) {
signal_entry_t sig;
if (spsc_pop(loops->signal_queue, &sig)) {
executor_execute_triangle(exec, &sig, loops->ws_client->fill_ch);
executor_execute_triangle(exec, &sig);
}
}
}

View File

@ -14,9 +14,6 @@ typedef enum {
FD_TYPE_WS, /* WebSocket connection */
FD_TYPE_TIMER, /* timerfd */
FD_TYPE_EVENT, /* eventfd for queue wakeup */
FD_TYPE_UNIX_SERVER, /* unix domain socket server */
FD_TYPE_HTTP_SERVER, /* HTTP server socket */
FD_TYPE_UNIX_CLIENT /* unix domain socket client */
} fd_type_t;
/* A file descriptor tracked by the epoll event loop */
@ -41,9 +38,6 @@ typedef struct {
epoll_set_t cold_epoll; /* cold epoll set for timer/http events */
ws_client_t *ws_client; /* WebSocket client instance */
spsc_queue_t *signal_queue; /* signal queue for emitting opportunities */
int unix_server_fd; /* unix domain server socket */
int unix_client_fd; /* unix domain client socket */
int http_server_fd; /* HTTP server socket */
int timer_fd; /* timerfd for periodic tasks */
int wakeup_fd; /* eventfd for waking the cold loop */
uint64_t next_ping_ms; /* next scheduled WebSocket ping timestamp */
@ -64,9 +58,5 @@ void event_loops_remove_fd(epoll_set_t *set, int fd);
void *event_hot_thread(void *arg);
/* Cold event loop thread: handles timers, HTTP, and signal dispatch */
void *event_cold_thread(void *arg);
/* Connect to a unix domain socket (non-blocking) */
int unix_client_connect(const char *socket_path);
/* Create and listen on a unix domain socket */
int unix_server_create(const char *socket_path);
#endif

View File

@ -17,6 +17,7 @@
struct executor_thread_s {
const config_t *cfg;
rest_conn_t *rest;
ws_client_t *ws;
/* Concurrency isolation state */
char in_flight_triangles[MAX_IN_FLIGHT][128]; /* triangle_key */
@ -77,11 +78,13 @@ static double apply_increment_floor(double vol, double inc) {
/* ── Core execution loop ── */
executor_thread_t *executor_thread_create(const config_t *cfg,
fill_channel_t *fill_ch) {
fill_channel_t *fill_ch,
ws_client_t *ws) {
(void)fill_ch;
executor_thread_t *et = calloc(1, sizeof(*et));
if (!et) return NULL;
et->cfg = cfg;
et->ws = ws;
et->rest = rest_conn_new();
if (et->rest) {
rest_conn_set_auth(et->rest,
@ -91,29 +94,22 @@ executor_thread_t *executor_thread_create(const config_t *cfg,
}
/* Warm up the authenticated REST connection pool */
if (et->rest && cfg->live_mode) {
if (et->rest && cfg->live_mode && cfg->initial_capital_count > 0) {
double dummy = 0;
(void)rest_get_balance(et->rest, "USDT", &dummy);
(void)rest_get_balance(et->rest, cfg->initial_capital[0].currency, &dummy);
}
return et;
}
bool executor_keepalive(executor_thread_t *et) {
if (!et->rest) return false;
if (!et->rest || et->cfg->initial_capital_count == 0) return false;
double dummy = 0;
return rest_get_balance(et->rest, "USDT", &dummy);
}
void executor_thread_set_fill_ch(executor_thread_t *et, fill_channel_t *ch) {
(void)et;
(void)ch;
/* In the initial implementation the fill channel is global (ws_client->fill_ch),
passed directly to execute_triangle. This setter is for per-thread channels. */
return rest_get_balance(et->rest, et->cfg->initial_capital[0].currency, &dummy);
}
void executor_execute_triangle(executor_thread_t *et,
signal_entry_t *sig,
fill_channel_t *fill_ch) {
signal_entry_t *sig) {
fill_channel_t *fill_ch = et->ws->fill_ch;
/* ── Concurrency isolation ── */
uint64_t pair_hashes[3] = {0};
for (int p = 0; p < 3; p++) {
@ -171,6 +167,7 @@ void executor_execute_triangle(executor_thread_t *et,
double input_vol;
if (leg == 0) {
input_vol = atof(sl->order_param);
input_vol = apply_fee_hold(input_vol, sl->fee_rate, is_buy);
} else {
input_vol = leg_output[leg - 1];
input_vol = apply_fee_hold(input_vol, sl->fee_rate, is_buy);
@ -240,7 +237,6 @@ void executor_execute_triangle(executor_thread_t *et,
/* ── Wait for fill (live only) ── */
double total_size = 0, total_funds = 0, avg_price = 0, total_fee = 0;
int match_count = 0;
if (sig->live) {
fill_result_t fr = {0};
@ -255,8 +251,7 @@ void executor_execute_triangle(executor_thread_t *et,
total_funds = fr.total_funds;
total_fee = fr.total_fee;
avg_price = fr.avg_price;
match_count = fr.match_count;
if (fr.order_id[0]) strncpy(order_id, fr.order_id, sizeof(order_id) - 1);
if (fr.order_id[0]) memcpy(order_id, fr.order_id, sizeof(order_id));
} else {
/* Paper mode: simulate fill from books[leg] top-of-book */
if (sig->book_count > leg) {
@ -312,12 +307,37 @@ void executor_execute_triangle(executor_thread_t *et,
(double)(now_mono_ms() - order_fire_ms_tracking));
/* ── Cascade ── */
leg_output[leg] = is_buy ? total_size : total_funds;
double raw_output = is_buy ? total_size : total_funds;
leg_output[leg] = raw_output;
fills[leg][0] = leg_output[leg];
fills[leg][1] = avg_price;
fills[leg][2] = total_fee;
fills[leg][3] = total_funds;
/* ── Balance wait (live only): wait for KuCoin to settle before next leg ── */
if (sig->live && leg < 2) {
const char *pair = sl->symbol;
const char *dash = strchr(pair, '-');
char out_ccy[16] = {0};
if (dash) {
const char *ccy = is_buy ? pair : (dash + 1);
size_t ccy_len = is_buy ? (size_t)(dash - pair) : strlen(dash + 1);
if (ccy_len > 15) ccy_len = 15;
memcpy(out_ccy, ccy, ccy_len);
}
if (out_ccy[0]) {
ws_client_await_balance(et->ws, out_ccy, raw_output, 2000);
/* For sells, KuCoin may deduct fee from output currency.
Use the actual settled balance rather than the arithmetic output. */
double actual = ws_client_latest_balance(et->ws, out_ccy);
if (actual > 0 && actual < raw_output) {
leg_output[leg] = actual;
fills[leg][0] = actual;
raw_output = actual;
}
}
}
/* Fee hold reduction + increment floor for next leg */
if (leg < 2) {
const signal_leg_t *nsl = &sig->legs.legs[leg + 1];
@ -331,10 +351,8 @@ void executor_execute_triangle(executor_thread_t *et,
/* ── Compute PnL ── */
double profit = 0, effective_bps = 0;
if (success && fills[2][0] > 0) {
double leg0_in = (strcmp(sig->legs.legs[0].side, "buy") == 0)
? fills[0][3] : fills[0][0];
double leg2_out = (strcmp(sig->legs.legs[2].side, "buy") == 0)
? fills[2][0] : fills[2][0];
double leg0_in = fills[0][4];
double leg2_out = fills[2][0];
/* Subtract fee if fee currency matches leg 2 output currency */
{
const signal_leg_t *sl2 = &sig->legs.legs[2];

View File

@ -5,21 +5,19 @@
#include "fill_handler.h"
#include "config.h"
#include "queue.h"
#include "ws_client.h"
/* Per-thread data for the executor. */
typedef struct executor_thread_s executor_thread_t;
/* Create an executor thread (one per concurrent slot). */
executor_thread_t *executor_thread_create(const config_t *cfg,
fill_channel_t *fill_ch);
/* Set the fill channel for this executor thread. */
void executor_thread_set_fill_ch(executor_thread_t *et, fill_channel_t *ch);
fill_channel_t *fill_ch,
ws_client_t *ws);
/* Execute a single triangle signal (blocking, called from executor thread). */
void executor_execute_triangle(executor_thread_t *et,
signal_entry_t *sig,
fill_channel_t *fill_ch);
signal_entry_t *sig);
/* Report status text. */
void executor_write_report(const char *fmt, ...);

View File

@ -1,369 +0,0 @@
/*
* http_server.c - Simple single-threaded HTTP server for health & book queries
*
* Provides REST endpoints:
* GET /health - connection status
* GET /book/{symbol} - single order book snapshot
* GET /books - all order books
* GET /symbols - list tracked symbols
* POST /symbols - dynamically add symbols (subscribe via WS)
* DELETE /symbols/{name} - remove symbol
*/
#include "log.h"
#include "http_server.h"
#include "cJSON.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
static int set_nonblocking(int fd) {
int flags = fcntl(fd, F_GETFL, 0);
if (flags < 0) return -1;
return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}
int http_server_init(http_server_t *srv, const char *host, int port,
order_book_t *books, symbol_table_t *symbols,
ws_client_t *ws_client, evaluator_t *evaluator,
config_t *cfg) {
memset(srv, 0, sizeof(*srv));
srv->books = books;
srv->symbols = symbols;
srv->ws_client = ws_client;
srv->evaluator = evaluator;
srv->cfg = cfg;
srv->client_fd = -1;
srv->running = true;
srv->listen_fd = socket(AF_INET, SOCK_STREAM, 0);
if (srv->listen_fd < 0) {
perror("socket");
return -1;
}
int opt = 1;
setsockopt(srv->listen_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
struct sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
if (strcmp(host, "0.0.0.0") == 0) {
addr.sin_addr.s_addr = INADDR_ANY;
} else {
inet_pton(AF_INET, host, &addr.sin_addr);
}
if (bind(srv->listen_fd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
perror("bind");
close(srv->listen_fd);
return -1;
}
if (listen(srv->listen_fd, 16) < 0) {
perror("listen");
close(srv->listen_fd);
return -1;
}
set_nonblocking(srv->listen_fd);
log_write("[HTTP] Server listening on %s:%d\n", host, port);
return 0;
}
void http_server_destroy(http_server_t *srv) {
srv->running = false;
if (srv->client_fd >= 0) close(srv->client_fd);
if (srv->listen_fd >= 0) close(srv->listen_fd);
}
int http_server_accept(http_server_t *srv) {
if (srv->client_fd >= 0) {
close(srv->client_fd);
}
struct sockaddr_in client_addr;
socklen_t addr_len = sizeof(client_addr);
srv->client_fd = accept(srv->listen_fd, (struct sockaddr *)&client_addr, &addr_len);
if (srv->client_fd < 0) {
if (errno != EAGAIN && errno != EWOULDBLOCK) {
perror("accept");
}
return -1;
}
set_nonblocking(srv->client_fd);
srv->recv_len = 0;
return srv->client_fd;
}
/* Write an HTTP response with status line, headers, and body to the client. */
static void http_send(http_server_t *srv, const char *status,
const char *content_type, const char *body) {
char header[512];
int body_len = body ? (int)strlen(body) : 0;
int hdr_len = snprintf(header, sizeof(header),
"HTTP/1.1 %s\r\n"
"Content-Type: %s\r\n"
"Content-Length: %d\r\n"
"Connection: close\r\n"
"\r\n",
status, content_type, body_len);
write(srv->client_fd, header, (size_t)hdr_len);
if (body && body_len > 0) {
write(srv->client_fd, body, (size_t)body_len);
}
}
/* Send a 200 OK JSON response. */
static void http_send_json(http_server_t *srv, const char *body) {
http_send(srv, "200 OK", "application/json", body);
}
/* Send an error response with the given status code and plain text body. */
static void http_send_error(http_server_t *srv, const char *status, const char *msg) {
http_send(srv, status, "text/plain", msg);
}
/* GET /health — return WS connection count and symbol count as JSON. */
static void handle_health(http_server_t *srv) {
char body[256];
int connected = 0;
if (srv->ws_client) {
for (uint32_t i = 0; i < srv->ws_client->connection_count; i++) {
if (srv->ws_client->connections[i].state == WS_STATE_CONNECTED) connected++;
}
}
snprintf(body, sizeof(body),
"{\"status\":\"ok\",\"ws_connections\":%d,\"symbols\":%u}",
connected, srv->symbols ? srv->symbols->count : 0);
http_send_json(srv, body);
}
/* GET /book/{symbol} — return a single order book as JSON, or 404. */
static void handle_book(http_server_t *srv, const char *symbol) {
if (!srv->symbols || !srv->books) {
http_send_error(srv, "500 Internal Server Error", "not initialized\n");
return;
}
int16_t idx = symbol_table_lookup(srv->symbols, symbol);
if (idx < 0) {
char body[128];
snprintf(body, sizeof(body), "{\"error\":\"symbol not found\",\"symbol\":\"%s\"}", symbol);
http_send(srv, "404 Not Found", "application/json", body);
return;
}
order_book_t *book = &srv->books[idx];
char body[2048];
int off = 0;
off += snprintf(body + off, sizeof(body) - (size_t)off,
"{\"symbol\":\"%s\",\"ts_ms\":%lld,\"sequence\":%lld,\"bids\":[",
book->symbol, (long long)book->ts_ms, (long long)book->sequence);
for (uint8_t i = 0; i < book->bid_count; i++) {
off += snprintf(body + off, sizeof(body) - (size_t)off,
"%s[%.6g,%.8g]", i ? "," : "", book->bids[i][0], book->bids[i][1]);
}
off += snprintf(body + off, sizeof(body) - (size_t)off, "],\"asks\":[");
for (uint8_t i = 0; i < book->ask_count; i++) {
off += snprintf(body + off, sizeof(body) - (size_t)off,
"%s[%.6g,%.8g]", i ? "," : "", book->asks[i][0], book->asks[i][1]);
}
off += snprintf(body + off, sizeof(body) - (size_t)off, "]}");
http_send_json(srv, body);
}
/* GET /books — return all order books as a JSON array. */
static void handle_books(http_server_t *srv) {
if (!srv->symbols || !srv->books) {
http_send_error(srv, "500 Internal Server Error", "not initialized\n");
return;
}
char body[65536];
int off = snprintf(body, sizeof(body), "[");
for (uint32_t i = 0; i < srv->symbols->count && (size_t)off < sizeof(body) - 512; i++) {
order_book_t *book = &srv->books[i];
if (book->ts_ms <= 0) continue;
if (off > 1) off += snprintf(body + off, sizeof(body) - (size_t)off, ",");
off += snprintf(body + off, sizeof(body) - (size_t)off,
"{\"symbol\":\"%s\",\"ts\":%lld,\"bids\":[",
book->symbol, (long long)book->ts_ms);
for (uint8_t j = 0; j < book->bid_count; j++) {
off += snprintf(body + off, sizeof(body) - (size_t)off,
"%s[%.6g,%.8g]", j ? "," : "", book->bids[j][0], book->bids[j][1]);
}
off += snprintf(body + off, sizeof(body) - (size_t)off, "],\"asks\":[");
for (uint8_t j = 0; j < book->ask_count; j++) {
off += snprintf(body + off, sizeof(body) - (size_t)off,
"%s[%.6g,%.8g]", j ? "," : "", book->asks[j][0], book->asks[j][1]);
}
off += snprintf(body + off, sizeof(body) - (size_t)off, "]}");
}
off += snprintf(body + off, sizeof(body) - (size_t)off, "]");
http_send_json(srv, body);
}
/* GET /symbols — list all tracked symbol names as a JSON array. */
static void handle_symbols_list(http_server_t *srv) {
if (!srv->symbols) {
http_send_error(srv, "500 Internal Server Error", "not initialized\n");
return;
}
char body[65536];
int off = snprintf(body, sizeof(body), "[");
for (uint32_t i = 0; i < srv->symbols->count && (size_t)off < sizeof(body) - 64; i++) {
off += snprintf(body + off, sizeof(body) - (size_t)off,
"%s\"%s\"", i ? "," : "", srv->symbols->entries[i].name);
}
off += snprintf(body + off, sizeof(body) - (size_t)off, "]");
http_send_json(srv, body);
}
/* POST /symbols — subscribe to new symbols (add to symbol table + WS subscribe). */
static void handle_symbols_add(http_server_t *srv, const char *body) {
if (!srv->symbols || !srv->ws_client) {
http_send_error(srv, "500 Internal Server Error", "not initialized\n");
return;
}
cJSON *root = cJSON_Parse(body);
if (!root) {
http_send_error(srv, "400 Bad Request", "invalid JSON\n");
return;
}
char resp[1024] = "{\"added\":[";
int resp_len = (int)strlen(resp);
cJSON *item;
cJSON_ArrayForEach(item, root) {
if (!cJSON_IsString(item)) continue;
const char *sym = item->valuestring;
if (!sym) continue;
int16_t existing = symbol_table_lookup(srv->symbols, sym);
if (existing >= 0) continue;
if (symbol_table_add(srv->symbols, sym) == 0) {
int16_t idx = symbol_table_lookup(srv->symbols, sym);
if (idx >= 0) {
if (resp_len > 11) resp_len += snprintf(resp + resp_len,
sizeof(resp) - (size_t)resp_len, ",");
resp_len += snprintf(resp + resp_len,
sizeof(resp) - (size_t)resp_len, "\"%s\"", sym);
// Auto-subscribe to WS stream for the new symbol
uint16_t uidx = (uint16_t)idx;
if (srv->ws_client->connection_count > 0) {
ws_client_subscribe(srv->ws_client, 0, &uidx, 1);
}
}
}
}
resp_len += snprintf(resp + resp_len, sizeof(resp) - (size_t)resp_len, "]}");
cJSON_Delete(root);
http_send_json(srv, resp);
}
/* DELETE /symbols/{name} — unsubscribe and remove from symbol table. */
static void handle_symbols_remove(http_server_t *srv, const char *symbol) {
if (!srv->symbols || !srv->ws_client) {
http_send_error(srv, "500 Internal Server Error", "not initialized\n");
return;
}
int16_t idx = symbol_table_lookup(srv->symbols, symbol);
if (idx < 0) {
char body[128];
snprintf(body, sizeof(body), "{\"error\":\"symbol not found\",\"symbol\":\"%s\"}", symbol);
http_send(srv, "404 Not Found", "application/json", body);
return;
}
uint16_t uidx = (uint16_t)idx;
if (srv->ws_client->connection_count > 0) {
ws_client_unsubscribe(srv->ws_client, 0, &uidx, 1);
}
// Compact the symbol table by shifting entries
for (uint32_t i = 0; i < srv->symbols->count; i++) {
if (strcmp(srv->symbols->entries[i].name, symbol) == 0) {
memmove(&srv->symbols->entries[i], &srv->symbols->entries[i + 1],
(srv->symbols->count - i - 1) * sizeof(symbol_entry_t));
srv->symbols->count--;
for (uint32_t j = i; j < srv->symbols->count; j++) {
srv->symbols->entries[j].index = (uint16_t)j;
}
break;
}
}
char resp[128];
snprintf(resp, sizeof(resp), "{\"removed\":\"%s\"}", symbol);
http_send_json(srv, resp);
}
int http_server_handle_request(http_server_t *srv) {
ssize_t n = read(srv->client_fd, srv->recv_buf + srv->recv_len,
sizeof(srv->recv_buf) - srv->recv_len - 1);
if (n <= 0) {
if (n < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
perror("read");
}
close(srv->client_fd);
srv->client_fd = -1;
srv->recv_len = 0;
return -1;
}
srv->recv_len += (size_t)n;
srv->recv_buf[srv->recv_len] = '\0';
char *headers_end = strstr((char *)srv->recv_buf, "\r\n\r\n");
if (!headers_end) return 0;
char *request_line = (char *)srv->recv_buf;
char *rl_end = strchr(request_line, '\r');
if (!rl_end) return 0;
*rl_end = '\0';
char method[16] = {0}, path[512] = {0};
sscanf(request_line, "%15s %511s", method, path);
char *body_start = headers_end + 4;
size_t body_len = srv->recv_len - (size_t)(body_start - (char *)srv->recv_buf);
if (strcmp(path, "/health") == 0) {
handle_health(srv);
} else if (strncmp(path, "/book/", 6) == 0) {
handle_book(srv, path + 6);
} else if (strcmp(path, "/books") == 0) {
handle_books(srv);
} else if (strcmp(path, "/symbols") == 0) {
if (strcmp(method, "POST") == 0) {
handle_symbols_add(srv, body_start);
} else {
handle_symbols_list(srv);
}
} else if (strncmp(path, "/symbols/", 9) == 0) {
if (strcmp(method, "DELETE") == 0) {
handle_symbols_remove(srv, path + 9);
} else {
http_send_error(srv, "405 Method Not Allowed", "use DELETE\n");
}
} else {
http_send_error(srv, "404 Not Found", "not found\n");
}
close(srv->client_fd);
srv->client_fd = -1;
srv->recv_len = 0;
return 0;
}

View File

@ -1,38 +0,0 @@
#ifndef FUSED_HTTP_SERVER_H
#define FUSED_HTTP_SERVER_H
#include <stdint.h>
#include <stdbool.h>
#include "book.h"
#include "hash.h"
#include "config.h"
#include "ws_client.h"
#include "evaluate.h"
/* Embedded HTTP server for health/status endpoints */
typedef struct {
int listen_fd; /* server listening socket */
int client_fd; /* currently connected client socket */
uint8_t recv_buf[8192]; /* request receive buffer */
size_t recv_len; /* bytes received so far */
order_book_t *books; /* pointer to shared order books */
symbol_table_t *symbols; /* pointer to shared symbol table */
ws_client_t *ws_client; /* pointer to WebSocket client state */
evaluator_t *evaluator; /* pointer to evaluator state */
config_t *cfg; /* pointer to configuration */
bool running; /* false signals server to stop */
} http_server_t;
/* Initialise and bind the HTTP server */
int http_server_init(http_server_t *srv, const char *host, int port,
order_book_t *books, symbol_table_t *symbols,
ws_client_t *ws_client, evaluator_t *evaluator,
config_t *cfg);
/* Destroy the HTTP server and close sockets */
void http_server_destroy(http_server_t *srv);
/* Accept a new client connection (non-blocking) */
int http_server_accept(http_server_t *srv);
/* Read, parse, and respond to the current client request */
int http_server_handle_request(http_server_t *srv);
#endif

View File

@ -26,7 +26,6 @@
#include "evaluate.h"
#include "queue.h"
#include "events.h"
#include "http_server.h"
static volatile sig_atomic_t g_running = 1;
@ -159,12 +158,6 @@ int main(int argc, char *argv[]) {
return 1;
}
http_server_t http_srv;
if (http_server_init(&http_srv, cfg.rest_host, cfg.rest_port,
books, &symbols, &ws_client, &evaluator, &cfg) != 0) {
log_write("[MAIN] HTTP server init failed (non-fatal)\n");
}
if (symbols.count > 0) {
uint16_t all_indices[MAX_SYMBOLS];
uint32_t n = symbols.count > MAX_SYMBOLS ? MAX_SYMBOLS : symbols.count;
@ -214,15 +207,8 @@ int main(int argc, char *argv[]) {
log_write("[MAIN] Fused engine running. Press Ctrl+C to stop.\n");
// Main loop: accept HTTP connections and reconnect disconnected WS
// Main loop: reconnect disconnected WS
while (g_running) {
if (http_srv.listen_fd >= 0 && http_srv.client_fd < 0) {
http_server_accept(&http_srv);
}
if (http_srv.client_fd >= 0) {
http_server_handle_request(&http_srv);
}
struct timespec ts = {0, 100000000};
nanosleep(&ts, NULL);
@ -249,13 +235,12 @@ int main(int argc, char *argv[]) {
ws_client.running = false;
uint64_t val = 1;
ssize_t wr = write(events.wakeup_fd, &val, sizeof(val));
ssize_t wr = write(events.wakeup_fd, &val, sizeof(val));
(void)wr;
pthread_join(hot_thread, NULL);
pthread_join(cold_thread, NULL);
http_server_destroy(&http_srv);
event_loops_destroy(&events);
ws_client_destroy(&ws_client);
spsc_destroy(&signal_queue);

View File

@ -336,15 +336,15 @@ int discover_symbols(symbol_table_t *symbols, triangle_set_t *triangles,
t->use_bid[1] = (strcmp(pairs.pairs[i2].quote, y) == 0) ? 1 : 0;
t->use_bid[2] = (strcmp(pairs.pairs[i3].quote, hold) == 0) ? 1 : 0;
t->id = (uint16_t)tri_count;
strncpy(t->symbol_names[0], pairs.pairs[i1].symbol, SYMBOL_NAME_LEN - 1);
strncpy(t->symbol_names[1], pairs.pairs[i2].symbol, SYMBOL_NAME_LEN - 1);
strncpy(t->symbol_names[2], pairs.pairs[i3].symbol, SYMBOL_NAME_LEN - 1);
strncpy(t->fee_currency[0], pairs.pairs[i1].fee_currency, CURRENCY_NAME_LEN - 1);
strncpy(t->fee_currency[1], pairs.pairs[i2].fee_currency, CURRENCY_NAME_LEN - 1);
strncpy(t->fee_currency[2], pairs.pairs[i3].fee_currency, CURRENCY_NAME_LEN - 1);
strncpy(t->base, hold, CURRENCY_NAME_LEN - 1);
strncpy(t->mid, x, CURRENCY_NAME_LEN - 1);
strncpy(t->quote, y, CURRENCY_NAME_LEN - 1);
{ size_t _l = strlen(pairs.pairs[i1].symbol); if (_l >= SYMBOL_NAME_LEN) _l = SYMBOL_NAME_LEN - 1; memcpy(t->symbol_names[0], pairs.pairs[i1].symbol, _l); t->symbol_names[0][_l] = '\0'; }
{ size_t _l = strlen(pairs.pairs[i2].symbol); if (_l >= SYMBOL_NAME_LEN) _l = SYMBOL_NAME_LEN - 1; memcpy(t->symbol_names[1], pairs.pairs[i2].symbol, _l); t->symbol_names[1][_l] = '\0'; }
{ size_t _l = strlen(pairs.pairs[i3].symbol); if (_l >= SYMBOL_NAME_LEN) _l = SYMBOL_NAME_LEN - 1; memcpy(t->symbol_names[2], pairs.pairs[i3].symbol, _l); t->symbol_names[2][_l] = '\0'; }
{ size_t _l = strlen(pairs.pairs[i1].fee_currency); if (_l >= CURRENCY_NAME_LEN) _l = CURRENCY_NAME_LEN - 1; memcpy(t->fee_currency[0], pairs.pairs[i1].fee_currency, _l); t->fee_currency[0][_l] = '\0'; }
{ size_t _l = strlen(pairs.pairs[i2].fee_currency); if (_l >= CURRENCY_NAME_LEN) _l = CURRENCY_NAME_LEN - 1; memcpy(t->fee_currency[1], pairs.pairs[i2].fee_currency, _l); t->fee_currency[1][_l] = '\0'; }
{ size_t _l = strlen(pairs.pairs[i3].fee_currency); if (_l >= CURRENCY_NAME_LEN) _l = CURRENCY_NAME_LEN - 1; memcpy(t->fee_currency[2], pairs.pairs[i3].fee_currency, _l); t->fee_currency[2][_l] = '\0'; }
{ size_t _l = strlen(hold); if (_l >= CURRENCY_NAME_LEN) _l = CURRENCY_NAME_LEN - 1; memcpy(t->base, hold, _l); t->base[_l] = '\0'; }
{ size_t _l = strlen(x); if (_l >= CURRENCY_NAME_LEN) _l = CURRENCY_NAME_LEN - 1; memcpy(t->mid, x, _l); t->mid[_l] = '\0'; }
{ size_t _l = strlen(y); if (_l >= CURRENCY_NAME_LEN) _l = CURRENCY_NAME_LEN - 1; memcpy(t->quote, y, _l); t->quote[_l] = '\0'; }
t->fee_factor[0] = 1.0 - fee_rate_for_pair(&pairs.pairs[i1], kcs);
t->fee_factor[1] = 1.0 - fee_rate_for_pair(&pairs.pairs[i2], kcs);
t->fee_factor[2] = 1.0 - fee_rate_for_pair(&pairs.pairs[i3], kcs);

View File

@ -21,6 +21,7 @@
#include <fcntl.h>
#include <errno.h>
#include <poll.h>
#include <sys/eventfd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <netdb.h>
@ -186,6 +187,12 @@ int ws_client_init(ws_client_t *client, const config_t *cfg,
log_write("[WS] Failed to create fill channel\n");
}
pthread_mutex_init(&client->balance_lock, NULL);
client->balance_wake_fd = eventfd(0, EFD_NONBLOCK);
if (client->balance_wake_fd < 0) {
log_write("[WS] Failed to create balance wake eventfd\n");
}
return 0;
}
@ -199,6 +206,8 @@ void ws_client_destroy(ws_client_t *client) {
conn->ctx = NULL;
}
if (ctx) SSL_CTX_free(ctx);
if (client->balance_wake_fd >= 0) close(client->balance_wake_fd);
pthread_mutex_destroy(&client->balance_lock);
}
/*
@ -268,7 +277,10 @@ int ws_client_fetch_token_priv(ws_connection_t *conn, const config_t *cfg) {
}
char *slash = strchr(host_start, '/');
if (slash) *slash = '\0';
strncpy(conn->host, host_start, sizeof(conn->host) - 1);
size_t hl = strlen(host_start);
if (hl >= sizeof(conn->host)) hl = sizeof(conn->host) - 1;
memcpy(conn->host, host_start, hl);
conn->host[hl] = '\0';
}
if (cJSON_IsNumber(pingInterval)) conn->ping_interval_ms = (uint32_t)pingInterval->valuedouble;
@ -371,7 +383,7 @@ int ws_client_connect(ws_client_t *client, uint32_t conn_idx) {
ws_client_subscribe(client, conn_idx, conn->symbol_indices, conn->symbol_count);
}
/* Subscribe to private order-change channel for fill events */
/* Subscribe to private channels */
{
char msg[256];
snprintf(msg, sizeof(msg),
@ -380,6 +392,14 @@ int ws_client_connect(ws_client_t *client, uint32_t conn_idx) {
ws_send_text(conn, msg);
log_write("[WS] Subscribed to tradeOrdersV2\n");
}
{
char msg[256];
snprintf(msg, sizeof(msg),
"{\"type\":\"subscribe\",\"topic\":\"/account/balance\","
"\"response\":true,\"privateChannel\":\"true\"}");
ws_send_text(conn, msg);
log_write("[WS] Subscribed to /account/balance\n");
}
return 0;
}
@ -648,7 +668,10 @@ static int16_t parse_book_update(cJSON *root, ws_client_t *client) {
}
book->symbol_idx = (uint16_t)sym_idx;
strncpy(book->symbol, symbol, SYMBOL_NAME_LEN - 1);
size_t sl = strlen(symbol);
if (sl >= sizeof(book->symbol)) sl = sizeof(book->symbol) - 1;
memcpy(book->symbol, symbol, sl);
book->symbol[sl] = '\0';
return sym_idx;
}
@ -779,6 +802,33 @@ int16_t ws_client_process_frame(ws_client_t *client, uint32_t conn_idx) {
}
}
}
} else if (cJSON_IsString(subject) &&
strcmp(subject->valuestring, "account.balance") == 0) {
cJSON *data = cJSON_GetObjectItem(msg_root, "data");
if (data) {
cJSON *ccy = cJSON_GetObjectItem(data, "currency");
cJSON *avail = cJSON_GetObjectItem(data, "available");
if (cJSON_IsString(ccy) && cJSON_IsString(avail)) {
pthread_mutex_lock(&client->balance_lock);
int idx = -1;
for (int i = 0; i < client->balance_count; i++) {
if (strcmp(client->balance_cache[i].currency, ccy->valuestring) == 0) {
idx = i; break;
}
}
if (idx < 0 && client->balance_count < MAX_BALANCE_ENTRIES) {
idx = client->balance_count++;
strncpy(client->balance_cache[idx].currency, ccy->valuestring, 15);
}
if (idx >= 0)
client->balance_cache[idx].available = atof(avail->valuestring);
pthread_mutex_unlock(&client->balance_lock);
if (client->balance_wake_fd >= 0) {
uint64_t one = 1;
if (write(client->balance_wake_fd, &one, sizeof(one)) < 0) {}
}
}
}
} else {
sym_idx = parse_book_update(msg_root, client);
}
@ -914,3 +964,68 @@ int ws_client_read(ws_client_t *client, uint32_t conn_idx) {
return 0;
}
bool ws_client_await_balance(ws_client_t *client, const char *currency,
double min_amount, int64_t timeout_ms) {
int64_t deadline = (int64_t)ws_client_now_ms() + timeout_ms;
/* Check cache first */
{
pthread_mutex_lock(&client->balance_lock);
double avail = 0;
bool found = false;
for (int i = 0; i < client->balance_count; i++) {
if (strcmp(client->balance_cache[i].currency, currency) == 0) {
avail = client->balance_cache[i].available;
found = true;
break;
}
}
pthread_mutex_unlock(&client->balance_lock);
if (found && avail >= min_amount - 1e-12)
return true;
}
/* Wait on the balance wake eventfd */
while ((int64_t)ws_client_now_ms() < deadline) {
struct pollfd pfd = { .fd = client->balance_wake_fd, .events = POLLIN };
int remaining = (int)(deadline - (int64_t)ws_client_now_ms());
if (remaining <= 0) break;
int ret = poll(&pfd, 1, remaining);
if (ret > 0) {
uint64_t val;
if (read(client->balance_wake_fd, &val, sizeof(val)) < 0) {}
}
/* Re-check the cache on any wakeup or timeout */
pthread_mutex_lock(&client->balance_lock);
double avail = 0;
bool found = false;
for (int i = 0; i < client->balance_count; i++) {
if (strcmp(client->balance_cache[i].currency, currency) == 0) {
avail = client->balance_cache[i].available;
found = true;
break;
}
}
pthread_mutex_unlock(&client->balance_lock);
if (found && avail >= min_amount - 1e-12)
return true;
}
return false;
}
double ws_client_latest_balance(ws_client_t *client, const char *currency) {
pthread_mutex_lock(&client->balance_lock);
double avail = 0;
for (int i = 0; i < client->balance_count; i++) {
if (strcmp(client->balance_cache[i].currency, currency) == 0) {
avail = client->balance_cache[i].available;
break;
}
}
pthread_mutex_unlock(&client->balance_lock);
return avail;
}

View File

@ -3,6 +3,7 @@
#include <stdint.h>
#include <stdbool.h>
#include <pthread.h>
#include <openssl/ssl.h>
#include <openssl/err.h>
#include "book.h"
@ -11,6 +12,13 @@
#include "evaluate.h"
#include "fill_handler.h"
#define MAX_BALANCE_ENTRIES 64
typedef struct {
char currency[16];
double available;
} balance_entry_t;
#define WS_MAX_FRAME_SIZE (128 * 1024)
#define WS_MAX_CONNECTIONS 8
#define WS_READ_BUF_SIZE (64 * 1024)
@ -74,6 +82,10 @@ typedef struct {
bool running; /* false signals client to stop */
fill_channel_t *fill_ch; /* fill event channel (hot→executor) */
int fill_drop_warn; /* rate-limited fill drop warning counter */
balance_entry_t balance_cache[MAX_BALANCE_ENTRIES]; /* latest available balances from WS */
int balance_count;
pthread_mutex_t balance_lock;
int balance_wake_fd; /* eventfd: written on every balance update */
} ws_client_t;
/* Initialise a WebSocket client with config, symbol table, books, and evaluator */
@ -106,4 +118,14 @@ int ws_client_send_ping(ws_connection_t *conn);
/* Get current monotonic timestamp in milliseconds */
uint64_t ws_client_now_ms(void);
/* Wait for balance WS to confirm available >= min_amount for a currency.
Blocks on the balance wake eventfd for up to timeout_ms.
Returns true once the condition is met, false on timeout. */
bool ws_client_await_balance(ws_client_t *client, const char *currency,
double min_amount, int64_t timeout_ms);
/* Read the latest known available balance for a currency from cache (thread-safe).
Returns 0 if currency is not in cache. */
double ws_client_latest_balance(ws_client_t *client, const char *currency);
#endif