Exporting and Transforming Data: Converting Scraped Data to CSV, JSON, and Databases

Marco Vinciguerra

You've successfully scraped data from websites. Now what? Your raw scraped data is messy, unstructured, and sitting in memory. To make it useful, you need to export and transform it into formats that work for analysis, reporting, or further processing.

In this guide, we'll explore how to convert scraped data into CSV files, JSON, SQL databases, and other formats—plus the best practices for transforming data along the way.

The Data Journey: From Scraping to Usable Data

Before diving into specific formats, let's understand the workflow:

Raw HTML → Parse & Extract → Transform & Clean → Export to Format
                ↓                    ↓                     ↓
            Messy Data          Clean Data          Usable Data

Each step is critical. Exporting garbage data to a perfect format is still garbage. That's why we'll focus on both transformation and export.
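
In code, that workflow is just three small steps chained together. Here's a minimal, self-contained sketch; the extraction step is stubbed with a hard-coded list standing in for whatever your scraper actually returns:

import json
 
# Stand-in for "Parse & Extract" -- in a real scraper this list would
# come from parsed HTML.
raw_items = [{'name': '  Headphones  ', 'price': '$99.99'}]
 
# "Transform & Clean": strip whitespace, turn the price text into a number.
cleaned = [
    {'name': item['name'].strip(), 'price': float(item['price'].lstrip('$'))}
    for item in raw_items
]
 
# "Export to Format": here just a JSON string; the rest of this guide
# covers CSV files, JSON files, and databases in detail.
print(json.dumps(cleaned))  # [{"name": "Headphones", "price": 99.99}]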

Part 1: Exporting to CSV

CSV (Comma-Separated Values) is the most universal format. Every tool reads CSV—Excel, Google Sheets, databases, data analysis software.

Basic CSV Export

import csv
from datetime import datetime
 
# Sample scraped data
products = [
    {'name': 'Headphones', 'price': '$99.99', 'rating': '4.5', 'url': 'example.com/1'},
    {'name': 'Speaker', 'price': '$149.99', 'rating': '4.2', 'url': 'example.com/2'},
    {'name': 'Microphone', 'price': '$79.99', 'rating': '4.8', 'url': 'example.com/3'},
]
 
def export_to_csv(data, filename):
    """Export list of dictionaries to CSV"""
    if not data:
        print("No data to export")
        return
    
    # Get column names from first item
    fieldnames = data[0].keys()
    
    with open(filename, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        
        # Write header row
        writer.writeheader()
        
        # Write data rows
        writer.writerows(data)
    
    print(f"Exported {len(data)} rows to {filename}")
 
# Usage
export_to_csv(products, 'products.csv')

Result (products.csv):

name,price,rating,url
Headphones,$99.99,4.5,example.com/1
Speaker,$149.99,4.2,example.com/2
Microphone,$79.99,4.8,example.com/3

CSV with Data Transformation

Often, you want to transform data before exporting. Clean prices, format dates, add calculated columns:

import csv
import re
from datetime import datetime
 
def clean_price(price_str):
    """Convert '$99.99' to 99.99"""
    if not price_str:
        return None
    match = re.search(r'(\d+\.?\d*)', str(price_str).replace(',', ''))
    return float(match.group(1)) if match else None
 
def clean_rating(rating_str):
    """Convert '4.5 / 5 stars' to 4.5"""
    if not rating_str:
        return None
    match = re.search(r'(\d+\.?\d*)', str(rating_str))
    return float(match.group(1)) if match else None
 
def transform_for_export(products):
    """Transform scraped data for export"""
    transformed = []
    
    for product in products:
        transformed_item = {
            'name': product.get('name', '').strip(),
            'price': clean_price(product.get('price')),
            'rating': clean_rating(product.get('rating')),
            'url': product.get('url', ''),
            'scraped_at': datetime.now().isoformat(),
        }
        transformed.append(transformed_item)
    
    return transformed
 
# Usage
products = [
    {'name': '  Headphones  ', 'price': '$99.99', 'rating': '4.5 / 5 stars', 'url': 'example.com/1'},
    {'name': 'Speaker', 'price': '$149.99', 'rating': '4.2', 'url': 'example.com/2'},
]
 
transformed = transform_for_export(products)
export_to_csv(transformed, 'products_cleaned.csv')

Appending to Existing CSV

Often, you want to add new data to existing CSVs without duplicating headers:

import os
import csv
 
def append_to_csv(data, filename):
    """Append data to existing CSV, or create if doesn't exist"""
    file_exists = os.path.isfile(filename)
    
    with open(filename, 'a', newline='', encoding='utf-8') as csvfile:
        fieldnames = data[0].keys() if data else []
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        
        # Write header only if file is new
        if not file_exists:
            writer.writeheader()
        
        # Write data
        writer.writerows(data)
    
    print(f"Appended {len(data)} rows to {filename}")
 
# Usage - run daily without duplicating headers
new_products = [...]
append_to_csv(new_products, 'products.csv')

CSV Best Practices

def export_csv_best_practices(data, filename):
    """CSV export following best practices"""
    
    with open(filename, 'w', newline='', encoding='utf-8') as csvfile:
        fieldnames = data[0].keys()
        writer = csv.DictWriter(
            csvfile,
            fieldnames=fieldnames,
            quoting=csv.QUOTE_ALL,  # Quote all fields to handle special chars
            extrasaction='ignore'    # Ignore extra fields
        )
        
        writer.writeheader()
        writer.writerows(data)
 
# For large datasets with pandas (more efficient)
import pandas as pd
 
def export_csv_with_pandas(data, filename, chunk_size=10000):
    """Export large datasets efficiently with pandas"""
    df = pd.DataFrame(data)
    
    # Data type optimization
    for col in df.select_dtypes(include=['object']).columns:
        if df[col].nunique() < df.shape[0] * 0.05:
            df[col] = df[col].astype('category')
    
    # Export in chunks so very large frames don't spike memory
    df.to_csv(filename, index=False, encoding='utf-8', chunksize=chunk_size)
    print(f"Exported {len(df)} rows to {filename}")

Part 2: Exporting to JSON

JSON (JavaScript Object Notation) is perfect for nested data, API responses, and NoSQL databases.

Basic JSON Export

import json
 
def export_to_json(data, filename, pretty=True):
    """Export data to JSON file"""
    with open(filename, 'w', encoding='utf-8') as jsonfile:
        if pretty:
            json.dump(data, jsonfile, indent=2, ensure_ascii=False)
        else:
            json.dump(data, jsonfile, ensure_ascii=False)
    
    print(f"Exported {len(data)} items to {filename}")
 
# Usage
products = [
    {'name': 'Headphones', 'price': 99.99, 'specs': {'color': 'black', 'weight': '250g'}},
    {'name': 'Speaker', 'price': 149.99, 'specs': {'color': 'white', 'weight': '500g'}},
]
 
export_to_json(products, 'products.json', pretty=True)

Result (products.json):

[
  {
    "name": "Headphones",
    "price": 99.99,
    "specs": {
      "color": "black",
      "weight": "250g"
    }
  },
  {
    "name": "Speaker",
    "price": 149.99,
    "specs": {
      "color": "white",
      "weight": "500g"
    }
  }
]

JSONL (JSON Lines) for Large Datasets

JSONL stores one JSON object per line—perfect for streaming and large files:

def export_to_jsonl(data, filename):
    """Export to JSONL format (one JSON object per line)"""
    with open(filename, 'w', encoding='utf-8') as f:
        for item in data:
            json.dump(item, f, ensure_ascii=False)
            f.write('\n')
    
    print(f"Exported {len(data)} items to {filename}")
 
def read_jsonl(filename):
    """Read JSONL file"""
    data = []
    with open(filename, 'r', encoding='utf-8') as f:
        for line in f:
            if line.strip():
                data.append(json.loads(line))
    return data
 
# Usage
export_to_jsonl(products, 'products.jsonl')
 
# Read it back
loaded_products = read_jsonl('products.jsonl')

Streaming JSON Export for Large Datasets

import json
 
class JSONStreamer:
    """Stream data to JSON without loading everything in memory"""
    
    def __init__(self, filename, pretty=False):
        self.filename = filename
        self.pretty = pretty
        self.file = None
        self.count = 0
    
    def __enter__(self):
        self.file = open(self.filename, 'w', encoding='utf-8')
        self.file.write('[\n')
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.file.write('\n]')
        self.file.close()
        print(f"Streamed {self.count} items to {self.filename}")
    
    def write(self, item):
        """Write single item to JSON array"""
        if self.count > 0:
            self.file.write(',\n')
        
        if self.pretty:
            json.dump(item, self.file, indent=2, ensure_ascii=False)
        else:
            json.dump(item, self.file, ensure_ascii=False)
        
        self.count += 1
 
# Usage - handles large datasets gracefully
with JSONStreamer('large_products.json', pretty=True) as stream:
    for i in range(1000000):
        product = {'id': i, 'name': f'Product {i}', 'price': 99.99}
        stream.write(product)

Part 3: Exporting to Databases

Databases are ideal for production scrapers. They provide indexing, querying, and scalability.

SQLite (Simplest Local Database)

import sqlite3
from datetime import datetime
 
def create_sqlite_table(db_path, table_name):
    """Create table in SQLite database"""
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    
    cursor.execute(f'''
        CREATE TABLE IF NOT EXISTS {table_name} (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            price REAL,
            rating REAL,
            url TEXT UNIQUE,
            scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    
    conn.commit()
    conn.close()
 
def insert_into_sqlite(data, db_path, table_name):
    """Insert scraped data into SQLite database"""
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    inserted = 0
    
    for item in data:
        try:
            cursor.execute(f'''
                INSERT INTO {table_name} (name, price, rating, url, scraped_at)
                VALUES (?, ?, ?, ?, ?)
            ''', (
                item.get('name'),
                float(item.get('price', 0)),
                float(item.get('rating', 0)),
                item.get('url'),
                datetime.now()
            ))
            inserted += 1
        except sqlite3.IntegrityError:
            # Skip duplicates (URL already exists)
            pass
    
    conn.commit()
    conn.close()
    print(f"Inserted {inserted} of {len(data)} items into {table_name}")
 
# Usage
create_sqlite_table('products.db', 'products')
 
products = [
    {'name': 'Headphones', 'price': 99.99, 'rating': 4.5, 'url': 'example.com/1'},
    {'name': 'Speaker', 'price': 149.99, 'rating': 4.2, 'url': 'example.com/2'},
]
 
insert_into_sqlite(products, 'products.db', 'products')
 
# Query the database
conn = sqlite3.connect('products.db')
cursor = conn.cursor()
cursor.execute('SELECT * FROM products WHERE price > 100')
results = cursor.fetchall()
for row in results:
    print(row)
conn.close()
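
insert_into_sqlite above issues one execute() per row, which gets slow for large batches. A common variation (a sketch, assuming the same products table created earlier) is to let SQLite handle both the batching and the duplicate URLs with INSERT OR IGNORE plus executemany:

import sqlite3
from datetime import datetime
 
def bulk_insert_sqlite(data, db_path, table_name='products'):
    """Bulk insert with executemany; rows with duplicate URLs are skipped."""
    rows = [
        (item.get('name'), item.get('price'), item.get('rating'),
         item.get('url'), datetime.now())
        for item in data
    ]
    
    conn = sqlite3.connect(db_path)
    try:
        conn.executemany(f'''
            INSERT OR IGNORE INTO {table_name} (name, price, rating, url, scraped_at)
            VALUES (?, ?, ?, ?, ?)
        ''', rows)
        conn.commit()
    finally:
        conn.close()
    
    print(f"Processed {len(rows)} rows")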

PostgreSQL (Production Database)

import psycopg2
from datetime import datetime
 
class PostgreSQLExporter:
    def __init__(self, host, database, user, password):
        self.conn = psycopg2.connect(
            host=host,
            database=database,
            user=user,
            password=password
        )
        self.cursor = self.conn.cursor()
    
    def create_table(self, table_name):
        """Create products table"""
        self.cursor.execute(f'''
            CREATE TABLE IF NOT EXISTS {table_name} (
                id SERIAL PRIMARY KEY,
                name VARCHAR(255) NOT NULL,
                price DECIMAL(10, 2),
                rating DECIMAL(3, 1),
                url VARCHAR(500) UNIQUE,
                description TEXT,
                scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        self.conn.commit()
        print(f"Table {table_name} created/verified")
    
    def insert_batch(self, data, table_name, batch_size=100):
        """Insert data in batches"""
        for i in range(0, len(data), batch_size):
            batch = data[i:i+batch_size]
            
            for item in batch:
                try:
                    self.cursor.execute(f'''
                        INSERT INTO {table_name} (name, price, rating, url, description, scraped_at)
                        VALUES (%s, %s, %s, %s, %s, %s)
                        ON CONFLICT (url) DO NOTHING
                    ''', (
                        item.get('name'),
                        float(item.get('price', 0)) if item.get('price') else None,
                        float(item.get('rating', 0)) if item.get('rating') else None,
                        item.get('url'),
                        item.get('description'),
                        datetime.now()
                    ))
                except Exception as e:
                    print(f"Error inserting {item.get('name')}: {e}")
                    self.conn.rollback()
                    continue
            
            self.conn.commit()
        
        print(f"Processed {len(data)} items")
    
    def close(self):
        self.cursor.close()
        self.conn.close()
 
# Usage
exporter = PostgreSQLExporter(
    host='localhost',
    database='scraper_db',
    user='postgres',
    password='password'
)
 
exporter.create_table('products')
 
products = [
    {'name': 'Headphones', 'price': 99.99, 'rating': 4.5, 'url': 'example.com/1', 'description': 'Great headphones'},
    {'name': 'Speaker', 'price': 149.99, 'rating': 4.2, 'url': 'example.com/2', 'description': 'Powerful speaker'},
]
 
exporter.insert_batch(products, 'products')
exporter.close()
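
insert_batch above also issues one execute() per row. psycopg2 ships a helper, execute_values, that sends a whole batch in a single statement; here's a sketch that assumes the same products table and an already-open connection like the one inside the class above:

from psycopg2.extras import execute_values
 
def insert_batch_fast(conn, data, table_name='products'):
    """Insert a whole batch in one round trip; duplicate URLs are skipped."""
    # Assumes price/rating are already numeric (see clean_price earlier)
    rows = [
        (item.get('name'), item.get('price'), item.get('rating'),
         item.get('url'), item.get('description'))
        for item in data
    ]
    
    with conn.cursor() as cursor:
        execute_values(cursor, f'''
            INSERT INTO {table_name} (name, price, rating, url, description)
            VALUES %s
            ON CONFLICT (url) DO NOTHING
        ''', rows)
    conn.commit()
    print(f"Processed {len(rows)} rows")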

MongoDB (NoSQL for Unstructured Data)

from pymongo import MongoClient
from datetime import datetime
 
class MongoDBExporter:
    def __init__(self, connection_string):
        self.client = MongoClient(connection_string)
        self.db = self.client.scraper_db
    
    def insert_documents(self, collection_name, data):
        """Insert documents into MongoDB collection"""
        collection = self.db[collection_name]
        
        # Add timestamp to each document
        for item in data:
            item['scraped_at'] = datetime.now()
        
        try:
            result = collection.insert_many(data, ordered=False)
            print(f"Inserted {len(result.inserted_ids)} documents")
        except Exception as e:
            print(f"Error inserting documents: {e}")
    
    def upsert_documents(self, collection_name, data, unique_field='url'):
        """Upsert documents (insert or update if exists)"""
        collection = self.db[collection_name]
        
        for item in data:
            item['scraped_at'] = datetime.now()
            
            try:
                collection.update_one(
                    {unique_field: item.get(unique_field)},
                    {'$set': item},
                    upsert=True
                )
            except Exception as e:
                print(f"Error upserting {item.get('name')}: {e}")
    
    def query(self, collection_name, filter_query):
        """Query documents"""
        collection = self.db[collection_name]
        return list(collection.find(filter_query))
    
    def close(self):
        self.client.close()
 
# Usage
exporter = MongoDBExporter('mongodb://localhost:27017/')
 
products = [
    {
        'name': 'Headphones',
        'price': 99.99,
        'rating': 4.5,
        'url': 'example.com/1',
        'specs': {'color': 'black', 'weight': '250g', 'battery': '20 hours'}
    },
    {
        'name': 'Speaker',
        'price': 149.99,
        'rating': 4.2,
        'url': 'example.com/2',
        'specs': {'color': 'white', 'weight': '500g', 'power': '50W'}
    },
]
 
# Insert with upsert (avoid duplicates)
exporter.upsert_documents('products', products)
 
# Query
results = exporter.query('products', {'price': {'$gt': 100}})
print(f"Found {len(results)} products over $100")
 
exporter.close()
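
Upserting one document at a time costs a round trip per item. pymongo can batch the same upserts with bulk_write; here's a sketch of an alternative helper, assuming the same url-based uniqueness convention as upsert_documents above:

from datetime import datetime
from pymongo import UpdateOne
 
def bulk_upsert(collection, data, unique_field='url'):
    """Send all upserts to MongoDB in a single bulk_write call."""
    operations = [
        UpdateOne(
            {unique_field: item.get(unique_field)},
            {'$set': {**item, 'scraped_at': datetime.now()}},
            upsert=True
        )
        for item in data
    ]
    
    if operations:
        result = collection.bulk_write(operations, ordered=False)
        print(f"Upserted {result.upserted_count}, modified {result.modified_count}")
 
# Usage with the exporter above
# bulk_upsert(exporter.db['products'], products)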

Part 4: Advanced Data Transformation

Using Pandas for Complex Transformations

import pandas as pd
import numpy as np
 
def transform_with_pandas(scraped_data):
    """Complex transformations using pandas"""
    
    # Create DataFrame
    df = pd.DataFrame(scraped_data)
    
    # Data type conversion ('$' is a regex metacharacter, so replace it literally)
    df['price'] = df['price'].str.replace('$', '', regex=False).astype(float)
    df['rating'] = df['rating'].astype(float)
    
    # Text cleaning
    df['name'] = df['name'].str.strip().str.title()
    
    # Add calculated columns
    df['price_category'] = pd.cut(
        df['price'],
        bins=[0, 50, 100, 200, np.inf],
        labels=['Budget', 'Mid-range', 'Premium', 'Luxury']
    )
    
    # Group and aggregate
    category_stats = df.groupby('price_category').agg({
        'price': ['mean', 'min', 'max'],
        'rating': 'mean',
        'name': 'count'
    }).round(2)
    
    print("Category Statistics:")
    print(category_stats)
    
    # Filter
    premium_products = df[df['price'] > 100]
    
    # Sort
    df_sorted = df.sort_values('rating', ascending=False)
    
    # Remove duplicates
    df_clean = df.drop_duplicates(subset=['url'])
    
    return df_clean
 
# Usage
scraped = [
    {'name': '  headphones  ', 'price': '$99.99', 'rating': 4.5, 'url': 'ex.com/1'},
    {'name': 'speaker', 'price': '$149.99', 'rating': 4.2, 'url': 'ex.com/2'},
]
 
df_transformed = transform_with_pandas(scraped)
print(df_transformed)

Pydantic for Data Validation

from pydantic import BaseModel, validator, ValidationError
from typing import Optional
import pandas as pd
 
class Product(BaseModel):
    name: str
    price: float
    rating: Optional[float] = None
    url: str
    description: Optional[str] = None
    
    @validator('price')
    def price_must_be_positive(cls, v):
        if v < 0:
            raise ValueError('Price must be positive')
        return v
    
    @validator('rating')
    def rating_must_be_valid(cls, v):
        if v is not None and (v < 0 or v > 5):
            raise ValueError('Rating must be between 0 and 5')
        return v
    
    @validator('name')
    def name_must_not_be_empty(cls, v):
        if not v or not v.strip():
            raise ValueError('Name cannot be empty')
        return v.strip().title()
 
def validate_and_export(scraped_data):
    """Validate scraped data with Pydantic"""
    valid_products = []
    invalid_products = []
    
    for item in scraped_data:
        try:
            # Convert price text like '$99.99' to a float
            item['price'] = float(str(item.get('price', '0')).replace('$', ''))
            item['rating'] = float(item.get('rating', 0)) if item.get('rating') else None
            
            product = Product(**item)
            valid_products.append(product)
        
        except ValidationError as e:
            invalid_products.append({
                'item': item,
                'errors': e.errors()
            })
            print(f"Validation error: {e}")
    
    print(f"Valid: {len(valid_products)}, Invalid: {len(invalid_products)}")
    return valid_products, invalid_products
 
# Usage
scraped = [
    {'name': '  headphones  ', 'price': '$99.99', 'rating': 4.5, 'url': 'ex.com/1'},
    {'name': 'speaker', 'price': '$149.99', 'rating': 6.0, 'url': 'ex.com/2'},  # Invalid rating
]
 
valid, invalid = validate_and_export(scraped)
 
# Export only valid data
df = pd.DataFrame([p.dict() for p in valid])
df.to_csv('valid_products.csv', index=False)

Part 5: Complete Production Pipeline

Here's a complete, production-ready data pipeline:

import csv
import json
import sqlite3
from datetime import datetime
from typing import List, Dict
import logging
 
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
 
class DataPipeline:
    def __init__(self, db_path='data.db'):
        self.db_path = db_path
        self.setup_database()
    
    def setup_database(self):
        """Initialize database with products table"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS products (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                price REAL,
                rating REAL,
                url TEXT UNIQUE,
                source TEXT,
                scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        
        cursor.execute('''
            CREATE INDEX IF NOT EXISTS idx_url ON products(url)
        ''')
        
        conn.commit()
        conn.close()
    
    def clean_data(self, raw_data: List[Dict]) -> List[Dict]:
        """Clean and transform raw scraped data"""
        cleaned = []
        
        for item in raw_data:
            try:
                # Clean price
                price_str = str(item.get('price', '0')).replace('$', '').replace(',', '')
                price = float(price_str) if price_str else None
                
                # Clean rating
                rating_str = str(item.get('rating', '0')).split()[0]
                rating = float(rating_str) if rating_str else None
                
                cleaned_item = {
                    'name': item.get('name', '').strip().title(),
                    'price': price,
                    'rating': rating,
                    'url': item.get('url', '').strip(),
                    'source': item.get('source', 'unknown'),
                }
                
                # Validate
                if cleaned_item['name'] and cleaned_item['url']:
                    cleaned.append(cleaned_item)
                else:
                    logger.warning(f"Skipped invalid item: {item}")
            
            except Exception as e:
                logger.error(f"Error cleaning {item}: {e}")
        
        return cleaned
    
    def to_database(self, data: List[Dict]):
        """Save to SQLite database"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        for item in data:
            try:
                cursor.execute('''
                    INSERT INTO products (name, price, rating, url, source, updated_at)
                    VALUES (?, ?, ?, ?, ?, ?)
                    ON CONFLICT(url) DO UPDATE SET
                        price = ?,
                        rating = ?,
                        updated_at = ?
                ''', (
                    item['name'], item['price'], item['rating'], item['url'], item['source'], datetime.now(),
                    item['price'], item['rating'], datetime.now()
                ))
            except Exception as e:
                logger.error(f"Error inserting {item}: {e}")
        
        conn.commit()
        conn.close()
        logger.info(f"Saved {len(data)} items to database")
    
    def to_csv(self, data: List[Dict], filename: str):
        """Export to CSV"""
        if not data:
            return
        
        with open(filename, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=data[0].keys())
            writer.writeheader()
            writer.writerows(data)
        
        logger.info(f"Exported {len(data)} items to {filename}")
    
    def to_json(self, data: List[Dict], filename: str):
        """Export to JSON"""
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
        
        logger.info(f"Exported {len(data)} items to {filename}")
    
    def to_jsonl(self, data: List[Dict], filename: str):
        """Export to JSONL"""
        with open(filename, 'w', encoding='utf-8') as f:
            for item in data:
                f.write(json.dumps(item, ensure_ascii=False) + '\n')
        
        logger.info(f"Exported {len(data)} items to {filename}")
    
    def export_all(self, data: List[Dict], prefix: str = 'export'):
        """Export to all formats"""
        cleaned = self.clean_data(data)
        
        self.to_database(cleaned)
        self.to_csv(cleaned, f'{prefix}.csv')
        self.to_json(cleaned, f'{prefix}.json')
        self.to_jsonl(cleaned, f'{prefix}.jsonl')
        
        logger.info("Export complete")
        return cleaned
 
# Usage
pipeline = DataPipeline('scraper.db')
 
# Scraped data (messy)
scraped_data = [
    {'name': '  headphones  ', 'price': '$99.99', 'rating': '4.5 stars', 'url': 'example.com/1', 'source': 'amazon'},
    {'name': 'speaker', 'price': '$149.99', 'rating': '4.2', 'url': 'example.com/2', 'source': 'amazon'},
    {'name': 'microphone', 'price': '$79.99', 'rating': '4.8 / 5', 'url': 'example.com/3', 'source': 'bestbuy'},
]
 
# Process and export
cleaned_data = pipeline.export_all(scraped_data, prefix='products_2024')
 
# Query later
conn = sqlite3.connect('scraper.db')
cursor = conn.cursor()
cursor.execute('SELECT * FROM products WHERE price > 100 ORDER BY rating DESC')
expensive_items = cursor.fetchall()
print(f"Products over $100: {len(expensive_items)}")
conn.close()

Part 6: Best Practices for Data Export

1. Incremental Exports

def incremental_export(new_data, db_path, last_export_file='last_export.json'):
    """Export only new items since last export"""
    
    # Load previous export metadata
    try:
        with open(last_export_file, 'r') as f:
            last_export = json.load(f)
    except FileNotFoundError:
        last_export = {'urls': []}
    
    previous_urls = set(last_export.get('urls', []))
    new_items = [item for item in new_data if item.get('url') not in previous_urls]
    
    if new_items:
        # Add to database
        insert_into_sqlite(new_items, db_path, 'products')
        
        # Update export metadata
        current_urls = previous_urls | {item.get('url') for item in new_data}
        with open(last_export_file, 'w') as f:
            json.dump({
                'urls': list(current_urls),
                'last_updated': datetime.now().isoformat()
            }, f)
        
        print(f"Found {len(new_items)} new items")
    else:
        print("No new items since last export")
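
If the scraped data already lands in SQLite, you can skip the separate JSON bookkeeping file and ask the database which URLs it has already seen. A sketch, assuming the products table from Part 3:

import sqlite3
 
def filter_new_items(data, db_path, table_name='products'):
    """Return only the items whose URL is not already in the database."""
    conn = sqlite3.connect(db_path)
    try:
        known_urls = {row[0] for row in conn.execute(f'SELECT url FROM {table_name}')}
    finally:
        conn.close()
    
    return [item for item in data if item.get('url') not in known_urls]
 
# Usage
# new_items = filter_new_items(scraped_data, 'products.db')
# insert_into_sqlite(new_items, 'products.db', 'products')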

2. Data Partitioning

from datetime import datetime
import os
 
def export_partitioned(data, output_dir='exports'):
    """Partition exports by date"""
    
    os.makedirs(output_dir, exist_ok=True)
    
    today = datetime.now().strftime('%Y-%m-%d')
    partition_dir = os.path.join(output_dir, today)
    os.makedirs(partition_dir, exist_ok=True)
    
    csv_file = os.path.join(partition_dir, 'products.csv')
    json_file = os.path.join(partition_dir, 'products.json')
    
    export_to_csv(data, csv_file)
    export_to_json(data, json_file)
    
    print(f"Exports saved to {partition_dir}")

3. Duplicate Detection and Handling

def remove_duplicates(data, unique_fields=['url']):
    """Remove duplicate items based on specified fields"""
    seen = set()
    unique_data = []
    
    for item in data:
        # Create a unique key from specified fields
        key = tuple(item.get(field) for field in unique_fields)
        
        if key not in seen:
            seen.add(key)
            unique_data.append(item)
    
    duplicates_removed = len(data) - len(unique_data)
    print(f"Removed {duplicates_removed} duplicates")
    return unique_data

Choosing the Right Format

Here's a quick decision guide:

  Analyze in Excel → CSV (universal, simple)
  REST API or web app → JSON (hierarchical, standard)
  High-volume, complex queries → PostgreSQL (indexing, relationships)
  Unstructured data → MongoDB (flexible schema)
  Local, small projects → SQLite (no setup required)
  Real-time streaming → JSONL (process line-by-line)
  Data warehousing → Parquet (efficient compression)
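
Parquet appears in the guide above but we haven't shown it yet. With pandas it's a one-liner, provided the pyarrow (or fastparquet) package is installed:

import pandas as pd
 
def export_to_parquet(data, filename):
    """Export to Parquet: columnar, compressed, and fast to load back."""
    df = pd.DataFrame(data)
    df.to_parquet(filename, index=False)  # requires pyarrow or fastparquet
    print(f"Exported {len(df)} rows to {filename}")
 
# Usage
# export_to_parquet(products, 'products.parquet')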

Conclusion: Complete Data Pipeline

A production-quality data pipeline includes:

  1. Extraction: Scrape raw data
  2. Validation: Check data quality with Pydantic
  3. Transformation: Clean, normalize, enrich
  4. Storage: Save to database and file formats
  5. Monitoring: Track success rates and errors
  6. Querying: Retrieve data as needed
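
Of the stages above, monitoring is the only one we haven't written down. A minimal sketch is just a counter dictionary filled in around the DataPipeline from Part 5 and logged at the end:

import logging
 
logger = logging.getLogger(__name__)
 
def run_with_monitoring(raw_data, pipeline):
    """Track how many items survive each stage and log a summary."""
    stats = {'scraped': len(raw_data)}
    
    cleaned = pipeline.clean_data(raw_data)
    stats['cleaned'] = len(cleaned)
    stats['dropped'] = stats['scraped'] - stats['cleaned']
    
    pipeline.to_database(cleaned)
    logger.info("Run summary: %s", stats)
    return stats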

By implementing this pipeline, you ensure that your scraped data is clean, usable, and accessible for analysis, reporting, or further processing.

Start simple (CSV export), then add database persistence as your project grows. Use the pipeline template provided to build production-ready scrapers.


Which data format do you use most for your scraped data? Share your workflow in the comments below.
