From 550ab3f3f6260f12ed0c38150e4d48bbc570a1f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=A1s=20S=C3=A1nchez?= Date: Mon, 18 Aug 2025 09:37:42 -0300 Subject: [PATCH] 2025.08.18 --- changelog.txt | 3 + exchange_wrapper.py | 131 ++++++++++++++++++-------------------------- main.py | 9 ++- 3 files changed, 63 insertions(+), 80 deletions(-) diff --git a/changelog.txt b/changelog.txt index 13768ee..1a2d7a2 100755 --- a/changelog.txt +++ b/changelog.txt @@ -1,3 +1,6 @@ +2025.08.18: +. Database handling optimization. + 2025.08.17: . Minor refactorings. diff --git a/exchange_wrapper.py b/exchange_wrapper.py index 503fe48..41e5b37 100755 --- a/exchange_wrapper.py +++ b/exchange_wrapper.py @@ -1,6 +1,7 @@ import time import credentials import sqlite3 +from contextlib import contextmanager from requests import get as requests_get from json import load, dumps from copy import deepcopy @@ -28,20 +29,22 @@ class Broker: #Initialize database self.profits_database_filename = "profits/profits_database.db" - self.database_connection = sqlite3.connect(self.profits_database_filename) - self.database_cursor = self.database_connection.cursor() - self.database_cursor.execute(''' - CREATE TABLE IF NOT EXISTS profits_table ( - timestamp REAL PRIMARY KEY, - pair TEXT, - amount REAL, - exchange_name TEXT, - order_id TEXT, - order_history TEXT - ) - ''') - self.database_connection.commit() - self.database_connection.close() + + self._db = sqlite3.connect(self.profits_database_filename, + detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES, + check_same_thread=False) + self._db.row_factory = sqlite3.Row + with self._db: + self._db.execute(''' + CREATE TABLE IF NOT EXISTS profits_table ( + timestamp REAL PRIMARY KEY, + pair TEXT, + amount REAL, + exchange_name TEXT, + order_id TEXT, + order_history TEXT + ) + ''') #Load markets self.markets = self.exchange.load_markets() @@ -51,16 +54,27 @@ class Broker: self.deals_list = self.preload_deals(amount_to_preload=self.deals_cache_length) + @contextmanager + def _cur(self): + ''' + Database cursor + ''' + cur = self._db.cursor() + try: + yield cur + finally: + cur.close() + + def preload_deals(self,amount_to_preload=10): ''' Reads the last n deals from the database and returns them in a list ''' - connection = sqlite3.connect(self.profits_database_filename) - cursor = connection.cursor() - cursor.execute(f"SELECT * FROM profits_table WHERE exchange_name = ? ORDER BY timestamp DESC LIMIT ?", (self.get_exchange_name(), amount_to_preload)) - result = cursor.fetchall() - connection.close() - + query = "SELECT * FROM profits_table WHERE exchange_name = ? ORDER BY timestamp DESC LIMIT ?" + with self._cur() as cur: + cur.execute(query, (self.get_exchange_name(), amount_to_preload)) + result = cur.fetchall() + return [(row[0],row[1],row[2],row[3],row[4],"") for row in result] @@ -121,21 +135,13 @@ class Broker: Returns the timestamps of the last trades from the database for the boosting algorithm ''' - retries = self.retries - while retries>0: - try: - database_connection = sqlite3.connect(self.profits_database_filename) - database_cursor = database_connection.cursor() - database_cursor.execute(f"SELECT * FROM profits_table WHERE timestamp >= {time.time()-timespan} ORDER BY timestamp") - rows = database_cursor.fetchall() - return [item[0] for item in rows if item[1]==pair] - except Exception as e: - self.logger.log_this(f"Exception in preload_timestamps: {e}") - if no_retries: - break - retries-=1 - time.sleep(self.wait_time) - return [] + limit = time.time()-timespan + query = "SELECT * FROM profits_table WHERE timestamp >= ? ORDER BY timestamp" + + with self._cur() as cur: + cur.execute(query,(limit,)) + rows = cur.fetchall() + return [item[0] for item in rows if item[1]==pair] def write_profit_to_cache(self,dataset): @@ -152,22 +158,11 @@ class Broker: ''' dataset format: (timestamp,pair,amount,exchange_name,order_id,order_history) ''' - retries = self.retries - while retries>0: - try: - database_connection = sqlite3.connect(self.profits_database_filename) - database_cursor = database_connection.cursor() - database_cursor.execute('INSERT INTO profits_table VALUES(?, ?, ?, ?, ?, ?)', dataset) - database_connection.commit() - database_connection.close() - except Exception as e: - self.logger.log_this(f"Exception in write_profit_to_db: {e}") - if no_retries: - break - retries-=1 - time.sleep(self.wait_time) - return 0 - return 1 + + query = "INSERT INTO profits_table VALUES(?, ?, ?, ?, ?, ?)" + with self._db: + self._db.execute(query, dataset) + return 0 def check_for_duplicate_profit_in_db(self,order,no_retries=False): @@ -176,24 +171,13 @@ class Broker: Compares the id of the last profit order with the one in the database. ''' - retries = self.retries - while retries>0: - try: - database_connection = sqlite3.connect(self.profits_database_filename) - database_cursor = database_connection.cursor() - database_cursor.execute(f"SELECT * FROM profits_table WHERE pair = '{order['symbol']}' ORDER BY timestamp DESC LIMIT 1;") - rows = database_cursor.fetchall() - database_connection.close() - if rows==[]: - return False - return order["id"]==rows[0][4] - except Exception as e: - self.logger.log_this(f"Exception in check_for_duplicate_profit_in_db: {e}",1) - if no_retries: - break - retries-=1 - time.sleep(self.wait_time) - return False + query = f"SELECT * FROM profits_table WHERE pair = ? ORDER BY timestamp DESC LIMIT 1;" + with self._cur() as cur: + cur.execute(query, (order['symbol'],)) + result = cur.fetchone() + if result is None: + return False + return order["id"]==result[4] def get_write_order_history(self): @@ -395,9 +379,7 @@ class Broker: a = self.exchange.fetch_last_prices(pair_list) return {x: a[x]["price"] for x in a.keys()} else: - #a = self.exchange.fetch_tickers(pair_list) a = self.exchange.fetch_tickers() - #return {x.upper(): a[x]["close"] for x in a.keys() if x.upper() in pair_list} if pair_list is None: return {x: a[x]["close"] for x in a.keys()} return {x: a[x]["close"] for x in a.keys() if x in pair_list} @@ -418,13 +400,10 @@ class Broker: :param no_retries: if True, will not retry if exception occurs :return: closing price of trading pair ''' - retries = self.retries while retries>0: try: - pair = symbol - a = self.exchange.fetch_ticker(pair) - self.last_price = a["close"] + self.last_price = self.exchange.fetch_ticker(symbol) return self.last_price except Exception as e: self.logger.log_this(f"Exception in get_ticker_price: {e}",1) @@ -553,7 +532,6 @@ class Broker: if pairs is None: pairs = [] try: - #id_list = [] if self.get_exchange_name()=="binance": return self.get_opened_orders_binance(pairs) return self.get_opened_orders() @@ -573,7 +551,6 @@ class Broker: if pairs is None: pairs = [] try: - #id_list = [] if self.get_exchange_name()=="binance": return self.get_closed_orders_binance(pairs) return self.get_closed_orders() diff --git a/main.py b/main.py index 815e192..5a99c9a 100644 --- a/main.py +++ b/main.py @@ -18,7 +18,7 @@ import exchange_wrapper import trader -version = "2025.08.17" +version = "2025.08.18" ''' Color definitions. If you want to change them, check the reference at https://en.wikipedia.org/wiki/ANSI_escape_code#Colors @@ -33,7 +33,10 @@ bright_green = "\033[0;92;40m" white = "\033[0;37;40m" #Threading definitions -MAX_WORKERS = 35 +worker_threads_overprovisioning = 3 #Number of worker threads to create over the number of traders. + #A value between 1 and 5 is recommended. + #Make it larger if you plan to add a lot of traders, + #Only use 0 if you are sure that you won't be adding any. executor = None def shutdown_handler(signum, _): @@ -272,7 +275,7 @@ def main_routine(): global reload_interval global screen_buffer - executor = ThreadPoolExecutor(max_workers=MAX_WORKERS) + executor = ThreadPoolExecutor(max_workers=len(running_traders)+worker_threads_overprovisioning) is_testnet = "TESTNET " if broker.get_config()["is_sandbox"] else "" exchange_version_label = f"{bright_white}{broker.get_config()['exchange'].upper()} {is_testnet}{white}| DCAv2 {version} | CCXT v{ccxt.__version__}" separator_line = blue + "="*80 + white