# DCAv2/utils/statistics_server_v3.py (581 lines, 21 KiB, Python)

import sqlite3
import datetime
import time
import calendar
import logging
import threading
import os
from collections import deque
from typing import Iterable, List, Tuple
from contextlib import contextmanager
from flask import Flask, jsonify, request, Response
from waitress import serve
# Path of the SQLite database holding the profits table (relative, so it is resolved against the working directory).
profits_database = "../profits/profits_database.db"
# Per-thread storage; get_db_connection() keeps each thread's connection and its creation time here.
_local_storage = threading.local()
def get_db_connection():
    '''
    Returns the SQLite connection of the calling thread.
    A new connection to profits_database is opened when the thread has none yet
    or when the existing one is more than an hour old; the stale one is closed first.
    Rows are returned as sqlite3.Row objects.
    '''
    current_time = time.time()
    connection = getattr(_local_storage, 'connection', None)
    created_at = getattr(_local_storage, 'created_at', None)
    if connection is None or created_at is None or (current_time - created_at) > 3600: # Reconnect every hour
        if connection is not None:
            try:
                connection.close()
            except sqlite3.Error as error:
                # Closing is best effort: a failure here must not prevent reconnecting,
                # but it is reported instead of being silently discarded.
                logging.getLogger(__name__).warning("Could not close stale SQLite connection: %s", error)
        connection = sqlite3.connect(profits_database, check_same_thread=False)
        connection.row_factory = sqlite3.Row
        # Only stored once the new connection exists, so a failed connect is retried on the next call.
        _local_storage.connection = connection
        _local_storage.created_at = current_time
    return connection
@contextmanager
def db_cursor():
    '''
    Context manager that yields a cursor on the calling thread's connection.
    If the managed block raises, the connection is rolled back and the exception is re-raised.
    The cursor is always closed on exit, so results must be fetched inside the with block.
    '''
    conn = get_db_connection()
    cur = conn.cursor()
    try:
        yield cur
    except Exception:
        conn.rollback()
        raise
    finally:
        # Release the cursor (and any statement it still holds) deterministically.
        cur.close()
def load_keys_from_db(file_name):
    '''
    Returns a list with the second column of every row of credentials_table
    in the SQLite database file_name.
    Raises sqlite3.Error if the table cannot be queried; the connection is closed either way.
    '''
    connection = sqlite3.connect(file_name)
    try:
        cursor = connection.cursor()
        cursor.execute("SELECT * FROM credentials_table")
        return [row[1] for row in cursor.fetchall()]
    finally:
        # Runs on success and on failure, so a failing query does not leak the connection.
        connection.close()
def get_valid_keys():
    '''
    Returns the list of valid API keys.
    The keys are read from "api_credentials.db" on the first call and kept
    as an attribute of this function for every later call.
    '''
    try:
        return get_valid_keys._keys
    except AttributeError:
        # First call: nothing cached yet.
        get_valid_keys._keys = load_keys_from_db("api_credentials.db")
        return get_valid_keys._keys
def _profit_report_rows(query):
    '''
    Runs query through db_cursor() and returns every resulting row.
    '''
    with db_cursor() as cursor:
        cursor.execute(query)
        return cursor.fetchall()


def _profit_report_window_total(rows):
    '''
    Returns the total_amount column of a one-row aggregate result.
    Returns 0 when there is no row or when SUM() produced NULL (no matching deals).
    '''
    if not rows or rows[0][1] is None:
        return 0
    return rows[0][1]


def _profit_report_percentage(part, whole):
    '''
    Returns part as a percentage of whole, or 0 when whole is zero.
    '''
    if not whole:
        return 0
    return part/whole*100


