SparkForge Documentation
A production-ready PySpark + Delta Lake pipeline engine with the Medallion Architecture (Bronze → Silver → Gold). Build scalable data pipelines with clean, maintainable code and comprehensive validation.
Note
Engine Configuration Required: Before using SparkForge, you must configure the engine:
from pipeline_builder.engine_config import configure_engine
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
configure_engine(spark=spark)
Quick Links
For Beginners:
- Getting Started with PipelineBuilder - Start here!
- 5-Minute Quick Start - 5-minute quick start
- Hello World Example - Simplest example
For Users:
- User Guide - Comprehensive user guide
- Progressive Examples - Step-by-step examples
- API Reference - Complete API reference
For Developers:
- Architecture - Architecture documentation
- API Reference - Detailed API documentation
- DEPLOYMENT_GUIDE - Deployment guide
Quick Start
Get up and running with SparkForge in under 5 minutes. Install the dependencies and run the bundled example:
pip install pyspark==3.5.0 delta-spark==3.0.0
python examples/core/hello_world.py
Or build a pipeline yourself:
from pipeline_builder import PipelineBuilder
from pipeline_builder.engine_config import configure_engine
from pipeline_builder.functions import get_default_functions
from pyspark.sql import SparkSession

# Configure engine (required!)
spark = SparkSession.builder.appName("My Pipeline").getOrCreate()
configure_engine(spark=spark)
F = get_default_functions()

# Sample input data (illustrative; replace with your own source DataFrame)
source_df = spark.createDataFrame(
    [("u1", "active", "web"), ("u2", "inactive", "mobile")],
    ["user_id", "status", "category"],
)

# Build pipeline
builder = PipelineBuilder(spark=spark, schema="my_schema")

# Bronze: validate raw events
builder.with_bronze_rules(
    name="events",
    rules={"user_id": [F.col("user_id").isNotNull()]}
)

# Silver: keep only active events
def clean_events(spark, bronze_df, prior_silvers):
    F = get_default_functions()
    return bronze_df.filter(F.col("status") == "active")

builder.add_silver_transform(
    name="clean_events",
    source_bronze="events",
    transform=clean_events,
    rules={"status": [F.col("status").isNotNull()]},
    table_name="clean_events"
)

# Gold: count cleaned events per category
def analytics(spark, silvers):
    F = get_default_functions()
    return silvers["clean_events"].groupBy("category").count()

builder.add_gold_transform(
    name="analytics",
    transform=analytics,
    rules={"category": [F.col("category").isNotNull()]},
    table_name="analytics",
    source_silvers=["clean_events"]
)

# Execute
pipeline = builder.to_pipeline()
result = pipeline.run_initial_load(bronze_sources={"events": source_df})
print(f"Status: {result.status.value}")
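To make the data flow of the pipeline above concrete, the following is a plain-Python stand-in for what the silver and gold steps compute. It deliberately uses neither Spark nor SparkForge: the sample rows are made up, and the two functions only mirror the names used in the quick start. Treat it as a sketch of the logic, not of the library's behavior.

```python
from collections import Counter

# Hypothetical sample rows standing in for the bronze "events" source.
bronze_events = [
    {"user_id": "u1", "status": "active", "category": "web"},
    {"user_id": "u2", "status": "inactive", "category": "web"},
    {"user_id": "u3", "status": "active", "category": "mobile"},
    {"user_id": "u4", "status": "active", "category": "web"},
]


def clean_events(rows):
    """Silver step: keep only rows whose status is 'active'."""
    return [row for row in rows if row["status"] == "active"]


def analytics(rows):
    """Gold step: count rows per category, like groupBy("category").count()."""
    return dict(Counter(row["category"] for row in rows))


silver = clean_events(bronze_events)
gold = analytics(silver)
print(gold)  # {'web': 2, 'mobile': 1}
```

Each layer consumes only the output of the layer before it, which is what lets the real pipeline persist every stage as its own table.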
Features
- 🏗️ Medallion Architecture
Bronze → Silver → Gold data layering with automatic dependency management
- ⚡ Service-Oriented Design
Modular architecture with dedicated services for validation, storage, transformation, and reporting
- 🎯 Automatic Dependency Management
Automatically analyzes step dependencies and executes in correct order
- 🛠️ Engine Configuration
Works with both real PySpark and mock Spark for testing
- 🔧 Validation System
Built-in validation with configurable thresholds and string rule support
- 📊 Incremental Processing
Efficient incremental updates with Delta Lake watermarking
- 💧 Delta Lake Integration
Full support for ACID transactions, time travel, and schema evolution
- 🔍 Step Executors
Dedicated executors for Bronze, Silver, and Gold steps
- ✅ Comprehensive Error Handling
Centralized error handling with detailed context and suggestions
- 📈 Execution Reporting
Detailed execution reports with metrics and timing
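The automatic dependency management listed above amounts to ordering steps so that each one runs after the steps it reads from. A minimal sketch of that idea, using Kahn's topological sort in plain Python, is shown below. The `execution_order` function and the dictionary layout are illustrative assumptions and say nothing about how SparkForge implements its own analysis.

```python
from collections import deque


def execution_order(dependencies):
    """Return step names ordered so each step runs after its dependencies.

    `dependencies` maps a step name to the list of steps it reads from.
    """
    remaining = {step: set(deps) for step, deps in dependencies.items()}
    # Steps with no dependencies can run immediately.
    ready = deque(sorted(step for step, deps in remaining.items() if not deps))
    order = []
    while ready:
        step = ready.popleft()
        order.append(step)
        # Release any step that was only waiting on the one just scheduled.
        for other in sorted(remaining):
            deps = remaining[other]
            if step in deps:
                deps.remove(step)
                if not deps:
                    ready.append(other)
    if len(order) != len(remaining):
        raise ValueError("cycle or unknown dependency among pipeline steps")
    return order


# Step names taken from the quick start: bronze -> silver -> gold.
steps = {
    "analytics": ["clean_events"],
    "clean_events": ["events"],
    "events": [],
}
print(execution_order(steps))  # ['events', 'clean_events', 'analytics']
```

The declaration order of the steps does not matter; only the declared sources do.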
Documentation by User Type
For Beginners
New to SparkForge? Start here:
For Users
Building pipelines? These guides are for you:
User Guides
Use Cases
For Developers
Advanced topics and API documentation:
API Reference
Examples
Hello World - The simplest possible pipeline
from pipeline_builder import PipelineBuilder
from pipeline_builder.engine_config import configure_engine
from pipeline_builder.functions import get_default_functions
from pyspark.sql import SparkSession

# Configure engine (required!)
spark = SparkSession.builder.getOrCreate()
configure_engine(spark=spark)
F = get_default_functions()

# Sample input data (illustrative; replace with your own source DataFrame)
source_df = spark.createDataFrame(
    [("alice", "purchase"), ("bob", "view")],
    ["user", "action"],
)

builder = PipelineBuilder(spark=spark, schema="hello")

# Bronze: every event must have a user
builder.with_bronze_rules(
    name="events",
    rules={"user": [F.col("user").isNotNull()]}
)

# Silver: keep only purchase events
def purchases_transform(spark, bronze_df, prior_silvers):
    F = get_default_functions()
    return bronze_df.filter(F.col("action") == "purchase")

builder.add_silver_transform(
    name="purchases",
    source_bronze="events",
    transform=purchases_transform,
    rules={"action": [F.col("action") == "purchase"]},
    table_name="purchases"
)

# Gold: count purchases per user
def user_counts_transform(spark, silvers):
    F = get_default_functions()
    return silvers["purchases"].groupBy("user").count()

builder.add_gold_transform(
    name="user_counts",
    transform=user_counts_transform,
    rules={"user": [F.col("user").isNotNull()]},
    table_name="user_counts",
    source_silvers=["purchases"]
)

pipeline = builder.to_pipeline()
result = pipeline.run_initial_load(bronze_sources={"events": source_df})
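The `rules` dictionaries in the example map a column name to a list of conditions. As a rough illustration of how such rules can be combined with a pass-rate threshold, here is a plain-Python sketch. The `validate` function, its return shape, and the threshold values are hypothetical; SparkForge expresses rules as Spark column expressions, and its actual thresholds and reporting may differ.

```python
def validate(rows, rules, threshold=95.0):
    """Split rows into valid/invalid and check the pass rate against a threshold.

    `rules` maps a column name to a list of predicates on that column's value.
    `threshold` is a percentage; 95.0 is an arbitrary illustrative default.
    """
    valid, invalid = [], []
    for row in rows:
        # A row is valid only if every predicate on every column passes.
        ok = all(
            check(row.get(column))
            for column, checks in rules.items()
            for check in checks
        )
        (valid if ok else invalid).append(row)
    # Assumption: an empty input counts as fully valid.
    rate = 100.0 * len(valid) / len(rows) if rows else 100.0
    return valid, invalid, rate, rate >= threshold


rows = [
    {"user": "alice", "action": "purchase"},
    {"user": None, "action": "purchase"},
    {"user": "bob", "action": "purchase"},
    {"user": "carol", "action": "purchase"},
]
rules = {"user": [lambda value: value is not None]}

valid, invalid, rate, passed = validate(rows, rules, threshold=70.0)
print(f"{rate:.1f}% valid, passed={passed}")  # 75.0% valid, passed=True
```

Keeping the invalid rows rather than discarding them makes it possible to report why a step fell below its threshold.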
Installation
pip install pyspark==3.5.0 delta-spark==3.0.0
Prerequisites:
- Python 3.8+
- Java 17 (for Spark 3.5)
- PySpark 3.5+
- Delta Lake 3.0.0+
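Before running a pipeline, it can help to confirm that the prerequisites listed above are present. The script below is a minimal sketch using only the standard library. It checks presence, not versions, and the `check_prerequisites` helper is a hypothetical name that is not part of SparkForge. It assumes the `delta-spark` package is importable as `delta`.

```python
import importlib.util
import shutil
import sys


def check_prerequisites():
    """Report which of the listed prerequisites appear to be available."""
    return {
        "python>=3.8": sys.version_info >= (3, 8),
        # Only checks that a `java` executable is on PATH, not its version.
        "java on PATH": shutil.which("java") is not None,
        "pyspark importable": importlib.util.find_spec("pyspark") is not None,
        "delta importable": importlib.util.find_spec("delta") is not None,
    }


for name, ok in check_prerequisites().items():
    print(f"{'OK     ' if ok else 'MISSING'}  {name}")
```

To verify the Java version as well, run `java -version` and inspect the output manually.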
Key Benefits
- Simplified Development
Clean, maintainable code with minimal boilerplate
- Production Ready
Built-in error handling, logging, and monitoring
- Service-Oriented Architecture
Modular design with dedicated services for each concern
- Scalable Architecture
Designed for enterprise-scale data processing
- Delta Lake Integration
ACID transactions, time travel, and schema evolution
- Comprehensive Testing
Extensive test suite with high coverage
Support
Documentation: Complete guides and API reference
Examples: Real-world pipeline examples in the examples/ directory
Community: GitHub discussions and issues
Professional: Enterprise support available
License
This project is licensed under the MIT License - see the LICENSE file for details.