When you need to scrape data from multiple websites or thousands of URLs, doing it one-by-one is painfully slow. If each page takes 2 seconds to scrape and you have 1,000 URLs, that's over 33 minutes of scraping time. But if you scrape 10 sites concurrently, you could finish in about 3-4 minutes.
Concurrent scraping—extracting data from multiple sources at the same time—is essential for efficient data collection. In this tutorial, we'll explore different approaches, their trade-offs, and how to implement them effectively.
Understanding Concurrency vs Parallelism
Before we dive in, let's clarify two related but different concepts:
Concurrency means handling multiple tasks that progress over time without necessarily running simultaneously. Think of a chef who starts cooking multiple dishes—they might cook one dish, then check on another, then go back to the first.
Parallelism means truly running tasks at the same time on multiple CPU cores. Think of a restaurant kitchen with multiple chefs cooking simultaneously.
For web scraping, concurrency is what we typically use because:
- Scraping is I/O-bound (waiting for network responses), not CPU-bound
- We don't need multiple cores—we just need to use waiting time efficiently
- Concurrent scraping is simpler to implement than parallel scraping
That said, we'll cover both approaches.
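To make the difference concrete, here is a minimal, self-contained sketch (no scraping yet) of how asyncio overlaps waiting time: three simulated 2-second "requests" finish in roughly 2 seconds total instead of 6.

import asyncio
import time

async def fake_request(name):
    # Simulate an I/O-bound call: the task just waits, so others can run
    await asyncio.sleep(2)
    return name

async def main():
    start = time.time()
    results = await asyncio.gather(fake_request('a'), fake_request('b'), fake_request('c'))
    print(f"{results} finished in {time.time() - start:.1f}s")  # ~2.0s, not 6.0s

asyncio.run(main())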
Approach 1: Asynchronous Scraping with AsyncIO
AsyncIO is Python's native tool for concurrent I/O operations. It's lightweight and ideal for web scraping.
How AsyncIO Works
Instead of waiting for one request to finish before starting the next, asyncio lets you start multiple requests and process them as they complete.
import asyncio
import aiohttp
import time

async def scrape_single_url(session, url):
    """Scrape a single URL asynchronously"""
    try:
        async with session.get(url, timeout=10) as response:
            html = await response.text()
            return {
                'url': url,
                'status': response.status,
                'content_length': len(html)
            }
    except asyncio.TimeoutError:
        return {'url': url, 'error': 'Timeout'}
    except Exception as e:
        return {'url': url, 'error': str(e)}

async def scrape_multiple_urls(urls, max_concurrent=10):
    """Scrape multiple URLs concurrently"""
    connector = aiohttp.TCPConnector(limit=max_concurrent)
    async with aiohttp.ClientSession(connector=connector) as session:
        tasks = [scrape_single_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

# Usage
urls = [
    'https://example.com/page1',
    'https://example.com/page2',
    'https://example.com/page3',
    # ... more URLs
]

start = time.time()
results = asyncio.run(scrape_multiple_urls(urls, max_concurrent=10))
print(f"Scraped {len(results)} URLs in {time.time() - start:.2f} seconds")

Why AsyncIO Works Well for Scraping
- Non-blocking: While waiting for one response, the system handles other requests
- Memory efficient: Uses less memory than threads or processes
- Lightweight: Can handle hundreds of concurrent connections
- Native Python: asyncio is part of the standard library (though aiohttp is required for async HTTP)
Limiting Concurrent Requests Responsibly
Notice the limit=max_concurrent parameter. This is crucial:
- Limits the number of simultaneous connections
- Prevents overwhelming target servers
- Reduces the chance of getting blocked
- Respects the server's capacity
Best practice: Start with 5-10 concurrent requests. If the server responds well, gradually increase to 20-30. Most sites should handle 10-15 concurrent requests without issues.
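If your URLs are spread across only a few domains, it can also help to cap connections per host rather than only globally. Here is a minimal sketch using aiohttp's TCPConnector together with the scrape_single_url helper defined above; the limits of 20 total and 5 per host are illustrative starting points, not recommendations.

import asyncio
import aiohttp

async def scrape_all(urls):
    # Global cap of 20 in-flight connections, but at most 5 to any single host,
    # so one slow or strict site doesn't absorb the whole concurrency budget.
    connector = aiohttp.TCPConnector(limit=20, limit_per_host=5)
    async with aiohttp.ClientSession(connector=connector) as session:
        return await asyncio.gather(*(scrape_single_url(session, url) for url in urls))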
Advanced AsyncIO: Adding Delays Between Requests
To be more respectful to servers, add random delays:
import asyncio
import random
import aiohttp

async def scrape_single_url_with_delay(session, url, delay=0):
    """Scrape with optional delay"""
    await asyncio.sleep(delay)
    async with session.get(url, timeout=10) as response:
        html = await response.text()
        return {'url': url, 'status': response.status}

async def scrape_with_staggered_delays(urls, max_concurrent=10):
    """Stagger requests with delays to be respectful"""
    connector = aiohttp.TCPConnector(limit=max_concurrent)
    async with aiohttp.ClientSession(connector=connector) as session:
        tasks = []
        for i, url in enumerate(urls):
            # Add randomized delay (0-2 seconds) spread across requests
            delay = (i % max_concurrent) * 0.1 + random.uniform(0, 1)
            tasks.append(scrape_single_url_with_delay(session, url, delay))
        results = await asyncio.gather(*tasks)
        return results

Approach 2: Threading
Threading runs your blocking scraping code in a pool of worker threads, most conveniently through the standard library's concurrent.futures module. It's simpler to write than asyncio but slightly less efficient, since each thread carries its own memory and scheduling overhead.
Basic Threading Example
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
import time

def scrape_url(url):
    """Scrape a single URL (blocking call)"""
    try:
        response = requests.get(url, timeout=10)
        return {
            'url': url,
            'status': response.status_code,
            'content_length': len(response.text)
        }
    except requests.Timeout:
        return {'url': url, 'error': 'Timeout'}
    except Exception as e:
        return {'url': url, 'error': str(e)}

def scrape_with_threads(urls, max_workers=10):
    """Scrape multiple URLs using thread pool"""
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all tasks
        future_to_url = {executor.submit(scrape_url, url): url for url in urls}
        # Process results as they complete
        for future in as_completed(future_to_url):
            result = future.result()
            results.append(result)
            print(f"Completed: {result.get('url', 'unknown')}")
    return results

# Usage
urls = ['https://example.com/page' + str(i) for i in range(1, 51)]

start = time.time()
results = scrape_with_threads(urls, max_workers=10)
print(f"Scraped {len(results)} URLs in {time.time() - start:.2f} seconds")

Threading vs AsyncIO: When to Use Each
Use AsyncIO when:
- You're already familiar with async/await syntax
- You want maximum concurrency (100+ URLs)
- You need fine-grained control over timing
- Working with async-native libraries
Use Threading when:
- You prefer simpler, more straightforward code
- Working with standard libraries like requests
- You have fewer concurrent tasks (10-50 URLs)
- You need synchronous libraries that don't support async (see the bridging sketch below)
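If you are somewhere in between — you want asyncio's scheduling but depend on a synchronous library such as requests — Python 3.9+ lets you push the blocking call onto a worker thread from async code. A minimal sketch, reusing the blocking scrape_url function defined above:

import asyncio

async def scrape_many_blocking(urls, max_concurrent=10):
    # Run the blocking requests-based scrape_url in worker threads,
    # while asyncio coordinates how many run at once.
    semaphore = asyncio.Semaphore(max_concurrent)

    async def bounded(url):
        async with semaphore:
            return await asyncio.to_thread(scrape_url, url)

    return await asyncio.gather(*(bounded(url) for url in urls))

# results = asyncio.run(scrape_many_blocking(urls))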
Threading Gotcha: Thread Safety
When multiple threads access shared data, you might get race conditions. Use locks:
from threading import Lock
from concurrent.futures import ThreadPoolExecutor

results = []
results_lock = Lock()

def scrape_and_store(url):
    """Scrape and safely store result"""
    result = scrape_url(url)
    # Use lock to prevent race conditions
    with results_lock:
        results.append(result)

def scrape_with_thread_safe_storage(urls, max_workers=10):
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        executor.map(scrape_and_store, urls)
    return results

Approach 3: Multiprocessing (For CPU-Intensive Tasks)
If your scraping involves heavy data processing (parsing HTML, running ML models), multiprocessing uses multiple CPU cores:
from multiprocessing import Pool, cpu_count
import requests
import time

def scrape_and_process(url):
    """Scrape and process data (CPU-intensive)"""
    try:
        response = requests.get(url, timeout=10)
        html = response.text
        # Simulate heavy processing
        word_count = len(html.split())
        unique_words = len(set(html.lower().split()))
        return {
            'url': url,
            'words': word_count,
            'unique': unique_words
        }
    except Exception as e:
        return {'url': url, 'error': str(e)}

def scrape_with_multiprocessing(urls):
    """Use all available CPU cores"""
    num_processes = cpu_count()
    with Pool(processes=num_processes) as pool:
        results = pool.map(scrape_and_process, urls)
    return results

# Usage
if __name__ == '__main__':  # Required on platforms that spawn new processes (Windows, macOS)
    urls = ['https://example.com/page' + str(i) for i in range(1, 51)]

    start = time.time()
    results = scrape_with_multiprocessing(urls)
    print(f"Scraped and processed {len(results)} URLs in {time.time() - start:.2f} seconds")

Approach 4: ScrapeGraphAI with Concurrent Requests
ScrapeGraphAI handles much of the complexity for you. Here's how to use it for concurrent scraping:
Basic Concurrent Scraping with ScrapeGraphAI
from scrapegraphai.graphs import SmartScraperGraph
import asyncio
import time

async def scrape_with_scrapegraphai(url, prompt):
    """Scrape a single URL with ScrapeGraphAI"""
    graph_config = {
        "llm": {
            "model": "gpt-4",
            "api_key": "your-api-key",
        },
    }
    scraper = SmartScraperGraph(
        prompt=prompt,
        source=url,
        config=graph_config
    )
    # SmartScraperGraph.run() is a blocking call, so run it in a worker
    # thread (Python 3.9+) to avoid stalling the event loop
    result = await asyncio.to_thread(scraper.run)
    return {
        'url': url,
        'data': result
    }

async def scrape_multiple_with_scrapegraphai(urls, prompt, max_concurrent=5):
    """Scrape multiple URLs concurrently with ScrapeGraphAI"""
    # Use semaphore to limit concurrent API calls
    semaphore = asyncio.Semaphore(max_concurrent)

    async def bounded_scrape(url):
        async with semaphore:
            return await scrape_with_scrapegraphai(url, prompt)

    tasks = [bounded_scrape(url) for url in urls]
    results = await asyncio.gather(*tasks)
    return results

# Usage
urls = [
    'https://example.com/product1',
    'https://example.com/product2',
    'https://example.com/product3',
]
prompt = "Extract product name, price, and rating"

start = time.time()
results = asyncio.run(scrape_multiple_with_scrapegraphai(urls, prompt, max_concurrent=5))
print(f"Scraped {len(results)} URLs in {time.time() - start:.2f} seconds")

for result in results:
    print(f"URL: {result['url']}")
    print(f"Data: {result['data']}")
    print("---")

Real-World Example: Scraping an E-commerce Site
Let's build a complete example scraping product data from multiple pages:
import asyncio
import json
from datetime import datetime
from scrapegraphai.graphs import SmartScraperGraph

class ConcurrentScraper:
    def __init__(self, max_concurrent=10, api_key=None):
        self.max_concurrent = max_concurrent
        self.api_key = api_key
        self.results = []
        self.errors = []

    async def scrape_url(self, url, prompt):
        """Scrape a single URL with ScrapeGraphAI"""
        try:
            # Use semaphore to limit concurrency
            async with self.semaphore:
                graph_config = {
                    "llm": {
                        "model": "gpt-4",
                        "api_key": self.api_key,
                    },
                }
                scraper = SmartScraperGraph(
                    prompt=prompt,
                    source=url,
                    config=graph_config
                )
                # run() is blocking, so execute it in a worker thread
                data = await asyncio.to_thread(scraper.run)
                return {
                    'url': url,
                    'status': 'success',
                    'data': data,
                    'timestamp': datetime.now().isoformat()
                }
        except Exception as e:
            return {
                'url': url,
                'status': 'error',
                'error': str(e),
                'timestamp': datetime.now().isoformat()
            }

    async def scrape_urls(self, urls, prompt):
        """Scrape multiple URLs concurrently"""
        self.semaphore = asyncio.Semaphore(self.max_concurrent)
        tasks = [
            self.scrape_url(url, prompt)
            for url in urls
        ]
        self.results = await asyncio.gather(*tasks)
        return self.results

    def save_results(self, filename):
        """Save results to JSON file"""
        with open(filename, 'w') as f:
            json.dump(self.results, f, indent=2)
        print(f"Saved {len(self.results)} results to {filename}")

    def get_stats(self):
        """Get scraping statistics"""
        successful = sum(1 for r in self.results if r['status'] == 'success')
        failed = sum(1 for r in self.results if r['status'] == 'error')
        return {
            'total': len(self.results),
            'successful': successful,
            'failed': failed,
            'success_rate': f"{(successful/len(self.results)*100):.1f}%" if self.results else "0%"
        }

# Usage
async def main():
    # List of product pages to scrape
    urls = [
        f'https://example.com/products/page{i}'
        for i in range(1, 21)  # 20 pages
    ]
    prompt = """
    Extract all products on this page with:
    - Product name
    - Price
    - Rating
    - In stock status
    Return as a list of dictionaries
    """

    scraper = ConcurrentScraper(max_concurrent=5, api_key="your-api-key")

    print("Starting concurrent scraping...")
    start_time = datetime.now()

    results = await scraper.scrape_urls(urls, prompt)

    elapsed = (datetime.now() - start_time).total_seconds()
    print(f"\nScraping completed in {elapsed:.2f} seconds")
    print(f"Statistics: {scraper.get_stats()}")

    scraper.save_results('products.json')

# Run
asyncio.run(main())

Best Practices for Concurrent Scraping
1. Respect Rate Limits and Server Load
# Add delays between requests
async def scrape_with_rate_limiting(urls, max_concurrent=5, min_delay=1):
    """Scrape with minimum delay between requests"""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def delayed_scrape(session, url, index):
        async with semaphore:
            # Spread out requests with delay
            await asyncio.sleep(index * min_delay / max_concurrent)
            # scrape_single_url is the async helper defined in Approach 1
            return await scrape_single_url(session, url)

    async with aiohttp.ClientSession() as session:
        tasks = [
            delayed_scrape(session, url, i)
            for i, url in enumerate(urls)
        ]
        results = await asyncio.gather(*tasks)
        return results

2. Implement Retry Logic
async def scrape_with_retries(session, url, max_retries=3):
    """Retry failed requests up to max_retries times"""
    for attempt in range(max_retries):
        try:
            async with session.get(url, timeout=10) as response:
                if response.status == 200:
                    return await response.text()
        except asyncio.TimeoutError:
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(2 ** attempt)  # Exponential backoff
        except Exception as e:
            print(f"Attempt {attempt + 1} failed: {e}")
    raise Exception(f"Failed to scrape {url} after {max_retries} retries")

3. Use Backoff for Rate Limiting
import asyncio
import random

async def scrape_with_backoff(session, url):
    """Implement exponential backoff"""
    max_retries = 5
    base_delay = 1
    for attempt in range(max_retries):
        try:
            async with session.get(url, timeout=10) as response:
                if response.status == 429:  # Too Many Requests
                    # Wait and retry
                    delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
                    print(f"Rate limited. Waiting {delay:.1f} seconds...")
                    await asyncio.sleep(delay)
                    continue
                return await response.text()
        except Exception as e:
            print(f"Error: {e}")
            return None

4. Monitor and Log Progress
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class ProgressTracker:
    def __init__(self, total):
        self.total = total
        self.completed = 0
        self.errors = 0

    def log_success(self, url):
        self.completed += 1
        progress = (self.completed / self.total) * 100
        logger.info(f"[{progress:.1f}%] Completed: {url}")

    def log_error(self, url, error):
        self.errors += 1
        logger.error(f"Failed: {url} - {error}")

    def summary(self):
        logger.info(f"Complete. Success: {self.completed}/{self.total}, Errors: {self.errors}")

# Usage in concurrent scraping
async def scrape_with_progress(urls):
    tracker = ProgressTracker(len(urls))

    async def tracked_scrape(session, url):
        try:
            result = await scrape_single_url(session, url)  # async helper from Approach 1
            tracker.log_success(url)
            return result
        except Exception as e:
            tracker.log_error(url, str(e))

    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(*(tracked_scrape(session, url) for url in urls))
    tracker.summary()
    return results

Performance Comparison
Let's compare the approaches, assuming 100 URLs that each take ~2 seconds to respond. Sequentially that's about 200 seconds; with 10 concurrent workers, it drops to roughly 200 / 10 ≈ 20 seconds:
| Approach | Time | Memory | Complexity | Best For |
|---|---|---|---|---|
| Sequential | ~200s | Low | Low | Testing, small batches |
| AsyncIO | ~20s | Low | Medium | Large batches, many URLs |
| Threading | ~20s | Medium | Low | Mixed I/O + processing |
| Multiprocessing | ~20s | High | High | Heavy CPU processing |
Recommendation: For most web scraping, use AsyncIO for the best balance of speed, memory efficiency, and code quality.
Common Pitfalls and How to Avoid Them
Pitfall 1: Too Many Concurrent Requests
# ❌ Don't do this
max_workers = 100  # Will likely get blocked

# ✅ Do this instead
max_workers = 10  # Start conservative, increase if needed

Pitfall 2: Not Handling Timeouts
# ❌ No timeout (can hang forever)
response = await session.get(url)

# ✅ Always set timeout
response = await session.get(url, timeout=10)

Pitfall 3: Ignoring Error Status Codes
# ❌ Treating all responses as success
html = await response.text()

# ✅ Check status code
if response.status != 200:
    logger.warning(f"Unexpected status {response.status} for {url}")
html = await response.text()

Pitfall 4: Blocking Operations in Async Code
# ❌ Blocking call in async function (defeats the purpose)
from bs4 import BeautifulSoup

def parse_html(html):
    # CPU-intensive parsing
    return BeautifulSoup(html, 'html.parser')

async def scrape(url):
    html = await get_html(url)
    data = parse_html(html)  # This blocks all concurrent tasks

# ✅ Use async libraries or move the blocking work to a thread pool
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=2)

async def scrape(url):
    html = await get_html(url)
    loop = asyncio.get_running_loop()
    data = await loop.run_in_executor(executor, parse_html, html)

Conclusion: Choose the Right Concurrency Model
The choice between AsyncIO, Threading, and Multiprocessing depends on your specific needs:
- Start with AsyncIO: It's the most efficient for I/O-bound web scraping
- Fall back to Threading: If AsyncIO feels too complex or you need synchronous libraries
- Use Multiprocessing: Only if you're doing heavy data processing alongside scraping
- Use ScrapeGraphAI: If you want AI-powered extraction without worrying about concurrency details
The key to successful concurrent scraping is starting conservative (5-10 concurrent requests) and gradually increasing if the server responds well. Always implement error handling, retries, and monitoring to ensure reliable, respectful scraping.
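As an illustration of that "start conservative, then ramp up" advice, here is a minimal sketch that processes URLs in batches and raises the concurrency limit only while the error rate stays low. It reuses the async scrape_single_url helper from Approach 1; the batch size, 5% error threshold, and +5 increment are illustrative values, not recommendations.

import asyncio
import aiohttp

async def scrape_in_ramping_batches(urls, start_concurrency=5, max_concurrency=30, batch_size=50):
    concurrency = start_concurrency
    results = []
    async with aiohttp.ClientSession() as session:
        for i in range(0, len(urls), batch_size):
            batch = urls[i:i + batch_size]
            semaphore = asyncio.Semaphore(concurrency)

            async def bounded(url):
                async with semaphore:
                    return await scrape_single_url(session, url)

            batch_results = await asyncio.gather(*(bounded(u) for u in batch))
            results.extend(batch_results)

            # Ramp up only while the batch stays (nearly) error-free
            error_rate = sum(1 for r in batch_results if 'error' in r) / len(batch)
            if error_rate < 0.05 and concurrency < max_concurrency:
                concurrency += 5
    return results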
With these techniques, you can scrape thousands of URLs efficiently while being a good citizen of the web.
Related Resources
Want to learn more about web scraping and data extraction? Explore these guides:
- Web Scraping 101 - Master the basics of web scraping
- AI Agent Web Scraping - Learn about AI-powered scraping
- Mastering ScrapeGraphAI - Deep dive into our scraping platform
- Understanding CSS Selectors - Learn how to target specific elements
- Twitter Scraper Guide - Complete guide to scraping Twitter
- Structured Output - Learn about data formatting
- Pre-AI to Post-AI Scraping - See how AI has transformed automation
- Building Intelligent Agents - Create powerful automation agents
- No-Code Web Scraping - Learn about no-code scraping solutions
- Web Scraping Legality - Understand legal considerations
Need help with concurrent scraping? Try building a small test with 10 URLs first, then scale up. Monitor your success rate and adjust concurrency accordingly.
