Concurrent Scraping: Extracting Data from Multiple Sites Simultaneously

Marco Vinciguerra

When you need to scrape data from multiple websites or thousands of URLs, doing it one-by-one is painfully slow. If each page takes 2 seconds to scrape and you have 1,000 URLs, that's over 33 minutes of scraping time. But if you scrape 10 sites concurrently, you could finish in about 3-4 minutes.

Concurrent scraping—extracting data from multiple sources at the same time—is essential for efficient data collection. In this tutorial, we'll explore different approaches, their trade-offs, and how to implement them effectively.

Understanding Concurrency vs Parallelism

Before we dive in, let's clarify two related but different concepts:

Concurrency means handling multiple tasks that progress over time without necessarily running simultaneously. Think of a chef who starts cooking multiple dishes—they might cook one dish, then check on another, then go back to the first.

Parallelism means truly running tasks at the same time on multiple CPU cores. Think of a restaurant kitchen with multiple chefs cooking simultaneously.

For web scraping, concurrency is what we typically use because:

  • Scraping is I/O-bound (waiting for network responses), not CPU-bound
  • We don't need multiple cores—we just need to use waiting time efficiently
  • Concurrent scraping is simpler to implement than parallel scraping

That said, we'll cover both approaches.
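
To make the I/O-bound point concrete, here's a toy demo that stands in for scraping by sleeping instead of making real requests: five 2-second "requests" finish in roughly 2 seconds when run concurrently, instead of 10 seconds one after another.

import asyncio
import time

async def fake_request(i):
    # Simulate waiting on the network (I/O), not burning CPU
    await asyncio.sleep(2)
    return i

async def run_concurrently():
    # All five "requests" wait at the same time
    return await asyncio.gather(*(fake_request(i) for i in range(5)))

start = time.time()
asyncio.run(run_concurrently())
print(f"Concurrent: {time.time() - start:.1f}s")  # ~2s, versus ~10s sequentially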

Approach 1: Asynchronous Scraping with AsyncIO

AsyncIO is Python's native tool for concurrent I/O operations. It's lightweight and ideal for web scraping.

How AsyncIO Works

Instead of waiting for one request to finish before starting the next, asyncio lets you start multiple requests and process them as they complete.

import asyncio
import aiohttp
import time
 
async def scrape_single_url(session, url):
    """Scrape a single URL asynchronously"""
    try:
        async with session.get(url, timeout=10) as response:
            html = await response.text()
            return {
                'url': url,
                'status': response.status,
                'content_length': len(html)
            }
    except asyncio.TimeoutError:
        return {'url': url, 'error': 'Timeout'}
    except Exception as e:
        return {'url': url, 'error': str(e)}
 
async def scrape_multiple_urls(urls, max_concurrent=10):
    """Scrape multiple URLs concurrently"""
    connector = aiohttp.TCPConnector(limit=max_concurrent)
    async with aiohttp.ClientSession(connector=connector) as session:
        tasks = [scrape_single_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
    return results
 
# Usage
urls = [
    'https://example.com/page1',
    'https://example.com/page2',
    'https://example.com/page3',
    # ... more URLs
]
 
start = time.time()
results = asyncio.run(scrape_multiple_urls(urls, max_concurrent=10))
print(f"Scraped {len(results)} URLs in {time.time() - start:.2f} seconds")

Why AsyncIO Works Well for Scraping

  • Non-blocking: While waiting for one response, the system handles other requests
  • Memory efficient: Uses less memory than threads or processes
  • Lightweight: Can handle hundreds of concurrent connections
  • Native Python: asyncio ships with the standard library (you still need aiohttp for async HTTP requests)
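
If you want to handle each result the moment its request finishes (to log progress or write to disk incrementally) instead of waiting for the whole batch, asyncio.as_completed is a drop-in alternative to gather. A minimal sketch, reusing the scrape_single_url coroutine from above:

async def scrape_as_completed(urls, max_concurrent=10):
    """Process each result as soon as its request finishes"""
    connector = aiohttp.TCPConnector(limit=max_concurrent)
    async with aiohttp.ClientSession(connector=connector) as session:
        tasks = [scrape_single_url(session, url) for url in urls]
        results = []
        for finished in asyncio.as_completed(tasks):
            result = await finished
            results.append(result)
            print(f"Done: {result['url']}")
        return results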

Limiting Concurrent Requests Responsibly

Notice the limit=max_concurrent parameter. This is crucial:

  • Limits the number of simultaneous connections
  • Prevents overwhelming target servers
  • Reduces the chance of getting blocked
  • Respects the server's capacity

Best practice: Start with 5-10 concurrent requests; most sites handle 10-15 without issues. If the server keeps responding well, gradually increase toward 20-30.
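
The connector's limit caps connections at the session level. If you prefer an explicit cap you can tune independently of the connection pool, an asyncio.Semaphore does the same job. A minimal sketch, again reusing scrape_single_url:

async def scrape_with_semaphore(urls, max_concurrent=10):
    """Cap in-flight requests with a semaphore instead of the connector limit"""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def bounded(session, url):
        async with semaphore:
            return await scrape_single_url(session, url)

    async with aiohttp.ClientSession() as session:
        tasks = [bounded(session, url) for url in urls]
        return await asyncio.gather(*tasks)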

Advanced AsyncIO: Adding Delays Between Requests

To be more respectful to servers, add random delays:

import asyncio
import aiohttp
import random
 
async def scrape_single_url_with_delay(session, url, delay=0):
    """Scrape with optional delay"""
    await asyncio.sleep(delay)
    async with session.get(url, timeout=10) as response:
        html = await response.text()
        return {'url': url, 'status': response.status}
 
async def scrape_with_staggered_delays(urls, max_concurrent=10):
    """Stagger requests with delays to be respectful"""
    connector = aiohttp.TCPConnector(limit=max_concurrent)
    async with aiohttp.ClientSession(connector=connector) as session:
        tasks = []
        for i, url in enumerate(urls):
            # Add randomized delay (0-2 seconds) spread across requests
            delay = (i % max_concurrent) * 0.1 + random.uniform(0, 1)
            tasks.append(scrape_single_url_with_delay(session, url, delay))
        
        results = await asyncio.gather(*tasks)
    return results

Approach 2: Threading

Threading runs each request in its own operating-system thread; Python's concurrent.futures module provides a convenient ThreadPoolExecutor for managing a pool of them. The code is often simpler than asyncio, but threads carry more overhead than coroutines.

Basic Threading Example

from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
import time
 
def scrape_url(url):
    """Scrape a single URL (blocking call)"""
    try:
        response = requests.get(url, timeout=10)
        return {
            'url': url,
            'status': response.status_code,
            'content_length': len(response.text)
        }
    except requests.Timeout:
        return {'url': url, 'error': 'Timeout'}
    except Exception as e:
        return {'url': url, 'error': str(e)}
 
def scrape_with_threads(urls, max_workers=10):
    """Scrape multiple URLs using thread pool"""
    results = []
    
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all tasks
        future_to_url = {executor.submit(scrape_url, url): url for url in urls}
        
        # Process results as they complete
        for future in as_completed(future_to_url):
            result = future.result()
            results.append(result)
            print(f"Completed: {result.get('url', 'unknown')}")
    
    return results
 
# Usage
urls = ['https://example.com/page' + str(i) for i in range(1, 51)]
start = time.time()
results = scrape_with_threads(urls, max_workers=10)
print(f"Scraped {len(results)} URLs in {time.time() - start:.2f} seconds")

Threading vs AsyncIO: When to Use Each

Use AsyncIO when:

  • You're already familiar with async/await syntax
  • You want maximum concurrency (100+ URLs)
  • You need fine-grained control over timing
  • Working with async-native libraries

Use Threading when:

  • You prefer simpler, more straightforward code
  • Working with standard libraries like requests
  • You have fewer concurrent tasks (10-50 URLs)
  • You need synchronous libraries that don't support async

Threading Gotcha: Thread Safety

When multiple threads access shared data, you might get race conditions. Use locks:

from threading import Lock
from concurrent.futures import ThreadPoolExecutor
 
results = []
results_lock = Lock()
 
def scrape_and_store(url):
    """Scrape and safely store result"""
    result = scrape_url(url)
    
    # Use lock to prevent race conditions
    with results_lock:
        results.append(result)
 
def scrape_with_thread_safe_storage(urls, max_workers=10):
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        executor.map(scrape_and_store, urls)
    return results
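
A simpler alternative is to avoid shared state entirely: ThreadPoolExecutor.map returns each worker's result, so no lock is needed. A minimal sketch, reusing scrape_url from the threading example above:

def scrape_without_shared_state(urls, max_workers=10):
    """Collect results straight from the executor instead of a shared list"""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # map() yields results in input order; no shared list, no lock required
        return list(executor.map(scrape_url, urls))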

Approach 3: Multiprocessing (For CPU-Intensive Tasks)

If your scraping involves heavy data processing (parsing HTML, running ML models), multiprocessing uses multiple CPU cores:

from multiprocessing import Pool, cpu_count
import requests
import time
 
def scrape_and_process(url):
    """Scrape and process data (CPU-intensive)"""
    try:
        response = requests.get(url, timeout=10)
        html = response.text
        
        # Simulate heavy processing
        word_count = len(html.split())
        unique_words = len(set(html.lower().split()))
        
        return {
            'url': url,
            'words': word_count,
            'unique': unique_words
        }
    except Exception as e:
        return {'url': url, 'error': str(e)}
 
def scrape_with_multiprocessing(urls):
    """Use all available CPU cores"""
    num_processes = cpu_count()
    
    with Pool(processes=num_processes) as pool:
        results = pool.map(scrape_and_process, urls)
    
    return results
 
# Usage (the __main__ guard matters here: platforms that spawn worker
# processes re-import this module)
if __name__ == '__main__':
    urls = ['https://example.com/page' + str(i) for i in range(1, 51)]
    start = time.time()
    results = scrape_with_multiprocessing(urls)
    print(f"Scraped and processed {len(results)} URLs in {time.time() - start:.2f} seconds")

Approach 4: ScrapeGraphAI with Concurrent Requests

ScrapeGraphAI handles much of the complexity for you. Here's how to use it for concurrent scraping:

Basic Concurrent Scraping with ScrapeGraphAI

from scrapegraphai.graphs import SmartScraperGraph
import asyncio
import time
 
async def scrape_with_scrapegraphai(url, prompt):
    """Scrape a single URL with ScrapeGraphAI"""
    graph_config = {
        "llm": {
            "model": "gpt-4",
            "api_key": "your-api-key",
        },
    }
    
    scraper = SmartScraperGraph(
        prompt=prompt,
        source=url,
        config=graph_config
    )
    
    # SmartScraperGraph.run() is synchronous, so run it in a worker thread
    # to keep the event loop free for the other scrapes
    result = await asyncio.to_thread(scraper.run)
    return {
        'url': url,
        'data': result
    }
 
async def scrape_multiple_with_scrapegraphai(urls, prompt, max_concurrent=5):
    """Scrape multiple URLs concurrently with ScrapeGraphAI"""
    # Use semaphore to limit concurrent API calls
    semaphore = asyncio.Semaphore(max_concurrent)
    
    async def bounded_scrape(url):
        async with semaphore:
            return await scrape_with_scrapegraphai(url, prompt)
    
    tasks = [bounded_scrape(url) for url in urls]
    results = await asyncio.gather(*tasks)
    return results
 
# Usage
urls = [
    'https://example.com/product1',
    'https://example.com/product2',
    'https://example.com/product3',
]
 
prompt = "Extract product name, price, and rating"
 
start = time.time()
results = asyncio.run(scrape_multiple_with_scrapegraphai(urls, prompt, max_concurrent=5))
print(f"Scraped {len(results)} URLs in {time.time() - start:.2f} seconds")
 
for result in results:
    print(f"URL: {result['url']}")
    print(f"Data: {result['data']}")
    print("---")

Real-World Example: Scraping an E-commerce Site

Let's build a complete example scraping product data from multiple pages:

import asyncio
import json
from datetime import datetime
from scrapegraphai.graphs import SmartScraperGraph
 
class ConcurrentScraper:
    def __init__(self, max_concurrent=10, api_key=None):
        self.max_concurrent = max_concurrent
        self.api_key = api_key
        self.results = []
        self.errors = []
    
    async def scrape_url(self, url, prompt):
        """Scrape a single URL with ScrapeGraphAI"""
        try:
            # Use semaphore to limit concurrency
            async with self.semaphore:
                graph_config = {
                    "llm": {
                        "model": "gpt-4",
                        "api_key": self.api_key,
                    },
                }
                
                scraper = SmartScraperGraph(
                    prompt=prompt,
                    source=url,
                    config=graph_config
                )
                
                # run() is synchronous; off-load it to a worker thread so the
                # event loop can keep the other scrapes moving
                data = await asyncio.to_thread(scraper.run)
                
                return {
                    'url': url,
                    'status': 'success',
                    'data': data,
                    'timestamp': datetime.now().isoformat()
                }
        
        except Exception as e:
            return {
                'url': url,
                'status': 'error',
                'error': str(e),
                'timestamp': datetime.now().isoformat()
            }
    
    async def scrape_urls(self, urls, prompt):
        """Scrape multiple URLs concurrently"""
        self.semaphore = asyncio.Semaphore(self.max_concurrent)
        
        tasks = [
            self.scrape_url(url, prompt)
            for url in urls
        ]
        self.results = await asyncio.gather(*tasks)
        
        return self.results
    
    def save_results(self, filename):
        """Save results to JSON file"""
        with open(filename, 'w') as f:
            json.dump(self.results, f, indent=2)
        print(f"Saved {len(self.results)} results to {filename}")
    
    def get_stats(self):
        """Get scraping statistics"""
        successful = sum(1 for r in self.results if r['status'] == 'success')
        failed = sum(1 for r in self.results if r['status'] == 'error')
        
        return {
            'total': len(self.results),
            'successful': successful,
            'failed': failed,
            'success_rate': f"{(successful/len(self.results)*100):.1f}%" if self.results else "0%"
        }
 
# Usage
async def main():
    # List of product pages to scrape
    urls = [
        f'https://example.com/products/page{i}' 
        for i in range(1, 21)  # 20 pages
    ]
    
    prompt = """
    Extract all products on this page with:
    - Product name
    - Price
    - Rating
    - In stock status
    Return as a list of dictionaries
    """
    
    scraper = ConcurrentScraper(max_concurrent=5, api_key="your-api-key")
    
    print("Starting concurrent scraping...")
    start_time = datetime.now()
    
    results = await scraper.scrape_urls(urls, prompt)
    
    elapsed = (datetime.now() - start_time).total_seconds()
    
    print(f"\nScraping completed in {elapsed:.2f} seconds")
    print(f"Statistics: {scraper.get_stats()}")
    
    scraper.save_results('products.json')
 
# Run
asyncio.run(main())

Best Practices for Concurrent Scraping

1. Respect Rate Limits and Server Load

# Add delays between requests
async def scrape_with_rate_limiting(urls, max_concurrent=5, min_delay=1):
    """Scrape with minimum delay between requests"""
    semaphore = asyncio.Semaphore(max_concurrent)
    
    async def delayed_scrape(session, url, index):
        async with semaphore:
            # Spread out requests with delay
            await asyncio.sleep(index * min_delay / max_concurrent)
            return await scrape_single_url(session, url)  # defined in Approach 1
    
    async with aiohttp.ClientSession() as session:
        tasks = [
            delayed_scrape(session, url, i) 
            for i, url in enumerate(urls)
        ]
        results = await asyncio.gather(*tasks)
    
    return results

2. Implement Retry Logic

async def scrape_with_retries(session, url, max_retries=3):
    """Retry failed requests up to max_retries times"""
    for attempt in range(max_retries):
        try:
            async with session.get(url, timeout=10) as response:
                if response.status == 200:
                    return await response.text()
        except asyncio.TimeoutError:
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(2 ** attempt)  # Exponential backoff
        except Exception as e:
            print(f"Attempt {attempt + 1} failed: {e}")
    
    raise Exception(f"Failed to scrape {url} after {max_retries} retries")

3. Use Backoff for Rate Limiting

import asyncio
import random
 
async def scrape_with_backoff(session, url):
    """Implement exponential backoff"""
    max_retries = 5
    base_delay = 1
    
    for attempt in range(max_retries):
        try:
            async with session.get(url, timeout=10) as response:
                if response.status == 429:  # Too Many Requests
                    # Wait and retry
                    delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
                    print(f"Rate limited. Waiting {delay:.1f} seconds...")
                    await asyncio.sleep(delay)
                    continue
                
                return await response.text()
        except Exception as e:
            print(f"Error: {e}")
    
    return None

4. Monitor and Log Progress

import logging
from datetime import datetime
 
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
 
class ProgressTracker:
    def __init__(self, total):
        self.total = total
        self.completed = 0
        self.errors = 0
    
    def log_success(self, url):
        self.completed += 1
        progress = (self.completed / self.total) * 100
        logger.info(f"[{progress:.1f}%] Completed: {url}")
    
    def log_error(self, url, error):
        self.errors += 1
        logger.error(f"Failed: {url} - {error}")
    
    def summary(self):
        logger.info(f"Complete. Success: {self.completed}/{self.total}, Errors: {self.errors}")
 
# Usage in concurrent scraping (reusing scrape_single_url from Approach 1)
async def scrape_with_progress(urls):
    tracker = ProgressTracker(len(urls))
    
    async def tracked(session, url):
        result = await scrape_single_url(session, url)
        if 'error' in result:
            tracker.log_error(url, result['error'])
        else:
            tracker.log_success(url)
        return result
    
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(*(tracked(session, url) for url in urls))
    
    tracker.summary()
    return results

Performance Comparison

Let's compare the three approaches (plus a sequential baseline) with 100 URLs, each taking ~2 seconds to fetch:

Approach        | Time  | Memory | Complexity | Best For
----------------|-------|--------|------------|--------------------------
Sequential      | ~200s | Low    | Low        | Testing, small batches
AsyncIO         | ~20s  | Low    | Medium     | Large batches, many URLs
Threading       | ~20s  | Medium | Low        | Mixed I/O + processing
Multiprocessing | ~20s  | High   | High       | Heavy CPU processing

Recommendation: For most web scraping, use AsyncIO for the best balance of speed, memory efficiency, and code quality.
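
If you want to check these numbers against your own targets, a rough timing harness is enough. The sketch below assumes the scrape_with_threads and scrape_multiple_urls functions defined earlier and a placeholder URL list:

import asyncio
import time

def benchmark(label, fn, *args, **kwargs):
    """Time a scraping run and print a one-line summary"""
    start = time.time()
    results = fn(*args, **kwargs)
    print(f"{label}: {len(results)} URLs in {time.time() - start:.1f}s")
    return results

urls = ['https://example.com/page' + str(i) for i in range(1, 101)]
benchmark("Threading", scrape_with_threads, urls, max_workers=10)
benchmark("AsyncIO", lambda u: asyncio.run(scrape_multiple_urls(u, max_concurrent=10)), urls)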

Common Pitfalls and How to Avoid Them

Pitfall 1: Too Many Concurrent Requests

# ❌ Don't do this
max_workers = 100  # Will likely get blocked
 
# ✅ Do this instead
max_workers = 10  # Start conservative, increase if needed

Pitfall 2: Not Handling Timeouts

# ❌ No timeout (can hang forever)
response = await session.get(url)
 
# ✅ Always set timeout
response = await session.get(url, timeout=10)

Pitfall 3: Ignoring Error Status Codes

# ❌ Treating all responses as success
html = await response.text()
 
# ✅ Check status code
if response.status != 200:
    logger.warning(f"Unexpected status {response.status} for {url}")
html = await response.text()

Pitfall 4: Blocking Operations in Async Code

# ❌ Blocking call in async function (defeats the purpose)
from bs4 import BeautifulSoup

def parse_html(html):
    # CPU-intensive parsing
    return BeautifulSoup(html, 'html.parser')

async def scrape(url):
    html = await get_html(url)
    data = parse_html(html)  # This blocks all concurrent tasks

# ✅ Use async libraries or move blocking work to a thread pool
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=2)

async def scrape_without_blocking(url):
    html = await get_html(url)
    loop = asyncio.get_running_loop()
    data = await loop.run_in_executor(executor, parse_html, html)
    return data

Conclusion: Choose the Right Concurrency Model

The choice between AsyncIO, Threading, and Multiprocessing depends on your specific needs:

  • Start with AsyncIO: It's the most efficient for I/O-bound web scraping
  • Fall back to Threading: If AsyncIO feels too complex or you need synchronous libraries
  • Use Multiprocessing: Only if you're doing heavy data processing alongside scraping
  • Use ScrapeGraphAI: If you want AI-powered extraction without worrying about concurrency details

The key to successful concurrent scraping is starting conservative (5-10 concurrent requests) and gradually increasing if the server responds well. Always implement error handling, retries, and monitoring to ensure reliable, respectful scraping.

With these techniques, you can scrape thousands of URLs efficiently while being a good citizen of the web.

Need help with concurrent scraping? Try building a small test with 10 URLs first, then scale up. Monitor your success rate and adjust concurrency accordingly.
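
If you want a simple way to automate that adjustment, the success rate of each batch can drive it. The helper below is purely illustrative: the thresholds are assumptions to tune for your own targets, and it expects the result dictionaries (with an 'error' key on failure) used throughout this post.

def suggest_concurrency(results, current=10):
    """Illustrative heuristic: back off on failures, ramp up slowly on success"""
    errors = sum(1 for r in results if 'error' in r)
    success_rate = 1 - errors / len(results) if results else 0
    
    if success_rate < 0.9:      # too many failures: halve concurrency
        return max(1, current // 2)
    if success_rate > 0.98:     # near-perfect: increase gently
        return current + 5
    return current              # otherwise leave it alone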
