triangular_arbitrage_bot/src/ws_client.c

1080 lines
41 KiB
C

/*
* ws_client.c - KuCoin WebSocket client for level2 depth (top 5) book updates
*
* Handles: token fetch via REST /api/v1/bullet-private (falling back to
* /api/v1/bullet-public), TLS connections,
* RFC 6455 frame read/write/mask, subscription management,
* book update JSON parsing, and per-connection epoll-driven reads.
* Each WS connection handles up to 400 symbol subscriptions.
*/
#include "log.h"
#include "ws_client.h"
#include "http_client.h"
#include "cJSON.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <inttypes.h>
#include <time.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <poll.h>
#include <sys/eventfd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <netdb.h>
#include <netinet/tcp.h>
#include <openssl/bio.h>
#include <openssl/ssl.h>
#include <openssl/err.h>
#include <openssl/rand.h>
static int ws_send_text(ws_connection_t *conn, const char *text);
/* Current reading of CLOCK_MONOTONIC, expressed in whole milliseconds. */
static uint64_t now_ms_impl(void) {
    struct timespec mono;
    clock_gettime(CLOCK_MONOTONIC, &mono);
    uint64_t whole_seconds_ms = (uint64_t)mono.tv_sec * 1000;
    return whole_seconds_ms + mono.tv_nsec / 1000000;
}
/* Public accessor for the module's monotonic millisecond clock. */
uint64_t ws_client_now_ms(void) {
    uint64_t stamp_ms = now_ms_impl();
    return stamp_ms;
}
/* Current reading of CLOCK_REALTIME as fractional milliseconds. */
static double now_realtime_ms(void) {
    struct timespec wall;
    clock_gettime(CLOCK_REALTIME, &wall);
    double from_seconds = (double)wall.tv_sec * 1000.0;
    double from_nanos = (double)wall.tv_nsec / 1000000.0;
    return from_seconds + from_nanos;
}
/* Return a connection to the disconnected state: clear the frame/read
   bookkeeping, release the SSL object and any BIOs still referenced by the
   struct, forget the token, and close the socket. Every released field is
   cleared afterwards, so calling this repeatedly is harmless. */
static void ws_connection_reset(ws_connection_t *conn) {
    /* Parser and framing state back to idle. */
    conn->state = WS_STATE_DISCONNECTED;
    conn->read_pos = 0;
    conn->read_len = 0;
    conn->frame_payload_len = 0;
    conn->frame_finished = false;
    conn->frame_opcode = 0;

    /* SSL_free() and BIO_free() are no-ops on NULL, so no guards are needed. */
    SSL_free(conn->ssl);
    conn->ssl = NULL;
    BIO_free(conn->bio_mem);
    conn->bio_mem = NULL;
    BIO_free(conn->bio_ssl);
    conn->bio_ssl = NULL;
    BIO_free(conn->bio_socket);
    conn->bio_socket = NULL;

    /* An empty token makes the next connect attempt fetch a fresh one. */
    conn->token[0] = '\0';

    if (conn->fd < 0) {
        return;
    }
    close(conn->fd);
    conn->fd = -1;
}
/* Build the TLS client context used by the WS connections.
   Returns NULL (after logging the OpenSSL error) when allocation fails.
   NOTE(review): peer verification is switched off (SSL_VERIFY_NONE), so the
   server certificate is never checked — confirm this is intentional. */
static SSL_CTX *create_ssl_ctx(void) {
    SSL_CTX *tls_ctx = SSL_CTX_new(TLS_client_method());
    if (tls_ctx != NULL) {
        SSL_CTX_set_verify(tls_ctx, SSL_VERIFY_NONE, NULL);
        return tls_ctx;
    }
    log_write("[WS] SSL_CTX_new failed: %s\n",
              ERR_error_string(ERR_get_error(), NULL));
    return NULL;
}
/* Look up `host` with getaddrinfo() and walk the returned addresses, in
   order, until a TCP connect() succeeds.
   Returns the connected socket descriptor, or -1 when resolution fails or
   no address accepts the connection. */
static int resolve_and_connect(const char *host, int port) {
    char service[8];
    snprintf(service, sizeof(service), "%d", port);

    struct addrinfo hints = { .ai_family = AF_UNSPEC, .ai_socktype = SOCK_STREAM };
    struct addrinfo *candidates = NULL;
    int gai_rc = getaddrinfo(host, service, &hints, &candidates);
    if (gai_rc != 0) {
        log_write("[WS] getaddrinfo failed for %s:%d: %s\n", host, port, gai_strerror(gai_rc));
        return -1;
    }

    int sock = -1;
    struct addrinfo *cur = candidates;
    while (cur != NULL && sock < 0) {
        int attempt = socket(cur->ai_family, cur->ai_socktype, cur->ai_protocol);
        if (attempt >= 0) {
            if (connect(attempt, cur->ai_addr, cur->ai_addrlen) == 0) {
                sock = attempt;
            } else {
                close(attempt);
            }
        }
        cur = cur->ai_next;
    }

    freeaddrinfo(candidates);
    return sock;
}
/* Create the SSL object for `conn`, set the SNI host name, attach the already
   connected socket (conn->fd) and run the TLS handshake.
   Returns 0 on success. On any failure returns -1, logs the cause and frees
   the SSL object again so conn->ssl never refers to a half-initialised
   session; the socket itself is left for the caller to close. */
static int setup_tls(ws_connection_t *conn) {
    conn->ssl = SSL_new(conn->ctx);
    if (!conn->ssl) {
        log_write("[WS] SSL_new failed\n");
        return -1;
    }
    if (SSL_set_tlsext_host_name(conn->ssl, conn->host) != 1) {
        log_write("[WS] SSL_set_tlsext_host_name failed\n");
        goto fail;
    }
    if (SSL_set_fd(conn->ssl, conn->fd) != 1) {
        log_write("[WS] SSL_set_fd failed (fd=%d)\n", conn->fd);
        goto fail;
    }
    log_write("[WS] Connecting SSL to %s:%d (fd=%d)\n", conn->host, conn->port, conn->fd);
    int r = SSL_connect(conn->ssl);
    if (r != 1) {
        /* Capture errno before any further library call can overwrite it. */
        int saved_errno = errno;
        int err = SSL_get_error(conn->ssl, r);
        unsigned long err2 = ERR_peek_last_error();
        log_write("[WS] TLS handshake failed: SSL_error=%d errno=%d r=%d err2=%lu\n",
                  err, saved_errno, r, err2);
        if (err2) {
            char err_str[256];
            ERR_error_string_n(err2, err_str, sizeof(err_str));
            log_write("[WS] OpenSSL error: %s\n", err_str);
        }
        goto fail;
    }
    log_write("[WS] TLS handshake OK, cipher=%s\n", SSL_get_cipher(conn->ssl));
    return 0;

fail:
    SSL_free(conn->ssl);
    conn->ssl = NULL;
    return -1;
}
/*
 * Initialise a WebSocket client.
 *
 * Zeroes `client`, stores the borrowed pointers (cfg, symbols, books,
 * evaluator, slots), creates one SSL context shared by every connection
 * slot, seeds each slot with the default KuCoin host/port and reconnect
 * delays, and creates the fill channel, the two mutexes and the balance
 * wake eventfd.
 *
 * Returns 0 on success, -1 if the SSL context cannot be created. Failure to
 * create the fill channel or the eventfd is logged but is not fatal.
 */