def profit_report():
    '''
    Builds the profit report.
    Returns a dictionary with:
        "Last 60 days" / "Last 18 months": totals per day / per month, rounded to 2 decimals
        "Last 30 days average" / "Last 7 days average": window total divided by 30 / 7
        "This month projection": latest monthly total plus the combined daily average
                                 times the days left in the month
        "<Exchange>" / "<Exchange> percentage": this month's amount of Binance, Gateio,
                                                Kucoin and OKX and its share of their sum
        "Total profit": sum of the four exchange amounts of this month
    Empty result sets and a zero total yield zeros instead of raising.
    '''
    ##Queries
    #Last 60 days query
    last_60_days_rows = _profit_report_rows(
        """SELECT strftime('%Y-%m-%d', timestamp, 'unixepoch', '-3 hours') AS day_utc3,
        SUM(amount) AS total_amount
        FROM profits_table
        WHERE strftime('%s', 'now') - timestamp <= 60 * 24 * 60 * 60 -- 60 days in seconds
        GROUP BY day_utc3
        ORDER BY day_utc3;""")
    #Last 30 days query
    last_30_days = _profit_report_rows(
        """SELECT strftime('%Y-%m-%d', timestamp, 'unixepoch', '-3 hours') AS day_utc3,
        SUM(amount) AS total_amount
        FROM profits_table
        WHERE strftime('%s', 'now') - timestamp <= 30 * 24 * 60 * 60 -- 30 days in seconds;""")
    #Last 7 days query
    last_7_days = _profit_report_rows(
        """SELECT strftime('%Y-%m-%d', timestamp, 'unixepoch', '-3 hours') AS day_utc3,
        SUM(amount) AS total_amount
        FROM profits_table
        WHERE strftime('%s', 'now') - timestamp <= 7 * 24 * 60 * 60 -- 7 days in seconds;""")
    #Last n months query
    last_n_months_rows = _profit_report_rows(
        """SELECT strftime('%Y-%m', timestamp, 'unixepoch', '-3 hours') AS year_month_utc3,
        SUM(amount) AS total_amount
        FROM profits_table
        WHERE strftime('%s', 'now') - timestamp <= 18 * 30 * 24 * 60 * 60 -- 18 months in seconds
        GROUP BY year_month_utc3
        ORDER BY year_month_utc3;""")
    #Per exchange
    # NOTE(review): the month of each deal is shifted by '-3 hours' but compared against
    # 'now' in 'localtime'; these only agree if the server's local time is UTC-3 - confirm.
    per_exchange = _profit_report_rows(
        """SELECT
        exchange_name,
        CASE
        WHEN strftime('%Y-%m', timestamp, 'unixepoch', '-3 hours') = strftime('%Y-%m', 'now', 'localtime') THEN 'This Month'
        WHEN strftime('%Y-%m', timestamp, 'unixepoch', '-3 hours') = strftime('%Y-%m', 'now', 'localtime', '-1 month') THEN 'Last Month'
        ELSE 'Other Months'
        END AS month_group,
        SUM(amount) AS total_amount
        FROM
        profits_table
        WHERE
        strftime('%s', 'now') - timestamp <= 60 * 24 * 60 * 60 -- 60 days in seconds
        GROUP BY
        exchange_name, month_group
        ORDER BY
        exchange_name, month_group;""")

    #Projection calculation
    today = datetime.date.today()
    days_in_month = calendar.monthrange(today.year, today.month)[1]
    last_30_days_average = _profit_report_window_total(last_30_days)/30
    last_7_days_average = _profit_report_window_total(last_7_days)/7
    daily_combined_media = (last_30_days_average+last_7_days_average)/2
    # NOTE(review): the latest month/day WITH deals is used as "current"; presumably that is
    # the running month - verify the behaviour wanted right after a month rollover.
    current_amount = (last_n_months_rows[-1][1] or 0) if last_n_months_rows else 0
    # Day of the month taken from the 'YYYY-MM-DD' key of the latest day with deals.
    days_past_this_month = int(last_60_days_rows[-1][0][8:10]) if last_60_days_rows else 0
    this_month_projection = current_amount + daily_combined_media*(days_in_month-days_past_this_month)

    #Per exchange (only the 'This Month' group counts)
    this_month = {"binance": 0, "gateio": 0, "kucoin": 0, "okex": 0}
    for row in per_exchange:
        exchange_name = (row[0] or "").strip().lower()
        if exchange_name in this_month and row[1]=="This Month":
            this_month[exchange_name] = row[2] or 0
    binance_amount = this_month["binance"]
    gateio_amount = this_month["gateio"]
    kucoin_amount = this_month["kucoin"]
    okex_amount = this_month["okex"]
    total_amount = binance_amount+gateio_amount+kucoin_amount+okex_amount

    return {"Last 60 days": {row[0]: round(row[1] or 0,2) for row in last_60_days_rows},
            "Last 18 months": {row[0]: round(row[1] or 0,2) for row in last_n_months_rows},
            "Last 30 days average": last_30_days_average,
            "Last 7 days average": last_7_days_average,
            "This month projection": this_month_projection,
            "Binance": binance_amount,
            "Binance percentage": _profit_report_percentage(binance_amount, total_amount),
            "Gateio": gateio_amount,
            "Gateio percentage": _profit_report_percentage(gateio_amount, total_amount),
            "Kucoin": kucoin_amount,
            "Kucoin percentage": _profit_report_percentage(kucoin_amount, total_amount),
            "OKX": okex_amount,
            "OKX percentage": _profit_report_percentage(okex_amount, total_amount),
            "Total profit": total_amount}
