Definition
The pipe() method enables clean, readable method chaining by allowing you to apply custom functions to DataFrames. It creates a functional programming style that improves code organization and reusability.
Key Concepts
- Method Chaining: Link multiple operations together
- Functional Programming: Treat operations as composable functions
- Readability: Create clear data transformation pipelines
- Reusability: Define transformation functions once, use many times
- Debugging: Easy to comment out steps in the pipeline
Example
python
import pandas as pd
import numpy as np
# ========== BASIC PIPE USAGE ==========
# Section banner for the console demo output.
print("=" * 60)
print("BASIC PIPE USAGE")
print("=" * 60)
# Sample data: five employees across three departments.
df = pd.DataFrame({
    'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve'],
    'age': [25, 30, 35, 28, 32],
    'salary': [50000, 60000, 45000, 55000, 65000],
    'department': ['Sales', 'IT', 'HR', 'Sales', 'IT']
})
print("Original DataFrame:")
print(df)
print("\n")
# Without pipe (traditional approach): each transformation is a separate
# statement that rebinds/mutates a temporary frame.
print("Traditional approach:")
df_temp = df.copy()  # copy so the shared demo frame `df` stays pristine
df_temp = df_temp[df_temp['age'] > 27]
df_temp['salary_k'] = df_temp['salary'] / 1000
df_temp = df_temp.sort_values('salary', ascending=False)
print(df_temp)
print("\n")
def filter_age(df, min_age):
    """Keep only rows whose 'age' is strictly greater than min_age."""
    mask = df['age'] > min_age
    return df.loc[mask]
def add_salary_k(df):
    """Return a copy of df with a 'salary_k' column (salary in thousands)."""
    # .assign always returns a new frame, so the input stays untouched.
    return df.assign(salary_k=df['salary'] / 1000)
def sort_by_salary(df, ascending=False):
    """Return df ordered by 'salary'; descending unless ascending=True."""
    ordered = df.sort_values(by='salary', ascending=ascending)
    return ordered
print("With pipe:")
# Same three transformations expressed as a single readable chain;
# .pipe(f, ...) calls f(current_frame, ...) and passes the result onward.
result = (df
    .pipe(filter_age, min_age=27)
    .pipe(add_salary_k)
    .pipe(sort_by_salary, ascending=False)
)
print(result)
print("\n")
# ========== PIPE WITH LAMBDA ==========
print("=" * 60)
print("PIPE WITH LAMBDA FUNCTIONS")
print("=" * 60)
# One-off steps can be inline lambdas instead of named functions.
result = (df
    .pipe(lambda x: x[x['department'] == 'Sales'])
    .pipe(lambda x: x.assign(bonus=x['salary'] * 0.1))
    .pipe(lambda x: x[['name', 'salary', 'bonus']])
)
print("Using lambda with pipe:")
print(result)
print("\n")
# ========== COMPLEX DATA PIPELINE ==========
print("=" * 60)
print("COMPLEX DATA PIPELINE")
print("=" * 60)
# Sales data: 100 days of synthetic sales. NOTE: np.random is unseeded,
# so the numbers differ on every run of this demo.
sales_data = pd.DataFrame({
    'date': pd.date_range('2024-01-01', periods=100, freq='D'),
    'product': np.random.choice(['A', 'B', 'C'], 100),
    'region': np.random.choice(['North', 'South', 'East', 'West'], 100),
    'sales': np.random.randint(100, 1000, 100),
    'units': np.random.randint(1, 20, 100)
})
print("Original sales data (first 10 rows):")
print(sales_data.head(10))
print("\n")
# Define transformation functions
def add_date_features(df):
    """Return a copy of df enriched with calendar features from 'date'.

    Adds: 'year', 'month', 'day_of_week' (weekday name) and
    'is_weekend' (True for Saturday/Sunday).
    """
    dates = df['date'].dt
    return df.assign(
        year=dates.year,
        month=dates.month,
        day_of_week=dates.day_name(),
        is_weekend=dates.dayofweek.isin([5, 6]),
    )
def calculate_metrics(df):
    """Return a copy of df with derived business metrics.

    Adds 'price_per_unit' (sales / units, rounded to 2 dp) and
    'sales_category' (Low/Medium/High over the (0,300], (300,600],
    (600,1000] sales buckets).
    """
    out = df.copy()
    out['price_per_unit'] = out['sales'].div(out['units']).round(2)
    category = pd.cut(
        out['sales'],
        bins=[0, 300, 600, 1000],
        labels=['Low', 'Medium', 'High'],
    )
    out['sales_category'] = category
    return out