int ws_client_init(ws_client_t *client, const config_t *cfg,
                   symbol_table_t *symbols, order_book_t *books,
                   evaluator_t *evaluator,
                   executor_slot_t *slots, int n_slots) {
    memset(client, 0, sizeof(*client));
    client->cfg = cfg;
    client->symbols = symbols;
    client->books = books;
    client->evaluator = evaluator;
    client->running = true;
    client->slots = slots;
    client->n_slots = n_slots;

    /* memset() left every descriptor at 0, which is a valid fd (stdin).
       Mark them invalid before anything can fail so that a later
       ws_client_destroy()/ws_connection_reset() never closes fd 0. */
    client->balance_wake_fd = -1;
    for (uint32_t i = 0; i < WS_MAX_CONNECTIONS; i++) {
        client->connections[i].fd = -1;
    }

    SSL_CTX *shared_ctx = create_ssl_ctx();
    if (!shared_ctx) {
        log_write("[WS] Failed to create SSL context\n");
        return -1;
    }
    for (uint32_t i = 0; i < WS_MAX_CONNECTIONS; i++) {
        ws_connection_t *conn = &client->connections[i];
        conn->ctx = shared_ctx;
        conn->reconnect_base_delay = cfg->reconnect_base_delay;
        conn->reconnect_max_delay = cfg->reconnect_max_delay;
        conn->reconnect_delay = cfg->reconnect_base_delay;
        /* snprintf always NUL-terminates, unlike strncpy with the full size. */
        snprintf(conn->host, sizeof(conn->host), "%s", "ws-api-spot.kucoin.com");
        conn->port = 443;
    }
    client->connection_count = 1;

    client->fill_ch = fill_channel_create();
    if (!client->fill_ch) {
        log_write("[WS] Failed to create fill channel\n");
    }
    pthread_mutex_init(&client->balance_lock, NULL);
    pthread_mutex_init(&client->order_slots_lock, NULL);
    client->balance_wake_fd = eventfd(0, EFD_NONBLOCK);
    if (client->balance_wake_fd < 0) {
        log_write("[WS] Failed to create balance wake eventfd\n");
    }
    return 0;
}
/*
 * Record that exchange order `order_id` belongs to executor slot
 * `slot_index`, so order events can be routed by orderId.
 *
 * The first inactive entry of client->order_slots is claimed under
 * order_slots_lock. NULL or empty ids are ignored. If every entry is in use
 * the mapping cannot be stored; this is logged instead of being dropped
 * silently.
 */
void ws_client_register_order_slot(ws_client_t *client,
                                   const char *order_id, int slot_index) {
    if (!order_id || !order_id[0]) return;

    bool stored = false;
    pthread_mutex_lock(&client->order_slots_lock);
    for (int i = 0; i < MAX_ORDER_SLOT_ENTRIES; i++) {
        if (atomic_load_explicit(&client->order_slots[i].active, memory_order_relaxed)) {
            continue;
        }
        size_t cap = sizeof(client->order_slots[i].order_id);
        strncpy(client->order_slots[i].order_id, order_id, cap - 1);
        /* strncpy does not terminate when the source fills the buffer. */
        client->order_slots[i].order_id[cap - 1] = '\0';
        client->order_slots[i].slot_index = slot_index;
        /* Release store: the fields above are written before the entry is
           published as active. */
        atomic_store_explicit(&client->order_slots[i].active, 1, memory_order_release);
        stored = true;
        break;
    }
    pthread_mutex_unlock(&client->order_slots_lock);

    if (!stored) {
        log_write("[WS] Order slot table full, cannot map order %s to slot %d\n",
                  order_id, slot_index);
    }
}
/*
 * Tear down a client: stop the run flag, reset every initialised connection,
 * free each distinct SSL context exactly once, close the balance wake
 * eventfd and destroy both mutexes.
 *
 * NOTE(review): client->fill_ch is not released here; no destructor for it
 * is visible in this file — confirm where it is freed.
 */
void ws_client_destroy(ws_client_t *client) {
    client->running = false;

    /* Pass 1: drop sockets and SSL sessions. A slot beyond connection_count
       that holds no context was never initialised (its fd may still be 0
       from memset), so it is left alone. */
    for (uint32_t i = 0; i < WS_MAX_CONNECTIONS; i++) {
        ws_connection_t *conn = &client->connections[i];
        if (i < client->connection_count || conn->ctx) {
            ws_connection_reset(conn);
        }
    }

    /* Pass 2: contexts are normally shared by all slots; free each distinct
       pointer once and clear every reference so none is left dangling. */
    for (uint32_t i = 0; i < WS_MAX_CONNECTIONS; i++) {
        SSL_CTX *owned = client->connections[i].ctx;
        if (!owned) continue;
        for (uint32_t j = i; j < WS_MAX_CONNECTIONS; j++) {
            if (client->connections[j].ctx == owned) {
                client->connections[j].ctx = NULL;
            }
        }
        SSL_CTX_free(owned);
    }

    if (client->balance_wake_fd >= 0) {
        close(client->balance_wake_fd);
        client->balance_wake_fd = -1;
    }
    pthread_mutex_destroy(&client->balance_lock);
    pthread_mutex_destroy(&client->order_slots_lock);
}
/*
 * Fetch a WebSocket token and server endpoint from KuCoin.
 *
 * When an API key and secret are configured, /api/v1/bullet-private is tried
 * first; if that yields no response, /api/v1/bullet-public is used instead.
 * On success the token, host (and port, when the endpoint carries one),
 * ping_interval_ms and ping_timeout_ms are stored in `conn`.
 *
 * Returns 0 on success, -1 when the request fails, the response cannot be
 * parsed, or no usable token is present.
 */
