Pipe for Method Chaining

Definition

The pipe() method applies a custom function to a DataFrame (or Series) and returns the result, passing the object in as the function's first argument. This lets your own transformations slot into a method chain alongside built-in methods, encouraging a functional style that improves code organization and reusability.
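
Conceptually, df.pipe(func, *args, **kwargs) is simply func(df, *args, **kwargs) written in chainable form. A minimal sketch of the equivalence, using a toy to_thousands helper invented for illustration:

python

import pandas as pd

df = pd.DataFrame({'salary': [50000, 60000]})

# Toy transformation (hypothetical helper, not part of the pandas API)
def to_thousands(data, column):
    return data.assign(**{f'{column}_k': data[column] / 1000})

# The two calls below are equivalent: pipe() just passes df as the first argument
a = to_thousands(df, 'salary')       # plain function call
b = df.pipe(to_thousands, 'salary')  # same call, written for chaining
assert a.equals(b)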

Key Concepts

  • Method Chaining: Link multiple operations together
  • Functional Programming: Treat operations as composable functions
  • Readability: Create clear data transformation pipelines
  • Reusability: Define transformation functions once, use many times
  • Debugging: Easy to comment out steps in the pipeline

Example

python

import pandas as pd
import numpy as np

# ========== BASIC PIPE USAGE ==========
print("=" * 60)
print("BASIC PIPE USAGE")
print("=" * 60)

# Sample data
df = pd.DataFrame({
    'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve'],
    'age': [25, 30, 35, 28, 32],
    'salary': [50000, 60000, 45000, 55000, 65000],
    'department': ['Sales', 'IT', 'HR', 'Sales', 'IT']
})

print("Original DataFrame:")
print(df)
print("\n")

# Without pipe (traditional approach)
print("Traditional approach:")
df_temp = df.copy()
df_temp = df_temp[df_temp['age'] > 27].copy()  # copy to avoid SettingWithCopyWarning on the next line
df_temp['salary_k'] = df_temp['salary'] / 1000
df_temp = df_temp.sort_values('salary', ascending=False)
print(df_temp)
print("\n")

# With pipe (cleaner)
def filter_age(df, min_age):
    return df[df['age'] > min_age]

def add_salary_k(df):
    df_copy = df.copy()
    df_copy['salary_k'] = df_copy['salary'] / 1000
    return df_copy

def sort_by_salary(df, ascending=False):
    return df.sort_values('salary', ascending=ascending)

print("With pipe:")
result = (df
    .pipe(filter_age, min_age=27)
    .pipe(add_salary_k)
    .pipe(sort_by_salary, ascending=False)
)
print(result)
print("\n")

# ========== PIPE WITH LAMBDA ==========
print("=" * 60)
print("PIPE WITH LAMBDA FUNCTIONS")
print("=" * 60)

result = (df
    .pipe(lambda x: x[x['department'] == 'Sales'])
    .pipe(lambda x: x.assign(bonus=x['salary'] * 0.1))
    .pipe(lambda x: x[['name', 'salary', 'bonus']])
)

print("Using lambda with pipe:")
print(result)
print("\n")

# ========== COMPLEX DATA PIPELINE ==========
print("=" * 60)
print("COMPLEX DATA PIPELINE")
print("=" * 60)

# Sales data (seed the RNG so the random sample is reproducible)
np.random.seed(42)
sales_data = pd.DataFrame({
    'date': pd.date_range('2024-01-01', periods=100, freq='D'),
    'product': np.random.choice(['A', 'B', 'C'], 100),
    'region': np.random.choice(['North', 'South', 'East', 'West'], 100),
    'sales': np.random.randint(100, 1000, 100),
    'units': np.random.randint(1, 20, 100)
})

print("Original sales data (first 10 rows):")
print(sales_data.head(10))
print("\n")

# Define transformation functions
def add_date_features(df):
    """Add datetime-based features"""
    df = df.copy()
    df['year'] = df['date'].dt.year
    df['month'] = df['date'].dt.month
    df['day_of_week'] = df['date'].dt.day_name()
    df['is_weekend'] = df['date'].dt.dayofweek.isin([5, 6])
    return df

def calculate_metrics(df):
    """Calculate business metrics"""
    df = df.copy()
    df['price_per_unit'] = (df['sales'] / df['units']).round(2)
    df['sales_category'] = pd.cut(df['sales'], 
                                   bins=[0, 300, 600, 1000],
                                   labels=['Low', 'Medium', 'High'])
    return df

def filter_outliers(df, column, threshold=3):
    """Remove outliers using z-score"""
    z_scores = np.abs((df[column] - df[column].mean()) / df[column].std())
    return df[z_scores < threshold]

def aggregate_by_product(df):
    """Aggregate statistics by product"""
    return df.groupby('product').agg({
        'sales': ['sum', 'mean', 'count'],
        'units': 'sum',
        'price_per_unit': 'mean'
    }).round(2)

# Build pipeline
print("Complete data pipeline:")
result = (sales_data
    .pipe(add_date_features)
    .pipe(calculate_metrics)
    .pipe(filter_outliers, column='sales', threshold=3)
    .pipe(aggregate_by_product)
)

print(result)
print("\n")

# ========== CONDITIONAL PIPELINE ==========
print("=" * 60)
print("CONDITIONAL PIPELINE")
print("=" * 60)

def conditional_transform(df, apply_transform=True):
    """Conditionally apply transformation"""
    if apply_transform:
        df = df.copy()
        df['double_sales'] = df['sales'] * 2
    return df

# Pipeline with conditional logic
apply_doubling = True
result = (sales_data
    .head(5)
    .pipe(conditional_transform, apply_transform=apply_doubling)
)

print(f"With transform (apply_transform={apply_doubling}):")
print(result[['date', 'sales', 'double_sales'] if apply_doubling else ['date', 'sales']])
print("\n")

# ========== ERROR HANDLING IN PIPELINE ==========
print("=" * 60)
print("ERROR HANDLING IN PIPELINE")
print("=" * 60)

def safe_transform(df, column, operation):
    """Safely transform with error handling"""
    try:
        df = df.copy()
        df[f'{column}_transformed'] = operation(df[column])
        return df
    except Exception as e:
        print(f"Error in transformation: {e}")
        return df

result = (sales_data
    .head(5)
    .pipe(safe_transform, column='sales', operation=lambda x: np.log(x))
    .pipe(safe_transform, column='units', operation=lambda x: np.sqrt(x))
)

print("Safe transformation:")
print(result[['sales', 'sales_transformed', 'units', 'units_transformed']])
print("\n")

# ========== DEBUGGING PIPELINE ==========
print("=" * 60)
print("DEBUGGING PIPELINE")
print("=" * 60)

def debug_print(df, message="Debug"):
    """Print debug information in pipeline"""
    print(f"{message}:")
    print(f"  Shape: {df.shape}")
    print(f"  Columns: {df.columns.tolist()}")
    print(f"  First row:\n{df.iloc[0]}\n")
    return df

result = (sales_data
    .head(10)
    .pipe(debug_print, message="After loading")
    .pipe(add_date_features)
    .pipe(debug_print, message="After adding date features")
    .pipe(calculate_metrics)
    .pipe(debug_print, message="After calculating metrics")
)

# ========== REUSABLE PIPELINE ==========
print("=" * 60)
print("REUSABLE PIPELINE")
print("=" * 60)

class DataPipeline:
    """Reusable data pipeline class"""
    
    def __init__(self, df):
        self.df = df.copy()
        self.steps = []
    
    def add_step(self, func, *args, **kwargs):
        """Add a transformation step"""
        self.steps.append((func, args, kwargs))
        return self
    
    def execute(self):
        """Execute all steps in pipeline"""
        result = self.df
        for func, args, kwargs in self.steps:
            result = result.pipe(func, *args, **kwargs)
        return result
    
    def get_steps(self):
        """Get list of pipeline steps"""
        return [func.__name__ for func, _, _ in self.steps]

# Create and configure pipeline
pipeline = DataPipeline(sales_data)
pipeline.add_step(add_date_features)
pipeline.add_step(calculate_metrics)
pipeline.add_step(filter_outliers, column='sales', threshold=3)

print("Pipeline steps:")
print(pipeline.get_steps())
print("\n")

# Execute pipeline
result = pipeline.execute()
print("Pipeline result (first 5 rows):")
print(result.head())
print("\n")

# ========== ADVANCED: PIPELINE WITH LOGGING ==========
print("=" * 60)
print("PIPELINE WITH LOGGING")
print("=" * 60)

import functools
import time

def logged_transform(func):
    """Decorator to log pipeline steps"""
    @functools.wraps(func)  # preserve the wrapped function's name for logs/introspection
    def wrapper(df, *args, **kwargs):
        start = time.perf_counter()  # monotonic clock, better suited to timing than time.time()
        print(f"Starting: {func.__name__}")
        result = func(df, *args, **kwargs)
        elapsed = time.perf_counter() - start
        print(f"Completed: {func.__name__} ({elapsed:.3f}s)")
        print(f"  Rows: {len(df)} -> {len(result)}")
        return result
    return wrapper

@logged_transform
def clean_data(df):
    """Remove rows with missing values"""
    return df.dropna()

@logged_transform
def enrich_data(df):
    """Add computed columns"""
    df = df.copy()
    df['total_value'] = df['sales'] * df['units']
    return df

@logged_transform
def summarize_data(df):
    """Create summary statistics"""
    return df.groupby('product')['sales'].agg(['mean', 'std', 'count'])

print("Logged pipeline:")
result = (sales_data
    .pipe(clean_data)
    .pipe(enrich_data)
    .pipe(summarize_data)
)
print("\nFinal result:")
print(result)
print("\n")

# ========== COMBINING WITH OTHER PANDAS METHODS ==========
print("=" * 60)
print("COMBINING PIPE WITH OTHER METHODS")
print("=" * 60)

result = (sales_data
    .pipe(add_date_features)
    .assign(
        sales_rank=lambda x: x.groupby('product')['sales'].rank(ascending=False)
    )
    .query('sales > 500')
    .sort_values('sales', ascending=False)
    .head(10)
    .pipe(lambda x: x[['date', 'product', 'sales', 'sales_rank']])
)

print("Combined pipeline with assign, query, sort_values:")
print(result)
print("\n")

# ========== PERFORMANCE OPTIMIZATION ==========
print("=" * 60)
print("PERFORMANCE TIPS")
print("=" * 60)

performance_tips = """
PIPE PERFORMANCE TIPS:

1. Avoid unnecessary copies:
   - Only call .copy() when you actually need to mutate the input
   - Note: inplace=True rarely avoids a copy in pandas, so don't rely on it for speed

2. Filter early:
   - Apply filters before heavy transformations
   - Reduce data size early in pipeline

3. Vectorize operations:
   - Use built-in pandas methods over apply
   - Leverage numpy for numerical operations

4. Profile your pipeline:
   - Time each step
   - Identify bottlenecks
   - Optimize slowest steps first

5. Consider lazy evaluation:
   - For very large datasets, consider Dask
   - Process in chunks if memory is limited

Example optimized pipeline:
    (df
        .query('important_filter')  # Filter first
        .pipe(vectorized_transform)  # Fast operation
        .drop_duplicates()  # Reduce size
        .pipe(complex_transform)  # Slower operation on smaller data
    )
"""

print(performance_tips)
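
# A runnable sketch of tip 2 (filter early), reusing the functions defined
# above: both chains return the same rows, but the first one runs the
# heavier transforms on far fewer of them.
early = (sales_data
    .query('sales > 500')      # cut rows before the expensive steps
    .pipe(add_date_features)
    .pipe(calculate_metrics)
)
late = (sales_data
    .pipe(add_date_features)
    .pipe(calculate_metrics)
    .query('sales > 500')      # same filter, applied last
)
print(f"Identical output either way: {early.equals(late)}; rows processed early: {len(early)}")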