From e1370b39b77e7a03ff927bdf504c916038d39a56 Mon Sep 17 00:00:00 2001 From: okxv5api Date: Mon, 12 Dec 2022 10:15:07 +0800 Subject: [PATCH] support websocket --- README.md | 59 +++++-------- okx/__init__.py | 7 +- okx/websocket/WsClientFactory.py | 49 +++++++++++ okx/websocket/WsClientProtocol.py | 46 ++++++++++ okx/websocket/WsConnectManager.py | 136 ++++++++++++++++++++++++++++++ okx/websocket/WsLoginFactory.py | 59 +++++++++++++ okx/websocket/WsPrivate.py | 31 +++++++ okx/websocket/WsPublic.py | 20 +++++ okx/websocket/WsUtils.py | 78 +++++++++++++++++ okx/websocket/__init__.py | 0 test/WsPrivateTest.py | 33 ++++++++ test/WsPublicTest.py | 31 +++++++ 12 files changed, 505 insertions(+), 44 deletions(-) create mode 100644 okx/websocket/WsClientFactory.py create mode 100644 okx/websocket/WsClientProtocol.py create mode 100644 okx/websocket/WsConnectManager.py create mode 100644 okx/websocket/WsLoginFactory.py create mode 100644 okx/websocket/WsPrivate.py create mode 100644 okx/websocket/WsPublic.py create mode 100644 okx/websocket/WsUtils.py create mode 100644 okx/websocket/__init__.py create mode 100644 test/WsPrivateTest.py create mode 100644 test/WsPublicTest.py diff --git a/README.md b/README.md index b0ec7b7..e67d4b9 100644 --- a/README.md +++ b/README.md @@ -2,9 +2,9 @@ ### 如何使用? -`python版本:3.6 - 3.8` +`python版本:>=3.9 -`WebSocketAPI:建议websockets库版本为6.0` +`WebSocketAPI:autobahn.twisted>=22.10.0` #### 第一步:下载SDK,安装相关所需库 @@ -16,15 +16,14 @@ ```python pip install requests -pip install websockets==6.0 +pip install autobahn\[twisted\] +pip install pyOpenSSL ``` #### 第二步:配置个人信息 2.1 如果还未有API,可[点击](https://www.okx.com/account/users/myApi)前往官网进行申请 -2.2 将各项信息在`example.py(RestAPI)`和`websocket_example.py(WebSocketAPI)`中填写 - ```python api_key = "" secret_key = "" @@ -40,50 +39,40 @@ passphrase = "" * 解开相应方法的注释传参调用各接口即可 * WebSocketAPI - - * 运行`websocket_example.py` - - * 根据`公共频道`/`私有频道`/`交易`选择对应`url`、对应启动方法,传入相应参数即可 + * 参考Test文件夹下`WsPrivate`和`WsPublic`文件示例; + * 根据`公共频道`/`私有频道`选择对应`url`(如果是私有频道需要设置登陆信息),传入相应参数即可。 ```python # WebSocket公共频道 - url = "wss://ws.okx.com:8443/ws/v5/public?brokerId=9999" - + url = "wss://ws.okx.com:8443/ws/v5/public" # WebSocket私有频道 - url = "wss://ws.okx.com:8443/ws/v5/private?brokerId=9999" + url = "wss://ws.okx.com:8443/ws/v5/private" ``` ```python # 公共频道 不需要登录(行情,持仓总量,K线,标记价格,深度,资金费率等) - loop.run_until_complete(subscribe_without_login(url, channels)) + 参考 WsPublicTest.py # 私有频道 需要登录(账户,持仓,订单等) - loop.run_until_complete(subscribe(url, api_key, passphrase, seceret_key, channels)) - - # 交易(下单,撤单,改单等) - loop.run_until_complete(trade(url, api_key, passphrase, seceret_key, trade_param)) + 参考 WsPrivateTest.py ``` 附言: -* 如果对API尚不了解,建议参考`OKx`官方[API文档](https://www.okx.com/docs-v5/zh/) +* 如果对API尚不了解,建议参考`OKX`官方[API文档](https://www.okx.com/docs-v5/zh/) -* 使用RestAPI的用户可以通过设置`example.py`文件下的flag参数来选择接入实盘或者模拟盘 +* 使用RestAPI的用户可以通过参考Test文件夹下的示例,设置正确的参数即可 -* 使用WebSocketAPI的用户可以通过解开`websocket_example.py`文件下对应的url注释来选择接入实盘或者模拟盘 - -* RestAPI 提供了http2的请求方式,可参考http2_example.py +* 使用WebSocketAPI的用户可以通过参考Test文件夹下的`WsPublicTest.py`和`WsPrivateTest.py`,设置正确的参数即可 * 若使用`WebSocketAPI`遇到问题建议参考相关链接 * `asyncio`、`websockets`文档/`github`: - https://docs.python.org/3/library/asyncio-dev.html https://websockets.readthedocs.io/en/stable/intro.html https://github.com/aaugustin/websockets * 关于`code=1006`: - https://github.com/Rapptz/discord.py/issues/1996 https://github.com/aaugustin/websockets/issues/587 @@ -91,9 +80,9 @@ passphrase = "" ### How to use ? -`python version:3.6 - 3.8` +`python version:>=3.9` -`WebSocketAPI: websockets package advise version 6.0` +`WebSocketAPI: autobahn.twisted>=22.10.0` #### Step 1: Download the SDK and install the necessary libraries @@ -105,7 +94,8 @@ passphrase = "" ```python pip install requests -pip install websockets==6.0 +pip install autobahn\[twisted\] +pip install pyOpenSSL ``` #### Step 2: Configure Personal Information @@ -126,8 +116,8 @@ passphrase = "" - Run `example.py` - Uncomment the corresponding method and then pass the arguments and call the interfaces - WebSocketAPI - - Run `websocket_example.py` - - According to the `public channel`/`private channel`/`trade`, select the corresponding `url`, the corresponding start method, and pass in the corresponding parameters + - Open `websocket_example.py` + - According to the `public channel`/`private channel`, select the corresponding `url`, the corresponding start method, and pass in the corresponding parameters ```python # WebSocket public channel @@ -137,17 +127,6 @@ url = "wss://ws.okx.com:8443/ws/v5/public?brokerId=9999" url = "wss://ws.okx.com:8443/ws/v5/private?brokerId=9999" ``` -```Python -# Not necessary for public channel to login (Instrument, Tickers, Index, Mark price, Order Book, Funding rate, etc) -loop.run_until_complete(subscribe_without_login(url, channels)) - -# necessary for private channel to login(Account,Positions, Order, etc) -loop.run_until_complete(subscribe(url, api_key, passphrase, seceret_key, channels)) - -# trade(Place Order, Cancel Order, Amend Order, etc) -loop.run_until_complete(trade(url, api_key, passphrase, seceret_key, trade_param)) -``` - P.S. - If you know little about API, advise consulting the offical [API document](https://www.okx.com/docs-v5/en/) diff --git a/okx/__init__.py b/okx/__init__.py index a79ac7a..71c25d8 100644 --- a/okx/__init__.py +++ b/okx/__init__.py @@ -1,6 +1,5 @@ -"""An unofficial Python wrapper for the OKEx exchange API v3 - -.. moduleauthor:: Sam McHardy +""" +Python SDK for the OKX API v5 """ -__version__="0.0.12" \ No newline at end of file +__version__="0.1.0" \ No newline at end of file diff --git a/okx/websocket/WsClientFactory.py b/okx/websocket/WsClientFactory.py new file mode 100644 index 0000000..a003ac0 --- /dev/null +++ b/okx/websocket/WsClientFactory.py @@ -0,0 +1,49 @@ +from autobahn.twisted.websocket import WebSocketClientFactory +from twisted.internet.protocol import ReconnectingClientFactory + +from WsClientProtocol import * + + +class WsReconnectingClientFactory(ReconnectingClientFactory): + """ + @ivar maxDelay: Maximum number of seconds between connection attempts. + @ivar initialDelay: Delay for the first reconnection attempt. + @ivar maxRetries: Maximum number of consecutive unsuccessful connection + attempts, after which no further connection attempts will be made. If + this is not explicitly set, no maximum is applied. + """ + initialDelay = 0.1 + maxDelay = 2 + maxRetries = 5 + + +class WsClientFactory(WebSocketClientFactory, WsReconnectingClientFactory): + reachMaxRetriesError = {"e": "error", "m": "reached max connect retries"} + + def __init__(self, *args, payload=None, **kwargs): + WebSocketClientFactory.__init__(self, *args, **kwargs) + self.instance = None + self.subscribeSet = set() + self.payload = payload + self.logger = logging.getLogger(__name__) + + def startedConnecting(self, connector): + self.logger.info("WsClientFactory execute startedConnecting") + + def clientConnectionFailed(self, connector, reason): + self.logger.error( + "Can't connect to server. Reason: {}. Retrying: {}".format(reason, self.retries + 1)) + self.retry(connector) + if self.retries > self.maxRetries: + self.callback(self.reachMaxRetriesError) + + def clientConnectionLost(self, connector, reason): + self.logger.error("WsClientFactory execute clientConnectionLost. Reason: {},retried {} times".format(reason, + self.retries + 1)) + self.retry(connector) + if self.retries > self.maxRetries: + self.callback(self.reachMaxRetriesError) + + def buildProtocol(self, addr): + protocol = WsClientProtocol(self, payload=self.payload) + return protocol diff --git a/okx/websocket/WsClientProtocol.py b/okx/websocket/WsClientProtocol.py new file mode 100644 index 0000000..b753215 --- /dev/null +++ b/okx/websocket/WsClientProtocol.py @@ -0,0 +1,46 @@ +import json +import logging + +from autobahn.twisted.websocket import WebSocketClientProtocol + + +class WsClientProtocol(WebSocketClientProtocol): + def __init__(self, factory, payload=None): + super().__init__() + self.autoPingInterval = 5 + self.factory = factory + self.payload = payload + self.logger = logging.getLogger(__name__) + + def onOpen(self): + self.factory.instance = self + + def onConnect(self, response): + self.logger.info("WsClientProtocol execute onConnect") + if self.payload: + self.logger.info("WsClientProtocol will Send message to OKX Server") + self.sendMessage(self.payload, isBinary=False) + self.factory.resetDelay() + + def onMessage(self, payload, isBinary): + self.logger.info("WsClientProtocol execute onMessage begin") + if not isBinary: + try: + payload_obj = json.loads(payload.decode("utf8")) + except Exception as e: + self.logger.error("WsClientProtocol onMessage error;e:{}".format(e)) + else: + self.factory.callback(payload_obj) + + def onClose(self, wasClean, code, reason): + self.logger.info( + "WsClientProtocol WS connection will be closed; wasClean={0}, code={1}, reason: {2}".format(wasClean, code, + reason)) + + def onPing(self, payload): + self.logger.info("WsClientProtocol execute onPing") + self.sendPong() + self.logger.info("WsClientProtocol execute onPing finish") + + def onPong(self, payload): + self.logger.info("WsClientProtocol execute onPong") diff --git a/okx/websocket/WsConnectManager.py b/okx/websocket/WsConnectManager.py new file mode 100644 index 0000000..7343ea7 --- /dev/null +++ b/okx/websocket/WsConnectManager.py @@ -0,0 +1,136 @@ +import threading +import time + +from autobahn.twisted.websocket import connectWS +from twisted.internet import reactor +from twisted.internet.error import ReactorAlreadyRunning + +import WsUtils +from WsClientFactory import * + + +class WsConnectManager(threading.Thread): + + def __init__(self, url, isPrivate): + threading.Thread.__init__(self) + self.factories = {} + self.isPrivate = isPrivate + self._connected_event = threading.Event() + self.url = url + self.conns = {} + self.callback = None + self.logger = logging.getLogger(__name__) + + def subscribeSocket(self, args: list, callback): + channelArgs = {} + channelParamMap = {} + WsUtils.checkSocketParams(args, channelArgs, channelParamMap) + if len(channelArgs) < 1: + return False + for channel in channelArgs: + subSet = channelParamMap.get(channel, set()) + if self.isPrivate: + privateKey = self.getPrivateKey(channel) + if privateKey not in self.factories: + reactor.callFromThread(self.loginSocket, channel) + time.sleep(2) + newFactory = self.initSubscribeFactory(args=channelArgs[channel], subSet=subSet, callback=callback) + reactor.callFromThread(self.resetConnection, newFactory, channel) + continue + factory = self.initSubscribeFactory(args=channelArgs[channel], subSet=subSet, callback=callback) + self.factories[channel] = factory + reactor.callFromThread(self.addConnection, channel) + + def unsubscribeSocket(self, args: list, callback): + channelArgs = {} + channelParamMap = {} + WsUtils.checkSocketParams(args, channelArgs, channelParamMap) + if len(channelArgs) < 1: + return False + for channel in channelArgs: + if self.isPrivate: + privateKey = self.getPrivateKey(channel) + else: + privateKey = channel + if privateKey not in self.factories: + continue + factory = self.factories[privateKey] + ifFiledParams = factory.subscribeSet - channelParamMap[channel] + if len(ifFiledParams) < 1: + self.disconnect(channel) + else: + payload = json.dumps({"op": "unsubscribe", "args": channelArgs[channel]}, ensure_ascii=False).encode( + "utf8") + factory = WsClientFactory(self.url, payload=payload) + factory.client = self + factory.protocol = WsClientProtocol + factory.callback = callback + factory.subscribeSet = ifFiledParams + reactor.callFromThread(self.resetConnection, factory, channel) + + def addConnection(self, channel): + self.conns[channel] = connectWS(self.factories[channel]) + + def disconnect(self, channel): + if channel not in self.conns: + self.logger.error("WsConnectManager disconnect error,channel is not able".format(channel)) + return + self.conns[channel].factory = WebSocketClientFactory(self.url) + self.conns[channel].disconnect() + del self.conns[channel] + privateKey = channel + if self.isPrivate: + privateKey = self.getPrivateKey(channel) + del self.factories[privateKey] + + def initSubscribeFactory(self, args, subSet: set, callback): + payload = json.dumps({"op": "subscribe", "args": args}, ensure_ascii=False).encode( + "utf8") + factory = WsClientFactory(self.url, payload=payload) + factory.payload = payload + factory.protocol = WsClientProtocol + factory.callback = callback + factory.subscribeSet = factory.subscribeSet | subSet + return factory + + def loginSocket(self, channel: str): + payload = WsUtils.initLoginParams(useServerTime=self.useServerTime, apiKey=self.apiKey, + passphrase=self.passphrase, secretKey=self.secretKey) + factory = WsClientFactory(self.url, payload=payload) + factory.protocol = WsClientProtocol + factory.callback = loginSocketCallBack + privateKey = self.getPrivateKey(channel) + self.factories[privateKey] = factory + self.conns[channel] = connectWS(factory) + + def resetConnection(self, newFactory, channel): + if self.isPrivate: + privateKey = self.getPrivateKey(channel) + preFactory = self.factories[privateKey] + else: + preFactory = self.factories[channel] + instance = preFactory.instance + if instance is None: + raise ValueError("instance must not none") + instance.factory = newFactory + instance.payload = newFactory.payload + instance.onConnect(None) + + def getPrivateKey(self, channel) -> str: + return str(self.apiKey) + "@" + channel + + def run(self): + try: + reactor.run(installSignalHandlers=False) + except ReactorAlreadyRunning as e: + self.logger.error("WsConnectManager reactor.run error;e:{}".format(e)) + + def close(self): + keys = set(self.conns.keys()) + for key in keys: + self.closeConnection(key) + self.conns = {} + + +def loginSocketCallBack(message): + print("loginSocket callback:", message) diff --git a/okx/websocket/WsLoginFactory.py b/okx/websocket/WsLoginFactory.py new file mode 100644 index 0000000..e00ac84 --- /dev/null +++ b/okx/websocket/WsLoginFactory.py @@ -0,0 +1,59 @@ +# import time +# +# import WsUtils +# from WsClientProtocol import * +# from autobahn.twisted.websocket import WebSocketClientFactory +# from twisted.internet.protocol import ReconnectingClientFactory +# +# +# class WsReconnectingClientFactory(ReconnectingClientFactory): +# """ +# @ivar maxDelay: Maximum number of seconds between connection attempts. +# @ivar initialDelay: Delay for the first reconnection attempt. +# @ivar maxRetries: Maximum number of consecutive unsuccessful connection +# attempts, after which no further connection attempts will be made. If +# this is not explicitly set, no maximum is applied. +# """ +# initialDelay = 0.1 +# maxDelay = 1 +# maxRetries = 4 +# +# +# class WsLoginFactory(WebSocketClientFactory, WsReconnectingClientFactory): +# reachMaxRetriesError = {"e": "error", "m": "reached max connect retries"} +# +# def __init__(self, *args, useServerTime: str, apiKey: str, passphrase: str, secretKey: str, **kwargs): +# WebSocketClientFactory.__init__(self, *args, **kwargs) +# self.apiKey = apiKey +# self.passphrase = passphrase +# self.secretKey = secretKey +# self.useServerTime = useServerTime +# self.instance = None +# self.preTime = time.time() +# self.logger = logging.getLogger(__name__) +# +# def startedConnecting(self, connector): +# self.logger.info("WsClientFactory execute startedConnecting") +# +# def clientConnectionFailed(self, connector, reason): +# self.logger.error( +# "Can't connect to server. Reason: {}. Retrying: {}".format(reason, self.retries + 1)) +# self.retry(connector) +# if self.retries > self.maxRetries: +# self.callback(self.reachMaxRetriesError) +# +# def clientConnectionLost(self, connector, reason): +# cur = time.time() +# print("WsClientFactory,pre team=", cur - self.preTime) +# self.preTime = cur +# self.logger.error("WsClientFactory execute clientConnectionLost. Reason: {},retried {} times".format(reason, +# self.retries + 1)) +# self.retry(connector) +# if self.retries > self.maxRetries: +# self.callback(self.reachMaxRetriesError) +# +# def buildProtocol(self, addr): +# payload = WsUtils.initLoginParams(useServerTime=self.useServerTime, apiKey=self.apiKey, +# passphrase=self.passphrase, secretKey=self.secretKey) +# protocol = WsClientProtocol(self, payload=payload) +# return protocol diff --git a/okx/websocket/WsPrivate.py b/okx/websocket/WsPrivate.py new file mode 100644 index 0000000..7dd8433 --- /dev/null +++ b/okx/websocket/WsPrivate.py @@ -0,0 +1,31 @@ +import time + +from twisted.internet import reactor + +import WsUtils +from WsConnectManager import WsConnectManager + + +class WsPrivate(WsConnectManager): + def __init__(self, apiKey: str, passphrase: str, secretKey: str, url: str, useServerTime: False): + if ~WsUtils.isNotBlankStr(apiKey) or ~WsUtils.isNotBlankStr(passphrase) or ~WsUtils.isNotBlankStr( + secretKey) or ~WsUtils.isNotBlankStr(url): + return + super().__init__(url, isPrivate=True) + self.apiKey = apiKey + self.passphrase = passphrase + self.secretKey = secretKey + self.useServerTime = useServerTime + + def subscribe(self, params: list, callback): + self.subscribeSocket(params, callback) + + def unsubscribe(self, params: list, callback): + self.unsubscribeSocket(params, callback) + + def stop(self): + try: + self.close() + finally: + reactor.stop() + diff --git a/okx/websocket/WsPublic.py b/okx/websocket/WsPublic.py new file mode 100644 index 0000000..a3f24b6 --- /dev/null +++ b/okx/websocket/WsPublic.py @@ -0,0 +1,20 @@ +from twisted.internet import reactor + +from WsConnectManager import WsConnectManager + + +class WsPublic(WsConnectManager): + def __init__(self, url): + super().__init__(url, isPrivate=False) + + def subscribe(self, params: list, callback): + self.subscribeSocket(params, callback) + + def unsubscribe(self, params: list, callback): + self.unsubscribeSocket(params, callback) + + def stop(self): + try: + self.close() + finally: + reactor.stop() diff --git a/okx/websocket/WsUtils.py b/okx/websocket/WsUtils.py new file mode 100644 index 0000000..28aa2b9 --- /dev/null +++ b/okx/websocket/WsUtils.py @@ -0,0 +1,78 @@ +import base64 +import hmac +import json +import time + +import requests + + +def initLoginParams(useServerTime: bool, apiKey, passphrase, secretKey): + timestamp = getLocalTime() + if useServerTime: + timestamp = getServerTime() + message = str(timestamp) + 'GET' + '/users/self/verify' + mac = hmac.new(bytes(secretKey, encoding='utf8'), bytes(message, encoding='utf-8'), digestmod='sha256') + d = mac.digest() + sign = base64.b64encode(d) + arg = {"apiKey": apiKey, "passphrase": passphrase, "timestamp": timestamp, "sign": sign.decode("utf-8")} + payload = {"op": "login", "args": [arg]} + return json.dumps(payload, ensure_ascii=False).encode("utf8") + + +def isNotBlankStr(param: str) -> bool: + return param is not None and isinstance(param, str) and (~param.isspace()) + + +def getParamKey(arg: dict) -> str: + s = "" + for k in arg: + if k == 'channel': + continue + s = s + "@" + arg.get(k) + return s + + +def initSubscribeSet(arg: dict) -> set: + paramsSet = set() + if arg is None: + return paramsSet + elif isinstance(arg, dict): + paramsSet.add(getParamKey(arg)) + return paramsSet + else: + raise ValueError("arg must dict") + + +def checkSocketParams(args: list, channelArgs, channelParamMap): + for arg in args: + channel = arg['channel'].strip() + if ~isNotBlankStr(channel): + raise ValueError("channel must not none") + argSet = channelParamMap.get(channel, set()) + argKey = getParamKey(arg) + if argKey in argSet: + continue + else: + validParams = initSubscribeSet(arg) + if len(validParams) < 1: + continue + p = {} + for k in arg: + p[k.strip()] = arg.get(k).strip() + channelParamMap[channel] = channelParamMap.get(channel, set()) | validParams + if channel not in channelArgs: + channelArgs[channel] = [] + channelArgs[channel].append(p) + + +def getServerTime(): + url = "https://www.okx.com/api/v5/public/time" + response = requests.get(url) + if response.status_code == 200: + return response.json()['data'][0]['ts'] + else: + return "" + + +def getLocalTime(): + return int(time.time()) diff --git a/okx/websocket/__init__.py b/okx/websocket/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test/WsPrivateTest.py b/test/WsPrivateTest.py new file mode 100644 index 0000000..2a8f0a0 --- /dev/null +++ b/test/WsPrivateTest.py @@ -0,0 +1,33 @@ +import time + +from WsPrivate import WsPrivate + + +def privateCallback(message): + print("WsPrivate subscribe callback:", message) + + +if __name__ == '__main__': + url = "wss://ws.okx.com:8443/ws/v5/private" + ws = WsPrivate(apiKey="your_apiKey", + passphrase="your_passphrase", + secretKey="your_secretKey", + url=url, + useServerTime=False) + ws.start() + args = [] + arg1 = {"channel": "account", "instType": "BTC"} + arg2 = {"channel": "orders", "instType": "ANY"} + arg3 = {"channel": "balance_and_position"} + args.append(arg1) + args.append(arg2) + args.append(arg3) + ws.subscribe(args, callback=privateCallback) + time.sleep(30) + print("-----------------------------------------unsubscribe--------------------------------------------") + args2 = [arg2] + ws.unsubscribe(args2, callback=privateCallback) + time.sleep(30) + print("-----------------------------------------unsubscribe all--------------------------------------------") + args3 = [arg1, arg3] + ws.unsubscribe(args3, callback=privateCallback) diff --git a/test/WsPublicTest.py b/test/WsPublicTest.py new file mode 100644 index 0000000..17c106e --- /dev/null +++ b/test/WsPublicTest.py @@ -0,0 +1,31 @@ +import time + +from WsPublic import WsPublic + + +def publicCallback(message): + print("publicCallback", message) + + +if __name__ == '__main__': + url = "wss://ws.okx.com:8443/ws/v5/public" + ws = WsPublic(url=url) + ws.start() + args = [] + arg1 = {"channel": "instruments", "instType": "FUTURES"} + arg2 = {"channel": "instruments", "instType": "SPOT"} + arg3 = {"channel": "tickers", "instId": "BTC-USDT"} + arg4 = {"channel": "tickers", "instId": "ETH-USDT"} + args.append(arg1) + args.append(arg2) + args.append(arg3) + args.append(arg4) + ws.subscribe(args, publicCallback) + time.sleep(10) + print("-----------------------------------------unsubscribe--------------------------------------------") + args2 = [arg4] + ws.unsubscribe(args2, publicCallback) + time.sleep(10) + print("-----------------------------------------unsubscribe all--------------------------------------------") + args3 = [arg1, arg2, arg3] + ws.unsubscribe(args3, publicCallback)