int ws_client_fetch_token_priv(ws_connection_t *conn, const config_t *cfg) {
    int out_len = 0;
    char *response = NULL;

    /* Try the private token first (needed for tradeOrdersV2 + balance). Fall
       back to the public token (level2Depth only) otherwise. */
    if (cfg && cfg->kucoin_api_key[0] && cfg->kucoin_api_secret[0]) {
        response = https_post_auth("api.kucoin.com", 443, "/api/v1/bullet-private",
                                   cfg->kucoin_api_key,
                                   cfg->kucoin_api_secret,
                                   cfg->kucoin_api_passphrase,
                                   "{}", &out_len);
    }
    if (!response || out_len <= 0) {
        free(response);
        response = https_post("api.kucoin.com", 443, "/api/v1/bullet-public",
                              "", 0, &out_len);
    }
    if (!response || out_len <= 0) {
        log_write("[WS] Failed to fetch token\n");
        free(response);
        return -1;
    }

    cJSON *root = cJSON_Parse(response);
    free(response);
    if (!root) {
        log_write("[WS] Failed to parse token response\n");
        return -1;
    }

    int rc = -1;
    cJSON *data = cJSON_GetObjectItem(root, "data");
    if (!cJSON_IsObject(data)) {
        log_write("[WS] No 'data' in token response\n");
        goto out;
    }

    cJSON *token = cJSON_GetObjectItem(data, "token");
    if (cJSON_IsString(token) && token->valuestring) {
        size_t token_len = strlen(token->valuestring);
        /* A truncated token can never authenticate; reject it outright. */
        if (token_len >= sizeof(conn->token)) {
            log_write("[WS] Token too long (%zu bytes)\n", token_len);
            goto out;
        }
        memcpy(conn->token, token->valuestring, token_len + 1);
    }

    cJSON *servers = cJSON_GetObjectItem(data, "instanceServers");
    if (cJSON_IsArray(servers) && cJSON_GetArraySize(servers) > 0) {
        cJSON *inst = cJSON_GetArrayItem(servers, 0);
        cJSON *endpoint = cJSON_GetObjectItem(inst, "endpoint");
        cJSON *pingInterval = cJSON_GetObjectItem(inst, "pingInterval");
        cJSON *pingTimeout = cJSON_GetObjectItem(inst, "pingTimeout");

        if (cJSON_IsString(endpoint) && endpoint->valuestring) {
            /* Endpoint looks like "scheme://host[:port][/path]". */
            const char *ep = endpoint->valuestring;
            const char *slash_pair = strstr(ep, "//");
            const char *authority = slash_pair ? slash_pair + 2 : ep;

            char host_part[256];
            size_t auth_len = strcspn(authority, "/");
            if (auth_len >= sizeof(host_part)) auth_len = sizeof(host_part) - 1;
            memcpy(host_part, authority, auth_len);
            host_part[auth_len] = '\0';

            /* Split off ":port" when exactly one colon is present (several
               colons would be an IPv6 literal, which is left untouched). */
            char *colon = strchr(host_part, ':');
            if (colon && colon != host_part && strrchr(host_part, ':') == colon) {
                char *end = NULL;
                long port = strtol(colon + 1, &end, 10);
                if (end != colon + 1 && *end == '\0' && port > 0 && port <= 65535) {
                    conn->port = (int)port;
                    *colon = '\0';
                }
            }

            /* Keep the previous host if the endpoint yields an empty one. */
            if (host_part[0]) {
                size_t hl = strlen(host_part);
                if (hl >= sizeof(conn->host)) hl = sizeof(conn->host) - 1;
                memcpy(conn->host, host_part, hl);
                conn->host[hl] = '\0';
            }
        }

        /* Only accept values that fit in uint32_t; converting a negative or
           oversized double to an unsigned type is undefined behaviour. */
        if (cJSON_IsNumber(pingInterval) &&
            pingInterval->valuedouble > 0 && pingInterval->valuedouble <= (double)UINT32_MAX) {
            conn->ping_interval_ms = (uint32_t)pingInterval->valuedouble;
        }
        if (cJSON_IsNumber(pingTimeout) &&
            pingTimeout->valuedouble > 0 && pingTimeout->valuedouble <= (double)UINT32_MAX) {
            conn->ping_timeout_ms = (uint32_t)pingTimeout->valuedouble;
        }
    }

    if (!conn->token[0]) {
        log_write("[WS] Empty token received\n");
        goto out;
    }
    if (!conn->ping_interval_ms) conn->ping_interval_ms = 18000;
    if (!conn->ping_timeout_ms) conn->ping_timeout_ms = 10000;
    log_write("[WS] Token fetched, ping_interval=%u ms, host='%s'\n",
              conn->ping_interval_ms, conn->host);
    rc = 0;

out:
    cJSON_Delete(root);
    return rc;
}
/*
 * Establish connection `conn_idx`: reset the slot, fetch a token if none is
 * held, open the TCP socket, run the TLS handshake, perform the HTTP upgrade
 * to WebSocket and re-issue the stored subscriptions. Connection 0 also
 * subscribes to the private order and balance topics.
 *
 * Returns 0 when the connection is (or already was) established, -1 on any
 * failure. On failure the slot is reset, so no socket or SSL session is left
 * open and the state is WS_STATE_DISCONNECTED.
 *
 * NOTE(review): the Sec-WebSocket-Key is a fixed value and the server's
 * Sec-WebSocket-Accept header is not verified.
 */
