225 lines
8.2 KiB
Python
225 lines
8.2 KiB
Python
"""
|
|
Twitter/X Tweet Scraper — on-demand REST API
|
|
=============================================
|
|
Scrapes tweets live on every request. No database, no daemon, no caching.
|
|
|
|
Authentication: cookie-based only.
|
|
Run make_session.py once to generate twitter_session.json from your
|
|
browser cookies, then start the scraper normally.
|
|
|
|
Usage:
|
|
python main.py [--count N] [--port 5000]
|
|
|
|
API endpoints (default: http://localhost:5000):
|
|
GET /tweets/<username> → scrape and return tweets for one account
|
|
GET /tweets/batch → scrape multiple accounts in parallel
|
|
?usernames=naval,sama,paulg
|
|
"""
|
|
|
|
import argparse
|
|
import logging
|
|
import os
|
|
import sys
|
|
import time
|
|
|
|
from flask import Flask, jsonify, request
|
|
from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeoutError
|
|
|
|
# ── Logging ───────────────────────────────────────────────────────────────────

# Console logging for a foreground tool; timestamps omit the date on purpose.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%H:%M:%S",
)
log = logging.getLogger("scraper")
# Silence Flask/Werkzeug's per-request access log — our own messages suffice.
logging.getLogger("werkzeug").setLevel(logging.WARNING)
|
|
|
|
# ── Config ────────────────────────────────────────────────────────────────────

SESSION_FILE = "twitter_session.json"  # Playwright storage_state with auth cookies; overridden by --session
SCROLL_PAUSE = 2.5  # NOTE(review): defined but never referenced anywhere in this file
MAX_RETRIES = 3  # consecutive scrolls that yield no new tweets before giving up
TWEET_COUNT = 10  # default, overridden by --count
|
|
|
|
# ── Browser ───────────────────────────────────────────────────────────────────
|
|
|
|
def make_browser_context(playwright):
    """Start headless Chromium and open a context with the saved cookie session.

    Exits the process when the session file is missing, since nothing can be
    scraped without authentication. Returns (browser, context).
    """
    if not os.path.exists(SESSION_FILE):
        log.error(f"Session file '{SESSION_FILE}' not found.")
        log.error("Run make_session.py to generate it from your browser cookies.")
        sys.exit(1)

    # A realistic desktop UA keeps the logged-in web UI from degrading.
    desktop_ua = (
        "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
    )
    chromium = playwright.chromium.launch(headless=True)
    ctx = chromium.new_context(storage_state=SESSION_FILE, user_agent=desktop_ua)
    return chromium, ctx
|
|
|
|
|
|
# ── Scraping ──────────────────────────────────────────────────────────────────
|
|
|
|
def parse_tweet(article):
    """Convert one rendered tweet <article> node into a plain dict.

    Returns None for promoted tweets (which carry no timestamp) and for any
    article whose DOM doesn't match expectations — callers simply skip those.
    """
    try:
        timestamp = article.query_selector("time")
        date = timestamp.get_attribute("datetime") if timestamp else ""
        if not date:  # promoted tweet
            return None

        # The permalink is the <a> wrapping the <time> element.
        url = timestamp.evaluate("el => el.closest('a')?.href")
        body = article.query_selector('[data-testid="tweetText"]')

        def stat_for(testid):
            # aria-label carries the human-readable count ("4 replies", …).
            node = article.query_selector(f'[data-testid="{testid}"]')
            if node is None:
                return "0"
            return node.get_attribute("aria-label") or "0"

        return {
            "date": date,
            "text": body.inner_text() if body else "",
            "url": url,
            "replies": stat_for("reply"),
            "retweets": stat_for("retweet"),
            "likes": stat_for("like"),
        }
    except Exception:
        # Any unexpected DOM shape: drop the tweet rather than fail the scrape.
        return None
|
|
|
|
|
|
def scrape_user(page, username: str, n: int) -> list:
    """Scrape up to *n* recent tweets from one profile page.

    Raises RuntimeError when the account can't be loaded (missing, suspended,
    private, or the cookie session has expired).
    """
    username = username.lstrip("@")
    log.info(f" Scraping @{username} ({n} tweets)")

    page.goto(f"https://twitter.com/{username}", wait_until="domcontentloaded", timeout=30000)
    page.wait_for_timeout(500)

    if page.query_selector('[data-testid="emptyState"]'):
        raise RuntimeError("Account not found, suspended, or empty.")

    try:
        page.wait_for_selector('article[data-testid="tweet"]', timeout=15000)
    except PlaywrightTimeoutError:
        raise RuntimeError("Could not load tweets — account may be private or session expired.")

    collected = []
    seen = set()
    stalls = 0        # consecutive scrolls that produced no new tweets
    previous_total = 0

    while len(collected) < n and stalls < MAX_RETRIES:
        for node in page.query_selector_all('article[data-testid="tweet"]'):
            if len(collected) >= n:
                break
            parsed = parse_tweet(node)
            if not parsed or parsed["url"] in seen:
                continue  # promoted/unparseable tweet, or already captured
            seen.add(parsed["url"])
            collected.append(parsed)

        if len(collected) < n:
            # Trigger lazy-loading of older tweets, then give the feed a moment.
            page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
            try:
                page.wait_for_load_state("networkidle", timeout=3000)
            except PlaywrightTimeoutError:
                pass
            if len(collected) == previous_total:
                stalls += 1
            else:
                stalls = 0
            previous_total = len(collected)

    log.info(f" ✓ @{username}: {len(collected)} tweets scraped.")
    return collected
