Definition
Chunking lets you process large datasets that don’t fit in memory by reading and handling the data in smaller pieces (chunks). This is essential for working with files larger than the available RAM.
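The core of the pattern is a plain loop over the chunk iterator, as in the minimal sketch below (the file name data.csv, the chunk size, and the amount column are assumptions made for illustration):

```python
import pandas as pd

total = 0
# Each iteration yields an ordinary DataFrame of at most 100,000 rows,
# so only one chunk is held in memory at a time.
for chunk in pd.read_csv('data.csv', chunksize=100_000):
    total += chunk['amount'].sum()  # accumulate a running statistic
print(total)
```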
Key Concepts
- Iterator: Process data chunk by chunk
- chunksize Parameter: Define size of each chunk
- Aggregation: Combine results from multiple chunks
- Memory Management: Monitor and optimize memory usage
- Streaming: Process data without loading entire file
Example
```python
import pandas as pd
import numpy as np
import os
# ========== CREATE LARGE SAMPLE FILE ==========
print("=" * 60)
print("CREATING LARGE SAMPLE FILE")
print("=" * 60)
# Create a large CSV file for demonstration
sample_size = 1000000 # 1 million rows
filename = '/home/claude/large_dataset.csv'
print(f"Creating sample file with {sample_size:,} rows...")
# Generate data in chunks to avoid memory issues
chunk_size = 100000
for i in range(0, sample_size, chunk_size):
    chunk_data = pd.DataFrame({
        'id': range(i, min(i + chunk_size, sample_size)),
        'category': np.random.choice(['A', 'B', 'C', 'D', 'E'],
                                     min(chunk_size, sample_size - i)),
        'value': np.random.randn(min(chunk_size, sample_size - i)),
        'amount': np.random.randint(100, 10000, min(chunk_size, sample_size - i)),
        'date': pd.date_range('2024-01-01',
                              periods=min(chunk_size, sample_size - i),
                              freq='min')
    })
    # Write with header only on first chunk
    chunk_data.to_csv(filename, mode='a' if i > 0 else 'w',
                      header=(i == 0), index=False)
file_size_mb = os.path.getsize(filename) / (1024 * 1024)
print(f"File created: {file_size_mb:.2f} MB\n")
# ========== BASIC CHUNKING ==========
print("=" * 60)
print("BASIC CHUNKING")
print("=" * 60)
# Read file in chunks
chunk_size = 50000
chunk_iterator = pd.read_csv(filename, chunksize=chunk_size)
print(f"Reading file in chunks of {chunk_size:,} rows")
print(f"Chunk iterator type: {type(chunk_iterator)}\n")
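# Note: with chunksize set, read_csv returns a TextFileReader rather than a
# DataFrame; in recent pandas versions it can also be used as a context manager
# (with pd.read_csv(..., chunksize=...) as reader: ...) so the underlying file
# handle is closed as soon as processing finishes.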
# Process first few chunks
for i, chunk in enumerate(chunk_iterator):
    print(f"Chunk {i + 1}:")
    print(f"  Shape: {chunk.shape}")
    print(f"  Memory usage: {chunk.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
    print(f"  First row ID: {chunk['id'].iloc[0]}")
    print(f"  Last row ID: {chunk['id'].iloc[-1]}")
    if i >= 2:  # Only show first 3 chunks
        print("\n... (remaining chunks not shown)")
        break
    print()
# ========== AGGREGATING ACROSS CHUNKS ==========
print("=" * 60)
print("AGGREGATING ACROSS CHUNKS")
print("=" * 60)
# Calculate statistics across all chunks
chunk_iterator = pd.read_csv(filename, chunksize=chunk_size)
# Initialize accumulators
total_rows = 0
sum_amount = 0
sum_squared_amount = 0
category_counts = {}
print("Processing all chunks for aggregation...")
for chunk in chunk_iterator:
    total_rows += len(chunk)
    sum_amount += chunk['amount'].sum()
    sum_squared_amount += (chunk['amount'] ** 2).sum()
    # Count categories
    for category, count in chunk['category'].value_counts().items():
        category_counts[category] = category_counts.get(category, 0) + count
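# The overall mean and standard deviation can be rebuilt from these running
# sums alone, using Var(X) = E[X^2] - (E[X])^2, so no chunk has to be kept
# in memory once it has been processed.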
# Calculate overall statistics
mean_amount = sum_amount / total_rows
variance_amount = (sum_squared_amount / total_rows) - (mean_amount ** 2)
std_amount = np.sqrt(variance_amount)
print(f"\nOverall Statistics:")
print(f" Total rows: {total_rows:,}")
print(f" Mean amount: ${mean_amount:.2f}")
print(f" Std amount: ${std_amount:.2f}")
print(f" Category counts: {category_counts}\n")
# ========== FILTERING WITH CHUNKS ==========
print("=" * 60)
print("FILTERING WITH CHUNKS")
print("=" * 60)
# Filter data and save to new file
chunk_iterator = pd.read_csv(filename, chunksize=chunk_size)
output_file = '/home/claude/filtered_data.csv'
print("Filtering rows where amount > 5000...")
filtered_rows = 0
for i, chunk in enumerate(chunk_iterator):
    # Apply filter
    filtered_chunk = chunk[chunk['amount'] > 5000]
    filtered_rows += len(filtered_chunk)
    # Save filtered data
    filtered_chunk.to_csv(output_file, mode='a' if i > 0 else 'w',
                          header=(i == 0), index=False)
print(f"Filtered rows: {filtered_rows:,}")
print(f"Output file size: {os.path.getsize(output_file) / 1024**2:.2f} MB\n")
# ========== GROUPBY WITH CHUNKS ==========
print("=" * 60)
print("GROUPBY WITH CHUNKS")
print("=" * 60)
chunk_iterator = pd.read_csv(filename, chunksize=chunk_size)
# Accumulate group statistics
group_stats = {}
print("Calculating group statistics across chunks...")
for chunk in chunk_iterator:
    # GroupBy on chunk
    chunk_groups = chunk.groupby('category')['amount'].agg(['sum', 'count'])
    # Accumulate results
    for category in chunk_groups.index:
        if category not in group_stats:
            group_stats[category] = {'sum': 0, 'count': 0}
        group_stats[category]['sum'] += chunk_groups.loc[category, 'sum']
        group_stats[category]['count'] += chunk_groups.loc[category, 'count']
# Calculate final statistics
final_stats = pd.DataFrame.from_dict(group_stats, orient='index')
final_stats['mean'] = final_stats['sum'] / final_stats['count']
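# Sums and counts combine exactly across chunks, which is why the mean can be
# reconstructed afterwards; order statistics such as medians or quantiles do
# not merge this way and need approximate or two-pass approaches.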
print("\nGroup Statistics:")
print(final_stats)
print("\n")
# ========== MEMORY MONITORING ==========
print("=" * 60)
print("MEMORY MONITORING")
print("=" * 60)
import psutil
process = psutil.Process(os.getpid())  # os is already imported above
def get_memory_usage():
    """Get current memory usage in MB"""
    return process.memory_info().rss / 1024**2
initial_memory = get_memory_usage()
print(f"Initial memory usage: {initial_memory:.2f} MB\n")
# Compare loading full file vs chunks
print("Loading FULL file into memory:")
start_memory = get_memory_usage()
try:
    df_full = pd.read_csv(filename)
    full_memory = get_memory_usage()
    print(f"Memory after loading: {full_memory:.2f} MB")
    print(f"Memory increase: {full_memory - start_memory:.2f} MB")
    del df_full  # Free memory
except MemoryError:
    print("MemoryError: File too large to load completely")
print()
print("Processing with CHUNKS:")
start_memory = get_memory_usage()
chunk_iterator = pd.read_csv(filename, chunksize=chunk_size)
max_memory = start_memory
for i, chunk in enumerate(chunk_iterator):
    current_memory = get_memory_usage()
    max_memory = max(max_memory, current_memory)
    # Process chunk (example: calculate something)
    result = chunk['amount'].sum()
    if i >= 5:  # Process a few chunks
        break
chunk_memory = get_memory_usage()
print(f"Memory after chunking: {chunk_memory:.2f} MB")
print(f"Peak memory during chunking: {max_memory:.2f} MB")
print(f"Memory increase: {max_memory - start_memory:.2f} MB\n")
# ========== OPTIMIZING DTYPES WHILE CHUNKING ==========
print("=" * 60)
print("OPTIMIZING DTYPES WHILE CHUNKING")
print("=" * 60)
# Define optimal dtypes
dtype_dict = {
    'id': 'int32',           # Instead of int64
    'category': 'category',
    'value': 'float32',      # Instead of float64
    'amount': 'int32'
}
chunk_iterator = pd.read_csv(filename, chunksize=chunk_size, dtype=dtype_dict,
                             parse_dates=['date'])
print("Reading with optimized dtypes:")
for i, chunk in enumerate(chunk_iterator):
    print(f"\nChunk {i + 1}:")
    print(chunk.dtypes)
    print(f"Memory usage: {chunk.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
    if i >= 1:  # Show first 2 chunks
        break
print("\n")
# ========== PARALLEL PROCESSING CONCEPT ==========
print("=" * 60)
print("PARALLEL PROCESSING CONCEPT")
print("=" * 60)
# Demonstrate concept (actual parallel processing would use multiprocessing)
print("Chunk processing can be parallelized:")
print("1. Split file into chunks")
print("2. Process each chunk independently")
print("3. Combine results")
print("\nExample with sequential processing:")
chunk_iterator = pd.read_csv(filename, chunksize=chunk_size)
def process_chunk(chunk):
    """Example processing function"""
    return {
        'total_amount': chunk['amount'].sum(),
        'avg_value': chunk['value'].mean(),
        'row_count': len(chunk)
    }
results = []
for i, chunk in enumerate(chunk_iterator):
    result = process_chunk(chunk)
    results.append(result)
    if i >= 4:  # Process 5 chunks
        break
# Combine results
combined_results = pd.DataFrame(results)
print(f"\nProcessed {len(results)} chunks:")
print(combined_results)
print(f"\nTotal across chunks:")
print(combined_results.sum())
print("\n")
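# A hedged sketch of the parallel version (left commented out, not executed
# here): the worker count is an illustrative assumption, and process_chunk must
# remain a module-level function so it can be pickled for the worker processes.
#
#   from multiprocessing import Pool
#   with Pool(processes=4) as pool:
#       parallel_results = pool.map(process_chunk,
#                                   pd.read_csv(filename, chunksize=chunk_size))
#   print(pd.DataFrame(parallel_results).sum())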
# ========== WRITING IN CHUNKS ==========
print("=" * 60)
print("WRITING IN CHUNKS")
print("=" * 60)
# Transform and write data in chunks
chunk_iterator = pd.read_csv(filename, chunksize=chunk_size)
output_transformed = '/home/claude/transformed_data.csv'
print("Transforming and writing data in chunks...")
for i, chunk in enumerate(chunk_iterator):
    # Transform data
    chunk['amount_doubled'] = chunk['amount'] * 2
    chunk['category_upper'] = chunk['category'].str.upper()
    # Write chunk
    chunk.to_csv(output_transformed, mode='a' if i > 0 else 'w',
                 header=(i == 0), index=False)
    if i >= 4:  # Transform first 5 chunks
        break
print(f"Transformed {i + 1} chunks")
print(f"Output file size: {os.path.getsize(output_transformed) / 1024**2:.2f} MB\n")
# Clean up created files
print("Cleaning up temporary files...")
for file in [filename, output_file, output_transformed]:
    if os.path.exists(file):
        os.remove(file)
print("Done!\n")
# ========== BEST PRACTICES ==========
print("=" * 60)
print("CHUNKING BEST PRACTICES")
print("=" * 60)
best_practices = """
1. Choose appropriate chunk size:
   - Too small: Overhead from many iterations
   - Too large: Memory issues
   - Typical: 10,000 - 100,000 rows
2. Optimize dtypes BEFORE processing:
   - Specify dtypes in read_csv
   - Use categories for repeated strings
   - Use smaller numeric types when possible
3. Use generators and iterators:
   - Process one chunk at a time
   - Don't store all chunks in memory
4. For aggregations:
   - Accumulate statistics incrementally
   - Combine results after all chunks processed
5. Monitor memory usage:
   - Use memory_profiler or psutil
   - Track peak memory during processing
6. Consider alternatives for very large data:
   - Dask for parallel processing
   - Database solutions (SQLite, PostgreSQL)
   - Parquet format for better compression
"""
print(best_practices)
```
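As the best practices note, libraries such as Dask wrap this chunked pattern behind a DataFrame-like API. A minimal sketch, assuming dask is installed and pointing it at a CSV with the same columns as above (the example script deletes its own file during cleanup, so the path here is an assumption):

```python
import dask.dataframe as dd

# Dask reads the CSV lazily in partitions (its own form of chunks) and only
# materializes the result when .compute() is called.
ddf = dd.read_csv('large_dataset.csv')
print(ddf.groupby('category')['amount'].mean().compute())
```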