def filter_outliers(df, column, threshold=3):
    """Remove rows whose `column` value is a z-score outlier.

    Keeps rows with |z| < threshold, where z is computed against the
    column's own mean and (sample) standard deviation.

    Bug fix: the original divided by df[column].std() unconditionally.
    For a constant column (std == 0) or a single-row frame (std == NaN)
    every comparison became NaN < threshold -> False, silently dropping
    ALL rows. With no measurable spread there are no outliers, so the
    frame is now returned unchanged in those cases.
    """
    std = df[column].std()
    if not std or pd.isna(std):  # constant column, empty, or single row
        return df
    z_scores = np.abs((df[column] - df[column].mean()) / std)
    return df[z_scores < threshold]
def aggregate_by_product(df):
    """Summarise sales/units/price per product.

    Returns a frame indexed by product with MultiIndex columns:
    (sales: sum/mean/count), (units: sum), (price_per_unit: mean),
    all rounded to 2 dp.
    """
    spec = {
        'sales': ['sum', 'mean', 'count'],
        'units': 'sum',
        'price_per_unit': 'mean',
    }
    summary = df.groupby('product').agg(spec)
    return summary.round(2)
# Build pipeline
print("Complete data pipeline:")
# Feature engineering -> metrics -> outlier removal -> per-product summary.
result = (sales_data
    .pipe(add_date_features)
    .pipe(calculate_metrics)
    .pipe(filter_outliers, column='sales', threshold=3)
    .pipe(aggregate_by_product)
)
print(result)
print("\n")
# ========== CONDITIONAL PIPELINE ==========
print("=" * 60)
print("CONDITIONAL PIPELINE")
print("=" * 60)
def conditional_transform(df, apply_transform=True):
    """Optionally add a 'double_sales' column (2 * sales).

    When apply_transform is False the input frame is returned
    untouched (same object, no copy).
    """
    if not apply_transform:
        return df
    return df.assign(double_sales=df['sales'] * 2)
# Pipeline with conditional logic
apply_doubling = True  # flip to False to skip the doubling step
result = (sales_data
    .head(5)
    .pipe(conditional_transform, apply_transform=apply_doubling)
)
print(f"With transform (apply_transform={apply_doubling}):")
# Only select 'double_sales' when the step actually ran, otherwise
# the column would be missing and this lookup would raise.
print(result[['date', 'sales', 'double_sales'] if apply_doubling else ['date', 'sales']])
print("\n")
# ========== ERROR HANDLING IN PIPELINE ==========
print("=" * 60)
print("ERROR HANDLING IN PIPELINE")
print("=" * 60)
def safe_transform(df, column, operation):
    """Best-effort transform: write operation(df[column]) into a new
    '<column>_transformed' column on a copy of df.

    Any exception is reported to stdout and the frame is returned
    without the new column instead of propagating the error, so a
    failing step does not abort the whole pipeline.
    """
    try:
        df = df.copy()
        df[f'{column}_transformed'] = operation(df[column])
    except Exception as e:
        # Deliberate broad catch: this helper exists to keep the chain alive.
        print(f"Error in transformation: {e}")
    return df
# Each step is wrapped so a failing transform reports the error and
# passes the frame through instead of aborting the chain.
result = (sales_data
    .head(5)
    .pipe(safe_transform, column='sales', operation=lambda x: np.log(x))
    .pipe(safe_transform, column='units', operation=lambda x: np.sqrt(x))
)
print("Safe transformation:")
print(result[['sales', 'sales_transformed', 'units', 'units_transformed']])
print("\n")
# ========== DEBUGGING PIPELINE ==========
print("=" * 60)
print("DEBUGGING PIPELINE")
print("=" * 60)
def debug_print(df, message="Debug"):
    """Pass-through pipeline step that dumps shape, columns and the
    first row to stdout, then returns df unchanged.
    """
    lines = [
        f"{message}:",
        f" Shape: {df.shape}",
        f" Columns: {df.columns.tolist()}",
        f" First row:\n{df.iloc[0]}\n",
    ]
    print("\n".join(lines))
    return df
# debug_print is a no-op pass-through, so inspection steps can be
# spliced between transforms (and commented out) without changing data.
result = (sales_data
    .head(10)
    .pipe(debug_print, message="After loading")
    .pipe(add_date_features)
    .pipe(debug_print, message="After adding date features")
    .pipe(calculate_metrics)
    .pipe(debug_print, message="After calculating metrics")
)
# ========== REUSABLE PIPELINE ==========
print("=" * 60)
print("REUSABLE PIPELINE")
print("=" * 60)
class DataPipeline:
    """Composable DataFrame pipeline.

    Queue transformation steps with add_step(), then run them in order
    against a private copy of the input frame via execute().
    """

    def __init__(self, df):
        # Defensive copy so queued steps never mutate the caller's frame.
        self.df = df.copy()
        self.steps = []

    def add_step(self, func, *args, **kwargs):
        """Queue `func(frame, *args, **kwargs)` as the next step (fluent)."""
        step = (func, args, kwargs)
        self.steps.append(step)
        return self

    def execute(self):
        """Run every queued step, piping each result into the next."""
        current = self.df
        for step_func, step_args, step_kwargs in self.steps:
            current = current.pipe(step_func, *step_args, **step_kwargs)
        return current

    def get_steps(self):
        """Names of the queued step functions, in execution order."""
        names = []
        for step_func, _, _ in self.steps:
            names.append(step_func.__name__)
        return names
