fix: throws errors when we use the URLs for the demo trading environment

This commit is contained in:
007live 2023-12-26 17:21:19 +08:00
parent 7cf0ca5cd3
commit 12e983d1a6
17 changed files with 234 additions and 397 deletions

View File

@ -16,12 +16,12 @@ class SpreadTradingAPI(Client):
# Cancel Order
def cancel_order(self,ordId='', clOrdId=''):
params = {'ordId': ordId, 'clOrdId': clOrdId}
return self._request_with_params(POST, SPREAD_CANAEL_ORDER, params)
return self._request_with_params(POST, SPREAD_CANCEL_ORDER, params)
# Cancel All orders
def cancel_all_orders(self, sprdId=''):
params = {'sprdId': sprdId}
return self._request_with_params(POST, SPREAD_CANAEL_ALL_ORDERS, params)
return self._request_with_params(POST, SPREAD_CANCEL_ALL_ORDERS, params)
# Get order details
def get_order_details(self, ordId='', clOrdId=''):

View File

@ -25,11 +25,11 @@ class TradeAPI(Client):
# Cancel Order
def cancel_order(self, instId, ordId='', clOrdId=''):
params = {'instId': instId, 'ordId': ordId, 'clOrdId': clOrdId}
return self._request_with_params(POST, CANAEL_ORDER, params)
return self._request_with_params(POST, CANCEL_ORDER, params)
# Cancel Multiple Orders
def cancel_multiple_orders(self, orders_data):
return self._request_with_params(POST, CANAEL_BATCH_ORDERS, orders_data)
return self._request_with_params(POST, CANCEL_BATCH_ORDERS, orders_data)
# Amend Order
def amend_order(self, instId, cxlOnFail='', ordId='', clOrdId='', reqId='', newSz='', newPx='', newTpTriggerPx='',

View File

@ -7,7 +7,7 @@ from . import consts as c, utils, exceptions
class Client(object):
def __init__(self, api_key = '-1', api_secret_key = '-1', passphrase = '-1', use_server_time=False, flag='1', base_api = c.API_URL,debug = 'True'):
def __init__(self, api_key = '-1', api_secret_key = '-1', passphrase = '-1', use_server_time=False, flag='1', base_api = c.API_URL,debug = 'True', proxies = None):
self.API_KEY = api_key
self.API_SECRET_KEY = api_secret_key
self.PASSPHRASE = passphrase
@ -15,7 +15,7 @@ class Client(object):
self.flag = flag
self.domain = base_api
self.debug = debug
self.client = httpx.Client(base_url=base_api, http2=True)
self.client = httpx.Client(base_url=base_api, http2=True, proxies=proxies)
def _request(self, method, request_path, params):
if method == c.GET:

View File

@ -138,8 +138,8 @@ TAKER_FLOW = '/api/v5/rubik/stat/option/taker-block-volume'
# TRADE-Complete
PLACR_ORDER = '/api/v5/trade/order'
BATCH_ORDERS = '/api/v5/trade/batch-orders'
CANAEL_ORDER = '/api/v5/trade/cancel-order'
CANAEL_BATCH_ORDERS = '/api/v5/trade/cancel-batch-orders'
CANCEL_ORDER = '/api/v5/trade/cancel-order'
CANCEL_BATCH_ORDERS = '/api/v5/trade/cancel-batch-orders'
AMEND_ORDER = '/api/v5/trade/amend-order'
AMEND_BATCH_ORDER = '/api/v5/trade/amend-batch-orders'
CLOSE_POSITION = '/api/v5/trade/close-position'
@ -279,8 +279,8 @@ GET_UNREALIZED_PROFIT_SHARING_DETAILS = '/api/v5/copytrading/unrealized-profit-s
# Spread Trading˚
SPREAD_PLACE_ORDER= '/api/v5/sprd/order'
SPREAD_CANAEL_ORDER = '/api/v5/sprd/cancel-order'
SPREAD_CANAEL_ALL_ORDERS = '/api/v5/sprd/mass-cancel'
SPREAD_CANCEL_ORDER = '/api/v5/sprd/cancel-order'
SPREAD_CANCEL_ALL_ORDERS = '/api/v5/sprd/mass-cancel'
SPREAD_GET_ORDER_DETAILS = '/api/v5/sprd/order'
SPREAD_GET_ACTIVE_ORDERS = '/api/v5/sprd/orders-pending'
SPREAD_GET_ORDERS = '/api/v5/sprd/orders-history'

View File

@ -0,0 +1,33 @@
import asyncio
import logging
import ssl
import certifi
import websockets
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("WebSocketFactory")
class WebSocketFactory:
def __init__(self, url):
self.url = url
self.websocket = None
self.loop = asyncio.get_event_loop()
async def connect(self):
ssl_context = ssl.create_default_context()
ssl_context.load_verify_locations(certifi.where())
try:
self.websocket = await websockets.connect(self.url, ssl=ssl_context)
logger.info("WebSocket connection established.")
return self.websocket
except Exception as e:
logger.error(f"Error connecting to WebSocket: {e}")
return None
async def close(self):
if self.websocket:
await self.websocket.close()
self.websocket = None

View File

@ -1,49 +0,0 @@
from autobahn.twisted.websocket import WebSocketClientFactory
from twisted.internet.protocol import ReconnectingClientFactory
from .WsClientProtocol import *
class WsReconnectingClientFactory(ReconnectingClientFactory):
"""
@ivar maxDelay: Maximum number of seconds between connection attempts.
@ivar initialDelay: Delay for the first reconnection attempt.
@ivar maxRetries: Maximum number of consecutive unsuccessful connection
attempts, after which no further connection attempts will be made. If
this is not explicitly set, no maximum is applied.
"""
initialDelay = 0.1
maxDelay = 2
maxRetries = 5
class WsClientFactory(WebSocketClientFactory, WsReconnectingClientFactory):
reachMaxRetriesError = {"e": "error", "m": "reached max connect retries"}
def __init__(self, *args, payload=None, **kwargs):
WebSocketClientFactory.__init__(self, *args, **kwargs)
self.instance = None
self.subscribeSet = set()
self.payload = payload
self.logger = logging.getLogger(__name__)
def startedConnecting(self, connector):
self.logger.info("WsClientFactory execute startedConnecting")
def clientConnectionFailed(self, connector, reason):
self.logger.error(
"Can't connect to server. Reason: {}. Retrying: {}".format(reason, self.retries + 1))
self.retry(connector)
if self.retries > self.maxRetries:
self.callback(self.reachMaxRetriesError)
def clientConnectionLost(self, connector, reason):
self.logger.error("WsClientFactory execute clientConnectionLost. Reason: {},retried {} times".format(reason,
self.retries + 1))
self.retry(connector)
if self.retries > self.maxRetries:
self.callback(self.reachMaxRetriesError)
def buildProtocol(self, addr):
protocol = WsClientProtocol(self, payload=self.payload)
return protocol

View File

@ -1,46 +0,0 @@
import json
import logging
from autobahn.twisted.websocket import WebSocketClientProtocol
class WsClientProtocol(WebSocketClientProtocol):
def __init__(self, factory, payload=None):
super().__init__()
self.autoPingInterval = 5
self.factory = factory
self.payload = payload
self.logger = logging.getLogger(__name__)
def onOpen(self):
self.factory.instance = self
def onConnect(self, response):
self.logger.info("WsClientProtocol execute onConnect")
if self.payload:
self.logger.info("WsClientProtocol will Send message to OKX Server")
self.sendMessage(self.payload, isBinary=False)
self.factory.resetDelay()
def onMessage(self, payload, isBinary):
self.logger.info("WsClientProtocol execute onMessage begin")
if not isBinary:
try:
payload_obj = json.loads(payload.decode("utf8"))
except Exception as e:
self.logger.error("WsClientProtocol onMessage error;e:{}".format(e))
else:
self.factory.callback(payload_obj)
def onClose(self, wasClean, code, reason):
self.logger.info(
"WsClientProtocol WS connection will be closed; wasClean={0}, code={1}, reason: {2}".format(wasClean, code,
reason))
def onPing(self, payload):
self.logger.info("WsClientProtocol execute onPing")
self.sendPong()
self.logger.info("WsClientProtocol execute onPing finish")
def onPong(self, payload):
self.logger.info("WsClientProtocol execute onPong")

View File

@ -1,136 +0,0 @@
import threading
import time
from autobahn.twisted.websocket import connectWS
from twisted.internet import reactor
from twisted.internet.error import ReactorAlreadyRunning
from . import WsUtils
from .WsClientFactory import *
class WsConnectManager(threading.Thread):
def __init__(self, url, isPrivate):
threading.Thread.__init__(self)
self.factories = {}
self.isPrivate = isPrivate
self._connected_event = threading.Event()
self.url = url
self.conns = {}
self.callback = None
self.logger = logging.getLogger(__name__)
def subscribeSocket(self, args: list, callback):
channelArgs = {}
channelParamMap = {}
WsUtils.checkSocketParams(args, channelArgs, channelParamMap)
if len(channelArgs) < 1:
return False
for channel in channelArgs:
subSet = channelParamMap.get(channel, set())
if self.isPrivate:
privateKey = self.getPrivateKey(channel)
if privateKey not in self.factories:
reactor.callFromThread(self.loginSocket, channel)
time.sleep(2)
newFactory = self.initSubscribeFactory(args=channelArgs[channel], subSet=subSet, callback=callback)
reactor.callFromThread(self.resetConnection, newFactory, channel)
continue
factory = self.initSubscribeFactory(args=channelArgs[channel], subSet=subSet, callback=callback)
self.factories[channel] = factory
reactor.callFromThread(self.addConnection, channel)
def unsubscribeSocket(self, args: list, callback):
channelArgs = {}
channelParamMap = {}
WsUtils.checkSocketParams(args, channelArgs, channelParamMap)
if len(channelArgs) < 1:
return False
for channel in channelArgs:
if self.isPrivate:
privateKey = self.getPrivateKey(channel)
else:
privateKey = channel
if privateKey not in self.factories:
continue
factory = self.factories[privateKey]
ifFiledParams = factory.subscribeSet - channelParamMap[channel]
if len(ifFiledParams) < 1:
self.disconnect(channel)
else:
payload = json.dumps({"op": "unsubscribe", "args": channelArgs[channel]}, ensure_ascii=False).encode(
"utf8")
factory = WsClientFactory(self.url, payload=payload)
factory.client = self
factory.protocol = WsClientProtocol
factory.callback = callback
factory.subscribeSet = ifFiledParams
reactor.callFromThread(self.resetConnection, factory, channel)
def addConnection(self, channel):
self.conns[channel] = connectWS(self.factories[channel])
def disconnect(self, channel):
if channel not in self.conns:
self.logger.error("WsConnectManager disconnect error,channel is not able".format(channel))
return
self.conns[channel].factory = WebSocketClientFactory(self.url)
self.conns[channel].disconnect()
del self.conns[channel]
privateKey = channel
if self.isPrivate:
privateKey = self.getPrivateKey(channel)
del self.factories[privateKey]
def initSubscribeFactory(self, args, subSet: set, callback):
payload = json.dumps({"op": "subscribe", "args": args}, ensure_ascii=False).encode(
"utf8")
factory = WsClientFactory(self.url, payload=payload)
factory.payload = payload
factory.protocol = WsClientProtocol
factory.callback = callback
factory.subscribeSet = factory.subscribeSet | subSet
return factory
def loginSocket(self, channel: str):
payload = WsUtils.initLoginParams(useServerTime=self.useServerTime, apiKey=self.apiKey,
passphrase=self.passphrase, secretKey=self.secretKey)
factory = WsClientFactory(self.url, payload=payload)
factory.protocol = WsClientProtocol
factory.callback = loginSocketCallBack
privateKey = self.getPrivateKey(channel)
self.factories[privateKey] = factory
self.conns[channel] = connectWS(factory)
def resetConnection(self, newFactory, channel):
if self.isPrivate:
privateKey = self.getPrivateKey(channel)
preFactory = self.factories[privateKey]
else:
preFactory = self.factories[channel]
instance = preFactory.instance
if instance is None:
raise ValueError("instance must not none")
instance.factory = newFactory
instance.payload = newFactory.payload
instance.onConnect(None)
def getPrivateKey(self, channel) -> str:
return str(self.apiKey) + "@" + channel
def run(self):
try:
reactor.run(installSignalHandlers=False)
except ReactorAlreadyRunning as e:
self.logger.error("WsConnectManager reactor.run error;e:{}".format(e))
def close(self):
keys = set(self.conns.keys())
for key in keys:
self.closeConnection(key)
self.conns = {}
def loginSocketCallBack(message):
print("loginSocket callback:", message)

View File

@ -1,59 +0,0 @@
# import time
#
# import WsUtils
# from WsClientProtocol import *
# from autobahn.twisted.websocket import WebSocketClientFactory
# from twisted.internet.protocol import ReconnectingClientFactory
#
#
# class WsReconnectingClientFactory(ReconnectingClientFactory):
# """
# @ivar maxDelay: Maximum number of seconds between connection attempts.
# @ivar initialDelay: Delay for the first reconnection attempt.
# @ivar maxRetries: Maximum number of consecutive unsuccessful connection
# attempts, after which no further connection attempts will be made. If
# this is not explicitly set, no maximum is applied.
# """
# initialDelay = 0.1
# maxDelay = 1
# maxRetries = 4
#
#
# class WsLoginFactory(WebSocketClientFactory, WsReconnectingClientFactory):
# reachMaxRetriesError = {"e": "error", "m": "reached max connect retries"}
#
# def __init__(self, *args, useServerTime: str, apiKey: str, passphrase: str, secretKey: str, **kwargs):
# WebSocketClientFactory.__init__(self, *args, **kwargs)
# self.apiKey = apiKey
# self.passphrase = passphrase
# self.secretKey = secretKey
# self.useServerTime = useServerTime
# self.instance = None
# self.preTime = time.time()
# self.logger = logging.getLogger(__name__)
#
# def startedConnecting(self, connector):
# self.logger.info("WsClientFactory execute startedConnecting")
#
# def clientConnectionFailed(self, connector, reason):
# self.logger.error(
# "Can't connect to server. Reason: {}. Retrying: {}".format(reason, self.retries + 1))
# self.retry(connector)
# if self.retries > self.maxRetries:
# self.callback(self.reachMaxRetriesError)
#
# def clientConnectionLost(self, connector, reason):
# cur = time.time()
# print("WsClientFactory,pre team=", cur - self.preTime)
# self.preTime = cur
# self.logger.error("WsClientFactory execute clientConnectionLost. Reason: {},retried {} times".format(reason,
# self.retries + 1))
# self.retry(connector)
# if self.retries > self.maxRetries:
# self.callback(self.reachMaxRetriesError)
#
# def buildProtocol(self, addr):
# payload = WsUtils.initLoginParams(useServerTime=self.useServerTime, apiKey=self.apiKey,
# passphrase=self.passphrase, secretKey=self.secretKey)
# protocol = WsClientProtocol(self, payload=payload)
# return protocol

View File

@ -0,0 +1,77 @@
import asyncio
import json
import logging
from okx.websocket import WsUtils
from okx.websocket.WebSocketFactory import WebSocketFactory
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("WsPrivate")
class WsPrivateAsync:
def __init__(self, apiKey, passphrase, secretKey, url, useServerTime):
self.url = url
self.subscriptions = set()
self.callback = None
self.loop = asyncio.get_event_loop()
self.factory = WebSocketFactory(url)
self.apiKey = apiKey
self.passphrase = passphrase
self.secretKey = secretKey
self.useServerTime = useServerTime
async def connect(self):
self.websocket = await self.factory.connect()
async def consume(self):
async for message in self.websocket:
logger.info("Received message: {%s}", message)
if self.callback:
self.callback(message)
async def subscribe(self, params: list, callback):
self.callback = callback
logRes = await self.login()
await asyncio.sleep(5)
if logRes:
payload = json.dumps({
"op": "subscribe",
"args": params
})
await self.websocket.send(payload)
# await self.consume()
async def login(self):
loginPayload = WsUtils.initLoginParams(
useServerTime=self.useServerTime,
apiKey=self.apiKey,
passphrase=self.passphrase,
secretKey=self.secretKey
)
await self.websocket.send(loginPayload)
return True
async def unsubscribe(self, params: list, callback):
self.callback = callback
payload = json.dumps({
"op": "unsubscribe",
"args": params
})
logger.info(f"unsubscribe: {payload}")
await self.websocket.send(payload)
# for param in params:
# self.subscriptions.discard(param)
async def stop(self):
await self.factory.close()
self.loop.stop()
async def start(self):
logger.info("Connecting to WebSocket...")
await self.connect()
self.loop.create_task(self.consume())
def stop_sync(self):
self.loop.run_until_complete(self.stop())

View File

@ -1,30 +0,0 @@
from twisted.internet import reactor
from . import WsUtils
from .WsConnectManager import WsConnectManager
class WsPrivate(WsConnectManager):
def __init__(self, apiKey: str, passphrase: str, secretKey: str, url: str, useServerTime: False):
if ~WsUtils.isNotBlankStr(apiKey) or ~WsUtils.isNotBlankStr(passphrase) or ~WsUtils.isNotBlankStr(
secretKey) or ~WsUtils.isNotBlankStr(url):
return
super().__init__(url, isPrivate=True)
self.apiKey = apiKey
self.passphrase = passphrase
self.secretKey = secretKey
self.useServerTime = useServerTime
def subscribe(self, params: list, callback):
self.subscribeSocket(params, callback)
def unsubscribe(self, params: list, callback):
self.unsubscribeSocket(params, callback)
def stop(self):
try:
self.close()
finally:
reactor.stop()

View File

@ -1,20 +0,0 @@
from twisted.internet import reactor
from .WsConnectManager import WsConnectManager
class WsPublic(WsConnectManager):
def __init__(self, url):
super().__init__(url, isPrivate=False)
def subscribe(self, params: list, callback):
self.subscribeSocket(params, callback)
def unsubscribe(self, params: list, callback):
self.unsubscribeSocket(params, callback)
def stop(self):
try:
self.close()
finally:
reactor.stop()

View File

@ -0,0 +1,56 @@
import asyncio
import json
import logging
from okx.websocket.WebSocketFactory import WebSocketFactory
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("WsPublic")
class WsPublicAsync:
def __init__(self, url):
self.url = url
self.subscriptions = set()
self.callback = None
self.loop = asyncio.get_event_loop()
self.factory = WebSocketFactory(url)
async def connect(self):
self.websocket = await self.factory.connect()
async def consume(self):
async for message in self.websocket:
logger.info("Received message: {%s}", message)
if self.callback:
self.callback(message)
async def subscribe(self, params: list, callback):
self.callback = callback
payload = json.dumps({
"op": "subscribe",
"args": params
})
await self.websocket.send(payload)
# await self.consume()
async def unsubscribe(self, params: list, callback):
self.callback = callback
payload = json.dumps({
"op": "unsubscribe",
"args": params
})
logger.info(f"unsubscribe: {payload}")
await self.websocket.send(payload)
async def stop(self):
await self.factory.close()
self.loop.stop()
async def start(self):
logger.info("Connecting to WebSocket...")
await self.connect()
self.loop.create_task(self.consume())
def stop_sync(self):
self.loop.run_until_complete(self.stop())

View File

@ -16,7 +16,7 @@ def initLoginParams(useServerTime: bool, apiKey, passphrase, secretKey):
sign = base64.b64encode(d)
arg = {"apiKey": apiKey, "passphrase": passphrase, "timestamp": timestamp, "sign": sign.decode("utf-8")}
payload = {"op": "login", "args": [arg]}
return json.dumps(payload, ensure_ascii=False).encode("utf8")
return json.dumps(payload)
def isNotBlankStr(param: str) -> bool:

View File

@ -0,0 +1,39 @@
import asyncio
from okx.websocket.WsPprivateAsync import WsPrivateAsync
def privateCallback(message):
print("privateCallback", message)
async def main():
url = "wss://wspap.okx.com:8443/ws/v5/private?brokerId=9999"
ws = WsPrivateAsync(
apiKey="your apiKey",
passphrase="your passphrase",
secretKey="your secretKey",
url=url,
useServerTime=False
)
await ws.start()
args = []
arg1 = {"channel": "account", "instType": "BTC"}
arg2 = {"channel": "orders", "instType": "ANY"}
arg3 = {"channel": "balance_and_position"}
args.append(arg1)
args.append(arg2)
args.append(arg3)
await ws.subscribe(args, callback=privateCallback)
await asyncio.sleep(30)
print("-----------------------------------------unsubscribe--------------------------------------------")
args2 = [arg2]
await ws.unsubscribe(args2, callback=privateCallback)
await asyncio.sleep(30)
print("-----------------------------------------unsubscribe all--------------------------------------------")
args3 = [arg1, arg3]
await ws.unsubscribe(args3, callback=privateCallback)
if __name__ == '__main__':
asyncio.run(main())

View File

@ -1,33 +0,0 @@
import time
from okx.websocket.WsPrivate import WsPrivate
def privateCallback(message):
print("WsPrivate subscribe callback:", message)
if __name__ == '__main__':
url = "wss://ws.okx.com:8443/ws/v5/private"
ws = WsPrivate(apiKey="your_apiKey",
passphrase="your_passphrase",
secretKey="your_secretKey",
url=url,
useServerTime=False)
ws.start()
args = []
arg1 = {"channel": "account", "instType": "BTC"}
arg2 = {"channel": "orders", "instType": "ANY"}
arg3 = {"channel": "balance_and_position"}
args.append(arg1)
args.append(arg2)
args.append(arg3)
ws.subscribe(args, callback=privateCallback)
time.sleep(30)
print("-----------------------------------------unsubscribe--------------------------------------------")
args2 = [arg2]
ws.unsubscribe(args2, callback=privateCallback)
time.sleep(30)
print("-----------------------------------------unsubscribe all--------------------------------------------")
args3 = [arg1, arg3]
ws.unsubscribe(args3, callback=privateCallback)

View File

@ -1,32 +1,37 @@
import time
import asyncio
from okx.websocket.WsPublic import WsPublic
from okx.websocket.WsPublicAsync import WsPublicAsync
def publicCallback(message):
print("publicCallback", message)
if __name__ == '__main__':
#url = "wss://wspri.coinall.ltd:8443/ws/v5/ipublic?brokerId=9999"
url = "wss://wspap.okx.com:8443/ws/v5/public"
ws = WsPublic(url=url)
ws.start()
async def main():
# url = "wss://wspap.okex.com:8443/ws/v5/public?brokerId=9999"
url = "wss://wspap.okx.com:8443/ws/v5/public?brokerId=9999"
ws = WsPublicAsync(url=url)
await ws.start()
args = []
arg1 = {"channel": "instruments", "instType": "FUTURES"}
arg2 = {"channel": "instruments", "instType": "SPOT"}
arg3 = {"channel": "tickers", "instId": "BTC-USDT"}
arg3 = {"channel": "tickers", "instId": "BTC-USDT-SWAP"}
arg4 = {"channel": "tickers", "instId": "ETH-USDT"}
args.append(arg1)
args.append(arg2)
args.append(arg3)
args.append(arg4)
ws.subscribe(args, publicCallback)
time.sleep(10)
await ws.subscribe(args, publicCallback)
await asyncio.sleep(5)
print("-----------------------------------------unsubscribe--------------------------------------------")
args2 = [arg4]
ws.unsubscribe(args2, publicCallback)
time.sleep(10)
await ws.unsubscribe(args2, publicCallback)
await asyncio.sleep(5)
print("-----------------------------------------unsubscribe all--------------------------------------------")
args3 = [arg1, arg2, arg3]
ws.unsubscribe(args3, publicCallback)
await ws.unsubscribe(args3, publicCallback)
if __name__ == '__main__':
asyncio.run(main())