# Source code for pipeline_builder.models.execution

"""
Execution models for the Pipeline Builder.

This module provides models for tracking pipeline execution state and results,
including execution contexts, step results, stage statistics, and overall
execution results.

Key Components:
    - **ExecutionContext**: Tracks execution state, timing, and metadata
    - **StageStats**: Statistics for individual pipeline stages
    - **StepResult**: Results from individual step execution
    - **ExecutionResult**: Aggregated results from entire pipeline execution

Dependencies:
    - models.base: BaseModel
    - models.enums: ExecutionMode, PipelinePhase
    - models.exceptions: PipelineConfigurationError
    - models.pipeline: PipelineMetrics

Example:
    >>> from pipeline_builder.models.execution import (
    ...     ExecutionContext,
    ...     StepResult,
    ...     ExecutionResult
    ... )
    >>> from pipeline_builder.models.enums import ExecutionMode, PipelinePhase
    >>> from datetime import datetime, timezone
    >>>
    >>> # Create execution context
    >>> context = ExecutionContext(
    ...     mode=ExecutionMode.INITIAL,
    ...     start_time=datetime.now(timezone.utc)
    ... )
    >>>
    >>> # Create step result
    >>> result = StepResult.create_success(
    ...     step_name="bronze_step",
    ...     phase=PipelinePhase.BRONZE,
    ...     start_time=datetime.now(timezone.utc),
    ...     end_time=datetime.now(timezone.utc),
    ...     rows_processed=1000,
    ...     rows_written=950,
    ...     validation_rate=95.0
    ... )
"""

from __future__ import annotations

import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, Optional

from .base import BaseModel
from .enums import ExecutionMode, PipelinePhase
from .exceptions import PipelineConfigurationError
from .pipeline import PipelineMetrics