def query_total_profit(pair=None):
    '''
    Returns the total profit of a trading pair.
    pair is compared against the stored pair name with every "/" removed;
    0 is returned when no stored pair matches.
    Without a pair, returns the grand total of the whole profits table.
    '''
    if pair is not None:
        per_pair_sql = (
            "SELECT pair, SUM(amount) AS total_profit\n"
            "FROM profits_table\n"
            "GROUP BY pair;"
        )
        with db_cursor() as cur:
            cur.execute(per_pair_sql)
            rows = cur.fetchall()
        # Total of the first matching pair, or 0 if there is none.
        return next((row[1] for row in rows if row[0].replace("/","")==pair), 0)
    with db_cursor() as cur:
        cur.execute("SELECT SUM(amount) AS total_profit FROM profits_table")
        rows = cur.fetchall()
    return rows[0][0]
def daily_and_monthly_totals() -> tuple[float, float]:
    '''
    Returns a tuple (daily_total, monthly_total) with the profit accumulated since
    the start of the current day and since the start of the current month.
    Both boundaries are computed from the local clock of the machine.
    '''
    today = datetime.datetime.now().date()
    midnight = datetime.time()
    day_start = datetime.datetime.combine(today, midnight)
    month_start = datetime.datetime.combine(today.replace(day=1), midnight)
    # Boundaries as Unix timestamps, bound to the named parameters of the query
    boundaries = {
        "day": int(time.mktime(day_start.timetuple())),
        "month": int(time.mktime(month_start.timetuple())),
    }
    totals_sql = (
        "SELECT\n"
        "COALESCE(SUM(CASE WHEN timestamp >= :day THEN amount END),0) AS daily_total,\n"
        "COALESCE(SUM(CASE WHEN timestamp >= :month THEN amount END),0) AS monthly_total\n"
        "FROM profits_table;\n"
    )
    with db_cursor() as cursor:
        cursor.execute(totals_sql, boundaries)
        totals = cursor.fetchone()
        return (float(totals["daily_total"]), float(totals["monthly_total"]))
def query_daily_totals(pair=None):
    '''
    Returns a dictionary {'YYYY-MM-DD': total profit} with the daily totals of a trading pair.
    Days are computed from the timestamp shifted by -3 hours.
    pair is compared against the stored pair name with every "/" removed.
    Without a pair, the totals cover all pairs.
    '''
    if pair is None:
        all_pairs_sql = (
            "SELECT strftime('%Y-%m-%d', timestamp, 'unixepoch', '-3 hours') AS day_utc3,\n"
            "SUM(amount) AS total_profit\n"
            "FROM profits_table\n"
            "GROUP BY day_utc3;"
        )
        with db_cursor() as cur:
            cur.execute(all_pairs_sql)
            rows = cur.fetchall()
        return {row[0]: row[1] for row in rows}
    per_pair_sql = (
        "SELECT pair, strftime('%Y-%m-%d', timestamp, 'unixepoch', '-3 hours') AS day_utc3,\n"
        "SUM(amount) AS total_profit\n"
        "FROM profits_table\n"
        "GROUP BY pair, day_utc3;"
    )
    with db_cursor() as cur:
        cur.execute(per_pair_sql)
        rows = cur.fetchall()
    # Keep only the rows of the requested pair
    return {row[1]: row[2] for row in rows if row[0].replace("/","")==pair}
