Getting Started with PipelineBuilder¶
A quick start guide to get you up and running with PipelineBuilder in minutes.
Installation¶
pip install pipeline-builder
# Or with extras (quoted so shells such as zsh do not expand the brackets)
pip install "pipeline-builder[pyspark]" # For PySpark support
Engine Configuration¶
Important
Engine Configuration Required: You must configure the engine before using pipeline components. This allows the framework to work with both real PySpark and mock Spark for testing.
from pipeline_builder.engine_config import configure_engine
from pyspark.sql import SparkSession
# Create Spark session with Delta Lake
spark = SparkSession.builder \
    .appName("My First Pipeline") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()
# Configure engine (required!)
configure_engine(spark=spark)
Your First Pipeline¶
Let’s build a simple e-commerce analytics pipeline:
Important
Validation System: SparkForge validates pipeline configuration up front. Every pipeline step must define validation rules, and invalid configurations are rejected with clear error messages.
from pipeline_builder import PipelineBuilder
from pipeline_builder.engine_config import configure_engine
from pipeline_builder.functions import get_default_functions
from pyspark.sql import SparkSession
# 1. Initialize Spark and configure engine
spark = SparkSession.builder \
    .appName("My First Pipeline") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()
# Configure engine (required!)
configure_engine(spark=spark)
# Get functions
F = get_default_functions()
# 2. Create sample data
events_data = [
    ("user1", "click", "product1", "2024-01-01 10:00:00"),
    ("user2", "purchase", "product2", "2024-01-01 11:00:00"),
    ("user3", "view", "product1", "2024-01-01 12:00:00"),
]
events_df = spark.createDataFrame(events_data, ["user_id", "action", "product_id", "timestamp"])
# 3. Build pipeline
builder = PipelineBuilder(spark=spark, schema="analytics")
# Bronze: Raw events
builder.with_bronze_rules(
    name="events",
    rules={
        "user_id": [F.col("user_id").isNotNull()],
        "action": [F.col("action").isNotNull()],
        "timestamp": [F.col("timestamp").isNotNull()]
    },
    incremental_col="timestamp"
)
# Silver: Clean events
def clean_events(spark, bronze_df, prior_silvers):
    return bronze_df.filter(F.col("action").isin(["click", "view", "purchase"]))

builder.add_silver_transform(
    name="clean_events",
    source_bronze="events",
    transform=clean_events,
    rules={
        "user_id": [F.col("user_id").isNotNull()],
        "action": [F.col("action").isNotNull()],
    },
    table_name="clean_events"
)
# Gold: Daily summary
def daily_summary(spark, silvers):
    events_df = silvers["clean_events"]
    return events_df.groupBy("action").count()

builder.add_gold_transform(
    name="daily_summary",
    transform=daily_summary,
    rules={"action": [F.col("action").isNotNull()], "count": [F.col("count") > 0]},
    table_name="daily_summary",
    source_silvers=["clean_events"]
)
# 4. Execute pipeline
pipeline = builder.to_pipeline()
result = pipeline.run_initial_load(bronze_sources={"events": events_df})
print(f"Pipeline completed: {result.status.value}")
print(f"Rows written: {result.metrics.total_rows_written}")
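The requirement that every step carries validation rules can be pictured with a small, library-independent sketch. The `Step` class and `check_steps` function below are hypothetical names made up for illustration. They are not part of the PipelineBuilder API and only mimic the idea of rejecting a step with no rules before anything runs.

```python
from dataclasses import dataclass, field


@dataclass
class Step:
    """Hypothetical stand-in for a pipeline step (not the library's class)."""
    name: str
    rules: dict = field(default_factory=dict)


def check_steps(steps):
    """Return one error message per step that has no validation rules."""
    errors = []
    for step in steps:
        if not step.rules:
            errors.append(f"Step '{step.name}' has no validation rules")
    return errors


steps = [
    Step("events", {"user_id": ["not_null"]}),
    Step("clean_events"),  # rules omitted on purpose
]
errors = check_steps(steps)
print(errors)
```

Collecting every problem into a list, rather than stopping at the first one, lets a caller report all misconfigured steps in a single pass.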
What Just Happened?¶
Engine Configuration: We configured the engine to use PySpark
Bronze Layer: We defined validation rules for raw event data
Silver Layer: We cleaned the data by filtering valid actions
Gold Layer: We aggregated event counts by action type (the example does not group by day, despite the daily_summary name)
Execution: We ran the pipeline and got results
The pipeline uses a service-oriented architecture internally:
- Step Executors: Handle execution logic for each step type
- ExecutionValidator: Validates data according to step rules
- WriteService: Handles all write operations to Delta Lake
- TableService: Manages table operations and schema
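To make the bronze, silver, and gold steps summarized above concrete, here is a plain-Python walk-through of the same logic on the guide's three sample rows. It is only a sketch of the data flow: it does not use Spark or PipelineBuilder, and it assumes that rows failing a bronze rule are simply dropped, which this guide does not state.

```python
from collections import Counter

# Same sample rows as the guide: (user_id, action, product_id, timestamp)
events = [
    ("user1", "click", "product1", "2024-01-01 10:00:00"),
    ("user2", "purchase", "product2", "2024-01-01 11:00:00"),
    ("user3", "view", "product1", "2024-01-01 12:00:00"),
]

# Bronze: keep rows whose user_id, action and timestamp are all present
bronze = [r for r in events if r[0] is not None and r[1] is not None and r[3] is not None]

# Silver: keep only the recognised actions
valid_actions = {"click", "view", "purchase"}
silver = [r for r in bronze if r[1] in valid_actions]

# Gold: count events per action
gold = dict(Counter(r[1] for r in silver))
print(gold)  # {'click': 1, 'purchase': 1, 'view': 1}
```

Every sample row passes both layers, so the gold result has one event per action.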
Next Steps¶
Check Execution Results¶
result = pipeline.run_initial_load(bronze_sources={"events": events_df})
# Check status
if result.status.value == "completed":
    print("✅ Pipeline completed successfully")
    print(f"Bronze rows: {result.bronze_results['events']['rows_processed']}")
    print(f"Silver rows: {result.silver_results['clean_events']['rows_written']}")
    print(f"Gold rows: {result.gold_results['daily_summary']['rows_written']}")
else:
    print(f"❌ Pipeline failed: {result.errors}")
Add Incremental Processing¶
# Incremental processing is already enabled: the bronze step above sets incremental_col="timestamp"
# Run incrementally ([...] and schema are placeholders for your new rows and their schema)
new_events = spark.createDataFrame([...], schema)
result = pipeline.run_incremental(bronze_sources={"events": new_events})
# Check incremental results
print(f"Incremental rows processed: {result.bronze_results['events']['rows_processed']}")
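The general idea behind an incremental column can be sketched without Spark: remember the latest timestamp already loaded and keep only rows that are newer. The `select_new_rows` helper is a hypothetical name, and the strict "later than" comparison is an assumption about how such a filter might work; this guide does not show how PipelineBuilder implements it.

```python
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S"


def select_new_rows(rows, last_seen, ts_index=3):
    """Keep rows whose timestamp is strictly later than last_seen."""
    cutoff = datetime.strptime(last_seen, FMT)
    return [r for r in rows if datetime.strptime(r[ts_index], FMT) > cutoff]


# Latest timestamp in the guide's sample data for the initial load
last_seen = "2024-01-01 12:00:00"

incoming = [
    ("user3", "view", "product1", "2024-01-01 12:00:00"),   # already loaded
    ("user4", "click", "product3", "2024-01-01 13:30:00"),  # new (made-up row)
]
new_rows = select_new_rows(incoming, last_seen)
print(len(new_rows))  # 1
```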
Add Data Validation¶
# Set quality thresholds
builder = PipelineBuilder(
    spark=spark,
    schema="analytics",
    min_bronze_rate=95.0,  # 95% data quality required
    min_silver_rate=98.0,  # 98% data quality required
    min_gold_rate=99.0     # 99% data quality required
)
# Validate pipeline before execution
errors = builder.validate_pipeline()
if errors:
    print(f"Validation errors: {errors}")
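A quality threshold such as 95.0 is easiest to read as a percentage of rows that pass every rule. The sketch below assumes that interpretation (valid rows divided by total rows, times 100); the function names are hypothetical and the guide does not spell out the library's exact formula.

```python
def validation_rate(total_rows, valid_rows):
    """Percentage of rows that passed every rule (100.0 for an empty input)."""
    if total_rows == 0:
        return 100.0
    return valid_rows * 100.0 / total_rows


def meets_threshold(rate, min_rate):
    """True when the observed rate reaches the configured minimum."""
    return rate >= min_rate


rate = validation_rate(total_rows=200, valid_rows=192)
print(f"{rate:.2f}%")               # 96.00%
print(meets_threshold(rate, 95.0))  # True
print(meets_threshold(rate, 98.0))  # False
```

Under this reading, 192 valid rows out of 200 would clear a 95.0 bronze threshold but fall short of a 98.0 silver threshold.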
Common Patterns¶
E-commerce Pipeline¶
# Configure engine first
configure_engine(spark=spark)
F = get_default_functions()
# Bronze: Raw orders
builder.with_bronze_rules(
    name="orders",
    rules={
        "order_id": [F.col("order_id").isNotNull()],
        "customer_id": [F.col("customer_id").isNotNull()],
        "amount": [F.col("amount") > 0],
        "timestamp": [F.col("timestamp").isNotNull()]
    },
    incremental_col="timestamp"
)
# Silver: Enriched orders
def enrich_orders(spark, bronze_df, prior_silvers):
    return (bronze_df
        .withColumn("order_date", F.date_trunc("day", "timestamp"))
        .withColumn("is_weekend", F.dayofweek("timestamp").isin([1, 7]))
        .withColumn("order_category", F.when(F.col("amount") > 100, "high_value").otherwise("standard"))
    )

builder.add_silver_transform(
    name="enriched_orders",
    source_bronze="orders",
    transform=enrich_orders,
    rules={
        "order_date": [F.col("order_date").isNotNull()],
        "order_category": [F.col("order_category").isNotNull()]
    },
    table_name="enriched_orders",
    watermark_col="timestamp"
)
# Gold: Daily revenue
def daily_revenue(spark, silvers):
    orders_df = silvers["enriched_orders"]
    return (orders_df
        .groupBy("order_date")
        .agg(
            F.sum("amount").alias("total_revenue"),
            F.count("*").alias("order_count"),
            F.countDistinct("customer_id").alias("unique_customers")
        )
    )

builder.add_gold_transform(
    name="daily_revenue",
    transform=daily_revenue,
    rules={
        "order_date": [F.col("order_date").isNotNull()],
        "total_revenue": [F.col("total_revenue") > 0]
    },
    table_name="daily_revenue",
    source_silvers=["enriched_orders"]
)
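The weekend check in the silver step relies on Spark's dayofweek numbering, where 1 is Sunday and 7 is Saturday. The plain-Python sketch below mirrors the three derived columns for two made-up orders so the logic can be checked without a Spark session. As a simplification, order_date is an ISO date string here, whereas date_trunc in Spark yields a timestamp at midnight.

```python
from datetime import datetime


def spark_dayofweek(ts):
    """Mimic Spark's dayofweek numbering: 1 = Sunday ... 7 = Saturday."""
    return ts.isoweekday() % 7 + 1


def enrich(order):
    """Add order_date, is_weekend and order_category to one order dict."""
    ts = datetime.strptime(order["timestamp"], "%Y-%m-%d %H:%M:%S")
    return {
        **order,
        "order_date": ts.date().isoformat(),
        "is_weekend": spark_dayofweek(ts) in (1, 7),
        "order_category": "high_value" if order["amount"] > 100 else "standard",
    }


# Hypothetical orders: 2024-01-06 is a Saturday, 2024-01-08 is a Monday
saturday = enrich({"order_id": "o1", "amount": 250.0, "timestamp": "2024-01-06 09:15:00"})
monday = enrich({"order_id": "o2", "amount": 40.0, "timestamp": "2024-01-08 14:00:00"})
print(saturday["is_weekend"], saturday["order_category"])  # True high_value
print(monday["is_weekend"], monday["order_category"])      # False standard
```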
IoT Sensor Data Pipeline¶
# Configure engine first
configure_engine(spark=spark)
F = get_default_functions()
# Bronze: Raw sensor data
builder.with_bronze_rules(
    name="sensor_data",
    rules={
        "sensor_id": [F.col("sensor_id").isNotNull()],
        "temperature": [F.col("temperature").between(-50, 150)],
        "humidity": [F.col("humidity").between(0, 100)],
        "timestamp": [F.col("timestamp").isNotNull()]
    },
    incremental_col="timestamp"
)
# Silver: Processed sensor data
def process_sensor_data(spark, bronze_df, prior_silvers):
    return (bronze_df
        .withColumn("is_anomaly", F.col("temperature") > 100)
        .withColumn("sensor_zone", F.substring("sensor_id", 1, 2))
        .filter(F.col("temperature").isNotNull())
    )

builder.add_silver_transform(
    name="processed_sensors",
    source_bronze="sensor_data",
    transform=process_sensor_data,
    rules={
        "sensor_zone": [F.col("sensor_zone").isNotNull()],
        "is_anomaly": [F.col("is_anomaly").isNotNull()]
    },
    table_name="processed_sensors",
    watermark_col="timestamp"
)
# Gold: Zone analytics
def zone_analytics(spark, silvers):
    sensors_df = silvers["processed_sensors"]
    return (sensors_df
        .groupBy("sensor_zone", F.date_trunc("hour", "timestamp").alias("hour"))
        .agg(
            F.avg("temperature").alias("avg_temperature"),
            F.max("temperature").alias("max_temperature"),
            # is_anomaly is boolean; cast to int so Spark can sum it
            F.sum(F.col("is_anomaly").cast("int")).alias("anomaly_count")
        )
    )

builder.add_gold_transform(
    name="zone_analytics",
    transform=zone_analytics,
    rules={
        "sensor_zone": [F.col("sensor_zone").isNotNull()],
        "avg_temperature": [F.col("avg_temperature").isNotNull()]
    },
    table_name="zone_analytics",
    source_silvers=["processed_sensors"]
)
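The zone aggregation can be checked by hand with a plain-Python sketch. The sensor IDs and temperatures below are made up, the zone is taken as the first two characters of the ID (Spark's substring is 1-based, so positions 1 to 2), and the hourly grouping is left out to keep the example short.

```python
# Hypothetical readings; only the fields needed for the aggregation
readings = [
    {"sensor_id": "A1-007", "temperature": 21.5},
    {"sensor_id": "A1-008", "temperature": 104.0},
    {"sensor_id": "B2-001", "temperature": 120.0},
    {"sensor_id": "B2-002", "temperature": 118.5},
]

zones = {}
for r in readings:
    zone = r["sensor_id"][:2]            # like substring(sensor_id, 1, 2)
    is_anomaly = r["temperature"] > 100  # same threshold as the guide
    stats = zones.setdefault(zone, {"temps": [], "anomaly_count": 0})
    stats["temps"].append(r["temperature"])
    stats["anomaly_count"] += int(is_anomaly)  # count True as 1

summary = {
    zone: {
        "avg_temperature": sum(s["temps"]) / len(s["temps"]),
        "max_temperature": max(s["temps"]),
        "anomaly_count": s["anomaly_count"],
    }
    for zone, s in zones.items()
}
print(summary["A1"]["anomaly_count"], summary["B2"]["anomaly_count"])  # 1 2
```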
Troubleshooting¶
Check Pipeline Status¶
result = pipeline.run_initial_load(bronze_sources={"events": events_df})
if result.status.value == "completed":
    print("✅ Pipeline completed successfully")
    print(f"Rows written: {result.metrics.total_rows_written}")
else:
    print(f"❌ Pipeline failed: {result.errors}")
    print(f"Failed steps: {result.metrics.failed_steps}")
Check Validation Results¶
# Check validation rates
bronze_rate = result.bronze_results["events"]["validation_rate"]
silver_rate = result.silver_results["clean_events"]["validation_rate"]
gold_rate = result.gold_results["daily_summary"]["validation_rate"]
print(f"Bronze validation: {bronze_rate:.2f}%")
print(f"Silver validation: {silver_rate:.2f}%")
print(f"Gold validation: {gold_rate:.2f}%")
Monitor Performance¶
result = pipeline.run_initial_load(bronze_sources={"events": events_df})
print(f"Execution time: {result.duration_seconds:.2f}s")
print(f"Bronze duration: {result.metrics.bronze_duration:.2f}s")
print(f"Silver duration: {result.metrics.silver_duration:.2f}s")
print(f"Gold duration: {result.metrics.gold_duration:.2f}s")
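The status and timing checks above can be folded into one small helper. The `summarize` function is a hypothetical convenience, not part of the library, and the stand-in object below uses made-up numbers. It mirrors only the attributes this guide reads from a result (`status.value`, `duration_seconds`, `metrics.total_rows_written`, `errors`).

```python
from types import SimpleNamespace


def summarize(result):
    """Build a one-line summary from the attributes used in this guide."""
    status = result.status.value
    if status == "completed":
        return (
            f"completed in {result.duration_seconds:.2f}s, "
            f"{result.metrics.total_rows_written} rows written"
        )
    return f"{status}: {result.errors}"


# Stand-in result with made-up numbers, shaped like the object printed above
fake_result = SimpleNamespace(
    status=SimpleNamespace(value="completed"),
    duration_seconds=1.5,
    metrics=SimpleNamespace(total_rows_written=6),
    errors=[],
)
print(summarize(fake_result))  # completed in 1.50s, 6 rows written
```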
What’s Next?¶
User Guide - Learn advanced features and patterns
Quick Reference - Quick reference for common tasks
API Reference - Complete API documentation
SparkForge Examples - More working examples
Need Help?¶
Check the Troubleshooting section in the User Guide
Look at the SparkForge Examples directory for working code
Review the API Reference for detailed documentation
Happy Pipeline Building! 🚀
You’re now ready to build production-ready data pipelines with SparkForge!