diff --git a/changelog.txt b/changelog.txt index 7dc5338..3dfd89a 100755 --- a/changelog.txt +++ b/changelog.txt @@ -1,3 +1,6 @@ +2025.08.16: +. Improved threading. + 2025.08.15: . "deal order history" is now disabled by default. . CPU optimizations in status string generation. diff --git a/main.py b/main.py index a1418ed..56af142 100644 --- a/main.py +++ b/main.py @@ -1,11 +1,13 @@ import time import logging +import signal from sys import argv from os import _exit as os_exit from json import load from datetime import date from threading import Thread from waitress import serve +from concurrent.futures import ThreadPoolExecutor, as_completed import sqlite3 import ccxt @@ -16,7 +18,7 @@ import exchange_wrapper import trader -version = "2025.08.15" +version = "2025.08.16" ''' Color definitions. If you want to change them, check the reference at https://en.wikipedia.org/wiki/ANSI_escape_code#Colors @@ -30,6 +32,19 @@ bright_white = "\033[0;97;40m" bright_green = "\033[0;92;40m" white = "\033[0;37;40m" +#Threading definitions +MAX_WORKERS = 35 +executor = None + +def shutdown_handler(signum, frame): + broker.logger.log_this(f"Received signal {signum}, shutting down as gracefully as possible...", 0) + if executor: + executor.shutdown(wait=True, timeout=5) + os_exit(0) + +# Register signals for shutdown handler +signal.signal(signal.SIGINT, shutdown_handler) +signal.signal(signal.SIGTERM, shutdown_handler) def seconds_to_time(total_seconds: float) -> str: ''' @@ -256,6 +271,8 @@ def main_loop(): global reload_interval global screen_buffer + executor = ThreadPoolExecutor(max_workers=MAX_WORKERS) + while True: #Restart traders that have the restart flag raised and remove traders that have the quit flag raised for instance in running_traders: @@ -284,14 +301,23 @@ def main_loop(): instances_to_add.clear() #Prepares the trader threads + futures = [] open_orders = broker.fetch_open_orders(tickers) pairs_to_fetch = [] online_pairs = [] for instance in running_traders: - threads.append(Thread(target=instance.check_status,args=(open_orders,))) + #threads.append(Thread(target=instance.check_status,args=(open_orders,))) + future = executor.submit(instance.check_status,open_orders) + futures.append(future) online_pairs.append(f"{instance.base}{instance.quote}") pairs_to_fetch.append(instance.config.get_pair()) + for future in as_completed(futures): + try: + future.result() + except Exception as e: + broker.logger.log_this(f"Error in thread - {e}") + #Here, append the dusters' pairs to pairs_to_fetch, if missing. # # @@ -300,19 +326,19 @@ def main_loop(): # #Start the trader threads - for t in threads: - try: - t.start() - except Exception as e: - broker.logger.log_this(f"Error starting thread - {e}") + # for t in threads: + # try: + # t.start() + # except Exception as e: + # broker.logger.log_this(f"Error starting thread - {e}") - #Wait for the trader threads to complete - for t in threads: - try: - t.join() - except Exception as e: - broker.logger.log_this(f"Error joining thread: {e}") - threads.clear() + # #Wait for the trader threads to complete + # for t in threads: + # try: + # t.join() + # except Exception as e: + # broker.logger.log_this(f"Error joining thread: {e}") + # threads.clear() #Fetch prices price_list = broker.get_prices(pairs_to_fetch) @@ -2489,10 +2515,14 @@ if __name__=="__main__": waitress_logger = logging.getLogger('waitress') waitress_logger.setLevel(logging.ERROR) - #Threads to run: main loop and flask api - main_threads = [Thread(target=main_loop,args=()),Thread(target=run_API,args=())] - - #Iterate indefinitely: - for m in main_threads: - m.start() + #Threads to run: flask api + api_thread = Thread(target=run_API,args=(), daemon=True) + api_thread.start() + + try: + while True: + main_loop() + except KeyboardInterrupt: + api_thread.join() + shutdown_handler(signal.SIGINT, None)