int ws_client_connect(ws_client_t *client, uint32_t conn_idx) {
    if (conn_idx >= WS_MAX_CONNECTIONS) return -1;
    ws_connection_t *conn = &client->connections[conn_idx];
    if (conn->state == WS_STATE_CONNECTED) return 0;

    ws_connection_reset(conn);
    if (!conn->ctx) {
        conn->ctx = create_ssl_ctx();
        if (!conn->ctx) return -1;
    }
    if (!conn->token[0]) {
        conn->state = WS_STATE_GETTING_TOKEN;
        if (ws_client_fetch_token_priv(conn, client->cfg) != 0) goto fail;
    }
    snprintf(conn->connect_id, sizeof(conn->connect_id), "fused-%" PRIu32, conn_idx + 1);

    conn->fd = resolve_and_connect(conn->host, conn->port);
    if (conn->fd < 0) {
        log_write("[WS] Connection failed for %s:%d\n", conn->host, conn->port);
        goto fail;
    }
    {
        int opt = 1;
        setsockopt(conn->fd, IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt));
        int rcvbuf = 256 * 1024;
        setsockopt(conn->fd, SOL_SOCKET, SO_RCVBUF, &rcvbuf, sizeof(rcvbuf));
    }
    if (setup_tls(conn) != 0) {
        log_write("[WS] TLS setup failed\n");
        goto fail;
    }

    /* The token is embedded in the request line, so leave generous room and
       refuse to send a truncated (and therefore malformed) request. */
    char request[2048];
    int req_len = snprintf(request, sizeof(request),
                           "GET /?token=%s&connectId=%s HTTP/1.1\r\n"
                           "Host: %s\r\n"
                           "Upgrade: websocket\r\n"
                           "Connection: Upgrade\r\n"
                           "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"
                           "Sec-WebSocket-Version: 13\r\n"
                           "\r\n",
                           conn->token, conn->connect_id, conn->host);
    if (req_len < 0 || (size_t)req_len >= sizeof(request)) {
        log_write("[WS] Upgrade request too long\n");
        goto fail;
    }
    if (ws_client_write(conn, request, (size_t)req_len) < 0) {
        log_write("[WS] Failed to send upgrade request\n");
        goto fail;
    }
    conn->state = WS_STATE_CONNECTING;
    conn->last_activity_ms = now_ms_impl();

    /* Read until the blank line that ends the HTTP response headers; the
       response may arrive split across several TLS records. */
    char resp_buf[1024];
    size_t resp_len = 0;
    char *hdr_end = NULL;
    while (!hdr_end) {
        if (resp_len >= sizeof(resp_buf) - 1) {
            log_write("[WS] Upgrade response too large\n");
            goto fail;
        }
        int n = SSL_read(conn->ssl, resp_buf + resp_len,
                         (int)(sizeof(resp_buf) - 1 - resp_len));
        if (n <= 0) {
            int err = SSL_get_error(conn->ssl, n);
            log_write("[WS] No response to upgrade request: SSL_error=%d\n", err);
            goto fail;
        }
        resp_len += (size_t)n;
        resp_buf[resp_len] = '\0';
        hdr_end = strstr(resp_buf, "\r\n\r\n");
    }
    size_t hdr_len = (size_t)(hdr_end - resp_buf) + 4;
    if (strncmp(resp_buf, "HTTP/1.1 101", 12) != 0) {
        log_write("[WS] Upgrade failed:\n%.*s\n", (int)hdr_len, resp_buf);
        goto fail;
    }

    /* Bytes after the headers already belong to the WebSocket stream; hand
       them to the frame reader instead of discarding them. They are parsed
       on the next ws_client_read() that receives data. */
    size_t extra = resp_len - hdr_len;
    if (extra > 0) {
        if (extra > WS_READ_BUF_SIZE) extra = WS_READ_BUF_SIZE;
        memcpy(conn->read_buf, resp_buf + hdr_len, extra);
        conn->read_len = extra;
    }

    conn->state = WS_STATE_CONNECTED;
    log_write("[WS] Connected to %s:%d\n", conn->host, conn->port);

    if (conn->symbol_count > 0 &&
        ws_client_subscribe(client, conn_idx, conn->symbol_indices, conn->symbol_count) != 0) {
        log_write("[WS] Re-subscribe failed on conn %u\n", conn_idx);
        goto fail;
    }

    /* Subscribe to private channels — only on connection 0 to avoid
       duplicate delivery of order/balance events across connections. */
    if (conn_idx == 0) {
        if (ws_send_text(conn,
                         "{\"type\":\"subscribe\",\"topic\":\"/spotMarket/tradeOrdersV2\","
                         "\"response\":true,\"privateChannel\":\"true\"}") < 0) {
            log_write("[WS] Failed to subscribe to tradeOrdersV2\n");
            goto fail;
        }
        log_write("[WS] Subscribed to tradeOrdersV2\n");
        if (ws_send_text(conn,
                         "{\"type\":\"subscribe\",\"topic\":\"/account/balance\","
                         "\"response\":true,\"privateChannel\":\"true\"}") < 0) {
            log_write("[WS] Failed to subscribe to /account/balance\n");
            goto fail;
        }
        log_write("[WS] Subscribed to /account/balance\n");
    }
    return 0;

fail:
    ws_connection_reset(conn);
    return -1;
}
/* Drop connection `conn_idx` by resetting its slot; out-of-range indices
   are ignored. */
void ws_client_disconnect(ws_client_t *client, uint32_t conn_idx) {
    if (conn_idx < WS_MAX_CONNECTIONS) {
        ws_connection_reset(&client->connections[conn_idx]);
    }
}
/* Write all `len` bytes of `data` to the connection's SSL session, calling
   SSL_write repeatedly until everything has been accepted.
   Returns the number of bytes written, or -1 when there is no SSL session
   or SSL_write fails (the connection is then marked disconnected). */
int ws_client_write(ws_connection_t *conn, const void *data, size_t len) {
    if (!conn->ssl) {
        return -1;
    }
    const char *bytes = (const char *)data;
    int sent = 0;
    for (;;) {
        if ((size_t)sent >= len) {
            break;
        }
        int n = SSL_write(conn->ssl, bytes + sent, len - sent);
        if (n > 0) {
            sent += n;
            continue;
        }
        int err = SSL_get_error(conn->ssl, n);
        log_write("[WS] SSL_write error: %d\n", err);
        conn->state = WS_STATE_DISCONNECTED;
        return -1;
    }
    return sent;
}
/*
 * Build and send a single (FIN=1) RFC 6455 WebSocket frame.
 *
 * Client frames must be masked, so the mask bit is always set and the
 * payload is XORed with a 4-byte masking key. The key comes from OpenSSL's
 * RAND_bytes so that it is unpredictable, as RFC 6455 §5.3 requires; the
 * monotonic clock is used only as a fallback if the generator fails.
 * Payload lengths < 126 are encoded inline, < 65536 in the 16-bit extended
 * field and anything larger in the 64-bit extended field.
 *
 * Returns the number of bytes written (header + payload), or -1 on error.
 */
static int ws_send_frame(ws_connection_t *conn, uint8_t opcode,
                         const uint8_t *payload, size_t payload_len) {
    /* Control frames (opcode bit 0x8) may carry at most 125 payload bytes. */
    if ((opcode & 0x08) && payload_len > 125) {
        log_write("[WS] Control frame payload too large: %zu\n", payload_len);
        return -1;
    }

    uint8_t header[14];
    int hdr_len = 2;
    header[0] = 0x80 | opcode;
    if (payload_len < 126) {
        header[1] = 0x80 | (uint8_t)payload_len;
    } else if (payload_len < 65536) {
        header[1] = 0x80 | 126;
        header[2] = (uint8_t)(payload_len >> 8);
        header[3] = (uint8_t)(payload_len & 0xFF);
        hdr_len = 4;
    } else {
        header[1] = 0x80 | 127;
        uint64_t len64 = (uint64_t)payload_len;
        for (int i = 0; i < 8; i++) {
            header[2 + i] = (uint8_t)(len64 >> ((7 - i) * 8));
        }
        hdr_len = 10;
    }

    uint8_t mask[4];
    if (RAND_bytes(mask, (int)sizeof(mask)) != 1) {
        uint64_t m_now = now_ms_impl();
        mask[0] = (uint8_t)(m_now & 0xFF);
        mask[1] = (uint8_t)((m_now >> 8) & 0xFF);
        mask[2] = (uint8_t)((m_now >> 16) & 0xFF);
        mask[3] = (uint8_t)((m_now >> 24) & 0xFF);
    }
    memcpy(header + hdr_len, mask, 4);
    hdr_len += 4;

    int total = 0;
    int r = ws_client_write(conn, header, (size_t)hdr_len);
    if (r < 0) return r;
    total += r;

    if (payload_len > 0) {
        /* XOR the payload with the mask in bounded chunks (RFC 6455 §5.3);
           the key index continues across chunk boundaries. */
        uint8_t buf[4096];
        size_t off = 0;
        while (off < payload_len) {
            size_t chunk = payload_len - off;
            if (chunk > sizeof(buf)) chunk = sizeof(buf);
            for (size_t i = 0; i < chunk; i++) {
                buf[i] = payload[off + i] ^ mask[(off + i) % 4];
            }
            r = ws_client_write(conn, buf, chunk);
            if (r < 0) return r;
            total += r;
            off += chunk;
        }
    }
    conn->last_activity_ms = now_ms_impl();
    return total;
}
/* Send a NUL-terminated string as one text frame (opcode 0x1). */
static int ws_send_text(ws_connection_t *conn, const char *text) {
    const uint8_t *bytes = (const uint8_t *)text;
    size_t text_len = strlen(text);
    return ws_send_frame(conn, 0x1, bytes, text_len);
}
/* Send a ping frame (opcode 0x9) with no payload. */
int ws_client_send_ping(ws_connection_t *conn) {
    int sent = ws_send_frame(conn, 0x9, NULL, 0);
    return sent;
}
/*
 * Subscribe connection `conn_idx` to /spotMarket/level2Depth5 for the given
 * symbols, sending one subscribe message per batch of up to 100 symbols,
 * then remember the symbol list on the connection for later re-subscribes.
 *
 * Returns 0 on success, -1 if the index is out of range, a topic string
 * would not fit its buffer, or a send fails.
 *
 * NOTE(review): the state is set to WS_STATE_CONNECTED at the end even when
 * `count` is 0 and nothing was sent — kept as in the original; confirm the
 * callers rely on it.
 */
