SparkForge Documentation

A production-ready PySpark + Delta Lake pipeline engine built around the Medallion Architecture (Bronze → Silver → Gold). Build scalable data pipelines with clean, maintainable code and comprehensive validation.

Note

Engine Configuration Required: Before using SparkForge, you must configure the engine:

from pipeline_builder.engine_config import configure_engine
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
configure_engine(spark=spark)

Quick Start

Get up and running with SparkForge in under 5 minutes:

pip install pyspark==3.5.0 delta-spark==3.0.0
python examples/core/hello_world.py

from pipeline_builder import PipelineBuilder
from pipeline_builder.engine_config import configure_engine
from pipeline_builder.functions import get_default_functions
from pyspark.sql import SparkSession

# Configure engine (required!)
spark = SparkSession.builder.appName("My Pipeline").getOrCreate()
configure_engine(spark=spark)
F = get_default_functions()

# Build pipeline
builder = PipelineBuilder(spark=spark, schema="my_schema")
builder.with_bronze_rules(
    name="events",
    rules={"user_id": [F.col("user_id").isNotNull()]}
)

def clean_events(spark, bronze_df, prior_silvers):
    F = get_default_functions()
    return bronze_df.filter(F.col("status") == "active")

builder.add_silver_transform(
    name="clean_events",
    source_bronze="events",
    transform=clean_events,
    rules={"status": [F.col("status").isNotNull()]},
    table_name="clean_events"
)

def analytics(spark, silvers):
    F = get_default_functions()
    return silvers["clean_events"].groupBy("category").count()

builder.add_gold_transform(
    name="analytics",
    transform=analytics,
    rules={"category": [F.col("category").isNotNull()]},
    table_name="analytics",
    source_silvers=["clean_events"]
)

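# Sample input for illustration; replace with your own source DataFrame
source_df = spark.createDataFrame(
    [("u1", "active", "books"), ("u2", "inactive", "games")],
    ["user_id", "status", "category"],
)

# Execute
pipeline = builder.to_pipeline()
result = pipeline.run_initial_load(bronze_sources={"events": source_df})
print(f"Status: {result.status.value}")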

Features

🏗️ Medallion Architecture

Bronze → Silver → Gold data layering with automatic dependency management

Service-Oriented Design

Modular architecture with dedicated services for validation, storage, transformation, and reporting

🎯 Automatic Dependency Management

Analyzes step dependencies and executes the steps in the correct order
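As a rough illustration of what dependency-ordered execution involves, the sketch below sorts a set of steps topologically with Kahn's algorithm. It is not SparkForge's implementation: the `execution_order` helper and the dependency dictionary are hypothetical, with step names borrowed from the Quick Start.

```python
from collections import deque


def execution_order(dependencies):
    """Return step names so that every step comes after the steps it reads from.

    `dependencies` maps a step name to the list of steps it depends on.
    Hypothetical helper for illustration only; not part of SparkForge.
    """
    # Collect every step, including ones that appear only as a dependency.
    steps = set(dependencies)
    for deps in dependencies.values():
        steps.update(deps)

    remaining = {step: set(dependencies.get(step, ())) for step in steps}
    ready = deque(sorted(step for step, deps in remaining.items() if not deps))
    order = []

    while ready:
        current = ready.popleft()
        order.append(current)
        # Release any step whose last unmet dependency was `current`.
        for step in sorted(remaining):
            if current in remaining[step]:
                remaining[step].remove(current)
                if not remaining[step]:
                    ready.append(step)

    if len(order) != len(steps):
        raise ValueError("circular dependency between steps")
    return order


order = execution_order({
    "events": [],                    # bronze source
    "clean_events": ["events"],      # silver reads the bronze step
    "analytics": ["clean_events"],   # gold reads the silver step
})
print(order)
```

A cycle (for example, two steps that each list the other as a source) leaves some steps unscheduled, which the sketch reports as a `ValueError` instead of silently dropping them.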

🛠️ Engine Configuration

Works with both real PySpark and mock Spark for testing

🔧 Validation System

Built-in validation with configurable thresholds and string rule support
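To make "configurable thresholds" concrete, the sketch below computes the share of rows that pass every rule and compares it with a minimum percentage. It uses plain Python dictionaries and callables in place of Spark columns. The names `validation_rate` and `passes_threshold` and the 95% figure are illustrative assumptions, not SparkForge's API or defaults.

```python
def validation_rate(rows, rules):
    """Percentage of rows for which every rule on every column holds.

    `rules` maps a column name to a list of predicates on that column's value.
    Hypothetical helper for illustration only.
    """
    if not rows:
        return 100.0
    valid = sum(
        1
        for row in rows
        if all(
            check(row.get(column))
            for column, checks in rules.items()
            for check in checks
        )
    )
    return 100.0 * valid / len(rows)


def passes_threshold(rows, rules, min_percent):
    """True when the share of valid rows reaches the configured threshold."""
    return validation_rate(rows, rules) >= min_percent


rows = [
    {"user_id": "u1", "status": "active"},
    {"user_id": None, "status": "active"},     # fails the user_id rule
    {"user_id": "u3", "status": None},         # fails the status rule
    {"user_id": "u4", "status": "inactive"},
]
rules = {
    "user_id": [lambda value: value is not None],
    "status": [lambda value: value is not None],
}

rate = validation_rate(rows, rules)
print(f"valid rows: {rate:.1f}%")
print("meets 95% threshold:", passes_threshold(rows, rules, 95.0))
```

Layers can reasonably use different thresholds, for instance a looser one for raw Bronze data and a stricter one for Gold outputs.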

📊 Incremental Processing

Efficient incremental updates with Delta Lake watermarking
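Watermark-based incremental processing can be pictured as follows: remember the highest timestamp already processed and, on the next run, pick up only the rows newer than it. The sketch uses plain Python records. `incremental_batch` and the `event_time` field are hypothetical and do not reflect how SparkForge or Delta Lake track the watermark internally.

```python
from datetime import datetime


def incremental_batch(rows, watermark, time_field="event_time"):
    """Return the rows newer than `watermark` together with the advanced watermark.

    A watermark of None means nothing has been processed yet (initial load).
    Hypothetical helper for illustration only.
    """
    new_rows = [
        row for row in rows
        if watermark is None or row[time_field] > watermark
    ]
    if new_rows:
        watermark = max(row[time_field] for row in new_rows)
    return new_rows, watermark


events = [
    {"user_id": "u1", "event_time": datetime(2024, 1, 1, 9, 0)},
    {"user_id": "u2", "event_time": datetime(2024, 1, 1, 10, 0)},
]

# Initial load: every row is new.
first_batch, watermark = incremental_batch(events, watermark=None)

# A later run sees one additional row and processes only that row.
events.append({"user_id": "u3", "event_time": datetime(2024, 1, 1, 11, 0)})
second_batch, watermark = incremental_batch(events, watermark)

print(len(first_batch), len(second_batch), watermark)
```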

💧 Delta Lake Integration

Full support for ACID transactions, time travel, and schema evolution

🔍 Step Executors

Dedicated executors for Bronze, Silver, and Gold steps

Comprehensive Error Handling

Centralized error handling with detailed context and suggestions

📈 Execution Reporting

Detailed execution reports with metrics and timing
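As a loose sketch of the idea behind per-step metrics and timing, the context manager below records a status and a duration for each step it wraps. The `timed_step` helper and the report layout are assumptions made for illustration and are not the structure of SparkForge's own reports.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed_step(report, name):
    """Append an entry for `name` to `report` with its status and duration.

    Hypothetical helper for illustration only.
    """
    entry = {"step": name, "status": "failed", "duration_secs": None}
    report.append(entry)
    start = time.perf_counter()
    try:
        yield entry
        # Reached only when the wrapped block finished without raising.
        entry["status"] = "completed"
    finally:
        entry["duration_secs"] = time.perf_counter() - start


report = []
with timed_step(report, "clean_events") as entry:
    # Stand-in for real work; the step can attach its own metrics.
    entry["rows_written"] = sum(1 for _ in range(1000))

for entry in report:
    print(f"{entry['step']}: {entry['status']} in {entry['duration_secs']:.4f}s")
```

Because the entry is appended before the step runs, a step that raises still shows up in the report with the status `failed` and a measured duration.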

Documentation by User Type

For Beginners

New to SparkForge? Start here:

For Users

Building pipelines? These guides are for you:

For Developers

Advanced topics and API documentation:

Examples

Hello World - The simplest possible pipeline

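from pipeline_builder import PipelineBuilder
from pipeline_builder.engine_config import configure_engine
from pipeline_builder.functions import get_default_functions
from pyspark.sql import SparkSession

# Create a session and configure the engine before building the pipeline
spark = SparkSession.builder.appName("Hello World").getOrCreate()
configure_engine(spark=spark)
F = get_default_functions()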

builder = PipelineBuilder(spark=spark, schema="hello")
builder.with_bronze_rules(
    name="events",
    rules={"user": [F.col("user").isNotNull()]}
)

def purchases_transform(spark, bronze_df, prior_silvers):
    F = get_default_functions()
    return bronze_df.filter(F.col("action") == "purchase")

builder.add_silver_transform(
    name="purchases",
    source_bronze="events",
    transform=purchases_transform,
    rules={"action": [F.col("action") == "purchase"]},
    table_name="purchases"
)

def user_counts_transform(spark, silvers):
    F = get_default_functions()
    return silvers["purchases"].groupBy("user").count()

builder.add_gold_transform(
    name="user_counts",
    transform=user_counts_transform,
    rules={"user": [F.col("user").isNotNull()]},
    table_name="user_counts",
    source_silvers=["purchases"]
)

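# Sample input for illustration; replace with your own source DataFrame
source_df = spark.createDataFrame(
    [("alice", "purchase"), ("bob", "view")],
    ["user", "action"],
)

pipeline = builder.to_pipeline()
result = pipeline.run_initial_load(bronze_sources={"events": source_df})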

Installation

pip install pyspark==3.5.0 delta-spark==3.0.0

Prerequisites:

  • Python 3.8+

  • Java 17 (for Spark 3.5)

  • PySpark 3.5+

  • Delta Lake 3.0.0+

Key Benefits

Simplified Development

Clean, maintainable code with minimal boilerplate

Production Ready

Built-in error handling, logging, and monitoring

Service-Oriented Architecture

Modular design with dedicated services for each concern

Scalable Architecture

Designed for enterprise-scale data processing

Delta Lake Integration

ACID transactions, time travel, and schema evolution

Comprehensive Testing

Extensive test suite with high coverage

Support

  • Documentation: Complete guides and API reference

  • Examples: Real-world pipeline examples in examples/ directory

  • Community: GitHub discussions and issues

  • Professional: Enterprise support available

License

This project is licensed under the MIT License - see the LICENSE file for details.
