E-commerce Analytics Use Case
This guide demonstrates how to build a comprehensive e-commerce analytics pipeline using SparkForge’s Medallion Architecture.
Overview
The e-commerce pipeline processes customer data, orders, and product information to create business intelligence dashboards and reports.
Data Sources
Customer Data: User profiles, demographics, preferences
Order Data: Transactions, order items, shipping information
Product Data: Product catalog, categories, pricing
Web Analytics: Page views, clicks, session data
Pipeline Architecture
Bronze Layer
Raw data ingestion with basic validation:
from pipeline_builder import PipelineBuilder
from pipeline_builder.engine_config import configure_engine
from pipeline_builder.functions import get_default_functions
# Configure engine (required!)
# NOTE(review): `spark` is assumed to be an already-created SparkSession, and
# this call presumably has to precede get_default_functions() - confirm.
configure_engine(spark=spark)
# Functions namespace used to build every rule and transform expression below.
F = get_default_functions()
# Builder that collects the bronze/silver/gold steps; uses schema "ecommerce".
builder = PipelineBuilder(spark=spark, schema="ecommerce")
# Bronze step "customers": validation rules keyed by column name.
customer_bronze_rules = {
    "customer_id": [F.col("customer_id").isNotNull()],
    "email": [F.col("email").contains("@")],
    "created_at": [F.col("created_at").isNotNull()],
}
builder.with_bronze_rules(
    name="customers",
    rules=customer_bronze_rules,
    incremental_col="created_at",
)
# Bronze step "orders": validation rules keyed by column name.
order_bronze_rules = {
    "order_id": [F.col("order_id").isNotNull()],
    "customer_id": [F.col("customer_id").isNotNull()],
    "total_amount": [F.col("total_amount") > 0],
    "order_date": [F.col("order_date").isNotNull()],
}
builder.with_bronze_rules(
    name="orders",
    rules=order_bronze_rules,
    incremental_col="order_date",
)
Silver Layer
Data cleaning and enrichment:
# Clean customer data
def clean_customers(spark, bronze_df, prior_silvers):
    """Drop rows without a customer_id and add segmentation columns.

    Args:
        spark: Passed in by the pipeline; not used by this transform.
        bronze_df: DataFrame from the "customers" bronze step. Must expose
            the columns ``customer_id``, ``age`` and ``membership_type``.
        prior_silvers: Previously built silver tables; not used here.

    Returns:
        ``bronze_df`` restricted to non-null ``customer_id`` rows, with two
        added columns: ``age_group`` ("18-24", "25-34", "35-44", "45+", or
        NULL when ``age`` is NULL) and ``is_premium``
        (``membership_type == "premium"``).
    """
    F = get_default_functions()
    age = F.col("age")
    # Every bucket is an explicit condition and there is no otherwise():
    # a NULL age makes each comparison NULL, so the row matches no branch and
    # age_group stays NULL instead of being mislabelled as "45+".
    age_group = (
        F.when(age < 25, "18-24")
        .when(age < 35, "25-34")
        .when(age < 45, "35-44")
        .when(age >= 45, "45+")
    )
    return (
        bronze_df
        .filter(F.col("customer_id").isNotNull())
        .withColumn("age_group", age_group)
        .withColumn("is_premium", F.col("membership_type") == "premium")
    )
# Register the silver step that applies clean_customers to bronze "customers".
clean_customers_rules = {
    "customer_id": [F.col("customer_id").isNotNull()],
}
builder.add_silver_transform(
    name="clean_customers",
    source_bronze="customers",
    transform=clean_customers,
    rules=clean_customers_rules,
    table_name="clean_customers",
    watermark_col="created_at",
)
# Enrich orders with customer data
def enrich_orders(spark, bronze_df, prior_silvers):
    """Left-join orders to cleaned customers and tag each order's value tier.

    Args:
        spark: Passed in by the pipeline; not used by this transform.
        bronze_df: DataFrame from the "orders" bronze step.
        prior_silvers: Mapping of silver step name to DataFrame; the
            "clean_customers" entry is read.

    Returns:
        The joined DataFrame with an added ``order_value_tier`` column:
        "high" when ``total_amount`` > 1000, "medium" when > 500,
        otherwise "low".
    """
    F = get_default_functions()
    customer_df = prior_silvers["clean_customers"]
    # Left join keeps every order, even without a matching customer row.
    orders_with_customers = bronze_df.join(customer_df, "customer_id", "left")

    amount = F.col("total_amount")
    value_tier = (
        F.when(amount > 1000, "high")
        .when(amount > 500, "medium")
        .otherwise("low")
    )
    return orders_with_customers.withColumn("order_value_tier", value_tier)
# Register the silver step that applies enrich_orders to bronze "orders";
# it also reads the "clean_customers" silver table.
enriched_orders_rules = {
    "order_id": [F.col("order_id").isNotNull()],
}
builder.add_silver_transform(
    name="enriched_orders",
    source_bronze="orders",
    transform=enrich_orders,
    rules=enriched_orders_rules,
    table_name="enriched_orders",
    source_silvers=["clean_customers"],
)
Gold Layer
Business analytics and KPIs:
# Customer analytics
def customer_analytics(spark, silvers):
    """Aggregate enriched orders per customer and assign a customer tier.

    Args:
        spark: Passed in by the pipeline; not used by this transform.
        silvers: Mapping of silver step name to DataFrame; the
            "enriched_orders" entry is read.

    Returns:
        One row per (customer_id, age_group, is_premium) with
        ``total_orders``, ``lifetime_value``, ``last_order_date``,
        ``avg_order_value`` and ``customer_tier`` ("VIP" when
        lifetime_value > 5000, "Premium" when > 1000, else "Standard").
    """
    F = get_default_functions()
    enriched = silvers["enriched_orders"]

    per_customer_metrics = [
        F.count("*").alias("total_orders"),
        F.sum("total_amount").alias("lifetime_value"),
        F.max("order_date").alias("last_order_date"),
        F.avg("total_amount").alias("avg_order_value"),
    ]
    per_customer = enriched.groupBy(
        "customer_id", "age_group", "is_premium"
    ).agg(*per_customer_metrics)

    lifetime_value = F.col("lifetime_value")
    tier = (
        F.when(lifetime_value > 5000, "VIP")
        .when(lifetime_value > 1000, "Premium")
        .otherwise("Standard")
    )
    return per_customer.withColumn("customer_tier", tier)
# Register the gold step that builds customer_analytics from "enriched_orders".
customer_analytics_rules = {
    "customer_id": [F.col("customer_id").isNotNull()],
}
builder.add_gold_transform(
    name="customer_analytics",
    transform=customer_analytics,
    rules=customer_analytics_rules,
    table_name="customer_analytics",
    source_silvers=["enriched_orders"],
)
# Sales analytics
def sales_analytics(spark, silvers):
    """Aggregate enriched orders by calendar month.

    Args:
        spark: Passed in by the pipeline; not used by this transform.
        silvers: Mapping of silver step name to DataFrame; the
            "enriched_orders" entry is read.

    Returns:
        One row per ``month`` (``order_date`` truncated to the month) with
        ``total_orders``, ``total_revenue``, ``unique_customers`` and
        ``avg_order_value``.
    """
    F = get_default_functions()
    enriched = silvers["enriched_orders"]

    month_key = F.date_trunc("month", "order_date").alias("month")
    monthly_metrics = [
        F.count("*").alias("total_orders"),
        F.sum("total_amount").alias("total_revenue"),
        F.countDistinct("customer_id").alias("unique_customers"),
        F.avg("total_amount").alias("avg_order_value"),
    ]
    return enriched.groupBy(month_key).agg(*monthly_metrics)
# Register the gold step that builds sales_analytics from "enriched_orders".
sales_analytics_rules = {
    "month": [F.col("month").isNotNull()],
}
builder.add_gold_transform(
    name="sales_analytics",
    transform=sales_analytics,
    rules=sales_analytics_rules,
    table_name="sales_analytics",
    source_silvers=["enriched_orders"],
)
Execution
# Turn the configured builder into a runnable pipeline.
pipeline = builder.to_pipeline()

# Initial load: pass one source DataFrame per bronze step name.
initial_sources = {
    "customers": customers_df,
    "orders": orders_df,
}
result = pipeline.run_initial_load(bronze_sources=initial_sources)

# Incremental update: same bronze step names, new DataFrames.
# Note that `result` is rebound to the incremental run's result.
incremental_sources = {
    "customers": new_customers_df,
    "orders": new_orders_df,
}
result = pipeline.run_incremental(bronze_sources=incremental_sources)
Key Metrics
The pipeline produces these business metrics:
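Customer Lifetime Value: Total spending per customer (`lifetime_value` in `customer_analytics`)
Order Frequency: Number of orders per customer (`total_orders` in `customer_analytics`)
Revenue Trends: Monthly revenue (`total_revenue` in `sales_analytics`); growth can be derived by comparing consecutive months
Customer Segmentation: Tier-based customer classification (`customer_tier` in `customer_analytics`)
Product Performance: Best-selling items and categories (requires product data, which the steps shown on this page do not ingest)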
For the complete e-commerce guide with more examples, see: USECASE_ECOMMERCE.md