IoT Data Processing Use Case
This guide shows how to build an IoT sensor data pipeline with SparkForge for real-time analytics and monitoring.
Overview
Process high-volume sensor data from IoT devices to create real-time dashboards and predictive analytics.
Key Features
Stream Processing: Handle high-frequency sensor data
Real-time Analytics: Process data as it arrives
Anomaly Detection: Identify unusual patterns
Predictive Maintenance: Forecast equipment failures
Pipeline Components
Bronze Layer: Raw sensor data ingestion
Silver Layer: Data cleaning and feature engineering
Gold Layer: Analytics and alerts
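The three layers can be pictured without Spark at all. The following is a minimal plain-Python sketch, not SparkForge code: the function names (`bronze_validate`, `silver_enrich`, `gold_aggregate`) and the sample readings are hypothetical, while the validation ranges and anomaly thresholds are the ones used in the example pipeline on this page.

```python
from collections import defaultdict

# Hypothetical sample readings; the field names follow the example pipeline.
readings = [
    {"sensor_id": "s1", "temperature": 21.5, "humidity": 40.0, "timestamp": "2024-01-01T00:00:00"},
    {"sensor_id": "s1", "temperature": 120.0, "humidity": 42.0, "timestamp": "2024-01-01T00:01:00"},
    {"sensor_id": "s2", "temperature": -5.0, "humidity": 55.0, "timestamp": "2024-01-01T00:00:00"},
    {"sensor_id": None, "temperature": 20.0, "humidity": 50.0, "timestamp": "2024-01-01T00:00:00"},
    {"sensor_id": "s2", "temperature": 400.0, "humidity": 50.0, "timestamp": "2024-01-01T00:02:00"},
]


def bronze_validate(rows):
    """Keep rows that pass the ingestion checks (non-null keys, plausible ranges)."""
    return [
        r for r in rows
        if r["sensor_id"] is not None
        and r["timestamp"] is not None
        and -50 <= r["temperature"] <= 150
        and 0 <= r["humidity"] <= 100
    ]


def silver_enrich(rows):
    """Add a boolean flag for temperatures outside the 0 to 100 band."""
    return [
        {**r, "temperature_anomaly": r["temperature"] < 0 or r["temperature"] > 100}
        for r in rows
    ]


def gold_aggregate(rows):
    """Summarise temperatures per sensor: average, maximum, and reading count."""
    by_sensor = defaultdict(list)
    for r in rows:
        by_sensor[r["sensor_id"]].append(r["temperature"])
    return {
        sensor: {
            "avg_temperature": sum(temps) / len(temps),
            "max_temperature": max(temps),
            "reading_count": len(temps),
        }
        for sensor, temps in by_sensor.items()
    }


bronze = bronze_validate(readings)   # drops the null-id row and the 400.0 reading
silver = silver_enrich(bronze)       # flags 120.0 and -5.0 as anomalies
gold = gold_aggregate(silver)        # one summary per sensor
```

Each stage only consumes the output of the one before it, which is the property the layered design relies on: bad rows are rejected once at ingestion, and later stages can assume clean input.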
Example Pipeline
from pyspark.sql import SparkSession
from pipeline_builder import PipelineBuilder
from pipeline_builder.engine_config import configure_engine
from pipeline_builder.functions import get_default_functions
# Configure engine (required!)
spark = SparkSession.builder.appName("IoTPipeline").getOrCreate()
configure_engine(spark=spark)
F = get_default_functions()
# Build pipeline
builder = PipelineBuilder(spark=spark, schema="iot_analytics")
# Bronze: Sensor data validation
builder.with_bronze_rules(
    name="sensor_data",
    rules={
        "sensor_id": [F.col("sensor_id").isNotNull()],
        "temperature": [F.col("temperature").between(-50, 150)],
        "humidity": [F.col("humidity").between(0, 100)],
        "timestamp": [F.col("timestamp").isNotNull()]
    },
    incremental_col="timestamp"
)
# Silver: Feature engineering
def enrich_sensor_data(spark, bronze_df, prior_silvers):
    F = get_default_functions()
    return bronze_df.withColumn(
        "temperature_anomaly",
        F.when(
            (F.col("temperature") < 0) | (F.col("temperature") > 100),
            True
        ).otherwise(False)
    )

builder.add_silver_transform(
    name="enriched_sensor_data",
    source_bronze="sensor_data",
    transform=enrich_sensor_data,
    rules={"sensor_id": [F.col("sensor_id").isNotNull()]},
    table_name="enriched_sensor_data"
)
# Gold: Aggregated metrics
def sensor_metrics(spark, silvers):
    F = get_default_functions()
    return silvers["enriched_sensor_data"].groupBy("sensor_id").agg(
        F.avg("temperature").alias("avg_temperature"),
        F.max("temperature").alias("max_temperature"),
        F.count("*").alias("reading_count")
    )

builder.add_gold_transform(
    name="sensor_metrics",
    transform=sensor_metrics,
    rules={"sensor_id": [F.col("sensor_id").isNotNull()]},
    table_name="sensor_metrics",
    source_silvers=["enriched_sensor_data"]
)
# Execute
pipeline = builder.to_pipeline()
# sensor_df is a Spark DataFrame of raw sensor readings that must be loaded before this call
result = pipeline.run_initial_load(bronze_sources={"sensor_data": sensor_df})
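The `run_initial_load` call expects `sensor_df` to exist already. One way to supply it for a local trial is sketched below, assuming the four columns that the bronze rules validate. The helper names (`sample_sensor_rows`, `make_sensor_df`), the schema string, and the values are illustrative and not part of SparkForge.

```python
from datetime import datetime, timedelta

# Column layout assumed from the bronze rules in the example pipeline.
SENSOR_SCHEMA = "sensor_id string, temperature double, humidity double, timestamp timestamp"


def sample_sensor_rows(n_sensors=2, readings_per_sensor=3, start=datetime(2024, 1, 1)):
    """Build deterministic (sensor_id, temperature, humidity, timestamp) tuples."""
    rows = []
    for s in range(n_sensors):
        for i in range(readings_per_sensor):
            rows.append((
                f"sensor_{s}",
                20.0 + s + i,                  # temperature
                40.0 + 5.0 * i,                # humidity
                start + timedelta(minutes=i),  # one reading per minute
            ))
    return rows


def make_sensor_df(spark, rows=None):
    """Create a Spark DataFrame from sample rows using an existing SparkSession."""
    if rows is None:
        rows = sample_sensor_rows()
    return spark.createDataFrame(rows, schema=SENSOR_SCHEMA)
```

With a session in hand, `sensor_df = make_sensor_df(spark)` yields six rows, all of which fall inside the temperature and humidity ranges checked by the bronze rules.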
Key Metrics
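The example above computes per-sensor average temperature, maximum temperature, and reading count. A fuller IoT pipeline built on the same pattern can produce metrics such as: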
Sensor Health: Average readings, anomaly detection, failure rates
Environmental Metrics: Temperature trends, humidity patterns, pressure changes
Device Performance: Uptime, data quality, transmission success rates
Predictive Alerts: Maintenance schedules, failure predictions, threshold breaches
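As one illustration of the sensor-health and threshold-breach metrics listed above, the sketch below computes a per-sensor anomaly rate and reports sensors whose rate crosses a limit. It is plain Python over already-flagged rows, not SparkForge code: `sensor_health`, `breach_alerts`, and the 0.5 limit are hypothetical choices, and only the `temperature_anomaly` flag comes from the silver step in the example pipeline.

```python
from collections import Counter


def sensor_health(rows):
    """Return {sensor_id: anomaly_rate} from rows carrying a temperature_anomaly flag."""
    totals = Counter()
    anomalies = Counter()
    for r in rows:
        totals[r["sensor_id"]] += 1
        if r["temperature_anomaly"]:
            anomalies[r["sensor_id"]] += 1
    return {sensor: anomalies[sensor] / totals[sensor] for sensor in totals}


def breach_alerts(health, max_anomaly_rate=0.5):
    """List sensors whose anomaly rate exceeds the (illustrative) limit, sorted by id."""
    return sorted(s for s, rate in health.items() if rate > max_anomaly_rate)


# Hypothetical silver-layer output, reduced to the two fields this sketch needs.
flagged = [
    {"sensor_id": "s1", "temperature_anomaly": False},
    {"sensor_id": "s1", "temperature_anomaly": True},
    {"sensor_id": "s2", "temperature_anomaly": True},
    {"sensor_id": "s2", "temperature_anomaly": True},
    {"sensor_id": "s2", "temperature_anomaly": False},
]

health = sensor_health(flagged)
alerts = breach_alerts(health)
```

In a real pipeline this calculation would more naturally be expressed as a gold-layer aggregation alongside `sensor_metrics`; the plain-Python form is only meant to make the arithmetic explicit.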
For more IoT examples, see the Examples directory.