"""Reddit data scraping using Selenium for page-based scraping with old.reddit.com.""" import time from typing import Optional, Dict, Any, List from selenium import webdriver from selenium.webdriver.firefox.options import Options from selenium.webdriver.firefox.service import Service class RedditScraper: """Scrapes OLD Reddit pages (old.reddit.com) using a headless Firefox browser.""" def __init__(self): self.driver = None self._initialized = False def _ensure_browser(self): """Ensure Firefox is running in headless mode.""" if not self._initialized: options = Options() options.add_argument('--headless') options.set_preference('dom.webdriver.enabled', False) # Custom user agent to appear more human-like options.set_preference('general.useragent.override', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36') # Use geckodriver from snap service = Service(executable_path='/snap/bin/geckodriver') self.driver = webdriver.Firefox(service=service, options=options) # Anti-detection scripts self.driver.execute_script("Object.defineProperty(navigator, 'webdriver', { value: false })") self._initialized = True def _ensure_helpers_injected(self): """Ensure JS helpers are injected into current page context.""" # Check if helpers already exist on this page has_helpers = self.driver.execute_script("return !!window.RSScraperHelpers;") if not has_helpers: self._inject_helpers() def _inject_helpers(self): """Inject JavaScript helper functions into the page context.""" helpers = ''' window.RSScraperHelpers = { // Get all posts with their metadata in a single efficient call getPosts: function(maxCount) { const posts = []; const postElements = document.querySelectorAll('.thing'); for (const el of Array.from(postElements)) { if (posts.length >= maxCount) break; const titleLink = el.querySelector('a.title.may-blank'); if (!titleLink) continue; const title = titleLink.textContent.trim(); if (title.length < 3 || title.includes('Microsoft') || title.startsWith('r/')) continue; let score = 0; try { const scoreEl = el.querySelector('.score.unvoted'); if (scoreEl) { score = parseInt(scoreEl.textContent.replace(/[^0-9]/g, '')) || 0; } } catch(e) {} let author = null; try { const authorLink = el.querySelector('.midcol .author, .author'); if (authorLink && authorLink.textContent.trim()) { author = authorLink.textContent.trim(); } } catch(e) {} let url = titleLink.href || ''; try { const cleanUrl = new URL(url); ['utm_source', 'utm_medium', 'utm_campaign'].forEach(param => { cleanUrl.searchParams.delete(param); }); url = cleanUrl.toString(); } catch(e) {} posts.push({ title: title, author: author, score: score, url: url }); } return posts; }, // Expand all "more comments" expanders recursively before scraping expandAllComments: function() { let expanded = true; let iterations = 0; const maxIterations = 5; while (expanded && iterations < maxIterations) { expanded = false; iterations++; // Find all "more comments" buttons/links const moreLinks = document.querySelectorAll('a.morecomments, .more'); for (const link of moreLinks) { if (link.style.display !== 'none' && !link.classList.contains('done')) { try { link.click(); expanded = true; } catch(e) {} } } // Small delay for dynamic content to load if (expanded) { this.sleep(300); } } return iterations; }, // Get comments with nested replies in a single call getComments: function(maxDepth, maxCount) { const comments = []; const processedIds = new Set(); // Find all comment containers const commentContainers = document.querySelectorAll('.comment, .thing.comment'); for (const container of Array.from(commentContainers)) { if (comments.length >= maxCount) break; const id = container.getAttribute('data-comment-id') || container.querySelector('[id$="-container"]')?.id || Math.random().toString(36).substr(2, 9); if (processedIds.has(id)) continue; processedIds.add(id); const authorEl = container.querySelector('a.author, .author'); const bodyEl = container.querySelector('.usertext-body, .md, div.comment-body'); const scoreEl = container.querySelector('.score'); const createdEl = container.querySelector('.timestamp a'); if (!bodyEl || !bodyEl.textContent.trim()) continue; const text = bodyEl.textContent.trim(); const lowerText = text.toLowerCase(); // Skip UI elements and short content if (text.length < 10 || text.includes('open menu') || text.includes('reddit home') || text.includes('permalink')) continue; comments.push({ author: authorEl?.textContent.trim() || 'unknown', body: text, score: scoreEl ? parseInt(scoreEl.textContent.replace(/[^0-9]/g, '')) || 0 : null, created_utc: createdEl?.getAttribute('title') || null, depth: this._getNestedDepth(container), replies: [] }); } // Sort by depth to handle nesting comments.sort((a, b) => a.depth - b.depth); // Build nested structure up to maxDepth const buildHierarchy = (comments, maxD) => { return comments.filter(c => c.depth <= maxD).map(c => ({ author: c.author, body: c.body, score: c.score, created_utc: c.created_utc, replies: buildHierarchy( comments.filter(r => r.depth === c.depth + 1 && c.replies.length < 5 // Limit replies per comment ), maxD - (c.depth + 1) > 0 ? maxD - (c.depth + 1) : 0 ) })); }; return buildHierarchy(comments, maxDepth); }, // Helper to get nesting depth of a comment element _getNestedDepth: function(element) { let depth = 0; while (element && element.parentElement) { if (element.classList.contains('child')) depth++; else if (element.classList.contains('comment')) break; element = element.parentElement; } return Math.min(depth, 10); // Cap at 10 levels }, sleep: function(ms) { const start = Date.now(); while (Date.now() - start < ms) {} } }; ''' self.driver.execute_script(helpers) def scrape_subreddit_top( self, subreddit: str, limit: int = 10, time_range: str = "week", depth: int = 3, include_comments: bool = True ) -> Dict[str, Any]: """ Scrape top posts from a subreddit using Selenium on old.reddit.com. Args: subreddit: Name of the subreddit (without 'r/') limit: Number of top posts to retrieve time_range: Time filter ('hour', 'day', 'week', 'month', 'year', 'all') depth: Maximum comment nesting depth include_comments: Whether to scrape comments Returns: Dict containing scraped data or error information """ self._ensure_browser() try: # Navigate to OLD Reddit for cleaner DOM structure url = f"https://old.reddit.com/r/{subreddit}/top/?t={time_range}" self.driver.get(url) # Wait for content to load and ensure helpers are available time.sleep(4) self._ensure_helpers_injected() # Extract post data using pre-injected helper function (executed once per page load) posts_data = self.driver.execute_script('return window.RSScraperHelpers.getPosts(arguments[0]);', limit) # Build result structure posts = [] for post in posts_data: post_obj = { "title": post['title'], "author": post['author'] or None, "score": post['score'], "created_utc": None, "url": post['url'].replace('old.reddit.com', 'www.reddit.com'), "permalink": post['url'].replace('https://old.reddit.com', ''), "comments": [] if not include_comments else self._scrape_post_comments(post['url'], depth) } posts.append(post_obj) return { "subreddit": subreddit, "time_range": time_range, "limit": len(posts), "posts_count": len(posts), "data": posts } except Exception as e: import traceback print(f"Error during scraping: {e}") print(traceback.format_exc()) return {"Error": f"Unexpected error during scraping: {str(e)}"} def _scrape_post_comments(self, post_url: str, max_depth: int = 3) -> List[Dict[str, Any]]: """Scrape comments from a specific post using pre-injected helpers.""" try: self.driver.get(post_url) # Wait for initial load and ensure helpers are available time.sleep(2) self._ensure_helpers_injected() # Expand all "more comments" links before scraping (batched operation) expanded_iterations = self.driver.execute_script('return window.RSScraperHelpers.expandAllComments();') # Additional wait for dynamically loaded content if expanded_iterations > 0: time.sleep(1.5) # Extract all comments with nested structure in single call (batched) raw_comments = self.driver.execute_script( 'return window.RSScraperHelpers.getComments(arguments[0], arguments[1]);', max_depth, 20 # max_depth, max_count ) return raw_comments[:15] # Limit to ~15 comments total except Exception as e: print(f"Comment scraping error: {e}") import traceback traceback.print_exc() return [] def _clean_text(self, text: str) -> str: """Clean and normalize text content.""" if not text: return "" lines = text.split("\n") cleaned = [line.strip() for line in lines if line.strip()] return " ".join(cleaned) def close(self): """Close browser resources.""" if self._initialized and self.driver: try: self.driver.quit() except: pass self._initialized = False # Singleton pattern for scraper instance _scraper_instance = None def get_scraper(): """Get singleton scraper instance.""" global _scraper_instance if _scraper_instance is None: _scraper_instance = RedditScraper() return _scraper_instance