|
|
|
|
|
|
def scrape_users(usernames: list, n: int) -> dict:
    """Scrape one or more accounts in a single browser session.

    Returns a dict keyed by lowercase username; each value is either a list
    of tweet dicts or ``{"error": message}`` for accounts that failed.
    """
    results = {}
    with sync_playwright() as p:
        browser, context = make_browser_context(p)
        try:
            page = context.new_page()
            for username in usernames:
                clean = username.lstrip("@").lower()
                try:
                    results[clean] = scrape_user(page, username, n)
                except Exception as e:
                    # One bad account must not abort the whole batch.
                    log.warning(f" ✗ @{clean}: {e}")
                    results[clean] = {"error": str(e)}
        finally:
            # Bug fix: close the browser even when new_page() or iteration
            # raises — previously a failure skipped close() entirely.
            browser.close()
    return results
|
|
|
|
|
|
# ── Flask API ─────────────────────────────────────────────────────────────────

# Single module-level app; each route below scrapes live on request (no cache).
app = Flask(__name__)
|
|
|
|
|
|
def get_limit() -> int:
    """Read ``?limit=`` from the query string, defaulting to TWEET_COUNT.

    Non-numeric values fall back to the default; anything below 1 clamps to 1.
    """
    raw = request.args.get("limit", TWEET_COUNT)
    try:
        limit = int(raw)
    except ValueError:
        return TWEET_COUNT
    return limit if limit > 1 else 1
|
|
|
|
|
|
@app.get("/tweets/<username>")
|
|
def user_tweets(username: str):
|
|
key = username.lstrip("@").lower()
|
|
results = scrape_users([key], get_limit())
|
|
data = results.get(key)
|
|
if isinstance(data, dict) and "error" in data:
|
|
return jsonify(data), 502
|
|
return jsonify(data)
|
|
|
|
|
|
@app.get("/tweets/batch")
|
|
def batch_tweets():
|
|
raw = request.args.get("usernames", "")
|
|
if not raw:
|
|
return jsonify({"error": "Provide ?usernames=naval,sama,paulg"}), 400
|
|
|
|
usernames = [u.strip().lstrip("@").lower() for u in raw.split(",") if u.strip()]
|
|
if not usernames:
|
|
return jsonify({"error": "No valid usernames provided."}), 400
|
|
|
|
results = scrape_users(usernames, get_limit())
|
|
|
|
tweets = []
|
|
for data in results.values():
|
|
if isinstance(data, list):
|
|
tweets.extend(data)
|
|
return jsonify(tweets)
|
|
|
|
|
|
# ── Entry point ───────────────────────────────────────────────────────────────
|
|
|
|
def main():
    """Parse CLI options, validate the session file, and start the API server."""
    global SESSION_FILE, TWEET_COUNT

    arg_parser = argparse.ArgumentParser(
        description="Twitter on-demand scraper with REST API.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    arg_parser.add_argument("-n", "--count", type=int, default=10, help="Tweets per request (default: 10)")
    arg_parser.add_argument("--port", type=int, default=5000, help="API port (default: 5000)")
    arg_parser.add_argument("--session", default=SESSION_FILE, help=f"Cookie session file (default: {SESSION_FILE})")
    opts = arg_parser.parse_args()

    SESSION_FILE = opts.session
    TWEET_COUNT = opts.count

    # Fail fast at startup rather than on the first request.
    if not os.path.exists(SESSION_FILE):
        log.error(f"Session file '{SESSION_FILE}' not found.")
        log.error("Run make_session.py to generate it from your browser cookies.")
        sys.exit(1)

    log.info(f"Session : {os.path.abspath(SESSION_FILE)}")
    log.info(f"Count : {TWEET_COUNT} tweets per request")
    log.info(f"API port : {opts.port}")
    log.info(f"API ready at http://localhost:{opts.port}")

    # threaded=True lets concurrent requests each run their own scrape.
    app.run(host="0.0.0.0", port=opts.port, debug=False, use_reloader=False, threaded=True)


if __name__ == "__main__":
    main()