"""Reddit data scraping using Selenium for page-based scraping with old.reddit.com.""" import time from typing import Optional, Dict, Any, List from selenium import webdriver from selenium.webdriver.firefox.options import Options from selenium.webdriver.firefox.service import Service from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.common.by import By from selenium.webdriver.support import expected_conditions as EC from .cache import get_cache class RedditScraper: """Scrapes OLD Reddit pages (old.reddit.com) using a headless Firefox browser.""" def __init__(self): self.driver = None self._initialized = False def _ensure_browser(self): """Ensure Firefox is running in optimized headless mode.""" if not self._initialized: options = Options() # Core optimization flags options.add_argument('--headless') # Run without GUI (~10% faster startup) options.add_argument('--disable-gpu') # Disable GPU (not needed for headless) options.set_preference('permissions.default.image', 2) # Block images (~30-50% faster loads) # Use system geckodriver or snap version as fallback import os gecko_paths = [ '/snap/bin/geckodriver', '/usr/local/bin/geckodriver', '/usr/bin/geckodriver' ] service_path = None for path in gecko_paths: if os.path.exists(path): service_path = path break if not service_path: raise RuntimeError("geckodriver not found") service = Service(executable_path=service_path) self.driver = webdriver.Firefox(service=service, options=options) # Anti-detection scripts self.driver.execute_script("Object.defineProperty(navigator, 'webdriver', { value: false })") self._initialized = True def _wait_for_content(self, timeout: int = 15): """Wait for Reddit content to load using smart waits instead of fixed sleep.""" try: # Wait for post containers or subreddit container WebDriverWait(self.driver, timeout).until( EC.presence_of_element_located((By.CSS_SELECTOR, '.thing, .subreddit')) ) # Additional wait for dynamic content (load more button) time.sleep(1) # Small delay to allow lazy-loaded content except Exception: # Fall back to fixed wait if selectors don't appear time.sleep(3) def _ensure_helpers_injected(self): """Ensure JS helpers are injected into current page context.""" # Check if helpers already exist on this page has_helpers = self.driver.execute_script("return !!window.RSScraperHelpers;") if not has_helpers: self._inject_helpers() def _inject_helpers(self): """Inject JavaScript helper functions into the page context.""" helpers = ''' window.RSScraperHelpers = { // Get all posts with their metadata in a single efficient call getPosts: function(maxCount) { const posts = []; const postElements = document.querySelectorAll('.thing'); for (const el of Array.from(postElements)) { if (posts.length >= maxCount) break; const titleLink = el.querySelector('a.title.may-blank'); if (!titleLink) continue; const title = titleLink.textContent.trim(); if (title.length < 3 || title.includes('Microsoft') || title.startsWith('r/')) continue; let score = 0; try { const scoreEl = el.querySelector('.score.unvoted'); if (scoreEl) { score = parseInt(scoreEl.textContent.replace(/[^0-9]/g, '')) || 0; } } catch(e) {} let author = null; try { const authorLink = el.querySelector('.midcol .author, .author'); if (authorLink && authorLink.textContent.trim()) { author = authorLink.textContent.trim(); } } catch(e) {} let url = titleLink.href || ''; try { const cleanUrl = new URL(url); ['utm_source', 'utm_medium', 'utm_campaign'].forEach(param => { cleanUrl.searchParams.delete(param); }); url = cleanUrl.toString(); } catch(e) {} posts.push({ title: title, author: author, score: score, url: url }); } return posts; }, // Expand all "more comments" expanders recursively before scraping expandAllComments: function() { let expanded = true; let iterations = 0; const maxIterations = 5; while (expanded && iterations < maxIterations) { expanded = false; iterations++; // Find all "more comments" buttons/links const moreLinks = document.querySelectorAll('a.morecomments, .more'); for (const link of moreLinks) { if (link.style.display !== 'none' && !link.classList.contains('done')) { try { link.click(); expanded = true; } catch(e) {} } } // Small delay for dynamic content to load if (expanded) { this.sleep(300); } } return iterations; }, // Get comments with nested replies in a single call getComments: function(maxDepth, maxCount) { const comments = []; const processedIds = new Set(); // Find all comment containers const commentContainers = document.querySelectorAll('.comment, .thing.comment'); for (const container of Array.from(commentContainers)) { if (comments.length >= maxCount) break; const id = container.getAttribute('data-comment-id') || container.querySelector('[id$="-container"]')?.id || Math.random().toString(36).substr(2, 9); if (processedIds.has(id)) continue; processedIds.add(id); const authorEl = container.querySelector('a.author, .author'); const bodyEl = container.querySelector('.usertext-body, .md, div.comment-body'); const scoreEl = container.querySelector('.score'); const createdEl = container.querySelector('.timestamp a'); if (!bodyEl || !bodyEl.textContent.trim()) continue; const text = bodyEl.textContent.trim(); const lowerText = text.toLowerCase(); // Skip UI elements and short content if (text.length < 10 || text.includes('open menu') || text.includes('reddit home') || text.includes('permalink')) continue; comments.push({ author: authorEl?.textContent.trim() || 'unknown', body: text, score: scoreEl ? parseInt(scoreEl.textContent.replace(/[^0-9]/g, '')) || 0 : null, created_utc: createdEl?.getAttribute('title') || null, depth: this._getNestedDepth(container), replies: [] }); } // Sort by depth to handle nesting comments.sort((a, b) => a.depth - b.depth); // Build nested structure up to maxDepth const buildHierarchy = (comments, maxD) => { return comments.filter(c => c.depth <= maxD).map(c => ({ author: c.author, body: c.body, score: c.score, created_utc: c.created_utc, replies: buildHierarchy( comments.filter(r => r.depth === c.depth + 1 && c.replies.length < 5 // Limit replies per comment ), maxD - (c.depth + 1) > 0 ? maxD - (c.depth + 1) : 0 ) })); }; return buildHierarchy(comments, maxDepth); }, // Helper to get nesting depth of a comment element _getNestedDepth: function(element) { let depth = 0; while (element && element.parentElement) { if (element.classList.contains('child')) depth++; else if (element.classList.contains('comment')) break; element = element.parentElement; } return Math.min(depth, 10); // Cap at 10 levels }, sleep: function(ms) { const start = Date.now(); while (Date.now() - start < ms) {} } }; ''' self.driver.execute_script(helpers) def scrape_subreddit_top( self, subreddit: str, limit: int = 10, time_range: str = "week", depth: int = 3, include_comments: bool = True ) -> Dict[str, Any]: """ Scrape top posts from a subreddit using Selenium on old.reddit.com. Args: subreddit: Name of the subreddit (without 'r/') limit: Number of top posts to retrieve time_range: Time filter ('hour', 'day', 'week', 'month', 'year', 'all') depth: Maximum comment nesting depth include_comments: Whether to scrape comments Returns: Dict containing scraped data or error information """ # Check cache first (skip if comments requested, as they change frequently) if not include_comments: cached_result = get_cache().get( subreddit=subreddit, limit=limit, time_range=time_range, depth=depth ) if cached_result is not None: return cached_result self._ensure_browser() try: # Navigate to OLD Reddit for cleaner DOM structure url = f"https://old.reddit.com/r/{subreddit}/top/?t={time_range}" self.driver.get(url) # Use smart wait instead of fixed sleep (adapts to actual page load speed) self._wait_for_content(timeout=15) # Ensure helper functions are available before scraping self._ensure_helpers_injected() # Extract post data using pre-injected helper function (executed once per page load) posts_data = self.driver.execute_script('return window.RSScraperHelpers.getPosts(arguments[0]);', limit) # Build result structure posts = [] for post in posts_data: post_obj = { "title": post['title'], "author": post['author'] or None, "score": post['score'], "created_utc": None, "url": post['url'].replace('old.reddit.com', 'www.reddit.com'), "permalink": post['url'].replace('https://old.reddit.com', ''), "comments": [] if not include_comments else self._scrape_post_comments(post['url'], depth) } posts.append(post_obj) # Build final result structure result = { "subreddit": subreddit, "time_range": time_range, "limit": len(posts), "posts_count": len(posts), "data": posts } # Cache the result (only if no comments requested) if not include_comments: get_cache().set(result, subreddit=subreddit, limit=limit, time_range=time_range, depth=depth) return result except Exception as e: import traceback print(f"Error during scraping: {e}") print(traceback.format_exc()) return {"Error": f"Unexpected error during scraping: {str(e)}"} def _scrape_post_comments(self, post_url: str, max_depth: int = 3) -> List[Dict[str, Any]]: """Scrape comments from a specific post using pre-injected helpers.""" try: self.driver.get(post_url) # Use smart wait instead of fixed sleep self._wait_for_content(timeout=10) self._ensure_helpers_injected() # Expand all "more comments" links before scraping (batched operation) expanded_iterations = self.driver.execute_script('return window.RSScraperHelpers.expandAllComments();') # Additional wait for dynamically loaded content if expanded_iterations > 0: time.sleep(1.5) # Extract all comments with nested structure in single call (batched) raw_comments = self.driver.execute_script( 'return window.RSScraperHelpers.getComments(arguments[0], arguments[1]);', max_depth, 20 # max_depth, max_count ) return raw_comments[:15] # Limit to ~15 comments total except Exception as e: print(f"Comment scraping error: {e}") import traceback traceback.print_exc() return [] def _clean_text(self, text: str) -> str: """Clean and normalize text content.""" if not text: return "" lines = text.split("\n") cleaned = [line.strip() for line in lines if line.strip()] return " ".join(cleaned) def close(self): """Close browser resources.""" if self._initialized and self.driver: try: self.driver.quit() except: pass self._initialized = False # Singleton pattern for scraper instance _scraper_instance = None def get_scraper(): """Get singleton scraper instance.""" global _scraper_instance if _scraper_instance is None: _scraper_instance = RedditScraper() return _scraper_instance