@dataclass
class ExecutionContext(BaseModel):
    """Context for pipeline execution.

    Tracks the state and metadata of a pipeline execution run, including
    timing information, execution mode, and identifiers. Provides both
    primary fields and aliases for compatibility with different parts of the
    system.

    **Validation Rules:**
        - `run_id`: Must be a non-empty string
        - `duration_secs`: Must be non-negative if set

    Attributes:
        mode: Execution mode (INITIAL, INCREMENTAL, FULL_REFRESH,
            VALIDATION_ONLY). Determines how the pipeline is executed.
        start_time: When execution started. Required field.
        end_time: When execution ended. None if execution is still running.
        duration_secs: Total execution duration in seconds. None if execution
            is still running. Automatically calculated when `finish()` is
            called.
        run_id: Unique run identifier (UUID string). Automatically generated
            if not provided.
        execution_id: Unique identifier for this execution (UUID string).
            Used for tracking and logging. Automatically generated if not
            provided.
        pipeline_id: Identifier for the pipeline being executed. Defaults to
            "unknown" if not provided.
        schema: Target schema for data storage. Defaults to "default" if not
            provided.
        started_at: When execution started (alias for start_time). Set
            automatically from start_time if not provided.
        ended_at: When execution ended (alias for end_time). Set
            automatically from end_time if not provided, and kept in sync by
            `finish()`.
        run_mode: Mode of execution as string (alias for mode.value).
            Automatically set from mode if not provided.
        config: Pipeline configuration as dictionary. Defaults to empty dict.

    Example:
        >>> from pipeline_builder.models.execution import ExecutionContext
        >>> from pipeline_builder.models.enums import ExecutionMode
        >>> from datetime import datetime, timezone
        >>>
        >>> # Create context
        >>> context = ExecutionContext(
        ...     mode=ExecutionMode.INITIAL,
        ...     start_time=datetime.now(timezone.utc)
        ... )
        >>> print(context.run_id)  # Unique UUID
        >>>
        >>> # Finish execution
        >>> context.finish()
        >>> print(context.duration_secs)  # Execution duration
    """

    mode: ExecutionMode
    start_time: datetime
    end_time: Optional[datetime] = None
    duration_secs: Optional[float] = None
    run_id: str = field(default_factory=lambda: str(uuid.uuid4()))

    # Additional fields for writer compatibility
    execution_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    pipeline_id: str = "unknown"
    schema: str = "default"
    started_at: Optional[datetime] = None
    ended_at: Optional[datetime] = None
    run_mode: str = "initial"
    config: Dict[str, Any] = field(default_factory=dict)

    def __post_init__(self) -> None:
        """Initialize aliases and defaults.

        Sets up alias fields (started_at, ended_at, run_mode) from primary
        fields if they are not explicitly provided. This ensures backward
        compatibility with code that uses the alias fields.
        """
        if self.started_at is None:
            self.started_at = self.start_time
        if self.ended_at is None:
            self.ended_at = self.end_time
        # "initial" is the field default, so it is treated as "not provided"
        # and replaced by a string derived from `mode`.
        if self.run_mode == "initial":
            # Map mode to run_mode string
            if hasattr(self.mode, "value"):
                self.run_mode = self.mode.value
            elif hasattr(self.mode, "name"):
                self.run_mode = self.mode.name.lower()

    def validate(self) -> None:
        """Validate the execution context.

        Ensures the context has valid values for required fields and that
        numeric fields are within valid ranges.

        Raises:
            ValueError: If run_id is empty or duration_secs is negative.

        Example:
            >>> context = ExecutionContext(
            ...     mode=ExecutionMode.INITIAL,
            ...     start_time=datetime.now(timezone.utc)
            ... )
            >>> context.validate()  # Passes
        """
        if not self.run_id:
            raise ValueError("Run ID cannot be empty")
        if self.duration_secs is not None and self.duration_secs < 0:
            raise ValueError("Duration cannot be negative")

    def finish(self) -> None:
        """Mark execution as finished and calculate duration.

        Sets end_time to the current UTC timestamp, mirrors it into the
        ended_at alias field, and calculates the execution duration from
        start_time.

        Note:
            end_time is timezone-aware (UTC). If start_time is a naive
            datetime, the subtraction raises TypeError, because Python
            cannot mix naive and aware datetimes.

        Example:
            >>> context = ExecutionContext(
            ...     mode=ExecutionMode.INITIAL,
            ...     start_time=datetime.now(timezone.utc)
            ... )
            >>> # ... execution happens ...
            >>> context.finish()
            >>> print(context.duration_secs)  # Execution duration in seconds
        """
        self.end_time = datetime.now(timezone.utc)
        # __post_init__ copied end_time into ended_at only once, at
        # construction time (usually None), so the alias must be re-synced
        # here.
        self.ended_at = self.end_time
        if self.start_time is not None:
            self.duration_secs = (self.end_time - self.start_time).total_seconds()

    @property
    def is_finished(self) -> bool:
        """Check if execution is finished.

        Returns:
            True if end_time is set, False otherwise.

        Example:
            >>> context = ExecutionContext(...)
            >>> print(context.is_finished)  # False
            >>> context.finish()
            >>> print(context.is_finished)  # True
        """
        return self.end_time is not None

    @property
    def is_running(self) -> bool:
        """Check if execution is currently running.

        Returns:
            True if execution is still running (end_time is None),
            False otherwise.

        Example:
            >>> context = ExecutionContext(...)
            >>> print(context.is_running)  # True
            >>> context.finish()
            >>> print(context.is_running)  # False
        """
        return not self.is_finished


