statistics server optimizations

This commit is contained in:
Nicolás Sánchez 2025-08-15 10:10:37 -03:00
parent 4d23503cee
commit e11be69f00
2 changed files with 172 additions and 291 deletions

View File

@ -13,6 +13,7 @@ Mandatory:
7. Implement api key hashing.
8. Dockerize.
9. Cache generated status strings, only recalculate when prices change.
10. Inspect orderbook liquidity prior to changing mode from short to long (big sell market order needs to have liquidity).
Would be nice to have:

View File

@ -1,37 +1,43 @@
import sqlite3
import sys
import datetime
import time
import ccxt
import credentials
import calendar
import requests
import logging
import threading
import os
from contextlib import contextmanager
from flask import Flask, jsonify, request
from waitress import serve
cache_requests = True if len(sys.argv)>1 and sys.argv[1]=="--cache_requests" else False
profits_database = "../profits/profits_database.db"
hashes_db = {"fetch_last_n_deals":0,
"fetch_last_n_deals_without_history":0,
"fetch_full_log":0,
"fetch_log":0,
"daily_totals":0,
"daily_totals_by_pair":0,
"monthly_totals":0,
"monthly_totals_by_pair":0,
"get_averages":0,
"total_profit":0,
"total_profit_by_pair":0}
_local_storage = threading.local()
def get_market_caps(limit):
api_key = credentials.get_credentials("CMC")["key"]
url = f"https://pro-api.coinmarketcap.com/v1/cryptocurrency/listings/latest?CMC_PRO_API_KEY={api_key}&convert=USD&limit={limit}"
return requests.get(url).json()["data"]
def get_db_connection():
current_time = time.time()
if not hasattr(_local_storage, 'connection') or not hasattr(_local_storage, 'created_at') or (current_time - _local_storage.created_at) > 3600: # Reconnect every hour
if hasattr(_local_storage, 'connection'):
try:
_local_storage.connection.close()
except:
pass
_local_storage.connection = sqlite3.connect(profits_database, check_same_thread=False)
_local_storage.connection.row_factory = sqlite3.Row
_local_storage.created_at = current_time
return _local_storage.connection
@contextmanager
def db_cursor():
conn = get_db_connection()
cur = conn.cursor()
try:
yield cur
except Exception:
conn.rollback()
raise
def load_keys_from_db(file_name):
#valid_keys = []
@ -48,13 +54,16 @@ def load_keys_from_db(file_name):
return valid_keys
def get_valid_keys():
if not hasattr(get_valid_keys, '_keys'):
get_valid_keys._keys = load_keys_from_db("api_credentials.db")
return get_valid_keys._keys
def profit_report():
##Queries
connection = sqlite3.connect(profits_database)
cursor = connection.cursor()
#Last 60 days query
with db_cursor() as cursor:
cursor.execute("""SELECT strftime('%Y-%m-%d', timestamp, 'unixepoch', '-3 hours') AS day_utc3,
SUM(amount) AS total_amount
FROM profits_table
@ -64,18 +73,21 @@ def profit_report():
last_60_days_rows = cursor.fetchall()
#Last 30 days query
#cursor.execute("""SELECT strftime('%Y-%m-%d', timestamp, 'unixepoch', '-3 hours') AS day_utc3,
with db_cursor() as cursor:
cursor.execute("""SELECT strftime('%Y-%m-%d', timestamp, 'unixepoch', '-3 hours') AS day_utc3,
SUM(amount) AS total_amount
FROM profits_table
WHERE strftime('%s', 'now') - timestamp <= 30 * 24 * 60 * 60 -- 30 days in seconds;""")
last_30_days = cursor.fetchall()
#Last 7 days query
with db_cursor() as cursor:
cursor.execute("""SELECT strftime('%Y-%m-%d', timestamp, 'unixepoch', '-3 hours') AS day_utc3,
SUM(amount) AS total_amount
FROM profits_table
WHERE strftime('%s', 'now') - timestamp <= 7 * 24 * 60 * 60 -- 7 days in seconds;""")
last_7_days = cursor.fetchall()
#Last n months query
with db_cursor() as cursor:
cursor.execute("""SELECT strftime('%Y-%m', timestamp, 'unixepoch', '-3 hours') AS year_month_utc3,
SUM(amount) AS total_amount
FROM profits_table
@ -92,6 +104,7 @@ def profit_report():
# ORDER BY year_utc3;""")
# yearly_totals = cursor.fetchall()
#Per exchange
with db_cursor() as cursor:
cursor.execute("""SELECT
exchange_name,
CASE
@ -111,7 +124,7 @@ def profit_report():
per_exchange = cursor.fetchall()
#Close db
cursor.close()
#cursor.close()
#Projection calculation
@ -127,16 +140,17 @@ def profit_report():
okex_amount = 0
for row in per_exchange:
if row[0]=="binance":
exchange_name = row[0].strip().lower()
if exchange_name=="binance":
if row[1]=="This Month":
binance_amount = row[2]
elif row[0]=="gateio":
elif exchange_name=="gateio":
if row[1]=="This Month":
gateio_amount = row[2]
elif row[0]=="kucoin":
elif exchange_name=="kucoin":
if row[1]=="This Month":
kucoin_amount = row[2]
elif row[0]=="okex":
elif exchange_name=="okex":
if row[1]=="This Month":
okex_amount = row[2]
@ -174,21 +188,19 @@ def query_total_profit(pair=None):
Returns total profit of the trading pair.
If no pair specified, returns the grand total of all pairs.
'''
connection = sqlite3.connect(profits_database)
cursor = connection.cursor()
if pair is None:
query = "SELECT SUM(amount) AS total_profit FROM profits_table"
with db_cursor() as cursor:
cursor.execute(query)
connection.close()
query_result = cursor.fetchall()
return query_result[0][0]
else:
query = """SELECT pair, SUM(amount) AS total_profit
FROM profits_table
GROUP BY pair;"""
with db_cursor() as cursor:
cursor.execute(query)
connection.close()
query_result = cursor.fetchall()
for item in query_result:
if item[0].replace("/","")==pair:
@ -200,9 +212,8 @@ def daily_and_monthly_totals():
'''
Returns a tuple with the current day and the current month's total profit.
'''
#Connect to db
connection = sqlite3.connect(profits_database)
cursor = connection.cursor()
now = datetime.datetime.now()
# Create a datetime object for the start of the day
@ -216,9 +227,9 @@ def daily_and_monthly_totals():
query = """SELECT * FROM profits_table
WHERE timestamp >= ?
ORDER BY timestamp DESC;"""
with db_cursor() as cursor:
cursor.execute(query, (start_of_month_unix,))
query_result = cursor.fetchall()
connection.close()
monthly_total = sum([item[2] for item in query_result])
daily_total = sum([item[2] for item in query_result if item[0]>=start_of_day_unix])
@ -231,9 +242,6 @@ def query_daily_totals(pair=None):
Returns a dictionary of daily totals of the trading pair.
If no pair specified, returns the totals of all pairs.
'''
#Connect to db
connection = sqlite3.connect(profits_database)
cursor = connection.cursor()
result = {}
@ -242,9 +250,9 @@ def query_daily_totals(pair=None):
SUM(amount) AS total_profit
FROM profits_table
GROUP BY day_utc3;"""
with db_cursor() as cursor:
cursor.execute(query)
query_result = cursor.fetchall()
connection.close()
for item in query_result:
result[item[0]] = item[1]
else:
@ -252,9 +260,9 @@ def query_daily_totals(pair=None):
SUM(amount) AS total_profit
FROM profits_table
GROUP BY pair, day_utc3;"""
with db_cursor() as cursor:
cursor.execute(query)
query_result = cursor.fetchall()
connection.close()
for item in query_result:
if item[0].replace("/","")==pair:
result[item[1]] = item[2]
@ -266,9 +274,6 @@ def query_monthly_totals(pair=None):
Returns a dictionary of monthly totals of the trading pair.
If no pair specified, returns the totals of all pairs.
'''
#Connect to db
connection = sqlite3.connect(profits_database)
cursor = connection.cursor()
result = {}
@ -277,9 +282,9 @@ def query_monthly_totals(pair=None):
SUM(amount) AS total_profit
FROM profits_table
GROUP BY month;"""
with db_cursor() as cursor:
cursor.execute(query)
query_result = cursor.fetchall()
connection.close()
for item in query_result:
result[item[0]] = item[1]
else:
@ -287,9 +292,9 @@ def query_monthly_totals(pair=None):
SUM(amount) AS total_profit
FROM profits_table
GROUP BY pair, month;"""
with db_cursor() as cursor:
cursor.execute(query)
query_result = cursor.fetchall()
connection.close()
for item in query_result:
if item[0].replace("/","")==pair:
result[item[1]] = item[2]
@ -300,11 +305,9 @@ def last_n_deals(n):
'''
Returns a list of the latest n deals
'''
connection = sqlite3.connect(profits_database)
cursor = connection.cursor()
with db_cursor() as cursor:
cursor.execute("SELECT * FROM profits_table ORDER BY timestamp DESC LIMIT ?",(n,))
result = cursor.fetchall()
connection.close()
return result
@ -338,98 +341,34 @@ def last_n_lines(file_name,width,amount=4,full_log=False):
return result[:amount],len(file_contents)
def return_parkinson_backtests(broker, days, max_rank):
'''
Returns a dictionary containing backtests with the format {coin: value}
'''
if broker not in ["binance", "gateio", "kucoin", "okx", "bybit"]:
return {}
def tail_log(filename, lines=200):
if not os.path.exists(filename):
return []
evaluation_dictionary = {}
start_of_day = int(time.mktime(datetime.datetime.now().date().timetuple()))
since = int(start_of_day - 60*60*24*days)
block_size = 1024
blocks = []
with open(filename, 'rb') as f:
f.seek(0, 2)
#total_bytes = remaining_bytes = f.tell()
remaining_bytes = f.tell()
# Getting the data from the database
print("Querying database...")
conn = sqlite3.connect(f"data/{broker}.db")
cursor = conn.cursor()
cursor.execute('SELECT * FROM volatilities_table WHERE timestamp > ?', (since,))
rows = cursor.fetchall()
conn.close()
while len(blocks) < lines and remaining_bytes > 0:
read_bytes = min(block_size, remaining_bytes)
f.seek(-read_bytes, 1)
block = f.read(read_bytes).splitlines()
f.seek(-read_bytes, 1)
# Parse the data
print("Parsing the data...")
for row in rows:
if row[0] not in evaluation_dictionary:
evaluation_dictionary[row[0]] = [row[2]]
else:
evaluation_dictionary[row[0]].append(row[2])
# Prepend to blocks (since we're reading backwards)
blocks = block[-(len(blocks)+1):] + blocks
remaining_bytes -= read_bytes
#Calculate weighted averages
print("Calculating weighted averages")
weighted_averages = {}
for key in evaluation_dictionary:
multiplier = len(evaluation_dictionary[key])
total = 0
for value in evaluation_dictionary[key][::-1]:
total+=value*multiplier/len(evaluation_dictionary[key])
multiplier-=1
weighted_averages[key] = total/len(evaluation_dictionary[key])
#Filter by rank
print("Filtering results by CMC rank")
coins_accepted = []
market_caps = get_market_caps(max_rank)
for result in market_caps:
coins_accepted.append(result["symbol"])
for coin in weighted_averages.copy():
if coin.split("/")[0] not in coins_accepted:
del(weighted_averages[coin])
#Checking open markets
print("Filtering results by market state")
exchange_class = getattr(ccxt, broker)
broker = exchange_class({
"apiKey": "",
"secret": "",
"timeout": 30000,
"enableRateLimit": True,
'options': {
'newOrderRespType': 'FULL'}
})
markets = broker.load_markets()
for key in weighted_averages.copy():
if key not in markets or not markets[key]["active"]:
del(weighted_averages[key])
return weighted_averages
# Decode and filter empty lines
result = [line.decode('utf-8', errors='ignore').strip() for line in blocks if line.strip()]
return result[-lines:],len(result[-lines:])
stats_api = Flask(__name__)
@stats_api.route("/fetch_backtests")
def fetch_backtests():
'''
GET request
Parameters: 'exchange_name" -> string
'days' -> int
'max_rank' -> int
'''
if "X-API-KEY" in request.headers and request.headers.get("X-API-KEY") in valid_keys:
try:
broker = request.args.get("exchange_name")
days = int(request.args.get("days")) # type: ignore
max_rank = int(request.args.get("max_rank")) # type: ignore
return return_parkinson_backtests(broker,days,max_rank)
except Exception as e:
print(e)
return jsonify({"HORROR": f"{e}"})
return jsonify({'Error': 'API key invalid'}), 401
@stats_api.route("/fetch_profit_report")
def fetch_profit_report():
'''
@ -438,7 +377,7 @@ def fetch_profit_report():
Returns: JSON object with profit report data
'''
if "X-API-KEY" in request.headers and request.headers.get("X-API-KEY") in valid_keys:
if "X-API-KEY" in request.headers and request.headers.get("X-API-KEY") in get_valid_keys():
try:
return jsonify(profit_report())
except Exception as e:
@ -454,7 +393,7 @@ def clear_hashes():
'''
GET request
'''
if "X-API-KEY" in request.headers and request.headers.get("X-API-KEY") in valid_keys:
if "X-API-KEY" in request.headers and request.headers.get("X-API-KEY") in get_valid_keys():
hashes_db = {"fetch_last_n_deals":0,
"fetch_last_n_deals_without_history":0,
"fetch_full_log":0,
@ -476,17 +415,11 @@ def fetch_last_n_deals():
GET request
Parameter: 'amount_of_deals' -> int
'''
if "X-API-KEY" in request.headers and request.headers.get("X-API-KEY") in valid_keys:
if "X-API-KEY" in request.headers and request.headers.get("X-API-KEY") in get_valid_keys():
try:
parameter = request.args.get("amount_of_deals")
response_value = last_n_deals(parameter)
if not cache_requests:
return jsonify({"last_deals": response_value})
response_hash = hash(str({"last_deals": response_value}))
if hashes_db["fetch_last_n_deals"]!=response_hash:
hashes_db["fetch_last_n_deals"] = response_hash
return jsonify({"last_deals": response_value})
return jsonify({"no_changes": True})
except Exception as e:
print(e)
return jsonify({"last_deals":""})
@ -499,18 +432,12 @@ def fetch_last_n_deals_without_history():
GET request
Parameter: 'amount_of_deals' -> int
'''
if "X-API-KEY" in request.headers and request.headers.get("X-API-KEY") in valid_keys:
if "X-API-KEY" in request.headers and request.headers.get("X-API-KEY") in get_valid_keys():
try:
parameter = request.args.get("amount_of_deals")
#return jsonify({"last_deals": last_n_deals_without_history(parameter)})
response_value = last_n_deals_without_history(parameter)
if not cache_requests:
return jsonify({"last_deals": response_value})
response_hash = hash(str({"last_deals": response_value}))
if hashes_db["fetch_last_n_deals_without_history"]!=response_hash:
hashes_db["fetch_last_n_deals_without_history"] = response_hash
return jsonify({"last_deals": response_value})
return jsonify({"no_changes": True})
except Exception as e:
print(e)
return jsonify({"last_deals":""})
@ -525,18 +452,13 @@ def fetch_full_log():
It trims the full log to 200 lines, to avoid sending too much data to the client.
'''
if "X-API-KEY" in request.headers and request.headers.get("X-API-KEY") in valid_keys:
if "X-API-KEY" in request.headers and request.headers.get("X-API-KEY") in get_valid_keys():
try:
exchange_name = request.args.get("exchange_name")
width = 0
last_lines,amount_of_lines = last_n_lines(f"../logs/{exchange_name}.log",width,0,full_log=True)
if not cache_requests:
#last_lines,amount_of_lines = last_n_lines(f"../logs/{exchange_name}.log",width,0,full_log=True)
last_lines, amount_of_lines = tail_log(f"../logs/{exchange_name}.log", 200)
return jsonify({"line": last_lines[-200:], "amount_of_lines": amount_of_lines})
response_hash = hash(str({"line": last_lines, "amount_of_lines": amount_of_lines}))
if hashes_db["fetch_full_log"]!=response_hash:
hashes_db["fetch_full_log"] = response_hash
return jsonify({"line": last_lines[-200:], "amount_of_lines": amount_of_lines})
return jsonify({"no_changes": True})
except Exception as e:
print(e)
return {"line": [""]*width,"amount_of_lines": 0}
@ -551,19 +473,13 @@ def fetch_log():
'width' -> int
'amount' -> int
'''
if "X-API-KEY" in request.headers and request.headers.get("X-API-KEY") in valid_keys:
if "X-API-KEY" in request.headers and request.headers.get("X-API-KEY") in get_valid_keys():
try:
exchange_name = request.args.get("exchange_name")
width = int(request.args.get("width")) # type: ignore
amount = int(request.args.get("amount")) # type: ignore
last_lines,total_amount_of_lines = last_n_lines(f"../logs/{exchange_name}.log",width,amount)
if not cache_requests:
return jsonify({"line": last_lines, "amount_of_lines": total_amount_of_lines})
response_hash = hash(str({"line": last_lines, "amount_of_lines": total_amount_of_lines}))
if hashes_db["fetch_log"]!=response_hash:
hashes_db["fetch_log"] = response_hash
return jsonify({"line": last_lines, "amount_of_lines": total_amount_of_lines})
return jsonify({"no_changes": True})
except Exception as e:
print(e)
return {"line": [""]*10,"amount_of_lines": 0}
@ -572,7 +488,7 @@ def fetch_log():
@stats_api.route("/combined_totals")
def combined_totals():
if "X-API-KEY" in request.headers and request.headers.get("X-API-KEY") in valid_keys:
if "X-API-KEY" in request.headers and request.headers.get("X-API-KEY") in get_valid_keys():
daily_totals = daily_and_monthly_totals()
return jsonify({"combined": daily_totals})
return jsonify({'Error': 'API key invalid'}), 401
@ -580,15 +496,9 @@ def combined_totals():
@stats_api.route("/daily_totals")
def get_daily_totals():
if "X-API-KEY" in request.headers and request.headers.get("X-API-KEY") in valid_keys:
if "X-API-KEY" in request.headers and request.headers.get("X-API-KEY") in get_valid_keys():
daily_totals = query_daily_totals()
if not cache_requests:
return jsonify(daily_totals)
response_hash = hash(str(daily_totals))
if hashes_db["daily_totals"]!=response_hash:
hashes_db["daily_totals"] = response_hash
return jsonify(daily_totals)
return jsonify({"no_changes": True})
return jsonify({'Error': 'API key invalid'}), 401
@ -599,18 +509,12 @@ def get_daily_totals_by_pair():
Parameters: 'base' -> string
'quote' -> string
'''
if "X-API-KEY" in request.headers and request.headers.get("X-API-KEY") in valid_keys:
if "X-API-KEY" in request.headers and request.headers.get("X-API-KEY") in get_valid_keys():
try:
base = request.args.get("base")
quote = request.args.get("quote")
daily_totals = query_daily_totals(f"{base}{quote}")
if not cache_requests:
return jsonify(daily_totals)
response_hash = hash(str(daily_totals))
if hashes_db["daily_totals_by_pair"]!=response_hash:
hashes_db["daily_totals_by_pair"] = response_hash
return jsonify(daily_totals)
return jsonify({"no_changes": True})
except Exception as e:
print(e)
return jsonify({'Error': 'Halp'})
@ -619,15 +523,9 @@ def get_daily_totals_by_pair():
@stats_api.route("/monthly_totals")
def get_monthly_totals():
if "X-API-KEY" in request.headers and request.headers.get("X-API-KEY") in valid_keys:
if "X-API-KEY" in request.headers and request.headers.get("X-API-KEY") in get_valid_keys():
monthly_totals = query_monthly_totals()
if not cache_requests:
return jsonify(monthly_totals)
response_hash = hash(str(monthly_totals))
if hashes_db["monthly_totals"]!=response_hash:
hashes_db["monthly_totals"] = response_hash
return jsonify(monthly_totals)
return jsonify({"no_changes": True})
return jsonify({'Error': 'API key invalid'}), 401
@ -638,18 +536,12 @@ def get_monthly_totals_by_pair():
Parameters: 'base' -> string
'quote' -> string
'''
if "X-API-KEY" in request.headers and request.headers.get("X-API-KEY") in valid_keys:
if "X-API-KEY" in request.headers and request.headers.get("X-API-KEY") in get_valid_keys():
try:
base = request.args.get("base")
quote = request.args.get("quote")
monthly_totals = query_monthly_totals(f"{base}{quote}")
if not cache_requests:
return jsonify(monthly_totals)
response_hash = hash(str(monthly_totals))
if hashes_db["monthly_totals_by_pair"]!=response_hash:
hashes_db["monthly_totals_by_pair"] = response_hash
return jsonify(monthly_totals)
return jsonify({"no_changes": True})
except Exception as e:
print(e)
return jsonify({'Error': 'Halp'})
@ -658,29 +550,29 @@ def get_monthly_totals_by_pair():
@stats_api.route("/get_averages")
def get_averages():
if "X-API-KEY" in request.headers and request.headers.get("X-API-KEY") in valid_keys:
if "X-API-KEY" in request.headers and request.headers.get("X-API-KEY") in get_valid_keys():
try:
daily_totals = query_daily_totals()
val_30 = 0
val_7 = 0
acc_30 = []
acc_7 = []
for x in sorted(daily_totals):
acc_30.append(daily_totals[x])
acc_7.append(daily_totals[x])
#acc_30 = []
#acc_7 = []
#for x in sorted(daily_totals):
# acc_30.append(daily_totals[x])
# acc_7.append(daily_totals[x])
recent_days = sorted(daily_totals.keys(), reverse=True)[:30]
acc_30 = [daily_totals[date] for date in recent_days[:30]]
acc_7 = [daily_totals[date] for date in recent_days[:7]]
length_30 = min(30,len(acc_30)) #Last 30 days
length_7 = min(7,len(acc_7)) #Last 7 days
for _ in range(length_30):
val_30 += acc_30.pop()
for _ in range(length_7):
val_7 += acc_7.pop()
if not cache_requests:
return jsonify({"30_day": val_30/length_30, "7_day": val_7/length_7})
response_hash = hash(str({"30_day": val_30/length_30, "7_day": val_7/length_7}))
if hashes_db["get_averages"]!=response_hash:
hashes_db["get_averages"] = response_hash
return jsonify({"30_day": val_30/length_30, "7_day": val_7/length_7})
return jsonify({"no_changes": True})
except Exception as e:
print(e)
return jsonify({'Error': 'Halp'})
@ -689,15 +581,9 @@ def get_averages():
@stats_api.route("/total_profit")
def total_profit():
if "X-API-KEY" in request.headers and request.headers.get("X-API-KEY") in valid_keys:
if "X-API-KEY" in request.headers and request.headers.get("X-API-KEY") in get_valid_keys():
total = query_total_profit()
if not cache_requests:
return jsonify({"Total profit": total})
response_hash = hash(str({"Total profit": total}))
if hashes_db["total_profit"]!=response_hash:
hashes_db["total_profit"] = response_hash
return jsonify({"Total profit": total})
return jsonify({"no_changes": True})
return jsonify({'Error': 'API key invalid'}), 401
@ -708,18 +594,12 @@ def total_profit_by_pair():
Parameters: 'base' -> string
'quote' -> string
'''
if "X-API-KEY" in request.headers and request.headers.get("X-API-KEY") in valid_keys:
if "X-API-KEY" in request.headers and request.headers.get("X-API-KEY") in get_valid_keys():
try:
base = request.args.get("base")
quote = request.args.get("quote")
total = query_total_profit(f"{base}{quote}")
if not cache_requests:
return jsonify({"Total profit": total})
response_hash = hash(str({"Total profit": total}))
if hashes_db["total_profit_by_pair"]!=response_hash:
hashes_db["total_profit_by_pair"] = response_hash
return jsonify({"Total profit": total})
return jsonify({"no_changes": True})
except Exception as e:
print(e)
return jsonify({'Error': 'Halp'})
@ -729,7 +609,7 @@ def total_profit_by_pair():
if __name__=="__main__":
# Load valid keys from database
valid_keys = load_keys_from_db("api_credentials.db")
#valid_keys = load_keys_from_db("api_credentials.db")
#Waitress
logger = logging.getLogger('waitress')