News Aggregation Made Simple: Extracting Articles from Multiple Sources
Every time I try to stay updated with news, I end up with 20 browser tabs open - TechCrunch for tech news, Reuters for world events, The Verge for gadgets, and so on. Wouldn't it be nice to have all that in one place?
Building a news aggregator seems like a weekend project until you actually start coding. Each news site has completely different HTML structures, loads content with JavaScript, and organizes articles in their own special way. What works for CNN breaks on BBC, and by the time you fix BBC, CNN has changed their layout again.
Let me show you how to build a news aggregator that actually works without going insane.
The Problem with Traditional Scraping
Here's what most people try first:
Site 1: TechCrunch
import requests
from bs4 import BeautifulSoup

def scrape_techcrunch():
    url = "https://techcrunch.com"
    response = requests.get(url)
    soup = BeautifulSoup(response.content, 'html.parser')

    articles = []
    for article in soup.find_all('div', class_='post-block'):
        title_elem = article.find('a', class_='post-block__title__link')
        date_elem = article.find('time')

        if title_elem:
            articles.append({
                'title': title_elem.text.strip(),
                'url': title_elem.get('href'),
                'date': date_elem.get('datetime') if date_elem else None,
                'source': 'TechCrunch'
            })

    return articles
Site 2: The Guardian
def scrape_guardian():
    url = "https://theguardian.com"
    response = requests.get(url)
    soup = BeautifulSoup(response.content, 'html.parser')

    articles = []
    # Completely different structure
    for item in soup.find_all('div', class_='fc-item__container'):
        link = item.find('a', class_='fc-item__link')
        headline = item.find('span', class_='fc-item__kicker')
        time_elem = item.find('time')

        if link:
            articles.append({
                'title': headline.text.strip() if headline else 'No title',
                'url': 'https://theguardian.com' + link.get('href'),
                'date': time_elem.get('datetime') if time_elem else None,
                'source': 'The Guardian'
            })

    return articles
Site 3: Hacker News
def scrape_hackernews():
    url = "https://news.ycombinator.com"
    response = requests.get(url)
    soup = BeautifulSoup(response.content, 'html.parser')

    articles = []
    # Yet another totally different approach
    for item in soup.find_all('tr', class_='athing'):
        title_cell = item.find('span', class_='titleline')
        if title_cell:
            link = title_cell.find('a')
            if link:
                articles.append({
                    'title': link.text.strip(),
                    'url': link.get('href'),
                    'date': None,  # HN doesn't show dates on the main page
                    'source': 'Hacker News'
                })

    return articles
See the pattern? Every site needs its own custom scraper with different selectors, and every one of them breaks the moment the site redesigns.
The ScrapeGraphAI Way
Instead of writing custom scrapers for each site, just tell ScrapeGraphAI what you want. This approach eliminates the need for custom selectors and makes your aggregator resilient to site changes:
from scrapegraph_py import Client

client = Client(api_key="your-api-key")

def scrape_any_news_site(url, site_name):
    response = client.smartscraper(
        website_url=url,
        user_prompt="Extract all news articles with headlines, URLs, publication dates, and brief descriptions"
    )

    articles = response.get('result', [])

    # Add source info
    for article in articles:
        article['source'] = site_name

    return articles

# Works on any news site
tech_news = scrape_any_news_site("https://techcrunch.com", "TechCrunch")
world_news = scrape_any_news_site("https://reuters.com", "Reuters")
hacker_news = scrape_any_news_site("https://news.ycombinator.com", "Hacker News")
Same function, different sites. No custom selectors, no breaking when sites update their CSS.
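Because every source now comes back in the same shape, merging the results into a single feed is a one-liner plus a sort. Here's a minimal sketch, assuming the extraction returns an ISO-style date string under a 'date' key (adjust the key names to whatever your prompt actually produces):

# Combine the per-site results into one feed
all_articles = tech_news + world_news + hacker_news

# Newest first; articles with no date string sort to the end
combined_feed = sorted(
    all_articles,
    key=lambda a: a.get('date') or '',
    reverse=True
)

for article in combined_feed[:10]:
    print(f"[{article['source']}] {article.get('title') or article.get('headline')}")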
Building a Complete News Aggregator
Let's build something useful - a news aggregator that pulls from multiple sources and combines everything into one feed. The class below handles failures gracefully and paces its requests; we'll layer on caching and parallel scraping later in the post:
from scrapegraph_py import Client
from datetime import datetime
import json
import time

class NewsAggregator:
    def __init__(self, api_key):
        self.client = Client(api_key=api_key)
        self.sources = [
            {"name": "TechCrunch", "url": "https://techcrunch.com", "category": "tech"},
            {"name": "The Verge", "url": "https://theverge.com", "category": "tech"},
            {"name": "Reuters", "url": "https://reuters.com", "category": "world"},
            {"name": "BBC", "url": "https://bbc.com/news", "category": "world"},
            {"name": "Hacker News", "url": "https://news.ycombinator.com", "category": "tech"},
            {"name": "ESPN", "url": "https://espn.com", "category": "sports"}
        ]

    def scrape_source(self, source):
        print(f"Scraping {source['name']}...")
        try:
            response = self.client.smartscraper(
                website_url=source["url"],
                user_prompt="Find all news articles and stories with headlines, URLs, publication dates, and short summaries"
            )

            articles = response.get('result', [])

            # Add metadata
            for article in articles:
                article['source'] = source["name"]
                article['category'] = source["category"]
                article['scraped_at'] = datetime.now().isoformat()

            print(f"Found {len(articles)} articles from {source['name']}")
            return articles

        except Exception as e:
            print(f"Failed to scrape {source['name']}: {e}")
            return []

    def aggregate_all(self):
        all_articles = []

        for source in self.sources:
            articles = self.scrape_source(source)
            all_articles.extend(articles)

            # Be nice to servers
            time.sleep(2)

        return all_articles

    def save_to_file(self, articles, filename="news_feed.json"):
        with open(filename, 'w') as f:
            json.dump(articles, f, indent=2, default=str)
        print(f"Saved {len(articles)} articles to {filename}")

# Usage
aggregator = NewsAggregator("your-api-key")
all_news = aggregator.aggregate_all()
aggregator.save_to_file(all_news)
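With everything in one list, slicing the combined feed is straightforward. As a quick illustration, here's how you might group the aggregated articles by the category metadata attached in scrape_source above:

from collections import defaultdict

# Group the aggregated feed by the 'category' field we attached earlier
by_category = defaultdict(list)
for article in all_news:
    by_category[article.get('category', 'uncategorized')].append(article)

for category, items in by_category.items():
    print(f"{category}: {len(items)} articles")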
Category-Specific Aggregation
Want to focus on specific topics? Just adjust the prompt. The same trick carries over to the international sources and real-time monitoring covered below:
def scrape_tech_news(self, source):
    response = self.client.smartscraper(
        website_url=source["url"],
        user_prompt="Find articles about technology, startups, AI, programming, or gadgets. Include headlines, URLs, dates, and summaries."
    )
    return response.get('result', [])

def scrape_sports_news(self, source):
    response = self.client.smartscraper(
        website_url=source["url"],
        user_prompt="Find sports news including game scores, player trades, team news, and upcoming matches. Include headlines, URLs, dates, and summaries."
    )
    return response.get('result', [])

def scrape_business_news(self, source):
    response = self.client.smartscraper(
        website_url=source["url"],
        user_prompt="Find business and finance news including stock market updates, company earnings, economic reports, and market analysis."
    )
    return response.get('result', [])
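Assuming you add these methods to the NewsAggregator class above, a small dispatch table (illustrative only) routes each source to the matching prompt:

# Hypothetical dispatch: map a category to the matching scraper method
category_scrapers = {
    "tech": aggregator.scrape_tech_news,
    "sports": aggregator.scrape_sports_news,
    "business": aggregator.scrape_business_news,
}

for source in aggregator.sources:
    scraper = category_scrapers.get(source["category"])
    if scraper:  # sources with no matching category prompt are skipped
        articles = scraper(source)
        print(f"{source['name']}: {len(articles)} {source['category']} articles")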
International News Sources
Adding international sources is just as easy. This approach works with any language and can be combined with category-specific filtering:
international_sources = [
    {"name": "BBC", "url": "https://bbc.com/news", "region": "UK"},
    {"name": "Le Monde", "url": "https://lemonde.fr", "region": "France"},
    {"name": "Der Spiegel", "url": "https://spiegel.de", "region": "Germany"},
    {"name": "Al Jazeera", "url": "https://aljazeera.com", "region": "Qatar"},
    {"name": "NHK", "url": "https://nhk.or.jp/news", "region": "Japan"},
    {"name": "Times of India", "url": "https://timesofindia.indiatimes.com", "region": "India"}
]

def scrape_international_news(self):
    all_articles = []

    for source in international_sources:
        response = self.client.smartscraper(
            website_url=source["url"],
            user_prompt="Extract international news articles with headlines, URLs, dates, and summaries. Focus on world events, politics, and major stories."
        )

        articles = response.get('result', [])
        for article in articles:
            article['source'] = source["name"]
            article['region'] = source["region"]

        all_articles.extend(articles)
        time.sleep(1)  # Be respectful

    return all_articles
Getting Full Article Content
Headlines are great, but sometimes you want the full text:
def get_full_article_content(self, article_url):
    try:
        response = self.client.smartscraper(
            website_url=article_url,
            user_prompt="Extract the full article text, author name, publication date, and any tags or categories"
        )
        return response.get('result', {})
    except Exception as e:
        print(f"Failed to extract content from {article_url}: {e}")
        return None

def enhance_articles_with_content(self, articles):
    enhanced_articles = []

    for article in articles:
        if 'url' in article and article['url']:
            print(f"Getting full content for: {article.get('headline', 'Unknown')}")

            full_content = self.get_full_article_content(article['url'])
            if full_content:
                article['full_content'] = full_content

            enhanced_articles.append(article)

            # Don't hammer servers
            time.sleep(1)
        else:
            enhanced_articles.append(article)

    return enhanced_articles
Real-Time News Monitoring
For live news updates, set up continuous monitoring:
import schedule
from datetime import datetime, timedelta

class LiveNewsMonitor(NewsAggregator):
    def __init__(self, api_key):
        super().__init__(api_key)
        self.seen_articles = set()
        self.last_check = datetime.now() - timedelta(hours=1)

    def is_new_article(self, article):
        # Create a simple hash based on headline and source
        article_hash = f"{article.get('headline', '')[:50]}_{article.get('source', '')}"

        if article_hash in self.seen_articles:
            return False

        self.seen_articles.add(article_hash)
        return True

    def get_breaking_news(self):
        print(f"Checking for breaking news at {datetime.now()}")

        # Focus on major news sources for breaking news
        breaking_sources = [
            {"name": "CNN", "url": "https://cnn.com"},
            {"name": "BBC", "url": "https://bbc.com/news"},
            {"name": "Reuters", "url": "https://reuters.com"},
            {"name": "AP News", "url": "https://apnews.com"}
        ]

        new_articles = []
        for source in breaking_sources:
            response = self.client.smartscraper(
                website_url=source["url"],
                user_prompt="Find the latest breaking news and top stories with headlines, URLs, and publication times"
            )

            articles = response.get('result', [])
            for article in articles:
                article['source'] = source["name"]

                if self.is_new_article(article):
                    new_articles.append(article)
                    print(f"NEW: {article.get('headline', 'No headline')} - {source['name']}")

            time.sleep(2)

        return new_articles

    def start_monitoring(self, check_interval_minutes=15):
        def check_news():
            new_articles = self.get_breaking_news()
            if new_articles:
                timestamp = datetime.now().strftime("%Y%m%d_%H%M")
                self.save_to_file(new_articles, f"breaking_news_{timestamp}.json")

        # Schedule checks
        schedule.every(check_interval_minutes).minutes.do(check_news)

        print(f"Starting live news monitoring (checking every {check_interval_minutes} minutes)")
        print("Press Ctrl+C to stop")

        while True:
            schedule.run_pending()
            time.sleep(60)

# Start monitoring
monitor = LiveNewsMonitor("your-api-key")
monitor.start_monitoring(check_interval_minutes=10)
JavaScript Version for Web Apps
Building a news aggregator as a web app? Here's the frontend version:
import { smartScraper } from 'scrapegraph-js';

class WebNewsAggregator {
  constructor(apiKey) {
    this.apiKey = apiKey;
    this.sources = [
      { name: 'TechCrunch', url: 'https://techcrunch.com', category: 'tech' },
      { name: 'BBC', url: 'https://bbc.com/news', category: 'world' },
      { name: 'ESPN', url: 'https://espn.com', category: 'sports' }
    ];
  }

  async scrapeSource(source) {
    try {
      const response = await smartScraper({
        apiKey: this.apiKey,
        website_url: source.url,
        user_prompt: 'Extract news articles with headlines, URLs, publication dates, and brief summaries'
      });

      const articles = response.result || [];
      return articles.map(article => ({
        ...article,
        source: source.name,
        category: source.category,
        scrapedAt: new Date().toISOString()
      }));
    } catch (error) {
      console.error(`Failed to scrape ${source.name}:`, error);
      return [];
    }
  }

  async aggregateAll() {
    const promises = this.sources.map(source => this.scrapeSource(source));
    const results = await Promise.all(promises);
    return results.flat();
  }

  async searchByTopic(topic) {
    const promises = this.sources.map(async (source) => {
      try {
        const response = await smartScraper({
          apiKey: this.apiKey,
          website_url: source.url,
          user_prompt: `Find news articles about ${topic} with headlines, URLs, dates, and summaries`
        });

        const articles = response.result || [];
        return articles.map(article => ({
          ...article,
          source: source.name,
          searchTopic: topic
        }));
      } catch (error) {
        console.error(`Search failed for ${source.name}:`, error);
        return [];
      }
    });

    const results = await Promise.all(promises);
    return results.flat();
  }
}
import { useState } from 'react';

// React component
const NewsApp = () => {
  const [articles, setArticles] = useState([]);
  const [loading, setLoading] = useState(false);
  const [searchTopic, setSearchTopic] = useState('');
  const [selectedCategory, setSelectedCategory] = useState('all');

  const aggregator = new WebNewsAggregator(process.env.REACT_APP_SCRAPEGRAPH_API_KEY);

  const loadAllNews = async () => {
    setLoading(true);
    try {
      const allArticles = await aggregator.aggregateAll();
      setArticles(allArticles);
    } catch (error) {
      console.error('Failed to load news:', error);
    }
    setLoading(false);
  };

  const searchNews = async () => {
    if (!searchTopic.trim()) return;

    setLoading(true);
    try {
      const searchResults = await aggregator.searchByTopic(searchTopic);
      setArticles(searchResults);
    } catch (error) {
      console.error('Search failed:', error);
    }
    setLoading(false);
  };

  const filteredArticles = selectedCategory === 'all'
    ? articles
    : articles.filter(article => article.category === selectedCategory);

  return (
    <div className="news-app">
      <header>
        <h1>News Aggregator</h1>
        <div className="controls">
          <div className="search-section">
            <input
              type="text"
              placeholder="Search by topic..."
              value={searchTopic}
              onChange={(e) => setSearchTopic(e.target.value)}
              onKeyPress={(e) => e.key === 'Enter' && searchNews()}
            />
            <button onClick={searchNews}>Search</button>
          </div>
          <div className="category-filter">
            <select
              value={selectedCategory}
              onChange={(e) => setSelectedCategory(e.target.value)}
            >
              <option value="all">All Categories</option>
              <option value="tech">Technology</option>
              <option value="world">World News</option>
              <option value="sports">Sports</option>
            </select>
          </div>
          <button onClick={loadAllNews} disabled={loading}>
            {loading ? 'Loading...' : 'Refresh All'}
          </button>
        </div>
      </header>

      <main>
        <div className="articles-grid">
          {filteredArticles.map((article, index) => (
            <div key={index} className="article-card">
              <h3>
                {article.url ? (
                  <a href={article.url} target="_blank" rel="noopener noreferrer">
                    {article.headline || article.title}
                  </a>
                ) : (
                  article.headline || article.title
                )}
              </h3>
              <div className="article-meta">
                <span className="source">{article.source}</span>
                {article.category && (
                  <span className="category">{article.category}</span>
                )}
                {article.date && (
                  <span className="date">{new Date(article.date).toLocaleDateString()}</span>
                )}
              </div>
              {article.summary && (
                <p className="summary">{article.summary}</p>
              )}
            </div>
          ))}
        </div>

        {filteredArticles.length === 0 && !loading && (
          <div className="no-results">
            No articles found. Try refreshing or searching for a specific topic.
          </div>
        )}
      </main>
    </div>
  );
};
Handling Common Issues
Duplicate Articles
Different sources often cover the same story:
def remove_duplicates(self, articles):
    unique_articles = []
    seen_headlines = set()

    for article in articles:
        headline = article.get('headline', '').lower().strip()
        headline_words = set(headline.split())

        # Keep articles with empty headlines as-is so the overlap ratio below
        # never divides by zero
        if not headline_words:
            unique_articles.append(article)
            continue

        # Simple duplicate detection
        is_duplicate = False
        for seen_headline in seen_headlines:
            seen_words = set(seen_headline.split())
            # If 70% of words overlap, consider it a duplicate
            if len(headline_words & seen_words) / len(headline_words | seen_words) > 0.7:
                is_duplicate = True
                break

        if not is_duplicate:
            unique_articles.append(article)
            seen_headlines.add(headline)

    return unique_articles
Rate Limiting
Be respectful when scraping multiple sites:
import random

def scrape_with_delays(self):
    all_articles = []

    for i, source in enumerate(self.sources):
        print(f"Scraping {source['name']} ({i+1}/{len(self.sources)})")

        articles = self.scrape_source(source)
        all_articles.extend(articles)

        # Delay between requests, but not after the last one
        if i < len(self.sources) - 1:
            delay = random.uniform(1, 3)
            print(f"Waiting {delay:.1f} seconds...")
            time.sleep(delay)

    return all_articles
Error Recovery
Sites go down or change. Handle it gracefully:
def robust_scrape_source(self, source, max_retries=2):
    for attempt in range(max_retries + 1):
        try:
            response = self.client.smartscraper(
                website_url=source["url"],
                user_prompt="Extract news articles with headlines, URLs, dates, and summaries"
            )
            return response.get('result', [])

        except Exception as e:
            print(f"Attempt {attempt + 1} failed for {source['name']}: {e}")

            if attempt < max_retries:
                wait_time = 2 ** attempt  # Exponential backoff: 1s, 2s, 4s, ...
                print(f"Retrying in {wait_time} seconds...")
                time.sleep(wait_time)
            else:
                print(f"Giving up on {source['name']} after {max_retries + 1} attempts")
                return []
Performance Tips
Parallel Scraping
Speed things up by scraping multiple sources at once:
from concurrent.futures import ThreadPoolExecutor
import threading

def parallel_scrape(self, max_workers=3):
    all_articles = []
    articles_lock = threading.Lock()

    def scrape_and_collect(source):
        articles = self.scrape_source(source)
        with articles_lock:
            all_articles.extend(articles)

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(scrape_and_collect, source) for source in self.sources]

        # Wait for all to complete
        for future in futures:
            try:
                future.result()
            except Exception as e:
                print(f"Thread failed: {e}")

    return all_articles
Caching Results
Avoid re-scraping the same content:
import pickle
import os
from datetime import datetime, timedelta

def load_cached_articles(self, cache_file="news_cache.pkl", max_age_minutes=30):
    if not os.path.exists(cache_file):
        return None

    try:
        with open(cache_file, 'rb') as f:
            cache_data = pickle.load(f)

        # Check if cache is still valid
        if datetime.now() - cache_data['timestamp'] < timedelta(minutes=max_age_minutes):
            print(f"Using cached articles ({len(cache_data['articles'])} articles)")
            return cache_data['articles']
    except Exception as e:
        print(f"Cache error: {e}")

    return None

def save_cached_articles(self, articles, cache_file="news_cache.pkl"):
    cache_data = {
        'timestamp': datetime.now(),
        'articles': articles
    }
    with open(cache_file, 'wb') as f:
        pickle.dump(cache_data, f)

def aggregate_with_cache(self):
    # Try to load from cache first
    cached_articles = self.load_cached_articles()
    if cached_articles:
        return cached_articles

    # Cache miss - scrape fresh data
    articles = self.aggregate_all()
    self.save_cached_articles(articles)
    return articles
Frequently Asked Questions
How does ScrapeGraphAI handle different website structures?
ScrapeGraphAI uses AI to understand the content and structure of any website, regardless of how it's built. Unlike traditional scrapers that rely on specific CSS selectors, ScrapeGraphAI can adapt to different layouts, JavaScript-rendered content, and even site redesigns. This means your news aggregator won't break when websites update their code.
Is it legal to scrape news websites?
The legality of web scraping depends on several factors:
- Terms of Service: Check each website's terms of service
- Rate Limiting: Be respectful with request frequency (as shown in our rate limiting examples)
- Robots.txt: Respect the robots.txt file on each site (a quick programmatic check is sketched after this answer)
- Fair Use: Many news sites allow reasonable scraping for personal use
- Commercial Use: May require explicit permission
For production applications, consider using official APIs when available or reaching out to publishers for permission.
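As a small illustration of the robots.txt point above, Python's standard library can tell you whether a URL is permitted before you ever request it. This is only a sketch; the user agent string is a placeholder for whatever identifies your aggregator:

from urllib.parse import urlparse
from urllib.robotparser import RobotFileParser

def is_allowed(url, user_agent="MyNewsAggregatorBot/1.0"):
    # Build the site's robots.txt URL and ask whether this URL may be fetched
    parsed = urlparse(url)
    robots_url = f"{parsed.scheme}://{parsed.netloc}/robots.txt"

    parser = RobotFileParser()
    parser.set_url(robots_url)
    try:
        parser.read()
    except Exception:
        # If robots.txt can't be fetched at all, pick your own policy; here we allow
        return True

    return parser.can_fetch(user_agent, url)

print(is_allowed("https://techcrunch.com/"))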
How do I handle duplicate articles from different sources?
Our duplicate detection section shows how to identify and remove similar articles. The key is creating a hash based on headline content and using similarity matching to catch variations of the same story.
Can I scrape paywalled content?
ScrapeGraphAI can only access publicly available content. Paywalled articles, subscription-only content, and private pages cannot be scraped. Always respect paywalls and subscription requirements.
How fast can I scrape multiple sources?
The speed depends on several factors:
- API Limits: Check your ScrapeGraphAI plan limits
- Server Response: Some sites are slower than others
- Rate Limiting: Our examples include delays to be respectful
- Parallel Processing: Use our parallel scraping techniques for speed
For most use cases, scraping 10-20 sources takes 2-5 minutes with proper delays.
What if a website blocks my requests?
Some sites may block automated requests. Our error recovery section shows how to handle this with retry logic and exponential backoff. If a site consistently blocks requests, consider:
- Adding longer delays between requests
- Using rotating user agents
- Reaching out to the site owner
- Finding alternative news sources
How do I store and organize the scraped data?
The examples show JSON storage, but you can also use:
- Databases: PostgreSQL, MongoDB, or SQLite for larger datasets (a minimal SQLite sketch follows this list)
- Search Engines: Elasticsearch for full-text search
- Cloud Storage: AWS S3, Google Cloud Storage for backups
- Real-time: Redis for caching and live updates
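For example, here's a minimal sketch of persisting articles to SQLite with Python's built-in sqlite3 module. The column names mirror the fields used throughout this post and are assumptions about what your extraction returns:

import sqlite3

def save_articles_to_db(articles, db_path="news.db"):
    conn = sqlite3.connect(db_path)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS articles (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            headline TEXT,
            url TEXT UNIQUE,
            source TEXT,
            category TEXT,
            published_at TEXT,
            scraped_at TEXT
        )
    """)
    for article in articles:
        # INSERT OR IGNORE skips rows whose URL we've already stored
        conn.execute(
            "INSERT OR IGNORE INTO articles "
            "(headline, url, source, category, published_at, scraped_at) "
            "VALUES (?, ?, ?, ?, ?, ?)",
            (
                article.get('headline') or article.get('title'),
                article.get('url'),
                article.get('source'),
                article.get('category'),
                article.get('date'),
                article.get('scraped_at'),
            ),
        )
    conn.commit()
    conn.close()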
Can I build a real-time news monitoring system?
Yes! Our live news monitoring section shows how to set up continuous monitoring with scheduled checks. You can also use webhooks or streaming APIs for real-time updates.
How do I handle different date formats?
News sites use various date formats. ScrapeGraphAI typically returns standardized ISO format dates, but you may need to parse different formats. Consider using libraries like dateutil in Python or moment.js in JavaScript for robust date parsing.
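Here's a short sketch of normalizing mixed date strings with python-dateutil (pip install python-dateutil); the sample inputs are made up for illustration:

from dateutil import parser as date_parser

def normalize_date(raw_date):
    # Returns an ISO-8601 string, or None if the value can't be parsed
    if not raw_date:
        return None
    try:
        return date_parser.parse(raw_date).isoformat()
    except (ValueError, OverflowError):
        return None

print(normalize_date("Jan 5, 2025 14:30"))     # 2025-01-05T14:30:00
print(normalize_date("2025-01-05T14:30:00Z"))  # 2025-01-05T14:30:00+00:00
print(normalize_date("not a date"))            # None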
What's the difference between smartscraper and searchscraper?
- SmartScraper: Extracts structured data from a specific webpage
- SearchScraper: Searches across multiple pages and extracts results
For news aggregation, SmartScraper is usually better for getting articles from a homepage, while SearchScraper is useful for finding articles about specific topics across a site.
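If your installed version of the scrapegraph_py SDK exposes a searchscraper method (check the SDK docs for the exact name and signature - this sketch assumes it takes a user_prompt the same way smartscraper does), topic discovery can look something like this:

from scrapegraph_py import Client

client = Client(api_key="your-api-key")

# Assumption: searchscraper(user_prompt=...) exists in your SDK version;
# consult the ScrapeGraphAI docs for the exact interface.
response = client.searchscraper(
    user_prompt="Find recent news coverage about electric vehicle battery technology, with headlines and URLs"
)

print(response.get('result'))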
How do I scale this for production use?
For production news aggregators:
- Use a database instead of JSON files
- Implement proper error handling and monitoring
- Add user authentication and rate limiting
- Use a task queue (Celery, Bull) for background scraping (a minimal Celery sketch follows this list)
- Set up monitoring (Sentry, DataDog) for alerts
- Consider using official APIs when available
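One possible shape for the background-scraping piece is a Celery task. This is only a sketch: the module name, broker URL, and 30-minute schedule are assumptions you'd adapt to your own setup.

# news_tasks.py - minimal sketch; broker URL and schedule are assumptions
from celery import Celery

from news_aggregator import NewsAggregator  # the class built earlier in this post

app = Celery('news_tasks', broker='redis://localhost:6379/0')

@app.task
def scrape_all_sources():
    # Reuse the aggregator defined earlier and persist the results
    aggregator = NewsAggregator("your-api-key")
    articles = aggregator.aggregate_all()
    aggregator.save_to_file(articles)
    return len(articles)

# Re-scrape every 30 minutes via Celery beat
# (the task name must match your module name, here news_tasks.py)
app.conf.beat_schedule = {
    'scrape-every-30-minutes': {
        'task': 'news_tasks.scrape_all_sources',
        'schedule': 30 * 60,  # seconds
    },
}

# Run with:
#   celery -A news_tasks worker -B --loglevel=info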
Can I monetize a news aggregator?
Yes, but be aware of:
- Copyright issues with republishing content
- Terms of service violations
- Fair use limitations
Common monetization approaches:
- Affiliate links to original articles
- Advertising on your aggregator interface
- Premium features like advanced filtering
- API access for other developers
How do I keep my news aggregator updated?
Our caching strategies help reduce unnecessary requests, but you'll want to:
- Schedule regular updates (every 15-60 minutes)
- Monitor for new sources to add
- Remove broken sources that no longer work
- Update your prompts as sites change
- Track performance and adjust scraping frequency
The Bottom Line
Building a news aggregator used to mean writing dozens of custom scrapers, each one breaking whenever a site changed their CSS. You'd spend more time fixing scrapers than actually reading news.
ScrapeGraphAI flips this around. Instead of fighting with HTML selectors and site-specific quirks, you just describe what you want and it figures out how to extract it. When sites redesign, your code keeps working.
The examples above give you everything you need to build your own news aggregator. Start simple with a few sources, add caching and error handling as you go, then scale up with parallel scraping when you need more speed.
Whether you want a personal news dashboard, need to monitor industry news for work, or just want to avoid having 50 news site bookmarks, this approach saves you tons of development time and maintenance headaches.