try write script to fetch from a reddit archive
parent fe781b9475
commit 1e96b33468

tools/fetch.py (308 lines changed)
@@ -135,28 +135,266 @@ def generate_archive_header(url: str, archive_date: datetime.datetime) -> str:
 </script>
 <hr>'''
 
 
+def is_reddit_search_tool(url: str) -> bool:
+    """Check if URL is from the Reddit search tool (ihsoyct.github.io)"""
+    parsed = urlparse(url)
+    return 'ihsoyct.github.io' in parsed.netloc
+
+
+def archive_reddit_search_tool(url: str) -> str:
+    """Archive Reddit search tool results as minimal Markdown"""
+    try:
+        headers = {"User-Agent": "Mozilla/5.0 (ArchiveBot/1.0)"}
+        response = requests.get(url, timeout=30, headers=headers)
+        response.raise_for_status()
+
+        soup = BeautifulSoup(response.text, 'html.parser')
+
+        # Extract search parameters for simple title
+        parsed_url = urlparse(url)
+        params = dict(param.split('=') for param in parsed_url.query.split('&') if '=' in param)
+
+        # Simple title
+        if params.get('author'):
+            title = f"Comments by u/{params['author']}"
+        elif params.get('subreddit'):
+            title = f"r/{params['subreddit']} comments"
+        else:
+            title = "Reddit Comments"
+
+        if params.get('body'):
+            title += f" containing '{params['body']}'"
+
+        # Start with minimal content
+        md_content = f"# {title}\n\n"
+
+        # Find the submission div and all posts within it
+        submission_div = soup.find('div', id='submission')
+        if not submission_div:
+            md_content += "No submissions found.\n"
+            return md_content
+
+        posts = submission_div.find_all('div', class_='post')
+
+        if not posts:
+            md_content += "No posts found.\n"
+            return md_content
+
+        for post in posts:
+            # Extract comment path and create Reddit URL
+            title_elem = post.find('p', class_='comment_title')
+            if title_elem:
+                comment_path = title_elem.get_text().strip()
+                reddit_url = f"https://reddit.com{comment_path}"
+                md_content += f"**{comment_path}**\n"
+                md_content += f"{reddit_url}\n\n"
+
+            # Extract user and score info (simplified)
+            user_elem = post.find('p', class_='comment_user')
+            if user_elem:
+                user_text = user_elem.get_text().strip()
+                # Extract just username and score
+                import re
+                score_match = re.search(r'Score: (\d+)', user_text)
+                user_match = re.search(r'(u/\w+)', user_text)
+                date_match = re.search(r'at (.+)$', user_text)
+
+                if user_match and score_match:
+                    user_info = f"{user_match.group(1)} • {score_match.group(1)} points"
+                    if date_match:
+                        user_info += f" • {date_match.group(1)}"
+                    md_content += f"{user_info}\n\n"
+
+            # Extract actual comment content
+            # Find all p tags that are not comment_title, comment_user, and not empty
+            content_paragraphs = []
+
+            # Get all p elements in the post
+            all_p_tags = post.find_all('p')
+
+            for p in all_p_tags:
+                # Skip the title and user info paragraphs
+                if p.get('class') and ('comment_title' in p.get('class') or 'comment_user' in p.get('class')):
+                    continue
+
+                # Get the text content
+                text = p.get_text().strip()
+
+                # Only add non-empty paragraphs
+                if text:
+                    content_paragraphs.append(text)
+
+            # Add the comment content
+            if content_paragraphs:
+                for para in content_paragraphs:
+                    md_content += f"{para}\n\n"
+
+            md_content += "---\n\n"
+
+        return md_content
+
+    except Exception as e:
+        print(f"⚠ Reddit search tool archiving failed ({e})")
+        # Fallback to regular reader mode
+        response = requests.get(url, timeout=30, headers={"User-Agent": "Mozilla/5.0 (ArchiveBot/1.0)"})
+        return reader_mode(response.text)
+
+
+def generate_markdown_archive_header(url: str, archive_date: datetime.datetime) -> str:
+    """Generate minimal archive header in Markdown format"""
+    formatted_date = archive_date.strftime('%Y-%m-%d %H:%M UTC')
+    return f"*Archived {formatted_date} from {url}*\n\n"
+
+
+def is_arctic_shift_api(url: str) -> bool:
+    """Check if URL is from the Arctic Shift API"""
+    parsed = urlparse(url)
+    return 'arctic-shift.photon-reddit.com' in parsed.netloc and '/api/' in parsed.path
+
+
+def archive_arctic_shift_api(url: str) -> str:
+    """Archive Arctic Shift API results as minimal Markdown"""
+    try:
+        headers = {"User-Agent": "Mozilla/5.0 (ArchiveBot/1.0)"}
+        response = requests.get(url, timeout=30, headers=headers)
+        response.raise_for_status()
+
+        data = response.json()
+        comments = data.get('data', [])
+
+        if not comments:
+            return "# Reddit Comments\n\nNo comments found.\n"
+
+        # Extract search info from URL for title
+        parsed_url = urlparse(url)
+        query_params = {}
+        if parsed_url.query:
+            for param in parsed_url.query.split('&'):
+                if '=' in param:
+                    key, value = param.split('=', 1)
+                    query_params[key] = value
+
+        # Build title
+        title_parts = []
+        if query_params.get('author'):
+            title_parts.append(f"u/{query_params['author']}")
+        if query_params.get('subreddit'):
+            title_parts.append(f"r/{query_params['subreddit']}")
+        if query_params.get('body'):
+            title_parts.append(f"containing '{query_params['body']}'")
+
+        title = "Comments by " + " • ".join(title_parts) if title_parts else "Reddit Comments"
+
+        md_content = f"# {title}\n\n"
+
+        for comment in comments:
+            # Extract comment info
+            permalink = comment.get('permalink', '')
+            reddit_url = f"https://reddit.com{permalink}"
+            author = comment.get('author', 'unknown')
+            score = comment.get('score', 0)
+            subreddit = comment.get('subreddit', '')
+            body = comment.get('body', '')
+
+            # Convert timestamp to readable date
+            created_utc = comment.get('created_utc')
+            date_str = ''
+            if created_utc:
+                import datetime
+                date_obj = datetime.datetime.fromtimestamp(created_utc, tz=datetime.timezone.utc)
+                date_str = date_obj.strftime('%Y-%m-%d %H:%M UTC')
+
+            # Format the comment
+            md_content += f"**{permalink}**\n"
+            md_content += f"{reddit_url}\n\n"
+
+            # User info line
+            user_info = f"u/{author} • {score} points"
+            if date_str:
+                user_info += f" • {date_str}"
+            if subreddit:
+                user_info += f" • r/{subreddit}"
+            md_content += f"{user_info}\n\n"
+
+            # Comment body (handle newlines properly)
+            if body:
+                # Replace \n with actual newlines and clean up
+                clean_body = body.replace('\\n', '\n').strip()
+                md_content += f"{clean_body}\n\n"
+
+            md_content += "---\n\n"
+
+        return md_content
+
+    except Exception as e:
+        print(f"⚠ Arctic Shift API archiving failed ({e})")
+        return f"# Error\n\nFailed to archive API response: {e}\n"
+
+
+def convert_ihsoyct_to_api_url(url: str) -> str:
+    """Convert ihsoyct.github.io URL to Arctic Shift API URL"""
+    try:
+        parsed = urlparse(url)
+
+        # Extract query parameters
+        params = {}
+        if parsed.query:
+            for param in parsed.query.split('&'):
+                if '=' in param:
+                    key, value = param.split('=', 1)
+                    params[key] = value
+
+        # Build API URL
+        api_base = "https://arctic-shift.photon-reddit.com/api"
+
+        # Determine endpoint based on mode
+        mode = params.get('mode', 'comments')
+        if mode == 'submissions':
+            endpoint = f"{api_base}/submissions/search"
+        else:
+            endpoint = f"{api_base}/comments/search"
+
+        # Build query string for API
+        api_params = []
+        for key, value in params.items():
+            if key in ['author', 'subreddit', 'body', 'title', 'selftext', 'limit', 'sort', 'after', 'before']:
+                api_params.append(f"{key}={value}")
+
+        api_url = f"{endpoint}?{'&'.join(api_params)}"
+        return api_url
+
+    except Exception as e:
+        print(f"⚠ Failed to convert URL: {e}")
+        return url
+
+
 def archive(url: str, out_dir: pathlib.Path, force: bool):
     out_dir.mkdir(parents=True, exist_ok=True)
     fname = out_dir / slug.slug(url)
 
+    # Check if this is a Reddit search tool and convert to API URL
+    original_url = url
+    if is_reddit_search_tool(url):
+        print("🔄 Converting to Arctic Shift API URL...")
+        url = convert_ihsoyct_to_api_url(url)
+        print(f"   API URL: {url}")
+
+    # Check for API URL and change extension to .md
+    is_api_url = is_arctic_shift_api(url)
+    if is_api_url or is_reddit_search_tool(original_url):
+        fname = fname.with_suffix('.md')
+
     if fname.exists() and not force:
-        print(f"✓ cached: {url}")
+        print(f"✓ cached: {original_url}")
         return
 
-    print(f"↓ fetching: {url}")
+    print(f"↓ fetching: {original_url}")
 
     try:
         archive_date = datetime.datetime.now(datetime.timezone.utc)
 
-        if is_reddit_url(url):
+        if is_arctic_shift_api(url):
+            content = archive_arctic_shift_api(url)
+            # For markdown, just add header and content
+            final_content = generate_markdown_archive_header(original_url, archive_date) + content
+        elif is_reddit_url(url):
             content = archive_reddit(url)
-        else:
-            html_response = requests.get(url, timeout=30, headers={
-                "User-Agent": "Mozilla/5.0 (ArchiveBot/1.0)"
-            }).text
-            content = reader_mode(html_response)
-
-        # Enhanced styling with archive header
-        archive_style = """
+            # Enhanced styling with archive header for HTML
+            archive_style = """
 <style>
 body{font-family:system-ui,sans-serif;max-width:50rem;margin:2rem auto;line-height:1.6;padding:1rem}
 img,iframe{max-width:100%}
@@ -173,19 +411,49 @@ def archive(url: str, out_dir: pathlib.Path, force: bool):
 }
 </style>
 """
+            final_content = (
+                "<meta charset='utf-8'>\n" +
+                "<base target='_blank'>\n" +
+                archive_style + "\n" +
+                generate_archive_header(url, archive_date) + "\n" +
+                content
+            )
+        else:
+            html_response = requests.get(url, timeout=30, headers={
+                "User-Agent": "Mozilla/5.0 (ArchiveBot/1.0)"
+            }).text
+            content = reader_mode(html_response)
+            # Enhanced styling with archive header for HTML
+            archive_style = """
+<style>
+body{font-family:system-ui,sans-serif;max-width:50rem;margin:2rem auto;line-height:1.6;padding:1rem}
+img,iframe{max-width:100%}
+.post-content{background:#f9f9f9;padding:1rem;border-radius:5px;margin:1rem 0}
+.archive-header{background:#f0f8ff;border:1px solid #e0e0e0;border-radius:5px;padding:0.75rem;margin-bottom:1rem;font-size:0.9rem}
+.archive-info{margin-bottom:0.5rem;color:#666}
+.archive-source{color:#666}
+.archive-header a{color:#007acc;text-decoration:none}
+.archive-header a:hover{text-decoration:underline}
+@media (prefers-color-scheme: dark) {
+.archive-header{background:#1a1a2e;border-color:#333;color:#e0e0e0}
+.archive-info, .archive-source{color:#ccc}
+.archive-header a{color:#66b3ff}
+}
+</style>
+"""
+            final_content = (
+                "<meta charset='utf-8'>\n" +
+                "<base target='_blank'>\n" +
+                archive_style + "\n" +
+                generate_archive_header(url, archive_date) + "\n" +
+                content
+            )
 
-        fname.write_text(
-            "<meta charset='utf-8'>\n" +
-            "<base target='_blank'>\n" +
-            archive_style + "\n" +
-            generate_archive_header(url, archive_date) + "\n" +
-            content,
-            encoding="utf-8"
-        )
+        fname.write_text(final_content, encoding="utf-8")
         print(f"✓ saved : {fname.relative_to(out_dir.parent)}")
 
     except Exception as e:
-        print(f"✗ failed : {url} - {e}")
+        print(f"✗ failed : {original_url} - {e}")
 
 if __name__ == "__main__":
     ap = argparse.ArgumentParser()
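For reference, a minimal sketch of what convert_ihsoyct_to_api_url produces for a hypothetical search-tool URL. The ihsoyct.github.io path and the author, body, and limit values are invented; per the whitelist in the function, only recognized parameters are forwarded and mode selects the endpoint.

# Hypothetical ihsoyct.github.io URL; path and parameter values are examples only.
search_url = "https://ihsoyct.github.io/index.html?mode=comments&author=someuser&body=archive&limit=100"

api_url = convert_ihsoyct_to_api_url(search_url)
print(api_url)
# -> https://arctic-shift.photon-reddit.com/api/comments/search?author=someuser&body=archive&limit=100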
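archive_arctic_shift_api assumes the API returns JSON with a top-level "data" list of comment objects; the field names below are the ones its .get() calls read, while the values are invented for illustration.

# Assumed response shape (hypothetical values).
sample_response = {
    "data": [
        {
            "permalink": "/r/example/comments/abc123/some_thread/def456/",
            "author": "someuser",
            "score": 42,
            "subreddit": "example",
            "body": "First line.\\nSecond line.",
            "created_utc": 1700000000,
        }
    ]
}
# Each such entry is rendered roughly as:
#   **/r/example/comments/abc123/some_thread/def456/**
#   https://reddit.com/r/example/comments/abc123/some_thread/def456/
#
#   u/someuser • 42 points • 2023-11-14 22:13 UTC • r/example
#
#   First line.
#   Second line.
#
#   ---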