triangular_arbitrage_bot/test_fused.py

239 lines
7.1 KiB
Python

#!/usr/bin/env python3
"""Test harness for fused_engine binary.
Starts the binary, adds symbols via REST API, listens for signals
on the executor Unix socket, and reports results.
"""
import asyncio
import json
import os
import signal
import socket
import subprocess
import sys
import time
from pathlib import Path
BINARY = Path(__file__).parent / "build" / "fused_engine"
CONFIG = Path(__file__).parent / "config.yaml"
REST_URL = "http://127.0.0.1:8000"
SOCKET_PATH = "/tmp/executor.sock"
SIGNAL_FILE = Path(__file__).parent / "test_signals.jsonl"
LOG_FILE = Path(__file__).parent / "test_stderr.log"
TEST_DURATION = 120 # seconds
SYMBOLS_TO_ADD = [
"BTC-USDT", "ETH-USDT", "ETH-BTC",
"BNB-USDT", "BNB-BTC", "BNB-ETH",
"XRP-USDT", "XRP-BTC", "XRP-ETH",
"SOL-USDT", "SOL-BTC", "SOL-ETH",
"ADA-USDT", "ADA-BTC", "ADA-ETH",
"DOGE-USDT", "DOGE-BTC", "DOGE-ETH",
"MATIC-USDT", "MATIC-BTC", "MATIC-ETH",
"DOT-USDT", "DOT-BTC", "DOT-ETH",
]
async def wait_for_rest(timeout=15):
"""Wait until the REST API is reachable."""
import urllib.request
deadline = time.time() + timeout
while time.time() < deadline:
try:
req = urllib.request.urlopen(f"{REST_URL}/health", timeout=2)
if req.status == 200:
return True
except Exception:
pass
await asyncio.sleep(0.5)
return False
async def add_symbols():
"""Add symbols via POST /symbols."""
import urllib.request
try:
data = json.dumps(SYMBOLS_TO_ADD).encode()
req = urllib.request.Request(
f"{REST_URL}/symbols",
data=data,
headers={"Content-Type": "application/json"},
method="POST",
)
resp = urllib.request.urlopen(req, timeout=5)
body = json.loads(resp.read())
print(f" Added symbols: {body.get('added', [])}")
except Exception as e:
print(f" Symbol add failed: {e}")
async def check_health():
"""Print health status."""
import urllib.request
try:
req = urllib.request.urlopen(f"{REST_URL}/health", timeout=2)
body = json.loads(req.read())
return body
except Exception as e:
return {"error": str(e)}
async def listen_for_signals(duration):
"""Listen on the executor Unix socket for signal JSON lines."""
signals = []
errors = []
deadline = time.time() + duration
while time.time() < deadline:
try:
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.settimeout(2.0)
sock.connect(SOCKET_PATH)
buf = b""
while time.time() < deadline:
try:
chunk = sock.recv(4096)
if not chunk:
break
buf += chunk
while b"\n" in buf:
line, buf = buf.split(b"\n", 1)
line = line.strip()
if not line:
continue
try:
sig = json.loads(line)
signals.append(sig)
bps = sig.get("predicted_bps", "?")
key = sig.get("triangle_key", "?")
t_arrive = sig.get("t_arrive_ms", "?")
t_eval = sig.get("t_eval_ms", "?")
print(f" [SIGNAL] {bps} bps | {key} | arrive={t_arrive} eval={t_eval}")
except json.JSONDecodeError:
errors.append(f"Bad JSON: {line[:80]}")
except socket.timeout:
continue
except Exception as e:
errors.append(f"Read error: {e}")
break
sock.close()
except Exception as e:
await asyncio.sleep(1)
return signals, errors
async def main():
print(f"=== Fused Engine Test ===")
print(f"Binary: {BINARY}")
print(f"Config: {CONFIG}")
print(f"Duration: {TEST_DURATION}s")
print(f"Symbols to add: {len(SYMBOLS_TO_ADD)}")
print()
if not BINARY.exists():
print(f"ERROR: Binary not found at {BINARY}")
sys.exit(1)
# Clean up old socket
try:
os.unlink(SOCKET_PATH)
except OSError:
pass
# Start the binary
print("[1] Starting fused_engine...")
with LOG_FILE.open("w") as log_f:
proc = subprocess.Popen(
[str(BINARY), str(CONFIG)],
stdout=subprocess.PIPE,
stderr=log_f,
text=True,
)
print(f" PID: {proc.pid}")
# Wait for REST API
print("[2] Waiting for REST API...")
if not await wait_for_rest(20):
print(" ERROR: REST API did not come up")
proc.terminate()
proc.wait()
print(" stderr output:")
print(LOG_FILE.read_text()[-2000:])
sys.exit(1)
print(" REST API is up")
# Check health
health = await check_health()
print(f" Health: {health}")
# Add symbols
print("[3] Adding symbols...")
await add_symbols()
# Wait a moment for subscriptions to take effect
await asyncio.sleep(3)
# Check health again
health = await check_health()
print(f" Health after add: {health}")
# Listen for signals
print(f"[4] Listening for signals for {TEST_DURATION}s...")
signals, errors = await listen_for_signals(TEST_DURATION)
# Stop the binary
print("[5] Stopping fused_engine...")
proc.terminate()
try:
proc.wait(timeout=5)
except subprocess.TimeoutExpired:
proc.kill()
proc.wait()
# Report results
print()
print(f"=== Results ===")
print(f"Signals received: {len(signals)}")
print(f"Socket errors: {len(errors)}")
if signals:
SIGNAL_FILE.write_text(json.dumps(signals, indent=2) + "\n")
print(f"Signals saved to: {SIGNAL_FILE}")
bps_values = [s.get("predicted_bps", 0) for s in signals]
print(f"BPS range: {min(bps_values):.4f} - {max(bps_values):.4f}")
triangles = set(s.get("triangle_key", "?") for s in signals)
print(f"Unique triangles: {triangles}")
for s in signals[:5]:
print(f" {s.get('triangle_key', '?')} | {s.get('predicted_bps', '?')} bps | "
f"t_arrive={s.get('t_arrive_ms', '?')} t_eval={s.get('t_eval_ms', '?')}")
else:
print("No signals received during test period.")
if errors:
print(f"Errors: {errors[:5]}")
# Print last lines of stderr log
print()
print("=== Last 40 lines of stderr ===")
log_text = LOG_FILE.read_text()
lines = log_text.strip().split("\n")
for line in lines[-40:]:
print(f" {line}")
print(f"\nFull log: {LOG_FILE}")
print(f"Signals: {SIGNAL_FILE if signals else '(none)'}")
print()
if len(signals) > 0:
print("PASS: Signals were received.")
else:
print("INFO: No signals. This may be normal if no arbitrage opportunities arose.")
print(" Check stderr above for connection/subscription issues.")
if __name__ == "__main__":
asyncio.run(main())