Quick Reference

This quick reference provides essential SparkForge syntax and patterns for rapid development.

Basic Pipeline Setup

from pipeline_builder import PipelineBuilder
from pyspark.sql import functions as F

# Initialize
builder = PipelineBuilder(spark=spark, schema="analytics")

# Build pipeline
pipeline = (builder
    .with_bronze_rules(name="events", rules={"id": [F.col("id").isNotNull()]})
    .add_silver_transform(name="clean", source_bronze="events",
                        transform=clean_func, rules={}, table_name="clean_events")
    .add_gold_transform(name="analytics", transform=analytics_func,
                      rules={}, table_name="analytics")
    .to_pipeline()
)

# Execute
result = pipeline.initial_load(bronze_sources={"events": events_df})

Validation Rules

# Basic validation
rules = {
    "user_id": [F.col("user_id").isNotNull()],
    "email": [F.col("email").contains("@")],
    "age": [F.col("age") > 0, F.col("age") < 120]
}

# String shortcuts
rules = {
    "id": ["not_null", "positive"],
    "status": ["not_null"],
    "amount": ["non_negative"]
}

Execution Modes

# Initial load - process all data
result = pipeline.initial_load(bronze_sources={"events": events_df})

# Incremental - process new data only
result = pipeline.run_incremental(bronze_sources={"events": new_events_df})

# Full refresh - force reprocessing
result = pipeline.run_full_refresh(bronze_sources={"events": events_df})

Configuration Options

builder = PipelineBuilder(
    spark=spark,
    schema="analytics",
    min_bronze_rate=95.0,      # Quality thresholds
    min_silver_rate=98.0,
    min_gold_rate=99.0,
    enable_parallel_silver=True, # Parallel execution
    max_parallel_workers=4,
    verbose=True                # Logging
)

Common Patterns

Bronze with Incremental Processing: .. code-block:: python

builder.with_bronze_rules(

name=”events”, rules={“timestamp”: [F.col(“timestamp”).isNotNull()]}, incremental_col=”timestamp” # Enables watermarking

)

Silver with Dependencies: .. code-block:: python

builder.add_silver_transform(

name=”enriched_events”, source_bronze=”events”, transform=enrich_func, rules={}, table_name=”enriched_events”, depends_on=[“user_profiles”] # Wait for other Silver steps

)

Gold Aggregation: .. code-block:: python

def daily_metrics(spark, silvers):

events = silvers[“clean_events”] return events.groupBy(“date”).agg(F.count(“*”).alias(“events”))

builder.add_gold_transform(

name=”daily_metrics”, transform=daily_metrics, rules={}, table_name=”daily_metrics”

)

For the complete quick reference with more examples, see: QUICK_REFERENCE.md