"""
Base classes and configuration models for the Pipeline Builder.
This module provides the foundational model classes that all pipeline components
inherit from, including base validation, serialization, and configuration models.
Key Components:
- **BaseModel**: Abstract base class for all pipeline models with common
functionality for validation, serialization, and representation
- **ValidationThresholds**: Configuration for validation thresholds across
pipeline phases (Bronze, Silver, Gold)
Dependencies:
- errors: Pipeline validation and error handling
- models.enums: Pipeline phase enumerations
- models.types: Type definitions and protocols
Example:
    >>> from pipeline_builder.models.base import BaseModel, ValidationThresholds
    >>> from dataclasses import dataclass
    >>>
    >>> @dataclass
    ... class MyStep(BaseModel):
    ...     name: str
    ...     value: int
    ...
    ...     def validate(self) -> None:
    ...         if not self.name:
    ...             raise ValueError("Name required")
    ...         if self.value < 0:
    ...             raise ValueError("Value must be non-negative")
    >>>
    >>> step = MyStep(name="test", value=42)
    >>> step.validate()
    >>> print(step.to_json())
"""
from __future__ import annotations

import json
from abc import ABC, abstractmethod
from dataclasses import dataclass, fields
from typing import Dict

from ..errors import PipelineValidationError
from .enums import PipelinePhase
from .types import ModelValue
@dataclass
class BaseModel(ABC):
    """Abstract base class for pipeline models.

    Supplies dictionary, JSON and string conversion built on the dataclass
    field list, and requires every concrete subclass to implement
    ``validate``. Subclasses are expected to be dataclasses themselves,
    because the conversion helpers enumerate dataclass fields.

    Features:
        - Abstract ``validate`` hook that subclasses must implement
        - Dictionary conversion (``to_dict``)
        - JSON serialization (``to_json``)
        - String representation for debugging (``__str__``)

    Example:
        >>> @dataclass
        ... class MyStep(BaseModel):
        ...     name: str
        ...     rules: Dict[str, List[ColumnRule]]
        ...
        ...     def validate(self) -> None:
        ...         if not self.name:
        ...             raise ValueError("Name cannot be empty")
        ...         if not self.rules:
        ...             raise ValueError("Rules cannot be empty")
        >>>
        >>> step = MyStep(name="test", rules={"id": [F.col("id").isNotNull()]})
        >>> step.validate()
        >>> print(step.to_json())
    """

    @abstractmethod
    def validate(self) -> None:
        """Validate the model.

        Must be overridden by every concrete subclass. The base
        implementation does nothing; implementations are expected to raise
        an exception (for example ``PipelineValidationError`` or
        ``ValueError``) when the model is invalid and to return ``None``
        otherwise.

        Example:
            >>> @dataclass
            ... class MyModel(BaseModel):
            ...     name: str
            ...
            ...     def validate(self) -> None:
            ...         if not self.name:
            ...             raise ValueError("Name cannot be empty")
            >>>
            >>> model = MyModel(name="test")
            >>> model.validate()  # Passes
        """

    def to_dict(self) -> Dict[str, ModelValue]:
        """Convert the model to a dictionary keyed by field name.

        A field value that exposes a ``to_dict`` attribute is replaced by the
        result of calling it; every other value is stored as-is. Values held
        inside containers (lists, dicts, ...) are not converted.

        Returns:
            Mapping of each dataclass instance field name to its (possibly
            converted) value, in field declaration order.

        Example:
            >>> step = BronzeStep(name="test", rules={"id": [F.col("id").isNotNull()]})
            >>> step_dict = step.to_dict()
            >>> print(step_dict["name"])  # "test"
        """
        result: Dict[str, ModelValue] = {}
        # dataclasses.fields() yields only real instance fields. The raw
        # __dataclass_fields__ mapping also contains InitVar/ClassVar
        # pseudo-fields; an InitVar without a default is not an attribute of
        # the instance, so reading it would raise AttributeError.
        for field_info in fields(self):
            value = getattr(self, field_info.name)
            if hasattr(value, "to_dict"):
                result[field_info.name] = value.to_dict()
            else:
                result[field_info.name] = value
        return result

    def to_json(self) -> str:
        """Serialize the model to a JSON string.

        The output of ``to_dict`` is dumped with 2-space indentation. Values
        the JSON encoder cannot handle natively are converted with ``str``.

        Returns:
            Indented JSON text representing the model.

        Example:
            >>> step = BronzeStep(name="test", rules={"id": [F.col("id").isNotNull()]})
            >>> json_str = step.to_json()
            >>> print(json_str)
            {
              "name": "test",
              "rules": {...}
            }
        """
        return json.dumps(self.to_dict(), default=str, indent=2)

    def __str__(self) -> str:
        """Return a human-readable representation of the model.

        Returns:
            ``ClassName(field=value, ...)`` built from the ``to_dict`` items,
            with each value formatted via ``str``.

        Example:
            >>> step = BronzeStep(name="test", rules={"id": [F.col("id").isNotNull()]})
            >>> print(str(step))
            BronzeStep(name=test, rules={'id': [...]})
        """
        rendered = ", ".join(f"{k}={v}" for k, v in self.to_dict().items())
        return f"{self.__class__.__name__}({rendered})"
