SparkForge Examples
===================

This section contains practical examples demonstrating SparkForge’s capabilities.
Examples Overview
-----------------

Hello World (hello_world.py) ⭐ START HERE¶
Perfect for absolute beginners!
The simplest possible SparkForge pipeline, with just 3 lines of pipeline code. It demonstrates the Bronze → Silver → Gold flow with minimal complexity.
Features:

- Simplest possible pipeline
- Bronze → Silver → Gold flow
- Basic data validation
- Step-by-step execution

Run:

.. code-block:: bash

   python examples/hello_world.py
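
The Bronze → Silver → Gold flow is easy to picture without Spark. What follows is a conceptual sketch in plain Python, not the SparkForge API, and the records, column names and rules are made up for illustration. Bronze holds the raw records, Silver keeps only the rows that pass every validation rule, and Gold aggregates Silver into a business-level result.

.. code-block:: python

   from collections import defaultdict

   # Conceptual sketch only: plain-Python stand-ins for the layers, not SparkForge calls.
   # Bronze: raw records as ingested, including one bad row (hypothetical data).
   bronze = [
       {"user_id": "u1", "action": "click", "value": 10},
       {"user_id": "u2", "action": "view", "value": 5},
       {"user_id": None, "action": "click", "value": 3},  # fails validation
       {"user_id": "u1", "action": "view", "value": 7},
   ]

   # Validation rules: column name -> predicates that must all hold.
   rules = {
       "user_id": [lambda v: v is not None],
       "value": [lambda v: v is not None, lambda v: v > 0],
   }


   def is_valid(row, rules):
       """Return True if every rule for every column passes."""
       return all(check(row.get(col)) for col, checks in rules.items() for check in checks)


   # Silver: only the rows that pass validation.
   silver = [row for row in bronze if is_valid(row, rules)]

   # Gold: total value per user, aggregated from Silver.
   gold = defaultdict(int)
   for row in silver:
       gold[row["user_id"]] += row["value"]

   print(len(silver), dict(gold))  # 3 {'u1': 17, 'u2': 5}

In the real pipeline each layer is a Spark DataFrame and the rules are column expressions. The filter-then-aggregate shape is the same.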
E-commerce Analytics Pipeline (ecommerce_analytics.py)¶
A complete e-commerce analytics pipeline that processes order data through Bronze → Silver → Gold layers.
Features:

- Order data ingestion and validation
- Customer profile creation
- Daily sales analytics
- Customer segmentation and analytics
- Revenue analysis by product category

Key Concepts:

- Bronze layer data validation
- Silver layer data enrichment
- Gold layer business analytics
- Customer profiling and segmentation
- Revenue analysis and reporting

Run:

.. code-block:: bash

   python examples/ecommerce_analytics.py
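
Gold-layer metrics such as daily sales and revenue by product category are ordinary group-by aggregations. The sketch below is in plain Python over a few made-up orders. The field names ``order_date``, ``category``, ``quantity`` and ``unit_price`` are assumptions, not necessarily the columns the example uses.

.. code-block:: python

   from collections import defaultdict
   from datetime import date

   # Hypothetical Silver-layer orders (illustrative field names).
   orders = [
       {"order_date": date(2024, 1, 1), "category": "books", "quantity": 2, "unit_price": 12.5},
       {"order_date": date(2024, 1, 1), "category": "toys", "quantity": 1, "unit_price": 30.0},
       {"order_date": date(2024, 1, 2), "category": "books", "quantity": 1, "unit_price": 20.0},
   ]


   def revenue_by(rows, key):
       """Sum quantity * unit_price, grouped by the given column."""
       totals = defaultdict(float)
       for row in rows:
           totals[row[key]] += row["quantity"] * row["unit_price"]
       return dict(totals)


   daily_sales = revenue_by(orders, "order_date")
   category_revenue = revenue_by(orders, "category")
   print(category_revenue)  # {'books': 45.0, 'toys': 30.0}

On Spark DataFrames the equivalent would typically be ``df.groupBy("category").agg(F.sum(F.col("quantity") * F.col("unit_price")))``.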
IoT Sensor Data Pipeline (iot_sensor_pipeline.py)¶
An IoT sensor data processing pipeline with anomaly detection and real-time analytics.
Features:

- Sensor data ingestion (temperature, humidity, pressure, vibration)
- Anomaly detection and classification
- Sensor health monitoring
- Zone-based analytics
- Data quality assessment

Key Concepts:

- Time-series data processing
- Anomaly detection algorithms
- Sensor health monitoring
- Zone-based aggregations
- Data quality metrics

Run:

.. code-block:: bash

   python examples/iot_sensor_pipeline.py
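
This section does not say which anomaly detection algorithm the example uses. One common, simple choice is a z-score test, which flags a reading lying more than ``threshold`` standard deviations from the mean. The sketch below illustrates that technique. The ``zscore_anomalies`` helper, the threshold of 2.0 and the readings are illustrative assumptions.

.. code-block:: python

   from statistics import mean, pstdev


   def zscore_anomalies(readings, threshold=2.0):
       """Return indices of readings more than `threshold` population std devs from the mean."""
       mu = mean(readings)
       sigma = pstdev(readings)
       if sigma == 0:
           return []  # all readings identical: nothing stands out
       return [i for i, x in enumerate(readings) if abs(x - mu) / sigma > threshold]


   temperatures = [21.0, 21.5, 20.8, 21.2, 35.0, 21.1, 20.9]
   print(zscore_anomalies(temperatures))  # [4]

With ``n`` readings, a population z-score can never exceed ``sqrt(n - 1)``. Very small windows therefore need a lower threshold or a different method.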
Step-by-Step Debugging (step_by_step_debugging.py)¶
Demonstrates how to debug individual pipeline steps using SparkForge’s debugging capabilities.
Features:

- Individual step execution
- Step validation debugging
- Data quality inspection
- Execution state monitoring
- Performance profiling

Key Concepts:

- Step-by-step execution
- Validation debugging
- Data quality inspection
- Performance monitoring
- Error handling and recovery

Run:

.. code-block:: bash

   python examples/step_by_step_debugging.py
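
When debugging a single step, the most telling number is usually its validation rate: the share of input rows that pass the step's rules. The plain-Python sketch below runs one step in isolation and reports that rate together with the rejected rows. The ``run_step`` helper and its report fields are hypothetical and are not SparkForge's debugging API.

.. code-block:: python

   # Hypothetical helper for illustration; not SparkForge's debugging API.
   def run_step(name, rows, rules):
       """Validate `rows` against `rules` and report how many pass."""
       valid, invalid = [], []
       for row in rows:
           ok = all(
               check(row.get(col))
               for col, checks in rules.items()
               for check in checks
           )
           (valid if ok else invalid).append(row)
       total = len(rows)
       # Treat an empty input as fully valid so the rate is always defined.
       rate = 100.0 * len(valid) / total if total else 100.0
       report = {
           "step": name,
           "total": total,
           "valid": len(valid),
           "invalid_rows": invalid,
           "validation_rate": rate,
       }
       return valid, report


   rows = [{"amount": 10}, {"amount": -5}, {"amount": None}, {"amount": 3}]
   rules = {"amount": [lambda v: v is not None, lambda v: v > 0]}

   clean, report = run_step("clean_orders", rows, rules)
   print(f"{report['step']}: {report['validation_rate']:.1f}% valid")
   print("rejected:", report["invalid_rows"])

Keeping the rejected rows in the report is the useful part when debugging, because you can see which records tripped which rule.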
Running the Examples
--------------------

Prerequisites
~~~~~~~~~~~~~

1. Install SparkForge:

   .. code-block:: bash

      pip install pipeline_builder

2. Install the dependencies:

   .. code-block:: bash

      pip install pyspark delta-spark pandas numpy

3. Install Java 8 or later (required by PySpark; PySpark 4.x requires Java 17 or later).
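
Before running any example, you can quickly check that Java is available and that the Python packages can be imported. The sketch below uses only the standard library. The ``check_environment`` helper is hypothetical. The module names are the import names of the packages installed above; ``delta-spark`` installs the ``delta`` module.

.. code-block:: python

   import importlib.util
   import os
   import shutil


   def check_environment(modules=("pipeline_builder", "pyspark", "delta", "pandas", "numpy")):
       """Report whether Java and each required Python module can be found."""
       status = {
           # A `java` executable on PATH or a JAVA_HOME setting counts as Java being available.
           "java": shutil.which("java") is not None or bool(os.environ.get("JAVA_HOME")),
       }
       for name in modules:
           # find_spec returns None when a top-level module is not installed.
           status[name] = importlib.util.find_spec(name) is not None
       return status


   if __name__ == "__main__":
       for name, ok in check_environment().items():
           print(f"{name:18} {'OK' if ok else 'MISSING'}")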
Running Examples
~~~~~~~~~~~~~~~~

1. Navigate to the project directory:

   .. code-block:: bash

      cd pipeline_builder

2. Run any example:

   .. code-block:: bash

      python examples/hello_world.py
      python examples/ecommerce_analytics.py
      python examples/iot_sensor_pipeline.py
      python examples/step_by_step_debugging.py
Example Output
~~~~~~~~~~~~~~

Each example will:

- Create sample data
- Build a complete pipeline
- Execute the pipeline
- Display results and analytics
- Show performance metrics
- Clean up resources
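
To reproduce the per-stage performance metrics in your own scripts, a small timing helper is enough. The sketch below uses a hypothetical ``timed`` context manager, which is not part of SparkForge. It records wall-clock time per stage and records it even if the stage raises.

.. code-block:: python

   import time
   from contextlib import contextmanager

   timings = {}


   @contextmanager
   def timed(stage):
       """Record the wall-clock duration of the enclosed block under `stage`."""
       start = time.perf_counter()
       try:
           yield
       finally:
           timings[stage] = time.perf_counter() - start


   with timed("create_data"):
       data = [i * i for i in range(10_000)]

   with timed("aggregate"):
       total = sum(data)

   for stage, seconds in timings.items():
       print(f"{stage}: {seconds:.4f}s")

Spark evaluates lazily, so time stages that end in an action such as ``count()`` or a write. Otherwise you only measure how long it takes to build the query plan.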
Learning Path
-------------

Beginner
~~~~~~~~

1. Start with ``hello_world.py`` for the simplest possible example.
2. Try ``step_by_step_debugging.py`` to understand the basic concepts.
3. Run ``ecommerce_analytics.py`` to see a complete business pipeline.
Intermediate
~~~~~~~~~~~~

- Modify the examples to use your own data
- Experiment with different validation rules
- Try different execution modes (incremental, full refresh)
- Add custom transformations
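
The incremental and full-refresh modes differ in which Bronze rows a run processes. The conceptual sketch below assumes an incremental run remembers a watermark (the latest timestamp already processed) on a timestamp column. The ``select_rows`` helper, the ``ts`` column and the mode strings are hypothetical and do not describe how SparkForge stores its state.

.. code-block:: python

   from datetime import datetime


   def select_rows(rows, mode, last_watermark=None, ts_col="ts"):
       """Pick the rows a run should process and return them with the new watermark.

       Hypothetical helper: "full_refresh" reprocesses everything, "incremental"
       keeps only rows newer than the previous watermark.
       """
       if mode == "full_refresh" or (mode == "incremental" and last_watermark is None):
           selected = list(rows)
       elif mode == "incremental":
           selected = [r for r in rows if r[ts_col] > last_watermark]
       else:
           raise ValueError(f"unknown mode: {mode!r}")
       # Advance the watermark to the newest processed row; keep it if nothing is new.
       new_watermark = max((r[ts_col] for r in selected), default=last_watermark)
       return selected, new_watermark


   events = [
       {"id": 1, "ts": datetime(2024, 1, 1, 9)},
       {"id": 2, "ts": datetime(2024, 1, 1, 10)},
       {"id": 3, "ts": datetime(2024, 1, 1, 11)},
   ]
   first, wm = select_rows(events, "full_refresh")       # all 3 rows
   again, wm2 = select_rows(events, "incremental", wm)   # no new rows yet
   events.append({"id": 4, "ts": datetime(2024, 1, 1, 12)})
   new, wm3 = select_rows(events, "incremental", wm)     # only row 4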
Advanced
~~~~~~~~

- Implement custom validation functions
- Add complex Silver-to-Silver dependencies
- Optimize for performance with parallel execution
- Integrate with your existing data infrastructure
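
Silver-to-Silver dependencies and parallel execution are two sides of the same dependency graph: steps whose inputs are all ready can run concurrently. The sketch below uses the standard library's ``graphlib.TopologicalSorter`` (Python 3.9+) to group steps into batches that could run in parallel. The step names and the ``execution_batches`` helper are made up. This illustrates the idea, not SparkForge's own scheduler.

.. code-block:: python

   from graphlib import TopologicalSorter

   # Each step maps to the set of steps it depends on (hypothetical names).
   dependencies = {
       "silver_orders": {"bronze_orders"},
       "silver_customers": {"bronze_customers"},
       "silver_enriched_orders": {"silver_orders", "silver_customers"},
       "gold_daily_sales": {"silver_enriched_orders"},
   }


   def execution_batches(graph):
       """Return lists of steps; steps in the same list have no mutual dependencies."""
       sorter = TopologicalSorter(graph)
       sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
       batches = []
       while sorter.is_active():
           ready = sorted(sorter.get_ready())  # sorted only for a stable order
           batches.append(ready)
           sorter.done(*ready)
       return batches


   for i, batch in enumerate(execution_batches(dependencies)):
       print(i, batch)

The ``CycleError`` raised by ``prepare()`` also works as a sanity check when wiring up Silver-to-Silver dependencies.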
Customizing Examples
--------------------

Using Your Own Data
~~~~~~~~~~~~~~~~~~~

Replace the sample data creation with your own data:
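.. code-block:: python

   # `spark` and `pipeline` come from the example you are adapting.
   # Instead of create_sample_data(spark), load your own data:
   your_df = spark.read.parquet("path/to/your/data.parquet")

   # Use it as a Bronze source in the pipeline
   result = pipeline.initial_load(bronze_sources={"your_table": your_df})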
Adding Custom Transformations¶
.. code-block:: python

   from pyspark.sql import functions as F


   def your_custom_transform(spark, bronze_df, prior_silvers):
       # Your custom logic here
       return bronze_df.withColumn("new_column", F.lit("value"))


   # `builder` is the PipelineBuilder created in the example you are adapting.
   builder.add_silver_transform(
       name="your_step",
       source_bronze="source_table",
       transform=your_custom_transform,
       rules={"new_column": [F.col("new_column").isNotNull()]},
       table_name="your_table",
   )
Custom Validation Rules¶
.. code-block:: python

   from pyspark.sql import functions as F

   # Add complex validation rules: each column maps to a list of conditions
   rules = {
       "email": [
           F.col("email").isNotNull(),
           F.col("email").rlike("^[^@]+@[^@]+\\.[^@]+$"),
       ],
       "age": [
           F.col("age").isNotNull(),
           F.col("age").between(0, 120),
       ],
       "amount": [
           F.col("amount").isNotNull(),
           F.col("amount") > 0,
           F.col("amount") < 1000000,
       ],
   }
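
These rules are Spark column expressions, so they can only be evaluated against a DataFrame. To sanity-check the thresholds on a few hand-written records first, a plain-Python mirror of the same checks can help. The sketch below is not used by SparkForge. It re-implements each condition with ``re`` and ordinary comparisons, and ``between(0, 120)`` becomes an inclusive range check. ``py_rules`` and ``failed_rules`` are hypothetical names.

.. code-block:: python

   import re

   # Same anchored pattern as the rlike rule above.
   EMAIL_RE = re.compile(r"^[^@]+@[^@]+\.[^@]+$")

   # Python predicates mirroring the Spark rules; the null check runs first.
   py_rules = {
       "email": [lambda v: v is not None, lambda v: EMAIL_RE.search(v) is not None],
       "age": [lambda v: v is not None, lambda v: 0 <= v <= 120],
       "amount": [lambda v: v is not None, lambda v: 0 < v < 1_000_000],
   }


   def failed_rules(record):
       """Return the columns whose checks fail for this record."""
       return [col for col, checks in py_rules.items()
               if not all(check(record.get(col)) for check in checks)]


   print(failed_rules({"email": "a@b.com", "age": 30, "amount": 250.0}))    # []
   print(failed_rules({"email": "not-an-email", "age": 130, "amount": 0}))  # ['email', 'age', 'amount']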
Troubleshooting
---------------

Common Issues
~~~~~~~~~~~~~

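- Java not found: install Java 8+ and set the ``JAVA_HOME`` environment variable.
- Memory issues: increase the Spark driver memory. Use ``--driver-memory 4g`` when launching through ``spark-submit``, or ``.config("spark.driver.memory", "4g")`` on the ``SparkSession`` builder before the session is first created.
- Delta Lake errors: make sure Delta Lake is installed with ``pip install delta-spark`` and that its version is compatible with your PySpark version.
- Permission errors: check that you have write permission on the warehouse directory.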
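
For permission errors, the most reliable check is to actually try writing. The standard-library sketch below uses a hypothetical ``warehouse_is_writable`` helper. Its default path ``spark-warehouse`` is Spark's default warehouse directory relative to the working directory. Pass whatever directory your session is configured to write to.

.. code-block:: python

   import os
   import tempfile


   def warehouse_is_writable(path="spark-warehouse"):
       """Return True if a file can be created (and removed) inside `path`."""
       try:
           os.makedirs(path, exist_ok=True)
           # NamedTemporaryFile deletes the probe file when the block exits.
           with tempfile.NamedTemporaryFile(dir=path):
               pass
           return True
       except OSError:
           return False


   if __name__ == "__main__":
       print(warehouse_is_writable())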
Getting Help
~~~~~~~~~~~~

- Check the User Guide for detailed documentation
- Review the API Reference for complete API documentation
- Look at the Quick Reference for common patterns
Contributing Examples
---------------------

We welcome contributions! To add a new example:
1. Create a new Python file in the ``examples`` directory.
2. Follow the naming convention: ``descriptive_name.py``.
3. Include comprehensive docstrings and comments.
4. Add a description to this README.
5. Test the example thoroughly.
6. Submit a pull request.
Example Template
~~~~~~~~~~~~~~~~

.. code-block:: python

   #!/usr/bin/env python3
   """
   Your Example Name

   Brief description of what this example demonstrates.
   """
   import traceback

   # PipelineBuilder and F are imported for use in your example code.
   from pipeline_builder import PipelineBuilder
   from pyspark.sql import SparkSession, functions as F


   def main():
       """Main function to run the example."""
       print("Your Example")
       print("=" * 50)

       # Initialize Spark
       spark = (
           SparkSession.builder
           .appName("Your Example")
           .master("local[*]")
           .getOrCreate()
       )

       try:
           # Your example code here
           pass
       except Exception as e:
           print(f"Error: {e}")
           traceback.print_exc()
       finally:
           # Cleanup: always stop the Spark session
           spark.stop()


   if __name__ == "__main__":
       main()
Happy Learning! 🚀
Start with the Hello World example and work your way up to the advanced topics. Each example builds on the previous ones, so follow the learning path for the best experience.