Exporting and Transforming Data: Converting Scraped Data to CSV, JSON, and Databases
You've successfully scraped data from websites. Now what? Your raw scraped data is messy, unstructured, and sitting in memory. To make it useful, you need to export and transform it into formats that work for analysis, reporting, or further processing.
In this guide, we'll explore how to convert scraped data into CSV files, JSON, SQL databases, and other formats—plus the best practices for transforming data along the way.
The Data Journey: From Scraping to Usable Data
Before diving into specific formats, let's understand the workflow:
Raw HTML → Parse & Extract (messy data) → Transform & Clean (clean data) → Export to Format (usable data)
Each step is critical. Exporting garbage data to a perfect format is still garbage. That's why we'll focus on both transformation and export.
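To make the workflow concrete, here's a toy sketch of all three steps on a single hard-coded snippet. It assumes the beautifulsoup4 package for the parsing step; the HTML and field names are made up for illustration, and the export step is what the rest of this guide covers.
from bs4 import BeautifulSoup
html = "<div class='product'><h2>  Headphones </h2><span class='price'>$99.99</span></div>"
# Parse & Extract: pull raw, messy values out of the HTML
soup = BeautifulSoup(html, 'html.parser')
raw = {'name': soup.h2.get_text(), 'price': soup.find('span', class_='price').get_text()}
# Transform & Clean: strip whitespace, turn the price string into a number
clean = {'name': raw['name'].strip(), 'price': float(raw['price'].lstrip('$'))}
# Export to Format: CSV, JSON, or a database (the rest of this guide)
print(clean)  # {'name': 'Headphones', 'price': 99.99}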
Part 1: Exporting to CSV
CSV (Comma-Separated Values) is the most universal format. Every tool reads CSV—Excel, Google Sheets, databases, data analysis software.
Basic CSV Export
import csv
from datetime import datetime
# Sample scraped data
products = [
{'name': 'Headphones', 'price': '$99.99', 'rating': '4.5', 'url': 'example.com/1'},
{'name': 'Speaker', 'price': '$149.99', 'rating': '4.2', 'url': 'example.com/2'},
{'name': 'Microphone', 'price': '$79.99', 'rating': '4.8', 'url': 'example.com/3'},
]
def export_to_csv(data, filename):
"""Export list of dictionaries to CSV"""
if not data:
print("No data to export")
return
# Get column names from first item
fieldnames = data[0].keys()
with open(filename, 'w', newline='', encoding='utf-8') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
# Write header row
writer.writeheader()
# Write data rows
writer.writerows(data)
print(f"Exported {len(data)} rows to {filename}")
# Usage
export_to_csv(products, 'products.csv')
Result (products.csv):
name,price,rating,url
Headphones,$99.99,4.5,example.com/1
Speaker,$149.99,4.2,example.com/2
Microphone,$79.99,4.8,example.com/3
CSV with Data Transformation
Often, you want to transform data before exporting. Clean prices, format dates, add calculated columns:
import csv
import re
from datetime import datetime
def clean_price(price_str):
"""Convert '$99.99' to 99.99"""
if not price_str:
return None
match = re.search(r'(\d+\.?\d*)', str(price_str).replace(',', ''))
return float(match.group(1)) if match else None
def clean_rating(rating_str):
"""Convert '4.5 / 5 stars' to 4.5"""
if not rating_str:
return None
match = re.search(r'(\d+\.?\d*)', str(rating_str))
return float(match.group(1)) if match else None
def transform_for_export(products):
"""Transform scraped data for export"""
transformed = []
for product in products:
transformed_item = {
'name': product.get('name', '').strip(),
'price': clean_price(product.get('price')),
'rating': clean_rating(product.get('rating')),
'url': product.get('url', ''),
'scraped_at': datetime.now().isoformat(),
'price_numeric': clean_price(product.get('price')), # Separate numeric column
}
transformed.append(transformed_item)
return transformed
# Usage
products = [
{'name': ' Headphones ', 'price': '$99.99', 'rating': '4.5 / 5 stars', 'url': 'example.com/1'},
{'name': 'Speaker', 'price': '$149.99', 'rating': '4.2', 'url': 'example.com/2'},
]
transformed = transform_for_export(products)
export_to_csv(transformed, 'products_cleaned.csv')
Appending to Existing CSV
When a scraper runs on a schedule, you'll want to append new rows to an existing CSV without duplicating the header:
import os
import csv
def append_to_csv(data, filename):
"""Append data to existing CSV, or create if doesn't exist"""
file_exists = os.path.isfile(filename)
with open(filename, 'a', newline='', encoding='utf-8') as csvfile:
fieldnames = data[0].keys() if data else []
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
# Write header only if file is new
if not file_exists:
writer.writeheader()
# Write data
writer.writerows(data)
print(f"Appended {len(data)} rows to {filename}")
# Usage - run daily without duplicating headers
new_products = [...]
append_to_csv(new_products, 'products.csv')
CSV Best Practices
def export_csv_best_practices(data, filename):
"""CSV export following best practices"""
with open(filename, 'w', newline='', encoding='utf-8') as csvfile:
fieldnames = data[0].keys()
writer = csv.DictWriter(
csvfile,
fieldnames=fieldnames,
quoting=csv.QUOTE_ALL, # Quote all fields to handle special chars
extrasaction='ignore' # Ignore extra fields
)
writer.writeheader()
writer.writerows(data)
# For large datasets with pandas (more efficient)
import pandas as pd
def export_csv_with_pandas(data, filename, chunk_size=10000):
"""Export large datasets efficiently with pandas"""
df = pd.DataFrame(data)
# Data type optimization
for col in df.select_dtypes(include=['object']).columns:
if df[col].nunique() < df.shape[0] * 0.05:
df[col] = df[col].astype('category')
# Export
    df.to_csv(filename, index=False, encoding='utf-8', chunksize=chunk_size)
    print(f"Exported {len(df)} rows to {filename}")
Part 2: Exporting to JSON
JSON (JavaScript Object Notation) is perfect for nested data, API responses, and NoSQL databases.
Basic JSON Export
import json
def export_to_json(data, filename, pretty=True):
"""Export data to JSON file"""
with open(filename, 'w', encoding='utf-8') as jsonfile:
if pretty:
json.dump(data, jsonfile, indent=2, ensure_ascii=False)
else:
json.dump(data, jsonfile, ensure_ascii=False)
print(f"Exported {len(data)} items to {filename}")
# Usage
products = [
{'name': 'Headphones', 'price': 99.99, 'specs': {'color': 'black', 'weight': '250g'}},
{'name': 'Speaker', 'price': 149.99, 'specs': {'color': 'white', 'weight': '500g'}},
]
export_to_json(products, 'products.json', pretty=True)
Result (products.json):
[
{
"name": "Headphones",
"price": 99.99,
"specs": {
"color": "black",
"weight": "250g"
}
},
{
"name": "Speaker",
"price": 149.99,
"specs": {
"color": "white",
"weight": "500g"
}
}
]
JSONL (JSON Lines) for Large Datasets
JSONL stores one JSON object per line—perfect for streaming and large files:
def export_to_jsonl(data, filename):
"""Export to JSONL format (one JSON object per line)"""
with open(filename, 'w', encoding='utf-8') as f:
for item in data:
json.dump(item, f, ensure_ascii=False)
f.write('\n')
print(f"Exported {len(data)} items to {filename}")
def read_jsonl(filename):
"""Read JSONL file"""
data = []
with open(filename, 'r', encoding='utf-8') as f:
for line in f:
if line.strip():
data.append(json.loads(line))
return data
# Usage
export_to_jsonl(products, 'products.jsonl')
# Read it back
loaded_products = read_jsonl('products.jsonl')
Streaming JSON Export for Large Datasets
import json
class JSONStreamer:
"""Stream data to JSON without loading everything in memory"""
def __init__(self, filename, pretty=False):
self.filename = filename
self.pretty = pretty
self.file = None
self.count = 0
def __enter__(self):
self.file = open(self.filename, 'w', encoding='utf-8')
self.file.write('[\n')
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.file.write('\n]')
self.file.close()
print(f"Streamed {self.count} items to {self.filename}")
def write(self, item):
"""Write single item to JSON array"""
if self.count > 0:
self.file.write(',\n')
if self.pretty:
json.dump(item, self.file, indent=2, ensure_ascii=False)
else:
json.dump(item, self.file, ensure_ascii=False)
self.count += 1
# Usage - handles large datasets gracefully
with JSONStreamer('large_products.json', pretty=True) as stream:
for i in range(1000000):
product = {'id': i, 'name': f'Product {i}', 'price': 99.99}
        stream.write(product)
Part 3: Exporting to Databases
Databases are ideal for production scrapers. They provide indexing, querying, and scalability.
SQLite (Simplest Local Database)
import sqlite3
from datetime import datetime
def create_sqlite_table(db_path, table_name):
"""Create table in SQLite database"""
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute(f'''
CREATE TABLE IF NOT EXISTS {table_name} (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
price REAL,
rating REAL,
url TEXT UNIQUE,
scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
conn.commit()
conn.close()
def insert_into_sqlite(data, db_path, table_name):
"""Insert scraped data into SQLite database"""
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
for item in data:
try:
cursor.execute(f'''
INSERT INTO {table_name} (name, price, rating, url, scraped_at)
VALUES (?, ?, ?, ?, ?)
''', (
item.get('name'),
                float(item.get('price') or 0),
                float(item.get('rating') or 0),
item.get('url'),
datetime.now()
))
except sqlite3.IntegrityError:
# Skip duplicates (URL already exists)
pass
conn.commit()
conn.close()
print(f"Inserted {len(data)} items into {table_name}")
# Usage
create_sqlite_table('products.db', 'products')
products = [
{'name': 'Headphones', 'price': 99.99, 'rating': 4.5, 'url': 'example.com/1'},
{'name': 'Speaker', 'price': 149.99, 'rating': 4.2, 'url': 'example.com/2'},
]
insert_into_sqlite(products, 'products.db', 'products')
# Query the database
conn = sqlite3.connect('products.db')
cursor = conn.cursor()
cursor.execute('SELECT * FROM products WHERE price > 100')
results = cursor.fetchall()
for row in results:
print(row)
conn.close()
PostgreSQL (Production Database)
import psycopg2
from datetime import datetime
class PostgreSQLExporter:
def __init__(self, host, database, user, password):
self.conn = psycopg2.connect(
host=host,
database=database,
user=user,
password=password
)
self.cursor = self.conn.cursor()
def create_table(self, table_name):
"""Create products table"""
self.cursor.execute(f'''
CREATE TABLE IF NOT EXISTS {table_name} (
id SERIAL PRIMARY KEY,
name VARCHAR(255) NOT NULL,
price DECIMAL(10, 2),
rating DECIMAL(3, 1),
url VARCHAR(500) UNIQUE,
description TEXT,
scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
self.conn.commit()
print(f"Table {table_name} created/verified")
def insert_batch(self, data, table_name, batch_size=100):
"""Insert data in batches"""
for i in range(0, len(data), batch_size):
batch = data[i:i+batch_size]
for item in batch:
try:
self.cursor.execute(f'''
INSERT INTO {table_name} (name, price, rating, url, description, scraped_at)
VALUES (%s, %s, %s, %s, %s, %s)
ON CONFLICT (url) DO NOTHING
''', (
item.get('name'),
float(item.get('price', 0)) if item.get('price') else None,
float(item.get('rating', 0)) if item.get('rating') else None,
item.get('url'),
item.get('description'),
datetime.now()
))
except Exception as e:
print(f"Error inserting {item.get('name')}: {e}")
self.conn.rollback()
continue
self.conn.commit()
print(f"Inserted {len(data)} items")
def close(self):
self.cursor.close()
self.conn.close()
# Usage
exporter = PostgreSQLExporter(
host='localhost',
database='scraper_db',
user='postgres',
password='password'
)
exporter.create_table('products')
products = [
{'name': 'Headphones', 'price': 99.99, 'rating': 4.5, 'url': 'example.com/1', 'description': 'Great headphones'},
{'name': 'Speaker', 'price': 149.99, 'rating': 4.2, 'url': 'example.com/2', 'description': 'Powerful speaker'},
]
exporter.insert_batch(products, 'products')
exporter.close()
MongoDB (NoSQL for Unstructured Data)
from pymongo import MongoClient
from datetime import datetime
class MongoDBExporter:
def __init__(self, connection_string):
self.client = MongoClient(connection_string)
self.db = self.client.scraper_db
def insert_documents(self, collection_name, data):
"""Insert documents into MongoDB collection"""
collection = self.db[collection_name]
# Add timestamp to each document
for item in data:
item['scraped_at'] = datetime.now()
try:
result = collection.insert_many(data, ordered=False)
print(f"Inserted {len(result.inserted_ids)} documents")
except Exception as e:
print(f"Error inserting documents: {e}")
def upsert_documents(self, collection_name, data, unique_field='url'):
"""Upsert documents (insert or update if exists)"""
collection = self.db[collection_name]
for item in data:
item['scraped_at'] = datetime.now()
try:
collection.update_one(
{unique_field: item.get(unique_field)},
{'$set': item},
upsert=True
)
except Exception as e:
print(f"Error upserting {item.get('name')}: {e}")
def query(self, collection_name, filter_query):
"""Query documents"""
collection = self.db[collection_name]
return list(collection.find(filter_query))
def close(self):
self.client.close()
# Usage
exporter = MongoDBExporter('mongodb://localhost:27017/')
products = [
{
'name': 'Headphones',
'price': 99.99,
'rating': 4.5,
'url': 'example.com/1',
'specs': {'color': 'black', 'weight': '250g', 'battery': '20 hours'}
},
{
'name': 'Speaker',
'price': 149.99,
'rating': 4.2,
'url': 'example.com/2',
'specs': {'color': 'white', 'weight': '500g', 'power': '50W'}
},
]
# Insert with upsert (avoid duplicates)
exporter.upsert_documents('products', products)
# Query
results = exporter.query('products', {'price': {'$gt': 100}})
print(f"Found {len(results)} products over $100")
exporter.close()
Part 4: Advanced Data Transformation
Using Pandas for Complex Transformations
import pandas as pd
import numpy as np
def transform_with_pandas(scraped_data):
"""Complex transformations using pandas"""
# Create DataFrame
df = pd.DataFrame(scraped_data)
# Data type conversion
    df['price'] = df['price'].str.replace('$', '', regex=False).astype(float)
df['rating'] = df['rating'].astype(float)
# Text cleaning
df['name'] = df['name'].str.strip().str.title()
# Add calculated columns
df['price_category'] = pd.cut(
df['price'],
bins=[0, 50, 100, 200, np.inf],
labels=['Budget', 'Mid-range', 'Premium', 'Luxury']
)
# Group and aggregate
category_stats = df.groupby('price_category').agg({
'price': ['mean', 'min', 'max'],
'rating': 'mean',
'name': 'count'
}).round(2)
print("Category Statistics:")
print(category_stats)
# Filter
premium_products = df[df['price'] > 100]
# Sort
df_sorted = df.sort_values('rating', ascending=False)
# Remove duplicates
df_clean = df.drop_duplicates(subset=['url'])
return df_clean
# Usage
scraped = [
{'name': ' headphones ', 'price': '$99.99', 'rating': 4.5, 'url': 'ex.com/1'},
{'name': 'speaker', 'price': '$149.99', 'rating': 4.2, 'url': 'ex.com/2'},
]
df_transformed = transform_with_pandas(scraped)
print(df_transformed)
Pydantic for Data Validation
from pydantic import BaseModel, validator, ValidationError
from typing import Optional
import json
class Product(BaseModel):
name: str
price: float
rating: Optional[float] = None
url: str
description: Optional[str] = None
@validator('price')
def price_must_be_positive(cls, v):
if v < 0:
raise ValueError('Price must be positive')
return v
@validator('rating')
def rating_must_be_valid(cls, v):
if v is not None and (v < 0 or v > 5):
raise ValueError('Rating must be between 0 and 5')
return v
@validator('name')
def name_must_not_be_empty(cls, v):
if not v or not v.strip():
raise ValueError('Name cannot be empty')
return v.strip().title()
def validate_and_export(scraped_data):
"""Validate scraped data with Pydantic"""
valid_products = []
invalid_products = []
for item in scraped_data:
try:
# Convert to float
            item['price'] = float(str(item.get('price', '0')).replace('$', '').replace(',', ''))
item['rating'] = float(item.get('rating', 0)) if item.get('rating') else None
product = Product(**item)
valid_products.append(product)
except ValidationError as e:
invalid_products.append({
'item': item,
'errors': e.errors()
})
print(f"Validation error: {e}")
print(f"Valid: {len(valid_products)}, Invalid: {len(invalid_products)}")
return valid_products, invalid_products
# Usage
scraped = [
{'name': ' headphones ', 'price': '$99.99', 'rating': 4.5, 'url': 'ex.com/1'},
{'name': 'speaker', 'price': '$149.99', 'rating': 6.0, 'url': 'ex.com/2'}, # Invalid rating
]
valid, invalid = validate_and_export(scraped)
# Export only valid data
df = pd.DataFrame([p.dict() for p in valid])
df.to_csv('valid_products.csv', index=False)
Part 5: Complete Production Pipeline
Here's a complete, production-ready data pipeline:
import csv
import json
import sqlite3
from datetime import datetime
from typing import List, Dict
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class DataPipeline:
def __init__(self, db_path='data.db'):
self.db_path = db_path
self.setup_database()
def setup_database(self):
"""Initialize database with products table"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS products (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
price REAL,
rating REAL,
url TEXT UNIQUE,
source TEXT,
scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
cursor.execute('''
CREATE INDEX IF NOT EXISTS idx_url ON products(url)
''')
conn.commit()
conn.close()
def clean_data(self, raw_data: List[Dict]) -> List[Dict]:
"""Clean and transform raw scraped data"""
cleaned = []
for item in raw_data:
try:
# Clean price
price_str = str(item.get('price', '0')).replace('$', '').replace(',', '')
price = float(price_str) if price_str else None
# Clean rating
rating_str = str(item.get('rating', '0')).split()[0]
rating = float(rating_str) if rating_str else None
cleaned_item = {
'name': item.get('name', '').strip().title(),
'price': price,
'rating': rating,
'url': item.get('url', '').strip(),
'source': item.get('source', 'unknown'),
}
# Validate
if cleaned_item['name'] and cleaned_item['url']:
cleaned.append(cleaned_item)
else:
logger.warning(f"Skipped invalid item: {item}")
except Exception as e:
logger.error(f"Error cleaning {item}: {e}")
return cleaned
def to_database(self, data: List[Dict]):
"""Save to SQLite database"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
for item in data:
try:
cursor.execute('''
INSERT INTO products (name, price, rating, url, source, updated_at)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT(url) DO UPDATE SET
price = ?,
rating = ?,
updated_at = ?
''', (
item['name'], item['price'], item['rating'], item['url'], item['source'], datetime.now(),
item['price'], item['rating'], datetime.now()
))
except Exception as e:
logger.error(f"Error inserting {item}: {e}")
conn.commit()
conn.close()
logger.info(f"Saved {len(data)} items to database")
def to_csv(self, data: List[Dict], filename: str):
"""Export to CSV"""
if not data:
return
with open(filename, 'w', newline='', encoding='utf-8') as f:
writer = csv.DictWriter(f, fieldnames=data[0].keys())
writer.writeheader()
writer.writerows(data)
logger.info(f"Exported {len(data)} items to {filename}")
def to_json(self, data: List[Dict], filename: str):
"""Export to JSON"""
with open(filename, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
logger.info(f"Exported {len(data)} items to {filename}")
def to_jsonl(self, data: List[Dict], filename: str):
"""Export to JSONL"""
with open(filename, 'w', encoding='utf-8') as f:
for item in data:
f.write(json.dumps(item, ensure_ascii=False) + '\n')
logger.info(f"Exported {len(data)} items to {filename}")
def export_all(self, data: List[Dict], prefix: str = 'export'):
"""Export to all formats"""
cleaned = self.clean_data(data)
self.to_database(cleaned)
self.to_csv(cleaned, f'{prefix}.csv')
self.to_json(cleaned, f'{prefix}.json')
self.to_jsonl(cleaned, f'{prefix}.jsonl')
logger.info("Export complete")
return cleaned
# Usage
pipeline = DataPipeline('scraper.db')
# Scraped data (messy)
scraped_data = [
{'name': ' headphones ', 'price': '$99.99', 'rating': '4.5 stars', 'url': 'example.com/1', 'source': 'amazon'},
{'name': 'speaker', 'price': '$149.99', 'rating': '4.2', 'url': 'example.com/2', 'source': 'amazon'},
{'name': 'microphone', 'price': '$79.99', 'rating': '4.8 / 5', 'url': 'example.com/3', 'source': 'bestbuy'},
]
# Process and export
cleaned_data = pipeline.export_all(scraped_data, prefix='products_2024')
# Query later
conn = sqlite3.connect('scraper.db')
cursor = conn.cursor()
cursor.execute('SELECT * FROM products WHERE price > 100 ORDER BY rating DESC')
expensive_items = cursor.fetchall()
print(f"Products over $100: {len(expensive_items)}")
conn.close()
Part 6: Best Practices for Data Export
1. Incremental Exports
def incremental_export(new_data, db_path, last_export_file='last_export.json'):
"""Export only new items since last export"""
# Load previous export metadata
try:
with open(last_export_file, 'r') as f:
last_export = json.load(f)
except FileNotFoundError:
        last_export = {'urls': []}
previous_urls = set(last_export.get('urls', []))
new_items = [item for item in new_data if item.get('url') not in previous_urls]
if new_items:
# Add to database
insert_into_sqlite(new_items, db_path, 'products')
# Update export metadata
current_urls = previous_urls | {item.get('url') for item in new_data}
with open(last_export_file, 'w') as f:
json.dump({
'urls': list(current_urls),
'last_updated': datetime.now().isoformat()
}, f)
print(f"Found {len(new_items)} new items")
else:
        print("No new items since last export")
2. Data Partitioning
from datetime import datetime
import os
def export_partitioned(data, output_dir='exports'):
"""Partition exports by date"""
os.makedirs(output_dir, exist_ok=True)
today = datetime.now().strftime('%Y-%m-%d')
partition_dir = os.path.join(output_dir, today)
os.makedirs(partition_dir, exist_ok=True)
csv_file = os.path.join(partition_dir, 'products.csv')
json_file = os.path.join(partition_dir, 'products.json')
export_to_csv(data, csv_file)
export_to_json(data, json_file)
    print(f"Exports saved to {partition_dir}")
3. Duplicate Detection and Handling
def remove_duplicates(data, unique_fields=('url',)):
"""Remove duplicate items based on specified fields"""
seen = set()
unique_data = []
for item in data:
# Create a unique key from specified fields
key = tuple(item.get(field) for field in unique_fields)
if key not in seen:
seen.add(key)
unique_data.append(item)
duplicates_removed = len(data) - len(unique_data)
print(f"Removed {duplicates_removed} duplicates")
    return unique_data
Choosing the Right Format
Here's a quick decision table:
| Need | Best Format | Why |
|---|---|---|
| Analyze in Excel | CSV | Universal, simple |
| REST API or web app | JSON | Hierarchical, web standard |
| High-volume, complex queries | PostgreSQL | Indexing, relationships |
| Unstructured data | MongoDB | Flexible schema |
| Local, small projects | SQLite | No setup required |
| Real-time streaming | JSONL | Process line-by-line |
| Data warehousing | Parquet | Efficient compression |
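Parquet isn't covered above, but if your data is headed for a warehouse, pandas can write it directly. Here's a minimal sketch, assuming the pyarrow package is installed as the Parquet engine:
import pandas as pd
def export_to_parquet(data, filename):
    """Export a list of dicts to a compressed, columnar Parquet file"""
    df = pd.DataFrame(data)
    # Requires a Parquet engine: pip install pyarrow
    df.to_parquet(filename, index=False, compression='snappy')
    print(f"Exported {len(df)} rows to {filename}")
# Usage - same products list as the earlier examples
export_to_parquet(products, 'products.parquet')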
Conclusion: Complete Data Pipeline
A production-quality data pipeline includes:
- Extraction: Scrape raw data
- Validation: Check data quality with Pydantic
- Transformation: Clean, normalize, enrich
- Storage: Save to database and file formats
- Monitoring: Track success rates and errors (a minimal sketch follows this list)
- Querying: Retrieve data as needed
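The monitoring step can start small: count what happened at each stage and log a summary. Here's a minimal sketch; the stage names are illustrative and not part of the DataPipeline class above:
import logging
from collections import Counter
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class PipelineStats:
    """Count items per pipeline stage and report a success rate"""
    def __init__(self):
        self.counts = Counter()
    def record(self, stage, n=1):
        self.counts[stage] += n
    def report(self):
        scraped = self.counts['scraped'] or 1  # avoid division by zero
        rate = self.counts['exported'] / scraped * 100
        logger.info("Run summary: %s (success rate: %.1f%%)", dict(self.counts), rate)
# Usage - record counts as your scraper and pipeline run
stats = PipelineStats()
stats.record('scraped', 120)
stats.record('invalid', 7)
stats.record('exported', 113)
stats.report()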
By implementing this pipeline, you ensure that your scraped data is clean, usable, and accessible for analysis, reporting, or further processing.
Start simple (CSV export), then add database persistence as your project grows. Use the pipeline template provided to build production-ready scrapers.
Which data format do you use most for your scraped data? Share your workflow in the comments below.
