Business Intelligence Use Case

This guide demonstrates building a comprehensive business intelligence pipeline using SparkForge for analytics and reporting.

Overview

Build data pipelines that transform raw business data into actionable insights for decision-making.

Key Features

  • Data Integration: Combine multiple data sources

  • Real-time Analytics: Process streaming data

  • Dashboard-Ready: Create datasets for BI tools

  • Scalable Processing: Handle large volumes of data

Pipeline Components

  • Bronze Layer: Raw data ingestion from various sources

  • Silver Layer: Data cleaning and standardization

  • Gold Layer: Business metrics and KPIs
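The three layers can be illustrated with a plain-Python sketch that runs the same steps over a handful of in-memory records. It does not use SparkForge. The record fields, the `BRONZE_RULES` dictionary, and the `bronze`/`silver`/`gold` helpers are hypothetical. Rows that fail a rule are simply dropped, which is a simplification and not necessarily how SparkForge treats invalid rows.

```python
# Plain-Python sketch of the Bronze -> Silver -> Gold flow (not the SparkForge API).
from collections import defaultdict

# Bronze input: raw records as ingested (hypothetical fields and values).
raw_sales = [
    {"sale_id": 1, "region": "EU", "amount": 120.0},
    {"sale_id": 2, "region": "EU", "amount": -5.0},     # non-positive amount, removed in Silver
    {"sale_id": None, "region": "US", "amount": 80.0},  # fails the Bronze rule
    {"sale_id": 4, "region": "US", "amount": 200.0},
    {"sale_id": 5, "region": "EU", "amount": 30.0},
]

# Validation rules: column -> list of predicates (same shape idea as `rules=`).
BRONZE_RULES = {"sale_id": [lambda v: v is not None]}


def passes(row, rules):
    """Return True if every predicate for every column holds for this row."""
    return all(check(row.get(col)) for col, checks in rules.items() for check in checks)


def bronze(rows):
    """Keep only rows that satisfy the Bronze validation rules."""
    return [r for r in rows if passes(r, BRONZE_RULES)]


def silver(rows):
    """Cleaning step: keep only positive amounts."""
    return [r for r in rows if r["amount"] > 0]


def gold(rows):
    """Aggregate per region: number of sales and total revenue."""
    metrics = defaultdict(lambda: {"total_sales": 0, "total_revenue": 0.0})
    for r in rows:
        m = metrics[r["region"]]
        m["total_sales"] += 1
        m["total_revenue"] += r["amount"]
    return dict(metrics)


result = gold(silver(bronze(raw_sales)))
print(result)
```

Each layer only consumes the output of the layer before it, which keeps validation, cleaning, and aggregation independently testable.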

Example Pipeline

from pyspark.sql import SparkSession
from pipeline_builder import PipelineBuilder
from pipeline_builder.engine_config import configure_engine
from pipeline_builder.functions import get_default_functions

# Configure engine (required!)
spark = SparkSession.builder.appName("BIPipeline").getOrCreate()
configure_engine(spark=spark)
F = get_default_functions()

# Build pipeline
builder = PipelineBuilder(spark=spark, schema="bi_analytics")

# Bronze: Multiple data sources
builder.with_bronze_rules(
    name="sales",
    rules={"sale_id": [F.col("sale_id").isNotNull()]},
    incremental_col="sale_date"
)

builder.with_bronze_rules(
    name="customers",
    rules={"customer_id": [F.col("customer_id").isNotNull()]}
)

# Silver: Clean and standardize
def clean_sales(spark, bronze_df, prior_silvers):
    F = get_default_functions()
    return bronze_df.filter(F.col("amount") > 0)

builder.add_silver_transform(
    name="clean_sales",
    source_bronze="sales",
    transform=clean_sales,
    rules={"sale_id": [F.col("sale_id").isNotNull()]},
    table_name="clean_sales"
)

# Gold: Business metrics
def sales_metrics(spark, silvers):
    F = get_default_functions()
    return silvers["clean_sales"].groupBy("region").agg(
        F.count("*").alias("total_sales"),
        F.sum("amount").alias("total_revenue")
    )

builder.add_gold_transform(
    name="sales_metrics",
    transform=sales_metrics,
    rules={"region": [F.col("region").isNotNull()]},
    table_name="sales_metrics",
    source_silvers=["clean_sales"]
)

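# Sample input data (illustrative rows only; in practice, load these from your sources)
sales_df = spark.createDataFrame(
    [
        (1, 101, "EU", 120.0, "2024-01-01"),
        (2, 102, "US", 80.0, "2024-01-02"),
    ],
    ["sale_id", "customer_id", "region", "amount", "sale_date"],
)
customers_df = spark.createDataFrame(
    [(101, "Alice"), (102, "Bob")],
    ["customer_id", "name"],
)

# Execute the initial (full) load
pipeline = builder.to_pipeline()
result = pipeline.run_initial_load(
    bronze_sources={"sales": sales_df, "customers": customers_df}
)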
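The sales source is registered with `incremental_col="sale_date"`. Conceptually, an incremental column lets later runs pick up only rows newer than what was already processed. The exact mechanism SparkForge uses is not described in this guide, so the following is only a plain-Python sketch of a generic high-watermark filter. It assumes `sale_date` values only increase over time, and `incremental_batch` is a hypothetical helper.

```python
from datetime import date


def incremental_batch(rows, incremental_col, last_watermark):
    """Return rows newer than last_watermark, plus the updated watermark.

    Assumes values in incremental_col only increase over time.
    """
    if last_watermark is None:
        new_rows = list(rows)  # initial load: take everything
    else:
        new_rows = [r for r in rows if r[incremental_col] > last_watermark]
    # The new watermark is the largest value seen; unchanged if nothing is new.
    new_watermark = max((r[incremental_col] for r in new_rows), default=last_watermark)
    return new_rows, new_watermark


sales = [
    {"sale_id": 1, "sale_date": date(2024, 1, 1)},
    {"sale_id": 2, "sale_date": date(2024, 1, 2)},
]
batch1, wm = incremental_batch(sales, "sale_date", None)  # initial load

sales.append({"sale_id": 3, "sale_date": date(2024, 1, 3)})
batch2, wm = incremental_batch(sales, "sale_date", wm)    # incremental run
print([r["sale_id"] for r in batch2])  # [3]
```

The strict `>` comparison avoids reprocessing rows, but it would skip late-arriving rows that share the current watermark value; real pipelines often handle that with a lookback window or deduplication.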

Key Metrics

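The example above computes total sales and total revenue per region. With additional gold transforms, a BI pipeline of this kind typically produces metrics such as: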

  • Revenue Metrics: Total revenue, average order value, growth rates

  • Customer Metrics: Customer count, retention rates, lifetime value

  • Product Metrics: Best sellers, category performance, inventory trends

  • Operational Metrics: Processing times, error rates, data quality scores
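Several of the revenue and customer metrics reduce to simple arithmetic. The sketch below uses common textbook definitions (average order value as revenue per order, growth as the relative change between periods, retention as the share of last period's customers who return); these definitions are assumptions, not taken from SparkForge, and the inputs are made-up numbers. In a Spark gold transform the same calculations would be expressed with aggregations such as `F.sum` and `F.count`.

```python
def average_order_value(total_revenue, order_count):
    """Revenue per order; returns 0.0 when there are no orders."""
    return total_revenue / order_count if order_count else 0.0


def growth_rate(current, previous):
    """Period-over-period growth as a fraction, e.g. 0.25 for +25%."""
    if previous == 0:
        raise ValueError("growth rate is undefined when the previous value is 0")
    return (current - previous) / previous


def retention_rate(previous_customers, current_customers):
    """Share of last period's customers who also bought this period."""
    previous = set(previous_customers)
    if not previous:
        return 0.0
    return len(previous & set(current_customers)) / len(previous)


aov = average_order_value(350.0, 3)
growth = growth_rate(350.0, 280.0)
retention = retention_rate({"c1", "c2", "c3", "c4"}, {"c2", "c4", "c5"})
print(round(aov, 2), round(growth, 2), retention)  # 116.67 0.25 0.5
```

Guarding the zero-denominator cases explicitly keeps a dashboard from silently showing infinities or NaNs for new regions or empty periods.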

For more BI examples, see the Examples directory.