def query_monthly_totals(pair=None):
    '''
    Returns a dictionary {'YYYY-MM': total profit} with the monthly totals of a trading pair.
    Months are computed from the timestamp shifted by -3 hours.
    pair is compared against the stored pair name with every "/" removed.
    Without a pair, the totals cover all pairs.
    '''
    if pair is None:
        all_pairs_sql = (
            "SELECT strftime('%Y-%m', datetime(timestamp, 'unixepoch', '-3 hours')) AS month,\n"
            "SUM(amount) AS total_profit\n"
            "FROM profits_table\n"
            "GROUP BY month;"
        )
        with db_cursor() as cur:
            cur.execute(all_pairs_sql)
            rows = cur.fetchall()
        return {row[0]: row[1] for row in rows}
    per_pair_sql = (
        "SELECT pair, strftime('%Y-%m', datetime(timestamp, 'unixepoch', '-3 hours')) AS month,\n"
        "SUM(amount) AS total_profit\n"
        "FROM profits_table\n"
        "GROUP BY pair, month;"
    )
    with db_cursor() as cur:
        cur.execute(per_pair_sql)
        rows = cur.fetchall()
    # Keep only the rows of the requested pair
    return {row[1]: row[2] for row in rows if row[0].replace("/","")==pair}
def last_n_deals(n):
    '''
    Returns the n most recent rows of the profits table, newest first.
    '''
    newest_first_sql = "SELECT * FROM profits_table ORDER BY timestamp DESC LIMIT ?"
    with db_cursor() as cur:
        cur.execute(newest_first_sql, (n,))
        return cur.fetchall()
def last_n_deals_without_history(n):
    '''
    Same as last_n_deals, but each deal is returned as a tuple with its first five columns
    followed by an empty string in place of the order history.
    Useful in bandwidth-restricted scenarios.
    '''
    trimmed_deals = []
    for deal in last_n_deals(n):
        leading_columns = tuple(deal[index] for index in range(5))
        trimmed_deals.append(leading_columns + ("",))
    return trimmed_deals
def last_n_lines(file_name,width,amount=4,full_log=False):
    '''
    Returns a tuple (lines, total number of lines in the file).
    With full_log, lines holds every line of the file, stripped.
    Otherwise the last amount lines are taken newest first; each one is stripped and cut
    to width characters, a line longer than width also contributes its next width
    characters as a separate entry, and the outcome is limited to amount entries.
    '''
    with open(file_name) as handle:
        every_line = handle.readlines()
    total = len(every_line)
    if full_log:
        return [entry.strip() for entry in every_line], total
    pieces = []
    for entry in every_line[::-1][:amount]:
        text = entry.strip()
        if len(text) > width:
            # Long line: first and second slice of width characters
            pieces.extend((text[:width], text[width:width*2]))
        else:
            pieces.append(text[:width])
    return pieces[:amount], total
