128 lines
5.0 KiB
Python
128 lines
5.0 KiB
Python
'''
KuCoin universal SDK (Python) documentation:
https://github.com/Kucoin/kucoin-universal-sdk/blob/main/sdk/python/README.md
'''
|
|
|
|
from credentials import get_api_key
|
|
|
|
from kucoin_universal_sdk.api.client import DefaultClient
|
|
from kucoin_universal_sdk.model.client_option import ClientOptionBuilder
|
|
from kucoin_universal_sdk.model.constants import GLOBAL_API_ENDPOINT, GLOBAL_FUTURES_API_ENDPOINT, GLOBAL_BROKER_API_ENDPOINT
|
|
from kucoin_universal_sdk.model.transport_option import TransportOptionBuilder
|
|
from kucoin_universal_sdk.generate.earn.earn.model_get_savings_products_req import GetSavingsProductsReqBuilder
|
|
from kucoin_universal_sdk.generate.earn.earn.model_get_account_holding_req import GetAccountHoldingReqBuilder
|
|
from kucoin_universal_sdk.generate.earn.earn.model_purchase_req import PurchaseReqBuilder
|
|
from kucoin_universal_sdk.generate.earn.earn.model_redeem_req import RedeemReqBuilder
|
|
from kucoin_universal_sdk.generate.account.account.model_get_spot_account_list_req import GetSpotAccountListReqBuilder
|
|
|
|
class kucoin_earn:
    '''
    Thin wrapper around the KuCoin universal SDK for Earn and spot-account calls.

    Every public method returns a plain dict: either the SDK response converted
    with ``to_dict()``, or an ``{"Error": ...}`` dict when the arguments are
    rejected locally or the operation is only an "N/A" placeholder here.
    '''

    # Accepted (upper-cased) source_account aliases -> KuCoin Earn account type.
    # In Binance SPOT is TRADE and MAIN is FUND, so both vocabularies are accepted.
    _ACCOUNT_TYPE_ALIASES = {
        "SPOT": "TRADE",
        "TRADE": "TRADE",
        "ALL": "TRADE",
        "FUND": "MAIN",
        "MAIN": "MAIN",
    }

    def __init__(self):
        '''
        Build the SDK client from the credentials stored under "kucoin".

        Stores the client on ``self.client`` and keeps the account / earn API
        getters on ``self.account_api`` and ``self.earn_api``. Both attributes
        are callables: use ``self.earn_api()`` to obtain the API object.
        '''
        api_key, api_secret, api_passphrase = get_api_key("kucoin")

        transport_builder = TransportOptionBuilder()
        transport_builder.set_keep_alive(True)
        transport_builder.set_max_pool_size(10)
        transport_builder.set_max_connection_per_pool(10)
        http_transport_option = transport_builder.build()

        client_option = (
            ClientOptionBuilder()
            .set_key(api_key)
            .set_secret(api_secret)
            .set_passphrase(api_passphrase)
            .set_spot_endpoint(GLOBAL_API_ENDPOINT)
            .set_futures_endpoint(GLOBAL_FUTURES_API_ENDPOINT)
            .set_broker_endpoint(GLOBAL_BROKER_API_ENDPOINT)
            .set_transport_option(http_transport_option)
            .build()
        )

        self.client = DefaultClient(client_option)
        rest_service = self.client.rest_service()
        # The bound getters are stored (not their results); callers invoke them.
        self.account_api = rest_service.get_account_service().get_account_api
        self.earn_api = rest_service.get_earn_service().get_earn_api

    @staticmethod
    def _resolve_account_type(source_account):
        '''
        Map a source_account alias to "TRADE" or "MAIN", case-insensitively.

        Returns None when the alias is not recognised.
        '''
        return kucoin_earn._ACCOUNT_TYPE_ALIASES.get(str(source_account).upper())

    @staticmethod
    def _invalid_source_account():
        '''Return a fresh error dict for an unrecognised source_account.'''
        return {"Error": "Invalid source_account. Values should be TRADE or MAIN for kucoin, SPOT, FUND or ALL for binance"}

    def get_trading_balance(self, coin):
        '''
        Returns the free available balance of a coin in the trading account (or the equivalent account in the exchange)

        The result is ``{coin: available}``. When the API lists no trade
        account for the coin, ``{coin: "0"}`` is returned instead of raising
        IndexError.
        '''
        request = GetSpotAccountListReqBuilder().set_currency(coin).set_type("trade").build()
        response = self.account_api().get_spot_account_list(request)
        accounts = response.to_dict()["data"]
        if not accounts:
            # No trade account exists for this coin yet.
            # NOTE(review): "0" assumes balances are reported as strings - confirm.
            return {coin: "0"}
        return {coin: accounts[0]["available"]}

    def get_available_products(self, coin):
        '''Return the savings products offered for ``coin`` as a dict.'''
        request = GetSavingsProductsReqBuilder().set_currency(coin).build()
        response = self.earn_api().get_savings_products(request)
        return response.to_dict()

    def subscribe_product(self, product_id, amount, auto_subscribe=True, source_account="SPOT"):
        '''
        Purchase (subscribe to) an Earn product.

        source_account: "TRADE" or "MAIN" (Binance-style "SPOT", "ALL" and
            "FUND" are accepted as aliases; matching is case-insensitive)
        auto_subscribe is ignored here

        Returns the purchase response as a dict, or an ``{"Error": ...}`` dict
        when source_account is not recognised (no request is sent then).
        '''
        account_type = self._resolve_account_type(source_account)
        if account_type is None:
            return self._invalid_source_account()

        # Fluent setters, consistent with every other request built in this class.
        request = (
            PurchaseReqBuilder()
            .set_product_id(product_id)
            .set_amount(str(amount))
            .set_account_type(account_type)
            .build()
        )
        response = self.earn_api().purchase(request)
        return response.to_dict()

    def redeem_product(self, order_id, amount="0", source_account="SPOT"):
        '''
        Redeem a holding identified by ``order_id``.

        source_account: "TRADE" or "MAIN" (same aliases as subscribe_product,
            case-insensitive). Per the original author's note it is ignored by
            the exchange unless orderID=ETH2.

        Returns the redeem response as a dict, or an ``{"Error": ...}`` dict
        when source_account is not recognised (no request is sent then).
        '''
        account_type = self._resolve_account_type(source_account)
        if account_type is None:
            return self._invalid_source_account()

        request = (
            RedeemReqBuilder()
            .set_order_id(order_id)
            .set_amount(str(amount))
            .set_from_account_type(account_type)
            .build()
        )
        # Must hit the redeem endpoint; the request is a redeem request.
        response = self.earn_api().redeem(request)
        return response.to_dict()

    def get_position(self, coin, **kwargs):
        '''Return the Earn account holdings filtered by ``coin`` as a dict.'''
        request = GetAccountHoldingReqBuilder().set_currency(coin).build()
        response = self.earn_api().get_account_holding(request)
        return response.to_dict()

    def get_account(self, coin, **kwargs):
        '''
        Return the Earn account holdings as a dict, without a currency filter.

        ``coin`` and ``kwargs`` are accepted but not used.
        '''
        request = GetAccountHoldingReqBuilder().build()
        response = self.earn_api().get_account_holding(request)
        return response.to_dict()

    def get_personal_left_quota(self, *args, **kwargs):
        '''Placeholder: always returns ``{"Error": "N/A"}``.'''
        return {"Error": "N/A"}

    def get_subscription_record(self, **kwargs):
        '''Placeholder: always returns ``{"Error": "N/A"}``.'''
        return {"Error": "N/A"}

    def get_redemption_record(self, **kwargs):
        '''Placeholder: always returns ``{"Error": "N/A"}``.'''
        return {"Error": "N/A"}

    def get_rewards_history(self, type="ALL", **kwargs):
        '''Placeholder: always returns ``{"Error": "N/A"}``.'''
        return {"Error": "N/A"}

    def get_subscription_preview(self, product_id, amount, **kwargs):
        '''Placeholder: always returns ``{"Error": "N/A"}``.'''
        return {"Error": "N/A"}
|
|
|