diff --git a/okx/SpreadTrading.py b/okx/SpreadTrading.py index 5dadd52..c460ace 100644 --- a/okx/SpreadTrading.py +++ b/okx/SpreadTrading.py @@ -16,12 +16,12 @@ class SpreadTradingAPI(Client): # Cancel Order def cancel_order(self,ordId='', clOrdId=''): params = {'ordId': ordId, 'clOrdId': clOrdId} - return self._request_with_params(POST, SPREAD_CANAEL_ORDER, params) + return self._request_with_params(POST, SPREAD_CANCEL_ORDER, params) # Cancel All orders def cancel_all_orders(self, sprdId=''): params = {'sprdId': sprdId} - return self._request_with_params(POST, SPREAD_CANAEL_ALL_ORDERS, params) + return self._request_with_params(POST, SPREAD_CANCEL_ALL_ORDERS, params) # Get order details def get_order_details(self, ordId='', clOrdId=''): diff --git a/okx/Trade.py b/okx/Trade.py index 3df71fc..d52d143 100644 --- a/okx/Trade.py +++ b/okx/Trade.py @@ -25,11 +25,11 @@ class TradeAPI(Client): # Cancel Order def cancel_order(self, instId, ordId='', clOrdId=''): params = {'instId': instId, 'ordId': ordId, 'clOrdId': clOrdId} - return self._request_with_params(POST, CANAEL_ORDER, params) + return self._request_with_params(POST, CANCEL_ORDER, params) # Cancel Multiple Orders def cancel_multiple_orders(self, orders_data): - return self._request_with_params(POST, CANAEL_BATCH_ORDERS, orders_data) + return self._request_with_params(POST, CANCEL_BATCH_ORDERS, orders_data) # Amend Order def amend_order(self, instId, cxlOnFail='', ordId='', clOrdId='', reqId='', newSz='', newPx='', newTpTriggerPx='', diff --git a/okx/client.py b/okx/client.py index fc9acd3..74d4cea 100644 --- a/okx/client.py +++ b/okx/client.py @@ -7,7 +7,7 @@ from . import consts as c, utils, exceptions class Client(object): - def __init__(self, api_key = '-1', api_secret_key = '-1', passphrase = '-1', use_server_time=False, flag='1', base_api = c.API_URL,debug = 'True'): + def __init__(self, api_key = '-1', api_secret_key = '-1', passphrase = '-1', use_server_time=False, flag='1', base_api = c.API_URL,debug = 'True', proxies = None): self.API_KEY = api_key self.API_SECRET_KEY = api_secret_key self.PASSPHRASE = passphrase @@ -15,7 +15,7 @@ class Client(object): self.flag = flag self.domain = base_api self.debug = debug - self.client = httpx.Client(base_url=base_api, http2=True) + self.client = httpx.Client(base_url=base_api, http2=True, proxies=proxies) def _request(self, method, request_path, params): if method == c.GET: diff --git a/okx/consts.py b/okx/consts.py index 1b137d6..618e6ca 100644 --- a/okx/consts.py +++ b/okx/consts.py @@ -138,8 +138,8 @@ TAKER_FLOW = '/api/v5/rubik/stat/option/taker-block-volume' # TRADE-Complete PLACR_ORDER = '/api/v5/trade/order' BATCH_ORDERS = '/api/v5/trade/batch-orders' -CANAEL_ORDER = '/api/v5/trade/cancel-order' -CANAEL_BATCH_ORDERS = '/api/v5/trade/cancel-batch-orders' +CANCEL_ORDER = '/api/v5/trade/cancel-order' +CANCEL_BATCH_ORDERS = '/api/v5/trade/cancel-batch-orders' AMEND_ORDER = '/api/v5/trade/amend-order' AMEND_BATCH_ORDER = '/api/v5/trade/amend-batch-orders' CLOSE_POSITION = '/api/v5/trade/close-position' @@ -279,8 +279,8 @@ GET_UNREALIZED_PROFIT_SHARING_DETAILS = '/api/v5/copytrading/unrealized-profit-s # Spread Trading˚ SPREAD_PLACE_ORDER= '/api/v5/sprd/order' -SPREAD_CANAEL_ORDER = '/api/v5/sprd/cancel-order' -SPREAD_CANAEL_ALL_ORDERS = '/api/v5/sprd/mass-cancel' +SPREAD_CANCEL_ORDER = '/api/v5/sprd/cancel-order' +SPREAD_CANCEL_ALL_ORDERS = '/api/v5/sprd/mass-cancel' SPREAD_GET_ORDER_DETAILS = '/api/v5/sprd/order' SPREAD_GET_ACTIVE_ORDERS = '/api/v5/sprd/orders-pending' SPREAD_GET_ORDERS = '/api/v5/sprd/orders-history' diff --git a/okx/websocket/WebSocketFactory.py b/okx/websocket/WebSocketFactory.py new file mode 100644 index 0000000..9ce1e3a --- /dev/null +++ b/okx/websocket/WebSocketFactory.py @@ -0,0 +1,33 @@ +import asyncio +import logging +import ssl + +import certifi +import websockets + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger("WebSocketFactory") + + +class WebSocketFactory: + + def __init__(self, url): + self.url = url + self.websocket = None + self.loop = asyncio.get_event_loop() + + async def connect(self): + ssl_context = ssl.create_default_context() + ssl_context.load_verify_locations(certifi.where()) + try: + self.websocket = await websockets.connect(self.url, ssl=ssl_context) + logger.info("WebSocket connection established.") + return self.websocket + except Exception as e: + logger.error(f"Error connecting to WebSocket: {e}") + return None + + async def close(self): + if self.websocket: + await self.websocket.close() + self.websocket = None diff --git a/okx/websocket/WsClientFactory.py b/okx/websocket/WsClientFactory.py deleted file mode 100644 index c016222..0000000 --- a/okx/websocket/WsClientFactory.py +++ /dev/null @@ -1,49 +0,0 @@ -from autobahn.twisted.websocket import WebSocketClientFactory -from twisted.internet.protocol import ReconnectingClientFactory - -from .WsClientProtocol import * - - -class WsReconnectingClientFactory(ReconnectingClientFactory): - """ - @ivar maxDelay: Maximum number of seconds between connection attempts. - @ivar initialDelay: Delay for the first reconnection attempt. - @ivar maxRetries: Maximum number of consecutive unsuccessful connection - attempts, after which no further connection attempts will be made. If - this is not explicitly set, no maximum is applied. - """ - initialDelay = 0.1 - maxDelay = 2 - maxRetries = 5 - - -class WsClientFactory(WebSocketClientFactory, WsReconnectingClientFactory): - reachMaxRetriesError = {"e": "error", "m": "reached max connect retries"} - - def __init__(self, *args, payload=None, **kwargs): - WebSocketClientFactory.__init__(self, *args, **kwargs) - self.instance = None - self.subscribeSet = set() - self.payload = payload - self.logger = logging.getLogger(__name__) - - def startedConnecting(self, connector): - self.logger.info("WsClientFactory execute startedConnecting") - - def clientConnectionFailed(self, connector, reason): - self.logger.error( - "Can't connect to server. Reason: {}. Retrying: {}".format(reason, self.retries + 1)) - self.retry(connector) - if self.retries > self.maxRetries: - self.callback(self.reachMaxRetriesError) - - def clientConnectionLost(self, connector, reason): - self.logger.error("WsClientFactory execute clientConnectionLost. Reason: {},retried {} times".format(reason, - self.retries + 1)) - self.retry(connector) - if self.retries > self.maxRetries: - self.callback(self.reachMaxRetriesError) - - def buildProtocol(self, addr): - protocol = WsClientProtocol(self, payload=self.payload) - return protocol diff --git a/okx/websocket/WsClientProtocol.py b/okx/websocket/WsClientProtocol.py deleted file mode 100644 index b753215..0000000 --- a/okx/websocket/WsClientProtocol.py +++ /dev/null @@ -1,46 +0,0 @@ -import json -import logging - -from autobahn.twisted.websocket import WebSocketClientProtocol - - -class WsClientProtocol(WebSocketClientProtocol): - def __init__(self, factory, payload=None): - super().__init__() - self.autoPingInterval = 5 - self.factory = factory - self.payload = payload - self.logger = logging.getLogger(__name__) - - def onOpen(self): - self.factory.instance = self - - def onConnect(self, response): - self.logger.info("WsClientProtocol execute onConnect") - if self.payload: - self.logger.info("WsClientProtocol will Send message to OKX Server") - self.sendMessage(self.payload, isBinary=False) - self.factory.resetDelay() - - def onMessage(self, payload, isBinary): - self.logger.info("WsClientProtocol execute onMessage begin") - if not isBinary: - try: - payload_obj = json.loads(payload.decode("utf8")) - except Exception as e: - self.logger.error("WsClientProtocol onMessage error;e:{}".format(e)) - else: - self.factory.callback(payload_obj) - - def onClose(self, wasClean, code, reason): - self.logger.info( - "WsClientProtocol WS connection will be closed; wasClean={0}, code={1}, reason: {2}".format(wasClean, code, - reason)) - - def onPing(self, payload): - self.logger.info("WsClientProtocol execute onPing") - self.sendPong() - self.logger.info("WsClientProtocol execute onPing finish") - - def onPong(self, payload): - self.logger.info("WsClientProtocol execute onPong") diff --git a/okx/websocket/WsConnectManager.py b/okx/websocket/WsConnectManager.py deleted file mode 100644 index cd71eda..0000000 --- a/okx/websocket/WsConnectManager.py +++ /dev/null @@ -1,136 +0,0 @@ -import threading -import time - -from autobahn.twisted.websocket import connectWS -from twisted.internet import reactor -from twisted.internet.error import ReactorAlreadyRunning - -from . import WsUtils -from .WsClientFactory import * - - -class WsConnectManager(threading.Thread): - - def __init__(self, url, isPrivate): - threading.Thread.__init__(self) - self.factories = {} - self.isPrivate = isPrivate - self._connected_event = threading.Event() - self.url = url - self.conns = {} - self.callback = None - self.logger = logging.getLogger(__name__) - - def subscribeSocket(self, args: list, callback): - channelArgs = {} - channelParamMap = {} - WsUtils.checkSocketParams(args, channelArgs, channelParamMap) - if len(channelArgs) < 1: - return False - for channel in channelArgs: - subSet = channelParamMap.get(channel, set()) - if self.isPrivate: - privateKey = self.getPrivateKey(channel) - if privateKey not in self.factories: - reactor.callFromThread(self.loginSocket, channel) - time.sleep(2) - newFactory = self.initSubscribeFactory(args=channelArgs[channel], subSet=subSet, callback=callback) - reactor.callFromThread(self.resetConnection, newFactory, channel) - continue - factory = self.initSubscribeFactory(args=channelArgs[channel], subSet=subSet, callback=callback) - self.factories[channel] = factory - reactor.callFromThread(self.addConnection, channel) - - def unsubscribeSocket(self, args: list, callback): - channelArgs = {} - channelParamMap = {} - WsUtils.checkSocketParams(args, channelArgs, channelParamMap) - if len(channelArgs) < 1: - return False - for channel in channelArgs: - if self.isPrivate: - privateKey = self.getPrivateKey(channel) - else: - privateKey = channel - if privateKey not in self.factories: - continue - factory = self.factories[privateKey] - ifFiledParams = factory.subscribeSet - channelParamMap[channel] - if len(ifFiledParams) < 1: - self.disconnect(channel) - else: - payload = json.dumps({"op": "unsubscribe", "args": channelArgs[channel]}, ensure_ascii=False).encode( - "utf8") - factory = WsClientFactory(self.url, payload=payload) - factory.client = self - factory.protocol = WsClientProtocol - factory.callback = callback - factory.subscribeSet = ifFiledParams - reactor.callFromThread(self.resetConnection, factory, channel) - - def addConnection(self, channel): - self.conns[channel] = connectWS(self.factories[channel]) - - def disconnect(self, channel): - if channel not in self.conns: - self.logger.error("WsConnectManager disconnect error,channel is not able".format(channel)) - return - self.conns[channel].factory = WebSocketClientFactory(self.url) - self.conns[channel].disconnect() - del self.conns[channel] - privateKey = channel - if self.isPrivate: - privateKey = self.getPrivateKey(channel) - del self.factories[privateKey] - - def initSubscribeFactory(self, args, subSet: set, callback): - payload = json.dumps({"op": "subscribe", "args": args}, ensure_ascii=False).encode( - "utf8") - factory = WsClientFactory(self.url, payload=payload) - factory.payload = payload - factory.protocol = WsClientProtocol - factory.callback = callback - factory.subscribeSet = factory.subscribeSet | subSet - return factory - - def loginSocket(self, channel: str): - payload = WsUtils.initLoginParams(useServerTime=self.useServerTime, apiKey=self.apiKey, - passphrase=self.passphrase, secretKey=self.secretKey) - factory = WsClientFactory(self.url, payload=payload) - factory.protocol = WsClientProtocol - factory.callback = loginSocketCallBack - privateKey = self.getPrivateKey(channel) - self.factories[privateKey] = factory - self.conns[channel] = connectWS(factory) - - def resetConnection(self, newFactory, channel): - if self.isPrivate: - privateKey = self.getPrivateKey(channel) - preFactory = self.factories[privateKey] - else: - preFactory = self.factories[channel] - instance = preFactory.instance - if instance is None: - raise ValueError("instance must not none") - instance.factory = newFactory - instance.payload = newFactory.payload - instance.onConnect(None) - - def getPrivateKey(self, channel) -> str: - return str(self.apiKey) + "@" + channel - - def run(self): - try: - reactor.run(installSignalHandlers=False) - except ReactorAlreadyRunning as e: - self.logger.error("WsConnectManager reactor.run error;e:{}".format(e)) - - def close(self): - keys = set(self.conns.keys()) - for key in keys: - self.closeConnection(key) - self.conns = {} - - -def loginSocketCallBack(message): - print("loginSocket callback:", message) diff --git a/okx/websocket/WsLoginFactory.py b/okx/websocket/WsLoginFactory.py deleted file mode 100644 index e00ac84..0000000 --- a/okx/websocket/WsLoginFactory.py +++ /dev/null @@ -1,59 +0,0 @@ -# import time -# -# import WsUtils -# from WsClientProtocol import * -# from autobahn.twisted.websocket import WebSocketClientFactory -# from twisted.internet.protocol import ReconnectingClientFactory -# -# -# class WsReconnectingClientFactory(ReconnectingClientFactory): -# """ -# @ivar maxDelay: Maximum number of seconds between connection attempts. -# @ivar initialDelay: Delay for the first reconnection attempt. -# @ivar maxRetries: Maximum number of consecutive unsuccessful connection -# attempts, after which no further connection attempts will be made. If -# this is not explicitly set, no maximum is applied. -# """ -# initialDelay = 0.1 -# maxDelay = 1 -# maxRetries = 4 -# -# -# class WsLoginFactory(WebSocketClientFactory, WsReconnectingClientFactory): -# reachMaxRetriesError = {"e": "error", "m": "reached max connect retries"} -# -# def __init__(self, *args, useServerTime: str, apiKey: str, passphrase: str, secretKey: str, **kwargs): -# WebSocketClientFactory.__init__(self, *args, **kwargs) -# self.apiKey = apiKey -# self.passphrase = passphrase -# self.secretKey = secretKey -# self.useServerTime = useServerTime -# self.instance = None -# self.preTime = time.time() -# self.logger = logging.getLogger(__name__) -# -# def startedConnecting(self, connector): -# self.logger.info("WsClientFactory execute startedConnecting") -# -# def clientConnectionFailed(self, connector, reason): -# self.logger.error( -# "Can't connect to server. Reason: {}. Retrying: {}".format(reason, self.retries + 1)) -# self.retry(connector) -# if self.retries > self.maxRetries: -# self.callback(self.reachMaxRetriesError) -# -# def clientConnectionLost(self, connector, reason): -# cur = time.time() -# print("WsClientFactory,pre team=", cur - self.preTime) -# self.preTime = cur -# self.logger.error("WsClientFactory execute clientConnectionLost. Reason: {},retried {} times".format(reason, -# self.retries + 1)) -# self.retry(connector) -# if self.retries > self.maxRetries: -# self.callback(self.reachMaxRetriesError) -# -# def buildProtocol(self, addr): -# payload = WsUtils.initLoginParams(useServerTime=self.useServerTime, apiKey=self.apiKey, -# passphrase=self.passphrase, secretKey=self.secretKey) -# protocol = WsClientProtocol(self, payload=payload) -# return protocol diff --git a/okx/websocket/WsPprivateAsync.py b/okx/websocket/WsPprivateAsync.py new file mode 100644 index 0000000..3fa7768 --- /dev/null +++ b/okx/websocket/WsPprivateAsync.py @@ -0,0 +1,77 @@ +import asyncio +import json +import logging + +from okx.websocket import WsUtils +from okx.websocket.WebSocketFactory import WebSocketFactory + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger("WsPrivate") + + +class WsPrivateAsync: + def __init__(self, apiKey, passphrase, secretKey, url, useServerTime): + self.url = url + self.subscriptions = set() + self.callback = None + self.loop = asyncio.get_event_loop() + self.factory = WebSocketFactory(url) + self.apiKey = apiKey + self.passphrase = passphrase + self.secretKey = secretKey + self.useServerTime = useServerTime + + async def connect(self): + self.websocket = await self.factory.connect() + + async def consume(self): + async for message in self.websocket: + logger.info("Received message: {%s}", message) + if self.callback: + self.callback(message) + + async def subscribe(self, params: list, callback): + self.callback = callback + + logRes = await self.login() + await asyncio.sleep(5) + if logRes: + payload = json.dumps({ + "op": "subscribe", + "args": params + }) + await self.websocket.send(payload) + # await self.consume() + + async def login(self): + loginPayload = WsUtils.initLoginParams( + useServerTime=self.useServerTime, + apiKey=self.apiKey, + passphrase=self.passphrase, + secretKey=self.secretKey + ) + await self.websocket.send(loginPayload) + return True + + async def unsubscribe(self, params: list, callback): + self.callback = callback + payload = json.dumps({ + "op": "unsubscribe", + "args": params + }) + logger.info(f"unsubscribe: {payload}") + await self.websocket.send(payload) + # for param in params: + # self.subscriptions.discard(param) + + async def stop(self): + await self.factory.close() + self.loop.stop() + + async def start(self): + logger.info("Connecting to WebSocket...") + await self.connect() + self.loop.create_task(self.consume()) + + def stop_sync(self): + self.loop.run_until_complete(self.stop()) diff --git a/okx/websocket/WsPrivate.py b/okx/websocket/WsPrivate.py deleted file mode 100644 index b1bb189..0000000 --- a/okx/websocket/WsPrivate.py +++ /dev/null @@ -1,30 +0,0 @@ - -from twisted.internet import reactor - -from . import WsUtils -from .WsConnectManager import WsConnectManager - - -class WsPrivate(WsConnectManager): - def __init__(self, apiKey: str, passphrase: str, secretKey: str, url: str, useServerTime: False): - if ~WsUtils.isNotBlankStr(apiKey) or ~WsUtils.isNotBlankStr(passphrase) or ~WsUtils.isNotBlankStr( - secretKey) or ~WsUtils.isNotBlankStr(url): - return - super().__init__(url, isPrivate=True) - self.apiKey = apiKey - self.passphrase = passphrase - self.secretKey = secretKey - self.useServerTime = useServerTime - - def subscribe(self, params: list, callback): - self.subscribeSocket(params, callback) - - def unsubscribe(self, params: list, callback): - self.unsubscribeSocket(params, callback) - - def stop(self): - try: - self.close() - finally: - reactor.stop() - diff --git a/okx/websocket/WsPublic.py b/okx/websocket/WsPublic.py deleted file mode 100644 index c38e9d7..0000000 --- a/okx/websocket/WsPublic.py +++ /dev/null @@ -1,20 +0,0 @@ -from twisted.internet import reactor - -from .WsConnectManager import WsConnectManager - - -class WsPublic(WsConnectManager): - def __init__(self, url): - super().__init__(url, isPrivate=False) - - def subscribe(self, params: list, callback): - self.subscribeSocket(params, callback) - - def unsubscribe(self, params: list, callback): - self.unsubscribeSocket(params, callback) - - def stop(self): - try: - self.close() - finally: - reactor.stop() diff --git a/okx/websocket/WsPublicAsync.py b/okx/websocket/WsPublicAsync.py new file mode 100644 index 0000000..c8dfcd7 --- /dev/null +++ b/okx/websocket/WsPublicAsync.py @@ -0,0 +1,56 @@ +import asyncio +import json +import logging + +from okx.websocket.WebSocketFactory import WebSocketFactory + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger("WsPublic") + + +class WsPublicAsync: + def __init__(self, url): + self.url = url + self.subscriptions = set() + self.callback = None + self.loop = asyncio.get_event_loop() + self.factory = WebSocketFactory(url) + + async def connect(self): + self.websocket = await self.factory.connect() + + async def consume(self): + async for message in self.websocket: + logger.info("Received message: {%s}", message) + if self.callback: + self.callback(message) + + async def subscribe(self, params: list, callback): + self.callback = callback + payload = json.dumps({ + "op": "subscribe", + "args": params + }) + await self.websocket.send(payload) + # await self.consume() + + async def unsubscribe(self, params: list, callback): + self.callback = callback + payload = json.dumps({ + "op": "unsubscribe", + "args": params + }) + logger.info(f"unsubscribe: {payload}") + await self.websocket.send(payload) + + async def stop(self): + await self.factory.close() + self.loop.stop() + + async def start(self): + logger.info("Connecting to WebSocket...") + await self.connect() + self.loop.create_task(self.consume()) + + def stop_sync(self): + self.loop.run_until_complete(self.stop()) diff --git a/okx/websocket/WsUtils.py b/okx/websocket/WsUtils.py index 28aa2b9..fcb0db2 100644 --- a/okx/websocket/WsUtils.py +++ b/okx/websocket/WsUtils.py @@ -16,7 +16,7 @@ def initLoginParams(useServerTime: bool, apiKey, passphrase, secretKey): sign = base64.b64encode(d) arg = {"apiKey": apiKey, "passphrase": passphrase, "timestamp": timestamp, "sign": sign.decode("utf-8")} payload = {"op": "login", "args": [arg]} - return json.dumps(payload, ensure_ascii=False).encode("utf8") + return json.dumps(payload) def isNotBlankStr(param: str) -> bool: diff --git a/test/WsPrivateAsyncTest.py b/test/WsPrivateAsyncTest.py new file mode 100644 index 0000000..7af614b --- /dev/null +++ b/test/WsPrivateAsyncTest.py @@ -0,0 +1,39 @@ +import asyncio + +from okx.websocket.WsPprivateAsync import WsPrivateAsync + + +def privateCallback(message): + print("privateCallback", message) + + +async def main(): + url = "wss://wspap.okx.com:8443/ws/v5/private?brokerId=9999" + ws = WsPrivateAsync( + apiKey="your apiKey", + passphrase="your passphrase", + secretKey="your secretKey", + url=url, + useServerTime=False + ) + await ws.start() + args = [] + arg1 = {"channel": "account", "instType": "BTC"} + arg2 = {"channel": "orders", "instType": "ANY"} + arg3 = {"channel": "balance_and_position"} + args.append(arg1) + args.append(arg2) + args.append(arg3) + await ws.subscribe(args, callback=privateCallback) + await asyncio.sleep(30) + print("-----------------------------------------unsubscribe--------------------------------------------") + args2 = [arg2] + await ws.unsubscribe(args2, callback=privateCallback) + await asyncio.sleep(30) + print("-----------------------------------------unsubscribe all--------------------------------------------") + args3 = [arg1, arg3] + await ws.unsubscribe(args3, callback=privateCallback) + + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/test/WsPrivateTest.py b/test/WsPrivateTest.py deleted file mode 100644 index cf2ab5e..0000000 --- a/test/WsPrivateTest.py +++ /dev/null @@ -1,33 +0,0 @@ -import time - -from okx.websocket.WsPrivate import WsPrivate - - -def privateCallback(message): - print("WsPrivate subscribe callback:", message) - - -if __name__ == '__main__': - url = "wss://ws.okx.com:8443/ws/v5/private" - ws = WsPrivate(apiKey="your_apiKey", - passphrase="your_passphrase", - secretKey="your_secretKey", - url=url, - useServerTime=False) - ws.start() - args = [] - arg1 = {"channel": "account", "instType": "BTC"} - arg2 = {"channel": "orders", "instType": "ANY"} - arg3 = {"channel": "balance_and_position"} - args.append(arg1) - args.append(arg2) - args.append(arg3) - ws.subscribe(args, callback=privateCallback) - time.sleep(30) - print("-----------------------------------------unsubscribe--------------------------------------------") - args2 = [arg2] - ws.unsubscribe(args2, callback=privateCallback) - time.sleep(30) - print("-----------------------------------------unsubscribe all--------------------------------------------") - args3 = [arg1, arg3] - ws.unsubscribe(args3, callback=privateCallback) diff --git a/test/WsPublicTest.py b/test/WsPublicAsyncTest.py similarity index 53% rename from test/WsPublicTest.py rename to test/WsPublicAsyncTest.py index 957f234..14276a0 100644 --- a/test/WsPublicTest.py +++ b/test/WsPublicAsyncTest.py @@ -1,32 +1,37 @@ -import time +import asyncio -from okx.websocket.WsPublic import WsPublic +from okx.websocket.WsPublicAsync import WsPublicAsync def publicCallback(message): print("publicCallback", message) -if __name__ == '__main__': - #url = "wss://wspri.coinall.ltd:8443/ws/v5/ipublic?brokerId=9999" - url = "wss://wspap.okx.com:8443/ws/v5/public" - ws = WsPublic(url=url) - ws.start() +async def main(): + + # url = "wss://wspap.okex.com:8443/ws/v5/public?brokerId=9999" + url = "wss://wspap.okx.com:8443/ws/v5/public?brokerId=9999" + ws = WsPublicAsync(url=url) + await ws.start() args = [] arg1 = {"channel": "instruments", "instType": "FUTURES"} arg2 = {"channel": "instruments", "instType": "SPOT"} - arg3 = {"channel": "tickers", "instId": "BTC-USDT"} + arg3 = {"channel": "tickers", "instId": "BTC-USDT-SWAP"} arg4 = {"channel": "tickers", "instId": "ETH-USDT"} args.append(arg1) args.append(arg2) args.append(arg3) args.append(arg4) - ws.subscribe(args, publicCallback) - time.sleep(10) + await ws.subscribe(args, publicCallback) + await asyncio.sleep(5) print("-----------------------------------------unsubscribe--------------------------------------------") args2 = [arg4] - ws.unsubscribe(args2, publicCallback) - time.sleep(10) + await ws.unsubscribe(args2, publicCallback) + await asyncio.sleep(5) print("-----------------------------------------unsubscribe all--------------------------------------------") args3 = [arg1, arg2, arg3] - ws.unsubscribe(args3, publicCallback) + await ws.unsubscribe(args3, publicCallback) + + +if __name__ == '__main__': + asyncio.run(main())