def tail_log(filename, lines=200):
    '''
    Returns a tuple (last lines, amount of lines returned) with up to `lines` non-empty
    lines from the end of filename, oldest first, decoded as UTF-8 (undecodable bytes
    are dropped) and stripped.
    The file is read backwards in blocks, so only its tail is loaded.
    Returns ([], 0) if the file does not exist or lines is not positive.
    '''
    if lines <= 0 or not os.path.exists(filename):
        return [], 0
    block_size = 4096
    buffer = b""
    with open(filename, 'rb') as f:
        f.seek(0, os.SEEK_END)
        position = f.tell()
        while position > 0:
            read_bytes = min(block_size, position)
            position -= read_bytes
            f.seek(position)
            # Prepend, since the file is being read backwards
            buffer = f.read(read_bytes) + buffer
            # The first entry may be a line cut by the block boundary, so it is not counted
            complete = sum(1 for raw in buffer.splitlines()[1:] if raw.strip())
            if complete >= lines:
                break
    raw_lines = buffer.splitlines()
    if position > 0:
        # The start of the file was not reached: drop the possibly partial first line
        raw_lines = raw_lines[1:]
    # Decode and filter empty lines
    result = [raw.decode('utf-8', errors='ignore').strip() for raw in raw_lines if raw.strip()]
    result = result[-lines:]
    return result, len(result)
# Flask application on which the statistics endpoints are registered.
stats_api = Flask(__name__)
@stats_api.route("/fetch_profit_report")
def fetch_profit_report():
    '''
    GET request
    Header: 'X-API-KEY' -> valid API key, otherwise the answer is a 401
    Parameters: None
    Returns: JSON object with profit report data, or {"Error": <message>} if building it fails
    '''
    key_is_valid = "X-API-KEY" in request.headers and request.headers.get("X-API-KEY") in get_valid_keys()
    if not key_is_valid:
        return jsonify({'Error': 'API key invalid'}), 401
    try:
        report = profit_report()
        return jsonify(report)
    except Exception as error:
        print(error)
        return jsonify({"Error": f"{error}"})
@stats_api.route("/fetch_last_n_deals")
def fetch_last_n_deals():
    '''
    GET request
    Header: 'X-API-KEY' -> valid API key, otherwise the answer is a 401
    Parameter: 'amount_of_deals' -> int
    Returns: JSON object {"last_deals": [...]} with one list of column values per deal,
             or {"last_deals": ""} if the deals cannot be retrieved
    '''
    if not "X-API-KEY" in request.headers or not request.headers.get("X-API-KEY") in get_valid_keys():
        return jsonify({'Error': 'API key invalid'}), 401
    try:
        parameter = request.args.get("amount_of_deals")
        # The rows come back as sqlite3.Row objects, which the JSON encoder rejects;
        # plain lists serialize to the same JSON arrays a tuple would.
        response_value = [list(row) for row in last_n_deals(parameter)]
        return jsonify({"last_deals": response_value})
    except Exception as e:
        print(e)
        return jsonify({"last_deals":""})
@stats_api.route("/fetch_last_n_deals_without_history")
def fetch_last_n_deals_without_history():
    '''
    GET request
    Header: 'X-API-KEY' -> valid API key, otherwise the answer is a 401
    Parameter: 'amount_of_deals' -> int
    Returns: JSON object {"last_deals": [...]} built with last_n_deals_without_history,
             or {"last_deals": ""} if the deals cannot be retrieved
    '''
    if "X-API-KEY" not in request.headers or request.headers.get("X-API-KEY") not in get_valid_keys():
        return jsonify({'Error': 'API key invalid'}), 401
    try:
        requested_amount = request.args.get("amount_of_deals")
        deals = last_n_deals_without_history(requested_amount)
        return jsonify({"last_deals": deals})
    except Exception as error:
        print(error)
        return jsonify({"last_deals":""})
@stats_api.route("/fetch_full_log")
def fetch_full_log():
    '''
    GET request
    Header: 'X-API-KEY' -> valid API key, otherwise the answer is a 401
    Parameters: 'exchange_name' -> string, name of the log file in ../logs without the '.log' extension
    It trims the full log to 200 lines, to avoid sending too much data to the client.
    Returns: JSON object {"line": [...], "amount_of_lines": int}; an empty list and 0
             if the name is invalid or the log cannot be read
    '''
    if not "X-API-KEY" in request.headers or not request.headers.get("X-API-KEY") in get_valid_keys():
        return jsonify({'Error': 'API key invalid'}), 401
    try:
        exchange_name = request.args.get("exchange_name")
        # The name comes from the client: refuse anything carrying a path component,
        # so only files directly inside ../logs can be read.
        if not exchange_name or os.path.basename(exchange_name) != exchange_name:
            raise ValueError(f"Invalid exchange name: {exchange_name!r}")
        last_lines, amount_of_lines = tail_log(f"../logs/{exchange_name}.log", 200)
        return jsonify({"line": last_lines[-200:], "amount_of_lines": amount_of_lines})
    except Exception as e:
        print(e)
        return jsonify({"line": [], "amount_of_lines": 0})