@dataclass
class StageStats(BaseModel):
    """Row counts, validation outcome and timing for one step of a stage.

    One instance describes a single step inside a Medallion stage
    ("bronze", "silver" or "gold"). Derived metrics (error rate, throughput,
    pass/fail) are exposed as read-only properties.

    **Validation Rules:**
        - `total_rows` must equal `valid_rows + invalid_rows`
        - `validation_rate` must be between 0 and 100
        - `duration_secs` must be non-negative

    Attributes:
        stage: Name of the stage (bronze, silver, or gold).
        step: Name of the step within the stage.
        total_rows: Number of rows the step handled.
        valid_rows: Rows that passed validation.
        invalid_rows: Rows that failed validation.
        validation_rate: Share of rows that passed, as a percentage (0-100).
        duration_secs: Wall time spent on the step, in seconds.
        start_time: Optional timestamp for when processing began.
        end_time: Optional timestamp for when processing completed.

    Example:
        >>> stats = StageStats(
        ...     stage="bronze",
        ...     step="user_events",
        ...     total_rows=1000,
        ...     valid_rows=950,
        ...     invalid_rows=50,
        ...     validation_rate=95.0,
        ...     duration_secs=10.5
        ... )
        >>> stats.validate()
        >>> print(f"Error rate: {stats.error_rate}%")  # 5.0%
    """

    stage: str
    step: str
    total_rows: int
    valid_rows: int
    invalid_rows: int
    validation_rate: float
    duration_secs: float
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None

    def validate(self) -> None:
        """Check the statistics for internal consistency.

        The checks run in order: row-count consistency, then the
        validation-rate range, then non-negative duration. The first failing
        check raises.

        Raises:
            PipelineConfigurationError: On the first rule that is violated.
        """
        total = self.total_rows
        valid = self.valid_rows
        invalid = self.invalid_rows
        if total != valid + invalid:
            raise PipelineConfigurationError(
                f"Total rows ({total}) must equal valid ({valid}) + invalid ({invalid})"
            )

        rate = self.validation_rate
        # The chained comparison also rejects NaN, since every comparison
        # with NaN is False.
        if not (0 <= rate <= 100):
            raise PipelineConfigurationError(
                f"Validation rate must be between 0 and 100, got {rate}"
            )

        elapsed = self.duration_secs
        if elapsed < 0:
            raise PipelineConfigurationError(
                f"Duration must be non-negative, got {elapsed}"
            )

    @property
    def is_valid(self) -> bool:
        """True when at least 95% of rows passed validation (fixed threshold)."""
        threshold = 95.0  # Default threshold
        return self.validation_rate >= threshold

    @property
    def error_rate(self) -> float:
        """Percentage (0-100) of rows that failed validation.

        An empty stage (total_rows == 0) reports 0.0 instead of dividing by
        zero.
        """
        return 0.0 if self.total_rows == 0 else self.invalid_rows / self.total_rows * 100

    @property
    def throughput_rows_per_sec(self) -> float:
        """Rows handled per second; 0.0 when duration_secs is exactly 0."""
        elapsed = self.duration_secs
        return self.total_rows / elapsed if elapsed != 0 else 0.0