@dataclass
class ValidationThresholds(BaseModel):
    """Validation thresholds for the Bronze, Silver and Gold pipeline phases.

    Holds one threshold per phase, expressed as a percentage. The values are
    not checked at construction time; call ``validate`` to enforce the
    allowed range.

    **Validation Rules:**
        - All thresholds must be between 0 and 100 (inclusive)
        - Thresholds are checked only when ``validate`` is called

    Attributes:
        bronze: Bronze layer validation threshold (0-100). Required; no
            default. ``create_default()`` uses 95.0.
        silver: Silver layer validation threshold (0-100). Required; no
            default. ``create_default()`` uses 98.0.
        gold: Gold layer validation threshold (0-100). Required; no
            default. ``create_default()`` uses 99.0.

    Example:
        >>> # Create default thresholds
        >>> thresholds = ValidationThresholds.create_default()
        >>> print(f"Bronze: {thresholds.bronze}%")  # Bronze: 95.0%
        >>>
        >>> # Create custom thresholds
        >>> thresholds = ValidationThresholds(
        ...     bronze=90.0,
        ...     silver=95.0,
        ...     gold=99.0
        ... )
        >>> thresholds.validate()
        >>>
        >>> # Get threshold for specific phase
        >>> from pipeline_builder.models.enums import PipelinePhase
        >>> bronze_threshold = thresholds.get_threshold(PipelinePhase.BRONZE)
    """

    bronze: float
    silver: float
    gold: float

    def validate(self) -> None:
        """Check that every threshold lies within 0-100 inclusive.

        Thresholds are checked in the order bronze, silver, gold; the first
        out-of-range value triggers the error.

        Raises:
            PipelineValidationError: If any threshold is outside the valid
                range (0-100).

        Example:
            >>> thresholds = ValidationThresholds(bronze=95.0, silver=98.0, gold=99.0)
            >>> thresholds.validate()  # Passes
            >>>
            >>> invalid = ValidationThresholds(bronze=150.0, silver=98.0, gold=99.0)
            >>> invalid.validate()  # Raises PipelineValidationError
        """
        for phase, threshold in [
            ("bronze", self.bronze),
            ("silver", self.silver),
            ("gold", self.gold),
        ]:
            # The chained comparison is False for NaN, so NaN is rejected too.
            if not 0 <= threshold <= 100:
                raise PipelineValidationError(
                    f"{phase} threshold must be between 0 and 100, got {threshold}"
                )

    def get_threshold(self, phase: PipelinePhase) -> float:
        """Get the threshold for a specific phase.

        Args:
            phase: The pipeline phase to get the threshold for
                (``PipelinePhase.BRONZE``, ``SILVER`` or ``GOLD``).

        Returns:
            The stored threshold for the specified phase.

        Raises:
            KeyError: If ``phase`` is not one of the three mapped phases.

        Example:
            >>> thresholds = ValidationThresholds(bronze=95.0, silver=98.0, gold=99.0)
            >>> from pipeline_builder.models.enums import PipelinePhase
            >>> bronze_threshold = thresholds.get_threshold(PipelinePhase.BRONZE)
            >>> print(bronze_threshold)  # 95.0
        """
        phase_map = {
            PipelinePhase.BRONZE: self.bronze,
            PipelinePhase.SILVER: self.silver,
            PipelinePhase.GOLD: self.gold,
        }
        return phase_map[phase]

    @classmethod
    def create_default(cls) -> ValidationThresholds:
        """Create the default validation thresholds.

        Values:
            - Bronze: 95.0%
            - Silver: 98.0%
            - Gold: 99.0%

        Returns:
            ValidationThresholds instance with the default values.

        Example:
            >>> thresholds = ValidationThresholds.create_default()
            >>> print(f"Bronze: {thresholds.bronze}%")  # Bronze: 95.0%
        """
        return cls(bronze=95.0, silver=98.0, gold=99.0)

    @classmethod
    def create_strict(cls) -> ValidationThresholds:
        """Create strict validation thresholds.

        Values:
            - Bronze: 99.0%
            - Silver: 99.5%
            - Gold: 99.9%

        Intended for pipelines where data quality is critical and rejecting
        more rows is acceptable.

        Returns:
            ValidationThresholds instance with the strict values.

        Example:
            >>> thresholds = ValidationThresholds.create_strict()
            >>> print(f"Gold: {thresholds.gold}%")  # Gold: 99.9%
        """
        return cls(bronze=99.0, silver=99.5, gold=99.9)

    @classmethod
    def create_loose(cls) -> ValidationThresholds:
        """Create loose validation thresholds.

        Values:
            - Bronze: 80.0%
            - Silver: 85.0%
            - Gold: 90.0%

        Intended for development, testing, or noisy data sources.

        Returns:
            ValidationThresholds instance with the loose values.

        Example:
            >>> thresholds = ValidationThresholds.create_loose()
            >>> print(f"Bronze: {thresholds.bronze}%")  # Bronze: 80.0%
        """
        return cls(bronze=80.0, silver=85.0, gold=90.0)