''' Returns information on all running short traders. '''

import datetime
import json
import time

import requests

import credentials


def profit_since_date(base, quote, timestamp):
    '''
    Asks the statistics server for the total profit of a pair since a given moment.

    NOTE: this function reads the module-level names ``base_statistics_url`` and
    ``headers``, which are only assigned when this file is executed as a script.
    Calling it after a plain import raises NameError unless the caller defines them.

    :param base: base currency of the pair (left side of "BASE/QUOTE").
    :param quote: quote currency of the pair (right side of "BASE/QUOTE").
    :param timestamp: start of the period, in seconds (see timestamp_to_seconds).
    :return: the value stored under "Total profit" in the server's JSON response.
    '''
    # The query separators had been corrupted into '"e=' and '×tamp=' (HTML entity
    # decoding of "&quot" and "&times"); the intended parameters are quote and timestamp.
    url = f"{base_statistics_url}/total_profit_by_pair_since_date?base={base}&quote={quote}&timestamp={timestamp}"  # type: ignore
    result = json.loads(requests.get(url, headers=headers).content)
    return result["Total profit"]


def timestamp_to_seconds(timestamp_string):
    '''
    Converts a timestamp string from this format "[YYYY/mm/dd HH:MM:SS]" to seconds.

    The parsed datetime is naive, so datetime.timestamp() interprets it in the
    local timezone of the machine running the script.

    :param timestamp_string: timestamp string, with or without the surrounding brackets.
    :return: Unix time in whole seconds.
    :raises ValueError: if the string does not match the expected format.
    '''
    dt = datetime.datetime.strptime(timestamp_string.strip('[]'), "%Y/%m/%d %H:%M:%S")
    return int(dt.timestamp())


if __name__ == "__main__":
    api_key = credentials.get_credentials("mainnet_api_key")["key"]
    base_statistics_url = f"{credentials.get_url('mainnet')}/statistics_server"  # type: ignore
    base_url = f"{credentials.get_url('mainnet')}"  # type: ignore
    results_dict = {}
    headers = {"X-API-KEY": api_key}
    exchanges = ["binance", "gateio", "kucoin", "okex"]

    for exchange in exchanges:
        # Fetch the status of every trader on this exchange.
        url = f"{base_url}/{exchange}/get_all_worker_status"
        # Poll for as long as the server answers 503; any other status code
        # (success or error) ends the wait. There is no retry limit.
        while True:
            query = requests.get(url, headers=headers)
            if query.status_code != 503:
                break
            time.sleep(.2)
        workers_status = json.loads(query.content)

        # For each trader flagged as short, collect the switch date, the profit
        # made since that date and the figures of the previous long position.
        for item, status in workers_status.items():
            if not status["is_short"]:
                continue
            print("Processing", item)
            old_long = status["old_long"]
            base, quote = item.split("/")
            switch_date = timestamp_to_seconds(old_long["datetime"])
            profit_since_short = profit_since_date(base, quote, switch_date)
            # quote_spent plus the difference between the old long's tp_amount and
            # base_bought, multiplied by the reported price.
            total_current_value = (
                status["quote_spent"]
                + (old_long["tp_amount"] - status["base_bought"]) * status["price"]
            )
            results_dict[item] = {
                "switch_date": old_long["datetime"],
                "profit_since_short": round(profit_since_short, 2),
                "old_long_quote_spent": round(old_long["quote_spent"]),
                "total_current_value": round(total_current_value, 2),
            }

    # NOTE(review): the original indentation was lost; this report is assumed to
    # run once, after every exchange has been processed - confirm.
    for item, summary in results_dict.items():
        print(item, summary)