@stats_api.route("/fetch_log")
def fetch_log():
    '''
    GET request
    Header: 'X-API-KEY' -> valid API key, otherwise the answer is a 401
    Parameters: 'exchange_name' -> string, name of the log file in ../logs without the '.log' extension
                'width' -> int
                'amount' -> int
    Returns: JSON object {"line": [...], "amount_of_lines": int}; ten empty strings and 0
             if a parameter is invalid or the log cannot be read
    '''
    if not "X-API-KEY" in request.headers or not request.headers.get("X-API-KEY") in get_valid_keys():
        return jsonify({'Error': 'API key invalid'}), 401
    try:
        exchange_name = request.args.get("exchange_name")
        # The name comes from the client: refuse anything carrying a path component,
        # so only files directly inside ../logs can be read.
        if not exchange_name or os.path.basename(exchange_name) != exchange_name:
            raise ValueError(f"Invalid exchange name: {exchange_name!r}")
        width = int(request.args.get("width")) # type: ignore
        amount = int(request.args.get("amount")) # type: ignore
        last_lines,total_amount_of_lines = last_n_lines(f"../logs/{exchange_name}.log",width,amount)
        return jsonify({"line": last_lines, "amount_of_lines": total_amount_of_lines})
    except Exception as e:
        print(e)
        return {"line": [""]*10,"amount_of_lines": 0}
@stats_api.route("/combined_totals")
def combined_totals():
    '''
    GET request
    Header: 'X-API-KEY' -> valid API key, otherwise the answer is a 401
    Parameters: None
    Returns: JSON object {"combined": [daily total, monthly total]}
    '''
    if "X-API-KEY" not in request.headers or request.headers.get("X-API-KEY") not in get_valid_keys():
        return jsonify({'Error': 'API key invalid'}), 401
    return jsonify({"combined": daily_and_monthly_totals()})
@stats_api.route("/daily_totals")
def get_daily_totals():
    '''
    GET request
    Header: 'X-API-KEY' -> valid API key, otherwise the answer is a 401
    Parameters: None
    Returns: JSON object with the daily totals of all pairs
    '''
    key_is_valid = "X-API-KEY" in request.headers and request.headers.get("X-API-KEY") in get_valid_keys()
    if not key_is_valid:
        return jsonify({'Error': 'API key invalid'}), 401
    return jsonify(query_daily_totals())
@stats_api.route("/daily_totals_by_pair")
def get_daily_totals_by_pair():
    '''
    GET request
    Header: 'X-API-KEY' -> valid API key, otherwise the answer is a 401
    Parameters: 'base' -> string
                'quote' -> string
    Returns: JSON object with the daily totals of the pair base+quote,
             or {'Error': 'Halp'} if the query fails
    '''
    if "X-API-KEY" not in request.headers or request.headers.get("X-API-KEY") not in get_valid_keys():
        return jsonify({'Error': 'API key invalid'}), 401
    try:
        base_symbol = request.args.get("base")
        quote_symbol = request.args.get("quote")
        pair_name = f"{base_symbol}{quote_symbol}"
        return jsonify(query_daily_totals(pair_name))
    except Exception as error:
        print(error)
        return jsonify({'Error': 'Halp'})
@stats_api.route("/monthly_totals")
def get_monthly_totals():
    '''
    GET request
    Header: 'X-API-KEY' -> valid API key, otherwise the answer is a 401
    Parameters: None
    Returns: JSON object with the monthly totals of all pairs
    '''
    key_is_valid = "X-API-KEY" in request.headers and request.headers.get("X-API-KEY") in get_valid_keys()
    if not key_is_valid:
        return jsonify({'Error': 'API key invalid'}), 401
    return jsonify(query_monthly_totals())