int ws_client_subscribe(ws_client_t *client, uint32_t conn_idx,
                        const uint16_t *symbol_indices, uint32_t count) {
    if (conn_idx >= WS_MAX_CONNECTIONS) return -1;
    ws_connection_t *conn = &client->connections[conn_idx];

    /* Batch subscriptions: up to 100 symbols per subscribe message. */
    for (uint32_t batch = 0; batch < count; batch += 100) {
        uint32_t batch_end = (count - batch > 100) ? batch + 100 : count;
        uint32_t batch_count = batch_end - batch;

        /* Build ":S1,S2,..." with an explicit write offset so the buffer
           can never be overrun. */
        char sym_list[4096];
        size_t used = 0;
        sym_list[0] = '\0';
        for (uint32_t i = batch; i < batch_end; i++) {
            char sep = (i == batch) ? ':' : ',';
            const char *name = client->symbols->entries[symbol_indices[i]].name;
            /* The precision caps the read at SYMBOL_NAME_LEN bytes, so the
               name need not be NUL-terminated. */
            int n = snprintf(sym_list + used, sizeof(sym_list) - used,
                             "%c%.*s", sep, (int)SYMBOL_NAME_LEN, name);
            if (n < 0 || (size_t)n >= sizeof(sym_list) - used) {
                log_write("[WS] Subscribe topic too long for batch %u\n", batch);
                return -1;
            }
            used += (size_t)n;
        }

        char msg[4600];
        snprintf(msg, sizeof(msg),
                 "{\"type\":\"subscribe\",\"topic\":\"/spotMarket/level2Depth5%s\",\"response\":true}",
                 sym_list);
        int r = ws_send_text(conn, msg);
        if (r < 0) {
            log_write("[WS] Subscribe failed for batch %u\n", batch);
            return -1;
        }
        log_write("[WS] Subscribed to %u symbols (batch %u-%u)\n",
                  batch_count, batch, batch_end - 1);
    }

    /* Never record more symbols than the per-connection array can hold. */
    uint32_t stored = count;
    if (stored > MAX_SYMBOLS) {
        log_write("[WS] Symbol list truncated to %u entries on conn %u\n",
                  (unsigned)MAX_SYMBOLS, conn_idx);
        stored = MAX_SYMBOLS;
    }
    for (uint32_t i = 0; i < stored; i++) {
        conn->symbol_indices[i] = symbol_indices[i];
    }
    conn->symbol_count = stored;
    conn->state = WS_STATE_CONNECTED;
    return 0;
}
/*
 * Unsubscribe connection `conn_idx` from /spotMarket/level2Depth5 for the
 * given symbols, using one message with the topic
 * "/spotMarket/level2Depth5:S1,S2,...".
 *
 * Returns the result of sending the frame (bytes written, or -1 on send
 * failure), -1 if the index is out of range or the topic does not fit its
 * buffer, and 0 without sending anything when `count` is 0.
 */
int ws_client_unsubscribe(ws_client_t *client, uint32_t conn_idx,
                          const uint16_t *symbol_indices, uint32_t count) {
    if (conn_idx >= WS_MAX_CONNECTIONS) return -1;
    if (count == 0) return 0;
    ws_connection_t *conn = &client->connections[conn_idx];

    /* Comma-separated symbol list; the single ':' separator is supplied by
       the message format below. */
    char sym_list[4096];
    size_t used = 0;
    sym_list[0] = '\0';
    for (uint32_t i = 0; i < count; i++) {
        const char *name = client->symbols->entries[symbol_indices[i]].name;
        /* The precision caps the read at SYMBOL_NAME_LEN bytes. */
        int n = snprintf(sym_list + used, sizeof(sym_list) - used,
                         "%s%.*s", (i == 0) ? "" : ",", (int)SYMBOL_NAME_LEN, name);
        if (n < 0 || (size_t)n >= sizeof(sym_list) - used) {
            log_write("[WS] Unsubscribe topic too long (%u symbols)\n", count);
            return -1;
        }
        used += (size_t)n;
    }

    char msg[4600];
    snprintf(msg, sizeof(msg),
             "{\"type\":\"unsubscribe\",\"topic\":\"/spotMarket/level2Depth5:%s\",\"response\":true}",
             sym_list);
    return ws_send_text(conn, msg);
}
/* Read one price/size field of a book level: a JSON number is used as is, a
   JSON string is converted with atof(), anything else yields 0.0. */
static double book_level_value(cJSON *field) {
    if (cJSON_IsNumber(field)) return field->valuedouble;
    if (cJSON_IsString(field)) return atof(field->valuestring);
    return 0.0;
}

/*
 * Apply a level2Depth5 message (already parsed into cJSON) to the in-memory
 * order book of the symbol named in its topic
 * ("/spotMarket/level2Depth5:{symbol}").
 *
 * Updates the book's timestamp, sequence, and up to MAX_BOOK_LEVELS bid and
 * ask levels; levels whose price or size is not positive are skipped.
 * Returns the symbol index, or -1 when the message is not a book update or
 * the symbol is unknown. Evaluation is left to the caller.
 */
