Progressive Examples

This guide provides step-by-step examples that build complexity gradually, from simple pipelines to advanced use cases.

Learning Path

Level 1: Basic Pipeline — simple Bronze validation, a single Silver transformation, and a basic Gold aggregation.

Level 2: Intermediate Pipeline — multiple Bronze sources, Silver dependencies, and complex transformations.

Level 3: Advanced Pipeline — incremental processing, error handling and recovery, and service-oriented patterns.

Level 4: Production Pipeline — performance optimization, monitoring and alerting, and a scalable architecture.

Level 1: Basic Pipeline

Simple Bronze → Silver → Gold pipeline with basic validation.

from pyspark.sql import SparkSession
from pipeline_builder import PipelineBuilder
from pipeline_builder.engine_config import configure_engine
from pipeline_builder.functions import get_default_functions

# Configuring the engine is required before the pipeline is built.
spark = (
    SparkSession.builder
    .appName("BasicPipeline")
    .getOrCreate()
)
configure_engine(spark=spark)
F = get_default_functions()

# Two sample events, each a (user_id, action, value) tuple.
data = [
    ("user1", "click", 100),
    ("user2", "view", 200),
]
df = spark.createDataFrame(data, ["user_id", "action", "value"])

# All steps of this example are registered on one builder.
builder = PipelineBuilder(spark=spark, schema="basic")

# Bronze: "user_id" and "action" each get a single not-null rule.
builder.with_bronze_rules(
    name="events",
    rules={
        column: [F.col(column).isNotNull()]
        for column in ("user_id", "action")
    },
)

# Silver: Clean data
def clean_events(spark, bronze_df, prior_silvers):
    """Keep only the bronze rows whose ``value`` is greater than zero."""
    functions = get_default_functions()
    has_positive_value = functions.col("value") > 0
    return bronze_df.filter(has_positive_value)

# Register the cleaning transform as a silver step fed by the "events" bronze.
clean_events_rules = {"user_id": [F.col("user_id").isNotNull()]}
builder.add_silver_transform(
    name="clean_events",
    source_bronze="events",
    transform=clean_events,
    rules=clean_events_rules,
    table_name="clean_events",
)

# Gold: Aggregate data
def daily_summary(spark, silvers):
    """Count the cleaned events per action.

    Args:
        spark: Session handed to the transform (not used here).
        silvers: Mapping from silver step name to that step's DataFrame;
            must contain ``"clean_events"``.

    Returns:
        The result of grouping ``clean_events`` by ``action`` and counting.
    """
    # Only DataFrame methods are needed, so no functions object is fetched.
    return silvers["clean_events"].groupBy("action").count()

# Register the aggregation as a gold step that reads the "clean_events" silver.
daily_summary_rules = {"action": [F.col("action").isNotNull()]}
builder.add_gold_transform(
    name="daily_summary",
    transform=daily_summary,
    rules=daily_summary_rules,
    table_name="daily_summary",
    source_silvers=["clean_events"],
)

# Execute: build the pipeline and run the initial load on the sample events.
pipeline = builder.to_pipeline()
bronze_sources = {"events": df}
result = pipeline.run_initial_load(bronze_sources=bronze_sources)
print(f"Status: {result.status.value}")

Level 2: Intermediate Pipeline

Multiple sources with dependencies and complex transformations.

from pyspark.sql import SparkSession
from pipeline_builder import PipelineBuilder
from pipeline_builder.engine_config import configure_engine
from pipeline_builder.functions import get_default_functions

spark = SparkSession.builder.appName("IntermediatePipeline").getOrCreate()
configure_engine(spark=spark)
F = get_default_functions()

# Multiple bronze sources. Small in-memory samples stand in for real inputs.
# The orders include "order_date" because the orders bronze step below
# declares that column as its incremental column.
orders_df = spark.createDataFrame(
    [
        ("o1", "c1", 120.0, "2024-01-01"),
        ("o2", "c2", 80.0, "2024-01-02"),
        ("o3", "c1", 45.5, "2024-01-03"),
    ],
    ["order_id", "customer_id", "amount", "order_date"],
)
customers_df = spark.createDataFrame(
    [
        ("c1", "Alice", "enterprise"),
        ("c2", "Bob", "consumer"),
    ],
    ["customer_id", "name", "segment"],
)

builder = PipelineBuilder(spark=spark, schema="intermediate")

# Bronze: Multiple sources
builder.with_bronze_rules(
    name="orders",
    rules={"order_id": [F.col("order_id").isNotNull()]},
    incremental_col="order_date"
)

builder.with_bronze_rules(
    name="customers",
    rules={"customer_id": [F.col("customer_id").isNotNull()]}
)

# Silver: Join and enrich
def enriched_orders(spark, bronze_df, prior_silvers):
    """Left-join the orders with the ``clean_customers`` silver, if present.

    Args:
        spark: Session handed to the transform (not used here).
        bronze_df: Order rows coming from the bronze step.
        prior_silvers: Mapping from silver step name to that step's DataFrame.

    Returns:
        ``bronze_df`` left-joined to the customers on ``customer_id``, or
        ``bronze_df`` unchanged when there is no ``clean_customers`` entry.
    """
    customers = prior_silvers.get("clean_customers")
    # Test for absence explicitly: the truth value of a frame object does not
    # reliably say whether the frame was provided.
    if customers is None:
        return bronze_df
    return bronze_df.join(customers, "customer_id", "left")

# The enrichment step below lists "clean_customers" in source_silvers, so that
# silver has to be registered too. It renames "segment" to "customer_segment",
# the column the gold step groups by.
def clean_customers(spark, bronze_df, prior_silvers):
    """Expose the customer segment under the name ``customer_segment``."""
    return bronze_df.withColumnRenamed("segment", "customer_segment")

builder.add_silver_transform(
    name="clean_customers",
    source_bronze="customers",
    transform=clean_customers,
    rules={"customer_id": [F.col("customer_id").isNotNull()]},
    table_name="clean_customers"
)

builder.add_silver_transform(
    name="enriched_orders",
    source_bronze="orders",
    transform=enriched_orders,
    rules={"order_id": [F.col("order_id").isNotNull()]},
    table_name="enriched_orders",
    source_silvers=["clean_customers"]
)

# Gold: Business metrics
def customer_metrics(spark, silvers):
    """Aggregate order count and total revenue per ``customer_segment``."""
    functions = get_default_functions()
    orders_by_segment = silvers["enriched_orders"].groupBy("customer_segment")
    order_count = functions.count("*").alias("order_count")
    total_revenue = functions.sum("amount").alias("total_revenue")
    return orders_by_segment.agg(order_count, total_revenue)

# Register the metrics transform as a gold step on the "enriched_orders" silver.
customer_metrics_rules = {
    "customer_segment": [F.col("customer_segment").isNotNull()],
}
builder.add_gold_transform(
    name="customer_metrics",
    transform=customer_metrics,
    rules=customer_metrics_rules,
    table_name="customer_metrics",
    source_silvers=["enriched_orders"],
)

# Build the pipeline and run the initial load with both bronze inputs.
pipeline = builder.to_pipeline()
bronze_sources = {"orders": orders_df, "customers": customers_df}
result = pipeline.run_initial_load(bronze_sources=bronze_sources)

Level 3: Advanced Pipeline

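Incremental processing with error handling. In the code below, `historical_df` and `new_data_df` are placeholders for the initial and the incremental input DataFrames, which you supply.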

from pyspark.sql import SparkSession
from pipeline_builder import PipelineBuilder
from pipeline_builder.engine_config import configure_engine
from pipeline_builder.functions import get_default_functions
from pipeline_builder.models.exceptions import PipelineExecutionError

spark = (
    SparkSession.builder
    .appName("AdvancedPipeline")
    .getOrCreate()
)
configure_engine(spark=spark)
F = get_default_functions()

builder = PipelineBuilder(spark=spark, schema="advanced")

# Bronze: "timestamp" is declared as the incremental column of the events.
events_rules = {"user_id": [F.col("user_id").isNotNull()]}
builder.with_bronze_rules(
    name="events",
    rules=events_rules,
    incremental_col="timestamp",
)

# Silver with error handling
def safe_transform(spark, bronze_df, prior_silvers):
    """Add a ``processed`` column set to True; fall back to an empty frame.

    Any exception raised while building the result is printed and replaced
    by an empty DataFrame created with the schema of ``bronze_df``.
    """
    functions = get_default_functions()
    try:
        # Complex transformation logic
        processed = bronze_df.withColumn("processed", functions.lit(True))
    except Exception as exc:
        print(f"Transform error: {exc}")
        return spark.createDataFrame([], bronze_df.schema)
    else:
        return processed

# Register the guarded transform as the "processed_events" silver step.
processed_events_rules = {"user_id": [F.col("user_id").isNotNull()]}
builder.add_silver_transform(
    name="processed_events",
    source_bronze="events",
    transform=safe_transform,
    rules=processed_events_rules,
    table_name="processed_events",
)

pipeline = builder.to_pipeline()

# Initial load over the historical events.
initial_sources = {"events": historical_df}
result = pipeline.run_initial_load(bronze_sources=initial_sources)

# Incremental load over the new events; an execution error is printed here
# instead of propagating.
incremental_sources = {"events": new_data_df}
try:
    result = pipeline.run_incremental(bronze_sources=incremental_sources)
except PipelineExecutionError as exc:
    print(f"Execution failed: {exc}")
    # Handle error appropriately
Level 4: Production Pipeline

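Production-ready pipeline with monitoring and optimization. In the code below, `source_df` and `expected_rows` are placeholders for your input DataFrame and for the number of rows you expect to be written.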

from pyspark.sql import SparkSession
from pipeline_builder import PipelineBuilder
from pipeline_builder.engine_config import configure_engine
from pipeline_builder.functions import get_default_functions
from pipeline_builder.writer import LogWriter

# Production Spark configuration: Delta extension and catalog settings, plus
# adaptive query execution.
spark = (
    SparkSession.builder
    .appName("ProductionPipeline")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    .config("spark.sql.adaptive.enabled", "true")
    .getOrCreate()
)

configure_engine(spark=spark)
F = get_default_functions()

# Build pipeline with strict validation: one minimum rate per layer.
minimum_rates = {
    "min_bronze_rate": 95.0,
    "min_silver_rate": 98.0,
    "min_gold_rate": 99.0,
}
builder = PipelineBuilder(
    spark=spark,
    schema="production",
    **minimum_rates,
    verbose=True,
)

# ... build pipeline steps ...

pipeline = builder.to_pipeline()

# Execute with logging: the writer is set up with schema "monitoring" and
# table name "pipeline_logs".
log_writer = LogWriter(spark=spark, schema="monitoring", table_name="pipeline_logs")

result = pipeline.run_initial_load(bronze_sources={"events": source_df})

# Log execution
log_writer.append(result, run_id="run_123")

# Monitor performance: warn when the run took longer than 5 minutes.
max_duration_seconds = 5 * 60
if result.duration_seconds > max_duration_seconds:
    print("⚠️  Pipeline execution exceeded threshold")

# Check output volume: warn when fewer rows were written than expected.
if result.metrics.total_rows_written < expected_rows:
    print("⚠️  Fewer rows written than expected")

Service-Oriented Patterns

Using services directly for advanced use cases. The snippet assumes that `spark`, `F`, and an input DataFrame `source_df` already exist.

from pipeline_builder.execution import ExecutionEngine
from pipeline_builder.models import PipelineConfig, BronzeStep
from pipeline_builder.models.enums import ExecutionMode

# Create execution engine (services initialized internally)
config = PipelineConfig.create_default(schema="production")
engine = ExecutionEngine(spark=spark, config=config)

# Services that can be accessed directly on the engine:
#   engine.validator          - ExecutionValidator
#   engine.table_service      - TableService
#   engine.write_service      - WriteService
#   engine.transform_service  - TransformService
#   engine.reporter           - ExecutionReporter
#   engine.error_handler      - ErrorHandler

# Execute a single bronze step through the engine.
events_rules = {"user_id": [F.col("user_id").isNotNull()]}
bronze_step = BronzeStep(name="events", rules=events_rules)

result = engine.execute_step(
    step=bronze_step,
    sources={"events": source_df},
    mode=ExecutionMode.INITIAL,
)

Error Handling Examples

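Comprehensive error handling patterns. The snippet assumes a configured `builder`, the `pipeline` built from it, and an input DataFrame `df`, as in the Level 1 example.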

from pipeline_builder.models.exceptions import (
    PipelineConfigurationError,
    PipelineExecutionError
)

try:
    # Validate first; any reported errors become a configuration error.
    errors = builder.validate_pipeline()
    if errors:
        raise PipelineConfigurationError(f"Validation errors: {errors}")

    # Run the pipeline; anything but a "completed" status is a failure.
    result = pipeline.run_initial_load(bronze_sources={"events": df})
    final_status = result.status.value
    if final_status != "completed":
        raise PipelineExecutionError(f"Pipeline failed: {result.errors}")

except PipelineConfigurationError as exc:
    # Fix configuration and retry
    print(f"Configuration error: {exc}")

except PipelineExecutionError as exc:
    # Log error, notify, and handle gracefully
    print(f"Execution error: {exc}")

Next Steps