@dataclass
class StepResult(BaseModel):
    """Outcome and metrics of executing a single pipeline step.

    Captures whether the step succeeded, when it ran, how many rows it
    processed and wrote, and the share of rows that passed validation.
    Use `create_success` / `create_failure` to build instances with the
    duration derived from the timestamps.

    **Validation Rules:**
        - `step_name`: Must be a non-empty string
        - `duration_secs`: Must be non-negative
        - `rows_processed`: Must be non-negative
        - `rows_written`: Must be non-negative
        - `validation_rate`: Must be between 0 and 100

    Attributes:
        step_name: Name of the executed step.
        phase: Pipeline phase (BRONZE, SILVER, or GOLD) of the step.
        success: Whether the step completed without errors.
        start_time: Timestamp at which the step began.
        end_time: Timestamp at which the step completed.
        duration_secs: Seconds between start_time and end_time.
        rows_processed: Rows read/handled by the step (valid and invalid).
        rows_written: Rows written to the target table.
        validation_rate: Percentage (0-100) of processed rows that passed.
        error_message: Failure description, or None on success.
        step_type: Optional step type string (bronze, silver, gold).
        table_fqn: Fully qualified target table name, if any.
        write_mode: Write mode used (overwrite, append), if any.
        input_rows: Optional count of input rows.

    Example:
        >>> result = StepResult.create_success(
        ...     step_name="bronze_step",
        ...     phase=PipelinePhase.BRONZE,
        ...     start_time=datetime.now(timezone.utc),
        ...     end_time=datetime.now(timezone.utc),
        ...     rows_processed=1000,
        ...     rows_written=950,
        ...     validation_rate=95.0
        ... )
        >>> print(f"Success: {result.success}")  # True
    """

    step_name: str
    phase: PipelinePhase
    success: bool
    start_time: datetime
    end_time: datetime
    duration_secs: float
    rows_processed: int
    rows_written: int
    validation_rate: float
    error_message: Optional[str] = None
    step_type: Optional[str] = None
    table_fqn: Optional[str] = None
    write_mode: Optional[str] = None
    input_rows: Optional[int] = None

    def validate(self) -> None:
        """Check every field against the validation rules above.

        Rules are evaluated lazily and in order, so only the first violation
        is reported.

        Raises:
            ValueError: Describing the first violated rule.
        """
        rules = (
            (lambda: not self.step_name, "Step name cannot be empty"),
            (lambda: self.duration_secs < 0, "Duration cannot be negative"),
            (lambda: self.rows_processed < 0, "Rows processed cannot be negative"),
            (lambda: self.rows_written < 0, "Rows written cannot be negative"),
            (
                lambda: not 0 <= self.validation_rate <= 100,
                "Validation rate must be between 0 and 100",
            ),
        )
        for violated, message in rules:
            if violated():
                raise ValueError(message)

    @property
    def is_valid(self) -> bool:
        """Succeeded and at least 95% of rows passed validation."""
        return self.success and self.validation_rate >= 95.0

    @property
    def is_high_quality(self) -> bool:
        """Succeeded and at least 98% of rows passed validation."""
        return self.success and self.validation_rate >= 98.0

    @property
    def throughput_rows_per_sec(self) -> float:
        """Processed rows per second; 0.0 when duration_secs is exactly 0."""
        elapsed = self.duration_secs
        return self.rows_processed / elapsed if elapsed != 0 else 0.0

    @classmethod
    def create_success(
        cls,
        step_name: str,
        phase: PipelinePhase,
        start_time: datetime,
        end_time: datetime,
        rows_processed: int,
        rows_written: int,
        validation_rate: float,
        step_type: Optional[str] = None,
        table_fqn: Optional[str] = None,
        write_mode: Optional[str] = None,
        input_rows: Optional[int] = None,
    ) -> StepResult:
        """Build a result for a step that completed successfully.

        Args:
            step_name: Name of the executed step.
            phase: Pipeline phase (BRONZE, SILVER, or GOLD).
            start_time: When the step began.
            end_time: When the step completed.
            rows_processed: Number of rows processed.
            rows_written: Number of rows written to the target table.
            validation_rate: Percentage (0-100) of rows that passed.
            step_type: Optional step type string.
            table_fqn: Optional fully qualified table name.
            write_mode: Optional write mode (overwrite, append).
            input_rows: Optional number of input rows.

        Returns:
            A StepResult with success=True, no error message, and
            duration_secs computed from the two timestamps.
        """
        elapsed = (end_time - start_time).total_seconds()
        return cls(
            step_name=step_name,
            phase=phase,
            success=True,
            error_message=None,
            start_time=start_time,
            end_time=end_time,
            duration_secs=elapsed,
            rows_processed=rows_processed,
            rows_written=rows_written,
            validation_rate=validation_rate,
            step_type=step_type,
            table_fqn=table_fqn,
            write_mode=write_mode,
            input_rows=input_rows,
        )

    @classmethod
    def create_failure(
        cls,
        step_name: str,
        phase: PipelinePhase,
        start_time: datetime,
        end_time: datetime,
        error_message: str,
        step_type: Optional[str] = None,
        table_fqn: Optional[str] = None,
        write_mode: Optional[str] = None,
        input_rows: Optional[int] = None,
    ) -> StepResult:
        """Build a result for a step that failed.

        Row counts and validation rate are reported as zero.

        Args:
            step_name: Name of the executed step.
            phase: Pipeline phase (BRONZE, SILVER, or GOLD).
            start_time: When the step began.
            end_time: When the step stopped.
            error_message: Description of the failure.
            step_type: Optional step type string.
            table_fqn: Optional fully qualified table name.
            write_mode: Optional write mode (overwrite, append).
            input_rows: Optional number of input rows.

        Returns:
            A StepResult with success=False, zeroed metrics, and
            duration_secs computed from the two timestamps.
        """
        elapsed = (end_time - start_time).total_seconds()
        return cls(
            step_name=step_name,
            phase=phase,
            success=False,
            error_message=error_message,
            start_time=start_time,
            end_time=end_time,
            duration_secs=elapsed,
            rows_processed=0,
            rows_written=0,
            validation_rate=0.0,
            step_type=step_type,
            table_fqn=table_fqn,
            write_mode=write_mode,
            input_rows=input_rows,
        )

    @property
    def error_rate(self) -> float:
        """Percentage of rows that failed validation (100 - validation_rate).

        Reports 0.0 when no rows were processed.
        """
        return 0.0 if self.rows_processed == 0 else 100.0 - self.validation_rate