static int16_t parse_book_update(cJSON *root, ws_client_t *client) {
    static const char topic_marker[] = "level2Depth5:";

    cJSON *type = cJSON_GetObjectItem(root, "type");
    if (!cJSON_IsString(type)) return -1;
    if (strcmp(type->valuestring, "message") != 0) return -1;

    cJSON *topic = cJSON_GetObjectItem(root, "topic");
    cJSON *data = cJSON_GetObjectItem(root, "data");
    if (!(cJSON_IsString(topic) && cJSON_IsObject(data))) return -1;

    const char *marker = strstr(topic->valuestring, topic_marker);
    if (marker == NULL) return -1;
    const char *name_begin = marker + (sizeof(topic_marker) - 1);

    /* Symbol = text after the marker up to the first comma, capped to fit. */
    char symbol[SYMBOL_NAME_LEN] = {0};
    size_t name_len = strcspn(name_begin, ",");
    if (name_len > SYMBOL_NAME_LEN - 1) name_len = SYMBOL_NAME_LEN - 1;
    memcpy(symbol, name_begin, name_len);

    int16_t sym_idx = symbol_table_lookup(client->symbols, symbol);
    if (sym_idx < 0) return -1;
    order_book_t *book = &client->books[sym_idx];

    /* Timestamp: "timestamp" when present; otherwise, while the stored
       value is still zero, "sequence" and then "time" are tried. */
    cJSON *ts_val = cJSON_GetObjectItem(data, "timestamp");
    cJSON *seq_val = cJSON_GetObjectItem(data, "sequence");
    cJSON *seqNum_val = cJSON_GetObjectItem(data, "sequenceNum");
    if (cJSON_IsNumber(ts_val)) {
        book->ts_ms = (int64_t)ts_val->valuedouble;
    }
    if (book->ts_ms == 0 && cJSON_IsNumber(seq_val)) {
        book->ts_ms = (int64_t)seq_val->valuedouble;
    }
    if (book->ts_ms == 0) {
        cJSON *time_val = cJSON_GetObjectItem(data, "time");
        if (cJSON_IsNumber(time_val)) {
            book->ts_ms = (int64_t)time_val->valuedouble;
        }
    }

    if (cJSON_IsNumber(seq_val)) {
        book->sequence = (int64_t)seq_val->valuedouble;
    } else if (cJSON_IsNumber(seqNum_val)) {
        book->sequence = (int64_t)seqNum_val->valuedouble;
    }

    cJSON *bids = cJSON_GetObjectItem(data, "bids");
    if (cJSON_IsArray(bids)) {
        int filled = 0;
        cJSON *lvl;
        cJSON_ArrayForEach(lvl, bids) {
            if (filled >= MAX_BOOK_LEVELS) break;
            if (!cJSON_IsArray(lvl) || cJSON_GetArraySize(lvl) < 2) continue;
            double px = book_level_value(cJSON_GetArrayItem(lvl, 0));
            double qty = book_level_value(cJSON_GetArrayItem(lvl, 1));
            if (!(px > 0 && qty > 0)) continue;
            book->bids[filled][0] = px;
            book->bids[filled][1] = qty;
            filled++;
        }
        book->bid_count = (uint8_t)filled;
    }

    cJSON *asks = cJSON_GetObjectItem(data, "asks");
    if (cJSON_IsArray(asks)) {
        int filled = 0;
        cJSON *lvl;
        cJSON_ArrayForEach(lvl, asks) {
            if (filled >= MAX_BOOK_LEVELS) break;
            if (!cJSON_IsArray(lvl) || cJSON_GetArraySize(lvl) < 2) continue;
            double px = book_level_value(cJSON_GetArrayItem(lvl, 0));
            double qty = book_level_value(cJSON_GetArrayItem(lvl, 1));
            if (!(px > 0 && qty > 0)) continue;
            book->asks[filled][0] = px;
            book->asks[filled][1] = qty;
            filled++;
        }
        book->ask_count = (uint8_t)filled;
    }

    book->symbol_idx = (uint16_t)sym_idx;
    size_t copy_len = strlen(symbol);
    if (copy_len >= sizeof(book->symbol)) {
        copy_len = sizeof(book->symbol) - 1;
    }
    memcpy(book->symbol, symbol, copy_len);
    book->symbol[copy_len] = '\0';
    return sym_idx;
}
/*
* Process a single complete WebSocket frame after it has been fully read.
* Handles: ping (0x9) -> pong (0xA), close (0x8), text (0x1).
* For book-update text frames (type=message), uses jsmn (zero-alloc)
* and returns the symbol index so the caller can batch evaluations.
* For other text frames, falls back to cJSON parse.
* Returns symbol index on book update, -1 otherwise.
*/
int16_t ws_client_process_frame(ws_client_t *client, uint32_t conn_idx) {
if (conn_idx >= WS_MAX_CONNECTIONS) return -1;
ws_connection_t *conn = &client->connections[conn_idx];
uint8_t *payload = conn->frame_buf;
size_t payload_len = conn->frame_payload_len;
uint8_t opcode = conn->frame_opcode;
if (payload_len == 0 && opcode != 0x8 && opcode != 0xA) {
conn->frame_payload_len = 0;
conn->frame_finished = false;
return -1;
}
if (opcode == 0x9) {
ws_send_frame(conn, 0xA, payload, payload_len);
conn->last_activity_ms = now_ms_impl();
conn->frame_payload_len = 0;
conn->frame_finished = false;
return -1;
}
if (opcode == 0x8) {
if (payload_len >= 2) {
uint16_t code = ((uint16_t)payload[0] << 8) | payload[1];
char reason[128] = {0};
if (payload_len > 2) {
uint32_t rlen = payload_len - 2;
if (rlen > sizeof(reason) - 1) rlen = sizeof(reason) - 1;
memcpy(reason, payload + 2, rlen);
}
log_write("[WS] Close frame on conn %u: code=%u reason='%s'\n", conn_idx, code, reason);
} else {
log_write("[WS] Close frame received on conn %u\n", conn_idx);
}
conn->state = WS_STATE_DISCONNECTED;
conn->frame_payload_len = 0;
conn->frame_finished = false;
return -1;
}
if (opcode == 0xA) {
conn->frame_payload_len = 0;
conn->frame_finished = false;
return -1;
}
if (opcode == 0x1) {
cJSON *msg_root = cJSON_ParseWithLength((const char *)payload, payload_len);
if (!msg_root) {
static int parse_fails = 0;
if (++parse_fails <= 3)
log_write("[WS] JSON parse fail: %.*s\n",
(int)(payload_len > 100 ? 100 : payload_len), (const char *)payload);
conn->frame_payload_len = 0;
conn->frame_finished = false;
return -1;
}
cJSON *msg_type = cJSON_GetObjectItem(msg_root, "type");
int16_t sym_idx = -1;
if (cJSON_IsString(msg_type)) {
if (strcmp(msg_type->valuestring, "welcome") == 0) {
log_write("[WS] Welcome message received\n");
} else if (strcmp(msg_type->valuestring, "ack") == 0) {
static int ack_count = 0;
if (++ack_count <= 5) log_write("[WS] Ack #%d: %.*s\n", ack_count,
(int)(payload_len > 200 ? 200 : payload_len), (const char *)payload);
} else if (strcmp(msg_type->valuestring, "message") == 0) {
cJSON *subject = cJSON_GetObjectItem(msg_root, "subject");
if (cJSON_IsString(subject) &&
strcmp(subject->valuestring, "orderChange") == 0) {
/* Private order-change event — dispatch fill */
cJSON *data = cJSON_GetObjectItem(msg_root, "data");
if (data && client->fill_ch) {
cJSON *oid = cJSON_GetObjectItem(data, "clientOid");
cJSON *evt_type = cJSON_GetObjectItem(data, "type");
cJSON *status = cJSON_GetObjectItem(data, "status");
cJSON *order_id = cJSON_GetObjectItem(data, "orderId");
if (cJSON_IsString(oid) && cJSON_IsString(evt_type)) {
fill_event_t fe = {0};
strncpy(fe.client_oid, oid->valuestring, MAX_CLIENT_OID - 1);
if (order_id && cJSON_IsString(order_id))
strncpy(fe.order_id, order_id->valuestring, sizeof(fe.order_id) - 1);
bool is_match = (strcmp(evt_type->valuestring, "match") == 0);
bool is_terminal = ((strcmp(evt_type->valuestring, "filled") == 0 ||
(strcmp(evt_type->valuestring, "canceled") == 0)) &&
cJSON_IsString(status) &&
strcmp(status->valuestring, "done") == 0);
/* Route to the correct slot's fill channel.
First try clientOid (slot encoded in bytes 1-2),
then fall back to orderId lookup. */
fill_channel_t *target_ch = client->fill_ch;
const char *coid = oid->valuestring;
if (coid[0] == 'c' && coid[1] && coid[2]) {
unsigned long slot_idx = 0;
for (int h = 1; h <= 2; h++) {
char c = coid[h];
slot_idx *= 16;
if (c >= '0' && c <= '9') slot_idx += (unsigned long)(c - '0');
else if (c >= 'a' && c <= 'f') slot_idx += (unsigned long)(c - 'a' + 10);
else if (c >= 'A' && c <= 'F') slot_idx += (unsigned long)(c - 'A' + 10);
}
if (slot_idx < (unsigned long)client->n_slots && client->slots)
target_ch = client->slots[slot_idx].fill_ch;
}
/* If clientOid routing failed, try orderId mapping */
if (target_ch == client->fill_ch && order_id && cJSON_IsString(order_id)) {
const char *oid_str = order_id->valuestring;
pthread_mutex_lock(&client->order_slots_lock);
for (int i = 0; i < MAX_ORDER_SLOT_ENTRIES; i++) {
if (atomic_load_explicit(&client->order_slots[i].active, memory_order_acquire) &&
strcmp(client->order_slots[i].order_id, oid_str) == 0) {
int si = client->order_slots[i].slot_index;
if (si >= 0 && si < client->n_slots && client->slots)
target_ch = client->slots[si].fill_ch;
break;
}
}
pthread_mutex_unlock(&client->order_slots_lock);
}
if (is_match) {
cJSON *ms = cJSON_GetObjectItem(data, "matchSize");
cJSON *mp = cJSON_GetObjectItem(data, "matchPrice");
cJSON *mf = cJSON_GetObjectItem(data, "matchFee");
fe.match_size = cJSON_IsString(ms) ? atof(ms->valuestring) :
cJSON_IsNumber(ms) ? ms->valuedouble : 0;
fe.match_price = cJSON_IsString(mp) ? atof(mp->valuestring) :
cJSON_IsNumber(mp) ? mp->valuedouble : 0;
fe.match_fee = cJSON_IsString(mf) ? atof(mf->valuestring) :
cJSON_IsNumber(mf) ? mf->valuedouble : 0;
if (!fill_channel_push(target_ch, &fe) &&
++client->fill_drop_warn <= 3)
log_write("[WS] Fill ring full, dropping match\n");
} else if (is_terminal) {
fe.is_terminal = true;
if (!fill_channel_push(target_ch, &fe) &&
++client->fill_drop_warn <= 3)
log_write("[WS] Fill ring full, dropping terminal\n");
}
}
}
} else if (cJSON_IsString(subject) &&
strcmp(subject->valuestring, "account.balance") == 0) {
cJSON *data = cJSON_GetObjectItem(msg_root, "data");
if (data) {
cJSON *ccy = cJSON_GetObjectItem(data, "currency");
cJSON *avail = cJSON_GetObjectItem(data, "available");
if (cJSON_IsString(ccy) && cJSON_IsString(avail)) {
pthread_mutex_lock(&client->balance_lock);
int idx = -1;
for (int i = 0; i < client->balance_count; i++) {
if (strcmp(client->balance_cache[i].currency, ccy->valuestring) == 0) {
idx = i; break;
}
}
if (idx < 0 && client->balance_count < MAX_BALANCE_ENTRIES) {
idx = client->balance_count++;
strncpy(client->balance_cache[idx].currency, ccy->valuestring, 15);
}
if (idx >= 0)
client->balance_cache[idx].available = atof(avail->valuestring);
pthread_mutex_unlock(&client->balance_lock);
if (client->balance_wake_fd >= 0) {
uint64_t one = 1;
if (write(client->balance_wake_fd, &one, sizeof(one)) < 0) {}
}
}
}
} else {
sym_idx = parse_book_update(msg_root, client);
}
} else if (strcmp(msg_type->valuestring, "error") == 0) {
log_write("[WS] Error message: %.*s\n",
(int)(payload_len > 200 ? 200 : payload_len), (const char *)payload);
}
}
cJSON_Delete(msg_root);
conn->frame_payload_len = 0;
conn->frame_finished = false;
return sym_idx;
}
conn->frame_payload_len = 0;
conn->frame_finished = false;
return -1;
}
/*
 * Read available bytes from the SSL session into the connection's read
 * buffer and dispatch every complete WebSocket frame found there.
 *
 * Handles the 16-bit and 64-bit extended payload lengths and unmasks masked
 * frames, but only once the whole frame is buffered, so a partial frame is
 * never touched twice. Partial frames stay in the buffer for the next call.
 * Fragmented messages (FIN=0 or continuation frames) are not supported and
 * cause a disconnect. Symbols updated in this burst are evaluated once each
 * after all frames have been processed.
 *
 * Returns 0 when the read was handled (including "no data yet"), -1 when the
 * connection is not usable or had to be marked disconnected.
 */
