User Guide¶
This guide covers how to use SparkForge to build data pipelines with the Medallion Architecture.
Getting Started¶
If you’re new to SparkForge, start with the Quick Start Guide to get up and running in minutes.
Core Concepts¶
Medallion Architecture¶
SparkForge implements the Medallion Architecture with three distinct layers:
Bronze Layer: Raw data ingestion and initial validation
Silver Layer: Cleaned, enriched, and transformed data
Gold Layer: Business-ready analytics and reporting datasets
Validation System¶
SparkForge includes a robust validation system that ensures data quality from the start:
Early Validation: Invalid configurations are rejected during construction
Required Rules: All step types must have non-empty validation rules
Clear Error Messages: Detailed error messages help you fix issues quickly
Type Safety: Transform functions and dependencies are validated
# ✅ Valid - has required rules
BronzeStep(
    name="events",
    rules={"user_id": [F.col("user_id").isNotNull()]}
)
# ❌ Invalid - empty rules rejected
BronzeStep(
    name="events",
    rules={}  # ValidationError: Rules must be a non-empty dictionary
)
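The early-validation behavior described above can be pictured with a small stand-alone sketch. `SketchStep` and the `ValueError` it raises are hypothetical stand-ins, not SparkForge classes; the point is only that the check runs in the constructor, so a bad configuration fails before any data is processed.

```python
from dataclasses import dataclass, field


@dataclass
class SketchStep:
    """Hypothetical stand-in for a step class; not part of SparkForge."""

    name: str
    rules: dict = field(default_factory=dict)

    def __post_init__(self) -> None:
        # Reject bad configuration at construction time, before any data is read.
        if not isinstance(self.name, str) or not self.name:
            raise ValueError("Step name must be a non-empty string")
        if not isinstance(self.rules, dict) or not self.rules:
            raise ValueError("Rules must be a non-empty dictionary")


def try_build(name: str, rules: dict) -> str:
    """Return 'ok' if the step can be constructed, else the error message."""
    try:
        SketchStep(name=name, rules=rules)
        return "ok"
    except ValueError as exc:
        return str(exc)
```

Calling `try_build("events", {})` returns the error text instead of a step, which mirrors the "rejected during construction" behavior in the list above.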
Pipeline Building¶
Use the PipelineBuilder to construct your data pipeline:
from pipeline_builder import PipelineBuilder
from pyspark.sql import functions as F
# Initialize builder
builder = PipelineBuilder(spark=spark, schema="analytics")
# Add Bronze validation
builder.with_bronze_rules(
    name="events",
    rules={"user_id": [F.col("user_id").isNotNull()]},
    incremental_col="timestamp"
)
# Add Silver transformation
builder.add_silver_transform(
    name="clean_events",
    source_bronze="events",
    transform=clean_transform,
    rules={"status": [F.col("status").isNotNull()]},
    table_name="clean_events"
)
# Add Gold aggregation
builder.add_gold_transform(
    name="daily_metrics",
    transform=aggregate_transform,
    rules={"date": [F.col("date").isNotNull()]},
    table_name="daily_metrics"
)
Execution Modes¶
SparkForge supports different execution modes:
Initial Load: Process all data from scratch
Incremental: Process only new/changed data (coming soon)
Validation Only: Run validation without writing data
# Initial load
result = pipeline.run_initial_load(bronze_sources={"events": source_df})
# Validation only
result = pipeline.run_validation(bronze_sources={"events": source_df})
Data Validation¶
Validation Rules¶
Define data quality rules using PySpark Column expressions:
rules = {
    "user_id": [F.col("user_id").isNotNull()],
    "email": [F.col("email").rlike(r"^[^@]+@[^@]+\.[^@]+$")],
    "age": [F.col("age").between(0, 120)],
    "status": [F.col("status").isin(["active", "inactive", "pending"])]
}
Common Validation Patterns¶
Null Checks

.. code-block:: python

    "column_name": [F.col("column_name").isNotNull()]

Range Validation

.. code-block:: python

    "value": [F.col("value").between(0, 1000)]

Pattern Matching

.. code-block:: python

    "email": [F.col("email").rlike(r"^[^@]+@[^@]+\.[^@]+$")]

Value Lists

.. code-block:: python

    "status": [F.col("status").isin(["active", "inactive", "pending"])]

Complex Conditions

.. code-block:: python

    "valid_data": [F.col("value") > 0, F.col("status") == "active"]
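To see what the Pattern Matching rule accepts, the same expression can be tried against plain strings with Python's `re` module. This is a sketch for experimenting with the pattern only: Spark's `rlike` uses Java regular expressions, which should behave the same way for a pattern this simple, and the helper name `looks_like_email` is made up for illustration.

```python
import re

# Same pattern as the email rule under Validation Rules:
# text, "@", text, a literal dot, text.
EMAIL_PATTERN = re.compile(r"^[^@]+@[^@]+\.[^@]+$")


def looks_like_email(value: str) -> bool:
    """Return True when the whole string matches the email pattern."""
    return EMAIL_PATTERN.search(value) is not None
```

The backslash before the dot matters. Without it, `.` matches any character, so a value such as `a@bcd` would pass even though it contains no dot after the `@`.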
Validation Thresholds¶
Configure validation thresholds for each layer:
from pipeline_builder.models import ValidationThresholds
thresholds = ValidationThresholds(
    bronze=95.0,  # 95% of bronze data must pass validation
    silver=98.0,  # 98% of silver data must pass validation
    gold=99.0     # 99% of gold data must pass validation
)
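How a threshold is applied can be pictured as a pass-rate comparison. The sketch below assumes the pass rate is the percentage of rows that satisfy every rule, and that a layer passes when this rate is at or above its threshold. The function names are hypothetical, and SparkForge's exact calculation may differ.

```python
def pass_rate(valid_rows: int, total_rows: int) -> float:
    """Percentage of rows that passed validation (100.0 for an empty input)."""
    if total_rows < 0 or valid_rows < 0 or valid_rows > total_rows:
        raise ValueError("row counts must satisfy 0 <= valid_rows <= total_rows")
    if total_rows == 0:
        return 100.0  # Assumption: nothing to validate counts as passing.
    return 100.0 * valid_rows / total_rows


def meets_threshold(valid_rows: int, total_rows: int, threshold: float) -> bool:
    """True when the pass rate is at or above the layer's threshold."""
    return pass_rate(valid_rows, total_rows) >= threshold


# Threshold values from the example above, keyed by layer.
THRESHOLDS = {"bronze": 95.0, "silver": 98.0, "gold": 99.0}
```

With 960 of 1,000 rows valid, the rate is 96%: enough for the bronze threshold of 95% but not for the silver threshold of 98%.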
Error Handling¶
SparkForge provides comprehensive error handling:
Pipeline Errors

.. code-block:: python

    try:
        result = pipeline.run_initial_load(bronze_sources={"events": df})
    except PipelineError as e:
        print(f"Pipeline failed: {e}")
        print(f"Error details: {e.context}")

Validation Errors

.. code-block:: python

    try:
        result = pipeline.run_initial_load(bronze_sources={"events": df})
    except ValidationError as e:
        print(f"Validation failed: {e}")
        print(f"Failed rules: {e.failed_rules}")

Step Errors

.. code-block:: python

    try:
        result = pipeline.run_initial_load(bronze_sources={"events": df})
    except StepError as e:
        print(f"Step failed: {e}")
        print(f"Step name: {e.context.get('step_name')}")
Logging and Monitoring¶
SparkForge includes built-in logging and monitoring:
Pipeline Logging

.. code-block:: python

    from pipeline_builder.logging import PipelineLogger

    logger = PipelineLogger(level="INFO")
    builder = PipelineBuilder(spark=spark, schema="analytics", logger=logger)

Execution Monitoring

.. code-block:: python

    result = pipeline.run_initial_load(bronze_sources={"events": df})

    print(f"Status: {result.status}")
    print(f"Total steps: {result.total_steps}")
    print(f"Successful steps: {result.successful_steps}")
    print(f"Failed steps: {result.failed_steps}")
    print(f"Duration: {result.duration_seconds} seconds")

Step-by-Step Debugging

.. code-block:: python

    # Execute individual steps for debugging
    bronze_result = pipeline.execute_bronze_step("events", {"events": df})
    silver_result = pipeline.execute_silver_step("clean_events", {"events": df})
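The fields printed under Execution Monitoring can be folded into a single summary line for logs. In this sketch, `SimpleNamespace` stands in for the real result object and `summarize` is a hypothetical helper; only the attribute names come from the example above, and the values are invented for illustration.

```python
from types import SimpleNamespace


def summarize(result) -> str:
    """Build a one-line summary from the result fields shown above."""
    total = result.total_steps
    # Guard against division by zero for a pipeline with no steps.
    rate = 100.0 * result.successful_steps / total if total else 0.0
    return (
        f"{result.status}: {result.successful_steps}/{total} steps succeeded "
        f"({rate:.1f}%), {result.failed_steps} failed, "
        f"{result.duration_seconds:.2f}s"
    )


# Stand-in for a real result; the values are illustrative only.
fake_result = SimpleNamespace(
    status="completed",
    total_steps=4,
    successful_steps=3,
    failed_steps=1,
    duration_seconds=12.5,
)
```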
Advanced Features¶
Multi-Schema Support¶
Work with multiple schemas for different environments:
# Development schema
dev_builder = PipelineBuilder(spark=spark, schema="dev_analytics")
# Production schema
prod_builder = PipelineBuilder(spark=spark, schema="prod_analytics")
Auto-Inference¶
SparkForge can automatically infer dependencies:
# Auto-infer silver step dependencies
builder.add_silver_transform(
    name="clean_events",
    source_bronze="events",  # Optional: inferred automatically when omitted
    transform=clean_transform,
    rules={"status": [F.col("status").isNotNull()]},
    table_name="clean_events"
)
Column Filtering¶
Control which columns are preserved after validation:
# Only keep columns with validation rules
builder.with_bronze_rules(
    name="events",
    rules={"user_id": [F.col("user_id").isNotNull()]},
    filter_columns_by_rules=True
)
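The effect of `filter_columns_by_rules=True` can be pictured on a plain list of column names. This sketch assumes the flag keeps exactly the columns that appear as keys in `rules`, as the comment above suggests; `columns_to_keep` is a hypothetical helper, not a SparkForge function.

```python
def columns_to_keep(all_columns, rules, filter_columns_by_rules):
    """Return the columns that survive validation, in their original order."""
    if not filter_columns_by_rules:
        return list(all_columns)
    # Keep only columns that have at least one validation rule.
    return [col for col in all_columns if col in rules]
```

Under this assumption, a table with `user_id`, `email`, and `timestamp` columns and a single rule on `user_id` would come out of validation with only `user_id`.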
Incremental Processing¶
Enable incremental processing with timestamp columns:
builder.with_bronze_rules(
    name="events",
    rules={"user_id": [F.col("user_id").isNotNull()]},
    incremental_col="timestamp"  # Enable watermarking
)
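Watermarking on an incremental column can be sketched in plain Python. The assumption here is the common pattern: remember the largest value already processed, take only rows newer than it on the next run, then advance the watermark. The function name and the row format are illustrative, and SparkForge's own bookkeeping may differ.

```python
def select_new_rows(rows, incremental_col, watermark):
    """Return (new_rows, new_watermark) for one incremental run.

    rows: list of dicts whose incremental_col values are mutually comparable
    (timestamps, integers, ISO date strings).
    watermark: the largest value seen so far, or None on the first run,
    in which case every row is treated as new.
    """
    if watermark is None:
        new_rows = list(rows)
    else:
        new_rows = [r for r in rows if r[incremental_col] > watermark]
    if new_rows:
        # Advance the watermark to the newest value just processed.
        watermark = max(r[incremental_col] for r in new_rows)
    return new_rows, watermark
```

A run that finds no newer rows returns an empty list and leaves the watermark unchanged, so repeating it is harmless.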
Performance Optimization¶
Best Practices¶
1. Use Appropriate Data Types

   .. code-block:: python

       # Use appropriate data types for better performance
       df = df.withColumn("timestamp", F.col("timestamp").cast("timestamp"))

2. Optimize Validation Rules

   .. code-block:: python

       # Combine multiple conditions into single rule when possible
       "valid_user": [F.col("user_id").isNotNull() & F.col("email").isNotNull()]

3. Use Incremental Processing

   .. code-block:: python

       # Enable incremental processing for large datasets
       builder.with_bronze_rules(
           name="events",
           rules={"user_id": [F.col("user_id").isNotNull()]},
           incremental_col="timestamp"
       )

4. Monitor Performance

   .. code-block:: python

       # Check execution metrics
       result = pipeline.run_initial_load(bronze_sources={"events": df})
       print(f"Execution time: {result.duration_seconds} seconds")
Troubleshooting¶
Common Issues¶
1. “No module named ‘pipeline_builder’”
- Solution: Run pip install pipeline_builder
2. “Java gateway process exited”
- Solution: Install Java 8+ and set JAVA_HOME
3. “Table not found”
- Solution: Run pipeline.run_initial_load() before accessing tables
4. “Validation failed”
- Solution: Check your data against validation rules
5. “Step execution failed”
- Solution: Check step dependencies and transform functions
Debugging Tips¶
1. Use Step-by-Step Execution

   .. code-block:: python

       # Execute individual steps
       bronze_result = pipeline.execute_bronze_step("events", {"events": df})
       print(f"Bronze step result: {bronze_result.status}")

2. Check Data Quality

   .. code-block:: python

       # Inspect your data
       df.show()
       df.printSchema()
       df.describe().show()

3. Validate Rules

   .. code-block:: python

       # Test validation rules
       valid_df = df.filter(F.col("user_id").isNotNull())
       print(f"Valid rows: {valid_df.count()}/{df.count()}")

4. Check Dependencies

   .. code-block:: python

       # Validate pipeline dependencies
       errors = builder.validate_pipeline()
       if errors:
           print(f"Pipeline validation errors: {errors}")
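The kind of check that dependency validation performs can be illustrated without Spark. This sketch assumes a simple model in which each silver step names one bronze source and the check reports any source that was never registered. `find_missing_sources` and the dictionary layout are hypothetical and do not reflect how `validate_pipeline()` is implemented.

```python
def find_missing_sources(bronze_steps, silver_steps):
    """Return error messages for silver steps whose bronze source is unknown.

    bronze_steps: set of registered bronze step names.
    silver_steps: dict mapping silver step name -> its source bronze name.
    """
    errors = []
    # Sort by step name so the messages come out in a stable order.
    for silver_name, source in sorted(silver_steps.items()):
        if source not in bronze_steps:
            errors.append(
                f"Silver step '{silver_name}' depends on unknown bronze step '{source}'"
            )
    return errors
```

An empty list means every silver step points at a registered bronze step, which matches the `if errors:` check used above.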
Best Practices¶
1. Start Simple
- Begin with basic validation rules
- Add complexity gradually
- Test each step independently
2. Use Meaningful Names
- Choose descriptive step names
- Use consistent naming conventions
- Document your pipeline logic
3. Handle Errors Gracefully
- Implement proper error handling
- Log errors for debugging
- Provide meaningful error messages
4. Monitor Performance
- Track execution times
- Monitor data quality metrics
- Optimize based on performance data
5. Test Thoroughly
- Test with sample data
- Validate edge cases
- Test error conditions
Next Steps¶
Now that you understand the core concepts, explore:
`Examples <examples/index.html>`_: Real-world pipeline examples
`API Reference <api_reference.html>`_: Detailed API documentation
`Troubleshooting <troubleshooting.html>`_: Common issues and solutions
`Migration Guides <migration_guides.html>`_: Upgrading from older versions
Happy data processing! 🚀