''' https://github.com/Kucoin/kucoin-universal-sdk/blob/main/sdk/python/README.md ''' from credentials import get_api_key from kucoin_universal_sdk.api.client import DefaultClient from kucoin_universal_sdk.model.client_option import ClientOptionBuilder from kucoin_universal_sdk.model.constants import GLOBAL_API_ENDPOINT, GLOBAL_FUTURES_API_ENDPOINT, GLOBAL_BROKER_API_ENDPOINT from kucoin_universal_sdk.model.transport_option import TransportOptionBuilder from kucoin_universal_sdk.generate.earn.earn.model_get_savings_products_req import GetSavingsProductsReqBuilder from kucoin_universal_sdk.generate.earn.earn.model_get_account_holding_req import GetAccountHoldingReqBuilder from kucoin_universal_sdk.generate.earn.earn.model_purchase_req import PurchaseReqBuilder from kucoin_universal_sdk.generate.earn.earn.model_redeem_req import RedeemReqBuilder from kucoin_universal_sdk.generate.account.account.model_get_spot_account_list_req import GetSpotAccountListReqBuilder class kucoin_earn: def __init__(self): api_key, api_secret, api_passphrase = get_api_key("kucoin") http_transport_option = ( TransportOptionBuilder() .set_keep_alive(True) .set_max_pool_size(10) .set_max_connection_per_pool(10) .build() ) client_option = ( ClientOptionBuilder() .set_key(api_key) .set_secret(api_secret) .set_passphrase(api_passphrase) .set_spot_endpoint(GLOBAL_API_ENDPOINT) .set_futures_endpoint(GLOBAL_FUTURES_API_ENDPOINT) .set_broker_endpoint(GLOBAL_BROKER_API_ENDPOINT) .set_transport_option(http_transport_option) .build() ) self.client = DefaultClient(client_option) kucoin_rest_service = self.client.rest_service() self.account_api = kucoin_rest_service.get_account_service().get_account_api self.earn_api = kucoin_rest_service.get_earn_service().get_earn_api def get_trading_balance(self, coin): ''' Args: coin (str): The coin to get the balance of Returns: dict: The free available balance of the coin in the trading account (or the equivalent account in the exchange) ''' request = GetSpotAccountListReqBuilder().set_currency(coin).set_type("trade").build() response = self.account_api().get_spot_account_list(request) balance = response.to_dict()["data"][0]["available"] return {coin: balance} def get_available_products(self, coin): request = GetSavingsProductsReqBuilder().set_currency(coin).build() response = self.earn_api().get_savings_products(request) return {"coin": response.to_dict()["common_response"]["data"][0]["currency"], "product_id": response.to_dict()["common_response"]["data"][0]["id"], "rate": response.to_dict()["common_response"]["data"][0]["returnRate"], "min_amount": response.to_dict()["common_response"]["data"][0]["userLowerLimit"], "isActive": response.to_dict()["common_response"]["data"][0]["status"]=="ONGOING"} def subscribe_product(self, product_id, amount, auto_subscribe=True, source_account="SPOT"): ''' source_account: "TRADE" or "MAIN" auto_subscribe is ignored here ''' if source_account.upper() in ["SPOT","TRADE","ALL"]: source="trade" elif source_account.upper() in ["FUND","MAIN"]: source="main" #In Binance SPOT is TRADE and MAIN is FUND else: return {"Error": "Invalid source_account. Values should be TRADE or MAIN for kucoin, SPOT, FUND or ALL for binance"} request = PurchaseReqBuilder(product_id=product_id, amount=str(amount), account_type=source).build() response = self.earn_api().purchase(request) return response.to_dict() def redeem_product(self, order_id, amount="0", source_account="SPOT"): ''' source_account is ignored unless orderID=ETH2: "TRADE" or "MAIN" auto_subscribe is ignored here ''' if source_account in ["SPOT","TRADE","ALL"]: source="TRADE" elif source_account in ["FUND","MAIN"]: source="MAIN" #In Binance SPOT is TRADE and MAIN is FUND else: return {"Error": "Invalid source_account. Values should be TRADE or MAIN for kucoin, SPOT, FUND or ALL for binance"} request = RedeemReqBuilder(order_id=order_id, amount=str(amount), from_account_type=source).build() response = self.earn_api().purchase(request) return response.to_dict() def get_position(self, coin, **kwargs): request = GetAccountHoldingReqBuilder().set_currency(coin).build() response = self.earn_api().get_account_holding(request) return response.to_dict() def get_account(self,coin,**kwargs): request = GetAccountHoldingReqBuilder().build() response = self.earn_api().get_account_holding(request) return response.to_dict() def get_personal_left_quota(self, *args, **kwargs): return {"Error": "N/A"} def get_subscription_record(self, **kwargs): return {"Error": "N/A"} def get_redemption_record(self, **kwargs): return {"Error": "N/A"} def get_rewards_history(self, type="ALL", **kwargs): return {"Error": "N/A"} def get_subscription_preview(self, product_id, amount, **kwargs): return {"Error": "N/A"}