int ws_client_read(ws_client_t *client, uint32_t conn_idx) {
    if (conn_idx >= WS_MAX_CONNECTIONS) return -1;
    ws_connection_t *conn = &client->connections[conn_idx];
    if (conn->state != WS_STATE_CONNECTED) return -1;

    /* A full buffer would turn SSL_read into a zero-length read that looks
       like a peer close; report the real cause instead. */
    if (conn->read_len >= WS_READ_BUF_SIZE) {
        log_write("[WS] Read buffer full on conn %u without a complete frame\n", conn_idx);
        conn->state = WS_STATE_DISCONNECTED;
        return -1;
    }

    int r = SSL_read(conn->ssl, conn->read_buf + conn->read_len,
                     (int)(WS_READ_BUF_SIZE - conn->read_len));
    if (r < 0) {
        int err = SSL_get_error(conn->ssl, r);
        if (err == SSL_ERROR_WANT_READ || err == SSL_ERROR_WANT_WRITE) return 0;
        log_write("[WS] SSL_read error: %d\n", err);
        conn->state = WS_STATE_DISCONNECTED;
        return -1;
    }
    if (r == 0) {
        log_write("[WS] Connection closed on conn %u\n", conn_idx);
        conn->state = WS_STATE_DISCONNECTED;
        return -1;
    }
    conn->read_len += (size_t)r;
    conn->t_sock_arrive_ms = (int64_t)now_realtime_ms();

    /* Coalesce: track updated symbols, evaluate once per symbol at end */
#define MAX_DIRTY_BATCH 2048
    uint16_t dirty[MAX_DIRTY_BATCH];
    uint32_t dirty_count = 0;
    int rc = 0;

    /* Process all complete frames in the read buffer; stop as soon as a
       frame (e.g. close) or a failed send marks the connection down. */
    while (conn->read_len >= 2 && conn->state == WS_STATE_CONNECTED) {
        bool fin = (conn->read_buf[0] & 0x80) != 0;
        uint8_t opcode = conn->read_buf[0] & 0x0F;
        bool masked = (conn->read_buf[1] & 0x80) != 0;
        uint64_t payload_len_raw = conn->read_buf[1] & 0x7F;
        size_t hdr_len = 2;

        if (payload_len_raw == 126) {
            if (conn->read_len < 4) break;
            payload_len_raw = ((uint64_t)conn->read_buf[2] << 8) | conn->read_buf[3];
            hdr_len = 4;
        } else if (payload_len_raw == 127) {
            if (conn->read_len < 10) break;
            payload_len_raw = 0;
            for (int i = 0; i < 8; i++) {
                payload_len_raw = (payload_len_raw << 8) | conn->read_buf[2 + i];
            }
            hdr_len = 10;
        }
        if (masked) hdr_len += 4;
        if (conn->read_len < hdr_len) break;

        if (payload_len_raw > WS_MAX_FRAME_SIZE) {
            log_write("[WS] Frame too large: %" PRIu64 "\n", payload_len_raw);
            conn->state = WS_STATE_DISCONNECTED;
            rc = -1;
            break;
        }
        size_t frame_size = hdr_len + (size_t)payload_len_raw;
        if (frame_size > WS_READ_BUF_SIZE) {
            /* Can never be buffered completely; waiting would stall forever. */
            log_write("[WS] Frame exceeds read buffer: %zu\n", frame_size);
            conn->state = WS_STATE_DISCONNECTED;
            rc = -1;
            break;
        }
        /* Wait for the rest of the frame BEFORE touching the payload. */
        if (conn->read_len < frame_size) break;

        if (!fin || opcode == 0x0) {
            /* No reassembly support: fail cleanly instead of spinning on a
               frame that will never be marked finished. */
            log_write("[WS] Fragmented frame not supported on conn %u\n", conn_idx);
            conn->state = WS_STATE_DISCONNECTED;
            rc = -1;
            break;
        }

        /* Unmask in place (server frames are normally unmasked, but handle
           masking per spec just in case). */
        if (masked) {
            uint8_t mask[4];
            memcpy(mask, conn->read_buf + hdr_len - 4, 4);
            for (size_t i = 0; i < (size_t)payload_len_raw; i++) {
                conn->read_buf[hdr_len + i] ^= mask[i % 4];
            }
        }

        conn->frame_payload_len = (size_t)payload_len_raw;
        conn->frame_opcode = opcode;
        memcpy(conn->frame_buf, conn->read_buf + hdr_len, (size_t)payload_len_raw);
        conn->frame_finished = true;

        /* Consume the frame from the read buffer. */
        memmove(conn->read_buf, conn->read_buf + frame_size, conn->read_len - frame_size);
        conn->read_len -= frame_size;

        /* Dispatch every finished frame, including empty control frames
           (empty close / ping), which would otherwise be lost. */
        int16_t sym_idx = ws_client_process_frame(client, conn_idx);
        if (sym_idx >= 0 && dirty_count < MAX_DIRTY_BATCH) {
            bool seen = false;
            for (uint32_t d = 0; d < dirty_count; d++) {
                if (dirty[d] == (uint16_t)sym_idx) { seen = true; break; }
            }
            if (!seen) dirty[dirty_count++] = (uint16_t)sym_idx;
        }
        conn->frame_payload_len = 0;
        conn->frame_finished = false;
    }

    /* Flush: evaluate all symbols updated in this burst */
    if (dirty_count > 0) {
        double t_arrive = now_realtime_ms();
        for (uint32_t d = 0; d < dirty_count; d++) {
            evaluate_symbol(client->evaluator, dirty[d],
                            conn->t_sock_arrive_ms, t_arrive);
        }
    }
    return rc;
}
/* Look up `currency` in the balance cache (under balance_lock) and report
   whether it is present with at least `min_amount` available, allowing a
   1e-12 tolerance. */
