1. Advanced Datetime Operations
Definition
Pandas provides powerful tools for working with datetime data through the pd.Timestamp class, the DatetimeIndex, and the .dt accessor. These operations enable time-based indexing, resampling, time zone handling, and complex date calculations.
Key Concepts
- DateTime Index: Special index for time series data
- dt Accessor: Gateway to datetime properties and methods
- Resampling: Aggregate time series to different frequencies
- Time Zones: Handle timezone-aware datetime data
- Date Offsets: Business days, month ends, custom periods
- Rolling Windows: Time-based rolling calculations
Example
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
# ========== CREATING DATETIME DATA ==========
print("=" * 60)
print("CREATING DATETIME DATA")
print("=" * 60)
# Create date range
dates = pd.date_range(start='2024-01-01', end='2024-12-31', freq='D')
print(f"Date range: {len(dates)} dates from {dates[0]} to {dates[-1]}")
print(f"First 5 dates:\n{dates[:5]}\n")
# Different frequencies
hourly = pd.date_range('2024-01-01', periods=24, freq='h')
business_days = pd.date_range('2024-01-01', periods=10, freq='B') # Business days
month_end = pd.date_range('2024-01-01', periods=12, freq='ME') # Month end
print(f"Hourly (first 5): {hourly[:5].tolist()}")
print(f"Business days: {business_days.tolist()}")
print(f"Month ends: {month_end.tolist()}\n")
# ========== DATETIME ACCESSOR (.dt) ==========
print("=" * 60)
print("DATETIME ACCESSOR (.dt)")
print("=" * 60)
# Sample data with datetime
df_dates = pd.DataFrame({
'date': pd.date_range('2024-01-01', periods=100, freq='D'),
'sales': np.random.randint(100, 1000, 100)
})
# Extract datetime components
df_dates['year'] = df_dates['date'].dt.year
df_dates['month'] = df_dates['date'].dt.month
df_dates['day'] = df_dates['date'].dt.day
df_dates['day_of_week'] = df_dates['date'].dt.dayofweek # Monday=0, Sunday=6
df_dates['day_name'] = df_dates['date'].dt.day_name()
df_dates['month_name'] = df_dates['date'].dt.month_name()
df_dates['quarter'] = df_dates['date'].dt.quarter
df_dates['week'] = df_dates['date'].dt.isocalendar().week
df_dates['is_month_start'] = df_dates['date'].dt.is_month_start
df_dates['is_month_end'] = df_dates['date'].dt.is_month_end
df_dates['is_weekend'] = df_dates['date'].dt.dayofweek.isin([5, 6])
print("Datetime components extracted:")
print(df_dates.head(10))
print("\n")
# ========== DATETIME INDEX ==========
print("=" * 60)
print("DATETIME INDEX")
print("=" * 60)
# Set datetime as index
df_ts = df_dates.set_index('date').copy()
print("DataFrame with DateTime Index:")
print(df_ts.head())
print(f"\nIndex type: {type(df_ts.index)}\n")
# Slicing with a datetime index: use .loc[] for partial string indexing
print("Data for January 2024:")
print(df_ts.loc['2024-01'])
print("\n")
print("Data from Jan 15 to Jan 20:")
print(df_ts.loc['2024-01-15':'2024-01-20'])
print("\n")
# Partial string indexing with .loc[]
print("All data for 2024:")
print(f"Rows: {len(df_ts.loc['2024'])}\n")
# ========== RESAMPLING ==========
print("=" * 60)
print("RESAMPLING")
print("=" * 60)
# Create hourly data
hourly_data = pd.DataFrame({
'timestamp': pd.date_range('2024-01-01', periods=168, freq='h'), # 1 week
'temperature': np.random.randint(20, 35, 168),
'humidity': np.random.randint(40, 80, 168)
})
hourly_data.set_index('timestamp', inplace=True)
# Resample to daily (downsample)
daily_avg = hourly_data.resample('D').mean()
print("Hourly to Daily (mean):")
print(daily_avg)
print("\n")
# Multiple aggregations
daily_stats = hourly_data.resample('D').agg({
'temperature': ['mean', 'min', 'max'],
'humidity': ['mean', 'std']
})
print("Daily statistics:")
print(daily_stats)
print("\n")
# Upsample (fill forward)
daily_data = pd.DataFrame({
'date': pd.date_range('2024-01-01', periods=7, freq='D'),
'value': [100, 150, 120, 180, 200, 160, 190]
})
daily_data.set_index('date', inplace=True)
hourly_upsampled = daily_data.resample('6h').ffill() # Forward fill
print("Daily to 6-hourly (forward fill):")
print(hourly_upsampled.head(12))
print("\n")
# Resample with custom aggregation
weekly_sales = df_ts.resample('W').agg({
'sales': 'sum',
'is_weekend': 'sum' # Count weekend days
})
print("Weekly resampling:")
print(weekly_sales.head())
print("\n")
# ========== TIME ZONES ==========
print("=" * 60)
print("TIME ZONES")
print("=" * 60)
# Create timezone-naive datetime
naive_dates = pd.date_range('2024-01-01', periods=5, freq='D')
print(f"Naive datetime:\n{naive_dates}\n")
# Localize to timezone
utc_dates = naive_dates.tz_localize('UTC')
print(f"UTC timezone:\n{utc_dates}\n")
# Convert to different timezone
ny_dates = utc_dates.tz_convert('America/New_York')
print(f"New York timezone:\n{ny_dates}\n")
tokyo_dates = utc_dates.tz_convert('Asia/Tokyo')
print(f"Tokyo timezone:\n{tokyo_dates}\n")
# Working with timezone-aware data
df_tz = pd.DataFrame({
'timestamp': pd.date_range('2024-01-01', periods=24, freq='h', tz='UTC'),
'value': np.random.randn(24)
})
df_tz['timestamp_ny'] = df_tz['timestamp'].dt.tz_convert('America/New_York')
print("Timezone conversion in DataFrame:")
print(df_tz.head())
print("\n")
# ========== DATE OFFSETS ==========
print("=" * 60)
print("DATE OFFSETS")
print("=" * 60)
# Business day offset
from pandas.tseries.offsets import BDay, MonthEnd, Week, Hour
start_date = pd.Timestamp('2024-01-01')
print(f"Start date: {start_date} ({start_date.day_name()})")
print(f"Plus 5 business days: {start_date + 5 * BDay()}")
print(f"Plus 1 month end: {start_date + MonthEnd()}")
print(f"Plus 2 weeks: {start_date + 2 * Week()}")
print(f"Plus 12 hours: {start_date + 12 * Hour()}")
print("\n")
# Generate business day range
business_dates = pd.bdate_range('2024-01-01', '2024-01-31')
print(f"Business days in January 2024: {len(business_dates)}")
print(f"Business dates: {business_dates.tolist()}\n")
# ========== TIME DELTAS ==========
print("=" * 60)
print("TIME DELTAS")
print("=" * 60)
# Create timedelta
df_events = pd.DataFrame({
'start': pd.to_datetime(['2024-01-01 10:00', '2024-01-01 14:00', '2024-01-01 16:00']),
'end': pd.to_datetime(['2024-01-01 12:30', '2024-01-01 15:45', '2024-01-01 18:20'])
})
df_events['duration'] = df_events['end'] - df_events['start']
df_events['duration_hours'] = df_events['duration'].dt.total_seconds() / 3600
df_events['duration_minutes'] = df_events['duration'].dt.total_seconds() / 60
print("Event durations:")
print(df_events)
print("\n")
# ========== PERIOD OPERATIONS ==========
print("=" * 60)
print("PERIOD OPERATIONS")
print("=" * 60)
# Create periods
periods = pd.period_range('2024-01', periods=12, freq='M')
print(f"Monthly periods:\n{periods}\n")
# Period arithmetic
period = pd.Period('2024-01', freq='M')
print(f"Period: {period}")
print(f"Next month: {period + 1}")
print(f"Previous month: {period - 1}")
print(f"To timestamp (start): {period.to_timestamp()}")
print(f"To timestamp (end): {period.to_timestamp(how='end')}")
print("\n")
# ========== ROLLING WINDOWS WITH TIME ==========
print("=" * 60)
print("ROLLING WINDOWS WITH TIME")
print("=" * 60)
# Create irregular time series
irregular_dates = pd.DatetimeIndex([
'2024-01-01', '2024-01-02', '2024-01-04',
'2024-01-07', '2024-01-08', '2024-01-12'
])
df_irregular = pd.DataFrame({
'value': [100, 110, 105, 120, 115, 130]
}, index=irregular_dates)
# Rolling window based on time period (not number of rows)
rolling_7d = df_irregular.rolling('7D').mean()
print("7-day rolling average (time-based):")
print(pd.concat([df_irregular, rolling_7d], axis=1, keys=['Original', 'Rolling_7D']))
print("\n")
# ========== DATETIME ARITHMETIC ==========
print("=" * 60)
print("DATETIME ARITHMETIC")
print("=" * 60)
df_calc = pd.DataFrame({
'order_date': pd.to_datetime(['2024-01-01', '2024-01-15', '2024-02-01']),
'delivery_date': pd.to_datetime(['2024-01-05', '2024-01-20', '2024-02-07'])
})
# Calculate differences
df_calc['delivery_days'] = (df_calc['delivery_date'] - df_calc['order_date']).dt.days
df_calc['days_from_today'] = (pd.Timestamp.now() - df_calc['order_date']).dt.days
df_calc['expected_delivery'] = df_calc['order_date'] + pd.Timedelta(days=7)
print("Datetime calculations:")
print(df_calc)
print("\n")
# ========== ADVANCED: BUSINESS DAY CALCULATIONS ==========
print("=" * 60)
print("BUSINESS DAY CALCULATIONS")
print("=" * 60)
# Count business days between dates
from pandas.tseries.offsets import CustomBusinessDay
# US Federal Holidays (simplified example)
us_holidays = [
'2024-01-01', # New Year
'2024-07-04', # Independence Day
'2024-12-25', # Christmas
]
# Create custom business day calendar
us_bd = CustomBusinessDay(holidays=us_holidays)
date1 = pd.Timestamp('2024-01-01')
date2 = pd.Timestamp('2024-01-31')
# Calculate business days
bdays = pd.bdate_range(date1, date2, freq=us_bd)
print(f"Business days (excluding holidays): {len(bdays)}")
print(f"First 5 business days: {bdays[:5].tolist()}\n")
# ========== TIME SERIES ANALYSIS ==========
print("=" * 60)
print("TIME SERIES ANALYSIS")
print("=" * 60)
# Create time series with trend and seasonality
dates_ts = pd.date_range('2024-01-01', periods=365, freq='D')
trend = np.linspace(100, 200, 365)
seasonal = 20 * np.sin(np.linspace(0, 4*np.pi, 365))
noise = np.random.randn(365) * 5
values = trend + seasonal + noise
df_analysis = pd.DataFrame({
'value': values
}, index=dates_ts)
# Moving average
df_analysis['MA_7'] = df_analysis['value'].rolling(window=7).mean()
df_analysis['MA_30'] = df_analysis['value'].rolling(window=30).mean()
# Exponential weighted moving average
df_analysis['EWMA'] = df_analysis['value'].ewm(span=14).mean()
# Percentage change
df_analysis['pct_change'] = df_analysis['value'].pct_change() * 100
# Cumulative sum
df_analysis['cumsum'] = df_analysis['value'].cumsum()
print("Time series analysis (first 10 rows):")
print(df_analysis.head(10).round(2))
print("\n")
# ========== DATETIME STRING FORMATTING ==========
print("=" * 60)
print("DATETIME STRING FORMATTING")
print("=" * 60)
df_format = pd.DataFrame({
'timestamp': pd.date_range('2024-01-01 10:30:45', periods=5, freq='h')
})
# Format datetime as string
df_format['formatted'] = df_format['timestamp'].dt.strftime('%Y-%m-%d %H:%M:%S')
df_format['date_only'] = df_format['timestamp'].dt.strftime('%B %d, %Y')
df_format['time_only'] = df_format['timestamp'].dt.strftime('%I:%M %p')
print("Datetime formatting:")
print(df_format)
print("\n")
# Parse strings to datetime
date_strings = pd.Series(['2024-01-01', '01/15/2024', 'Jan 20, 2024'])
parsed_dates = pd.to_datetime(date_strings, format='mixed')
print("Parsed dates:")
print(parsed_dates)
2. Memory-Efficient Chunking
Definition
Chunking allows you to process large datasets that don’t fit in memory by reading and processing data in smaller pieces (chunks). This is essential for working with files larger than available RAM.
Key Concepts
- Iterator: Process data chunk by chunk
- chunksize Parameter: Define size of each chunk
- Aggregation: Combine results from multiple chunks
- Memory Management: Monitor and optimize memory usage
- Streaming: Process data without loading entire file
Example
import pandas as pd
import numpy as np
import os
# ========== CREATE LARGE SAMPLE FILE ==========
print("=" * 60)
print("CREATING LARGE SAMPLE FILE")
print("=" * 60)
# Create a large CSV file for demonstration
sample_size = 1000000 # 1 million rows
filename = '/home/claude/large_dataset.csv'
print(f"Creating sample file with {sample_size:,} rows...")
# Generate data in chunks to avoid memory issues
chunk_size = 100000
for i in range(0, sample_size, chunk_size):
chunk_data = pd.DataFrame({
'id': range(i, min(i + chunk_size, sample_size)),
'category': np.random.choice(['A', 'B', 'C', 'D', 'E'],
min(chunk_size, sample_size - i)),
'value': np.random.randn(min(chunk_size, sample_size - i)),
'amount': np.random.randint(100, 10000, min(chunk_size, sample_size - i)),
'date': pd.date_range('2024-01-01',
periods=min(chunk_size, sample_size - i),
freq='min')
})
# Write with header only on first chunk
chunk_data.to_csv(filename, mode='a' if i > 0 else 'w',
header=(i == 0), index=False)
file_size_mb = os.path.getsize(filename) / (1024 * 1024)
print(f"File created: {file_size_mb:.2f} MB\n")
# ========== BASIC CHUNKING ==========
print("=" * 60)
print("BASIC CHUNKING")
print("=" * 60)
# Read file in chunks
chunk_size = 50000
chunk_iterator = pd.read_csv(filename, chunksize=chunk_size)
print(f"Reading file in chunks of {chunk_size:,} rows")
print(f"Chunk iterator type: {type(chunk_iterator)}\n")
# Process first few chunks
for i, chunk in enumerate(chunk_iterator):
print(f"Chunk {i + 1}:")
print(f" Shape: {chunk.shape}")
print(f" Memory usage: {chunk.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
print(f" First row ID: {chunk['id'].iloc[0]}")
print(f" Last row ID: {chunk['id'].iloc[-1]}")
if i >= 2: # Only show first 3 chunks
print(f"\n... (remaining chunks not shown)")
break
print()
# ========== AGGREGATING ACROSS CHUNKS ==========
print("=" * 60)
print("AGGREGATING ACROSS CHUNKS")
print("=" * 60)
# Calculate statistics across all chunks
chunk_iterator = pd.read_csv(filename, chunksize=chunk_size)
# Initialize accumulators
total_rows = 0
sum_amount = 0
sum_squared_amount = 0
category_counts = {}
print("Processing all chunks for aggregation...")
for chunk in chunk_iterator:
total_rows += len(chunk)
sum_amount += chunk['amount'].sum()
sum_squared_amount += (chunk['amount'] ** 2).sum()
# Count categories
for category, count in chunk['category'].value_counts().items():
category_counts[category] = category_counts.get(category, 0) + count
# Calculate overall statistics
mean_amount = sum_amount / total_rows
variance_amount = (sum_squared_amount / total_rows) - (mean_amount ** 2)
std_amount = np.sqrt(variance_amount)
print(f"\nOverall Statistics:")
print(f" Total rows: {total_rows:,}")
print(f" Mean amount: ${mean_amount:.2f}")
print(f" Std amount: ${std_amount:.2f}")
print(f" Category counts: {category_counts}\n")
# ========== FILTERING WITH CHUNKS ==========
print("=" * 60)
print("FILTERING WITH CHUNKS")
print("=" * 60)
# Filter data and save to new file
chunk_iterator = pd.read_csv(filename, chunksize=chunk_size)
output_file = '/home/claude/filtered_data.csv'
print("Filtering rows where amount > 5000...")
filtered_rows = 0
for i, chunk in enumerate(chunk_iterator):
# Apply filter
filtered_chunk = chunk[chunk['amount'] > 5000]
filtered_rows += len(filtered_chunk)
# Save filtered data
filtered_chunk.to_csv(output_file, mode='a' if i > 0 else 'w',
header=(i == 0), index=False)
print(f"Filtered rows: {filtered_rows:,}")
print(f"Output file size: {os.path.getsize(output_file) / 1024**2:.2f} MB\n")
# ========== GROUPBY WITH CHUNKS ==========
print("=" * 60)
print("GROUPBY WITH CHUNKS")
print("=" * 60)
chunk_iterator = pd.read_csv(filename, chunksize=chunk_size)
# Accumulate group statistics
group_stats = {}
print("Calculating group statistics across chunks...")
for chunk in chunk_iterator:
# GroupBy on chunk
chunk_groups = chunk.groupby('category')['amount'].agg(['sum', 'count'])
# Accumulate results
for category in chunk_groups.index:
if category not in group_stats:
group_stats[category] = {'sum': 0, 'count': 0}
group_stats[category]['sum'] += chunk_groups.loc[category, 'sum']
group_stats[category]['count'] += chunk_groups.loc[category, 'count']
# Calculate final statistics
final_stats = pd.DataFrame.from_dict(group_stats, orient='index')
final_stats['mean'] = final_stats['sum'] / final_stats['count']
print("\nGroup Statistics:")
print(final_stats)
print("\n")
# ========== MEMORY MONITORING ==========
print("=" * 60)
print("MEMORY MONITORING")
print("=" * 60)
import psutil
process = psutil.Process(os.getpid())
def get_memory_usage():
"""Get current memory usage in MB"""
return process.memory_info().rss / 1024**2
initial_memory = get_memory_usage()
print(f"Initial memory usage: {initial_memory:.2f} MB\n")
# Compare loading full file vs chunks
print("Loading FULL file into memory:")
start_memory = get_memory_usage()
try:
df_full = pd.read_csv(filename)
full_memory = get_memory_usage()
print(f"Memory after loading: {full_memory:.2f} MB")
print(f"Memory increase: {full_memory - start_memory:.2f} MB")
del df_full # Free memory
except MemoryError:
print("MemoryError: File too large to load completely")
print()
print("Processing with CHUNKS:")
start_memory = get_memory_usage()
chunk_iterator = pd.read_csv(filename, chunksize=chunk_size)
max_memory = start_memory
for i, chunk in enumerate(chunk_iterator):
current_memory = get_memory_usage()
max_memory = max(max_memory, current_memory)
# Process chunk (example: calculate something)
result = chunk['amount'].sum()
if i >= 5: # Process a few chunks
break
chunk_memory = get_memory_usage()
print(f"Memory after chunking: {chunk_memory:.2f} MB")
print(f"Peak memory during chunking: {max_memory:.2f} MB")
print(f"Memory increase: {max_memory - start_memory:.2f} MB\n")
# ========== OPTIMIZING DTYPES WHILE CHUNKING ==========
print("=" * 60)
print("OPTIMIZING DTYPES WHILE CHUNKING")
print("=" * 60)
# Define optimal dtypes
dtype_dict = {
'id': 'int32', # Instead of int64
'category': 'category',
'value': 'float32', # Instead of float64
'amount': 'int32'
}
chunk_iterator = pd.read_csv(filename, chunksize=chunk_size, dtype=dtype_dict,
parse_dates=['date'])
print("Reading with optimized dtypes:")
for i, chunk in enumerate(chunk_iterator):
print(f"\nChunk {i + 1}:")
print(chunk.dtypes)
print(f"Memory usage: {chunk.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
if i >= 1: # Show first 2 chunks
break
print("\n")
# ========== PARALLEL PROCESSING CONCEPT ==========
print("=" * 60)
print("PARALLEL PROCESSING CONCEPT")
print("=" * 60)
# Demonstrate concept (actual parallel processing would use multiprocessing)
print("Chunk processing can be parallelized:")
print("1. Split file into chunks")
print("2. Process each chunk independently")
print("3. Combine results")
print("\nExample with sequential processing:")
chunk_iterator = pd.read_csv(filename, chunksize=chunk_size)
def process_chunk(chunk):
"""Example processing function"""
return {
'total_amount': chunk['amount'].sum(),
'avg_value': chunk['value'].mean(),
'row_count': len(chunk)
}
results = []
for i, chunk in enumerate(chunk_iterator):
result = process_chunk(chunk)
results.append(result)
if i >= 4: # Process 5 chunks
break
# Combine results
combined_results = pd.DataFrame(results)
print(f"\nProcessed {len(results)} chunks:")
print(combined_results)
print(f"\nTotal across chunks:")
print(combined_results.sum())
print("\n")
# ========== WRITING IN CHUNKS ==========
print("=" * 60)
print("WRITING IN CHUNKS")
print("=" * 60)
# Transform and write data in chunks
chunk_iterator = pd.read_csv(filename, chunksize=chunk_size)
output_transformed = '/home/claude/transformed_data.csv'
print("Transforming and writing data in chunks...")
for i, chunk in enumerate(chunk_iterator):
# Transform data
chunk['amount_doubled'] = chunk['amount'] * 2
chunk['category_upper'] = chunk['category'].str.upper()
# Write chunk
chunk.to_csv(output_transformed, mode='a' if i > 0 else 'w',
header=(i == 0), index=False)
if i >= 4: # Transform first 5 chunks
break
print(f"Transformed {i + 1} chunks")
print(f"Output file size: {os.path.getsize(output_transformed) / 1024**2:.2f} MB\n")
# Clean up created files
print("Cleaning up temporary files...")
for file in [filename, output_file, output_transformed]:
if os.path.exists(file):
os.remove(file)
print("Done!\n")
# ========== BEST PRACTICES ==========
print("=" * 60)
print("CHUNKING BEST PRACTICES")
print("=" * 60)
best_practices = """
1. Choose appropriate chunk size:
- Too small: Overhead from many iterations
- Too large: Memory issues
- Typical: 10,000 - 100,000 rows
2. Optimize dtypes BEFORE processing:
- Specify dtypes in read_csv
- Use categories for repeated strings
- Use smaller numeric types when possible
3. Use generators and iterators:
- Process one chunk at a time
- Don't store all chunks in memory
4. For aggregations:
- Accumulate statistics incrementally
- Combine results after all chunks processed
5. Monitor memory usage:
- Use memory_profiler or psutil
- Track peak memory during processing
6. Consider alternatives for very large data:
- Dask for parallel processing
- Database solutions (SQLite, PostgreSQL)
- Parquet format for better compression
"""
print(best_practices)
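A related sketch for point 6 above: converting a large CSV to Parquet chunk by chunk with pyarrow (assumed to be installed). The function name and file paths are illustrative; the Parquet schema is taken from the first chunk so every row group stays consistent.
import pyarrow as pa
import pyarrow.parquet as pq

def csv_to_parquet(csv_path, parquet_path, chunksize=100000):
    """Stream a CSV into a single Parquet file, one chunk (row group) at a time."""
    writer = None
    for chunk in pd.read_csv(csv_path, chunksize=chunksize):
        table = pa.Table.from_pandas(chunk, preserve_index=False)
        if writer is None:
            # Open the writer lazily so its schema matches the first chunk
            writer = pq.ParquetWriter(parquet_path, table.schema)
        writer.write_table(table)
    if writer is not None:
        writer.close()

# csv_to_parquet('large_dataset.csv', 'large_dataset.parquet')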
3. Custom Accessors
Definition
Custom accessors allow you to extend Pandas with your own methods that can be called using the familiar .accessor_name.method() syntax, similar to .dt, .str, or .cat. This creates reusable, clean APIs for domain-specific operations.
Key Concepts
- @pd.api.extensions.register_dataframe_accessor: Decorator for DataFrame accessors
- @pd.api.extensions.register_series_accessor: Decorator for Series accessors
- Encapsulation: Bundle related functionality together
- Reusability: Create once, use across projects
- Clean API: Intuitive, method-chaining friendly interface
Example
import pandas as pd
import numpy as np
# ========== BASIC SERIES ACCESSOR ==========
print("=" * 60)
print("BASIC SERIES ACCESSOR")
print("=" * 60)
@pd.api.extensions.register_series_accessor("text_tools")
class TextToolsAccessor:
"""Custom accessor for text operations"""
def __init__(self, pandas_obj):
self._validate(pandas_obj)
self._obj = pandas_obj
@staticmethod
def _validate(obj):
"""Verify we have string data"""
if not pd.api.types.is_string_dtype(obj):
raise AttributeError("Can only use .text_tools with string data")
def word_count(self):
"""Count words in each string"""
return self._obj.str.split().str.len()
def char_count(self):
"""Count characters (excluding spaces)"""
return self._obj.str.replace(' ', '').str.len()
def title_case_smart(self):
"""Smart title case (keeps acronyms uppercase)"""
def smart_title(text):
words = text.split()
result = []
for word in words:
if word.isupper() and len(word) > 1:
result.append(word) # Keep acronyms
else:
result.append(word.capitalize())
return ' '.join(result)
return self._obj.apply(smart_title)
def extract_hashtags(self):
"""Extract all hashtags from text"""
return self._obj.str.findall(r'#\w+')
# Test the accessor
text_data = pd.Series([
'Hello World from NASA',
'Python is great for AI and ML',
'Check out #pandas #python',
'The USA and UK are allies'
])
print("Original text:")
print(text_data)
print("\n")
print("Word count:")
print(text_data.text_tools.word_count())
print("\n")
print("Character count:")
print(text_data.text_tools.char_count())
print("\n")
print("Smart title case:")
print(text_data.text_tools.title_case_smart())
print("\n")
print("Extract hashtags:")
print(text_data.text_tools.extract_hashtags())
print("\n")
# ========== DATAFRAME ACCESSOR ==========
print("=" * 60)
print("DATAFRAME ACCESSOR")
print("=" * 60)
@pd.api.extensions.register_dataframe_accessor("financial")
class FinancialAccessor:
"""Custom accessor for financial calculations"""
def __init__(self, pandas_obj):
self._obj = pandas_obj
def profit_margin(self, revenue_col, cost_col):
"""Calculate profit margin percentage"""
revenue = self._obj[revenue_col]
cost = self._obj[cost_col]
return ((revenue - cost) / revenue * 100).round(2)
def roi(self, revenue_col, investment_col):
"""Calculate Return on Investment"""
revenue = self._obj[revenue_col]
investment = self._obj[investment_col]
return ((revenue - investment) / investment * 100).round(2)
def compound_growth_rate(self, value_col, periods):
"""Calculate compound annual growth rate"""
start_value = self._obj[value_col].iloc[0]
end_value = self._obj[value_col].iloc[-1]
return (((end_value / start_value) ** (1/periods)) - 1) * 100
def summary_stats(self, amount_col):
"""Generate financial summary statistics"""
data = self._obj[amount_col]
return pd.Series({
'Total': data.sum(),
'Average': data.mean(),
'Median': data.median(),
'Min': data.min(),
'Max': data.max(),
'Std Dev': data.std(),
'Range': data.max() - data.min()
})
# Test the accessor
financial_data = pd.DataFrame({
'Product': ['A', 'B', 'C', 'D'],
'Revenue': [10000, 15000, 8000, 20000],
'Cost': [7000, 10000, 6000, 14000],
'Investment': [5000, 8000, 4000, 10000]
})
print("Financial data:")
print(financial_data)
print("\n")
print("Profit margins:")
print(financial_data.financial.profit_margin('Revenue', 'Cost'))
print("\n")
print("ROI:")
print(financial_data.financial.roi('Revenue', 'Investment'))
print("\n")
print("Summary statistics for Revenue:")
print(financial_data.financial.summary_stats('Revenue'))
print("\n")
# ========== ADVANCED: DATETIME ACCESSOR ==========
print("=" * 60)
print("ADVANCED: CUSTOM DATETIME ACCESSOR")
print("=" * 60)
@pd.api.extensions.register_series_accessor("business_time")
class BusinessTimeAccessor:
"""Custom accessor for business time calculations"""
def __init__(self, pandas_obj):
self._validate(pandas_obj)
self._obj = pandas_obj
@staticmethod
def _validate(obj):
"""Verify we have datetime data"""
if not pd.api.types.is_datetime64_any_dtype(obj):
raise AttributeError("Can only use .business_time with datetime data")
def is_business_day(self):
"""Check if date is a business day (Mon-Fri)"""
return self._obj.dt.dayofweek < 5
def is_business_hours(self, start_hour=9, end_hour=17):
"""Check if time is during business hours"""
hour = self._obj.dt.hour
is_weekday = self.is_business_day()
is_business_hour = (hour >= start_hour) & (hour < end_hour)
return is_weekday & is_business_hour
def next_business_day(self):
"""Get next business day"""
from pandas.tseries.offsets import BDay
return self._obj + BDay(1)
def business_days_until(self, target_date):
"""Calculate business days until target"""
result = []
for date in self._obj:
count = len(pd.bdate_range(start=date, end=target_date))
result.append(count - 1 if count > 0 else 0)
return pd.Series(result, index=self._obj.index)
def quarter_name(self):
"""Get quarter name (Q1, Q2, Q3, Q4)"""
return 'Q' + self._obj.dt.quarter.astype(str)
def fiscal_year(self, fiscal_start_month=4):
"""Calculate fiscal year (e.g., April start)"""
year = self._obj.dt.year
month = self._obj.dt.month
return year + (month >= fiscal_start_month).astype(int)
# Test business time accessor
dates = pd.Series(pd.date_range('2024-01-01', periods=10, freq='D'))
print("Dates:")
print(dates)
print("\n")
print("Is business day:")
print(dates.business_time.is_business_day())
print("\n")
print("Next business day:")
print(dates.business_time.next_business_day())
print("\n")
print("Quarter name:")
print(dates.business_time.quarter_name())
print("\n")
print("Fiscal year (April start):")
print(dates.business_time.fiscal_year(fiscal_start_month=4))
print("\n")
# ========== ACCESSOR WITH PARAMETERS ==========
print("=" * 60)
print("ACCESSOR WITH CONFIGURATION")
print("=" * 60)
@pd.api.extensions.register_dataframe_accessor("analytics")
class AnalyticsAccessor:
"""Advanced analytics accessor with configuration"""
def __init__(self, pandas_obj):
self._obj = pandas_obj
self._cache = {}  # reserved for caching expensive results (not used in this example)
def outliers(self, column, method='iqr', threshold=1.5):
"""
Detect outliers using different methods
Parameters:
-----------
column : str
Column name to check for outliers
method : str
'iqr' - Interquartile Range
'zscore' - Z-score method
threshold : float
Threshold for outlier detection
"""
data = self._obj[column]
if method == 'iqr':
Q1 = data.quantile(0.25)
Q3 = data.quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - threshold * IQR
upper_bound = Q3 + threshold * IQR
return (data < lower_bound) | (data > upper_bound)
elif method == 'zscore':
z_scores = np.abs((data - data.mean()) / data.std())
return z_scores > threshold
else:
raise ValueError(f"Unknown method: {method}")
def normalize(self, column, method='minmax'):
"""
Normalize data using different methods
Parameters:
-----------
column : str
Column to normalize
method : str
'minmax' - Min-Max normalization (0-1)
'zscore' - Z-score normalization
"""
data = self._obj[column]
if method == 'minmax':
return (data - data.min()) / (data.max() - data.min())
elif method == 'zscore':
return (data - data.mean()) / data.std()
else:
raise ValueError(f"Unknown method: {method}")
def correlation_with(self, target_col, top_n=5):
"""Find columns most correlated with target"""
numeric_cols = self._obj.select_dtypes(include=[np.number]).columns
correlations = self._obj[numeric_cols].corr()[target_col].abs()
correlations = correlations.drop(target_col)
return correlations.nlargest(top_n)
def summary(self):
"""Generate comprehensive summary"""
numeric_cols = self._obj.select_dtypes(include=[np.number]).columns
summary_data = {
'shape': self._obj.shape,
'columns': len(self._obj.columns),
'numeric_columns': len(numeric_cols),
'missing_values': self._obj.isna().sum().sum(),
'memory_usage_mb': self._obj.memory_usage(deep=True).sum() / 1024**2
}
return pd.Series(summary_data)
# Test analytics accessor
analytics_data = pd.DataFrame({
'A': [1, 2, 3, 4, 5, 6, 100], # 100 is outlier
'B': [10, 20, 30, 40, 50, 60, 70],
'C': [5, 15, 25, 35, 45, 55, 65],
'D': [2, 4, 6, 8, 10, 12, 14]
})
print("Analytics data:")
print(analytics_data)
print("\n")
print("Outliers in column A (IQR method):")
print(analytics_data.analytics.outliers('A', method='iqr'))
print("\n")
print("Normalized column B (min-max):")
print(analytics_data.analytics.normalize('B', method='minmax'))
print("\n")
print("Top 3 correlations with column A:")
print(analytics_data.analytics.correlation_with('A', top_n=3))
print("\n")
print("DataFrame summary:")
print(analytics_data.analytics.summary())
print("\n")
# ========== CHAINING WITH ACCESSORS ==========
print("=" * 60)
print("METHOD CHAINING WITH ACCESSORS")
print("=" * 60)
# Accessors work great with method chaining
result = (
financial_data
.assign(
profit_margin=lambda x: x.financial.profit_margin('Revenue', 'Cost'),
roi=lambda x: x.financial.roi('Revenue', 'Investment')
)
.query('profit_margin > 25')
.sort_values('roi', ascending=False)
)
print("Method chaining with custom accessor:")
print(result)
4. Pipe for Method Chaining
Definition
The pipe() method enables clean, readable method chaining by allowing you to apply custom functions to DataFrames. It creates a functional programming style that improves code organization and reusability.
Key Concepts
- Method Chaining: Link multiple operations together
- Functional Programming: Treat operations as composable functions
- Readability: Create clear data transformation pipelines
- Reusability: Define transformation functions once, use many times
- Debugging: Easy to comment out steps in the pipeline
Example
import pandas as pd
import numpy as np
# ========== BASIC PIPE USAGE ==========
print("=" * 60)
print("BASIC PIPE USAGE")
print("=" * 60)
# Sample data
df = pd.DataFrame({
'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve'],
'age': [25, 30, 35, 28, 32],
'salary': [50000, 60000, 45000, 55000, 65000],
'department': ['Sales', 'IT', 'HR', 'Sales', 'IT']
})
print("Original DataFrame:")
print(df)
print("\n")
# Without pipe (traditional approach)
print("Traditional approach:")
df_temp = df.copy()
df_temp = df_temp[df_temp['age'] > 27]
df_temp['salary_k'] = df_temp['salary'] / 1000
df_temp = df_temp.sort_values('salary', ascending=False)
print(df_temp)
print("\n")
# With pipe (cleaner)
def filter_age(df, min_age):
return df[df['age'] > min_age]
def add_salary_k(df):
df_copy = df.copy()
df_copy['salary_k'] = df_copy['salary'] / 1000
return df_copy
def sort_by_salary(df, ascending=False):
return df.sort_values('salary', ascending=ascending)
print("With pipe:")
result = (df
.pipe(filter_age, min_age=27)
.pipe(add_salary_k)
.pipe(sort_by_salary, ascending=False)
)
print(result)
print("\n")
# ========== PIPE WITH LAMBDA ==========
print("=" * 60)
print("PIPE WITH LAMBDA FUNCTIONS")
print("=" * 60)
result = (df
.pipe(lambda x: x[x['department'] == 'Sales'])
.pipe(lambda x: x.assign(bonus=x['salary'] * 0.1))
.pipe(lambda x: x[['name', 'salary', 'bonus']])
)
print("Using lambda with pipe:")
print(result)
print("\n")
# ========== COMPLEX DATA PIPELINE ==========
print("=" * 60)
print("COMPLEX DATA PIPELINE")
print("=" * 60)
# Sales data
sales_data = pd.DataFrame({
'date': pd.date_range('2024-01-01', periods=100, freq='D'),
'product': np.random.choice(['A', 'B', 'C'], 100),
'region': np.random.choice(['North', 'South', 'East', 'West'], 100),
'sales': np.random.randint(100, 1000, 100),
'units': np.random.randint(1, 20, 100)
})
print("Original sales data (first 10 rows):")
print(sales_data.head(10))
print("\n")
# Define transformation functions
def add_date_features(df):
"""Add datetime-based features"""
df = df.copy()
df['year'] = df['date'].dt.year
df['month'] = df['date'].dt.month
df['day_of_week'] = df['date'].dt.day_name()
df['is_weekend'] = df['date'].dt.dayofweek.isin([5, 6])
return df
def calculate_metrics(df):
"""Calculate business metrics"""
df = df.copy()
df['price_per_unit'] = (df['sales'] / df['units']).round(2)
df['sales_category'] = pd.cut(df['sales'],
bins=[0, 300, 600, 1000],
labels=['Low', 'Medium', 'High'])
return df
def filter_outliers(df, column, threshold=3):
"""Remove outliers using z-score"""
z_scores = np.abs((df[column] - df[column].mean()) / df[column].std())
return df[z_scores < threshold]
def aggregate_by_product(df):
"""Aggregate statistics by product"""
return df.groupby('product').agg({
'sales': ['sum', 'mean', 'count'],
'units': 'sum',
'price_per_unit': 'mean'
}).round(2)
# Build pipeline
print("Complete data pipeline:")
result = (sales_data
.pipe(add_date_features)
.pipe(calculate_metrics)
.pipe(filter_outliers, column='sales', threshold=3)
.pipe(aggregate_by_product)
)
print(result)
print("\n")
# ========== CONDITIONAL PIPELINE ==========
print("=" * 60)
print("CONDITIONAL PIPELINE")
print("=" * 60)
def conditional_transform(df, apply_transform=True):
"""Conditionally apply transformation"""
if apply_transform:
df = df.copy()
df['double_sales'] = df['sales'] * 2
return df
# Pipeline with conditional logic
apply_doubling = True
result = (sales_data
.head(5)
.pipe(conditional_transform, apply_transform=apply_doubling)
)
print(f"With transform (apply_transform={apply_doubling}):")
print(result[['date', 'sales', 'double_sales'] if apply_doubling else ['date', 'sales']])
print("\n")
# ========== ERROR HANDLING IN PIPELINE ==========
print("=" * 60)
print("ERROR HANDLING IN PIPELINE")
print("=" * 60)
def safe_transform(df, column, operation):
"""Safely transform with error handling"""
try:
df = df.copy()
df[f'{column}_transformed'] = operation(df[column])
return df
except Exception as e:
print(f"Error in transformation: {e}")
return df
result = (sales_data
.head(5)
.pipe(safe_transform, column='sales', operation=lambda x: np.log(x))
.pipe(safe_transform, column='units', operation=lambda x: np.sqrt(x))
)
print("Safe transformation:")
print(result[['sales', 'sales_transformed', 'units', 'units_transformed']])
print("\n")
# ========== DEBUGGING PIPELINE ==========
print("=" * 60)
print("DEBUGGING PIPELINE")
print("=" * 60)
def debug_print(df, message="Debug"):
"""Print debug information in pipeline"""
print(f"{message}:")
print(f" Shape: {df.shape}")
print(f" Columns: {df.columns.tolist()}")
print(f" First row:\n{df.iloc[0]}\n")
return df
result = (sales_data
.head(10)
.pipe(debug_print, message="After loading")
.pipe(add_date_features)
.pipe(debug_print, message="After adding date features")
.pipe(calculate_metrics)
.pipe(debug_print, message="After calculating metrics")
)
# ========== REUSABLE PIPELINE ==========
print("=" * 60)
print("REUSABLE PIPELINE")
print("=" * 60)
class DataPipeline:
"""Reusable data pipeline class"""
def __init__(self, df):
self.df = df.copy()
self.steps = []
def add_step(self, func, *args, **kwargs):
"""Add a transformation step"""
self.steps.append((func, args, kwargs))
return self
def execute(self):
"""Execute all steps in pipeline"""
result = self.df
for func, args, kwargs in self.steps:
result = result.pipe(func, *args, **kwargs)
return result
def get_steps(self):
"""Get list of pipeline steps"""
return [func.__name__ for func, _, _ in self.steps]
# Create and configure pipeline
pipeline = DataPipeline(sales_data)
pipeline.add_step(add_date_features)
pipeline.add_step(calculate_metrics)
pipeline.add_step(filter_outliers, column='sales', threshold=3)
print("Pipeline steps:")
print(pipeline.get_steps())
print("\n")
# Execute pipeline
result = pipeline.execute()
print("Pipeline result (first 5 rows):")
print(result.head())
print("\n")
# ========== ADVANCED: PIPELINE WITH LOGGING ==========
print("=" * 60)
print("PIPELINE WITH LOGGING")
print("=" * 60)
import time
def logged_transform(func):
"""Decorator to log pipeline steps"""
def wrapper(df, *args, **kwargs):
start = time.time()
print(f"Starting: {func.__name__}")
result = func(df, *args, **kwargs)
elapsed = time.time() - start
print(f"Completed: {func.__name__} ({elapsed:.3f}s)")
print(f" Rows: {len(df)} -> {len(result)}")
return result
return wrapper
@logged_transform
def clean_data(df):
"""Remove rows with missing values"""
return df.dropna()
@logged_transform
def enrich_data(df):
"""Add computed columns"""
df = df.copy()
df['total_value'] = df['sales'] * df['units']
return df
@logged_transform
def summarize_data(df):
"""Create summary statistics"""
return df.groupby('product')['sales'].agg(['mean', 'std', 'count'])
print("Logged pipeline:")
result = (sales_data
.pipe(clean_data)
.pipe(enrich_data)
.pipe(summarize_data)
)
print("\nFinal result:")
print(result)
print("\n")
# ========== COMBINING WITH OTHER PANDAS METHODS ==========
print("=" * 60)
print("COMBINING PIPE WITH OTHER METHODS")
print("=" * 60)
result = (sales_data
.pipe(add_date_features)
.assign(
sales_rank=lambda x: x.groupby('product')['sales'].rank(ascending=False)
)
.query('sales > 500')
.sort_values('sales', ascending=False)
.head(10)
.pipe(lambda x: x[['date', 'product', 'sales', 'sales_rank']])
)
print("Combined pipeline with assign, query, sort_values:")
print(result)
print("\n")
# ========== PERFORMANCE OPTIMIZATION ==========
print("=" * 60)
print("PERFORMANCE TIPS")
print("=" * 60)
performance_tips = """
PIPE PERFORMANCE TIPS:
1. Avoid unnecessary copies:
- Use inplace operations when possible
- Return views instead of copies where safe
2. Filter early:
- Apply filters before heavy transformations
- Reduce data size early in pipeline
3. Vectorize operations:
- Use built-in pandas methods over apply
- Leverage numpy for numerical operations
4. Profile your pipeline:
- Time each step
- Identify bottlenecks
- Optimize slowest steps first
5. Consider lazy evaluation:
- For very large datasets, consider Dask
- Process in chunks if memory is limited
Example optimized pipeline:
(df
.query('important_filter') # Filter first
.pipe(vectorized_transform) # Fast operation
.drop_duplicates() # Reduce size
.pipe(complex_transform) # Slower operation on smaller data
)
"""
print(performance_tips)
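A brief, hedged illustration of tip 5 (lazy evaluation) using Dask, assuming the dask package is installed; the glob pattern and column names are illustrative:
import dask.dataframe as dd

# Build a lazy task graph; nothing is read or computed yet
ddf = dd.read_csv('sales_*.csv')
lazy_result = ddf[ddf['sales'] > 500].groupby('product')['sales'].mean()

# Reading, filtering and aggregation run (chunk by chunk, in parallel) only here
print(lazy_result.compute())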
Summary
When to Use Each Technique
- Advanced Datetime Operations:
  - Time series analysis
  - Business day calculations
  - Timezone handling
  - Resampling temporal data
- Memory-Efficient Chunking:
  - Files larger than available RAM
  - Streaming data processing
  - Memory-constrained environments
  - ETL pipelines with large data
- Custom Accessors:
  - Domain-specific operations used repeatedly
  - Clean API for complex operations
  - Extending pandas for specific use cases
  - Creating reusable libraries
- Pipe for Method Chaining:
  - Complex data transformations
  - Readable data pipelines
  - Functional programming style
  - Debugging multi-step processes
Best Practices
Datetime Operations:
- Always be explicit about time zones (see the sketch after this list)
- Use vectorized .dt methods instead of loops
- Choose an appropriate frequency for resampling
- Consider business day calendars for financial data
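For example, a minimal sketch of being explicit about time zones, including the DST edge cases that tz_localize can hit (the Europe/Berlin zone and the ambiguous/nonexistent choices shown are just one reasonable option):
import pandas as pd

ts = pd.Series(pd.to_datetime(['2024-03-31 02:30', '2024-10-27 02:30']))
localized = ts.dt.tz_localize(
    'Europe/Berlin',
    ambiguous='NaT',              # the fall-back hour occurs twice
    nonexistent='shift_forward',  # the spring-forward hour does not exist
)
print(localized.dt.tz_convert('UTC'))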
Chunking:
- Monitor memory usage during processing
- Choose chunk size based on available RAM (typically 10K-100K rows)
- Optimize dtypes before processing
- Use generators to avoid storing all chunks
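A minimal generator sketch for the last point (the function name and the 5,000 threshold are illustrative): each filtered chunk is yielded and then discarded, so only one chunk is ever held in memory.
import pandas as pd

def high_value_chunks(path, chunksize=50000):
    """Yield filtered chunks one at a time instead of collecting them in a list."""
    for chunk in pd.read_csv(path, chunksize=chunksize):
        yield chunk[chunk['amount'] > 5000]

# total_rows = sum(len(c) for c in high_value_chunks('large_dataset.csv'))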
Custom Accessors:
- Validate input data in __init__
- Follow pandas’ naming conventions
- Document methods clearly
- Cache expensive computations
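One way to cache expensive computations in an accessor is to store results on the accessor instance, which pandas keeps alive per object. This is a sketch; the stats accessor name and the describe() call are illustrative:
import pandas as pd

@pd.api.extensions.register_dataframe_accessor("stats")
class StatsAccessor:
    def __init__(self, pandas_obj):
        self._obj = pandas_obj
        self._cache = {}

    def describe_numeric(self):
        """Expensive summary, computed once per DataFrame and then reused."""
        # Note: cached results go stale if the DataFrame is modified in place
        if 'describe' not in self._cache:
            self._cache['describe'] = self._obj.describe()
        return self._cache['describe']

# df.stats.describe_numeric() recomputes only for a DataFrame it has not seen before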
Pipe:
- Keep functions pure (no side effects); see the sketch after this list
- Make functions reusable
- Add debug steps for complex pipelines
- Consider adding logging for production use
- Filter early, transform late
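A small sketch of the "keep functions pure" guideline above (the add_bonus name and 10% rate are illustrative): a pipe step should return a new frame via assign() rather than mutating its input.
import pandas as pd

def add_bonus(df, rate=0.1):
    """Pure pipeline step: returns a new DataFrame and leaves the input untouched."""
    return df.assign(bonus=df['salary'] * rate)

# df.pipe(add_bonus, rate=0.1) can be reused anywhere in a chain without side effects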
Performance Considerations
- Datetime: the .dt accessor is vectorized and fast; prefer it over row-wise apply whenever possible
- Chunking: Balance chunk size vs. iteration overhead
- Accessors: Cache results when appropriate
- Pipe: Minimize intermediate copies, use inplace when safe