# Create and configure pipeline
# (add_step is fluent, but here steps are registered one per line.)
pipeline = DataPipeline(sales_data)
pipeline.add_step(add_date_features)
pipeline.add_step(calculate_metrics)
pipeline.add_step(filter_outliers, column='sales', threshold=3)
print("Pipeline steps:")
print(pipeline.get_steps())
print("\n")
# Execute pipeline
result = pipeline.execute()
print("Pipeline result (first 5 rows):")
print(result.head())
print("\n")
# ========== ADVANCED: PIPELINE WITH LOGGING ==========
print("=" * 60)
print("PIPELINE WITH LOGGING")
print("=" * 60)
import time  # used by the logged_transform decorator defined below
def logged_transform(func):
    """Decorator that logs a pipeline step's name, wall-clock duration
    and row-count change around each call.

    Fix: the original wrapper did not apply functools.wraps, so every
    decorated function reported __name__ == 'wrapper' and lost its
    docstring — which would, for example, make DataPipeline.get_steps()
    show 'wrapper' for every logged step.
    """
    from functools import wraps  # local import keeps the decorator self-contained

    @wraps(func)
    def wrapper(df, *args, **kwargs):
        start = time.time()
        print(f"Starting: {func.__name__}")
        result = func(df, *args, **kwargs)
        elapsed = time.time() - start
        print(f"Completed: {func.__name__} ({elapsed:.3f}s)")
        print(f" Rows: {len(df)} -> {len(result)}")
        return result
    return wrapper
@logged_transform
def clean_data(df):
    """Drop every row that contains at least one missing value."""
    complete_rows = df.notna().all(axis=1)
    return df.loc[complete_rows]
@logged_transform
def enrich_data(df):
    """Return a copy of df with 'total_value' = sales * units."""
    return df.assign(total_value=df['sales'] * df['units'])
@logged_transform
def summarize_data(df):
    """Per-product mean/std/count of the 'sales' column."""
    sales_by_product = df.groupby('product')['sales']
    return sales_by_product.agg(['mean', 'std', 'count'])
print("Logged pipeline:")
# Every step below is wrapped by logged_transform, so each .pipe call
# prints start/complete lines plus timing and row-count deltas.
result = (sales_data
    .pipe(clean_data)
    .pipe(enrich_data)
    .pipe(summarize_data)
)
print("\nFinal result:")
print(result)
print("\n")
# ========== COMBINING WITH OTHER PANDAS METHODS ==========
print("=" * 60)
print("COMBINING PIPE WITH OTHER METHODS")
print("=" * 60)
# pipe() mixes freely with assign/query/sort_values/head in one chain.
result = (sales_data
    .pipe(add_date_features)
    .assign(
        sales_rank=lambda x: x.groupby('product')['sales'].rank(ascending=False)
    )
    .query('sales > 500')
    .sort_values('sales', ascending=False)
    .head(10)
    .pipe(lambda x: x[['date', 'product', 'sales', 'sales_rank']])
)
print("Combined pipeline with assign, query, sort_values:")
print(result)
print("\n")
# ========== PERFORMANCE OPTIMIZATION ==========
print("=" * 60)
print("PERFORMANCE TIPS")
print("=" * 60)
# NOTE: the tips below are a printed string literal, not executed code.
performance_tips = """
PIPE PERFORMANCE TIPS:
1. Avoid unnecessary copies:
- Use inplace operations when possible
- Return views instead of copies where safe
2. Filter early:
- Apply filters before heavy transformations
- Reduce data size early in pipeline
3. Vectorize operations:
- Use built-in pandas methods over apply
- Leverage numpy for numerical operations
4. Profile your pipeline:
- Time each step
- Identify bottlenecks
- Optimize slowest steps first
5. Consider lazy evaluation:
- For very large datasets, consider Dask
- Process in chunks if memory is limited
Example optimized pipeline:
(df
.query('important_filter') # Filter first
.pipe(vectorized_transform) # Fast operation
.drop_duplicates() # Reduce size
.pipe(complex_transform) # Slower operation on smaller data
)
"""
print(performance_tips)