static bool balance_meets_minimum(ws_client_t *client, const char *currency,
                                  double min_amount) {
    bool present = false;
    double available = 0;

    pthread_mutex_lock(&client->balance_lock);
    for (int i = 0; i < client->balance_count; i++) {
        if (strcmp(client->balance_cache[i].currency, currency) != 0) continue;
        available = client->balance_cache[i].available;
        present = true;
        break;
    }
    pthread_mutex_unlock(&client->balance_lock);

    return present && available >= min_amount - 1e-12;
}

/* Wait up to `timeout_ms` for the cached balance of `currency` to reach
   `min_amount`. The cache is checked immediately and then after every
   wake-up of the balance eventfd or poll timeout.
   Returns true once the balance suffices, false when the deadline passes. */
bool ws_client_await_balance(ws_client_t *client, const char *currency,
                             double min_amount, int64_t timeout_ms) {
    const int64_t deadline = (int64_t)ws_client_now_ms() + timeout_ms;

    /* Fast path: the cache may already hold enough. */
    if (balance_meets_minimum(client, currency, min_amount)) {
        return true;
    }

    for (;;) {
        if ((int64_t)ws_client_now_ms() >= deadline) break;

        struct pollfd waiter = { .fd = client->balance_wake_fd, .events = POLLIN };
        int remaining = (int)(deadline - (int64_t)ws_client_now_ms());
        if (remaining <= 0) break;

        if (poll(&waiter, 1, remaining) > 0) {
            /* Drain the eventfd counter; the value itself is irrelevant. */
            uint64_t drained;
            if (read(client->balance_wake_fd, &drained, sizeof(drained)) < 0) {}
        }

        /* Re-check on any wake-up or timeout. */
        if (balance_meets_minimum(client, currency, min_amount)) {
            return true;
        }
    }
    return false;
}
/* Return the cached available balance for `currency`, or 0 when the
   currency is not in the cache. The lookup runs under balance_lock. */
double ws_client_latest_balance(ws_client_t *client, const char *currency) {
    double result = 0;

    pthread_mutex_lock(&client->balance_lock);
    int total = client->balance_count;
    int pos = 0;
    while (pos < total &&
           strcmp(client->balance_cache[pos].currency, currency) != 0) {
        pos++;
    }
    if (pos < total) {
        result = client->balance_cache[pos].available;
    }
    pthread_mutex_unlock(&client->balance_lock);

    return result;
}