@stats_api.route("/monthly_totals_by_pair")
def get_monthly_totals_by_pair():
    '''
    GET request
    Header: 'X-API-KEY' -> valid API key, otherwise the answer is a 401
    Parameters: 'base' -> string
                'quote' -> string
    Returns: JSON object with the monthly totals of the pair base+quote,
             or {'Error': 'Halp'} if the query fails
    '''
    if "X-API-KEY" not in request.headers or request.headers.get("X-API-KEY") not in get_valid_keys():
        return jsonify({'Error': 'API key invalid'}), 401
    try:
        base_symbol = request.args.get("base")
        quote_symbol = request.args.get("quote")
        pair_name = f"{base_symbol}{quote_symbol}"
        return jsonify(query_monthly_totals(pair_name))
    except Exception as error:
        print(error)
        return jsonify({'Error': 'Halp'})
@stats_api.route("/get_averages")
def get_averages():
    '''
    GET request
    Header: 'X-API-KEY' -> valid API key, otherwise the answer is a 401
    Parameters: None
    Returns: JSON object {"30_day": ..., "7_day": ...} with the average daily total of the
             latest (up to) 30 and 7 days present in the daily totals,
             or {'Error': 'Halp'} if they cannot be computed
    '''
    if "X-API-KEY" not in request.headers or request.headers.get("X-API-KEY") not in get_valid_keys():
        return jsonify({'Error': 'API key invalid'}), 401
    try:
        totals_by_day = query_daily_totals()
        newest_first = sorted(totals_by_day.keys(), reverse=True)[:30]
        window_30 = newest_first[:30] #Last 30 days
        window_7 = newest_first[:7] #Last 7 days
        sum_30 = 0
        sum_7 = 0
        #Accumulate from the oldest day of each window to the newest one
        for day in reversed(window_30):
            sum_30 += totals_by_day[day]
        for day in reversed(window_7):
            sum_7 += totals_by_day[day]
        return jsonify({"30_day": sum_30/len(window_30), "7_day": sum_7/len(window_7)})
    except Exception as error:
        print(error)
        return jsonify({'Error': 'Halp'})
@stats_api.route("/total_profit")
def total_profit():
    '''
    GET request
    Header: 'X-API-KEY' -> valid API key, otherwise the answer is a 401
    Parameters: None
    Returns: JSON object {"Total profit": grand total of all pairs}
    '''
    key_is_valid = "X-API-KEY" in request.headers and request.headers.get("X-API-KEY") in get_valid_keys()
    if not key_is_valid:
        return jsonify({'Error': 'API key invalid'}), 401
    return jsonify({"Total profit": query_total_profit()})
@stats_api.route("/total_profit_by_pair")
def total_profit_by_pair():
    '''
    GET request
    Header: 'X-API-KEY' -> valid API key, otherwise the answer is a 401
    Parameters: 'base' -> string
                'quote' -> string
    Returns: JSON object {"Total profit": total of the pair base+quote},
             or {'Error': 'Halp'} if the query fails
    '''
    if "X-API-KEY" not in request.headers or request.headers.get("X-API-KEY") not in get_valid_keys():
        return jsonify({'Error': 'API key invalid'}), 401
    try:
        base_symbol = request.args.get("base")
        quote_symbol = request.args.get("quote")
        pair_total = query_total_profit(f"{base_symbol}{quote_symbol}")
        return jsonify({"Total profit": pair_total})
    except Exception as error:
        print(error)
        return jsonify({'Error': 'Halp'})
if __name__=="__main__":
    # Serve the API with waitress on every interface, port 5010,
    # with the 'waitress' logger set to INFO.
    logging.getLogger('waitress').setLevel(logging.INFO)
    serve(stats_api, host="0.0.0.0", port=5010)
    # Alternative kept for reference: Flask's built-in server
    # app.run(host="0.0.0.0", port=5010, debug=True)