diff --git a/executor/ws_client.py b/executor/ws_client.py index e816da5..a68a395 100644 --- a/executor/ws_client.py +++ b/executor/ws_client.py @@ -39,6 +39,7 @@ class FillAccumulator: self.match_count = 0 self.side = "" self.symbol = "" + self._done = False def add_match(self, data: dict) -> None: match_price = Decimal(str(data.get("matchPrice", "0"))) @@ -163,6 +164,13 @@ class KuCoinWSClient: future: asyncio.Future = asyncio.get_event_loop().create_future() self._fill_futures[client_oid] = future + if client_oid in self._accumulators and self._accumulators[client_oid]._done: + acc = self._accumulators[client_oid] + result = (True, acc.to_dict()) + del self._accumulators[client_oid] + self._fill_futures.pop(client_oid, None) + return result + try: await asyncio.wait_for(future, timeout=timeout_ms / 1000.0) except asyncio.TimeoutError: @@ -364,6 +372,8 @@ class KuCoinWSClient: future = self._fill_futures[client_oid] if not future.done(): future.set_result((True, fill_data)) + elif client_oid in self._accumulators: + self._accumulators[client_oid]._done = True elif event_type in ("canceled", "failed"): self._log.warning( @@ -398,6 +408,8 @@ class KuCoinWSClient: future = self._fill_futures[client_oid] if not future.done(): future.set_result((True, fill_data)) + elif client_oid in self._accumulators: + self._accumulators[client_oid]._done = True return if client_oid in self._fill_futures: diff --git a/src/ws_client.c b/src/ws_client.c index 2a93197..b667ad4 100644 --- a/src/ws_client.c +++ b/src/ws_client.c @@ -11,6 +11,9 @@ #include "ws_client.h" #include "http_client.h" #include "cJSON.h" + +#define JSMN_STATIC +#include "jsmn.h" #include #include #include @@ -507,129 +510,222 @@ int ws_client_unsubscribe(ws_client_t *client, uint32_t conn_idx, } /* - * Parse a KuCoin level2Depth5 book update JSON and update the in-memory order book. + * Parse a KuCoin level2Depth5 book update JSON using jsmn (zero-alloc). * Topic format: /spotMarket/level2Depth5:{symbol} - * Extracts timestamp, sequence, bids array, asks array (each [price, size]). - * Falls back through timestamp fields: timestamp -> sequence -> time. - * Converts both numeric and string-encoded price/size values. - * After updating the book, triggers evaluate_symbol for the updated symbol. + * Extracts timestamp/sequence, bids, asks (each [price, size] pair). + * Returns symbol index on success, -1 on failure. + * The caller is responsible for calling evaluate_symbol afterwards + * (coalesced per-symbol batching is done in ws_client_read). */ -static void parse_book_update(ws_connection_t *conn, cJSON *root, - ws_client_t *client, uint32_t conn_idx) { - cJSON *type = cJSON_GetObjectItem(root, "type"); - if (type && cJSON_IsString(type) && strcmp(type->valuestring, "message") == 0) { - cJSON *topic = cJSON_GetObjectItem(root, "topic"); - cJSON *data = cJSON_GetObjectItem(root, "data"); - if (!cJSON_IsString(topic) || !cJSON_IsObject(data)) { - return; +#define JSMN_BOOK_TOKENS 128 + +static bool jsmn_eq(const char *json, const jsmntok_t *tok, const char *str) { + size_t len = strlen(str); + return tok->type == JSMN_STRING + && (size_t)(tok->end - tok->start) == len + && memcmp(json + tok->start, str, len) == 0; +} + +static int16_t parse_book_update(ws_client_t *client, const char *payload, + size_t payload_len) { + jsmn_parser parser; + jsmntok_t tokens[JSMN_BOOK_TOKENS]; + + jsmn_init(&parser); + int ntokens = jsmn_parse(&parser, payload, payload_len, + tokens, JSMN_BOOK_TOKENS); + if (ntokens <= 0 || tokens[0].type != JSMN_OBJECT) return -1; + + /* Walk top-level keys: "type", "topic", "data" */ + const char *topic_start = NULL; + int topic_len = 0; + const jsmntok_t *data_obj = NULL; + int n = tokens[0].size; + int pos = 1; + + for (int i = 0; i < n && pos < ntokens; i++) { + const jsmntok_t *key = &tokens[pos]; + const jsmntok_t *val = &tokens[pos + 1]; + + if (jsmn_eq(payload, key, "type")) { + if (!jsmn_eq(payload, val, "message") && val->type != JSMN_STRING) + return -1; + } else if (jsmn_eq(payload, key, "topic")) { + topic_start = payload + val->start; + topic_len = val->end - val->start; + } else if (jsmn_eq(payload, key, "data") && val->type == JSMN_OBJECT) { + data_obj = val; } - // Extract symbol from topic: /spotMarket/level2Depth5:{symbol} - const char *topic_str = topic->valuestring; - const char *sym_start = strstr(topic_str, "level2Depth5:"); - if (!sym_start) { return; } - sym_start += 13; - char symbol[SYMBOL_NAME_LEN] = {0}; - strncpy(symbol, sym_start, SYMBOL_NAME_LEN - 1); - char *comma = strchr(symbol, ','); - if (comma) *comma = '\0'; - - int16_t sym_idx = symbol_table_lookup(client->symbols, symbol); - if (sym_idx < 0) { return; } - - order_book_t *book = &client->books[sym_idx]; - - // Try multiple timestamp fields (KuCoin version-dependent) - cJSON *ts_val = cJSON_GetObjectItem(data, "timestamp"); - cJSON *seq_val = cJSON_GetObjectItem(data, "sequence"); - cJSON *seqNum_val = cJSON_GetObjectItem(data, "sequenceNum"); - if (cJSON_IsNumber(ts_val)) book->ts_ms = (int64_t)ts_val->valuedouble; - if (!book->ts_ms && cJSON_IsNumber(seq_val)) book->ts_ms = (int64_t)seq_val->valuedouble; - if (!book->ts_ms) { - cJSON *time_val = cJSON_GetObjectItem(data, "time"); - if (cJSON_IsNumber(time_val)) book->ts_ms = (int64_t)time_val->valuedouble; + /* Skip value and its children */ + if (val->type == JSMN_OBJECT) { + pos += 2; /* key + val */ + int children = val->size * 2; + for (int c = 0; c < children && pos < ntokens; c++) { + const jsmntok_t *ck = &tokens[pos]; + pos++; + if (ck->type == JSMN_OBJECT || ck->type == JSMN_ARRAY) { + int grand = ck->size; + if (ck->type == JSMN_OBJECT) grand *= 2; + for (int g = 0; g < grand && pos < ntokens; g++) pos++; + } + } + } else if (val->type == JSMN_ARRAY) { + pos += 2; + int children = val->size; + for (int c = 0; c < children && pos < ntokens; c++) { + const jsmntok_t *ca = &tokens[pos]; + pos++; + if (ca->type == JSMN_OBJECT || ca->type == JSMN_ARRAY) { + int grand = ca->size; + if (ca->type == JSMN_OBJECT) grand *= 2; + for (int g = 0; g < grand && pos < ntokens; g++) pos++; + } + } + } else { + pos += 2; } - if (cJSON_IsNumber(seq_val)) book->sequence = (int64_t)seq_val->valuedouble; - else if (cJSON_IsNumber(seqNum_val)) book->sequence = (int64_t)seqNum_val->valuedouble; + } - cJSON *bids = cJSON_GetObjectItem(data, "bids"); - cJSON *asks = cJSON_GetObjectItem(data, "asks"); + if (!topic_start || !data_obj) return -1; - if (cJSON_IsArray(bids)) { + /* Extract symbol from topic: /spotMarket/level2Depth5:{symbol} */ + const char *sym_start = NULL; + const char *marker = "/level2Depth5:"; + for (int i = 0; i <= topic_len - 14; i++) { + if (memcmp(topic_start + i, marker, 14) == 0) { + sym_start = topic_start + i + 14; + break; + } + } + if (!sym_start) return -1; + + char symbol[SYMBOL_NAME_LEN] = {0}; + int sym_len = (int)(topic_start + topic_len - sym_start); + if (sym_len >= SYMBOL_NAME_LEN) sym_len = SYMBOL_NAME_LEN - 1; + memcpy(symbol, sym_start, (size_t)sym_len); + char *comma = strchr(symbol, ','); + if (comma) *comma = '\0'; + + int16_t sym_idx = symbol_table_lookup(client->symbols, symbol); + if (sym_idx < 0) return -1; + + order_book_t *book = &client->books[sym_idx]; + + /* Walk "data" object keys: bids, asks, timestamp/sequence/time */ + int nd = data_obj->size; + int dpos = pos; + + /* Reset book bid/ask counts before filling */ + book->bid_count = 0; + book->ask_count = 0; + + for (int i = 0; i < nd && dpos < ntokens; i++) { + const jsmntok_t *dkey = &tokens[dpos]; + const jsmntok_t *dval = &tokens[dpos + 1]; + + if (jsmn_eq(payload, dkey, "bids") && dval->type == JSMN_ARRAY) { + int n_bids = dval->size; + int bpos = dpos + 2; int count = 0; - cJSON *bid; - cJSON_ArrayForEach(bid, bids) { - if (count >= MAX_BOOK_LEVELS) break; - if (cJSON_IsArray(bid) && cJSON_GetArraySize(bid) >= 2) { - cJSON *price = cJSON_GetArrayItem(bid, 0); - cJSON *size = cJSON_GetArrayItem(bid, 1); - double p = cJSON_IsNumber(price) ? price->valuedouble : - cJSON_IsString(price) ? atof(price->valuestring) : 0.0; - double s = cJSON_IsNumber(size) ? size->valuedouble : - cJSON_IsString(size) ? atof(size->valuestring) : 0.0; - if (p > 0 && s > 0) { - book->bids[count][0] = p; - book->bids[count][1] = s; + for (int b = 0; b < n_bids && count < MAX_BOOK_LEVELS && bpos < ntokens; b++) { + const jsmntok_t *entry = &tokens[bpos]; + if (entry->type == JSMN_ARRAY && entry->size >= 2) { + const jsmntok_t *p = &tokens[bpos + 1]; + const jsmntok_t *s = &tokens[bpos + 2]; + double price = atof(payload + p->start); + double size = atof(payload + s->start); + if (price > 0 && size > 0) { + book->bids[count][0] = price; + book->bids[count][1] = size; count++; } } + /* Skip entry array + its children */ + bpos += 1 + entry->size; } book->bid_count = (uint8_t)count; - } - - if (cJSON_IsArray(asks)) { + dpos = bpos; + } else if (jsmn_eq(payload, dkey, "asks") && dval->type == JSMN_ARRAY) { + int n_asks = dval->size; + int apos = dpos + 2; int count = 0; - cJSON *ask; - cJSON_ArrayForEach(ask, asks) { - if (count >= MAX_BOOK_LEVELS) break; - if (cJSON_IsArray(ask) && cJSON_GetArraySize(ask) >= 2) { - cJSON *price = cJSON_GetArrayItem(ask, 0); - cJSON *size = cJSON_GetArrayItem(ask, 1); - double p = cJSON_IsNumber(price) ? price->valuedouble : - cJSON_IsString(price) ? atof(price->valuestring) : 0.0; - double s = cJSON_IsNumber(size) ? size->valuedouble : - cJSON_IsString(size) ? atof(size->valuestring) : 0.0; - if (p > 0 && s > 0) { - book->asks[count][0] = p; - book->asks[count][1] = s; + for (int a = 0; a < n_asks && count < MAX_BOOK_LEVELS && apos < ntokens; a++) { + const jsmntok_t *entry = &tokens[apos]; + if (entry->type == JSMN_ARRAY && entry->size >= 2) { + const jsmntok_t *p = &tokens[apos + 1]; + const jsmntok_t *s = &tokens[apos + 2]; + double price = atof(payload + p->start); + double size = atof(payload + s->start); + if (price > 0 && size > 0) { + book->asks[count][0] = price; + book->asks[count][1] = size; count++; } } + apos += 1 + entry->size; } book->ask_count = (uint8_t)count; + dpos = apos; + } else if (jsmn_eq(payload, dkey, "timestamp") || + jsmn_eq(payload, dkey, "time")) { + if (!book->ts_ms) + book->ts_ms = (int64_t)atof(payload + dval->start); + dpos += 2; + } else if (jsmn_eq(payload, dkey, "sequence") || + jsmn_eq(payload, dkey, "sequenceNum")) { + book->sequence = (int64_t)atof(payload + dval->start); + if (!book->ts_ms) + book->ts_ms = book->sequence; + dpos += 2; + } else { + /* Skip unknown fields */ + if (dval->type == JSMN_OBJECT) { + dpos += 2 + dval->size * 2; + } else if (dval->type == JSMN_ARRAY) { + dpos += 2; + for (int c = 0; c < dval->size && dpos < ntokens; c++) { + dpos += 1 + tokens[dpos].size; + } + } else { + dpos += 2; + } } - - book->symbol_idx = (uint16_t)sym_idx; - strncpy(book->symbol, symbol, SYMBOL_NAME_LEN - 1); - - static uint64_t book_count = 0; - book_count++; - - int64_t t_arrive = (int64_t)now_realtime_ms(); - evaluate_symbol(client->evaluator, (uint16_t)sym_idx, conn->t_sock_arrive_ms, t_arrive); } + + book->symbol_idx = (uint16_t)sym_idx; + strncpy(book->symbol, symbol, SYMBOL_NAME_LEN - 1); + + return sym_idx; } /* * Process a single complete WebSocket frame after it has been fully read. * Handles: ping (0x9) -> pong (0xA), close (0x8), text (0x1). - * Text frames are JSON-parsed and dispatched: welcome, ack, message (-> book update), error. + * For book-update text frames (type=message), uses jsmn (zero-alloc) + * and returns the symbol index so the caller can batch evaluations. + * For other text frames, falls back to cJSON parse. + * Returns symbol index on book update, -1 otherwise. */ -void ws_client_process_frame(ws_client_t *client, uint32_t conn_idx) { - if (conn_idx >= WS_MAX_CONNECTIONS) return; +int16_t ws_client_process_frame(ws_client_t *client, uint32_t conn_idx) { + if (conn_idx >= WS_MAX_CONNECTIONS) return -1; ws_connection_t *conn = &client->connections[conn_idx]; uint8_t *payload = conn->frame_buf; size_t payload_len = conn->frame_payload_len; uint8_t opcode = conn->frame_opcode; - if (payload_len == 0 && opcode != 0x8 && opcode != 0xA) return; + if (payload_len == 0 && opcode != 0x8 && opcode != 0xA) { + conn->frame_payload_len = 0; + conn->frame_finished = false; + return -1; + } if (opcode == 0x9) { ws_send_frame(conn, 0xA, payload, payload_len); conn->last_activity_ms = now_ms_impl(); conn->frame_payload_len = 0; conn->frame_finished = false; - return; + return -1; } if (opcode == 0x8) { @@ -648,23 +744,36 @@ void ws_client_process_frame(ws_client_t *client, uint32_t conn_idx) { conn->state = WS_STATE_DISCONNECTED; conn->frame_payload_len = 0; conn->frame_finished = false; - return; + return -1; } if (opcode == 0xA) { conn->frame_payload_len = 0; conn->frame_finished = false; - return; + return -1; } if (opcode == 0x1) { + /* Quick check: book updates start with {"type":"message" */ + if (payload_len > 18 && + memcmp(payload, "{\"type\":\"message\"", 16) == 0) { + int16_t sym_idx = parse_book_update(client, + (const char *)payload, payload_len); + conn->frame_payload_len = 0; + conn->frame_finished = false; + return sym_idx; + } + + /* Other JSON messages: use cJSON (welcome, ack, error) */ cJSON *msg_root = cJSON_ParseWithLength((const char *)payload, payload_len); if (!msg_root) { static int parse_fails = 0; - if (++parse_fails <= 3) log_write("[WS] JSON parse fail: %.*s\n", (int)(payload_len > 100 ? 100 : payload_len), (const char *)payload); + if (++parse_fails <= 3) + log_write("[WS] JSON parse fail: %.*s\n", + (int)(payload_len > 100 ? 100 : payload_len), (const char *)payload); conn->frame_payload_len = 0; conn->frame_finished = false; - return; + return -1; } cJSON *msg_type = cJSON_GetObjectItem(msg_root, "type"); @@ -675,8 +784,6 @@ void ws_client_process_frame(ws_client_t *client, uint32_t conn_idx) { static int ack_count = 0; if (++ack_count <= 5) log_write("[WS] Ack #%d: %.*s\n", ack_count, (int)(payload_len > 200 ? 200 : payload_len), (const char *)payload); - } else if (strcmp(msg_type->valuestring, "message") == 0) { - parse_book_update(conn, msg_root, client, conn_idx); } else if (strcmp(msg_type->valuestring, "error") == 0) { log_write("[WS] Error message: %.*s\n", (int)(payload_len > 200 ? 200 : payload_len), (const char *)payload); @@ -688,6 +795,7 @@ void ws_client_process_frame(ws_client_t *client, uint32_t conn_idx) { conn->frame_payload_len = 0; conn->frame_finished = false; + return -1; } /* @@ -718,6 +826,11 @@ int ws_client_read(ws_client_t *client, uint32_t conn_idx) { conn->read_len += (size_t)r; conn->t_sock_arrive_ms = (int64_t)now_realtime_ms(); + /* Coalesce: track updated symbols, evaluate once per symbol at end */ + #define MAX_DIRTY_BATCH 64 + uint16_t dirty[MAX_DIRTY_BATCH]; + uint32_t dirty_count = 0; + // Process all complete frames in the read buffer while (conn->read_len >= 2) { if (!conn->frame_finished && conn->frame_payload_len == 0) { @@ -772,7 +885,14 @@ int ws_client_read(ws_client_t *client, uint32_t conn_idx) { } if (conn->frame_finished && conn->frame_payload_len > 0) { - ws_client_process_frame(client, conn_idx); + int16_t sym_idx = ws_client_process_frame(client, conn_idx); + if (sym_idx >= 0 && dirty_count < MAX_DIRTY_BATCH) { + bool seen = false; + for (uint32_t d = 0; d < dirty_count; d++) { + if (dirty[d] == (uint16_t)sym_idx) { seen = true; break; } + } + if (!seen) dirty[dirty_count++] = (uint16_t)sym_idx; + } } if (conn->frame_finished && conn->frame_payload_len == 0) { conn->frame_finished = false; @@ -781,5 +901,14 @@ int ws_client_read(ws_client_t *client, uint32_t conn_idx) { if (conn->read_len == 0) break; } + /* Flush: evaluate all symbols updated in this burst */ + if (dirty_count > 0) { + int64_t t_arrive = (int64_t)now_realtime_ms(); + for (uint32_t d = 0; d < dirty_count; d++) { + evaluate_symbol(client->evaluator, dirty[d], + conn->t_sock_arrive_ms, t_arrive); + } + } + return 0; } diff --git a/src/ws_client.h b/src/ws_client.h index b0784f4..1a066ff 100644 --- a/src/ws_client.h +++ b/src/ws_client.h @@ -95,8 +95,9 @@ int ws_client_unsubscribe(ws_client_t *client, uint32_t conn_idx, const uint16_t *symbol_indices, uint32_t count); /* Fetch a WebSocket token from the KuCoin API */ int ws_client_fetch_token(ws_connection_t *conn); -/* Process a received WebSocket frame (dispatch to book updates, etc.) */ -void ws_client_process_frame(ws_client_t *client, uint32_t conn_idx); +/* Process a received WebSocket frame (dispatch to book updates, etc.). + * Returns symbol index on book update, -1 otherwise. */ +int16_t ws_client_process_frame(ws_client_t *client, uint32_t conn_idx); /* Send a WebSocket ping frame */ int ws_client_send_ping(ws_connection_t *conn); /* Get current monotonic timestamp in milliseconds */