@dataclass
class ExecutionResult(BaseModel):
    """Aggregate outcome of a whole pipeline run.

    Bundles the run's ExecutionContext, the per-step results, the metrics
    aggregated from those results, and a single overall success flag.

    **Validation Rules:**
        - `context`: Must be an ExecutionContext instance
        - `step_results`: Must be a list
        - `metrics`: Must be a PipelineMetrics instance
        - `success`: Must be a boolean

    Attributes:
        context: Metadata, timing and configuration of the run.
        step_results: One StepResult per executed step.
        metrics: PipelineMetrics aggregated from step_results.
        success: True when every step succeeded (also True when there are
            no step results, since `all()` of an empty sequence is True).

    Example:
        >>> context = ExecutionContext(mode=ExecutionMode.INITIAL, ...)
        >>> result = ExecutionResult.from_context_and_results(context, step_results)
        >>> print(f"Pipeline success: {result.success}")
    """

    context: ExecutionContext
    step_results: list[StepResult]
    metrics: PipelineMetrics
    success: bool

    def validate(self) -> None:
        """Type-check each field, in declaration order.

        Raises:
            PipelineConfigurationError: For the first field whose value is
                not of the expected type.
        """
        expectations = (
            (self.context, ExecutionContext, "Context must be an ExecutionContext instance"),
            (self.step_results, list, "Step results must be a list"),
            (self.metrics, PipelineMetrics, "Metrics must be a PipelineMetrics instance"),
            (self.success, bool, "Success must be a boolean"),
        )
        for value, expected_type, message in expectations:
            if not isinstance(value, expected_type):
                raise PipelineConfigurationError(message)

    @classmethod
    def from_context_and_results(
        cls, context: ExecutionContext, step_results: list[StepResult]
    ) -> ExecutionResult:
        """Assemble an ExecutionResult from a finished run.

        Metrics come from `PipelineMetrics.from_step_results`; overall
        success is the conjunction of each step's `success` flag.

        Args:
            context: ExecutionContext of the run.
            step_results: StepResult instances for every executed step.

        Returns:
            A new ExecutionResult with aggregated metrics and success status.
        """
        aggregated = PipelineMetrics.from_step_results(step_results)
        return cls(
            context=context,
            step_results=step_results,
            metrics=aggregated,
            success=all(step.success for step in step_results),
        )