# reddit_scraper/scraper/selenium_scrapers.py
"""Reddit data scraping using Selenium for page-based scraping with old.reddit.com."""
import time
from typing import Optional, Dict, Any, List
from selenium import webdriver
from selenium.webdriver.firefox.options import Options
from selenium.webdriver.firefox.service import Service
from .cache import get_cache
class RedditScraper:
    """Scrapes OLD Reddit pages (old.reddit.com) using a headless Firefox browser.

    The browser is started lazily on first use and reused across calls.
    DOM extraction is batched: JavaScript helpers are injected once per page
    load so each scrape needs only a handful of ``execute_script``
    round-trips instead of one Selenium call per element.
    """

    def __init__(self, geckodriver_path: str = '/snap/bin/geckodriver'):
        """Create a scraper; no browser is launched until first use.

        Args:
            geckodriver_path: Filesystem path to the geckodriver binary.
                Defaults to the snap install location for backward
                compatibility, but is parameterized so non-snap installs
                can supply their own path.
        """
        self.driver = None
        self._initialized = False
        self._geckodriver_path = geckodriver_path

    def _ensure_browser(self):
        """Start headless Firefox on first call; later calls are no-ops."""
        if self._initialized:
            return
        options = Options()
        options.add_argument('--headless')
        # Hide the webdriver preference that sites use for bot detection.
        options.set_preference('dom.webdriver.enabled', False)
        # Custom user agent to appear more human-like.
        options.set_preference(
            'general.useragent.override',
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
            '(KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36')
        service = Service(executable_path=self._geckodriver_path)
        self.driver = webdriver.Firefox(service=service, options=options)
        # Anti-detection: mask navigator.webdriver from page scripts.
        self.driver.execute_script(
            "Object.defineProperty(navigator, 'webdriver', { value: false })")
        self._initialized = True

    def _ensure_helpers_injected(self):
        """Ensure JS helpers are injected into the current page context.

        Helpers live on ``window`` and are wiped by every navigation, so
        this must be called after each ``driver.get``.
        """
        has_helpers = self.driver.execute_script(
            "return !!window.RSScraperHelpers;")
        if not has_helpers:
            self._inject_helpers()

    def _inject_helpers(self):
        """Inject JavaScript helper functions into the page context."""
        helpers = '''
        window.RSScraperHelpers = {
            // Get all posts with their metadata in a single efficient call.
            getPosts: function(maxCount) {
                const posts = [];
                const postElements = document.querySelectorAll('.thing');
                for (const el of Array.from(postElements)) {
                    if (posts.length >= maxCount) break;
                    const titleLink = el.querySelector('a.title.may-blank');
                    if (!titleLink) continue;
                    const title = titleLink.textContent.trim();
                    // Filter out promoted entries and subreddit cross-links.
                    if (title.length < 3 ||
                        title.includes('Microsoft') ||
                        title.startsWith('r/')) continue;
                    let score = 0;
                    try {
                        const scoreEl = el.querySelector('.score.unvoted');
                        if (scoreEl) {
                            score = parseInt(scoreEl.textContent.replace(/[^0-9]/g, '')) || 0;
                        }
                    } catch(e) {}
                    let author = null;
                    try {
                        const authorLink = el.querySelector('.midcol .author, .author');
                        if (authorLink && authorLink.textContent.trim()) {
                            author = authorLink.textContent.trim();
                        }
                    } catch(e) {}
                    let url = titleLink.href || '';
                    // Strip tracking parameters so URLs stay stable for caching.
                    try {
                        const cleanUrl = new URL(url);
                        ['utm_source', 'utm_medium', 'utm_campaign'].forEach(param => {
                            cleanUrl.searchParams.delete(param);
                        });
                        url = cleanUrl.toString();
                    } catch(e) {}
                    posts.push({
                        title: title,
                        author: author,
                        score: score,
                        url: url
                    });
                }
                return posts;
            },
            // Click every visible "more comments" expander ONCE and report
            // how many were clicked.  The caller (Python side) sleeps between
            // passes: the previous in-page busy-wait blocked the single-
            // threaded JS event loop, so the async comment loads it was
            // waiting for could never actually complete during the wait.
            expandAllComments: function() {
                let clicked = 0;
                const moreLinks = document.querySelectorAll('a.morecomments, .more');
                for (const link of moreLinks) {
                    if (link.style.display !== 'none' && !link.classList.contains('done')) {
                        try {
                            link.click();
                            clicked++;
                        } catch(e) {}
                    }
                }
                return clicked;
            },
            // Get comments with nested replies in a single call.
            getComments: function(maxDepth, maxCount) {
                const comments = [];
                const processedIds = new Set();
                const commentContainers = document.querySelectorAll('.comment, .thing.comment');
                for (const container of Array.from(commentContainers)) {
                    if (comments.length >= maxCount) break;
                    const id = container.getAttribute('data-comment-id') ||
                        container.querySelector('[id$="-container"]')?.id ||
                        Math.random().toString(36).substr(2, 9);
                    if (processedIds.has(id)) continue;
                    processedIds.add(id);
                    const authorEl = container.querySelector('a.author, .author');
                    const bodyEl = container.querySelector('.usertext-body, .md, div.comment-body');
                    const scoreEl = container.querySelector('.score');
                    const createdEl = container.querySelector('.timestamp a');
                    if (!bodyEl || !bodyEl.textContent.trim()) continue;
                    const text = bodyEl.textContent.trim();
                    // Skip UI chrome and trivially short content.
                    if (text.length < 10 ||
                        text.includes('open menu') ||
                        text.includes('reddit home') ||
                        text.includes('permalink')) continue;
                    comments.push({
                        author: authorEl?.textContent.trim() || 'unknown',
                        body: text,
                        score: scoreEl ? parseInt(scoreEl.textContent.replace(/[^0-9]/g, '')) || 0 : null,
                        created_utc: createdEl?.getAttribute('title') || null,
                        depth: this._getNestedDepth(container)
                    });
                }
                // Build the reply tree in DOCUMENT ORDER: a comment's parent
                // is the most recent comment one level shallower.  (The old
                // implementation attached every depth d+1 comment to every
                // depth d comment regardless of parentage, duplicating
                // comments across parents; its replies.length guard was a
                // no-op because replies were always empty at filter time.)
                const roots = [];
                const stack = [];  // stack[d] = last node seen at depth d
                for (const c of comments) {
                    if (c.depth > maxDepth) continue;
                    const node = {
                        author: c.author,
                        body: c.body,
                        score: c.score,
                        created_utc: c.created_utc,
                        replies: []
                    };
                    stack.length = c.depth;  // drop state from deeper siblings
                    const parent = c.depth > 0 ? stack[c.depth - 1] : null;
                    stack[c.depth] = node;
                    if (parent) {
                        // Limit replies per comment.
                        if (parent.replies.length < 5) parent.replies.push(node);
                    } else {
                        // Top-level comment, or orphan whose parent was filtered.
                        roots.push(node);
                    }
                }
                return roots;
            },
            // Helper to get nesting depth of a comment element.  Old reddit
            // wraps each reply level in a div.child, so the depth is the
            // number of .child ancestors.  (The old version started the walk
            // on the comment element itself, which matched .comment and broke
            // immediately -- every comment came back as depth 0.)
            _getNestedDepth: function(element) {
                let depth = 0;
                let node = element.parentElement;
                while (node) {
                    if (node.classList && node.classList.contains('child')) depth++;
                    node = node.parentElement;
                }
                return Math.min(depth, 10); // Cap at 10 levels
            }
        };
        '''
        self.driver.execute_script(helpers)

    def scrape_subreddit_top(
        self,
        subreddit: str,
        limit: int = 10,
        time_range: str = "week",
        depth: int = 3,
        include_comments: bool = True
    ) -> Dict[str, Any]:
        """Scrape top posts from a subreddit using Selenium on old.reddit.com.

        Args:
            subreddit: Name of the subreddit (without 'r/').
            limit: Number of top posts to retrieve.
            time_range: Time filter ('hour', 'day', 'week', 'month', 'year', 'all').
            depth: Maximum comment nesting depth.
            include_comments: Whether to scrape comments for each post.

        Returns:
            Dict containing scraped data, or an "Error" key on failure.
        """
        # Check cache first (skip if comments requested, as they change frequently).
        if not include_comments:
            cached_result = get_cache().get(
                subreddit=subreddit,
                limit=limit,
                time_range=time_range,
                depth=depth
            )
            if cached_result is not None:
                return cached_result
        self._ensure_browser()
        try:
            # Old Reddit has a far simpler, more stable DOM than the redesign.
            url = f"https://old.reddit.com/r/{subreddit}/top/?t={time_range}"
            self.driver.get(url)
            # Wait for content to load and ensure helpers are available.
            time.sleep(4)
            self._ensure_helpers_injected()
            # Extract all post data in a single execute_script round-trip.
            posts_data = self.driver.execute_script(
                'return window.RSScraperHelpers.getPosts(arguments[0]);', limit)
            posts = []
            for post in posts_data:
                post_obj = {
                    "title": post['title'],
                    "author": post['author'] or None,
                    "score": post['score'],
                    "created_utc": None,  # not exposed on the listing page
                    "url": post['url'].replace('old.reddit.com', 'www.reddit.com'),
                    "permalink": post['url'].replace('https://old.reddit.com', ''),
                    # NOTE: comment scraping navigates the shared driver away
                    # from the listing page; posts_data was collected upfront
                    # so that is safe.
                    "comments": [] if not include_comments
                                else self._scrape_post_comments(post['url'], depth)
                }
                posts.append(post_obj)
            result = {
                "subreddit": subreddit,
                "time_range": time_range,
                "limit": len(posts),
                "posts_count": len(posts),
                "data": posts
            }
            # Cache the result (only if no comments requested).
            if not include_comments:
                get_cache().set(result, subreddit=subreddit, limit=limit,
                                time_range=time_range, depth=depth)
            return result
        except Exception as e:
            import traceback
            print(f"Error during scraping: {e}")
            print(traceback.format_exc())
            return {"Error": f"Unexpected error during scraping: {str(e)}"}

    def _scrape_post_comments(self, post_url: str, max_depth: int = 3) -> List[Dict[str, Any]]:
        """Scrape comments from a specific post using pre-injected helpers.

        Navigates the shared driver to ``post_url`` and returns at most 15
        top-level comment dicts, each carrying nested ``replies``.
        Returns [] on any error (best effort -- a post without comments is
        still useful).
        """
        try:
            self.driver.get(post_url)
            # Wait for initial load and (re)inject helpers into the new page.
            time.sleep(2)
            self._ensure_helpers_injected()
            # Expand "more comments" links in passes.  Sleeping HERE (in
            # Python) yields the page's event loop so the async comment loads
            # can complete between passes; up to 5 passes as before.
            any_expanded = False
            for _ in range(5):
                clicked = self.driver.execute_script(
                    'return window.RSScraperHelpers.expandAllComments();')
                if not clicked:
                    break
                any_expanded = True
                time.sleep(0.3)
            if any_expanded:
                # Final settle time for dynamically loaded content.
                time.sleep(1.5)
            # Extract all comments with nested structure in a single call.
            raw_comments = self.driver.execute_script(
                'return window.RSScraperHelpers.getComments(arguments[0], arguments[1]);',
                max_depth, 20  # max_depth, max_count
            )
            return raw_comments[:15]  # Limit to ~15 comments total
        except Exception as e:
            print(f"Comment scraping error: {e}")
            import traceback
            traceback.print_exc()
            return []

    def _clean_text(self, text: str) -> str:
        """Collapse *text* onto one line: drop blank lines, join with spaces."""
        if not text:
            return ""
        lines = text.split("\n")
        cleaned = [line.strip() for line in lines if line.strip()]
        return " ".join(cleaned)

    def close(self):
        """Shut down the browser if it was started; safe to call repeatedly."""
        if self._initialized and self.driver:
            try:
                self.driver.quit()
            except Exception:
                # Best effort: the browser process may already be gone.
                pass
            self._initialized = False
# Module-level singleton: one shared browser/scraper per process.
_scraper_instance = None


def get_scraper():
    """Return the process-wide RedditScraper, creating it on first call."""
    global _scraper_instance
    instance = _scraper_instance
    if instance is None:
        instance = RedditScraper()
        _scraper_instance = instance
    return instance