import sqlite3 import datetime import time import calendar import logging import threading import os from collections import deque from typing import Iterable, List, Tuple from contextlib import contextmanager from flask import Flask, jsonify, request, Response from waitress import serve profits_database = "../profits/profits_database.db" _local_storage = threading.local() def get_db_connection(): current_time = time.time() if not hasattr(_local_storage, 'connection') or not hasattr(_local_storage, 'created_at') or (current_time - _local_storage.created_at) > 3600: # Reconnect every hour if hasattr(_local_storage, 'connection'): try: _local_storage.connection.close() except: pass _local_storage.connection = sqlite3.connect(profits_database, check_same_thread=False) _local_storage.connection.row_factory = sqlite3.Row _local_storage.created_at = current_time return _local_storage.connection @contextmanager def db_cursor(): conn = get_db_connection() cur = conn.cursor() try: yield cur except Exception: conn.rollback() raise def load_keys_from_db(file_name): connection = sqlite3.connect(file_name) cursor = connection.cursor() cursor.execute("SELECT * FROM credentials_table") data = cursor.fetchall() connection.close() valid_keys = [line[1] for line in data] return valid_keys def get_valid_keys(): if not hasattr(get_valid_keys, '_keys'): get_valid_keys._keys = load_keys_from_db("api_credentials.db") return get_valid_keys._keys def profit_report(): ##Queries #Last 60 days query with db_cursor() as cursor: cursor.execute("""SELECT strftime('%Y-%m-%d', timestamp, 'unixepoch', '-3 hours') AS day_utc3, SUM(amount) AS total_amount FROM profits_table WHERE strftime('%s', 'now') - timestamp <= 60 * 24 * 60 * 60 -- 60 days in seconds GROUP BY day_utc3 ORDER BY day_utc3;""") last_60_days_rows = cursor.fetchall() #Last 30 days query with db_cursor() as cursor: cursor.execute("""SELECT strftime('%Y-%m-%d', timestamp, 'unixepoch', '-3 hours') AS day_utc3, SUM(amount) AS total_amount FROM profits_table WHERE strftime('%s', 'now') - timestamp <= 30 * 24 * 60 * 60 -- 30 days in seconds;""") last_30_days = cursor.fetchall() #Last 7 days query with db_cursor() as cursor: cursor.execute("""SELECT strftime('%Y-%m-%d', timestamp, 'unixepoch', '-3 hours') AS day_utc3, SUM(amount) AS total_amount FROM profits_table WHERE strftime('%s', 'now') - timestamp <= 7 * 24 * 60 * 60 -- 7 days in seconds;""") last_7_days = cursor.fetchall() #Last n months query with db_cursor() as cursor: cursor.execute("""SELECT strftime('%Y-%m', timestamp, 'unixepoch', '-3 hours') AS year_month_utc3, SUM(amount) AS total_amount FROM profits_table WHERE strftime('%s', 'now') - timestamp <= 18 * 30 * 24 * 60 * 60 -- 18 months in seconds GROUP BY year_month_utc3 ORDER BY year_month_utc3;""") last_n_months_rows = cursor.fetchall() #Yearly totals # cursor.execute("""SELECT strftime('%Y', timestamp, 'unixepoch', '-3 hours') AS year_utc3, # SUM(amount) AS total_amount # FROM profits_table # WHERE strftime('%s', 'now') - timestamp <= 24 * 365 * 60 * 60 -- 365 days in seconds # GROUP BY year_utc3 # ORDER BY year_utc3;""") # yearly_totals = cursor.fetchall() #Per exchange with db_cursor() as cursor: cursor.execute("""SELECT exchange_name, CASE WHEN strftime('%Y-%m', timestamp, 'unixepoch', '-3 hours') = strftime('%Y-%m', 'now', 'localtime') THEN 'This Month' WHEN strftime('%Y-%m', timestamp, 'unixepoch', '-3 hours') = strftime('%Y-%m', 'now', 'localtime', '-1 month') THEN 'Last Month' ELSE 'Other Months' END AS month_group, SUM(amount) AS total_amount FROM profits_table WHERE strftime('%s', 'now') - timestamp <= 60 * 24 * 60 * 60 -- 60 days in seconds GROUP BY exchange_name, month_group ORDER BY exchange_name, month_group;""") per_exchange = cursor.fetchall() #Projection calculation days_in_month = calendar.monthrange(datetime.date.today().year, datetime.date.today().month)[1] daily_combined_media = (last_30_days[0][1]/30+last_7_days[0][1]/7)/2 current_amount = last_n_months_rows[-1][1] days_past_this_month = int(last_60_days_rows[-1][0][8:10]) #Per exchange binance_amount = 0 gateio_amount = 0 kucoin_amount = 0 okex_amount = 0 for row in per_exchange: exchange_name = row[0].strip().lower() if exchange_name=="binance": if row[1]=="This Month": binance_amount = row[2] elif exchange_name=="gateio": if row[1]=="This Month": gateio_amount = row[2] elif exchange_name=="kucoin": if row[1]=="This Month": kucoin_amount = row[2] elif exchange_name=="okex": if row[1]=="This Month": okex_amount = row[2] total_amount = binance_amount+gateio_amount+kucoin_amount+okex_amount last_60_days_result = {row[0]: round(row[1],2) for row in last_60_days_rows} last_18_months_result = {row[0]: round(row[1],2) for row in last_n_months_rows} last_30_days_average = last_30_days[0][1]/30 last_7_days_average = last_7_days[0][1]/7 this_month_projection = current_amount + daily_combined_media*(days_in_month-days_past_this_month) binance_percentage = binance_amount/total_amount*100 gateio_percentage = gateio_amount/total_amount*100 kucoin_percentage = kucoin_amount/total_amount*100 okex_percentage = okex_amount/total_amount*100 return {"Last 60 days": last_60_days_result, "Last 18 months": last_18_months_result, "Last 30 days average": last_30_days_average, "Last 7 days average": last_7_days_average, "This month projection": this_month_projection, "Binance": binance_amount, "Binance percentage": binance_percentage, "Gateio": gateio_amount, "Gateio percentage": gateio_percentage, "Kucoin": kucoin_amount, "Kucoin percentage": kucoin_percentage, "OKX": okex_amount, "OKX percentage": okex_percentage, "Total profit": total_amount} def query_total_profit(pair=None): ''' Returns total profit of the trading pair. If no pair specified, returns the grand total of all pairs. ''' if pair is None: query = "SELECT SUM(amount) AS total_profit FROM profits_table" with db_cursor() as cursor: cursor.execute(query) query_result = cursor.fetchall() return query_result[0][0] else: query = """SELECT pair, SUM(amount) AS total_profit FROM profits_table GROUP BY pair;""" with db_cursor() as cursor: cursor.execute(query) query_result = cursor.fetchall() for item in query_result: if item[0].replace("/","")==pair: return item[1] return 0 def daily_and_monthly_totals() -> tuple[float, float]: ''' Returns a tuple with the current day and the current month's total profit. ''' now = datetime.datetime.now() # Create a datetime object for the start of the day start_of_day = datetime.datetime(now.year, now.month, now.day) start_of_month = datetime.datetime(now.year, now.month, 1) # Convert the start of the day to Unix time start_of_day_unix = int(time.mktime(start_of_day.timetuple())) start_of_month_unix = int(time.mktime(start_of_month.timetuple())) query = """SELECT COALESCE(SUM(CASE WHEN timestamp >= :day THEN amount END),0) AS daily_total, COALESCE(SUM(CASE WHEN timestamp >= :month THEN amount END),0) AS monthly_total FROM profits_table; """ with db_cursor() as cur: cur.execute(query, {"day": start_of_day_unix, "month": start_of_month_unix}) row = cur.fetchone() daily_total = float(row["daily_total"]) monthly_total = float(row["monthly_total"]) return (daily_total, monthly_total) def query_daily_totals(pair=None): ''' Returns a dictionary of daily totals of the trading pair. If no pair specified, returns the totals of all pairs. ''' result = {} if pair is None: query = """SELECT strftime('%Y-%m-%d', timestamp, 'unixepoch', '-3 hours') AS day_utc3, SUM(amount) AS total_profit FROM profits_table GROUP BY day_utc3;""" with db_cursor() as cursor: cursor.execute(query) query_result = cursor.fetchall() for item in query_result: result[item[0]] = item[1] else: query = """SELECT pair, strftime('%Y-%m-%d', timestamp, 'unixepoch', '-3 hours') AS day_utc3, SUM(amount) AS total_profit FROM profits_table GROUP BY pair, day_utc3;""" with db_cursor() as cursor: cursor.execute(query) query_result = cursor.fetchall() for item in query_result: if item[0].replace("/","")==pair: result[item[1]] = item[2] return result def query_monthly_totals(pair=None): ''' Returns a dictionary of monthly totals of the trading pair. If no pair specified, returns the totals of all pairs. ''' result = {} if pair is None: query = """SELECT strftime('%Y-%m', datetime(timestamp, 'unixepoch', '-3 hours')) AS month, SUM(amount) AS total_profit FROM profits_table GROUP BY month;""" with db_cursor() as cursor: cursor.execute(query) query_result = cursor.fetchall() for item in query_result: result[item[0]] = item[1] else: query = """SELECT pair, strftime('%Y-%m', datetime(timestamp, 'unixepoch', '-3 hours')) AS month, SUM(amount) AS total_profit FROM profits_table GROUP BY pair, month;""" with db_cursor() as cursor: cursor.execute(query) query_result = cursor.fetchall() for item in query_result: if item[0].replace("/","")==pair: result[item[1]] = item[2] return result def last_n_deals(n): ''' Returns a list of the latest n deals ''' with db_cursor() as cursor: cursor.execute("SELECT * FROM profits_table ORDER BY timestamp DESC LIMIT ?",(n,)) result = cursor.fetchall() return result def last_n_deals_without_history(n): ''' Like last_n_deals, but without returning the order history. Useful in bandwidth-restricted scenarios. ''' return [(row[0],row[1],row[2],row[3],row[4],"") for row in last_n_deals(n)] def last_n_lines(file_name,width,amount=4,full_log=False): file_contents = [] result = [] with open(file_name) as f: file_contents = f.readlines() if full_log: for line in file_contents: result.append(line.strip()) return result,len(file_contents) for line in file_contents[::-1][:amount]: trimmed = line.strip() result.append(trimmed[:width]) if len(trimmed)>width: result.append(trimmed[width:width*2]) return result[:amount],len(file_contents) def tail_log(filename, lines=200): if not os.path.exists(filename): return [] block_size = 1024 blocks = [] with open(filename, 'rb') as f: f.seek(0, 2) #total_bytes = remaining_bytes = f.tell() remaining_bytes = f.tell() while len(blocks) < lines and remaining_bytes > 0: read_bytes = min(block_size, remaining_bytes) f.seek(-read_bytes, 1) block = f.read(read_bytes).splitlines() f.seek(-read_bytes, 1) # Prepend to blocks (since we're reading backwards) blocks = block[-(len(blocks)+1):] + blocks remaining_bytes -= read_bytes # Decode and filter empty lines result = [line.decode('utf-8', errors='ignore').strip() for line in blocks if line.strip()] return result[-lines:],len(result[-lines:]) stats_api = Flask(__name__) @stats_api.route("/fetch_profit_report") def fetch_profit_report(): ''' GET request Parameters: None Returns: JSON object with profit report data ''' if not "X-API-KEY" in request.headers or not request.headers.get("X-API-KEY") in get_valid_keys(): return jsonify({'Error': 'API key invalid'}), 401 try: return jsonify(profit_report()) except Exception as e: print(e) return jsonify({"Error": f"{e}"}) @stats_api.route("/fetch_last_n_deals") def fetch_last_n_deals(): ''' GET request Parameter: 'amount_of_deals' -> int ''' if not "X-API-KEY" in request.headers or not request.headers.get("X-API-KEY") in get_valid_keys(): return jsonify({'Error': 'API key invalid'}), 401 try: parameter = request.args.get("amount_of_deals") response_value = last_n_deals(parameter) return jsonify({"last_deals": response_value}) except Exception as e: print(e) return jsonify({"last_deals":""}) @stats_api.route("/fetch_last_n_deals_without_history") def fetch_last_n_deals_without_history(): ''' GET request Parameter: 'amount_of_deals' -> int ''' if not "X-API-KEY" in request.headers or not request.headers.get("X-API-KEY") in get_valid_keys(): return jsonify({'Error': 'API key invalid'}), 401 try: parameter = request.args.get("amount_of_deals") #return jsonify({"last_deals": last_n_deals_without_history(parameter)}) response_value = last_n_deals_without_history(parameter) return jsonify({"last_deals": response_value}) except Exception as e: print(e) return jsonify({"last_deals":""}) @stats_api.route("/fetch_full_log") def fetch_full_log(): ''' GET request Parameters: 'exchange_name" -> string It trims the full log to 200 lines, to avoid sending too much data to the client. ''' if not "X-API-KEY" in request.headers or not request.headers.get("X-API-KEY") in get_valid_keys(): return jsonify({'Error': 'API key invalid'}), 401 try: exchange_name = request.args.get("exchange_name") width = 0 last_lines, amount_of_lines = tail_log(f"../logs/{exchange_name}.log", 200) return jsonify({"line": last_lines[-200:], "amount_of_lines": amount_of_lines}) except Exception as e: print(e) return {"line": [""]*width,"amount_of_lines": 0} @stats_api.route("/fetch_log") def fetch_log(): ''' GET request Parameters: 'exchange_name" -> string 'width' -> int 'amount' -> int ''' if not "X-API-KEY" in request.headers or not request.headers.get("X-API-KEY") in get_valid_keys(): return jsonify({'Error': 'API key invalid'}), 401 try: exchange_name = request.args.get("exchange_name") width = int(request.args.get("width")) # type: ignore amount = int(request.args.get("amount")) # type: ignore last_lines,total_amount_of_lines = last_n_lines(f"../logs/{exchange_name}.log",width,amount) return jsonify({"line": last_lines, "amount_of_lines": total_amount_of_lines}) except Exception as e: print(e) return {"line": [""]*10,"amount_of_lines": 0} @stats_api.route("/combined_totals") def combined_totals(): if not "X-API-KEY" in request.headers or not request.headers.get("X-API-KEY") in get_valid_keys(): return jsonify({'Error': 'API key invalid'}), 401 daily_totals = daily_and_monthly_totals() return jsonify({"combined": daily_totals}) @stats_api.route("/daily_totals") def get_daily_totals(): if not "X-API-KEY" in request.headers or not request.headers.get("X-API-KEY") in get_valid_keys(): return jsonify({'Error': 'API key invalid'}), 401 daily_totals = query_daily_totals() return jsonify(daily_totals) @stats_api.route("/daily_totals_by_pair") def get_daily_totals_by_pair(): ''' GET request Parameters: 'base' -> string 'quote' -> string ''' if not "X-API-KEY" in request.headers or not request.headers.get("X-API-KEY") in get_valid_keys(): return jsonify({'Error': 'API key invalid'}), 401 try: base = request.args.get("base") quote = request.args.get("quote") daily_totals = query_daily_totals(f"{base}{quote}") return jsonify(daily_totals) except Exception as e: print(e) return jsonify({'Error': 'Halp'}) @stats_api.route("/monthly_totals") def get_monthly_totals(): if not "X-API-KEY" in request.headers or not request.headers.get("X-API-KEY") in get_valid_keys(): return jsonify({'Error': 'API key invalid'}), 401 monthly_totals = query_monthly_totals() return jsonify(monthly_totals) @stats_api.route("/monthly_totals_by_pair") def get_monthly_totals_by_pair(): ''' GET request Parameters: 'base' -> string 'quote' -> string ''' if not "X-API-KEY" in request.headers or not request.headers.get("X-API-KEY") in get_valid_keys(): return jsonify({'Error': 'API key invalid'}), 401 try: base = request.args.get("base") quote = request.args.get("quote") monthly_totals = query_monthly_totals(f"{base}{quote}") return jsonify(monthly_totals) except Exception as e: print(e) return jsonify({'Error': 'Halp'}) @stats_api.route("/get_averages") def get_averages(): if not "X-API-KEY" in request.headers or not request.headers.get("X-API-KEY") in get_valid_keys(): return jsonify({'Error': 'API key invalid'}), 401 try: daily_totals = query_daily_totals() val_30 = 0 val_7 = 0 recent_days = sorted(daily_totals.keys(), reverse=True)[:30] acc_30 = [daily_totals[date] for date in recent_days[:30]] acc_7 = [daily_totals[date] for date in recent_days[:7]] length_30 = min(30,len(acc_30)) #Last 30 days length_7 = min(7,len(acc_7)) #Last 7 days for _ in range(length_30): val_30 += acc_30.pop() for _ in range(length_7): val_7 += acc_7.pop() return jsonify({"30_day": val_30/length_30, "7_day": val_7/length_7}) except Exception as e: print(e) return jsonify({'Error': 'Halp'}) @stats_api.route("/total_profit") def total_profit(): if not "X-API-KEY" in request.headers or not request.headers.get("X-API-KEY") in get_valid_keys(): return jsonify({'Error': 'API key invalid'}), 401 total = query_total_profit() return jsonify({"Total profit": total}) @stats_api.route("/total_profit_by_pair") def total_profit_by_pair(): ''' GET request Parameters: 'base' -> string 'quote' -> string ''' if not "X-API-KEY" in request.headers or not request.headers.get("X-API-KEY") in get_valid_keys(): return jsonify({'Error': 'API key invalid'}), 401 try: base = request.args.get("base") quote = request.args.get("quote") total = query_total_profit(f"{base}{quote}") return jsonify({"Total profit": total}) except Exception as e: print(e) return jsonify({'Error': 'Halp'}) if __name__=="__main__": #Waitress logger = logging.getLogger('waitress') logger.setLevel(logging.INFO) serve(stats_api,host="0.0.0.0",port=5010) #Flask # app.run(host="0.0.0.0", port=5010, debug=True)